diff --git a/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileListManager.java b/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileListManager.java index 07989589..933b76c8 100644 --- a/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileListManager.java +++ b/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileListManager.java @@ -124,7 +124,7 @@ public class ArbitraryDataFileListManager { if (timeSinceLastAttempt > 15 * 1000L) { // We haven't tried for at least 15 seconds - if (networkBroadcastCount < 3) { + if (networkBroadcastCount < 12) { // We've made less than 3 total attempts return true; } @@ -134,7 +134,7 @@ public class ArbitraryDataFileListManager { if (timeSinceLastAttempt > 60 * 1000L) { // We haven't tried for at least 1 minute - if (networkBroadcastCount < 8) { + if (networkBroadcastCount < 40) { // We've made less than 8 total attempts return true; } @@ -588,7 +588,7 @@ public class ArbitraryDataFileListManager { // Forward to requesting peer LOGGER.debug("Forwarding file list with {} hashes to requesting peer: {}", hashes.size(), requestingPeer); if (!requestingPeer.sendMessage(forwardArbitraryDataFileListMessage)) { - requestingPeer.disconnect("failed to forward arbitrary data file list"); + // requestingPeer.disconnect("failed to forward arbitrary data file list"); } } } @@ -787,7 +787,7 @@ public class ArbitraryDataFileListManager { if (!peer.sendMessage(arbitraryDataFileListMessage)) { LOGGER.debug("Couldn't send list of hashes"); - peer.disconnect("failed to send list of hashes"); + // peer.disconnect("failed to send list of hashes"); continue; } diff --git a/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileManager.java b/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileManager.java index 4b50603c..24cec145 100644 --- a/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileManager.java +++ 
b/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileManager.java @@ -1,6 +1,7 @@ package org.qortal.controller.arbitrary; import com.google.common.net.InetAddresses; + import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.qortal.arbitrary.ArbitraryDataFile; @@ -23,11 +24,15 @@ import org.qortal.utils.NTP; import java.security.SecureRandom; import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import org.qortal.network.PeerSendManager; + public class ArbitraryDataFileManager extends Thread { private static final Logger LOGGER = LogManager.getLogger(ArbitraryDataFileManager.class); @@ -65,11 +70,40 @@ public class ArbitraryDataFileManager extends Thread { public static int MAX_FILE_HASH_RESPONSES = 1000; +private final Map peerSendManagers = new ConcurrentHashMap<>(); + +private PeerSendManager getOrCreateSendManager(Peer peer) { + return peerSendManagers.computeIfAbsent(peer, p -> new PeerSendManager(p)); +} + +public void queueFileSendToPeer(Peer peer, Message fileMessage) { + getOrCreateSendManager(peer).queueMessage(fileMessage); +} + private ArbitraryDataFileManager() { this.arbitraryDataFileHashResponseScheduler.scheduleAtFixedRate( this::processResponses, 60, 1, TimeUnit.SECONDS); this.arbitraryDataFileHashResponseScheduler.scheduleAtFixedRate(this::handleFileListRequestProcess, 60, 1, TimeUnit.SECONDS); + ScheduledExecutorService cleaner = Executors.newSingleThreadScheduledExecutor(); + + cleaner.scheduleAtFixedRate(() -> { + long idleCutoff = TimeUnit.MINUTES.toMillis(2); + Iterator> iterator = peerSendManagers.entrySet().iterator(); + + while (iterator.hasNext()) { + Map.Entry entry = iterator.next(); + Peer peer = entry.getKey(); + PeerSendManager manager = entry.getValue(); + + if 
(manager.isIdle(idleCutoff)) { + iterator.remove(); // SAFE removal during iteration + manager.shutdown(); + LOGGER.debug("Cleaned up PeerSendManager for peer {}", peer); + } + } + }, 0, 30, TimeUnit.MINUTES); + } public static ArbitraryDataFileManager getInstance() { @@ -79,6 +113,8 @@ public class ArbitraryDataFileManager extends Thread { return instance; } + + @Override public void run() { Thread.currentThread().setName("Arbitrary Data File Manager"); @@ -231,7 +267,7 @@ public class ArbitraryDataFileManager extends Thread { LOGGER.trace(String.format("Removed hash %.8s from arbitraryDataFileRequests", hash58)); if (response == null) { - LOGGER.debug("Received null response from peer {}", peer); + LOGGER.info("Received null response from peer {}", peer); return null; } if (response.getType() != MessageType.ARBITRARY_DATA_FILE) { @@ -374,13 +410,16 @@ public class ArbitraryDataFileManager extends Thread { // The ID needs to match that of the original request message.setId(originalMessage.getId()); - if (!requestingPeer.sendMessageWithTimeout(message, (int) ArbitraryDataManager.ARBITRARY_REQUEST_TIMEOUT)) { - LOGGER.info("Failed to forward arbitrary data file to peer {}", requestingPeer); - requestingPeer.disconnect("failed to forward arbitrary data file"); - } - else { - LOGGER.info("Forwarded arbitrary data file to peer {}", requestingPeer); - } + // if (!requestingPeer.sendMessageWithTimeout(message, (int) ArbitraryDataManager.ARBITRARY_REQUEST_TIMEOUT)) { + // LOGGER.info("Failed to forward arbitrary data file to peer {}", requestingPeer); + // requestingPeer.disconnect("failed to forward arbitrary data file"); + // } + + // else { + // LOGGER.info("Forwarded arbitrary data file to peer {}", requestingPeer); + // } + getOrCreateSendManager(requestingPeer).queueMessage(message); + } catch (Exception e) { LOGGER.error(e.getMessage(), e); } @@ -643,7 +682,7 @@ public class ArbitraryDataFileManager extends Thread { byte[] signature = 
getArbitraryDataFileMessage.getSignature(); Controller.getInstance().stats.getArbitraryDataFileMessageStats.requests.incrementAndGet(); - LOGGER.debug("Received GetArbitraryDataFileMessage from peer {} for hash {}", peer, Base58.encode(hash)); + LOGGER.info("Received GetArbitraryDataFileMessage from peer {} for hash {}", peer, Base58.encode(hash)); try { ArbitraryDataFile arbitraryDataFile = ArbitraryDataFile.fromHash(hash, signature); @@ -656,13 +695,15 @@ public class ArbitraryDataFileManager extends Thread { LOGGER.debug("Sending file {}...", arbitraryDataFile); ArbitraryDataFileMessage arbitraryDataFileMessage = new ArbitraryDataFileMessage(signature, arbitraryDataFile); arbitraryDataFileMessage.setId(message.getId()); - if (!peer.sendMessageWithTimeout(arbitraryDataFileMessage, (int) ArbitraryDataManager.ARBITRARY_REQUEST_TIMEOUT)) { - LOGGER.debug("Couldn't send file {}", arbitraryDataFile); - peer.disconnect("failed to send file"); - } - else { - LOGGER.debug("Sent file {}", arbitraryDataFile); - } + // if (!peer.sendMessageWithTimeout(arbitraryDataFileMessage, (int) ArbitraryDataManager.ARBITRARY_REQUEST_TIMEOUT)) { + // LOGGER.info("Couldn't send file {}", arbitraryDataFile); + // // peer.disconnect("failed to send file"); + // } + // else { + // LOGGER.debug("Sent file {}", arbitraryDataFile); + // } + getOrCreateSendManager(peer).queueMessage(arbitraryDataFileMessage); + } else if (relayInfo != null) { LOGGER.debug("We have relay info for hash {}", Base58.encode(hash)); @@ -696,7 +737,7 @@ public class ArbitraryDataFileManager extends Thread { fileUnknownMessage.setId(message.getId()); if (!peer.sendMessage(fileUnknownMessage)) { LOGGER.debug("Couldn't sent file-unknown response"); - peer.disconnect("failed to send file-unknown response"); + // peer.disconnect("failed to send file-unknown response"); } else { LOGGER.debug("Sent file-unknown response for file {}", arbitraryDataFile); diff --git 
a/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataManager.java b/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataManager.java index a0a70144..5bac208f 100644 --- a/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataManager.java +++ b/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataManager.java @@ -45,7 +45,7 @@ public class ArbitraryDataManager extends Thread { public static final long ARBITRARY_REQUEST_TIMEOUT = 12 * 1000L; // ms /** Maximum time to hold information about an in-progress relay */ - public static final long ARBITRARY_RELAY_TIMEOUT = 90 * 1000L; // ms + public static final long ARBITRARY_RELAY_TIMEOUT = 120 * 1000L; // ms /** Maximum time to hold direct peer connection information */ public static final long ARBITRARY_DIRECT_CONNECTION_INFO_TIMEOUT = 2 * 60 * 1000L; // ms diff --git a/src/main/java/org/qortal/controller/arbitrary/ArbitraryMetadataManager.java b/src/main/java/org/qortal/controller/arbitrary/ArbitraryMetadataManager.java index d38d329f..87175585 100644 --- a/src/main/java/org/qortal/controller/arbitrary/ArbitraryMetadataManager.java +++ b/src/main/java/org/qortal/controller/arbitrary/ArbitraryMetadataManager.java @@ -361,7 +361,7 @@ public class ArbitraryMetadataManager { // Forward to requesting peer LOGGER.debug("Forwarding metadata to requesting peer: {}", requestingPeer); if (!requestingPeer.sendMessage(forwardArbitraryMetadataMessage)) { - requestingPeer.disconnect("failed to forward arbitrary metadata"); + // requestingPeer.disconnect("failed to forward arbitrary metadata"); } } } @@ -479,7 +479,7 @@ public class ArbitraryMetadataManager { arbitraryMetadataMessage.setId(message.getId()); if (!peer.sendMessage(arbitraryMetadataMessage)) { LOGGER.debug("Couldn't send metadata"); - peer.disconnect("failed to send metadata"); + // peer.disconnect("failed to send metadata"); continue; } LOGGER.debug("Sent metadata"); diff --git 
a/src/main/java/org/qortal/data/arbitrary/FileFetchPeerStatsManager.java b/src/main/java/org/qortal/data/arbitrary/FileFetchPeerStatsManager.java
new file mode 100644
index 00000000..58104da8
--- /dev/null
+++ b/src/main/java/org/qortal/data/arbitrary/FileFetchPeerStatsManager.java
@@ -0,0 +1,133 @@
+package org.qortal.data.arbitrary;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.qortal.network.Peer;
+
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Keeps a short rolling history of file-fetch outcomes, keyed by the
+ * combination of signature, peer and hop count, and expires entries that
+ * have not been written to for longer than the configured TTL.
+ */
+public class FileFetchPeerStatsManager {
+
+    private static final Logger LOGGER = LogManager.getLogger(FileFetchPeerStatsManager.class);
+
+    /** Rolling window of recent fetch outcomes for a single key; every method is synchronized. */
+    public static class FilePeerStats {
+        private static final int MAX_HISTORY = 20;
+        private static final int MIN_REQUIRED_ATTEMPTS = 10;
+
+        private final Deque<Boolean> resultHistory = new ArrayDeque<>(MAX_HISTORY);
+        private long lastUsed = System.currentTimeMillis();
+
+        /** Records one outcome, evicting the oldest once MAX_HISTORY results are held. */
+        public synchronized void recordResult(boolean success) {
+            if (resultHistory.size() >= MAX_HISTORY) {
+                resultHistory.removeFirst();
+            }
+            resultHistory.addLast(success);
+            lastUsed = System.currentTimeMillis();
+        }
+
+        /** @return fraction of recorded results that succeeded, or 1.0 when nothing is recorded */
+        public synchronized double getSuccessRate() {
+            if (resultHistory.isEmpty()) {
+                return 1.0;
+            }
+            long successCount = resultHistory.stream().filter(Boolean::booleanValue).count();
+            return (double) successCount / resultHistory.size();
+        }
+
+        /** @return true once at least MIN_REQUIRED_ATTEMPTS results are held */
+        public synchronized boolean hasEnoughHistory() {
+            return resultHistory.size() >= MIN_REQUIRED_ATTEMPTS;
+        }
+
+        /** @return wall-clock time (ms) of creation or of the most recent recordResult call */
+        public synchronized long getLastUsed() {
+            return lastUsed;
+        }
+    }
+
+    private final ConcurrentMap<String, FilePeerStats> statsMap = new ConcurrentHashMap<>();
+    private final long ttlMillis;
+    private final ScheduledExecutorService cleanupScheduler;
+
+    /**
+     * @param ttlMillis entries whose last write is older than this many milliseconds are
+     *                  dropped by the once-a-minute cleanup task
+     */
+    public FileFetchPeerStatsManager(long ttlMillis) {
+        this.ttlMillis = ttlMillis;
+        // Daemon thread: a forgotten shutdown() must not keep the JVM alive.
+        this.cleanupScheduler = Executors.newSingleThreadScheduledExecutor(runnable -> {
+            Thread thread = new Thread(runnable, "FileFetchPeerStats-cleanup");
+            thread.setDaemon(true);
+            return thread;
+        });
+        startCleanupTask();
+    }
+
+    private String makeKey(String signature58, Peer peer, int hops) {
+        return signature58 + "|" + peer.toString() + "|hops=" + hops;
+    }
+
+    public void recordSuccess(String signature58, Peer peer, int hops) {
+        getOrCreateStats(signature58, peer, hops).recordResult(true);
+    }
+
+    public void recordFailure(String signature58, Peer peer, int hops) {
+        getOrCreateStats(signature58, peer, hops).recordResult(false);
+    }
+
+    private FilePeerStats getOrCreateStats(String signature58, Peer peer, int hops) {
+        return statsMap.computeIfAbsent(makeKey(signature58, peer, hops), key -> new FilePeerStats());
+    }
+
+    /** @return the stats for this key, creating an empty entry if none exists (never null) */
+    public FilePeerStats getStats(String signature58, Peer peer, int hops) {
+        return getOrCreateStats(signature58, peer, hops);
+    }
+
+    /** @return the recorded success rate, or 1.0 for an unknown key; does not create an entry */
+    public double getSuccessRate(String signature58, Peer peer, int hops) {
+        FilePeerStats stats = statsMap.get(makeKey(signature58, peer, hops));
+        return stats == null ? 1.0 : stats.getSuccessRate();
+    }
+
+    /** @return whether enough results are held; false for an unknown key; does not create an entry */
+    public boolean hasEnoughHistory(String signature58, Peer peer, int hops) {
+        FilePeerStats stats = statsMap.get(makeKey(signature58, peer, hops));
+        return stats != null && stats.hasEnoughHistory();
+    }
+
+    public void clearStatsForSignature(String signature58) {
+        String prefix = signature58 + "|";
+        statsMap.keySet().removeIf(key -> key.startsWith(prefix));
+    }
+
+    private void startCleanupTask() {
+        cleanupScheduler.scheduleAtFixedRate(() -> {
+            try {
+                long now = System.currentTimeMillis();
+                statsMap.entrySet().removeIf(entry -> now - entry.getValue().getLastUsed() > ttlMillis);
+            } catch (Exception e) {
+                // Swallow after logging: an escaping exception would cancel all future runs.
+                LOGGER.error("Error during FilePeerStats cleanup", e);
+            }
+        }, 1, 1, TimeUnit.MINUTES);
+    }
+
+    public void shutdown() {
+        cleanupScheduler.shutdownNow();
+    }
+}
diff --git a/src/main/java/org/qortal/network/Peer.java b/src/main/java/org/qortal/network/Peer.java index fc28ea87..90aba049 100644 --- a/src/main/java/org/qortal/network/Peer.java +++ b/src/main/java/org/qortal/network/Peer.java @@ -640,10 +640,13 @@ public class Peer { return false; try { - this.outputBuffer = ByteBuffer.wrap(message.toBytes()); + byte[] messageBytes = message.toBytes(); + + this.outputBuffer = ByteBuffer.wrap(messageBytes); this.outputMessageType = message.getType().name(); this.outputMessageId = message.getId(); +
LOGGER.trace("[{}] Sending {} message with ID {} to peer {}", this.peerConnectionId, this.outputMessageType, this.outputMessageId, this); @@ -661,14 +664,31 @@ public class Peer { // If output byte buffer is not null, send from that int bytesWritten = this.socketChannel.write(outputBuffer); - - LOGGER.trace("[{}] Sent {} bytes of {} message with ID {} to peer {} ({} total)", this.peerConnectionId, - bytesWritten, this.outputMessageType, this.outputMessageId, this, outputBuffer.limit()); - - // If we've sent 0 bytes then socket buffer is full so we need to wait until it's empty again - if (bytesWritten == 0) { - return true; - } + if ("ARBITRARY_DATA_FILE".equals(this.outputMessageType)) { + LOGGER.info("[{}] Sent {} bytes of {} message with ID {} to peer {} ({} total)", + this.peerConnectionId, + bytesWritten, + this.outputMessageType, + this.outputMessageId, + this, + outputBuffer.limit()); +} + int zeroSendCount = 0; +while (bytesWritten == 0) { + if (zeroSendCount > 9) { + LOGGER.warn("Socket write stuck for too long, returning"); + return true; + } + try { + Thread.sleep(10); // 10MS CPU Sleep to try and give it time to flush the socket + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return false; // optional, if you want to signal shutdown +} + zeroSendCount++; + bytesWritten = this.socketChannel.write(outputBuffer); +} // If we then exhaust the byte buffer, set it to null (otherwise loop and try to send more) if (!this.outputBuffer.hasRemaining()) { @@ -729,7 +749,7 @@ public class Peer { try { // Queue message, to be picked up by ChannelWriteTask and then peer.writeChannel() - LOGGER.trace("[{}] Queuing {} message with ID {} to peer {}", this.peerConnectionId, + LOGGER.debug("[{}] Queuing {} message with ID {} to peer {}", this.peerConnectionId, message.getType().name(), message.getId(), this); // Check message properly constructed diff --git a/src/main/java/org/qortal/network/PeerSendManager.java 
b/src/main/java/org/qortal/network/PeerSendManager.java
new file mode 100644
index 00000000..8b02e461
--- /dev/null
+++ b/src/main/java/org/qortal/network/PeerSendManager.java
@@ -0,0 +1,145 @@
+package org.qortal.network;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.qortal.network.message.Message;
+
+/**
+ * Sends queued messages to a single peer, one at a time, on a dedicated worker thread,
+ * retrying each message with a capped exponential back-off. When a message exhausts its
+ * retries the peer is disconnected and this manager stops for good: later messages are
+ * dropped and {@link #isIdle(long)} reports true so the owner can discard it.
+ */
+public class PeerSendManager {
+    private static final Logger LOGGER = LogManager.getLogger(PeerSendManager.class);
+
+    private static final int MAX_RETRIES = 15;
+    private static final int BASE_RETRY_DELAY_MS = 100;
+    private static final int MAX_RETRY_DELAY_MS = 2000;
+    private static final int SEND_TIMEOUT_MS = 5000;
+    private static final int POST_SEND_THROTTLE_MS = 50;
+
+    private static final AtomicInteger threadCount = new AtomicInteger(1);
+
+    // NOTE(review): the queue is unbounded; consider a capacity limit if senders can outpace the peer.
+    private final BlockingQueue<Message> queue = new LinkedBlockingQueue<>();
+    private final Peer peer;
+    private final ExecutorService executor;
+
+    private volatile long lastUsed = System.currentTimeMillis();
+    /** Set once the worker has exited or shutdown() was called; never reset. */
+    private volatile boolean stopped = false;
+
+    public PeerSendManager(Peer peer) {
+        this.peer = peer;
+        this.executor = Executors.newSingleThreadExecutor(runnable -> {
+            Thread thread = new Thread(runnable);
+            thread.setName("PeerSendManager-" + peer.getResolvedAddress().getHostString() + "-" + threadCount.getAndIncrement());
+            // Daemon thread: an un-reaped manager must not keep the JVM alive.
+            thread.setDaemon(true);
+            return thread;
+        });
+        this.executor.execute(this::processQueue);
+    }
+
+    /** Worker loop: takes messages in FIFO order until stopped, interrupted, or a message exhausts its retries. */
+    private void processQueue() {
+        try {
+            while (!stopped) {
+                Message message = queue.take(); // Blocks until available
+                lastUsed = System.currentTimeMillis();
+
+                try {
+                    if (!sendWithRetries(message)) {
+                        if (!stopped) {
+                            LOGGER.warn("Failed to send {} message ID {} to peer {} after {} attempts. Disconnecting...",
+                                    message.getType().name(),
+                                    message.getId(),
+                                    peer,
+                                    MAX_RETRIES);
+                            peer.disconnect("SendMessage retries exceeded");
+                        }
+                        return;
+                    }
+                } catch (RuntimeException e) {
+                    LOGGER.warn("Unexpected error in PeerSendManager for peer {}", peer, e);
+                    continue;
+                }
+
+                lastUsed = System.currentTimeMillis();
+                // Throttle after successful send
+                Thread.sleep(POST_SEND_THROTTLE_MS);
+            }
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        } finally {
+            // Nothing will drain the queue from here on, so stop accepting and release what is held.
+            stopped = true;
+            queue.clear();
+        }
+    }
+
+    /**
+     * Attempts to send one message up to MAX_RETRIES times.
+     *
+     * @return true if a send succeeded; false if every attempt failed or the manager was stopped
+     * @throws InterruptedException if interrupted while waiting between attempts
+     */
+    private boolean sendWithRetries(Message message) throws InterruptedException {
+        for (int attempt = 1; attempt <= MAX_RETRIES && !stopped; attempt++) {
+            try {
+                if (peer.sendMessageWithTimeout(message, SEND_TIMEOUT_MS)) {
+                    return true;
+                }
+            } catch (Exception e) {
+                LOGGER.warn("Send attempt {} failed for {} message ID {} to peer {}: {}",
+                        attempt,
+                        message.getType().name(),
+                        message.getId(),
+                        peer,
+                        e.getMessage());
+            }
+
+            // No point waiting after the final attempt.
+            if (attempt < MAX_RETRIES) {
+                long delay = Math.min((long) BASE_RETRY_DELAY_MS << attempt, MAX_RETRY_DELAY_MS);
+                Thread.sleep(delay);
+            }
+        }
+        return false;
+    }
+
+    /** Queues a message for sending; it is dropped if this manager has already stopped. */
+    public void queueMessage(Message message) {
+        if (stopped) {
+            LOGGER.debug("Dropping message for peer {}: send manager has stopped", peer);
+            return;
+        }
+        lastUsed = System.currentTimeMillis();
+        this.queue.offer(message);
+        if (stopped) {
+            // Lost a race with the worker exiting; don't leave the message stranded.
+            this.queue.clear();
+        }
+    }
+
+    /** @return true if stopped, or if nothing is queued and no activity occurred within cutoffMillis */
+    public boolean isIdle(long cutoffMillis) {
+        if (stopped) {
+            return true;
+        }
+        return queue.isEmpty() && System.currentTimeMillis() - lastUsed > cutoffMillis;
+    }
+
+    public void shutdown() {
+        stopped = true;
+        queue.clear();
+        executor.shutdownNow();
+    }
+}
diff --git a/src/main/java/org/qortal/network/task/ChannelWriteTask.java b/src/main/java/org/qortal/network/task/ChannelWriteTask.java index 6f28a942..0bea4ba3 100644 --- a/src/main/java/org/qortal/network/task/ChannelWriteTask.java +++ b/src/main/java/org/qortal/network/task/ChannelWriteTask.java @@ -31,11 +31,16 @@ public class ChannelWriteTask implements Task { @Override public void perform() throws InterruptedException { try { - - boolean isSocketClogged; + + boolean isSocketClogged; + int clogCounter = 0; do { isSocketClogged = peer.writeChannel(); + if (clogCounter > 9) { + LOGGER.warn("10 Socket Clogs - GIVING UP"); + break; + } if (isSocketClogged) { LOGGER.info( "socket is clogged: peer = {} {}, retrying", @@ -43,9 +48,11 @@ public class ChannelWriteTask implements Task { Thread.currentThread().getName() ); Thread.sleep(1000); + clogCounter++; } + } while( isSocketClogged ); - + // Tell Network that we've finished
Network.getInstance().notifyChannelNotWriting(socketChannel); @@ -62,4 +69,4 @@ public class ChannelWriteTask implements Task { peer.disconnect("I/O error"); } } -} +} \ No newline at end of file