diff --git a/src/main/java/org/qortal/network/Handshake.java b/src/main/java/org/qortal/network/Handshake.java index b8a72372..7f18e11b 100644 --- a/src/main/java/org/qortal/network/Handshake.java +++ b/src/main/java/org/qortal/network/Handshake.java @@ -1,6 +1,8 @@ package org.qortal.network; import java.util.Arrays; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -13,7 +15,9 @@ import org.qortal.network.message.ChallengeMessage; import org.qortal.network.message.HelloMessage; import org.qortal.network.message.Message; import org.qortal.network.message.Message.MessageType; +import org.qortal.settings.Settings; import org.qortal.network.message.ResponseMessage; +import org.qortal.utils.DaemonThreadFactory; import org.qortal.utils.NTP; import com.google.common.primitives.Bytes; @@ -27,6 +31,7 @@ public enum Handshake { @Override public void action(Peer peer) { + /* Never called */ } }, HELLO(MessageType.HELLO) { @@ -183,7 +188,12 @@ public enum Handshake { final byte[] data = Crypto.digest(Bytes.concat(sharedSecret, peersChallenge)); // We do this in a new thread as it can take a while... - Thread responseThread = new Thread(() -> { + responseExecutor.execute(() -> { + // Are we still connected? + if (peer.isStopping()) + // No point computing for dead peer + return; + Integer nonce = MemoryPoW.compute2(data, POW_BUFFER_SIZE, POW_DIFFICULTY); Message responseMessage = new ResponseMessage(nonce, data); @@ -197,9 +207,6 @@ public enum Handshake { Network.getInstance().onHandshakeCompleted(peer); } }); - - responseThread.setDaemon(true); - responseThread.start(); } }, // Interim holding state while we compute RESPONSE to send to inbound peer @@ -237,6 +244,7 @@ public enum Handshake { private static final int POW_BUFFER_SIZE = 8 * 1024 * 1024; // bytes private static final int POW_DIFFICULTY = 8; // leading zero bits + private static final ExecutorService responseExecutor = Executors.newFixedThreadPool(Settings.getInstance().getNetworkPoWComputePoolSize(), new DaemonThreadFactory("Network-PoW")); private static final byte[] ZERO_CHALLENGE = new byte[ChallengeMessage.CHALLENGE_LENGTH]; diff --git a/src/main/java/org/qortal/network/Network.java b/src/main/java/org/qortal/network/Network.java index 86e189b0..9866697b 100644 --- a/src/main/java/org/qortal/network/Network.java +++ b/src/main/java/org/qortal/network/Network.java @@ -55,6 +55,7 @@ import org.qortal.utils.ExecuteProduceConsume; // import org.qortal.utils.ExecutorDumper; import org.qortal.utils.ExecuteProduceConsume.StatsSnapshot; import org.qortal.utils.NTP; +import org.qortal.utils.NamedThreadFactory; // For managing peers public class Network { @@ -151,7 +152,8 @@ public class Network { ExecutorService networkExecutor = new ThreadPoolExecutor(1, Settings.getInstance().getMaxNetworkThreadPoolSize(), NETWORK_EPC_KEEPALIVE, TimeUnit.SECONDS, - new SynchronousQueue()); + new SynchronousQueue(), + new NamedThreadFactory("Network-EPC")); networkEPC = new NetworkProcessor(networkExecutor); } @@ -355,7 +357,7 @@ public class Network { private Task maybeProducePeerPingTask(Long now) { // Ask connected peers whether they need a ping - for (Peer peer : getConnectedPeers()) { + for (Peer peer : getHandshakedPeers()) { Task peerTask = peer.getPingTask(now); if (peerTask != null) return peerTask; diff --git a/src/main/java/org/qortal/settings/Settings.java b/src/main/java/org/qortal/settings/Settings.java index b1b1c13e..84483b9e 100644 --- a/src/main/java/org/qortal/settings/Settings.java +++ b/src/main/java/org/qortal/settings/Settings.java @@ -92,6 +92,8 @@ public class Settings { private int maxPeers = 32; /** Maximum number of threads for network engine. */ private int maxNetworkThreadPoolSize = 20; + /** Maximum number of threads for network proof-of-work compute, used during handshaking. */ + private int networkPoWComputePoolSize = 2; // Which blockchains this node is running private String blockchainConfig = null; // use default from resources @@ -355,6 +357,10 @@ public class Settings { return this.maxNetworkThreadPoolSize; } + public int getNetworkPoWComputePoolSize() { + return this.networkPoWComputePoolSize; + } + public String getBlockchainConfig() { return this.blockchainConfig; } diff --git a/src/main/java/org/qortal/utils/DaemonThreadFactory.java b/src/main/java/org/qortal/utils/DaemonThreadFactory.java new file mode 100644 index 00000000..9a73bd1d --- /dev/null +++ b/src/main/java/org/qortal/utils/DaemonThreadFactory.java @@ -0,0 +1,31 @@ +package org.qortal.utils; + +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicInteger; + +public class DaemonThreadFactory implements ThreadFactory { + + private final String name; + private final AtomicInteger threadNumber = new AtomicInteger(1); + + public DaemonThreadFactory(String name) { + this.name = name; + } + + public DaemonThreadFactory() { + this(null); + } + + @Override + public Thread newThread(Runnable runnable) { + Thread thread = Executors.defaultThreadFactory().newThread(runnable); + thread.setDaemon(true); + + if (this.name != null) + thread.setName(this.name + "-" + this.threadNumber.getAndIncrement()); + + return thread; + } + +} diff --git a/src/main/java/org/qortal/utils/NamedThreadFactory.java b/src/main/java/org/qortal/utils/NamedThreadFactory.java new file mode 100644 index 00000000..6834c3b8 --- /dev/null +++ b/src/main/java/org/qortal/utils/NamedThreadFactory.java @@ -0,0 +1,24 @@ +package org.qortal.utils; + +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicInteger; + +public class NamedThreadFactory implements ThreadFactory { + + private final String name; + private final AtomicInteger threadNumber = new AtomicInteger(1); + + public NamedThreadFactory(String name) { + this.name = name; + } + + @Override + public Thread newThread(Runnable runnable) { + Thread thread = Executors.defaultThreadFactory().newThread(runnable); + thread.setName(this.name + "-" + this.threadNumber.getAndIncrement()); + + return thread; + } + +}