diff --git a/src/main/java/org/qora/controller/Controller.java b/src/main/java/org/qora/controller/Controller.java index 7bfbc2e3..426d6deb 100644 --- a/src/main/java/org/qora/controller/Controller.java +++ b/src/main/java/org/qora/controller/Controller.java @@ -235,7 +235,7 @@ public class Controller extends Thread { LOGGER.info("Starting controller"); Controller.getInstance().start(); - LOGGER.info("Starting networking"); + LOGGER.info("Starting networking on port " + Settings.getInstance().getListenPort()); try { Network network = Network.getInstance(); network.start(); @@ -323,6 +323,13 @@ public class Controller extends Thread { ntpNagTimestamp += NTP_NAG_PERIOD; ntpNag(); } + + // Prune stuck/slow/old peers + try { + Network.getInstance().prunePeers(); + } catch (DataException e) { + LOGGER.warn(String.format("Repository issue when trying to prune peers: %s", e.getMessage())); + } } } catch (InterruptedException e) { // Fall-through to exit diff --git a/src/main/java/org/qora/network/Network.java b/src/main/java/org/qora/network/Network.java index e22490d3..70bb2c55 100644 --- a/src/main/java/org/qora/network/Network.java +++ b/src/main/java/org/qora/network/Network.java @@ -23,8 +23,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Function; @@ -55,6 +53,7 @@ import org.qora.repository.DataException; import org.qora.repository.Repository; import org.qora.repository.RepositoryManager; import org.qora.settings.Settings; +import org.qora.utils.ExecuteProduceConsume; // For managing peers public class Network extends Thread { @@ -97,24 +96,22 @@ public class Network extends Thread { public static final int PEER_ID_LENGTH = 128; private final byte[] ourPeerId; - private volatile boolean isStopping = false; private List connectedPeers; private List selfPeers; - private ExecutorService networkingExecutor; - private static Selector channelSelector; - private static ServerSocketChannel serverChannel; - private static AtomicBoolean isIterationInProgress = new AtomicBoolean(false); - private static Iterator channelIterator = null; - private static volatile boolean hasThreadPending = false; - private static AtomicInteger activeThreads = new AtomicInteger(0); - private static AtomicBoolean generalTaskLock = new AtomicBoolean(false); + private ExecuteProduceConsume networkEPC; + private Selector channelSelector; + private ServerSocketChannel serverChannel; + private Iterator channelIterator = null; private int minOutboundPeers; private int maxPeers; + private long nextConnectTaskTimestamp; + private ExecutorService broadcastExecutor; /** Timestamp (ms) for next general info broadcast to all connected peers. Based on System.currentTimeMillis(). */ - private long nextBroadcast; + private long nextBroadcastTimestamp; + private Lock mergePeersLock; // Constructors @@ -153,14 +150,16 @@ public class Network extends Thread { minOutboundPeers = Settings.getInstance().getMinOutboundPeers(); maxPeers = Settings.getInstance().getMaxPeers(); + nextConnectTaskTimestamp = System.currentTimeMillis(); + broadcastExecutor = Executors.newCachedThreadPool(); - nextBroadcast = System.currentTimeMillis(); + nextBroadcastTimestamp = System.currentTimeMillis(); mergePeersLock = new ReentrantLock(); // Start up first networking thread - networkingExecutor = Executors.newCachedThreadPool(); - networkingExecutor.execute(new NetworkProcessor()); + networkEPC = new NetworkProcessor(); + networkEPC.start(); } // Getters / setters @@ -269,118 +268,179 @@ public class Network extends Thread { // Main thread - class NetworkProcessor implements Runnable { + class NetworkProcessor extends ExecuteProduceConsume { @Override - public void run() { - Thread.currentThread().setName("Network"); + protected Task produceTask(boolean canBlock) throws InterruptedException { + Task task; - activeThreads.incrementAndGet(); - LOGGER.trace(() -> String.format("Network thread %s, hasThreadPending: %s, activeThreads now: %d", Thread.currentThread().getId(), (hasThreadPending ? "yes" : "no"), activeThreads.get())); - hasThreadPending = false; + task = maybeProduceChannelTask(canBlock); + if (task != null) + return task; - // Maintain long-term connections to various peers' API applications - try { - while (!isStopping) { - if (!isIterationInProgress.compareAndSet(false, true)) { - LOGGER.trace(() -> String.format("Network thread %s NOT producing (some other thread is) - exiting", Thread.currentThread().getId())); - break; - } + task = maybeProducePeerMessageTask(); + if (task != null) + return task; - LOGGER.trace(() -> String.format("Network thread %s is producing...", Thread.currentThread().getId())); + task = maybeProducePeerPingTask(); + if (task != null) + return task; - final SelectionKey nextSelectionKey; - try { - // anything to do? - if (channelIterator == null) { - channelSelector.select(1000L); + task = maybeProduceConnectPeerTask(); + if (task != null) + return task; - if (Thread.currentThread().isInterrupted()) - break; + task = maybeProduceBroadcastTask(); + if (task != null) + return task; - channelIterator = channelSelector.selectedKeys().iterator(); - } + // Really nothing to do + return null; + } - if (channelIterator.hasNext()) { - nextSelectionKey = channelIterator.next(); - channelIterator.remove(); - } else { - nextSelectionKey = null; - channelIterator = null; // Nothing to do so reset iterator to cause new select - } + class ChannelTask implements ExecuteProduceConsume.Task { + private final SelectionKey selectionKey; - LOGGER.trace(() -> String.format("Network thread %s produced %s, iterator now %s", - Thread.currentThread().getId(), - (nextSelectionKey == null ? "null" : nextSelectionKey.channel()), - (channelIterator == null ? "null" : channelIterator.toString()))); - - // Spawn another thread in case we need help - if (!hasThreadPending) { - hasThreadPending = true; - LOGGER.trace(() -> String.format("Network thread %s spawning", Thread.currentThread().getId())); - networkingExecutor.execute(this); - } - } finally { - LOGGER.trace(() -> String.format("Network thread %s done producing", Thread.currentThread().getId())); - isIterationInProgress.set(false); - } - - // process - if (nextSelectionKey == null) { - // no pending tasks, but we're last remaining thread so maybe connect a new peer or do a broadcast - LOGGER.trace(() -> String.format("Network thread %s has no pending tasks", Thread.currentThread().getId())); - - if (!generalTaskLock.compareAndSet(false, true)) - continue; - - try { - LOGGER.trace(() -> String.format("Network thread %s performing general tasks", Thread.currentThread().getId())); - - pingPeers(); - - prunePeers(); - - createConnection(); - - if (System.currentTimeMillis() >= nextBroadcast) { - nextBroadcast = System.currentTimeMillis() + BROADCAST_INTERVAL; - - // Controller can decide what to broadcast - Controller.getInstance().doNetworkBroadcast(); - } - } finally { - LOGGER.trace(() -> String.format("Network thread %s finished general tasks", Thread.currentThread().getId())); - generalTaskLock.set(false); - } - } else { - try { - LOGGER.trace(() -> String.format("Network thread %s has pending channel: %s, with ops %d", - Thread.currentThread().getId(), nextSelectionKey.channel(), nextSelectionKey.readyOps())); - - // process pending channel task - if (nextSelectionKey.isReadable()) { - connectionRead((SocketChannel) nextSelectionKey.channel()); - } else if (nextSelectionKey.isAcceptable()) { - acceptConnection((ServerSocketChannel) nextSelectionKey.channel()); - } - - LOGGER.trace(() -> String.format("Network thread %s processed channel: %s", Thread.currentThread().getId(), nextSelectionKey.channel())); - } catch (CancelledKeyException e) { - LOGGER.trace(() -> String.format("Network thread %s encountered cancelled channel: %s", Thread.currentThread().getId(), nextSelectionKey.channel())); - } - } - } - } catch (InterruptedException e) { - // Fall-through to shutdown - } catch (DataException e) { - LOGGER.warn("Repository issue while running network", e); - // Fall-through to shutdown - } catch (IOException e) { - // Fall-through to shutdown - } finally { - activeThreads.decrementAndGet(); - LOGGER.trace(() -> String.format("Network thread %s ending, activeThreads now: %d", Thread.currentThread().getId(), activeThreads.get())); - Thread.currentThread().setName("Network (dormant)"); + public ChannelTask(SelectionKey selectionKey) { + this.selectionKey = selectionKey; } + + @Override + public void perform() throws InterruptedException { + try { + LOGGER.trace(() -> String.format("Thread %d has pending channel: %s, with ops %d", + Thread.currentThread().getId(), selectionKey.channel(), selectionKey.readyOps())); + + // process pending channel task + if (selectionKey.isReadable()) { + connectionRead((SocketChannel) selectionKey.channel()); + } else if (selectionKey.isAcceptable()) { + acceptConnection((ServerSocketChannel) selectionKey.channel()); + } + + LOGGER.trace(() -> String.format("Thread %d processed channel: %s", Thread.currentThread().getId(), selectionKey.channel())); + } catch (CancelledKeyException e) { + LOGGER.trace(() -> String.format("Thread %s encountered cancelled channel: %s", Thread.currentThread().getId(), selectionKey.channel())); + } + } + + private void connectionRead(SocketChannel socketChannel) { + Peer peer = getPeerFromChannel(socketChannel); + if (peer == null) + return; + + try { + peer.readChannel(); + } catch (IOException e) { + if (e.getMessage() != null && e.getMessage().toLowerCase().contains("onnection reset")) { + peer.disconnect("Connection reset"); + return; + } + + LOGGER.trace(() -> String.format("Network thread %s encountered I/O error: %s", Thread.currentThread().getId(), e.getMessage()), e); + peer.disconnect("I/O error"); + return; + } + + } + } + + private Task maybeProduceChannelTask(boolean canBlock) throws InterruptedException { + final SelectionKey nextSelectionKey; + + // anything to do? + if (channelIterator == null) { + try { + if (canBlock) + channelSelector.select(1000L); + else + channelSelector.selectNow(); + } catch (IOException e) { + LOGGER.warn(String.format("Channel selection threw IOException: %s", e.getMessage())); + return null; + } + + if (Thread.currentThread().isInterrupted()) + throw new InterruptedException(); + + channelIterator = channelSelector.selectedKeys().iterator(); + } + + if (channelIterator.hasNext()) { + nextSelectionKey = channelIterator.next(); + channelIterator.remove(); + } else { + nextSelectionKey = null; + channelIterator = null; // Nothing to do so reset iterator to cause new select + } + + LOGGER.trace(() -> String.format("Thread %d, nextSelectionKey %s, channelIterator now %s", + Thread.currentThread().getId(), nextSelectionKey, channelIterator)); + + if (nextSelectionKey == null) + return null; + + return new ChannelTask(nextSelectionKey); + } + + private Task maybeProducePeerMessageTask() { + for (Peer peer : getConnectedPeers()) { + Task peerTask = peer.getMessageTask(); + if (peerTask != null) + return peerTask; + } + + return null; + } + + private Task maybeProducePeerPingTask() { + // Ask connected peers whether they need a ping + for (Peer peer : getConnectedPeers()) { + Task peerTask = peer.getPingTask(); + if (peerTask != null) + return peerTask; + } + + return null; + } + + class PeerConnectTask implements ExecuteProduceConsume.Task { + private final Peer peer; + + public PeerConnectTask(Peer peer) { + this.peer = peer; + } + + @Override + public void perform() throws InterruptedException { + connectPeer(peer); + } + } + + private Task maybeProduceConnectPeerTask() throws InterruptedException { + if (getOutboundHandshakedPeers().size() >= minOutboundPeers) + return null; + + final long now = System.currentTimeMillis(); + if (now < nextConnectTaskTimestamp) + return null; + + nextConnectTaskTimestamp = now + 1000L; + + Peer targetPeer = getConnectablePeer(); + if (targetPeer == null) + return null; + + // Create connection task + return new PeerConnectTask(targetPeer); + } + + private Task maybeProduceBroadcastTask() { + final long now = System.currentTimeMillis(); + if (now < nextBroadcastTimestamp) + return null; + + nextBroadcastTimestamp = now + BROADCAST_INTERVAL; + return () -> Controller.getInstance().doNetworkBroadcast(); } } @@ -438,12 +498,7 @@ public class Network extends Thread { this.onPeerReady(newPeer); } - private void pingPeers() { - for (Peer peer : this.getConnectedPeers()) - peer.pingCheck(); - } - - private void prunePeers() throws InterruptedException, DataException { + public void prunePeers() throws InterruptedException, DataException { final long now = System.currentTimeMillis(); // Disconnect peers that are stuck during handshake @@ -495,12 +550,7 @@ public class Network extends Thread { } } - private void createConnection() throws InterruptedException, DataException { - if (this.getOutboundHandshakedPeers().size() >= minOutboundPeers) - return; - - Peer newPeer; - + private Peer getConnectablePeer() throws InterruptedException { try (final Repository repository = RepositoryManager.getRepository()) { // Find an address to connect to List peers = repository.getNetworkRepository().getAllPeers(); @@ -546,22 +596,29 @@ public class Network extends Thread { // Any left? if (peers.isEmpty()) - return; + return null; // Pick random peer int peerIndex = new SecureRandom().nextInt(peers.size()); // Pick candidate PeerData peerData = peers.get(peerIndex); - newPeer = new Peer(peerData); + Peer newPeer = new Peer(peerData); // Update connection attempt info repository.discardChanges(); peerData.setLastAttempted(System.currentTimeMillis()); repository.getNetworkRepository().save(peerData); repository.saveChanges(); - } + return newPeer; + } catch (DataException e) { + LOGGER.warn(String.format("Repository issue while finding a connectable peer: %s", e.getMessage())); + return null; + } + } + + private void connectPeer(Peer newPeer) throws InterruptedException { SocketChannel socketChannel = newPeer.connect(); if (socketChannel == null) return; @@ -585,20 +642,6 @@ public class Network extends Thread { this.onPeerReady(newPeer); } - private void connectionRead(SocketChannel socketChannel) { - Peer peer = getPeerFromChannel(socketChannel); - if (peer == null) - return; - - try { - peer.readMessages(); - } catch (IOException e) { - LOGGER.trace(() -> String.format("Network thread %s encountered I/O error: %s", Thread.currentThread().getId(), e.getMessage()), e); - peer.disconnect("I/O error"); - return; - } - } - private Peer getPeerFromChannel(SocketChannel socketChannel) { synchronized (this.connectedPeers) { for (Peer peer : this.connectedPeers) @@ -638,7 +681,7 @@ public class Network extends Thread { /** Called when a new message arrives for a peer. message can be null if called after connection */ public void onMessage(Peer peer, Message message) { if (message != null) - LOGGER.trace(String.format("Received %s message from %s", message.getType().name(), peer)); + LOGGER.trace(() -> String.format("Processing %s message with ID %d from peer %s", message.getType().name(), message.getId(), peer)); Handshake handshakeStatus = peer.getHandshakeStatus(); if (handshakeStatus != Handshake.COMPLETED) { @@ -1065,20 +1108,17 @@ public class Network extends Thread { // Shutdown public void shutdown() { - this.isStopping = true; - // Close listen socket to prevent more incoming connections - if (serverChannel.isOpen()) + if (this.serverChannel.isOpen()) try { - serverChannel.close(); + this.serverChannel.close(); } catch (IOException e) { // Not important } // Stop processing threads - this.networkingExecutor.shutdownNow(); try { - if (!this.networkingExecutor.awaitTermination(5000, TimeUnit.MILLISECONDS)) + if (!this.networkEPC.shutdown(5000)) LOGGER.debug("Network threads failed to terminate"); } catch (InterruptedException e) { LOGGER.debug("Interrupted while waiting for networking threads to terminate"); diff --git a/src/main/java/org/qora/network/Peer.java b/src/main/java/org/qora/network/Peer.java index 920e2e0b..ee828ac4 100644 --- a/src/main/java/org/qora/network/Peer.java +++ b/src/main/java/org/qora/network/Peer.java @@ -15,6 +15,7 @@ import java.util.Map; import java.util.Random; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; @@ -26,6 +27,7 @@ import org.qora.network.message.Message; import org.qora.network.message.Message.MessageException; import org.qora.network.message.Message.MessageType; import org.qora.settings.Settings; +import org.qora.utils.ExecuteProduceConsume; import org.qora.network.message.PingMessage; import org.qora.network.message.VersionMessage; @@ -58,6 +60,7 @@ public class Peer { private boolean isLocal; private ByteBuffer byteBuffer; private Map> replyQueues; + private LinkedBlockingQueue pendingMessages; /** True if we created connection to peer, false if we accepted incoming connection from peer. */ private final boolean isOutbound; @@ -84,7 +87,6 @@ public class Peer { private Long lastPing = null; /** When last PING message was sent, or null if pings not started yet. */ private Long lastPingSent; - private final ReentrantLock pingLock = new ReentrantLock(); /** Latest block height as reported by peer. */ private Integer lastHeight; @@ -271,6 +273,7 @@ public class Peer { this.socketChannel.configureBlocking(false); this.byteBuffer = ByteBuffer.allocate(Network.MAXIMUM_MESSAGE_SIZE); this.replyQueues = Collections.synchronizedMap(new HashMap>()); + this.pendingMessages = new LinkedBlockingQueue(); } public SocketChannel connect() { @@ -299,26 +302,29 @@ public class Peer { } /** - * Attempt to read Message from peer. + * Attempt to buffer bytes from socketChannel. * - * @return message, or null if no message or there was a problem * @throws IOException */ - public void readMessages() throws IOException { - while(true) { - Message message; + /* package */ void readChannel() throws IOException { + synchronized (this.byteBuffer) { + if (!this.socketChannel.isOpen() || this.socketChannel.socket().isClosed()) + return; - synchronized (this) { - if (!this.socketChannel.isOpen() || this.socketChannel.socket().isClosed()) - break; + int bytesRead = this.socketChannel.read(this.byteBuffer); + if (bytesRead == -1) { + this.disconnect("EOF"); + return; + } - int bytesRead = this.socketChannel.read(this.byteBuffer); - if (bytesRead == -1) { - this.disconnect("EOF"); - return; - } + if (bytesRead == 0) + // No room in buffer, or no more bytes to read + return; - LOGGER.trace(() -> String.format("Receiving message from peer %s", this)); + LOGGER.trace(() -> String.format("Received %d bytes from peer %s", bytesRead, this)); + + while (true) { + final Message message; // Can we build a message from buffer now? try { @@ -328,26 +334,41 @@ public class Peer { this.disconnect(e.getMessage()); return; } + + if (message == null) + return; + + LOGGER.trace(() -> String.format("Received %s message with ID %d from peer %s", message.getType().name(), message.getId(), this)); + + BlockingQueue queue = this.replyQueues.get(message.getId()); + if (queue != null) { + // Adding message to queue will unblock thread waiting for response + this.replyQueues.get(message.getId()).add(message); + // Consumed elsewhere + continue; + } + + // No thread waiting for message so we need to pass it up to network layer + + // Add message to pending queue + if (!this.pendingMessages.offer(message)) { + LOGGER.info(String.format("No room to queue message from peer %s - discarding", this)); + return; + } } - - if (message == null) - return; - - LOGGER.trace(() -> String.format("Received %s message with ID %d from peer %s", message.getType().name(), message.getId(), this)); - - BlockingQueue queue = this.replyQueues.get(message.getId()); - if (queue != null) { - // Adding message to queue will unblock thread waiting for response - this.replyQueues.get(message.getId()).add(message); - // Consumed elsewhere - continue; - } - - // No thread waiting for message so pass up to network layer - Network.getInstance().onMessage(this, message); } } + /* package */ ExecuteProduceConsume.Task getMessageTask() { + final Message nextMessage = this.pendingMessages.poll(); + + if (nextMessage == null) + return null; + + // Return a task to process message in queue + return () -> Network.getInstance().onMessage(this, nextMessage); + } + /** * Attempt to send Message to peer. * @@ -427,30 +448,27 @@ public class Peer { } } - public void startPings() { + /* package */ void startPings() { // Replacing initial null value allows pingCheck() to start sending pings. LOGGER.trace(() -> String.format("Enabling pings for peer %s", this)); - this.lastPingSent = 0L; //System.currentTimeMillis(); + this.lastPingSent = System.currentTimeMillis(); } - /* package */ void pingCheck() { - LOGGER.trace(() -> String.format("Ping check for peer %s", this)); + /* package */ ExecuteProduceConsume.Task getPingTask() { + // Pings not enabled yet? + if (this.lastPingSent == null) + return null; - if (!this.pingLock.tryLock()) - return; // Some other thread is already checking ping status for this peer + final long now = System.currentTimeMillis(); - try { - // Pings not enabled yet? - if (this.lastPingSent == null) - return; + // Time to send another ping? + if (now < this.lastPingSent + PING_INTERVAL) + return null; // Not yet - final long now = System.currentTimeMillis(); + // Not strictly true, but prevents this peer from being immediately chosen again + this.lastPingSent = now; - // Time to send another ping? - if (now < this.lastPingSent + PING_INTERVAL) - return; // Not yet - - this.lastPingSent = now; + return () -> { PingMessage pingMessage = new PingMessage(); Message message = this.getResponse(pingMessage); final long after = System.currentTimeMillis(); @@ -461,11 +479,7 @@ public class Peer { } this.setLastPing(after - now); - } catch (InterruptedException e) { - // Shutdown situation - } finally { - this.pingLock.unlock(); - } + }; } public void disconnect(String reason) { diff --git a/src/main/java/org/qora/utils/ExecuteProduceConsume.java b/src/main/java/org/qora/utils/ExecuteProduceConsume.java new file mode 100644 index 00000000..44d2d6fc --- /dev/null +++ b/src/main/java/org/qora/utils/ExecuteProduceConsume.java @@ -0,0 +1,149 @@ +package org.qora.utils; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +public abstract class ExecuteProduceConsume implements Runnable { + + private final String className; + private final Logger logger; + + private ExecutorService executor; + private int activeThreadCount = 0; + private int greatestActiveThreadCount = 0; + private int consumerCount = 0; + + private boolean hasThreadPending = false; + + public ExecuteProduceConsume(ExecutorService executor) { + className = this.getClass().getSimpleName(); + logger = LogManager.getLogger(this.getClass()); + + this.executor = executor; + } + + public ExecuteProduceConsume() { + this(Executors.newCachedThreadPool()); + } + + public void start() { + executor.execute(this); + } + + public void shutdown() { + executor.shutdownNow(); + } + + public boolean shutdown(long timeout) throws InterruptedException { + executor.shutdownNow(); + return executor.awaitTermination(timeout, TimeUnit.MILLISECONDS); + } + + public int getActiveThreadCount() { + synchronized (this) { + return activeThreadCount; + } + } + + public int getGreatestActiveThreadCount() { + synchronized (this) { + return greatestActiveThreadCount; + } + } + + /** + * Returns a Task to be performed, possibly blocking. + * + * @param canBlock + * @return task to be performed, or null if no task pending. + * @throws InterruptedException + */ + protected abstract Task produceTask(boolean canBlock) throws InterruptedException; + + @FunctionalInterface + public interface Task { + public abstract void perform() throws InterruptedException; + } + + @Override + public void run() { + Thread.currentThread().setName(className + "-" + Thread.currentThread().getId()); + + synchronized (this) { + ++activeThreadCount; + if (activeThreadCount > greatestActiveThreadCount) + greatestActiveThreadCount = activeThreadCount; + + logger.trace(() -> String.format("[%d] started, hasThreadPending was: %b, activeThreadCount now: %d", + Thread.currentThread().getId(), hasThreadPending, activeThreadCount)); + + hasThreadPending = false; + } + + try { + boolean canBlock = false; + + while (true) { + final Task task; + + logger.trace(() -> String.format("[%d] waiting to produce...", Thread.currentThread().getId())); + + synchronized (this) { + final boolean lambdaCanIdle = canBlock; + logger.trace(() -> String.format("[%d] producing, canBlock is %b...", Thread.currentThread().getId(), lambdaCanIdle)); + task = produceTask(canBlock); + } + + if (task == null) + synchronized (this) { + logger.trace(() -> String.format("[%d] no task, activeThreadCount: %d, consumerCount: %d", + Thread.currentThread().getId(), activeThreadCount, consumerCount)); + + if (activeThreadCount > consumerCount + 1) { + --activeThreadCount; + logger.trace(() -> String.format("[%d] ending, activeThreadCount now: %d", Thread.currentThread().getId(), activeThreadCount)); + break; + } + + // We're the last surviving thread - producer can afford to block next round + canBlock = true; + + continue; + } + + // We have a task + + synchronized (this) { + ++consumerCount; + + if (!hasThreadPending) { + logger.trace(() -> String.format("[%d] spawning another thread", Thread.currentThread().getId())); + hasThreadPending = true; + executor.execute(this); // Same object, different thread + } + } + + logger.trace(() -> String.format("[%d] performing task...", Thread.currentThread().getId())); + task.perform(); // This can block for a while + logger.trace(() -> String.format("[%d] finished task", Thread.currentThread().getId())); + + synchronized (this) { + --consumerCount; + + // Quicker, non-blocking produce next round + canBlock = false; + } + } + } catch (InterruptedException | RejectedExecutionException e) { + // We're in shutdown situation so exit + } finally { + Thread.currentThread().setName(className + "-dormant"); + } + } + +} diff --git a/src/test/java/org/qora/test/ThreadTests.java b/src/test/java/org/qora/test/ThreadTests.java new file mode 100644 index 00000000..a37a90f7 --- /dev/null +++ b/src/test/java/org/qora/test/ThreadTests.java @@ -0,0 +1,136 @@ +package org.qora.test; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Random; + +import org.junit.Test; +import org.qora.utils.ExecuteProduceConsume; + +public class ThreadTests { + + private void testEPC(ExecuteProduceConsume testEPC) throws InterruptedException { + testEPC.start(); + + // Let it run for a minute + for (int s = 1; s <= 60; ++s) { + Thread.sleep(1000); + System.out.println(String.format("After %d second%s, active threads: %d, greatest thread count: %d", s, (s != 1 ? "s" : "") , testEPC.getActiveThreadCount(), testEPC.getGreatestActiveThreadCount())); + } + + final long before = System.currentTimeMillis(); + testEPC.shutdown(30 * 1000); + final long after = System.currentTimeMillis(); + + System.out.println(String.format("Shutdown took %d milliseconds", after - before)); + System.out.println(String.format("Greatest thread count: %d", testEPC.getGreatestActiveThreadCount())); + } + + @Test + public void testRandomEPC() throws InterruptedException { + final int TASK_PERCENT = 25; // Produce a task this % of the time + final int PAUSE_PERCENT = 80; // Pause for new work this % of the time + + class RandomEPC extends ExecuteProduceConsume { + @Override + protected Task produceTask(boolean canIdle) throws InterruptedException { + Random random = new Random(); + + final int percent = random.nextInt(100); + + // Sometimes produce a task + if (percent < TASK_PERCENT) { + return new Task() { + @Override + public void perform() throws InterruptedException { + Thread.sleep(random.nextInt(500) + 100); + } + }; + } else { + // If we don't produce a task, then maybe simulate a pause until work arrives + if (canIdle && percent < PAUSE_PERCENT) + Thread.sleep(random.nextInt(100)); + + return null; + } + } + } + + testEPC(new RandomEPC()); + } + + /** + * Test ping scenario with many peers requiring pings. + *

+ * Specifically, if: + *

    + *
  • the idling EPC thread sleeps for 1 second
  • + *
  • pings are required every P seconds
  • + *
  • there are way more than P peers
  • + *
+ * then we need to make sure EPC threads are not + * delayed such that some peers (>P) don't get a + * chance to be pinged. + */ + @Test + public void testPingEPC() throws InterruptedException { + final long PRODUCER_SLEEP_TIME = 1000; // ms + final long PING_INTERVAL = PRODUCER_SLEEP_TIME * 8; // ms + final long PING_ROUND_TRIP_TIME = PRODUCER_SLEEP_TIME * 5; // ms + + final int MAX_PEERS = 20; + + final List lastPings = new ArrayList<>(Collections.nCopies(MAX_PEERS, System.currentTimeMillis())); + + class PingTask implements ExecuteProduceConsume.Task { + private final int peerIndex; + + public PingTask(int peerIndex) { + this.peerIndex = peerIndex; + } + + @Override + public void perform() throws InterruptedException { + System.out.println("Pinging peer " + peerIndex); + + // At least half the worst case ping round-trip + Random random = new Random(); + int halfTime = (int) PING_ROUND_TRIP_TIME / 2; + long sleep = random.nextInt(halfTime) + halfTime; + Thread.sleep(sleep); + } + } + + class PingEPC extends ExecuteProduceConsume { + @Override + protected Task produceTask(boolean canIdle) throws InterruptedException { + // If we can idle, then we do, to simulate worst case + if (canIdle) + Thread.sleep(PRODUCER_SLEEP_TIME); + + // Is there a peer that needs a ping? + final long now = System.currentTimeMillis(); + synchronized (lastPings) { + for (int peerIndex = 0; peerIndex < lastPings.size(); ++peerIndex) { + long lastPing = lastPings.get(peerIndex); + + if (lastPing < now - PING_INTERVAL - PING_ROUND_TRIP_TIME - PRODUCER_SLEEP_TIME) + throw new RuntimeException("excessive peer ping interval for peer " + peerIndex); + + if (lastPing < now - PING_INTERVAL) { + lastPings.set(peerIndex, System.currentTimeMillis()); + return new PingTask(peerIndex); + } + } + } + + // No work to do + return null; + } + } + + testEPC(new PingEPC()); + } + +}