Added support for thread limits.

Default thread limits per message type can be specified in Settings.setAdditionalDefaults(), e.g

maxThreadsPerMessageType.add(new ThreadLimit("GET_ARBITRARY_DATA_FILE", 5));

These can also be overridden on a per-node basis in settings.json, e.g

"maxThreadsPerMessageType": [
    { "messageType": "GET_ARBITRARY_DATA_FILE", "limit": 3 },
    { "messageType": "GET_ARBITRARY_DATA_FILE_LIST", "limit": 3 }
]

settings.json values take priority, but any message types that aren't specified in settings.json will still be included from the Settings.java defaults. This allows single message types to be overridden in settings.json without removing the limits for all of the other message types.

Any messages that arrive are discarded if the node is already at the thread limit for that message type.

Warnings are now shown in the logs if the total number of active threads reaches 90% of the allocated thread pool size. Additionally, it can warn per message type by specifying a per-message-type warning threshold in settings.json, e.g

"threadCountPerMessageTypeWarningThreshold": 20

The above setting would warn in the logs if a single message type was consuming more than 20 threads at once, therefore making it a candidate to be limited in maxThreadsPerMessageType.

Initial values of maxThreadsPerMessageType are guesses and may need modifying based on real world results. Limiting threads may impact functionality, so this should be carefully tested.

Also be aware that the thread tracking may reduce network performance slightly, so be sure to test thoroughly on slower hardware.
This commit is contained in:
CalDescent 2023-09-22 13:06:03 +01:00
parent d453e80c6b
commit 9454031b48
2 changed files with 154 additions and 1 deletions

View File

