Networking work-in-progress:

When node has reached max connections, Network will ignore pending incoming connections by:
1. not calling accept()
2. de-registering OP_ACCEPT 'interest op' on the listen socket's channel

When a peer disconnects, Network might re-register OP_ACCEPT interest op on listen socket.
This commit is contained in:
catbref 2022-03-19 13:16:32 +00:00
parent a5fb0be274
commit 6255b2a907

View File

@ -43,7 +43,7 @@ public class Network {
private static final Logger LOGGER = LogManager.getLogger(Network.class); private static final Logger LOGGER = LogManager.getLogger(Network.class);
private static Network instance; private static Network instance;
private static final int LISTEN_BACKLOG = 10; private static final int LISTEN_BACKLOG = 5;
/** /**
* How long before retrying after a connection failure, in milliseconds. * How long before retrying after a connection failure, in milliseconds.
*/ */
@ -122,6 +122,7 @@ public class Network {
private final ExecuteProduceConsume networkEPC; private final ExecuteProduceConsume networkEPC;
private Selector channelSelector; private Selector channelSelector;
private ServerSocketChannel serverChannel; private ServerSocketChannel serverChannel;
private SelectionKey serverSelectionKey;
private Iterator<SelectionKey> channelIterator = null; private Iterator<SelectionKey> channelIterator = null;
// volatile because value is updated inside any one of the EPC threads // volatile because value is updated inside any one of the EPC threads
@ -170,7 +171,7 @@ public class Network {
serverChannel.configureBlocking(false); serverChannel.configureBlocking(false);
serverChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true); serverChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
serverChannel.bind(endpoint, LISTEN_BACKLOG); serverChannel.bind(endpoint, LISTEN_BACKLOG);
serverChannel.register(channelSelector, SelectionKey.OP_ACCEPT); serverSelectionKey = serverChannel.register(channelSelector, SelectionKey.OP_ACCEPT);
} catch (UnknownHostException e) { } catch (UnknownHostException e) {
LOGGER.error("Can't bind listen socket to address {}", Settings.getInstance().getBindAddress()); LOGGER.error("Can't bind listen socket to address {}", Settings.getInstance().getBindAddress());
throw new IOException("Can't bind listen socket to address", e); throw new IOException("Can't bind listen socket to address", e);
@ -657,6 +658,15 @@ public class Network {
SocketChannel socketChannel; SocketChannel socketChannel;
try { try {
if (getImmutableConnectedPeers().size() >= maxPeers) {
// We have enough peers
if (serverSelectionKey.interestOps() != 0) {
LOGGER.debug("Ignoring pending incoming connections because the server is full");
serverSelectionKey.interestOps(0);
}
return;
}
socketChannel = serverSocketChannel.accept(); socketChannel = serverSocketChannel.accept();
} catch (IOException e) { } catch (IOException e) {
return; return;
@ -688,13 +698,6 @@ public class Network {
return; return;
} }
if (getImmutableConnectedPeers().size() >= maxPeers) {
// We have enough peers
LOGGER.debug("Connection discarded from peer {} because the server is full", address);
socketChannel.close();
return;
}
LOGGER.debug("Connection accepted from peer {}", address); LOGGER.debug("Connection accepted from peer {}", address);
newPeer = new Peer(socketChannel, channelSelector); newPeer = new Peer(socketChannel, channelSelector);
@ -783,6 +786,10 @@ public class Network {
} }
private boolean connectPeer(Peer newPeer) throws InterruptedException { private boolean connectPeer(Peer newPeer) throws InterruptedException {
// NOT CORRECT:
if (getImmutableConnectedPeers().size() >= minOutboundPeers)
return false;
SocketChannel socketChannel = newPeer.connect(this.channelSelector); SocketChannel socketChannel = newPeer.connect(this.channelSelector);
if (socketChannel == null) { if (socketChannel == null) {
return false; return false;
@ -866,6 +873,15 @@ public class Network {
} }
this.removeConnectedPeer(peer); this.removeConnectedPeer(peer);
if (getImmutableConnectedPeers().size() < maxPeers - 1 && (serverSelectionKey.interestOps() & SelectionKey.OP_ACCEPT) == 0) {
try {
LOGGER.debug("Re-enabling accepting incoming connections because the server is not longer full");
serverSelectionKey.interestOps(SelectionKey.OP_ACCEPT);
} catch (CancelledKeyException e) {
LOGGER.error("Failed to re-enable accepting of incoming connections: {}", e.getMessage());
}
}
} }
public void peerMisbehaved(Peer peer) { public void peerMisbehaved(Peer peer) {