diff --git a/src/main/java/org/qortal/network/Network.java b/src/main/java/org/qortal/network/Network.java index a3528a66..b42ab450 100644 --- a/src/main/java/org/qortal/network/Network.java +++ b/src/main/java/org/qortal/network/Network.java @@ -8,7 +8,6 @@ import org.bouncycastle.crypto.params.Ed25519PublicKeyParameters; import org.qortal.block.BlockChain; import org.qortal.controller.Controller; import org.qortal.controller.arbitrary.ArbitraryDataFileListManager; -import org.qortal.controller.arbitrary.ArbitraryDataManager; import org.qortal.crypto.Crypto; import org.qortal.data.block.BlockData; import org.qortal.data.block.BlockSummaryData; @@ -122,6 +121,22 @@ public class Network { private List immutableOutboundHandshakedPeers = Collections.emptyList(); // always rebuilt from mutable, synced list above + /** + * Count threads per message type in order to enforce limits + */ + private final Map threadsPerMessageType = Collections.synchronizedMap(new HashMap<>()); + + /** + * Keep track of total thread count, to warn when the thread pool is getting low + */ + private int totalThreadCount = 0; + + /** + * Thresholds at which to warn about the number of active threads + */ + private final int threadCountWarningThreshold = (int) (Settings.getInstance().getMaxNetworkThreadPoolSize() * 0.9f); + private final Integer threadCountPerMessageTypeWarningThreshold = Settings.getInstance().getThreadCountPerMessageTypeWarningThreshold(); + private final List selfPeers = new ArrayList<>(); private String bindAddress = null; @@ -240,6 +255,16 @@ public class Network { private static final Network INSTANCE = new Network(); } + public Map getThreadsPerMessageType() { + return this.threadsPerMessageType; + } + + public int getTotalThreadCount() { + synchronized (this) { + return this.totalThreadCount; + } + } + public static Network getInstance() { return SingletonContainer.INSTANCE; } @@ -952,6 +977,37 @@ public class Network { // Should be non-handshaking messages from now on + // Limit threads per message type and discard if there are already too many + Integer maxThreadsForMessageType = Settings.getInstance().getMaxThreadsForMessageType(message.getType()); + if (maxThreadsForMessageType != null) { + Integer threadCount = threadsPerMessageType.get(message.getType()); + if (threadCount != null && threadCount >= maxThreadsForMessageType) { + LOGGER.trace("Discarding {} message as there are already {} active threads", message.getType().name(), threadCount); + return; + } + } + + // Warn if necessary + if (threadCountPerMessageTypeWarningThreshold != null) { + Integer threadCount = threadsPerMessageType.get(message.getType()); + if (threadCount != null && threadCount > threadCountPerMessageTypeWarningThreshold) { + LOGGER.info("Warning: high thread count for {} message type: {}", message.getType().name(), threadCount); + } + } + + // Add to per-message thread count (first initializing to 0 if not already present) + threadsPerMessageType.computeIfAbsent(message.getType(), key -> 0); + threadsPerMessageType.computeIfPresent(message.getType(), (key, value) -> value + 1); + + // Add to total thread count + synchronized (this) { + totalThreadCount++; + + if (totalThreadCount >= threadCountWarningThreshold) { + LOGGER.info("Warning: high total thread count: {} / {}", totalThreadCount, Settings.getInstance().getMaxNetworkThreadPoolSize()); + } + } + // Ordered by message type value switch (message.getType()) { case GET_PEERS: @@ -979,6 +1035,15 @@ public class Network { Controller.getInstance().onNetworkMessage(peer, message); break; } + + // Remove from per-message thread count (first initializing to 0 if not already present) + threadsPerMessageType.computeIfAbsent(message.getType(), key -> 0); + threadsPerMessageType.computeIfPresent(message.getType(), (key, value) -> value - 1); + + // Remove from total thread count + synchronized (this) { + totalThreadCount--; + } } private void onHandshakingMessage(Peer peer, Message message, Handshake handshakeStatus) { diff --git a/src/main/java/org/qortal/settings/Settings.java b/src/main/java/org/qortal/settings/Settings.java index bdff9506..9d25e846 100644 --- a/src/main/java/org/qortal/settings/Settings.java +++ b/src/main/java/org/qortal/settings/Settings.java @@ -29,6 +29,7 @@ import org.qortal.crosschain.Dogecoin.DogecoinNet; import org.qortal.crosschain.Digibyte.DigibyteNet; import org.qortal.crosschain.Ravencoin.RavencoinNet; import org.qortal.crosschain.PirateChain.PirateChainNet; +import org.qortal.network.message.MessageType; import org.qortal.utils.EnumUtils; // All properties to be converted to JSON via JAXB @@ -371,6 +372,58 @@ public class Settings { /** Whether to serve QDN data without authentication */ private boolean qdnAuthBypassEnabled = true; + /** Limit threads per message type */ + private Set maxThreadsPerMessageType = new HashSet<>(); + + /** The number of threads per message type at which a warning should be logged. + * Exclude from settings.json to disable this warning. */ + private Integer threadCountPerMessageTypeWarningThreshold = null; + + + // Domain mapping + public static class ThreadLimit { + private String messageType; + private Integer limit; + + private ThreadLimit() { // makes JAXB happy; will never be invoked + } + + private ThreadLimit(String messageType, Integer limit) { + this.messageType = messageType; + this.limit = limit; + } + + public String getMessageType() { + return messageType; + } + + public void setMessageType(String messageType) { + this.messageType = messageType; + } + + public Integer getLimit() { + return limit; + } + + public void setLimit(Integer limit) { + this.limit = limit; + } + + @Override + public boolean equals(Object other) { + if (!(other instanceof ThreadLimit)) + return false; + + return this.messageType.equals(((ThreadLimit) other).getMessageType()); + } + + @Override + public int hashCode() { + return Objects.hash(messageType); + } + } + + // Domain mapping public static class DomainMap { private String domain; @@ -497,6 +550,9 @@ public class Settings { } } while (settings.userPath != null); + // Set some additional defaults if needed + settings.setAdditionalDefaults(); + // Validate settings settings.validate(); @@ -533,6 +589,22 @@ public class Settings { } } + private void setAdditionalDefaults() { + // Populate defaults for maxThreadsPerMessageType. If any are specified in settings.json, they will take priority. + maxThreadsPerMessageType.add(new ThreadLimit("ARBITRARY_DATA_FILE", 5)); + maxThreadsPerMessageType.add(new ThreadLimit("GET_ARBITRARY_DATA_FILE", 5)); + maxThreadsPerMessageType.add(new ThreadLimit("ARBITRARY_DATA", 5)); + maxThreadsPerMessageType.add(new ThreadLimit("GET_ARBITRARY_DATA", 5)); + maxThreadsPerMessageType.add(new ThreadLimit("ARBITRARY_DATA_FILE_LIST", 5)); + maxThreadsPerMessageType.add(new ThreadLimit("GET_ARBITRARY_DATA_FILE_LIST", 5)); + maxThreadsPerMessageType.add(new ThreadLimit("ARBITRARY_SIGNATURES", 5)); + maxThreadsPerMessageType.add(new ThreadLimit("ARBITRARY_METADATA", 5)); + maxThreadsPerMessageType.add(new ThreadLimit("GET_ARBITRARY_METADATA", 5)); + maxThreadsPerMessageType.add(new ThreadLimit("GET_TRANSACTION", 10)); + maxThreadsPerMessageType.add(new ThreadLimit("TRANSACTION_SIGNATURES", 5)); + maxThreadsPerMessageType.add(new ThreadLimit("TRADE_PRESENCES", 5)); + } + // Getters / setters public String getUserPath() { @@ -1054,4 +1126,20 @@ public class Settings { } return this.qdnAuthBypassEnabled; } + + public Integer getMaxThreadsForMessageType(MessageType messageType) { + if (maxThreadsPerMessageType != null) { + for (ThreadLimit threadLimit : maxThreadsPerMessageType) { + if (threadLimit.getMessageType().equals(messageType.name())) { + return threadLimit.getLimit(); + } + } + } + // No entry, so assume unlimited + return null; + } + + public Integer getThreadCountPerMessageTypeWarningThreshold() { + return this.threadCountPerMessageTypeWarningThreshold; + } }