diff --git a/src/main/java/org/qortal/network/Network.java b/src/main/java/org/qortal/network/Network.java index 99ea7210..0e9ac32b 100644 --- a/src/main/java/org/qortal/network/Network.java +++ b/src/main/java/org/qortal/network/Network.java @@ -96,22 +96,24 @@ public class Network { private final String ourNodeId = Crypto.toNodeAddress(edPublicKeyParams.getEncoded()); private final int maxMessageSize; + private final int minOutboundPeers; + private final int maxPeers; private final List allKnownPeers = new ArrayList<>(); private final List connectedPeers = new ArrayList<>(); private final List selfPeers = new ArrayList<>(); - private ExecuteProduceConsume networkEPC; + private final ExecuteProduceConsume networkEPC; private Selector channelSelector; private ServerSocketChannel serverChannel; private Iterator channelIterator = null; - private int minOutboundPeers; - private int maxPeers; - private long nextConnectTaskTimestamp = 0L; // ms - try first connect once NTP syncs + // volatile because value is updated inside any one of the EPC threads + private volatile long nextConnectTaskTimestamp = 0L; // ms - try first connect once NTP syncs private ExecutorService broadcastExecutor = Executors.newCachedThreadPool(); - private long nextBroadcastTimestamp = 0L; // ms - try first broadcast once NTP syncs + // volatile because value is updated inside any one of the EPC threads + private volatile long nextBroadcastTimestamp = 0L; // ms - try first broadcast once NTP syncs private final Lock mergePeersLock = new ReentrantLock(); @@ -429,35 +431,38 @@ public class Network { private Task maybeProduceChannelTask(boolean canBlock) throws InterruptedException { final SelectionKey nextSelectionKey; - // anything to do? - if (channelIterator == null) { - try { - if (canBlock) - channelSelector.select(1000L); - else - channelSelector.selectNow(); - } catch (IOException e) { - LOGGER.warn(String.format("Channel selection threw IOException: %s", e.getMessage())); - return null; + // Synchronization here to enforce thread-safety on channelIterator + synchronized (channelSelector) { + // anything to do? + if (channelIterator == null) { + try { + if (canBlock) + channelSelector.select(1000L); + else + channelSelector.selectNow(); + } catch (IOException e) { + LOGGER.warn(String.format("Channel selection threw IOException: %s", e.getMessage())); + return null; + } + + if (Thread.currentThread().isInterrupted()) + throw new InterruptedException(); + + channelIterator = channelSelector.selectedKeys().iterator(); } - if (Thread.currentThread().isInterrupted()) - throw new InterruptedException(); + if (channelIterator.hasNext()) { + nextSelectionKey = channelIterator.next(); + channelIterator.remove(); + } else { + nextSelectionKey = null; + channelIterator = null; // Nothing to do so reset iterator to cause new select + } - channelIterator = channelSelector.selectedKeys().iterator(); + LOGGER.trace(() -> String.format("Thread %d, nextSelectionKey %s, channelIterator now %s", + Thread.currentThread().getId(), nextSelectionKey, channelIterator)); } - if (channelIterator.hasNext()) { - nextSelectionKey = channelIterator.next(); - channelIterator.remove(); - } else { - nextSelectionKey = null; - channelIterator = null; // Nothing to do so reset iterator to cause new select - } - - LOGGER.trace(() -> String.format("Thread %d, nextSelectionKey %s, channelIterator now %s", - Thread.currentThread().getId(), nextSelectionKey, channelIterator)); - if (nextSelectionKey == null) return null;