forked from Qortal/qortal
commit
e40d884c81
@ -8,7 +8,6 @@ import org.bouncycastle.crypto.params.Ed25519PublicKeyParameters;
|
|||||||
import org.qortal.block.BlockChain;
|
import org.qortal.block.BlockChain;
|
||||||
import org.qortal.controller.Controller;
|
import org.qortal.controller.Controller;
|
||||||
import org.qortal.controller.arbitrary.ArbitraryDataFileListManager;
|
import org.qortal.controller.arbitrary.ArbitraryDataFileListManager;
|
||||||
import org.qortal.controller.arbitrary.ArbitraryDataManager;
|
|
||||||
import org.qortal.crypto.Crypto;
|
import org.qortal.crypto.Crypto;
|
||||||
import org.qortal.data.block.BlockData;
|
import org.qortal.data.block.BlockData;
|
||||||
import org.qortal.data.block.BlockSummaryData;
|
import org.qortal.data.block.BlockSummaryData;
|
||||||
@ -122,6 +121,22 @@ public class Network {
|
|||||||
private List<Peer> immutableOutboundHandshakedPeers = Collections.emptyList(); // always rebuilt from mutable, synced list above
|
private List<Peer> immutableOutboundHandshakedPeers = Collections.emptyList(); // always rebuilt from mutable, synced list above
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Count threads per message type in order to enforce limits
|
||||||
|
*/
|
||||||
|
private final Map<MessageType, Integer> threadsPerMessageType = Collections.synchronizedMap(new HashMap<>());
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Keep track of total thread count, to warn when the thread pool is getting low
|
||||||
|
*/
|
||||||
|
private int totalThreadCount = 0;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Thresholds at which to warn about the number of active threads
|
||||||
|
*/
|
||||||
|
private final int threadCountWarningThreshold = (int) (Settings.getInstance().getMaxNetworkThreadPoolSize() * 0.9f);
|
||||||
|
private final Integer threadCountPerMessageTypeWarningThreshold = Settings.getInstance().getThreadCountPerMessageTypeWarningThreshold();
|
||||||
|
|
||||||
private final List<PeerAddress> selfPeers = new ArrayList<>();
|
private final List<PeerAddress> selfPeers = new ArrayList<>();
|
||||||
|
|
||||||
private String bindAddress = null;
|
private String bindAddress = null;
|
||||||
@ -240,6 +255,16 @@ public class Network {
|
|||||||
private static final Network INSTANCE = new Network();
|
private static final Network INSTANCE = new Network();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Map<MessageType, Integer> getThreadsPerMessageType() {
|
||||||
|
return this.threadsPerMessageType;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getTotalThreadCount() {
|
||||||
|
synchronized (this) {
|
||||||
|
return this.totalThreadCount;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public static Network getInstance() {
|
public static Network getInstance() {
|
||||||
return SingletonContainer.INSTANCE;
|
return SingletonContainer.INSTANCE;
|
||||||
}
|
}
|
||||||
@ -952,6 +977,37 @@ public class Network {
|
|||||||
|
|
||||||
// Should be non-handshaking messages from now on
|
// Should be non-handshaking messages from now on
|
||||||
|
|
||||||
|
// Limit threads per message type and discard if there are already too many
|
||||||
|
Integer maxThreadsForMessageType = Settings.getInstance().getMaxThreadsForMessageType(message.getType());
|
||||||
|
if (maxThreadsForMessageType != null) {
|
||||||
|
Integer threadCount = threadsPerMessageType.get(message.getType());
|
||||||
|
if (threadCount != null && threadCount >= maxThreadsForMessageType) {
|
||||||
|
LOGGER.trace("Discarding {} message as there are already {} active threads", message.getType().name(), threadCount);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Warn if necessary
|
||||||
|
if (threadCountPerMessageTypeWarningThreshold != null) {
|
||||||
|
Integer threadCount = threadsPerMessageType.get(message.getType());
|
||||||
|
if (threadCount != null && threadCount > threadCountPerMessageTypeWarningThreshold) {
|
||||||
|
LOGGER.info("Warning: high thread count for {} message type: {}", message.getType().name(), threadCount);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add to per-message thread count (first initializing to 0 if not already present)
|
||||||
|
threadsPerMessageType.computeIfAbsent(message.getType(), key -> 0);
|
||||||
|
threadsPerMessageType.computeIfPresent(message.getType(), (key, value) -> value + 1);
|
||||||
|
|
||||||
|
// Add to total thread count
|
||||||
|
synchronized (this) {
|
||||||
|
totalThreadCount++;
|
||||||
|
|
||||||
|
if (totalThreadCount >= threadCountWarningThreshold) {
|
||||||
|
LOGGER.info("Warning: high total thread count: {} / {}", totalThreadCount, Settings.getInstance().getMaxNetworkThreadPoolSize());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Ordered by message type value
|
// Ordered by message type value
|
||||||
switch (message.getType()) {
|
switch (message.getType()) {
|
||||||
case GET_PEERS:
|
case GET_PEERS:
|
||||||
@ -979,6 +1035,15 @@ public class Network {
|
|||||||
Controller.getInstance().onNetworkMessage(peer, message);
|
Controller.getInstance().onNetworkMessage(peer, message);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Remove from per-message thread count (first initializing to 0 if not already present)
|
||||||
|
threadsPerMessageType.computeIfAbsent(message.getType(), key -> 0);
|
||||||
|
threadsPerMessageType.computeIfPresent(message.getType(), (key, value) -> value - 1);
|
||||||
|
|
||||||
|
// Remove from total thread count
|
||||||
|
synchronized (this) {
|
||||||
|
totalThreadCount--;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void onHandshakingMessage(Peer peer, Message message, Handshake handshakeStatus) {
|
private void onHandshakingMessage(Peer peer, Message message, Handshake handshakeStatus) {
|
||||||
|
@ -29,6 +29,7 @@ import org.qortal.crosschain.Dogecoin.DogecoinNet;
|
|||||||
import org.qortal.crosschain.Digibyte.DigibyteNet;
|
import org.qortal.crosschain.Digibyte.DigibyteNet;
|
||||||
import org.qortal.crosschain.Ravencoin.RavencoinNet;
|
import org.qortal.crosschain.Ravencoin.RavencoinNet;
|
||||||
import org.qortal.crosschain.PirateChain.PirateChainNet;
|
import org.qortal.crosschain.PirateChain.PirateChainNet;
|
||||||
|
import org.qortal.network.message.MessageType;
|
||||||
import org.qortal.utils.EnumUtils;
|
import org.qortal.utils.EnumUtils;
|
||||||
|
|
||||||
// All properties to be converted to JSON via JAXB
|
// All properties to be converted to JSON via JAXB
|
||||||
@ -382,6 +383,58 @@ public class Settings {
|
|||||||
/** Whether to serve QDN data without authentication */
|
/** Whether to serve QDN data without authentication */
|
||||||
private boolean qdnAuthBypassEnabled = true;
|
private boolean qdnAuthBypassEnabled = true;
|
||||||
|
|
||||||
|
/** Limit threads per message type */
|
||||||
|
private Set<ThreadLimit> maxThreadsPerMessageType = new HashSet<>();
|
||||||
|
|
||||||
|
/** The number of threads per message type at which a warning should be logged.
|
||||||
|
* Exclude from settings.json to disable this warning. */
|
||||||
|
private Integer threadCountPerMessageTypeWarningThreshold = null;
|
||||||
|
|
||||||
|
|
||||||
|
// Domain mapping
|
||||||
|
public static class ThreadLimit {
|
||||||
|
private String messageType;
|
||||||
|
private Integer limit;
|
||||||
|
|
||||||
|
private ThreadLimit() { // makes JAXB happy; will never be invoked
|
||||||
|
}
|
||||||
|
|
||||||
|
private ThreadLimit(String messageType, Integer limit) {
|
||||||
|
this.messageType = messageType;
|
||||||
|
this.limit = limit;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getMessageType() {
|
||||||
|
return messageType;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setMessageType(String messageType) {
|
||||||
|
this.messageType = messageType;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Integer getLimit() {
|
||||||
|
return limit;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setLimit(Integer limit) {
|
||||||
|
this.limit = limit;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean equals(Object other) {
|
||||||
|
if (!(other instanceof ThreadLimit))
|
||||||
|
return false;
|
||||||
|
|
||||||
|
return this.messageType.equals(((ThreadLimit) other).getMessageType());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int hashCode() {
|
||||||
|
return Objects.hash(messageType);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
// Domain mapping
|
// Domain mapping
|
||||||
public static class DomainMap {
|
public static class DomainMap {
|
||||||
private String domain;
|
private String domain;
|
||||||
@ -507,6 +560,9 @@ public class Settings {
|
|||||||
}
|
}
|
||||||
} while (settings.userPath != null);
|
} while (settings.userPath != null);
|
||||||
|
|
||||||
|
// Set some additional defaults if needed
|
||||||
|
settings.setAdditionalDefaults();
|
||||||
|
|
||||||
// Validate settings
|
// Validate settings
|
||||||
settings.validate();
|
settings.validate();
|
||||||
|
|
||||||
@ -543,6 +599,22 @@ public class Settings {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void setAdditionalDefaults() {
|
||||||
|
// Populate defaults for maxThreadsPerMessageType. If any are specified in settings.json, they will take priority.
|
||||||
|
maxThreadsPerMessageType.add(new ThreadLimit("ARBITRARY_DATA_FILE", 5));
|
||||||
|
maxThreadsPerMessageType.add(new ThreadLimit("GET_ARBITRARY_DATA_FILE", 5));
|
||||||
|
maxThreadsPerMessageType.add(new ThreadLimit("ARBITRARY_DATA", 5));
|
||||||
|
maxThreadsPerMessageType.add(new ThreadLimit("GET_ARBITRARY_DATA", 5));
|
||||||
|
maxThreadsPerMessageType.add(new ThreadLimit("ARBITRARY_DATA_FILE_LIST", 5));
|
||||||
|
maxThreadsPerMessageType.add(new ThreadLimit("GET_ARBITRARY_DATA_FILE_LIST", 5));
|
||||||
|
maxThreadsPerMessageType.add(new ThreadLimit("ARBITRARY_SIGNATURES", 5));
|
||||||
|
maxThreadsPerMessageType.add(new ThreadLimit("ARBITRARY_METADATA", 5));
|
||||||
|
maxThreadsPerMessageType.add(new ThreadLimit("GET_ARBITRARY_METADATA", 5));
|
||||||
|
maxThreadsPerMessageType.add(new ThreadLimit("GET_TRANSACTION", 10));
|
||||||
|
maxThreadsPerMessageType.add(new ThreadLimit("TRANSACTION_SIGNATURES", 5));
|
||||||
|
maxThreadsPerMessageType.add(new ThreadLimit("TRADE_PRESENCES", 5));
|
||||||
|
}
|
||||||
|
|
||||||
// Getters / setters
|
// Getters / setters
|
||||||
|
|
||||||
public String getUserPath() {
|
public String getUserPath() {
|
||||||
@ -1063,4 +1135,20 @@ public class Settings {
|
|||||||
}
|
}
|
||||||
return this.qdnAuthBypassEnabled;
|
return this.qdnAuthBypassEnabled;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Integer getMaxThreadsForMessageType(MessageType messageType) {
|
||||||
|
if (maxThreadsPerMessageType != null) {
|
||||||
|
for (ThreadLimit threadLimit : maxThreadsPerMessageType) {
|
||||||
|
if (threadLimit.getMessageType().equals(messageType.name())) {
|
||||||
|
return threadLimit.getLimit();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// No entry, so assume unlimited
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Integer getThreadCountPerMessageTypeWarningThreshold() {
|
||||||
|
return this.threadCountPerMessageTypeWarningThreshold;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user