mirror of
https://github.com/Qortal/qortal.git
synced 2025-07-29 13:11:22 +00:00
increase request timeout
This commit is contained in:
@@ -42,7 +42,7 @@ public class ArbitraryDataManager extends Thread {
|
||||
private int powDifficulty = 14; // Must not be final, as unit tests need to reduce this value
|
||||
|
||||
/** Request timeout when transferring arbitrary data */
|
||||
public static final long ARBITRARY_REQUEST_TIMEOUT = 12 * 1000L; // ms
|
||||
public static final long ARBITRARY_REQUEST_TIMEOUT = 24 * 1000L; // ms
|
||||
|
||||
/** Maximum time to hold information about an in-progress relay */
|
||||
public static final long ARBITRARY_RELAY_TIMEOUT = 120 * 1000L; // ms
|
||||
|
@@ -1,10 +1,6 @@
|
||||
package org.qortal.network;
|
||||
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.*;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
@@ -13,113 +9,123 @@ import org.qortal.network.message.Message;
|
||||
|
||||
public class PeerSendManager {
|
||||
private static final Logger LOGGER = LogManager.getLogger(PeerSendManager.class);
|
||||
private volatile boolean coolingDown = false;
|
||||
|
||||
private static final int MAX_RETRIES = 15;
|
||||
private static final int BASE_RETRY_DELAY_MS = 100;
|
||||
private static final int MAX_FAILURES = 15;
|
||||
private static final int MAX_MESSAGE_ATTEMPTS = 2;
|
||||
private static final int SEND_TIMEOUT_MS = 500;
|
||||
private static final int RETRY_DELAY_MS = 100;
|
||||
private static final long MAX_QUEUE_DURATION_MS = 20_000;
|
||||
private static final long COOLDOWN_DURATION_MS = 20_000;
|
||||
|
||||
private final BlockingQueue<Message> queue = new LinkedBlockingQueue<>();
|
||||
private final Peer peer;
|
||||
private static final AtomicInteger threadCount = new AtomicInteger(1);
|
||||
private final BlockingQueue<TimedMessage> queue = new LinkedBlockingQueue<>();
|
||||
private final ExecutorService executor;
|
||||
private final AtomicInteger failureCount = new AtomicInteger(0);
|
||||
private static final AtomicInteger threadCount = new AtomicInteger(1);
|
||||
|
||||
private final ExecutorService executor = Executors.newSingleThreadExecutor(new ThreadFactory() {
|
||||
@Override
|
||||
public Thread newThread(Runnable r) {
|
||||
Thread t = new Thread(r);
|
||||
t.setName("PeerSendManager-" + peer.getResolvedAddress().getHostString() + "-" + threadCount.getAndIncrement());
|
||||
return t;
|
||||
}
|
||||
});
|
||||
private volatile boolean coolingDown = false;
|
||||
private volatile long lastUsed = System.currentTimeMillis();
|
||||
|
||||
public PeerSendManager(Peer peer) {
|
||||
this.peer = peer;
|
||||
this.executor = Executors.newSingleThreadExecutor(r -> {
|
||||
Thread t = new Thread(r);
|
||||
t.setName("PeerSendManager-" + peer.getResolvedAddress().getHostString() + "-" + threadCount.getAndIncrement());
|
||||
return t;
|
||||
});
|
||||
start();
|
||||
}
|
||||
|
||||
private void start() {
|
||||
executor.submit(() -> {
|
||||
while (true) {
|
||||
while (!Thread.currentThread().isInterrupted()) {
|
||||
try {
|
||||
Message message = queue.take(); // Blocks until available
|
||||
boolean success = false;
|
||||
int attempt = 0;
|
||||
TimedMessage timedMessage = queue.take();
|
||||
long age = System.currentTimeMillis() - timedMessage.timestamp;
|
||||
|
||||
while (attempt < MAX_RETRIES) {
|
||||
if (age > MAX_QUEUE_DURATION_MS) {
|
||||
LOGGER.debug("Dropping stale message {} ({}ms old)", timedMessage.message.getId(), age);
|
||||
continue;
|
||||
}
|
||||
|
||||
Message message = timedMessage.message;
|
||||
boolean success = false;
|
||||
|
||||
for (int attempt = 1; attempt <= MAX_MESSAGE_ATTEMPTS; attempt++) {
|
||||
try {
|
||||
if (peer.sendMessageWithTimeout(message, 5000)) {
|
||||
if (peer.sendMessageWithTimeout(message, SEND_TIMEOUT_MS)) {
|
||||
success = true;
|
||||
failureCount.set(0); // reset on success
|
||||
break;
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LOGGER.warn("Send attempt {} failed for {} message ID {} to peer {}: {}",
|
||||
attempt + 1,
|
||||
message.getType().name(),
|
||||
message.getId(),
|
||||
peer,
|
||||
e.getMessage());
|
||||
LOGGER.warn("Attempt {} failed for message {} to peer {}: {}", attempt, message.getId(), peer, e.getMessage());
|
||||
}
|
||||
|
||||
attempt++;
|
||||
try {
|
||||
long delay = Math.min(BASE_RETRY_DELAY_MS * (1L << attempt), 2000);
|
||||
Thread.sleep(delay);
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
return;
|
||||
}
|
||||
Thread.sleep(RETRY_DELAY_MS);
|
||||
}
|
||||
|
||||
if (!success) {
|
||||
LOGGER.warn("Failed to send {} message ID {} to peer {} after {} attempts. Disconnecting...",
|
||||
message.getType().name(),
|
||||
message.getId(),
|
||||
peer,
|
||||
MAX_RETRIES);
|
||||
peer.disconnect("SendMessage retries exceeded");
|
||||
coolingDown = true;
|
||||
queue.clear();
|
||||
|
||||
try {
|
||||
Thread.sleep(30_000); // Give time for peer to possibly reconnect
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
return;
|
||||
}
|
||||
coolingDown = false;
|
||||
int totalFailures = failureCount.incrementAndGet();
|
||||
LOGGER.warn("Failed to send message {} to peer {}. Total failures: {}", message.getId(), peer, totalFailures);
|
||||
|
||||
continue; // Loop again in case the peer reconnects
|
||||
if (totalFailures >= MAX_FAILURES) {
|
||||
LOGGER.warn("Peer {} exceeded failure limit ({}). Disconnecting...", peer, totalFailures);
|
||||
peer.disconnect("Too many message send failures");
|
||||
coolingDown = true;
|
||||
queue.clear();
|
||||
|
||||
try {
|
||||
Thread.sleep(COOLDOWN_DURATION_MS);
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
return;
|
||||
} finally {
|
||||
coolingDown = false;
|
||||
failureCount.set(0);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Throttle after successful send
|
||||
Thread.sleep(50);
|
||||
|
||||
Thread.sleep(50); // small throttle
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
break;
|
||||
} catch (Exception e) {
|
||||
LOGGER.warn("Unexpected error in PeerSendManager for peer {}: {}", peer, e.getMessage());
|
||||
LOGGER.warn("Unexpected error in PeerSendManager for peer {}: {}", peer, e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private volatile long lastUsed = System.currentTimeMillis();
|
||||
public void queueMessage(Message message) {
|
||||
if (coolingDown) {
|
||||
LOGGER.debug("In cooldown, ignoring message {}", message.getId());
|
||||
return;
|
||||
}
|
||||
|
||||
public void queueMessage(Message message) {
|
||||
if (coolingDown) {
|
||||
LOGGER.debug("PeerSendManager in cooldown, ignoring message {}", message.getId());
|
||||
return; // or block/wait if you prefer
|
||||
lastUsed = System.currentTimeMillis();
|
||||
if (!queue.offer(new TimedMessage(message))) {
|
||||
LOGGER.warn("Send queue full, dropping message {}", message.getId());
|
||||
}
|
||||
}
|
||||
lastUsed = System.currentTimeMillis();
|
||||
this.queue.offer(message);
|
||||
}
|
||||
|
||||
public boolean isIdle(long cutoffMillis) {
|
||||
return System.currentTimeMillis() - lastUsed > cutoffMillis;
|
||||
}
|
||||
public boolean isIdle(long cutoffMillis) {
|
||||
return System.currentTimeMillis() - lastUsed > cutoffMillis;
|
||||
}
|
||||
|
||||
public void shutdown() {
|
||||
queue.clear();
|
||||
executor.shutdownNow();
|
||||
}
|
||||
|
||||
private static class TimedMessage {
|
||||
final Message message;
|
||||
final long timestamp;
|
||||
|
||||
TimedMessage(Message message) {
|
||||
this.message = message;
|
||||
this.timestamp = System.currentTimeMillis();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user