From fd62e6156c5b398b125a62e1a0b8602980f1ad13 Mon Sep 17 00:00:00 2001 From: PhilReact Date: Thu, 10 Jul 2025 17:27:04 +0300 Subject: [PATCH] increase request timeout --- .../arbitrary/ArbitraryDataManager.java | 2 +- .../org/qortal/network/PeerSendManager.java | 146 +++++++++--------- 2 files changed, 77 insertions(+), 71 deletions(-) diff --git a/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataManager.java b/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataManager.java index 5bac208f..8f0bf708 100644 --- a/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataManager.java +++ b/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataManager.java @@ -42,7 +42,7 @@ public class ArbitraryDataManager extends Thread { private int powDifficulty = 14; // Must not be final, as unit tests need to reduce this value /** Request timeout when transferring arbitrary data */ - public static final long ARBITRARY_REQUEST_TIMEOUT = 12 * 1000L; // ms + public static final long ARBITRARY_REQUEST_TIMEOUT = 24 * 1000L; // ms /** Maximum time to hold information about an in-progress relay */ public static final long ARBITRARY_RELAY_TIMEOUT = 120 * 1000L; // ms diff --git a/src/main/java/org/qortal/network/PeerSendManager.java b/src/main/java/org/qortal/network/PeerSendManager.java index 1a59c9fd..d305bf54 100644 --- a/src/main/java/org/qortal/network/PeerSendManager.java +++ b/src/main/java/org/qortal/network/PeerSendManager.java @@ -1,10 +1,6 @@ package org.qortal.network; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadFactory; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; import org.apache.logging.log4j.LogManager; @@ -13,113 +9,123 @@ import org.qortal.network.message.Message; public class PeerSendManager { private static final Logger LOGGER = LogManager.getLogger(PeerSendManager.class); - private volatile boolean coolingDown = false; - private static final int MAX_RETRIES = 15; - private static final int BASE_RETRY_DELAY_MS = 100; + private static final int MAX_FAILURES = 15; + private static final int MAX_MESSAGE_ATTEMPTS = 2; + private static final int SEND_TIMEOUT_MS = 500; + private static final int RETRY_DELAY_MS = 100; + private static final long MAX_QUEUE_DURATION_MS = 20_000; + private static final long COOLDOWN_DURATION_MS = 20_000; - private final BlockingQueue queue = new LinkedBlockingQueue<>(); private final Peer peer; - private static final AtomicInteger threadCount = new AtomicInteger(1); + private final BlockingQueue queue = new LinkedBlockingQueue<>(); + private final ExecutorService executor; + private final AtomicInteger failureCount = new AtomicInteger(0); + private static final AtomicInteger threadCount = new AtomicInteger(1); -private final ExecutorService executor = Executors.newSingleThreadExecutor(new ThreadFactory() { - @Override - public Thread newThread(Runnable r) { - Thread t = new Thread(r); - t.setName("PeerSendManager-" + peer.getResolvedAddress().getHostString() + "-" + threadCount.getAndIncrement()); - return t; - } -}); + private volatile boolean coolingDown = false; + private volatile long lastUsed = System.currentTimeMillis(); public PeerSendManager(Peer peer) { this.peer = peer; + this.executor = Executors.newSingleThreadExecutor(r -> { + Thread t = new Thread(r); + t.setName("PeerSendManager-" + peer.getResolvedAddress().getHostString() + "-" + threadCount.getAndIncrement()); + return t; + }); start(); } private void start() { executor.submit(() -> { - while (true) { + while (!Thread.currentThread().isInterrupted()) { try { - Message message = queue.take(); // Blocks until available - boolean success = false; - int attempt = 0; + TimedMessage timedMessage = queue.take(); + long age = System.currentTimeMillis() - timedMessage.timestamp; - while (attempt < MAX_RETRIES) { + if (age > MAX_QUEUE_DURATION_MS) { + LOGGER.debug("Dropping stale message {} ({}ms old)", timedMessage.message.getId(), age); + continue; + } + + Message message = timedMessage.message; + boolean success = false; + + for (int attempt = 1; attempt <= MAX_MESSAGE_ATTEMPTS; attempt++) { try { - if (peer.sendMessageWithTimeout(message, 5000)) { + if (peer.sendMessageWithTimeout(message, SEND_TIMEOUT_MS)) { success = true; + failureCount.set(0); // reset on success break; } } catch (Exception e) { - LOGGER.warn("Send attempt {} failed for {} message ID {} to peer {}: {}", - attempt + 1, - message.getType().name(), - message.getId(), - peer, - e.getMessage()); + LOGGER.warn("Attempt {} failed for message {} to peer {}: {}", attempt, message.getId(), peer, e.getMessage()); } - attempt++; - try { - long delay = Math.min(BASE_RETRY_DELAY_MS * (1L << attempt), 2000); - Thread.sleep(delay); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - return; - } + Thread.sleep(RETRY_DELAY_MS); } if (!success) { - LOGGER.warn("Failed to send {} message ID {} to peer {} after {} attempts. Disconnecting...", - message.getType().name(), - message.getId(), - peer, - MAX_RETRIES); - peer.disconnect("SendMessage retries exceeded"); - coolingDown = true; - queue.clear(); - - try { - Thread.sleep(30_000); // Give time for peer to possibly reconnect - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - return; - } - coolingDown = false; + int totalFailures = failureCount.incrementAndGet(); + LOGGER.warn("Failed to send message {} to peer {}. Total failures: {}", message.getId(), peer, totalFailures); - continue; // Loop again in case the peer reconnects + if (totalFailures >= MAX_FAILURES) { + LOGGER.warn("Peer {} exceeded failure limit ({}). Disconnecting...", peer, totalFailures); + peer.disconnect("Too many message send failures"); + coolingDown = true; + queue.clear(); + + try { + Thread.sleep(COOLDOWN_DURATION_MS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return; + } finally { + coolingDown = false; + failureCount.set(0); + } + } } - // Throttle after successful send - Thread.sleep(50); - + Thread.sleep(50); // small throttle } catch (InterruptedException e) { Thread.currentThread().interrupt(); break; } catch (Exception e) { - LOGGER.warn("Unexpected error in PeerSendManager for peer {}: {}", peer, e.getMessage()); + LOGGER.warn("Unexpected error in PeerSendManager for peer {}: {}", peer, e.getMessage(), e); } } }); } - private volatile long lastUsed = System.currentTimeMillis(); + public void queueMessage(Message message) { + if (coolingDown) { + LOGGER.debug("In cooldown, ignoring message {}", message.getId()); + return; + } -public void queueMessage(Message message) { - if (coolingDown) { - LOGGER.debug("PeerSendManager in cooldown, ignoring message {}", message.getId()); - return; // or block/wait if you prefer + lastUsed = System.currentTimeMillis(); + if (!queue.offer(new TimedMessage(message))) { + LOGGER.warn("Send queue full, dropping message {}", message.getId()); + } } - lastUsed = System.currentTimeMillis(); - this.queue.offer(message); -} -public boolean isIdle(long cutoffMillis) { - return System.currentTimeMillis() - lastUsed > cutoffMillis; -} + public boolean isIdle(long cutoffMillis) { + return System.currentTimeMillis() - lastUsed > cutoffMillis; + } public void shutdown() { queue.clear(); executor.shutdownNow(); } + + private static class TimedMessage { + final Message message; + final long timestamp; + + TimedMessage(Message message) { + this.message = message; + this.timestamp = System.currentTimeMillis(); + } + } }