mirror of
https://github.com/Qortal/qortal.git
synced 2025-07-30 05:31:23 +00:00
PeerSendManagement loose-ends
This commit is contained in:
31563
qortal.log
Normal file
31563
qortal.log
Normal file
File diff suppressed because it is too large
Load Diff
@@ -125,7 +125,7 @@ public class ArbitraryDataFileListManager {
|
|||||||
// We haven't tried for at least 15 seconds
|
// We haven't tried for at least 15 seconds
|
||||||
|
|
||||||
if (networkBroadcastCount < 12) {
|
if (networkBroadcastCount < 12) {
|
||||||
// We've made less than 3 total attempts
|
// We've made less than 12 total attempts
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -135,7 +135,7 @@ public class ArbitraryDataFileListManager {
|
|||||||
// We haven't tried for at least 1 minute
|
// We haven't tried for at least 1 minute
|
||||||
|
|
||||||
if (networkBroadcastCount < 40) {
|
if (networkBroadcastCount < 40) {
|
||||||
// We've made less than 8 total attempts
|
// We've made less than 40 total attempts
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@@ -76,9 +76,7 @@ private PeerSendManager getOrCreateSendManager(Peer peer) {
|
|||||||
return peerSendManagers.computeIfAbsent(peer, p -> new PeerSendManager(p));
|
return peerSendManagers.computeIfAbsent(peer, p -> new PeerSendManager(p));
|
||||||
}
|
}
|
||||||
|
|
||||||
public void queueFileSendToPeer(Peer peer, Message fileMessage) {
|
|
||||||
getOrCreateSendManager(peer).queueMessage(fileMessage);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
@@ -267,7 +265,7 @@ public void queueFileSendToPeer(Peer peer, Message fileMessage) {
|
|||||||
LOGGER.trace(String.format("Removed hash %.8s from arbitraryDataFileRequests", hash58));
|
LOGGER.trace(String.format("Removed hash %.8s from arbitraryDataFileRequests", hash58));
|
||||||
|
|
||||||
if (response == null) {
|
if (response == null) {
|
||||||
LOGGER.info("Received null response from peer {}", peer);
|
LOGGER.debug("Received null response from peer {}", peer);
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
if (response.getType() != MessageType.ARBITRARY_DATA_FILE) {
|
if (response.getType() != MessageType.ARBITRARY_DATA_FILE) {
|
||||||
@@ -674,7 +672,7 @@ public void queueFileSendToPeer(Peer peer, Message fileMessage) {
|
|||||||
byte[] signature = getArbitraryDataFileMessage.getSignature();
|
byte[] signature = getArbitraryDataFileMessage.getSignature();
|
||||||
Controller.getInstance().stats.getArbitraryDataFileMessageStats.requests.incrementAndGet();
|
Controller.getInstance().stats.getArbitraryDataFileMessageStats.requests.incrementAndGet();
|
||||||
|
|
||||||
LOGGER.info("Received GetArbitraryDataFileMessage from peer {} for hash {}", peer, Base58.encode(hash));
|
LOGGER.debug("Received GetArbitraryDataFileMessage from peer {} for hash {}", peer, Base58.encode(hash));
|
||||||
|
|
||||||
try {
|
try {
|
||||||
ArbitraryDataFile arbitraryDataFile = ArbitraryDataFile.fromHash(hash, signature);
|
ArbitraryDataFile arbitraryDataFile = ArbitraryDataFile.fromHash(hash, signature);
|
||||||
|
@@ -1,100 +0,0 @@
|
|||||||
package org.qortal.data.arbitrary;
|
|
||||||
|
|
||||||
import org.qortal.network.Peer;
|
|
||||||
|
|
||||||
import java.util.ArrayDeque;
|
|
||||||
import java.util.Deque;
|
|
||||||
import java.util.concurrent.*;
|
|
||||||
|
|
||||||
public class FileFetchPeerStatsManager {
|
|
||||||
|
|
||||||
public static class FilePeerStats {
|
|
||||||
private static final int MAX_HISTORY = 20;
|
|
||||||
private static final int MIN_REQUIRED_ATTEMPTS = 10;
|
|
||||||
|
|
||||||
private final Deque<Boolean> resultHistory = new ArrayDeque<>(MAX_HISTORY);
|
|
||||||
private long lastUsed = System.currentTimeMillis();
|
|
||||||
|
|
||||||
public synchronized void recordResult(boolean success) {
|
|
||||||
if (resultHistory.size() >= MAX_HISTORY) {
|
|
||||||
resultHistory.removeFirst();
|
|
||||||
}
|
|
||||||
resultHistory.addLast(success);
|
|
||||||
lastUsed = System.currentTimeMillis();
|
|
||||||
}
|
|
||||||
|
|
||||||
public synchronized double getSuccessRate() {
|
|
||||||
if (resultHistory.isEmpty()) return 1.0;
|
|
||||||
long successCount = resultHistory.stream().filter(b -> b).count();
|
|
||||||
return (double) successCount / resultHistory.size();
|
|
||||||
}
|
|
||||||
|
|
||||||
public synchronized boolean hasEnoughHistory() {
|
|
||||||
return resultHistory.size() >= MIN_REQUIRED_ATTEMPTS;
|
|
||||||
}
|
|
||||||
|
|
||||||
public synchronized long getLastUsed() {
|
|
||||||
return lastUsed;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private final ConcurrentMap<String, FilePeerStats> statsMap = new ConcurrentHashMap<>();
|
|
||||||
private final long ttlMillis;
|
|
||||||
private final ScheduledExecutorService cleanupScheduler;
|
|
||||||
|
|
||||||
public FileFetchPeerStatsManager(long ttlMillis) {
|
|
||||||
this.ttlMillis = ttlMillis;
|
|
||||||
this.cleanupScheduler = Executors.newSingleThreadScheduledExecutor();
|
|
||||||
startCleanupTask();
|
|
||||||
}
|
|
||||||
|
|
||||||
private String makeKey(String signature58, Peer peer, int hops) {
|
|
||||||
return signature58 + "|" + peer.toString() + "|hops=" + hops;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void recordSuccess(String signature58, Peer peer, int hops) {
|
|
||||||
getOrCreateStats(signature58, peer, hops).recordResult(true);
|
|
||||||
}
|
|
||||||
|
|
||||||
public void recordFailure(String signature58, Peer peer, int hops) {
|
|
||||||
getOrCreateStats(signature58, peer, hops).recordResult(false);
|
|
||||||
}
|
|
||||||
|
|
||||||
private FilePeerStats getOrCreateStats(String signature58, Peer peer, int hops) {
|
|
||||||
String key = makeKey(signature58, peer, hops);
|
|
||||||
return statsMap.computeIfAbsent(key, k -> new FilePeerStats());
|
|
||||||
}
|
|
||||||
|
|
||||||
public FilePeerStats getStats(String signature58, Peer peer, int hops) {
|
|
||||||
String key = makeKey(signature58, peer, hops);
|
|
||||||
return statsMap.computeIfAbsent(key, k -> new FilePeerStats());
|
|
||||||
}
|
|
||||||
|
|
||||||
public double getSuccessRate(String signature58, Peer peer, int hops) {
|
|
||||||
return getStats(signature58, peer, hops).getSuccessRate();
|
|
||||||
}
|
|
||||||
|
|
||||||
public boolean hasEnoughHistory(String signature58, Peer peer, int hops) {
|
|
||||||
return getStats(signature58, peer, hops).hasEnoughHistory();
|
|
||||||
}
|
|
||||||
|
|
||||||
public void clearStatsForSignature(String signature58) {
|
|
||||||
statsMap.keySet().removeIf(key -> key.startsWith(signature58 + "|"));
|
|
||||||
}
|
|
||||||
|
|
||||||
private void startCleanupTask() {
|
|
||||||
cleanupScheduler.scheduleAtFixedRate(() -> {
|
|
||||||
try {
|
|
||||||
long now = System.currentTimeMillis();
|
|
||||||
statsMap.entrySet().removeIf(entry -> now - entry.getValue().getLastUsed() > ttlMillis);
|
|
||||||
} catch (Exception e) {
|
|
||||||
System.err.println("Error during FilePeerStats cleanup: " + e.getMessage());
|
|
||||||
e.printStackTrace();
|
|
||||||
}
|
|
||||||
}, 1, 1, TimeUnit.MINUTES);
|
|
||||||
}
|
|
||||||
|
|
||||||
public void shutdown() {
|
|
||||||
cleanupScheduler.shutdownNow();
|
|
||||||
}
|
|
||||||
}
|
|
@@ -664,31 +664,24 @@ public class Peer {
|
|||||||
|
|
||||||
// If output byte buffer is not null, send from that
|
// If output byte buffer is not null, send from that
|
||||||
int bytesWritten = this.socketChannel.write(outputBuffer);
|
int bytesWritten = this.socketChannel.write(outputBuffer);
|
||||||
if ("ARBITRARY_DATA_FILE".equals(this.outputMessageType)) {
|
|
||||||
LOGGER.info("[{}] Sent {} bytes of {} message with ID {} to peer {} ({} total)",
|
int zeroSendCount = 0;
|
||||||
this.peerConnectionId,
|
|
||||||
bytesWritten,
|
while (bytesWritten == 0) {
|
||||||
this.outputMessageType,
|
if (zeroSendCount > 9) {
|
||||||
this.outputMessageId,
|
LOGGER.debug("Socket write stuck for too long, returning");
|
||||||
this,
|
return true;
|
||||||
outputBuffer.limit());
|
}
|
||||||
}
|
try {
|
||||||
int zeroSendCount = 0;
|
Thread.sleep(10); // 10MS CPU Sleep to try and give it time to flush the socket
|
||||||
while (bytesWritten == 0) {
|
}
|
||||||
if (zeroSendCount > 9) {
|
catch (InterruptedException e) {
|
||||||
LOGGER.warn("Socket write stuck for too long, returning");
|
Thread.currentThread().interrupt();
|
||||||
return true;
|
return false; // optional, if you want to signal shutdown
|
||||||
}
|
}
|
||||||
try {
|
zeroSendCount++;
|
||||||
Thread.sleep(10); // 10MS CPU Sleep to try and give it time to flush the socket
|
bytesWritten = this.socketChannel.write(outputBuffer);
|
||||||
}
|
}
|
||||||
catch (InterruptedException e) {
|
|
||||||
Thread.currentThread().interrupt();
|
|
||||||
return false; // optional, if you want to signal shutdown
|
|
||||||
}
|
|
||||||
zeroSendCount++;
|
|
||||||
bytesWritten = this.socketChannel.write(outputBuffer);
|
|
||||||
}
|
|
||||||
|
|
||||||
// If we then exhaust the byte buffer, set it to null (otherwise loop and try to send more)
|
// If we then exhaust the byte buffer, set it to null (otherwise loop and try to send more)
|
||||||
if (!this.outputBuffer.hasRemaining()) {
|
if (!this.outputBuffer.hasRemaining()) {
|
||||||
|
@@ -59,7 +59,7 @@ public class PeerSendManager {
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
LOGGER.warn("Attempt {} failed for message {} to peer {}: {}", attempt, message.getId(), peer, e.getMessage());
|
LOGGER.debug("Attempt {} failed for message {} to peer {}: {}", attempt, message.getId(), peer, e.getMessage());
|
||||||
}
|
}
|
||||||
|
|
||||||
Thread.sleep(RETRY_DELAY_MS);
|
Thread.sleep(RETRY_DELAY_MS);
|
||||||
@@ -67,10 +67,10 @@ public class PeerSendManager {
|
|||||||
|
|
||||||
if (!success) {
|
if (!success) {
|
||||||
int totalFailures = failureCount.incrementAndGet();
|
int totalFailures = failureCount.incrementAndGet();
|
||||||
LOGGER.warn("Failed to send message {} to peer {}. Total failures: {}", message.getId(), peer, totalFailures);
|
LOGGER.debug("Failed to send message {} to peer {}. Total failures: {}", message.getId(), peer, totalFailures);
|
||||||
|
|
||||||
if (totalFailures >= MAX_FAILURES) {
|
if (totalFailures >= MAX_FAILURES) {
|
||||||
LOGGER.warn("Peer {} exceeded failure limit ({}). Disconnecting...", peer, totalFailures);
|
LOGGER.debug("Peer {} exceeded failure limit ({}). Disconnecting...", peer, totalFailures);
|
||||||
peer.disconnect("Too many message send failures");
|
peer.disconnect("Too many message send failures");
|
||||||
coolingDown = true;
|
coolingDown = true;
|
||||||
queue.clear();
|
queue.clear();
|
||||||
@@ -92,7 +92,7 @@ public class PeerSendManager {
|
|||||||
Thread.currentThread().interrupt();
|
Thread.currentThread().interrupt();
|
||||||
break;
|
break;
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
LOGGER.warn("Unexpected error in PeerSendManager for peer {}: {}", peer, e.getMessage(), e);
|
LOGGER.error("Unexpected error in PeerSendManager for peer {}: {}", peer, e.getMessage(), e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
@@ -106,7 +106,7 @@ public class PeerSendManager {
|
|||||||
|
|
||||||
lastUsed = System.currentTimeMillis();
|
lastUsed = System.currentTimeMillis();
|
||||||
if (!queue.offer(new TimedMessage(message))) {
|
if (!queue.offer(new TimedMessage(message))) {
|
||||||
LOGGER.warn("Send queue full, dropping message {}", message.getId());
|
LOGGER.debug("Send queue full, dropping message {}", message.getId());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -42,7 +42,7 @@ public class ChannelWriteTask implements Task {
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
if (isSocketClogged) {
|
if (isSocketClogged) {
|
||||||
LOGGER.info(
|
LOGGER.debug(
|
||||||
"socket is clogged: peer = {} {}, retrying",
|
"socket is clogged: peer = {} {}, retrying",
|
||||||
peer.getPeerData().getAddress().toString(),
|
peer.getPeerData().getAddress().toString(),
|
||||||
Thread.currentThread().getName()
|
Thread.currentThread().getName()
|
||||||
|
@@ -44,11 +44,9 @@ public class HSQLDBChatRepository implements ChatRepository {
|
|||||||
|
|
||||||
// if the PrimaryTable is available, then use it
|
// if the PrimaryTable is available, then use it
|
||||||
if( this.repository.getBlockRepository().getBlockchainHeight() > BlockChain.getInstance().getMultipleNamesPerAccountHeight()) {
|
if( this.repository.getBlockRepository().getBlockchainHeight() > BlockChain.getInstance().getMultipleNamesPerAccountHeight()) {
|
||||||
LOGGER.debug("using PrimaryNames for chat transactions");
|
|
||||||
tableName = "PrimaryNames";
|
tableName = "PrimaryNames";
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
LOGGER.debug("using Names for chat transactions");
|
|
||||||
tableName = "Names";
|
tableName = "Names";
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -164,11 +162,9 @@ public class HSQLDBChatRepository implements ChatRepository {
|
|||||||
|
|
||||||
// if the PrimaryTable is available, then use it
|
// if the PrimaryTable is available, then use it
|
||||||
if( this.repository.getBlockRepository().getBlockchainHeight() > BlockChain.getInstance().getMultipleNamesPerAccountHeight()) {
|
if( this.repository.getBlockRepository().getBlockchainHeight() > BlockChain.getInstance().getMultipleNamesPerAccountHeight()) {
|
||||||
LOGGER.debug("using PrimaryNames for chat transactions");
|
|
||||||
tableName = "PrimaryNames";
|
tableName = "PrimaryNames";
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
LOGGER.debug("using Names for chat transactions");
|
|
||||||
tableName = "Names";
|
tableName = "Names";
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -322,11 +318,9 @@ public class HSQLDBChatRepository implements ChatRepository {
|
|||||||
|
|
||||||
// if the PrimaryTable is available, then use it
|
// if the PrimaryTable is available, then use it
|
||||||
if( this.repository.getBlockRepository().getBlockchainHeight() > BlockChain.getInstance().getMultipleNamesPerAccountHeight()) {
|
if( this.repository.getBlockRepository().getBlockchainHeight() > BlockChain.getInstance().getMultipleNamesPerAccountHeight()) {
|
||||||
LOGGER.debug("using PrimaryNames for chat transactions");
|
|
||||||
tableName = "PrimaryNames";
|
tableName = "PrimaryNames";
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
LOGGER.debug("using Names for chat transactions");
|
|
||||||
tableName = "Names";
|
tableName = "Names";
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user