Another chunk of improvements to networking / EPC.

Instead of synchronizing/blocking in Peer.sendMessage(),
we queue messages in a concurrent blocking TransferQueue, with timeout.

In EPC, ChannelWriteTasks consume from TransferQueue, unblocking callers to Peer.sendMessage().

If a new message is to be sent, or socket output buffer is full,
then OP_WRITE is used to wait for socket to become writable again.

Only one ChannelWriteTask per peer can be active/pending at a time.
Each ChannelWriteTask tries to send as much as it can in one go.

Other minor tidy-ups.
This commit is contained in:
catbref 2022-03-31 20:52:23 +01:00
parent 91e0c9b940
commit 3505788d42
5 changed files with 118 additions and 95 deletions

View File

@ -33,6 +33,7 @@ import java.nio.channels.*;
import java.security.SecureRandom;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
@ -42,7 +43,6 @@ import java.util.stream.Collectors;
// For managing peers
public class Network {
private static final Logger LOGGER = LogManager.getLogger(Network.class);
private static Network instance;
private static final int LISTEN_BACKLOG = 5;
/**
@ -124,14 +124,9 @@ public class Network {
private Selector channelSelector;
private ServerSocketChannel serverChannel;
private SelectionKey serverSelectionKey;
private Iterator<SelectionKey> channelIterator = null;
// volatile because value is updated inside any one of the EPC threads
private volatile long nextConnectTaskTimestamp = 0L; // ms - try first connect once NTP syncs
private final Set<SelectableChannel> channelsPendingWrite = ConcurrentHashMap.newKeySet();
private final ExecutorService broadcastExecutor = Executors.newCachedThreadPool();
// volatile because value is updated inside any one of the EPC threads
private volatile long nextBroadcastTimestamp = 0L; // ms - try first broadcast once NTP syncs
private final Lock mergePeersLock = new ReentrantLock();
@ -460,6 +455,11 @@ public class Network {
class NetworkProcessor extends ExecuteProduceConsume {
private final AtomicLong nextConnectTaskTimestamp = new AtomicLong(0L); // ms - try first connect once NTP syncs
private final AtomicLong nextBroadcastTimestamp = new AtomicLong(0L); // ms - try first broadcast once NTP syncs
private Iterator<SelectionKey> channelIterator = null;
NetworkProcessor(ExecutorService executor) {
super(executor);
}
@ -517,7 +517,7 @@ public class Network {
}
private Task maybeProduceConnectPeerTask(Long now) throws InterruptedException {
if (now == null || now < nextConnectTaskTimestamp) {
if (now == null || now < nextConnectTaskTimestamp.get()) {
return null;
}
@ -525,7 +525,7 @@ public class Network {
return null;
}
nextConnectTaskTimestamp = now + 1000L;
nextConnectTaskTimestamp.set(now + 1000L);
Peer targetPeer = getConnectablePeer(now);
if (targetPeer == null) {
@ -537,11 +537,11 @@ public class Network {
}
private Task maybeProduceBroadcastTask(Long now) {
if (now == null || now < nextBroadcastTimestamp) {
if (now == null || now < nextBroadcastTimestamp.get()) {
return null;
}
nextBroadcastTimestamp = now + BROADCAST_INTERVAL;
nextBroadcastTimestamp.set(now + BROADCAST_INTERVAL);
return new BroadcastTask();
}
@ -584,19 +584,34 @@ public class Network {
LOGGER.trace("Thread {}, nextSelectionKey {}", Thread.currentThread().getId(), nextSelectionKey);
SelectableChannel socketChannel = nextSelectionKey.channel();
if (nextSelectionKey.isReadable()) {
clearInterestOps(nextSelectionKey, SelectionKey.OP_READ);
return new ChannelReadTask(nextSelectionKey);
Peer peer = getPeerFromChannel((SocketChannel) socketChannel);
if (peer == null)
return null;
return new ChannelReadTask((SocketChannel) socketChannel, peer);
}
if (nextSelectionKey.isWritable()) {
clearInterestOps(nextSelectionKey, SelectionKey.OP_WRITE);
return new ChannelWriteTask(nextSelectionKey);
Peer peer = getPeerFromChannel((SocketChannel) socketChannel);
if (peer == null)
return null;
// Any thread that queues a message to send can set OP_WRITE,
// but we only allow one pending/active ChannelWriteTask per Peer
if (!channelsPendingWrite.add(socketChannel))
return null;
return new ChannelWriteTask((SocketChannel) socketChannel, peer);
}
if (nextSelectionKey.isAcceptable()) {
clearInterestOps(nextSelectionKey, SelectionKey.OP_ACCEPT);
return new ChannelAcceptTask(nextSelectionKey);
return new ChannelAcceptTask((ServerSocketChannel) socketChannel);
}
}
@ -788,7 +803,11 @@ public class Network {
selectionKey.interestOpsOr(interestOps);
}
// Peer callbacks
// Peer / Task callbacks
public void notifyChannelNotWriting(SelectableChannel socketChannel) {
this.channelsPendingWrite.remove(socketChannel);
}
protected void wakeupChannelSelector() {
this.channelSelector.wakeup();
@ -823,6 +842,7 @@ public class Network {
}
this.removeConnectedPeer(peer);
this.channelsPendingWrite.remove(peer.getSocketChannel());
if (getImmutableConnectedPeers().size() < maxPeers - 1 && (serverSelectionKey.interestOps() & SelectionKey.OP_ACCEPT) == 0) {
try {

View File

@ -44,9 +44,9 @@ public class Peer {
private static final int RESPONSE_TIMEOUT = 3000; // ms
/**
* Maximum time to wait for a peer to respond with blocks (ms)
* Maximum time to wait for a message to be added to sendQueue (ms)
*/
public static final int FETCH_BLOCKS_TIMEOUT = 10000;
private static final int QUEUE_TIMEOUT = 1000; // ms
/**
* Interval between PING messages to a peer. (ms)
@ -67,10 +67,14 @@ public class Peer {
private final UUID peerConnectionId = UUID.randomUUID();
private final Object byteBufferLock = new Object();
private ByteBuffer byteBuffer;
private Map<Integer, BlockingQueue<Message>> replyQueues;
private LinkedBlockingQueue<Message> pendingMessages;
private TransferQueue<Message> sendQueue;
private ByteBuffer outputBuffer;
private String outputMessageType;
private int outputMessageId;
/**
* True if we created connection to peer, false if we accepted incoming connection from peer.
*/
@ -342,12 +346,6 @@ public class Peer {
}
}
protected void queueMessage(Message message) {
if (!this.pendingMessages.offer(message)) {
LOGGER.info("[{}] No room to queue message from peer {} - discarding", this.peerConnectionId, this);
}
}
public boolean isSyncInProgress() {
return this.syncInProgress;
}
@ -398,6 +396,7 @@ public class Peer {
this.socketChannel.configureBlocking(false);
Network.getInstance().setInterestOps(this.socketChannel, SelectionKey.OP_READ);
this.byteBuffer = null; // Defer allocation to when we need it, to save memory. Sorry GC!
this.sendQueue = new LinkedTransferQueue<>();
this.replyQueues = new ConcurrentHashMap<>();
this.pendingMessages = new LinkedBlockingQueue<>();
@ -557,8 +556,59 @@ public class Peer {
* @return true if more data is pending to be sent
*/
public boolean writeChannel() throws IOException {
// TODO
return false;
// It is the responsibility of ChannelWriteTask's producer to produce only one call to writeChannel() at a time
while (true) {
// If output byte buffer is null, fetch next message from queue (if any)
while (this.outputBuffer == null) {
Message message;
try {
// Allow other thread time to add message to queue having raised OP_WRITE.
// Timeout is overkill but not excessive enough to clog up networking / EPC.
// This is to avoid race condition in sendMessageWithTimeout() below.
message = this.sendQueue.poll(QUEUE_TIMEOUT, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
// Shutdown situation
return false;
}
// No message? No further work to be done
if (message == null)
return false;
try {
this.outputBuffer = ByteBuffer.wrap(message.toBytes());
this.outputMessageType = message.getType().name();
this.outputMessageId = message.getId();
LOGGER.trace("[{}] Sending {} message with ID {} to peer {}",
this.peerConnectionId, this.outputMessageType, this.outputMessageId, this);
} catch (MessageException e) {
// Something went wrong converting message to bytes, so discard but allow another round
LOGGER.warn("[{}] Failed to send {} message with ID {} to peer {}: {}", this.peerConnectionId,
message.getType().name(), message.getId(), this, e.getMessage());
}
}
// If output byte buffer is not null, send from that
int bytesWritten = this.socketChannel.write(outputBuffer);
LOGGER.trace("[{}] Sent {} bytes of {} message with ID {} to peer {} ({} total)", this.peerConnectionId,
bytesWritten, this.outputMessageType, this.outputMessageId, this, outputBuffer.limit());
// If we've sent 0 bytes then socket buffer is full so we need to wait until it's empty again
if (bytesWritten == 0) {
return true;
}
// If we then exhaust the byte buffer, set it to null (otherwise loop and try to send more)
if (!this.outputBuffer.hasRemaining()) {
this.outputMessageType = null;
this.outputMessageId = 0;
this.outputBuffer = null;
}
}
}
protected Task getMessageTask() {
@ -610,54 +660,19 @@ public class Peer {
}
try {
// Send message
LOGGER.trace("[{}] Sending {} message with ID {} to peer {}", this.peerConnectionId,
// Queue message, to be picked up by ChannelWriteTask and then peer.writeChannel()
LOGGER.trace("[{}] Queuing {} message with ID {} to peer {}", this.peerConnectionId,
message.getType().name(), message.getId(), this);
ByteBuffer outputBuffer = ByteBuffer.wrap(message.toBytes());
synchronized (this.socketChannel) {
final long sendStart = System.currentTimeMillis();
long totalBytes = 0;
while (outputBuffer.hasRemaining()) {
int bytesWritten = this.socketChannel.write(outputBuffer);
totalBytes += bytesWritten;
LOGGER.trace("[{}] Sent {} bytes of {} message with ID {} to peer {} ({} total)", this.peerConnectionId,
bytesWritten, message.getType().name(), message.getId(), this, totalBytes);
if (bytesWritten == 0) {
// Underlying socket's internal buffer probably full,
// so wait a short while for bytes to actually be transmitted over the wire
/*
* NOSONAR squid:S2276 - we don't want to use this.socketChannel.wait()
* as this releases the lock held by synchronized() above
* and would allow another thread to send another message,
* potentially interleaving them on-the-wire, causing checksum failures
* and connection loss.
*/
Thread.sleep(100L); //NOSONAR squid:S2276
if (System.currentTimeMillis() - sendStart > timeout) {
// We've taken too long to send this message
return false;
}
}
}
}
} catch (MessageException e) {
LOGGER.warn("[{}] Failed to send {} message with ID {} to peer {}: {}", this.peerConnectionId,
message.getType().name(), message.getId(), this, e.getMessage());
return false;
} catch (IOException | InterruptedException e) {
// Possible race condition:
// We set OP_WRITE, EPC creates ChannelWriteTask which calls Peer.writeChannel, writeChannel's poll() finds no message to send
// Avoided by poll-with-timeout in writeChannel() above.
Network.getInstance().setInterestOps(this.socketChannel, SelectionKey.OP_WRITE);
return this.sendQueue.tryTransfer(message, timeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
// Send failure
return false;
}
// Sent OK
return true;
}
/**

View File

@ -18,12 +18,10 @@ import java.util.List;
public class ChannelAcceptTask implements Task {
private static final Logger LOGGER = LogManager.getLogger(ChannelAcceptTask.class);
private final SelectionKey serverSelectionKey;
private final ServerSocketChannel serverSocketChannel;
public ChannelAcceptTask(SelectionKey selectionKey) {
this.serverSelectionKey = selectionKey;
this.serverSocketChannel = (ServerSocketChannel) this.serverSelectionKey.channel();
public ChannelAcceptTask(ServerSocketChannel serverSocketChannel) {
this.serverSocketChannel = serverSocketChannel;
}
@Override

View File

@ -14,15 +14,13 @@ import java.nio.channels.SocketChannel;
public class ChannelReadTask implements Task {
private static final Logger LOGGER = LogManager.getLogger(ChannelReadTask.class);
private final SelectionKey selectionKey;
private final SocketChannel socketChannel;
private final Peer peer;
private final String name;
public ChannelReadTask(SelectionKey selectionKey) {
this.selectionKey = selectionKey;
this.socketChannel = (SocketChannel) this.selectionKey.channel();
this.peer = Network.getInstance().getPeerFromChannel(this.socketChannel);
public ChannelReadTask(SocketChannel socketChannel, Peer peer) {
this.socketChannel = socketChannel;
this.peer = peer;
this.name = "ChannelReadTask::" + peer;
}
@ -33,10 +31,6 @@ public class ChannelReadTask implements Task {
@Override
public void perform() throws InterruptedException {
if (peer == null) {
return;
}
try {
peer.readChannel();

View File

@ -13,15 +13,13 @@ import java.nio.channels.SocketChannel;
public class ChannelWriteTask implements Task {
private static final Logger LOGGER = LogManager.getLogger(ChannelWriteTask.class);
private final SelectionKey selectionKey;
private final SocketChannel socketChannel;
private final Peer peer;
private final String name;
public ChannelWriteTask(SelectionKey selectionKey) {
this.selectionKey = selectionKey;
this.socketChannel = (SocketChannel) this.selectionKey.channel();
this.peer = Network.getInstance().getPeerFromChannel(this.socketChannel);
public ChannelWriteTask(SocketChannel socketChannel, Peer peer) {
this.socketChannel = socketChannel;
this.peer = peer;
this.name = "ChannelWriteTask::" + peer;
}
@ -32,16 +30,14 @@ public class ChannelWriteTask implements Task {
@Override
public void perform() throws InterruptedException {
if (peer == null) {
return;
}
try {
boolean isMoreDataPending = peer.writeChannel();
boolean isSocketClogged = peer.writeChannel();
if (isMoreDataPending) {
Network.getInstance().setInterestOps(socketChannel, SelectionKey.OP_WRITE);
}
// Tell Network that we've finished
Network.getInstance().notifyChannelNotWriting(socketChannel);
if (isSocketClogged)
Network.getInstance().setInterestOps(this.socketChannel, SelectionKey.OP_WRITE);
} catch (IOException e) {
if (e.getMessage() != null && e.getMessage().toLowerCase().contains("connection reset")) {
peer.disconnect("Connection reset");