From 3505788d42c6812129623a809b5f90338275db3c Mon Sep 17 00:00:00 2001 From: catbref Date: Thu, 31 Mar 2022 20:52:23 +0100 Subject: [PATCH] Another chunk of improvements to networking / EPC. Instead of synchronizing/blocking in Peer.sendMessage(), we queue messages in a concurrent blocking TransferQueue, with timeout. In EPC, ChannelWriteTasks consume from TransferQueue, unblocking callers to Peer.sendMessage(). If a new message is to be sent, or socket output buffer is full, then OP_WRITE is used to wait for socket to become writable again. Only one ChannelWriteTask per peer can be active/pending at a time. Each ChannelWriteTask tries to send as much as it can in one go. Other minor tidy-ups. --- src/main/java/org/qortal/network/Network.java | 50 ++++--- src/main/java/org/qortal/network/Peer.java | 123 ++++++++++-------- .../network/task/ChannelAcceptTask.java | 6 +- .../qortal/network/task/ChannelReadTask.java | 12 +- .../qortal/network/task/ChannelWriteTask.java | 22 ++-- 5 files changed, 118 insertions(+), 95 deletions(-) diff --git a/src/main/java/org/qortal/network/Network.java b/src/main/java/org/qortal/network/Network.java index 3ee7af75..fff27390 100644 --- a/src/main/java/org/qortal/network/Network.java +++ b/src/main/java/org/qortal/network/Network.java @@ -33,6 +33,7 @@ import java.nio.channels.*; import java.security.SecureRandom; import java.util.*; import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Function; @@ -42,7 +43,6 @@ import java.util.stream.Collectors; // For managing peers public class Network { private static final Logger LOGGER = LogManager.getLogger(Network.class); - private static Network instance; private static final int LISTEN_BACKLOG = 5; /** @@ -124,14 +124,9 @@ public class Network { private Selector channelSelector; private ServerSocketChannel serverChannel; private SelectionKey serverSelectionKey; - private Iterator channelIterator = null; - - // volatile because value is updated inside any one of the EPC threads - private volatile long nextConnectTaskTimestamp = 0L; // ms - try first connect once NTP syncs + private final Set channelsPendingWrite = ConcurrentHashMap.newKeySet(); private final ExecutorService broadcastExecutor = Executors.newCachedThreadPool(); - // volatile because value is updated inside any one of the EPC threads - private volatile long nextBroadcastTimestamp = 0L; // ms - try first broadcast once NTP syncs private final Lock mergePeersLock = new ReentrantLock(); @@ -460,6 +455,11 @@ public class Network { class NetworkProcessor extends ExecuteProduceConsume { + private final AtomicLong nextConnectTaskTimestamp = new AtomicLong(0L); // ms - try first connect once NTP syncs + private final AtomicLong nextBroadcastTimestamp = new AtomicLong(0L); // ms - try first broadcast once NTP syncs + + private Iterator channelIterator = null; + NetworkProcessor(ExecutorService executor) { super(executor); } @@ -517,7 +517,7 @@ public class Network { } private Task maybeProduceConnectPeerTask(Long now) throws InterruptedException { - if (now == null || now < nextConnectTaskTimestamp) { + if (now == null || now < nextConnectTaskTimestamp.get()) { return null; } @@ -525,7 +525,7 @@ public class Network { return null; } - nextConnectTaskTimestamp = now + 1000L; + nextConnectTaskTimestamp.set(now + 1000L); Peer targetPeer = getConnectablePeer(now); if (targetPeer == null) { @@ -537,11 +537,11 @@ public class Network { } private Task maybeProduceBroadcastTask(Long now) { - if (now == null || now < nextBroadcastTimestamp) { + if (now == null || now < nextBroadcastTimestamp.get()) { return null; } - nextBroadcastTimestamp = now + BROADCAST_INTERVAL; + nextBroadcastTimestamp.set(now + BROADCAST_INTERVAL); return new BroadcastTask(); } @@ -584,19 +584,34 @@ public class Network { LOGGER.trace("Thread {}, nextSelectionKey {}", Thread.currentThread().getId(), nextSelectionKey); + SelectableChannel socketChannel = nextSelectionKey.channel(); + if (nextSelectionKey.isReadable()) { clearInterestOps(nextSelectionKey, SelectionKey.OP_READ); - return new ChannelReadTask(nextSelectionKey); + Peer peer = getPeerFromChannel((SocketChannel) socketChannel); + if (peer == null) + return null; + + return new ChannelReadTask((SocketChannel) socketChannel, peer); } if (nextSelectionKey.isWritable()) { clearInterestOps(nextSelectionKey, SelectionKey.OP_WRITE); - return new ChannelWriteTask(nextSelectionKey); + Peer peer = getPeerFromChannel((SocketChannel) socketChannel); + if (peer == null) + return null; + + // Any thread that queues a message to send can set OP_WRITE, + // but we only allow one pending/active ChannelWriteTask per Peer + if (!channelsPendingWrite.add(socketChannel)) + return null; + + return new ChannelWriteTask((SocketChannel) socketChannel, peer); } if (nextSelectionKey.isAcceptable()) { clearInterestOps(nextSelectionKey, SelectionKey.OP_ACCEPT); - return new ChannelAcceptTask(nextSelectionKey); + return new ChannelAcceptTask((ServerSocketChannel) socketChannel); } } @@ -788,7 +803,11 @@ public class Network { selectionKey.interestOpsOr(interestOps); } - // Peer callbacks + // Peer / Task callbacks + + public void notifyChannelNotWriting(SelectableChannel socketChannel) { + this.channelsPendingWrite.remove(socketChannel); + } protected void wakeupChannelSelector() { this.channelSelector.wakeup(); @@ -823,6 +842,7 @@ public class Network { } this.removeConnectedPeer(peer); + this.channelsPendingWrite.remove(peer.getSocketChannel()); if (getImmutableConnectedPeers().size() < maxPeers - 1 && (serverSelectionKey.interestOps() & SelectionKey.OP_ACCEPT) == 0) { try { diff --git a/src/main/java/org/qortal/network/Peer.java b/src/main/java/org/qortal/network/Peer.java index a755632d..9d29fc1f 100644 --- a/src/main/java/org/qortal/network/Peer.java +++ b/src/main/java/org/qortal/network/Peer.java @@ -44,9 +44,9 @@ public class Peer { private static final int RESPONSE_TIMEOUT = 3000; // ms /** - * Maximum time to wait for a peer to respond with blocks (ms) + * Maximum time to wait for a message to be added to sendQueue (ms) */ - public static final int FETCH_BLOCKS_TIMEOUT = 10000; + private static final int QUEUE_TIMEOUT = 1000; // ms /** * Interval between PING messages to a peer. (ms) @@ -67,10 +67,14 @@ public class Peer { private final UUID peerConnectionId = UUID.randomUUID(); private final Object byteBufferLock = new Object(); private ByteBuffer byteBuffer; - private Map> replyQueues; private LinkedBlockingQueue pendingMessages; + private TransferQueue sendQueue; + private ByteBuffer outputBuffer; + private String outputMessageType; + private int outputMessageId; + /** * True if we created connection to peer, false if we accepted incoming connection from peer. */ @@ -342,12 +346,6 @@ public class Peer { } } - protected void queueMessage(Message message) { - if (!this.pendingMessages.offer(message)) { - LOGGER.info("[{}] No room to queue message from peer {} - discarding", this.peerConnectionId, this); - } - } - public boolean isSyncInProgress() { return this.syncInProgress; } @@ -398,6 +396,7 @@ public class Peer { this.socketChannel.configureBlocking(false); Network.getInstance().setInterestOps(this.socketChannel, SelectionKey.OP_READ); this.byteBuffer = null; // Defer allocation to when we need it, to save memory. Sorry GC! + this.sendQueue = new LinkedTransferQueue<>(); this.replyQueues = new ConcurrentHashMap<>(); this.pendingMessages = new LinkedBlockingQueue<>(); @@ -557,8 +556,59 @@ public class Peer { * @return true if more data is pending to be sent */ public boolean writeChannel() throws IOException { - // TODO - return false; + // It is the responsibility of ChannelWriteTask's producer to produce only one call to writeChannel() at a time + + while (true) { + // If output byte buffer is null, fetch next message from queue (if any) + while (this.outputBuffer == null) { + Message message; + + try { + // Allow other thread time to add message to queue having raised OP_WRITE. + // Timeout is overkill but not excessive enough to clog up networking / EPC. + // This is to avoid race condition in sendMessageWithTimeout() below. + message = this.sendQueue.poll(QUEUE_TIMEOUT, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + // Shutdown situation + return false; + } + + // No message? No further work to be done + if (message == null) + return false; + + try { + this.outputBuffer = ByteBuffer.wrap(message.toBytes()); + this.outputMessageType = message.getType().name(); + this.outputMessageId = message.getId(); + + LOGGER.trace("[{}] Sending {} message with ID {} to peer {}", + this.peerConnectionId, this.outputMessageType, this.outputMessageId, this); + } catch (MessageException e) { + // Something went wrong converting message to bytes, so discard but allow another round + LOGGER.warn("[{}] Failed to send {} message with ID {} to peer {}: {}", this.peerConnectionId, + message.getType().name(), message.getId(), this, e.getMessage()); + } + } + + // If output byte buffer is not null, send from that + int bytesWritten = this.socketChannel.write(outputBuffer); + + LOGGER.trace("[{}] Sent {} bytes of {} message with ID {} to peer {} ({} total)", this.peerConnectionId, + bytesWritten, this.outputMessageType, this.outputMessageId, this, outputBuffer.limit()); + + // If we've sent 0 bytes then socket buffer is full so we need to wait until it's empty again + if (bytesWritten == 0) { + return true; + } + + // If we then exhaust the byte buffer, set it to null (otherwise loop and try to send more) + if (!this.outputBuffer.hasRemaining()) { + this.outputMessageType = null; + this.outputMessageId = 0; + this.outputBuffer = null; + } + } } protected Task getMessageTask() { @@ -610,54 +660,19 @@ public class Peer { } try { - // Send message - LOGGER.trace("[{}] Sending {} message with ID {} to peer {}", this.peerConnectionId, + // Queue message, to be picked up by ChannelWriteTask and then peer.writeChannel() + LOGGER.trace("[{}] Queuing {} message with ID {} to peer {}", this.peerConnectionId, message.getType().name(), message.getId(), this); - ByteBuffer outputBuffer = ByteBuffer.wrap(message.toBytes()); - - synchronized (this.socketChannel) { - final long sendStart = System.currentTimeMillis(); - long totalBytes = 0; - - while (outputBuffer.hasRemaining()) { - int bytesWritten = this.socketChannel.write(outputBuffer); - totalBytes += bytesWritten; - - LOGGER.trace("[{}] Sent {} bytes of {} message with ID {} to peer {} ({} total)", this.peerConnectionId, - bytesWritten, message.getType().name(), message.getId(), this, totalBytes); - - if (bytesWritten == 0) { - // Underlying socket's internal buffer probably full, - // so wait a short while for bytes to actually be transmitted over the wire - - /* - * NOSONAR squid:S2276 - we don't want to use this.socketChannel.wait() - * as this releases the lock held by synchronized() above - * and would allow another thread to send another message, - * potentially interleaving them on-the-wire, causing checksum failures - * and connection loss. - */ - Thread.sleep(100L); //NOSONAR squid:S2276 - - if (System.currentTimeMillis() - sendStart > timeout) { - // We've taken too long to send this message - return false; - } - } - } - } - } catch (MessageException e) { - LOGGER.warn("[{}] Failed to send {} message with ID {} to peer {}: {}", this.peerConnectionId, - message.getType().name(), message.getId(), this, e.getMessage()); - return false; - } catch (IOException | InterruptedException e) { + // Possible race condition: + // We set OP_WRITE, EPC creates ChannelWriteTask which calls Peer.writeChannel, writeChannel's poll() finds no message to send + // Avoided by poll-with-timeout in writeChannel() above. + Network.getInstance().setInterestOps(this.socketChannel, SelectionKey.OP_WRITE); + return this.sendQueue.tryTransfer(message, timeout, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { // Send failure return false; } - - // Sent OK - return true; } /** diff --git a/src/main/java/org/qortal/network/task/ChannelAcceptTask.java b/src/main/java/org/qortal/network/task/ChannelAcceptTask.java index b98a881a..3e2a3033 100644 --- a/src/main/java/org/qortal/network/task/ChannelAcceptTask.java +++ b/src/main/java/org/qortal/network/task/ChannelAcceptTask.java @@ -18,12 +18,10 @@ import java.util.List; public class ChannelAcceptTask implements Task { private static final Logger LOGGER = LogManager.getLogger(ChannelAcceptTask.class); - private final SelectionKey serverSelectionKey; private final ServerSocketChannel serverSocketChannel; - public ChannelAcceptTask(SelectionKey selectionKey) { - this.serverSelectionKey = selectionKey; - this.serverSocketChannel = (ServerSocketChannel) this.serverSelectionKey.channel(); + public ChannelAcceptTask(ServerSocketChannel serverSocketChannel) { + this.serverSocketChannel = serverSocketChannel; } @Override diff --git a/src/main/java/org/qortal/network/task/ChannelReadTask.java b/src/main/java/org/qortal/network/task/ChannelReadTask.java index ad190ef2..edd4e8c0 100644 --- a/src/main/java/org/qortal/network/task/ChannelReadTask.java +++ b/src/main/java/org/qortal/network/task/ChannelReadTask.java @@ -14,15 +14,13 @@ import java.nio.channels.SocketChannel; public class ChannelReadTask implements Task { private static final Logger LOGGER = LogManager.getLogger(ChannelReadTask.class); - private final SelectionKey selectionKey; private final SocketChannel socketChannel; private final Peer peer; private final String name; - public ChannelReadTask(SelectionKey selectionKey) { - this.selectionKey = selectionKey; - this.socketChannel = (SocketChannel) this.selectionKey.channel(); - this.peer = Network.getInstance().getPeerFromChannel(this.socketChannel); + public ChannelReadTask(SocketChannel socketChannel, Peer peer) { + this.socketChannel = socketChannel; + this.peer = peer; this.name = "ChannelReadTask::" + peer; } @@ -33,10 +31,6 @@ public class ChannelReadTask implements Task { @Override public void perform() throws InterruptedException { - if (peer == null) { - return; - } - try { peer.readChannel(); diff --git a/src/main/java/org/qortal/network/task/ChannelWriteTask.java b/src/main/java/org/qortal/network/task/ChannelWriteTask.java index 757fa01d..59bc557e 100644 --- a/src/main/java/org/qortal/network/task/ChannelWriteTask.java +++ b/src/main/java/org/qortal/network/task/ChannelWriteTask.java @@ -13,15 +13,13 @@ import java.nio.channels.SocketChannel; public class ChannelWriteTask implements Task { private static final Logger LOGGER = LogManager.getLogger(ChannelWriteTask.class); - private final SelectionKey selectionKey; private final SocketChannel socketChannel; private final Peer peer; private final String name; - public ChannelWriteTask(SelectionKey selectionKey) { - this.selectionKey = selectionKey; - this.socketChannel = (SocketChannel) this.selectionKey.channel(); - this.peer = Network.getInstance().getPeerFromChannel(this.socketChannel); + public ChannelWriteTask(SocketChannel socketChannel, Peer peer) { + this.socketChannel = socketChannel; + this.peer = peer; this.name = "ChannelWriteTask::" + peer; } @@ -32,16 +30,14 @@ public class ChannelWriteTask implements Task { @Override public void perform() throws InterruptedException { - if (peer == null) { - return; - } - try { - boolean isMoreDataPending = peer.writeChannel(); + boolean isSocketClogged = peer.writeChannel(); - if (isMoreDataPending) { - Network.getInstance().setInterestOps(socketChannel, SelectionKey.OP_WRITE); - } + // Tell Network that we've finished + Network.getInstance().notifyChannelNotWriting(socketChannel); + + if (isSocketClogged) + Network.getInstance().setInterestOps(this.socketChannel, SelectionKey.OP_WRITE); } catch (IOException e) { if (e.getMessage() != null && e.getMessage().toLowerCase().contains("connection reset")) { peer.disconnect("Connection reset");