New network threading model

Instead of 3 threads per peer:

1. peer main thread
2. peer's unsolicited messages processor
3. peer pinger

We now use a Jetty-style Execute-Produce-Consume server threading model.

For 60 connected peers, we no longer have 180 threads but typically only
the usual ~6 threads.

Also in this commit:

* merging peers locking changed from lock() to tryLock()

* PROOF handshake maximum time difference increased from 2000ms to 5000ms

* Peers still handshaking after 60s are considered stuck and hence disconnected

* We now use NIO SocketChannels instead of raw sockets
This commit is contained in:
catbref 2019-07-09 12:16:27 +01:00
parent 67c245bb9d
commit 0d85a60c54
6 changed files with 469 additions and 370 deletions

View File

@ -625,7 +625,7 @@ public class Controller extends Thread {
continue;
// We want to update atomically so use lock
ReentrantLock peerLock = connectedPeer.getPeerLock();
ReentrantLock peerLock = connectedPeer.getPeerDataLock();
peerLock.lock();
try {
connectedPeer.setLastHeight(heightV2Message.getHeight());

View File

@ -86,7 +86,7 @@ public class Synchronizer {
int peerHeight;
byte[] peersLastBlockSignature;
ReentrantLock peerLock = peer.getPeerLock();
ReentrantLock peerLock = peer.getPeerDataLock();
peerLock.lockInterruptibly();
try {
peerHeight = peer.getLastHeight();

View File

@ -173,7 +173,8 @@ public enum Handshake {
private static final Logger LOGGER = LogManager.getLogger(Handshake.class);
private static final long MAX_TIMESTAMP_DELTA = 2000; // ms
/** Maximum allowed difference between peer's reported timestamp and when they connected, in milliseconds. */
private static final long MAX_TIMESTAMP_DELTA = 5000; // ms
public final MessageType expectedMessageType;

View File

@ -4,20 +4,27 @@ import java.io.IOException;
import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.net.StandardSocketOptions;
import java.net.UnknownHostException;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
@ -26,6 +33,7 @@ import java.util.stream.Collectors;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.qora.block.Block;
import org.qora.controller.Controller;
import org.qora.data.block.BlockData;
import org.qora.data.network.PeerData;
@ -65,6 +73,11 @@ public class Network extends Thread {
private static final long OLD_PEER_ATTEMPTED_PERIOD = 24 * 60 * 60 * 1000; // ms
/** Maximum time since last successful connection before a peer is potentially considered "old", in milliseconds. */
private static final long OLD_PEER_CONNECTION_PERIOD = 7 * 24 * 60 * 60 * 1000; // ms
/** Maximum time allowed for handshake to complete, in milliseconds. */
private static final long HANDSHAKE_TIMEOUT = 60 * 1000; // ms
/** Maximum message size (bytes). Needs to be at least maximum block size + MAGIC + message type, etc. */
/* package */ static final int MAXIMUM_MESSAGE_SIZE = 4 + 1 + 4 + Block.MAX_BLOCK_BYTES;
private static final byte[] MAINNET_MESSAGE_MAGIC = new byte[] { 0x12, 0x34, 0x56, 0x78 };
private static final byte[] TESTNET_MESSAGE_MAGIC = new byte[] { 0x78, 0x56, 0x34, 0x12 };
@ -87,11 +100,18 @@ public class Network extends Thread {
private volatile boolean isStopping = false;
private List<Peer> connectedPeers;
private List<PeerAddress> selfPeers;
private ServerSocket listenSocket;
private ExecutorService networkingExecutor;
private static Selector channelSelector;
private static ServerSocketChannel serverChannel;
private static AtomicBoolean isIterationInProgress = new AtomicBoolean(false);
private static Iterator<SelectionKey> channelIterator = null;
private static volatile boolean hasThreadPending = false;
private static AtomicInteger activeThreads = new AtomicInteger(0);
private static AtomicBoolean generalTaskLock = new AtomicBoolean(false);
private int minOutboundPeers;
private int maxPeers;
private ExecutorService peerExecutor;
private ExecutorService mergePeersExecutor;
private ExecutorService broadcastExecutor;
/** Timestamp (ms) for next general info broadcast to all connected peers. Based on <tt>System.currentTimeMillis()</tt>. */
private long nextBroadcast;
@ -108,11 +128,14 @@ public class Network extends Thread {
InetAddress bindAddr = InetAddress.getByName(Settings.getInstance().getBindAddress());
InetSocketAddress endpoint = new InetSocketAddress(bindAddr, listenPort);
channelSelector = Selector.open();
// Set up listen socket
listenSocket = new ServerSocket();
listenSocket.setReuseAddress(true);
listenSocket.setSoTimeout(1); // accept() calls block for at most 1ms
listenSocket.bind(endpoint, LISTEN_BACKLOG);
serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false);
serverChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
serverChannel.bind(endpoint, LISTEN_BACKLOG);
serverChannel.register(channelSelector, SelectionKey.OP_ACCEPT);
} catch (UnknownHostException e) {
LOGGER.error("Can't bind listen socket to address " + Settings.getInstance().getBindAddress());
throw new RuntimeException("Can't bind listen socket to address");
@ -130,13 +153,14 @@ public class Network extends Thread {
minOutboundPeers = Settings.getInstance().getMinOutboundPeers();
maxPeers = Settings.getInstance().getMaxPeers();
peerExecutor = Executors.newCachedThreadPool();
broadcastExecutor = Executors.newCachedThreadPool();
nextBroadcast = System.currentTimeMillis();
mergePeersLock = new ReentrantLock();
mergePeersExecutor = Executors.newCachedThreadPool();
// Start up first networking thread
networkingExecutor = Executors.newCachedThreadPool();
networkingExecutor.execute(new NetworkProcessor());
}
// Getters / setters
@ -245,89 +269,200 @@ public class Network extends Thread {
// Main thread
@Override
public void run() {
Thread.currentThread().setName("Network");
class NetworkProcessor implements Runnable {
@Override
public void run() {
Thread.currentThread().setName("Network");
// Maintain long-term connections to various peers' API applications
try {
while (!isStopping) {
acceptConnections();
activeThreads.incrementAndGet();
LOGGER.trace(() -> String.format("Network thread %s, hasThreadPending: %s, activeThreads now: %d", Thread.currentThread().getId(), (hasThreadPending ? "yes" : "no"), activeThreads.get()));
hasThreadPending = false;
pruneOldPeers();
// Maintain long-term connections to various peers' API applications
try {
while (!isStopping) {
if (!isIterationInProgress.compareAndSet(false, true)) {
LOGGER.trace(() -> String.format("Network thread %s NOT producing (some other thread is) - exiting", Thread.currentThread().getId()));
break;
}
createConnection();
LOGGER.trace(() -> String.format("Network thread %s is producing...", Thread.currentThread().getId()));
if (System.currentTimeMillis() >= this.nextBroadcast) {
this.nextBroadcast = System.currentTimeMillis() + BROADCAST_INTERVAL;
final SelectionKey nextSelectionKey;
try {
// anything to do?
if (channelIterator == null) {
channelSelector.select(1000L);
// Controller can decide what to broadcast
Controller.getInstance().doNetworkBroadcast();
if (Thread.currentThread().isInterrupted())
break;
channelIterator = channelSelector.selectedKeys().iterator();
}
if (channelIterator.hasNext()) {
nextSelectionKey = channelIterator.next();
channelIterator.remove();
} else {
nextSelectionKey = null;
channelIterator = null; // Nothing to do so reset iterator to cause new select
}
LOGGER.trace(() -> String.format("Network thread %s produced %s, iterator now %s",
Thread.currentThread().getId(),
(nextSelectionKey == null ? "null" : nextSelectionKey.channel()),
(channelIterator == null ? "null" : channelIterator.toString())));
// Spawn another thread in case we need help
if (!hasThreadPending) {
hasThreadPending = true;
LOGGER.trace(() -> String.format("Network thread %s spawning", Thread.currentThread().getId()));
networkingExecutor.execute(this);
}
} finally {
LOGGER.trace(() -> String.format("Network thread %s done producing", Thread.currentThread().getId()));
isIterationInProgress.set(false);
}
// process
if (nextSelectionKey == null) {
// no pending tasks, but we're last remaining thread so maybe connect a new peer or do a broadcast
LOGGER.trace(() -> String.format("Network thread %s has no pending tasks", Thread.currentThread().getId()));
if (!generalTaskLock.compareAndSet(false, true))
continue;
try {
LOGGER.trace(() -> String.format("Network thread %s performing general tasks", Thread.currentThread().getId()));
pingPeers();
prunePeers();
createConnection();
if (System.currentTimeMillis() >= nextBroadcast) {
nextBroadcast = System.currentTimeMillis() + BROADCAST_INTERVAL;
// Controller can decide what to broadcast
Controller.getInstance().doNetworkBroadcast();
}
} finally {
LOGGER.trace(() -> String.format("Network thread %s finished general tasks", Thread.currentThread().getId()));
generalTaskLock.set(false);
}
} else {
try {
LOGGER.trace(() -> String.format("Network thread %s has pending channel: %s, with ops %d",
Thread.currentThread().getId(), nextSelectionKey.channel(), nextSelectionKey.readyOps()));
// process pending channel task
if (nextSelectionKey.isReadable()) {
connectionRead((SocketChannel) nextSelectionKey.channel());
} else if (nextSelectionKey.isAcceptable()) {
acceptConnection((ServerSocketChannel) nextSelectionKey.channel());
}
LOGGER.trace(() -> String.format("Network thread %s processed channel: %s", Thread.currentThread().getId(), nextSelectionKey.channel()));
} catch (CancelledKeyException e) {
LOGGER.trace(() -> String.format("Network thread %s encountered cancelled channel: %s", Thread.currentThread().getId(), nextSelectionKey.channel()));
}
}
}
// Sleep for a while
Thread.sleep(1000);
} catch (InterruptedException e) {
// Fall-through to shutdown
} catch (DataException e) {
LOGGER.warn("Repository issue while running network", e);
// Fall-through to shutdown
} catch (IOException e) {
// Fall-through to shutdown
} finally {
activeThreads.decrementAndGet();
LOGGER.trace(() -> String.format("Network thread %s ending, activeThreads now: %d", Thread.currentThread().getId(), activeThreads.get()));
Thread.currentThread().setName("Network (dormant)");
}
} catch (InterruptedException e) {
// Fall-through to shutdown
} catch (DataException e) {
LOGGER.warn("Repository issue while running network", e);
// Fall-through to shutdown
}
}
@SuppressWarnings("resource")
private void acceptConnections() throws InterruptedException {
Socket socket;
private void acceptConnection(ServerSocketChannel serverSocketChannel) throws InterruptedException {
SocketChannel socketChannel;
do {
try {
socket = this.listenSocket.accept();
} catch (SocketTimeoutException e) {
// No connections to accept
return;
} catch (IOException e) {
// Something went wrong or listen socket was closed due to shutdown
return;
}
try {
socketChannel = serverSocketChannel.accept();
} catch (IOException e) {
return;
}
Peer newPeer = null;
// No connection actually accepted?
if (socketChannel == null)
return;
Peer newPeer;
try {
synchronized (this.connectedPeers) {
if (connectedPeers.size() >= maxPeers) {
// We have enough peers
LOGGER.trace(String.format("Connection discarded from peer %s", socket.getRemoteSocketAddress()));
try {
socket.close();
} catch (IOException e) {
// Not important
}
LOGGER.trace(String.format("Connection discarded from peer %s", socketChannel.getRemoteAddress()));
return;
}
LOGGER.debug(String.format("Connection accepted from peer %s", socket.getRemoteSocketAddress()));
newPeer = new Peer(socket);
LOGGER.debug(String.format("Connection accepted from peer %s", socketChannel.getRemoteAddress()));
newPeer = new Peer(socketChannel);
this.connectedPeers.add(newPeer);
}
} catch (IOException e) {
if (socketChannel.isOpen())
try {
socketChannel.close();
} catch (IOException ce) {
}
try {
peerExecutor.execute(newPeer);
} catch (RejectedExecutionException e) {
// Can't execute - probably because we're shutting down, so ignore
return;
}
try {
socketChannel.setOption(StandardSocketOptions.TCP_NODELAY, true);
socketChannel.configureBlocking(false);
socketChannel.register(channelSelector, SelectionKey.OP_READ);
} catch (IOException e) {
// Remove from connected peers
synchronized (this.connectedPeers) {
this.connectedPeers.remove(newPeer);
}
} while (true);
return;
}
this.onPeerReady(newPeer);
}
private void pruneOldPeers() throws InterruptedException, DataException {
private void pingPeers() {
for (Peer peer : this.getConnectedPeers())
peer.pingCheck();
}
private void prunePeers() throws InterruptedException, DataException {
final long now = System.currentTimeMillis();
// Disconnect peers that are stuck during handshake
List<Peer> handshakePeers = this.getConnectedPeers();
// Disregard peers that have completed handshake or only connected recently
handshakePeers.removeIf(peer -> peer.getHandshakeStatus() == Handshake.COMPLETED || peer.getConnectionTimestamp() == null || peer.getConnectionTimestamp() > now - HANDSHAKE_TIMEOUT);
for (Peer peer : handshakePeers)
peer.disconnect("handshake timeout");
// Prune 'old' peers from repository...
try (final Repository repository = RepositoryManager.getRepository()) {
// Fetch all known peers
List<PeerData> peers = repository.getNetworkRepository().getAllPeers();
// "Old" peers:
// 'Old' peers:
// we have attempted to connect within the last day
// we last managed to connect over a week ago
final long now = System.currentTimeMillis();
Predicate<PeerData> isNotOldPeer = peerData -> {
if (peerData.getLastAttempted() == null || peerData.getLastAttempted() < now - OLD_PEER_ATTEMPTED_PERIOD)
return true;
@ -338,6 +473,7 @@ public class Network extends Thread {
return false;
};
// Disregard peers that are NOT 'old'
peers.removeIf(isNotOldPeer);
// Don't consider already connected peers (simple address match)
@ -426,7 +562,8 @@ public class Network extends Thread {
repository.saveChanges();
}
if (!newPeer.connect())
SocketChannel socketChannel = newPeer.connect();
if (socketChannel == null)
return;
if (this.isInterrupted())
@ -437,10 +574,39 @@ public class Network extends Thread {
}
try {
peerExecutor.execute(newPeer);
} catch (RejectedExecutionException e) {
// Can't execute - probably because we're shutting down, so ignore
socketChannel.register(channelSelector, SelectionKey.OP_READ);
} catch (ClosedChannelException e) {
// If channel has somehow already closed then remove from connectedPeers
synchronized (this.connectedPeers) {
this.connectedPeers.remove(newPeer);
}
}
this.onPeerReady(newPeer);
}
private void connectionRead(SocketChannel socketChannel) {
Peer peer = getPeerFromChannel(socketChannel);
if (peer == null)
return;
try {
peer.readMessages();
} catch (IOException e) {
LOGGER.trace(() -> String.format("Network thread %s encountered I/O error: %s", Thread.currentThread().getId(), e.getMessage()), e);
peer.disconnect("I/O error");
return;
}
}
private Peer getPeerFromChannel(SocketChannel socketChannel) {
synchronized (this.connectedPeers) {
for (Peer peer : this.connectedPeers)
if (peer.getSocketChannel() == socketChannel)
return peer;
}
return null;
}
// Peer callbacks
@ -817,63 +983,38 @@ public class Network extends Thread {
// Network-wide calls
private void mergePeers(String addedBy, List<PeerAddress> peerAddresses) {
// This can block (due to lock) so fire off in separate thread
class PeersMerger implements Runnable {
private String addedBy;
private List<PeerAddress> peerAddresses;
// Serialize using lock to prevent repository deadlocks
if (!mergePeersLock.tryLock())
return;
public PeersMerger(String addedBy, List<PeerAddress> peerAddresses) {
this.addedBy = addedBy;
this.peerAddresses = peerAddresses;
}
@Override
public void run() {
Thread.currentThread().setName(String.format("Merging peers from %s", this.addedBy));
// Serialize using lock to prevent repository deadlocks
try {
mergePeersLock.lockInterruptibly();
final long addedWhen = System.currentTimeMillis();
try {
try (final Repository repository = RepositoryManager.getRepository()) {
List<PeerData> knownPeers = repository.getNetworkRepository().getAllPeers();
// Filter out duplicates
Predicate<PeerAddress> isKnownAddress = peerAddress -> {
return knownPeers.stream().anyMatch(knownPeerData -> knownPeerData.getAddress().equals(peerAddress));
};
peerAddresses.removeIf(isKnownAddress);
// Save the rest into database
for (PeerAddress peerAddress : peerAddresses) {
PeerData peerData = new PeerData(peerAddress, addedWhen, addedBy);
LOGGER.info(String.format("Adding new peer %s to repository", peerAddress));
repository.getNetworkRepository().save(peerData);
}
repository.saveChanges();
} catch (DataException e) {
LOGGER.error("Repository issue while merging peers list from remote node", e);
}
} finally {
mergePeersLock.unlock();
}
} catch (InterruptedException e1) {
// We're exiting anyway...
}
Thread.currentThread().setName("Merging peers (dormant)");
}
}
final long addedWhen = System.currentTimeMillis();
try {
mergePeersExecutor.execute(new PeersMerger(addedBy, peerAddresses));
} catch (RejectedExecutionException e) {
// Can't execute - probably because we're shutting down, so ignore
try (final Repository repository = RepositoryManager.getRepository()) {
List<PeerData> knownPeers = repository.getNetworkRepository().getAllPeers();
// Filter out duplicates
Predicate<PeerAddress> isKnownAddress = peerAddress -> {
return knownPeers.stream().anyMatch(knownPeerData -> knownPeerData.getAddress().equals(peerAddress));
};
peerAddresses.removeIf(isKnownAddress);
repository.discardChanges();
// Save the rest into database
for (PeerAddress peerAddress : peerAddresses) {
PeerData peerData = new PeerData(peerAddress, addedWhen, addedBy);
LOGGER.info(String.format("Adding new peer %s to repository", peerAddress));
repository.getNetworkRepository().save(peerData);
}
repository.saveChanges();
} catch (DataException e) {
LOGGER.error("Repository issue while merging peers list from remote node", e);
}
} finally {
mergePeersLock.unlock();
}
}
@ -894,11 +1035,11 @@ public class Network extends Thread {
Random random = new Random();
for (Peer peer : targetPeers) {
// Very short sleep to reduce strain, improve multithreading and catch interrupts
// Very short sleep to reduce strain, improve multi-threading and catch interrupts
try {
Thread.sleep(random.nextInt(20) + 20);
} catch (InterruptedException e) {
return;
break;
}
Message message = peerMessageBuilder.apply(peer);
@ -909,6 +1050,8 @@ public class Network extends Thread {
if (!peer.sendMessage(message))
peer.disconnect("failed to broadcast message");
}
Thread.currentThread().setName("Network Broadcast (dormant)");
}
}
@ -925,28 +1068,20 @@ public class Network extends Thread {
this.isStopping = true;
// Close listen socket to prevent more incoming connections
if (!this.listenSocket.isClosed())
if (serverChannel.isOpen())
try {
this.listenSocket.close();
serverChannel.close();
} catch (IOException e) {
// Not important
}
// Stop our run() thread
this.interrupt();
// Stop processing threads
this.networkingExecutor.shutdownNow();
try {
this.join();
if (!this.networkingExecutor.awaitTermination(5000, TimeUnit.MILLISECONDS))
LOGGER.debug("Network threads failed to terminate");
} catch (InterruptedException e) {
LOGGER.debug("Interrupted while waiting for networking thread to terminate");
}
// Give up merging peer lists
this.mergePeersExecutor.shutdownNow();
try {
if (!this.mergePeersExecutor.awaitTermination(1000, TimeUnit.MILLISECONDS))
LOGGER.debug("Peer-list merging threads failed to terminate");
} catch (InterruptedException e) {
LOGGER.debug("Interrupted while waiting for peer-list merging threads failed to terminate");
LOGGER.debug("Interrupted while waiting for networking threads to terminate");
}
// Stop broadcasts

View File

@ -1,14 +1,13 @@
package org.qora.network;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.net.StandardSocketOptions;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.security.SecureRandom;
import java.util.Collections;
import java.util.HashMap;
@ -16,9 +15,6 @@ import java.util.Map;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
@ -37,7 +33,7 @@ import com.google.common.net.HostAndPort;
import com.google.common.net.InetAddresses;
// For managing one peer
public class Peer extends Thread {
public class Peer {
private static final Logger LOGGER = LogManager.getLogger(Peer.class);
@ -50,31 +46,19 @@ public class Peer extends Thread {
/**
* Interval between PING messages to a peer. (ms)
* <p>
* Just under every 30s is usually ideal to keep NAT mappings refreshed,<br>
* BUT must be lower than {@link Peer#SOCKET_TIMEOUT}!
* Just under every 30s is usually ideal to keep NAT mappings refreshed.
*/
private static final int PING_INTERVAL = 8000; // ms
/** Maximum time a socket <tt>read()</tt> will block before closing connection due to timeout. (ms) */
private static final int SOCKET_TIMEOUT = 10000; // ms
private static final int UNSOLICITED_MESSAGE_QUEUE_CAPACITY = 10;
private volatile boolean isStopping = false;
private Socket socket = null;
private SocketChannel socketChannel = null;
private InetSocketAddress resolvedAddress = null;
/** True if remote address is loopback/link-local/site-local, false otherwise. */
private boolean isLocal;
private OutputStream out;
private ByteBuffer byteBuffer;
private Map<Integer, BlockingQueue<Message>> replyQueues;
private BlockingQueue<Message> unsolicitedQueue;
private ExecutorService messageExecutor;
private ScheduledExecutorService pingExecutor;
/** True if we created connection to peer, false if we accepted incoming connection from peer. */
private final boolean isOutbound;
/** Numeric protocol version, typically 1 or 2. */
@ -88,20 +72,29 @@ public class Peer extends Thread {
private byte[] verificationCodeExpected;
private PeerData peerData = null;
private final ReentrantLock peerLock = new ReentrantLock();
private final ReentrantLock peerDataLock = new ReentrantLock();
/** Timestamp of when socket was accepted, or connected. */
private Long connectionTimestamp = null;
/** Version info as reported by peer. */
private VersionMessage versionMessage = null;
/** Last PING message round-trip time (ms). */
private Long lastPing = null;
/** When last PING message was sent, or null if pings not started yet. */
private Long lastPingSent;
private final ReentrantLock pingLock = new ReentrantLock();
/** Latest block height as reported by peer. */
private Integer lastHeight;
/** Latest block signature as reported by peer. */
private byte[] lastBlockSignature;
/** Latest block timestamp as reported by peer. */
private Long lastBlockTimestamp;
/** Latest block generator public key as reported by peer. */
private byte[] lastBlockGenerator;
@ -114,19 +107,24 @@ public class Peer extends Thread {
}
/** Construct Peer using existing, connected socket */
public Peer(Socket socket) {
public Peer(SocketChannel socketChannel) throws IOException {
this.isOutbound = false;
this.socket = socket;
this.socketChannel = socketChannel;
sharedSetup();
this.resolvedAddress = ((InetSocketAddress) socket.getRemoteSocketAddress());
this.resolvedAddress = ((InetSocketAddress) socketChannel.socket().getRemoteSocketAddress());
this.isLocal = isAddressLocal(this.resolvedAddress.getAddress());
PeerAddress peerAddress = PeerAddress.fromSocket(socket);
PeerAddress peerAddress = PeerAddress.fromSocket(socketChannel.socket());
this.peerData = new PeerData(peerAddress);
}
// Getters / setters
public SocketChannel getSocketChannel() {
return this.socketChannel;
}
public boolean isStopping() {
return this.isStopping;
}
@ -247,14 +245,13 @@ public class Peer extends Thread {
}
/** Returns the lock used for synchronizing access to peer info. */
public ReentrantLock getPeerLock() {
return this.peerLock;
public ReentrantLock getPeerDataLock() {
return this.peerDataLock;
}
// Easier, and nicer output, than peer.getRemoteSocketAddress()
@Override
public String toString() {
// Easier, and nicer output, than peer.getRemoteSocketAddress()
return this.peerData.getAddress().toString();
}
@ -268,153 +265,123 @@ public class Peer extends Thread {
new SecureRandom().nextBytes(verificationCodeExpected);
}
class MessageProcessor implements Runnable {
private Peer peer;
private BlockingQueue<Message> blockingQueue;
public MessageProcessor(Peer peer, BlockingQueue<Message> blockingQueue) {
this.peer = peer;
this.blockingQueue = blockingQueue;
}
@Override
public void run() {
Thread.currentThread().setName("Peer UMP " + this.peer);
while (true) {
try {
Message message = blockingQueue.poll(1000L, TimeUnit.MILLISECONDS);
if (message != null)
Network.getInstance().onMessage(peer, message);
} catch (InterruptedException e) {
// Shutdown
return;
}
}
}
}
private void setup() throws IOException {
this.socket.setSoTimeout(SOCKET_TIMEOUT);
this.out = this.socket.getOutputStream();
private void sharedSetup() throws IOException {
this.connectionTimestamp = System.currentTimeMillis();
this.socketChannel.setOption(StandardSocketOptions.TCP_NODELAY, true);
this.socketChannel.configureBlocking(false);
this.byteBuffer = ByteBuffer.allocate(Network.MAXIMUM_MESSAGE_SIZE);
this.replyQueues = Collections.synchronizedMap(new HashMap<Integer, BlockingQueue<Message>>());
this.unsolicitedQueue = new ArrayBlockingQueue<>(UNSOLICITED_MESSAGE_QUEUE_CAPACITY);
this.messageExecutor = Executors.newSingleThreadExecutor();
this.messageExecutor.execute(new MessageProcessor(this, this.unsolicitedQueue));
}
public boolean connect() {
public SocketChannel connect() {
LOGGER.trace(String.format("Connecting to peer %s", this));
this.socket = new Socket();
try {
this.resolvedAddress = this.peerData.getAddress().toSocketAddress();
this.isLocal = isAddressLocal(this.resolvedAddress.getAddress());
this.socket.connect(resolvedAddress, CONNECT_TIMEOUT);
this.socketChannel = SocketChannel.open();
this.socketChannel.socket().connect(resolvedAddress, CONNECT_TIMEOUT);
LOGGER.debug(String.format("Connected to peer %s", this));
sharedSetup();
return socketChannel;
} catch (SocketTimeoutException e) {
LOGGER.trace(String.format("Connection timed out to peer %s", this));
return false;
return null;
} catch (UnknownHostException e) {
LOGGER.trace(String.format("Connection failed to unresolved peer %s", this));
return false;
return null;
} catch (IOException e) {
LOGGER.trace(String.format("Connection failed to peer %s", this));
return false;
}
return true;
}
// Main thread
@Override
public void run() {
Thread.currentThread().setName("Peer " + this);
try (DataInputStream in = new DataInputStream(socket.getInputStream())) {
setup();
Network.getInstance().onPeerReady(this);
while (!isStopping) {
// Wait (up to INACTIVITY_TIMEOUT) for, and parse, incoming message
Message message = Message.fromStream(in);
if (message == null) {
this.disconnect("null message");
return;
}
LOGGER.trace(() -> String.format("Received %s message with ID %d from peer %s", message.getType().name(), message.getId(), this));
// Find potential blocking queue for this id (expect null if id is -1)
BlockingQueue<Message> queue = this.replyQueues.get(message.getId());
if (queue != null) {
// Adding message to queue will unblock thread waiting for response
this.replyQueues.get(message.getId()).add(message);
} else {
// Nothing waiting for this message (unsolicited) - queue up for network
// Queue full?
if (unsolicitedQueue.remainingCapacity() == 0) {
LOGGER.debug(String.format("No room for %s message with ID %s from peer %s", message.getType().name(), message.getId(), this));
continue;
}
unsolicitedQueue.add(message);
}
}
} catch (MessageException e) {
LOGGER.debug(String.format("%s, from peer %s", e.getMessage(), this));
this.disconnect(e.getMessage());
} catch (SocketTimeoutException e) {
this.disconnect("timeout");
} catch (IOException e) {
if (isStopping) {
// If isStopping is true then our shutdown() has already been called, so no need to call it again
LOGGER.debug(String.format("Peer %s stopping...", this));
return;
}
// More informative logging
if (e instanceof EOFException) {
this.disconnect("EOF");
} else if (e.getMessage().contains("onnection reset")) { // Can't import/rely on sun.net.ConnectionResetException
this.disconnect("Connection reset");
} else {
this.disconnect("I/O error");
}
} finally {
Thread.currentThread().setName("disconnected peer");
return null;
}
}
/**
* Attempt to send Message to peer
* Attempt to read Message from peer.
*
* @return message, or null if no message or there was a problem
* @throws IOException
*/
public void readMessages() throws IOException {
while(true) {
Message message;
synchronized (this) {
if (!this.socketChannel.isOpen() || this.socketChannel.socket().isClosed())
break;
int bytesRead = this.socketChannel.read(this.byteBuffer);
if (bytesRead == -1) {
this.disconnect("EOF");
return;
}
LOGGER.trace(() -> String.format("Receiving message from peer %s", this));
// Can we build a message from buffer now?
try {
message = Message.fromByteBuffer(this.byteBuffer);
} catch (MessageException e) {
LOGGER.debug(String.format("%s, from peer %s", e.getMessage(), this));
this.disconnect(e.getMessage());
return;
}
}
if (message == null)
return;
LOGGER.trace(() -> String.format("Received %s message with ID %d from peer %s", message.getType().name(), message.getId(), this));
BlockingQueue<Message> queue = this.replyQueues.get(message.getId());
if (queue != null) {
// Adding message to queue will unblock thread waiting for response
this.replyQueues.get(message.getId()).add(message);
// Consumed elsewhere
continue;
}
// No thread waiting for message so pass up to network layer
Network.getInstance().onMessage(this, message);
}
}
/**
* Attempt to send Message to peer.
*
* @param message
* @return <code>true</code> if message successfully sent; <code>false</code> otherwise
*/
public boolean sendMessage(Message message) {
if (this.socket.isClosed())
if (!this.socketChannel.isOpen())
return false;
try {
// Send message
LOGGER.trace(() -> String.format("Sending %s message with ID %d to peer %s", message.getType().name(), message.getId(), this));
synchronized (this.out) {
this.out.write(message.toBytes());
this.out.flush();
ByteBuffer outputBuffer = ByteBuffer.wrap(message.toBytes());
synchronized (this.socketChannel) {
while (outputBuffer.hasRemaining()) {
int bytesWritten = this.socketChannel.write(outputBuffer);
if (bytesWritten == 0)
// Underlying socket's internal buffer probably full,
// so wait a short while for bytes to actually be transmitted over the wire
Thread.sleep(1L);
}
}
} catch (MessageException e) {
LOGGER.warn(String.format("Failed to send %s message with ID %d to peer %s: %s", message.getType().name(), message.getId(), this, e.getMessage()));
} catch (IOException e) {
// Send failure
return false;
} catch (InterruptedException e) {
// Likely shutdown scenario - so exit
return false;
}
// Sent OK
@ -461,38 +428,44 @@ public class Peer extends Thread {
}
public void startPings() {
class Pinger implements Runnable {
private Peer peer;
// Replacing initial null value allows pingCheck() to start sending pings.
LOGGER.trace(() -> String.format("Enabling pings for peer %s", this));
this.lastPingSent = 0L; //System.currentTimeMillis();
}
public Pinger(Peer peer) {
this.peer = peer;
/* package */ void pingCheck() {
LOGGER.trace(() -> String.format("Ping check for peer %s", this));
if (!this.pingLock.tryLock())
return; // Some other thread is already checking ping status for this peer
try {
// Pings not enabled yet?
if (this.lastPingSent == null)
return;
final long now = System.currentTimeMillis();
// Time to send another ping?
if (now < this.lastPingSent + PING_INTERVAL)
return; // Not yet
this.lastPingSent = now;
PingMessage pingMessage = new PingMessage();
Message message = this.getResponse(pingMessage);
final long after = System.currentTimeMillis();
if (message == null || message.getType() != MessageType.PING) {
this.disconnect("no ping received");
return;
}
@Override
public void run() {
Thread.currentThread().setName("Pinger " + this.peer);
PingMessage pingMessage = new PingMessage();
try {
final long before = System.currentTimeMillis();
Message message = peer.getResponse(pingMessage);
final long after = System.currentTimeMillis();
if (message == null || message.getType() != MessageType.PING)
peer.disconnect("no ping received");
peer.setLastPing(after - before);
} catch (InterruptedException e) {
// Shutdown
}
}
this.setLastPing(after - now);
} catch (InterruptedException e) {
// Shutdown situation
} finally {
this.pingLock.unlock();
}
Random random = new Random();
long initialDelay = random.nextInt(PING_INTERVAL);
this.pingExecutor = Executors.newSingleThreadScheduledExecutor();
this.pingExecutor.scheduleWithFixedDelay(new Pinger(this), initialDelay, PING_INTERVAL, TimeUnit.MILLISECONDS);
}
public void disconnect(String reason) {
@ -505,46 +478,14 @@ public class Peer extends Thread {
public void shutdown() {
LOGGER.debug(() -> String.format("Shutting down peer %s", this));
this.isStopping = true;
// Shut down pinger
if (this.pingExecutor != null) {
this.pingExecutor.shutdownNow();
if (this.socketChannel.isOpen()) {
try {
if (!this.pingExecutor.awaitTermination(1000, TimeUnit.MILLISECONDS))
LOGGER.debug(String.format("Pinger for peer %s failed to terminate", this));
} catch (InterruptedException e) {
LOGGER.debug(String.format("Interrupted while terminating pinger for peer %s", this));
}
}
// Shut down unsolicited message processor
if (this.messageExecutor != null) {
this.messageExecutor.shutdownNow();
try {
if (!this.messageExecutor.awaitTermination(5000, TimeUnit.MILLISECONDS))
LOGGER.debug(String.format("Message processor for peer %s failed to terminate", this));
} catch (InterruptedException e) {
LOGGER.debug(String.format("Interrupted while terminating message processor for peer %s", this));
}
}
LOGGER.debug(() -> String.format("Interrupting peer %s", this));
this.interrupt();
// Close socket, which should trigger run() to exit
if (!this.socket.isClosed()) {
try {
this.socket.close();
this.socketChannel.close();
} catch (IOException e) {
LOGGER.debug(String.format("IOException while trying to close peer %s", this));
}
}
try {
this.join();
} catch (InterruptedException e) {
LOGGER.debug(String.format("Interrupted while waiting for peer %s to shutdown", this));
}
}
// Utility methods

View File

@ -11,11 +11,9 @@ import static java.util.Arrays.stream;
import static java.util.stream.Collectors.toMap;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.SocketTimeoutException;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.util.Arrays;
@ -104,12 +102,12 @@ public abstract class Message {
return map.get(value);
}
public Message fromBytes(int id, byte[] data) throws MessageException {
public Message fromByteBuffer(int id, ByteBuffer byteBuffer) throws MessageException {
if (this.fromByteBuffer == null)
throw new MessageException("Unsupported message type [" + value + "] during conversion from bytes");
try {
return (Message) this.fromByteBuffer.invoke(null, id, data == null ? null : ByteBuffer.wrap(data));
return (Message) this.fromByteBuffer.invoke(null, id, byteBuffer);
} catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
if (e.getCause() instanceof BufferUnderflowException)
throw new MessageException("Byte data too short for " + name() + " message");
@ -147,47 +145,65 @@ public abstract class Message {
return this.type;
}
public static Message fromStream(DataInputStream in) throws MessageException, IOException {
/**
* Attempt to read a message from byte buffer.
*
* @param byteBuffer
* @return null if no complete message can be read
* @throws MessageException
*/
public static Message fromByteBuffer(ByteBuffer byteBuffer) throws MessageException {
try {
byteBuffer.flip();
ByteBuffer readBuffer = byteBuffer.asReadOnlyBuffer();
// Read only enough bytes to cover Message "magic" preamble
byte[] messageMagic = new byte[MAGIC_LENGTH];
in.readFully(messageMagic);
readBuffer.get(messageMagic);
if (!Arrays.equals(messageMagic, Network.getInstance().getMessageMagic()))
// Didn't receive correct Message "magic"
throw new MessageException("Received incorrect message 'magic'");
int typeValue = in.readInt();
// Find supporting object
int typeValue = readBuffer.getInt();
MessageType messageType = MessageType.valueOf(typeValue);
if (messageType == null)
// Unrecognised message type
throw new MessageException(String.format("Received unknown message type [%d]", typeValue));
// Find supporting object
int hasId = in.read();
// Optional message ID
byte hasId = readBuffer.get();
int id = -1;
if (hasId != 0) {
id = in.readInt();
id = readBuffer.getInt();
if (id <= 0)
// Invalid ID
throw new MessageException("Invalid negative ID");
}
int dataSize = in.readInt();
int dataSize = readBuffer.getInt();
if (dataSize > MAX_DATA_SIZE)
// Too large
throw new MessageException(String.format("Declared data length %d larger than max allowed %d", dataSize, MAX_DATA_SIZE));
byte[] data = null;
ByteBuffer dataSlice = null;
if (dataSize > 0) {
byte[] expectedChecksum = new byte[CHECKSUM_LENGTH];
in.readFully(expectedChecksum);
readBuffer.get(expectedChecksum);
data = new byte[dataSize];
in.readFully(data);
// Remember this position in readBuffer so we can pass to Message subclass
dataSlice = readBuffer.slice();
// Consume data from buffer
byte[] data = new byte[dataSize];
readBuffer.get(data);
// We successfully read all the data bytes, so we can set limit on dataSlice
dataSlice.limit(dataSize);
// Test checksum
byte[] actualChecksum = generateChecksum(data);
@ -195,11 +211,17 @@ public abstract class Message {
throw new MessageException("Message checksum incorrect");
}
return messageType.fromBytes(id, data);
} catch (SocketTimeoutException e) {
throw e;
} catch (IOException e) {
throw e;
Message message = messageType.fromByteBuffer(id, dataSlice);
// We successfully read a message, so bump byteBuffer's position to reflect this
byteBuffer.position(readBuffer.position());
return message;
} catch (BufferUnderflowException e) {
// Not enough bytes to fully decode message...
return null;
} finally {
byteBuffer.compact();
}
}
@ -209,7 +231,7 @@ public abstract class Message {
public byte[] toBytes() throws MessageException {
try {
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
ByteArrayOutputStream bytes = new ByteArrayOutputStream(256);
// Magic
bytes.write(Network.getInstance().getMessageMagic());