@ -8,7 +8,6 @@ import org.bouncycastle.crypto.params.Ed25519PublicKeyParameters;
import org.qortal.block.BlockChain;
import org.qortal.controller.Controller;
import org.qortal.controller.arbitrary.ArbitraryDataFileListManager;
import org.qortal.controller.arbitrary.ArbitraryDataManager;
import org.qortal.crypto.Crypto;
import org.qortal.data.block.BlockData;
import org.qortal.data.block.BlockSummaryData;
@ -122,6 +121,22 @@ public class Network {
private List<Peer> immutableOutboundHandshakedPeers = Collections.emptyList(); // always rebuilt from mutable, synced list above
/**
* Count threads per message type in order to enforce limits
*/
private final Map<MessageType, Integer> threadsPerMessageType = Collections.synchronizedMap(new HashMap<>());
/**
* Keep track of total thread count, to warn when the thread pool is getting low
*/
private int totalThreadCount = 0;
/**
* Thresholds at which to warn about the number of active threads
*/
private final int threadCountWarningThreshold = (int) (Settings.getInstance().getMaxNetworkThreadPoolSize() * 0.9f);
private final Integer threadCountPerMessageTypeWarningThreshold = Settings.getInstance().getThreadCountPerMessageTypeWarningThreshold();
private final List<PeerAddress> selfPeers = new ArrayList<>();
private String bindAddress = null;
@ -240,6 +255,16 @@ public class Network {
private static final Network INSTANCE = new Network();
}
public Map<MessageType, Integer> getThreadsPerMessageType() {
return this.threadsPerMessageType;
}
public int getTotalThreadCount() {
synchronized (this) {
return this.totalThreadCount;
}
}
public static Network getInstance() {
return SingletonContainer.INSTANCE;
}
@ -952,6 +977,37 @@ public class Network {
// Should be non-handshaking messages from now on
// Limit threads per message type and discard if there are already too many
Integer maxThreadsForMessageType = Settings.getInstance().getMaxThreadsForMessageType(message.getType());
if (maxThreadsForMessageType != null) {
Integer threadCount = threadsPerMessageType.get(message.getType());
if (threadCount != null && threadCount >= maxThreadsForMessageType) {
LOGGER.trace("Discarding {} message as there are already {} active threads", message.getType().name(), threadCount);
return;
}
}
// Warn if necessary
if (threadCountPerMessageTypeWarningThreshold != null) {
Integer threadCount = threadsPerMessageType.get(message.getType());
if (threadCount != null && threadCount > threadCountPerMessageTypeWarningThreshold) {
LOGGER.info("Warning: high thread count for {} message type: {}", message.getType().name(), threadCount);
}
}
// Add to per-message thread count (first initializing to 0 if not already present)
threadsPerMessageType.computeIfAbsent(message.getType(), key -> 0);
threadsPerMessageType.computeIfPresent(message.getType(), (key, value) -> value + 1);
// Add to total thread count
synchronized (this) {
totalThreadCount++;
if (totalThreadCount >= threadCountWarningThreshold) {
LOGGER.info("Warning: high total thread count: {} / {}", totalThreadCount, Settings.getInstance().getMaxNetworkThreadPoolSize());
}
}
// Ordered by message type value
switch (message.getType()) {
case GET_PEERS:
@ -979,6 +1035,15 @@ public class Network {
Controller.getInstance().onNetworkMessage(peer, message);
break;
}
// Remove from per-message thread count (first initializing to 0 if not already present)
threadsPerMessageType.computeIfAbsent(message.getType(), key -> 0);
threadsPerMessageType.computeIfPresent(message.getType(), (key, value) -> value - 1);
// Remove from total thread count
synchronized (this) {
totalThreadCount--;
}
}
private void onHandshakingMessage(Peer peer, Message message, Handshake handshakeStatus) {

View File

@ -29,6 +29,7 @@ import org.qortal.crosschain.Dogecoin.DogecoinNet;
import org.qortal.crosschain.Digibyte.DigibyteNet;
import org.qortal.crosschain.Ravencoin.RavencoinNet;
import org.qortal.crosschain.PirateChain.PirateChainNet;
import org.qortal.network.message.MessageType;
import org.qortal.utils.EnumUtils;
// All properties to be converted to JSON via JAXB
@ -371,6 +372,58 @@ public class Settings {
/** Whether to serve QDN data without authentication */
private boolean qdnAuthBypassEnabled = true;
/** Limit threads per message type */
private Set<ThreadLimit> maxThreadsPerMessageType = new HashSet<>();
/** The number of threads per message type at which a warning should be logged.
* Exclude from settings.json to disable this warning. */
private Integer threadCountPerMessageTypeWarningThreshold = null;
// Domain mapping
public static class ThreadLimit {
private String messageType;
private Integer limit;
private ThreadLimit() { // makes JAXB happy; will never be invoked
}
private ThreadLimit(String messageType, Integer limit) {
this.messageType = messageType;
this.limit = limit;
}
public String getMessageType() {
return messageType;
}
public void setMessageType(String messageType) {
this.messageType = messageType;
}
public Integer getLimit() {
return limit;
}
public void setLimit(Integer limit) {
this.limit = limit;
}
@Override
public boolean equals(Object other) {
if (!(other instanceof ThreadLimit))
return false;
return this.messageType.equals(((ThreadLimit) other).getMessageType());
}
@Override
public int hashCode() {
return Objects.hash(messageType);
}
}
// Domain mapping
public static class DomainMap {
private String domain;
@ -497,6 +550,9 @@ public class Settings {
}
} while (settings.userPath != null);
// Set some additional defaults if needed
settings.setAdditionalDefaults();
// Validate settings
settings.validate();
@ -533,6 +589,22 @@ public class Settings {
}
}
private void setAdditionalDefaults() {
// Populate defaults for maxThreadsPerMessageType. If any are specified in settings.json, they will take priority.
maxThreadsPerMessageType.add(new ThreadLimit("ARBITRARY_DATA_FILE", 5));
maxThreadsPerMessageType.add(new ThreadLimit("GET_ARBITRARY_DATA_FILE", 5));
maxThreadsPerMessageType.add(new ThreadLimit("ARBITRARY_DATA", 5));
maxThreadsPerMessageType.add(new ThreadLimit("GET_ARBITRARY_DATA", 5));
maxThreadsPerMessageType.add(new ThreadLimit("ARBITRARY_DATA_FILE_LIST", 5));
maxThreadsPerMessageType.add(new ThreadLimit("GET_ARBITRARY_DATA_FILE_LIST", 5));
maxThreadsPerMessageType.add(new ThreadLimit("ARBITRARY_SIGNATURES", 5));
maxThreadsPerMessageType.add(new ThreadLimit("ARBITRARY_METADATA", 5));
maxThreadsPerMessageType.add(new ThreadLimit("GET_ARBITRARY_METADATA", 5));
maxThreadsPerMessageType.add(new ThreadLimit("GET_TRANSACTION", 10));
maxThreadsPerMessageType.add(new ThreadLimit("TRANSACTION_SIGNATURES", 5));
maxThreadsPerMessageType.add(new ThreadLimit("TRADE_PRESENCES", 5));
}
// Getters / setters
public String getUserPath() {
@ -1054,4 +1126,20 @@ public class Settings {
}
return this.qdnAuthBypassEnabled;
}
public Integer getMaxThreadsForMessageType(MessageType messageType) {
if (maxThreadsPerMessageType != null) {
for (ThreadLimit threadLimit : maxThreadsPerMessageType) {
if (threadLimit.getMessageType().equals(messageType.name())) {
return threadLimit.getLimit();
}
}
}
// No entry, so assume unlimited
return null;
}
public Integer getThreadCountPerMessageTypeWarningThreshold() {
return this.threadCountPerMessageTypeWarningThreshold;
}
}