diff --git a/src/main/java/org/qora/controller/Controller.java b/src/main/java/org/qora/controller/Controller.java index fd8d0dcf..7bfbc2e3 100644 --- a/src/main/java/org/qora/controller/Controller.java +++ b/src/main/java/org/qora/controller/Controller.java @@ -625,7 +625,7 @@ public class Controller extends Thread { continue; // We want to update atomically so use lock - ReentrantLock peerLock = connectedPeer.getPeerLock(); + ReentrantLock peerLock = connectedPeer.getPeerDataLock(); peerLock.lock(); try { connectedPeer.setLastHeight(heightV2Message.getHeight()); diff --git a/src/main/java/org/qora/controller/Synchronizer.java b/src/main/java/org/qora/controller/Synchronizer.java index d124a084..5a0841d0 100644 --- a/src/main/java/org/qora/controller/Synchronizer.java +++ b/src/main/java/org/qora/controller/Synchronizer.java @@ -86,7 +86,7 @@ public class Synchronizer { int peerHeight; byte[] peersLastBlockSignature; - ReentrantLock peerLock = peer.getPeerLock(); + ReentrantLock peerLock = peer.getPeerDataLock(); peerLock.lockInterruptibly(); try { peerHeight = peer.getLastHeight(); diff --git a/src/main/java/org/qora/network/Handshake.java b/src/main/java/org/qora/network/Handshake.java index f5c8f921..17dbc6c6 100644 --- a/src/main/java/org/qora/network/Handshake.java +++ b/src/main/java/org/qora/network/Handshake.java @@ -173,7 +173,8 @@ public enum Handshake { private static final Logger LOGGER = LogManager.getLogger(Handshake.class); - private static final long MAX_TIMESTAMP_DELTA = 2000; // ms + /** Maximum allowed difference between peer's reported timestamp and when they connected, in milliseconds. */ + private static final long MAX_TIMESTAMP_DELTA = 5000; // ms public final MessageType expectedMessageType; diff --git a/src/main/java/org/qora/network/Network.java b/src/main/java/org/qora/network/Network.java index 78165172..e22490d3 100644 --- a/src/main/java/org/qora/network/Network.java +++ b/src/main/java/org/qora/network/Network.java @@ -4,20 +4,27 @@ import java.io.IOException; import java.net.Inet6Address; import java.net.InetAddress; import java.net.InetSocketAddress; -import java.net.ServerSocket; -import java.net.Socket; -import java.net.SocketTimeoutException; +import java.net.StandardSocketOptions; import java.net.UnknownHostException; +import java.nio.channels.CancelledKeyException; +import java.nio.channels.ClosedChannelException; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; import java.security.SecureRandom; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.Iterator; import java.util.List; import java.util.Random; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Function; @@ -26,6 +33,7 @@ import java.util.stream.Collectors; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.qora.block.Block; import org.qora.controller.Controller; import org.qora.data.block.BlockData; import org.qora.data.network.PeerData; @@ -65,6 +73,11 @@ public class Network extends Thread { private static final long OLD_PEER_ATTEMPTED_PERIOD = 24 * 60 * 60 * 1000; // ms /** Maximum time since last successful connection before a peer is potentially considered "old", in milliseconds. */ private static final long OLD_PEER_CONNECTION_PERIOD = 7 * 24 * 60 * 60 * 1000; // ms + /** Maximum time allowed for handshake to complete, in milliseconds. */ + private static final long HANDSHAKE_TIMEOUT = 60 * 1000; // ms + + /** Maximum message size (bytes). Needs to be at least maximum block size + MAGIC + message type, etc. */ + /* package */ static final int MAXIMUM_MESSAGE_SIZE = 4 + 1 + 4 + Block.MAX_BLOCK_BYTES; private static final byte[] MAINNET_MESSAGE_MAGIC = new byte[] { 0x12, 0x34, 0x56, 0x78 }; private static final byte[] TESTNET_MESSAGE_MAGIC = new byte[] { 0x78, 0x56, 0x34, 0x12 }; @@ -87,11 +100,18 @@ public class Network extends Thread { private volatile boolean isStopping = false; private List connectedPeers; private List selfPeers; - private ServerSocket listenSocket; + + private ExecutorService networkingExecutor; + private static Selector channelSelector; + private static ServerSocketChannel serverChannel; + private static AtomicBoolean isIterationInProgress = new AtomicBoolean(false); + private static Iterator channelIterator = null; + private static volatile boolean hasThreadPending = false; + private static AtomicInteger activeThreads = new AtomicInteger(0); + private static AtomicBoolean generalTaskLock = new AtomicBoolean(false); + private int minOutboundPeers; private int maxPeers; - private ExecutorService peerExecutor; - private ExecutorService mergePeersExecutor; private ExecutorService broadcastExecutor; /** Timestamp (ms) for next general info broadcast to all connected peers. Based on System.currentTimeMillis(). */ private long nextBroadcast; @@ -108,11 +128,14 @@ public class Network extends Thread { InetAddress bindAddr = InetAddress.getByName(Settings.getInstance().getBindAddress()); InetSocketAddress endpoint = new InetSocketAddress(bindAddr, listenPort); + channelSelector = Selector.open(); + // Set up listen socket - listenSocket = new ServerSocket(); - listenSocket.setReuseAddress(true); - listenSocket.setSoTimeout(1); // accept() calls block for at most 1ms - listenSocket.bind(endpoint, LISTEN_BACKLOG); + serverChannel = ServerSocketChannel.open(); + serverChannel.configureBlocking(false); + serverChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true); + serverChannel.bind(endpoint, LISTEN_BACKLOG); + serverChannel.register(channelSelector, SelectionKey.OP_ACCEPT); } catch (UnknownHostException e) { LOGGER.error("Can't bind listen socket to address " + Settings.getInstance().getBindAddress()); throw new RuntimeException("Can't bind listen socket to address"); @@ -130,13 +153,14 @@ public class Network extends Thread { minOutboundPeers = Settings.getInstance().getMinOutboundPeers(); maxPeers = Settings.getInstance().getMaxPeers(); - peerExecutor = Executors.newCachedThreadPool(); - broadcastExecutor = Executors.newCachedThreadPool(); nextBroadcast = System.currentTimeMillis(); mergePeersLock = new ReentrantLock(); - mergePeersExecutor = Executors.newCachedThreadPool(); + + // Start up first networking thread + networkingExecutor = Executors.newCachedThreadPool(); + networkingExecutor.execute(new NetworkProcessor()); } // Getters / setters @@ -245,89 +269,200 @@ public class Network extends Thread { // Main thread - @Override - public void run() { - Thread.currentThread().setName("Network"); + class NetworkProcessor implements Runnable { + @Override + public void run() { + Thread.currentThread().setName("Network"); - // Maintain long-term connections to various peers' API applications - try { - while (!isStopping) { - acceptConnections(); + activeThreads.incrementAndGet(); + LOGGER.trace(() -> String.format("Network thread %s, hasThreadPending: %s, activeThreads now: %d", Thread.currentThread().getId(), (hasThreadPending ? "yes" : "no"), activeThreads.get())); + hasThreadPending = false; - pruneOldPeers(); + // Maintain long-term connections to various peers' API applications + try { + while (!isStopping) { + if (!isIterationInProgress.compareAndSet(false, true)) { + LOGGER.trace(() -> String.format("Network thread %s NOT producing (some other thread is) - exiting", Thread.currentThread().getId())); + break; + } - createConnection(); + LOGGER.trace(() -> String.format("Network thread %s is producing...", Thread.currentThread().getId())); - if (System.currentTimeMillis() >= this.nextBroadcast) { - this.nextBroadcast = System.currentTimeMillis() + BROADCAST_INTERVAL; + final SelectionKey nextSelectionKey; + try { + // anything to do? + if (channelIterator == null) { + channelSelector.select(1000L); - // Controller can decide what to broadcast - Controller.getInstance().doNetworkBroadcast(); + if (Thread.currentThread().isInterrupted()) + break; + + channelIterator = channelSelector.selectedKeys().iterator(); + } + + if (channelIterator.hasNext()) { + nextSelectionKey = channelIterator.next(); + channelIterator.remove(); + } else { + nextSelectionKey = null; + channelIterator = null; // Nothing to do so reset iterator to cause new select + } + + LOGGER.trace(() -> String.format("Network thread %s produced %s, iterator now %s", + Thread.currentThread().getId(), + (nextSelectionKey == null ? "null" : nextSelectionKey.channel()), + (channelIterator == null ? "null" : channelIterator.toString()))); + + // Spawn another thread in case we need help + if (!hasThreadPending) { + hasThreadPending = true; + LOGGER.trace(() -> String.format("Network thread %s spawning", Thread.currentThread().getId())); + networkingExecutor.execute(this); + } + } finally { + LOGGER.trace(() -> String.format("Network thread %s done producing", Thread.currentThread().getId())); + isIterationInProgress.set(false); + } + + // process + if (nextSelectionKey == null) { + // no pending tasks, but we're last remaining thread so maybe connect a new peer or do a broadcast + LOGGER.trace(() -> String.format("Network thread %s has no pending tasks", Thread.currentThread().getId())); + + if (!generalTaskLock.compareAndSet(false, true)) + continue; + + try { + LOGGER.trace(() -> String.format("Network thread %s performing general tasks", Thread.currentThread().getId())); + + pingPeers(); + + prunePeers(); + + createConnection(); + + if (System.currentTimeMillis() >= nextBroadcast) { + nextBroadcast = System.currentTimeMillis() + BROADCAST_INTERVAL; + + // Controller can decide what to broadcast + Controller.getInstance().doNetworkBroadcast(); + } + } finally { + LOGGER.trace(() -> String.format("Network thread %s finished general tasks", Thread.currentThread().getId())); + generalTaskLock.set(false); + } + } else { + try { + LOGGER.trace(() -> String.format("Network thread %s has pending channel: %s, with ops %d", + Thread.currentThread().getId(), nextSelectionKey.channel(), nextSelectionKey.readyOps())); + + // process pending channel task + if (nextSelectionKey.isReadable()) { + connectionRead((SocketChannel) nextSelectionKey.channel()); + } else if (nextSelectionKey.isAcceptable()) { + acceptConnection((ServerSocketChannel) nextSelectionKey.channel()); + } + + LOGGER.trace(() -> String.format("Network thread %s processed channel: %s", Thread.currentThread().getId(), nextSelectionKey.channel())); + } catch (CancelledKeyException e) { + LOGGER.trace(() -> String.format("Network thread %s encountered cancelled channel: %s", Thread.currentThread().getId(), nextSelectionKey.channel())); + } + } } - - // Sleep for a while - Thread.sleep(1000); + } catch (InterruptedException e) { + // Fall-through to shutdown + } catch (DataException e) { + LOGGER.warn("Repository issue while running network", e); + // Fall-through to shutdown + } catch (IOException e) { + // Fall-through to shutdown + } finally { + activeThreads.decrementAndGet(); + LOGGER.trace(() -> String.format("Network thread %s ending, activeThreads now: %d", Thread.currentThread().getId(), activeThreads.get())); + Thread.currentThread().setName("Network (dormant)"); } - } catch (InterruptedException e) { - // Fall-through to shutdown - } catch (DataException e) { - LOGGER.warn("Repository issue while running network", e); - // Fall-through to shutdown } } - @SuppressWarnings("resource") - private void acceptConnections() throws InterruptedException { - Socket socket; + private void acceptConnection(ServerSocketChannel serverSocketChannel) throws InterruptedException { + SocketChannel socketChannel; - do { - try { - socket = this.listenSocket.accept(); - } catch (SocketTimeoutException e) { - // No connections to accept - return; - } catch (IOException e) { - // Something went wrong or listen socket was closed due to shutdown - return; - } + try { + socketChannel = serverSocketChannel.accept(); + } catch (IOException e) { + return; + } - Peer newPeer = null; + // No connection actually accepted? + if (socketChannel == null) + return; + + Peer newPeer; + + try { synchronized (this.connectedPeers) { if (connectedPeers.size() >= maxPeers) { // We have enough peers - LOGGER.trace(String.format("Connection discarded from peer %s", socket.getRemoteSocketAddress())); - - try { - socket.close(); - } catch (IOException e) { - // Not important - } - + LOGGER.trace(String.format("Connection discarded from peer %s", socketChannel.getRemoteAddress())); return; } - LOGGER.debug(String.format("Connection accepted from peer %s", socket.getRemoteSocketAddress())); - newPeer = new Peer(socket); + LOGGER.debug(String.format("Connection accepted from peer %s", socketChannel.getRemoteAddress())); + + newPeer = new Peer(socketChannel); this.connectedPeers.add(newPeer); } + } catch (IOException e) { + if (socketChannel.isOpen()) + try { + socketChannel.close(); + } catch (IOException ce) { + } - try { - peerExecutor.execute(newPeer); - } catch (RejectedExecutionException e) { - // Can't execute - probably because we're shutting down, so ignore + return; + } + + try { + socketChannel.setOption(StandardSocketOptions.TCP_NODELAY, true); + socketChannel.configureBlocking(false); + socketChannel.register(channelSelector, SelectionKey.OP_READ); + } catch (IOException e) { + // Remove from connected peers + synchronized (this.connectedPeers) { + this.connectedPeers.remove(newPeer); } - } while (true); + + return; + } + + this.onPeerReady(newPeer); } - private void pruneOldPeers() throws InterruptedException, DataException { + private void pingPeers() { + for (Peer peer : this.getConnectedPeers()) + peer.pingCheck(); + } + + private void prunePeers() throws InterruptedException, DataException { + final long now = System.currentTimeMillis(); + + // Disconnect peers that are stuck during handshake + List handshakePeers = this.getConnectedPeers(); + + // Disregard peers that have completed handshake or only connected recently + handshakePeers.removeIf(peer -> peer.getHandshakeStatus() == Handshake.COMPLETED || peer.getConnectionTimestamp() == null || peer.getConnectionTimestamp() > now - HANDSHAKE_TIMEOUT); + + for (Peer peer : handshakePeers) + peer.disconnect("handshake timeout"); + + // Prune 'old' peers from repository... try (final Repository repository = RepositoryManager.getRepository()) { // Fetch all known peers List peers = repository.getNetworkRepository().getAllPeers(); - // "Old" peers: + // 'Old' peers: // we have attempted to connect within the last day // we last managed to connect over a week ago - final long now = System.currentTimeMillis(); Predicate isNotOldPeer = peerData -> { if (peerData.getLastAttempted() == null || peerData.getLastAttempted() < now - OLD_PEER_ATTEMPTED_PERIOD) return true; @@ -338,6 +473,7 @@ public class Network extends Thread { return false; }; + // Disregard peers that are NOT 'old' peers.removeIf(isNotOldPeer); // Don't consider already connected peers (simple address match) @@ -426,7 +562,8 @@ public class Network extends Thread { repository.saveChanges(); } - if (!newPeer.connect()) + SocketChannel socketChannel = newPeer.connect(); + if (socketChannel == null) return; if (this.isInterrupted()) @@ -437,10 +574,39 @@ public class Network extends Thread { } try { - peerExecutor.execute(newPeer); - } catch (RejectedExecutionException e) { - // Can't execute - probably because we're shutting down, so ignore + socketChannel.register(channelSelector, SelectionKey.OP_READ); + } catch (ClosedChannelException e) { + // If channel has somehow already closed then remove from connectedPeers + synchronized (this.connectedPeers) { + this.connectedPeers.remove(newPeer); + } } + + this.onPeerReady(newPeer); + } + + private void connectionRead(SocketChannel socketChannel) { + Peer peer = getPeerFromChannel(socketChannel); + if (peer == null) + return; + + try { + peer.readMessages(); + } catch (IOException e) { + LOGGER.trace(() -> String.format("Network thread %s encountered I/O error: %s", Thread.currentThread().getId(), e.getMessage()), e); + peer.disconnect("I/O error"); + return; + } + } + + private Peer getPeerFromChannel(SocketChannel socketChannel) { + synchronized (this.connectedPeers) { + for (Peer peer : this.connectedPeers) + if (peer.getSocketChannel() == socketChannel) + return peer; + } + + return null; } // Peer callbacks @@ -817,63 +983,38 @@ public class Network extends Thread { // Network-wide calls private void mergePeers(String addedBy, List peerAddresses) { - // This can block (due to lock) so fire off in separate thread - class PeersMerger implements Runnable { - private String addedBy; - private List peerAddresses; + // Serialize using lock to prevent repository deadlocks + if (!mergePeersLock.tryLock()) + return; - public PeersMerger(String addedBy, List peerAddresses) { - this.addedBy = addedBy; - this.peerAddresses = peerAddresses; - } - - @Override - public void run() { - Thread.currentThread().setName(String.format("Merging peers from %s", this.addedBy)); - - // Serialize using lock to prevent repository deadlocks - try { - mergePeersLock.lockInterruptibly(); - - final long addedWhen = System.currentTimeMillis(); - - try { - try (final Repository repository = RepositoryManager.getRepository()) { - List knownPeers = repository.getNetworkRepository().getAllPeers(); - - // Filter out duplicates - Predicate isKnownAddress = peerAddress -> { - return knownPeers.stream().anyMatch(knownPeerData -> knownPeerData.getAddress().equals(peerAddress)); - }; - - peerAddresses.removeIf(isKnownAddress); - - // Save the rest into database - for (PeerAddress peerAddress : peerAddresses) { - PeerData peerData = new PeerData(peerAddress, addedWhen, addedBy); - LOGGER.info(String.format("Adding new peer %s to repository", peerAddress)); - repository.getNetworkRepository().save(peerData); - } - - repository.saveChanges(); - } catch (DataException e) { - LOGGER.error("Repository issue while merging peers list from remote node", e); - } - } finally { - mergePeersLock.unlock(); - } - } catch (InterruptedException e1) { - // We're exiting anyway... - } - - Thread.currentThread().setName("Merging peers (dormant)"); - } - } + final long addedWhen = System.currentTimeMillis(); try { - mergePeersExecutor.execute(new PeersMerger(addedBy, peerAddresses)); - } catch (RejectedExecutionException e) { - // Can't execute - probably because we're shutting down, so ignore + try (final Repository repository = RepositoryManager.getRepository()) { + List knownPeers = repository.getNetworkRepository().getAllPeers(); + + // Filter out duplicates + Predicate isKnownAddress = peerAddress -> { + return knownPeers.stream().anyMatch(knownPeerData -> knownPeerData.getAddress().equals(peerAddress)); + }; + + peerAddresses.removeIf(isKnownAddress); + + repository.discardChanges(); + + // Save the rest into database + for (PeerAddress peerAddress : peerAddresses) { + PeerData peerData = new PeerData(peerAddress, addedWhen, addedBy); + LOGGER.info(String.format("Adding new peer %s to repository", peerAddress)); + repository.getNetworkRepository().save(peerData); + } + + repository.saveChanges(); + } catch (DataException e) { + LOGGER.error("Repository issue while merging peers list from remote node", e); + } + } finally { + mergePeersLock.unlock(); } } @@ -894,11 +1035,11 @@ public class Network extends Thread { Random random = new Random(); for (Peer peer : targetPeers) { - // Very short sleep to reduce strain, improve multithreading and catch interrupts + // Very short sleep to reduce strain, improve multi-threading and catch interrupts try { Thread.sleep(random.nextInt(20) + 20); } catch (InterruptedException e) { - return; + break; } Message message = peerMessageBuilder.apply(peer); @@ -909,6 +1050,8 @@ public class Network extends Thread { if (!peer.sendMessage(message)) peer.disconnect("failed to broadcast message"); } + + Thread.currentThread().setName("Network Broadcast (dormant)"); } } @@ -925,28 +1068,20 @@ public class Network extends Thread { this.isStopping = true; // Close listen socket to prevent more incoming connections - if (!this.listenSocket.isClosed()) + if (serverChannel.isOpen()) try { - this.listenSocket.close(); + serverChannel.close(); } catch (IOException e) { // Not important } - // Stop our run() thread - this.interrupt(); + // Stop processing threads + this.networkingExecutor.shutdownNow(); try { - this.join(); + if (!this.networkingExecutor.awaitTermination(5000, TimeUnit.MILLISECONDS)) + LOGGER.debug("Network threads failed to terminate"); } catch (InterruptedException e) { - LOGGER.debug("Interrupted while waiting for networking thread to terminate"); - } - - // Give up merging peer lists - this.mergePeersExecutor.shutdownNow(); - try { - if (!this.mergePeersExecutor.awaitTermination(1000, TimeUnit.MILLISECONDS)) - LOGGER.debug("Peer-list merging threads failed to terminate"); - } catch (InterruptedException e) { - LOGGER.debug("Interrupted while waiting for peer-list merging threads failed to terminate"); + LOGGER.debug("Interrupted while waiting for networking threads to terminate"); } // Stop broadcasts diff --git a/src/main/java/org/qora/network/Peer.java b/src/main/java/org/qora/network/Peer.java index a7c3d6a9..920e2e0b 100644 --- a/src/main/java/org/qora/network/Peer.java +++ b/src/main/java/org/qora/network/Peer.java @@ -1,14 +1,13 @@ package org.qora.network; -import java.io.DataInputStream; -import java.io.EOFException; import java.io.IOException; -import java.io.OutputStream; import java.net.InetAddress; import java.net.InetSocketAddress; -import java.net.Socket; import java.net.SocketTimeoutException; +import java.net.StandardSocketOptions; import java.net.UnknownHostException; +import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; import java.security.SecureRandom; import java.util.Collections; import java.util.HashMap; @@ -16,9 +15,6 @@ import java.util.Map; import java.util.Random; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; @@ -37,7 +33,7 @@ import com.google.common.net.HostAndPort; import com.google.common.net.InetAddresses; // For managing one peer -public class Peer extends Thread { +public class Peer { private static final Logger LOGGER = LogManager.getLogger(Peer.class); @@ -50,31 +46,19 @@ public class Peer extends Thread { /** * Interval between PING messages to a peer. (ms) *

- * Just under every 30s is usually ideal to keep NAT mappings refreshed,
- * BUT must be lower than {@link Peer#SOCKET_TIMEOUT}! + * Just under every 30s is usually ideal to keep NAT mappings refreshed. */ private static final int PING_INTERVAL = 8000; // ms - /** Maximum time a socket read() will block before closing connection due to timeout. (ms) */ - private static final int SOCKET_TIMEOUT = 10000; // ms - - private static final int UNSOLICITED_MESSAGE_QUEUE_CAPACITY = 10; - private volatile boolean isStopping = false; - private Socket socket = null; + private SocketChannel socketChannel = null; private InetSocketAddress resolvedAddress = null; /** True if remote address is loopback/link-local/site-local, false otherwise. */ private boolean isLocal; - private OutputStream out; - + private ByteBuffer byteBuffer; private Map> replyQueues; - private BlockingQueue unsolicitedQueue; - private ExecutorService messageExecutor; - - private ScheduledExecutorService pingExecutor; - /** True if we created connection to peer, false if we accepted incoming connection from peer. */ private final boolean isOutbound; /** Numeric protocol version, typically 1 or 2. */ @@ -88,20 +72,29 @@ public class Peer extends Thread { private byte[] verificationCodeExpected; private PeerData peerData = null; - private final ReentrantLock peerLock = new ReentrantLock(); + private final ReentrantLock peerDataLock = new ReentrantLock(); /** Timestamp of when socket was accepted, or connected. */ private Long connectionTimestamp = null; + /** Version info as reported by peer. */ private VersionMessage versionMessage = null; + /** Last PING message round-trip time (ms). */ private Long lastPing = null; + /** When last PING message was sent, or null if pings not started yet. */ + private Long lastPingSent; + private final ReentrantLock pingLock = new ReentrantLock(); + /** Latest block height as reported by peer. */ private Integer lastHeight; + /** Latest block signature as reported by peer. */ private byte[] lastBlockSignature; + /** Latest block timestamp as reported by peer. */ private Long lastBlockTimestamp; + /** Latest block generator public key as reported by peer. */ private byte[] lastBlockGenerator; @@ -114,19 +107,24 @@ public class Peer extends Thread { } /** Construct Peer using existing, connected socket */ - public Peer(Socket socket) { + public Peer(SocketChannel socketChannel) throws IOException { this.isOutbound = false; - this.socket = socket; + this.socketChannel = socketChannel; + sharedSetup(); - this.resolvedAddress = ((InetSocketAddress) socket.getRemoteSocketAddress()); + this.resolvedAddress = ((InetSocketAddress) socketChannel.socket().getRemoteSocketAddress()); this.isLocal = isAddressLocal(this.resolvedAddress.getAddress()); - PeerAddress peerAddress = PeerAddress.fromSocket(socket); + PeerAddress peerAddress = PeerAddress.fromSocket(socketChannel.socket()); this.peerData = new PeerData(peerAddress); } // Getters / setters + public SocketChannel getSocketChannel() { + return this.socketChannel; + } + public boolean isStopping() { return this.isStopping; } @@ -247,14 +245,13 @@ public class Peer extends Thread { } /** Returns the lock used for synchronizing access to peer info. */ - public ReentrantLock getPeerLock() { - return this.peerLock; + public ReentrantLock getPeerDataLock() { + return this.peerDataLock; } - // Easier, and nicer output, than peer.getRemoteSocketAddress() - @Override public String toString() { + // Easier, and nicer output, than peer.getRemoteSocketAddress() return this.peerData.getAddress().toString(); } @@ -268,153 +265,123 @@ public class Peer extends Thread { new SecureRandom().nextBytes(verificationCodeExpected); } - class MessageProcessor implements Runnable { - private Peer peer; - private BlockingQueue blockingQueue; - - public MessageProcessor(Peer peer, BlockingQueue blockingQueue) { - this.peer = peer; - this.blockingQueue = blockingQueue; - } - - @Override - public void run() { - Thread.currentThread().setName("Peer UMP " + this.peer); - - while (true) { - try { - Message message = blockingQueue.poll(1000L, TimeUnit.MILLISECONDS); - if (message != null) - Network.getInstance().onMessage(peer, message); - } catch (InterruptedException e) { - // Shutdown - return; - } - } - } - } - - private void setup() throws IOException { - this.socket.setSoTimeout(SOCKET_TIMEOUT); - this.out = this.socket.getOutputStream(); + private void sharedSetup() throws IOException { this.connectionTimestamp = System.currentTimeMillis(); + this.socketChannel.setOption(StandardSocketOptions.TCP_NODELAY, true); + this.socketChannel.configureBlocking(false); + this.byteBuffer = ByteBuffer.allocate(Network.MAXIMUM_MESSAGE_SIZE); this.replyQueues = Collections.synchronizedMap(new HashMap>()); - - this.unsolicitedQueue = new ArrayBlockingQueue<>(UNSOLICITED_MESSAGE_QUEUE_CAPACITY); - this.messageExecutor = Executors.newSingleThreadExecutor(); - this.messageExecutor.execute(new MessageProcessor(this, this.unsolicitedQueue)); } - public boolean connect() { + public SocketChannel connect() { LOGGER.trace(String.format("Connecting to peer %s", this)); - this.socket = new Socket(); try { this.resolvedAddress = this.peerData.getAddress().toSocketAddress(); this.isLocal = isAddressLocal(this.resolvedAddress.getAddress()); - this.socket.connect(resolvedAddress, CONNECT_TIMEOUT); + this.socketChannel = SocketChannel.open(); + this.socketChannel.socket().connect(resolvedAddress, CONNECT_TIMEOUT); + LOGGER.debug(String.format("Connected to peer %s", this)); + sharedSetup(); + return socketChannel; } catch (SocketTimeoutException e) { LOGGER.trace(String.format("Connection timed out to peer %s", this)); - return false; + return null; } catch (UnknownHostException e) { LOGGER.trace(String.format("Connection failed to unresolved peer %s", this)); - return false; + return null; } catch (IOException e) { LOGGER.trace(String.format("Connection failed to peer %s", this)); - return false; - } - - return true; - } - - // Main thread - - @Override - public void run() { - Thread.currentThread().setName("Peer " + this); - - try (DataInputStream in = new DataInputStream(socket.getInputStream())) { - setup(); - - Network.getInstance().onPeerReady(this); - - while (!isStopping) { - // Wait (up to INACTIVITY_TIMEOUT) for, and parse, incoming message - Message message = Message.fromStream(in); - if (message == null) { - this.disconnect("null message"); - return; - } - - LOGGER.trace(() -> String.format("Received %s message with ID %d from peer %s", message.getType().name(), message.getId(), this)); - - // Find potential blocking queue for this id (expect null if id is -1) - BlockingQueue queue = this.replyQueues.get(message.getId()); - if (queue != null) { - // Adding message to queue will unblock thread waiting for response - this.replyQueues.get(message.getId()).add(message); - } else { - // Nothing waiting for this message (unsolicited) - queue up for network - - // Queue full? - if (unsolicitedQueue.remainingCapacity() == 0) { - LOGGER.debug(String.format("No room for %s message with ID %s from peer %s", message.getType().name(), message.getId(), this)); - continue; - } - - unsolicitedQueue.add(message); - } - } - } catch (MessageException e) { - LOGGER.debug(String.format("%s, from peer %s", e.getMessage(), this)); - this.disconnect(e.getMessage()); - } catch (SocketTimeoutException e) { - this.disconnect("timeout"); - } catch (IOException e) { - if (isStopping) { - // If isStopping is true then our shutdown() has already been called, so no need to call it again - LOGGER.debug(String.format("Peer %s stopping...", this)); - return; - } - - // More informative logging - if (e instanceof EOFException) { - this.disconnect("EOF"); - } else if (e.getMessage().contains("onnection reset")) { // Can't import/rely on sun.net.ConnectionResetException - this.disconnect("Connection reset"); - } else { - this.disconnect("I/O error"); - } - } finally { - Thread.currentThread().setName("disconnected peer"); + return null; } } /** - * Attempt to send Message to peer + * Attempt to read Message from peer. + * + * @return message, or null if no message or there was a problem + * @throws IOException + */ + public void readMessages() throws IOException { + while(true) { + Message message; + + synchronized (this) { + if (!this.socketChannel.isOpen() || this.socketChannel.socket().isClosed()) + break; + + int bytesRead = this.socketChannel.read(this.byteBuffer); + if (bytesRead == -1) { + this.disconnect("EOF"); + return; + } + + LOGGER.trace(() -> String.format("Receiving message from peer %s", this)); + + // Can we build a message from buffer now? + try { + message = Message.fromByteBuffer(this.byteBuffer); + } catch (MessageException e) { + LOGGER.debug(String.format("%s, from peer %s", e.getMessage(), this)); + this.disconnect(e.getMessage()); + return; + } + } + + if (message == null) + return; + + LOGGER.trace(() -> String.format("Received %s message with ID %d from peer %s", message.getType().name(), message.getId(), this)); + + BlockingQueue queue = this.replyQueues.get(message.getId()); + if (queue != null) { + // Adding message to queue will unblock thread waiting for response + this.replyQueues.get(message.getId()).add(message); + // Consumed elsewhere + continue; + } + + // No thread waiting for message so pass up to network layer + Network.getInstance().onMessage(this, message); + } + } + + /** + * Attempt to send Message to peer. * * @param message * @return true if message successfully sent; false otherwise */ public boolean sendMessage(Message message) { - if (this.socket.isClosed()) + if (!this.socketChannel.isOpen()) return false; try { // Send message LOGGER.trace(() -> String.format("Sending %s message with ID %d to peer %s", message.getType().name(), message.getId(), this)); - synchronized (this.out) { - this.out.write(message.toBytes()); - this.out.flush(); + ByteBuffer outputBuffer = ByteBuffer.wrap(message.toBytes()); + + synchronized (this.socketChannel) { + while (outputBuffer.hasRemaining()) { + int bytesWritten = this.socketChannel.write(outputBuffer); + + if (bytesWritten == 0) + // Underlying socket's internal buffer probably full, + // so wait a short while for bytes to actually be transmitted over the wire + Thread.sleep(1L); + } } } catch (MessageException e) { LOGGER.warn(String.format("Failed to send %s message with ID %d to peer %s: %s", message.getType().name(), message.getId(), this, e.getMessage())); } catch (IOException e) { // Send failure return false; + } catch (InterruptedException e) { + // Likely shutdown scenario - so exit + return false; } // Sent OK @@ -461,38 +428,44 @@ public class Peer extends Thread { } public void startPings() { - class Pinger implements Runnable { - private Peer peer; + // Replacing initial null value allows pingCheck() to start sending pings. + LOGGER.trace(() -> String.format("Enabling pings for peer %s", this)); + this.lastPingSent = 0L; //System.currentTimeMillis(); + } - public Pinger(Peer peer) { - this.peer = peer; + /* package */ void pingCheck() { + LOGGER.trace(() -> String.format("Ping check for peer %s", this)); + + if (!this.pingLock.tryLock()) + return; // Some other thread is already checking ping status for this peer + + try { + // Pings not enabled yet? + if (this.lastPingSent == null) + return; + + final long now = System.currentTimeMillis(); + + // Time to send another ping? + if (now < this.lastPingSent + PING_INTERVAL) + return; // Not yet + + this.lastPingSent = now; + PingMessage pingMessage = new PingMessage(); + Message message = this.getResponse(pingMessage); + final long after = System.currentTimeMillis(); + + if (message == null || message.getType() != MessageType.PING) { + this.disconnect("no ping received"); + return; } - @Override - public void run() { - Thread.currentThread().setName("Pinger " + this.peer); - - PingMessage pingMessage = new PingMessage(); - - try { - final long before = System.currentTimeMillis(); - Message message = peer.getResponse(pingMessage); - final long after = System.currentTimeMillis(); - - if (message == null || message.getType() != MessageType.PING) - peer.disconnect("no ping received"); - - peer.setLastPing(after - before); - } catch (InterruptedException e) { - // Shutdown - } - } + this.setLastPing(after - now); + } catch (InterruptedException e) { + // Shutdown situation + } finally { + this.pingLock.unlock(); } - - Random random = new Random(); - long initialDelay = random.nextInt(PING_INTERVAL); - this.pingExecutor = Executors.newSingleThreadScheduledExecutor(); - this.pingExecutor.scheduleWithFixedDelay(new Pinger(this), initialDelay, PING_INTERVAL, TimeUnit.MILLISECONDS); } public void disconnect(String reason) { @@ -505,46 +478,14 @@ public class Peer extends Thread { public void shutdown() { LOGGER.debug(() -> String.format("Shutting down peer %s", this)); - this.isStopping = true; - // Shut down pinger - if (this.pingExecutor != null) { - this.pingExecutor.shutdownNow(); + if (this.socketChannel.isOpen()) { try { - if (!this.pingExecutor.awaitTermination(1000, TimeUnit.MILLISECONDS)) - LOGGER.debug(String.format("Pinger for peer %s failed to terminate", this)); - } catch (InterruptedException e) { - LOGGER.debug(String.format("Interrupted while terminating pinger for peer %s", this)); - } - } - - // Shut down unsolicited message processor - if (this.messageExecutor != null) { - this.messageExecutor.shutdownNow(); - try { - if (!this.messageExecutor.awaitTermination(5000, TimeUnit.MILLISECONDS)) - LOGGER.debug(String.format("Message processor for peer %s failed to terminate", this)); - } catch (InterruptedException e) { - LOGGER.debug(String.format("Interrupted while terminating message processor for peer %s", this)); - } - } - - LOGGER.debug(() -> String.format("Interrupting peer %s", this)); - this.interrupt(); - - // Close socket, which should trigger run() to exit - if (!this.socket.isClosed()) { - try { - this.socket.close(); + this.socketChannel.close(); } catch (IOException e) { + LOGGER.debug(String.format("IOException while trying to close peer %s", this)); } } - - try { - this.join(); - } catch (InterruptedException e) { - LOGGER.debug(String.format("Interrupted while waiting for peer %s to shutdown", this)); - } } // Utility methods diff --git a/src/main/java/org/qora/network/message/Message.java b/src/main/java/org/qora/network/message/Message.java index 3689bcb1..4ed97a7c 100644 --- a/src/main/java/org/qora/network/message/Message.java +++ b/src/main/java/org/qora/network/message/Message.java @@ -11,11 +11,9 @@ import static java.util.Arrays.stream; import static java.util.stream.Collectors.toMap; import java.io.ByteArrayOutputStream; -import java.io.DataInputStream; import java.io.IOException; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; -import java.net.SocketTimeoutException; import java.nio.BufferUnderflowException; import java.nio.ByteBuffer; import java.util.Arrays; @@ -104,12 +102,12 @@ public abstract class Message { return map.get(value); } - public Message fromBytes(int id, byte[] data) throws MessageException { + public Message fromByteBuffer(int id, ByteBuffer byteBuffer) throws MessageException { if (this.fromByteBuffer == null) throw new MessageException("Unsupported message type [" + value + "] during conversion from bytes"); try { - return (Message) this.fromByteBuffer.invoke(null, id, data == null ? null : ByteBuffer.wrap(data)); + return (Message) this.fromByteBuffer.invoke(null, id, byteBuffer); } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) { if (e.getCause() instanceof BufferUnderflowException) throw new MessageException("Byte data too short for " + name() + " message"); @@ -147,47 +145,65 @@ public abstract class Message { return this.type; } - public static Message fromStream(DataInputStream in) throws MessageException, IOException { + /** + * Attempt to read a message from byte buffer. + * + * @param byteBuffer + * @return null if no complete message can be read + * @throws MessageException + */ + public static Message fromByteBuffer(ByteBuffer byteBuffer) throws MessageException { try { + byteBuffer.flip(); + + ByteBuffer readBuffer = byteBuffer.asReadOnlyBuffer(); + // Read only enough bytes to cover Message "magic" preamble byte[] messageMagic = new byte[MAGIC_LENGTH]; - in.readFully(messageMagic); + readBuffer.get(messageMagic); if (!Arrays.equals(messageMagic, Network.getInstance().getMessageMagic())) // Didn't receive correct Message "magic" throw new MessageException("Received incorrect message 'magic'"); - int typeValue = in.readInt(); + // Find supporting object + int typeValue = readBuffer.getInt(); MessageType messageType = MessageType.valueOf(typeValue); if (messageType == null) // Unrecognised message type throw new MessageException(String.format("Received unknown message type [%d]", typeValue)); - // Find supporting object - - int hasId = in.read(); + // Optional message ID + byte hasId = readBuffer.get(); int id = -1; if (hasId != 0) { - id = in.readInt(); + id = readBuffer.getInt(); if (id <= 0) // Invalid ID throw new MessageException("Invalid negative ID"); } - int dataSize = in.readInt(); + int dataSize = readBuffer.getInt(); if (dataSize > MAX_DATA_SIZE) // Too large throw new MessageException(String.format("Declared data length %d larger than max allowed %d", dataSize, MAX_DATA_SIZE)); - byte[] data = null; + ByteBuffer dataSlice = null; if (dataSize > 0) { byte[] expectedChecksum = new byte[CHECKSUM_LENGTH]; - in.readFully(expectedChecksum); + readBuffer.get(expectedChecksum); - data = new byte[dataSize]; - in.readFully(data); + // Remember this position in readBuffer so we can pass to Message subclass + dataSlice = readBuffer.slice(); + + // Consume data from buffer + byte[] data = new byte[dataSize]; + readBuffer.get(data); + + // We successfully read all the data bytes, so we can set limit on dataSlice + dataSlice.limit(dataSize); // Test checksum byte[] actualChecksum = generateChecksum(data); @@ -195,11 +211,17 @@ public abstract class Message { throw new MessageException("Message checksum incorrect"); } - return messageType.fromBytes(id, data); - } catch (SocketTimeoutException e) { - throw e; - } catch (IOException e) { - throw e; + Message message = messageType.fromByteBuffer(id, dataSlice); + + // We successfully read a message, so bump byteBuffer's position to reflect this + byteBuffer.position(readBuffer.position()); + + return message; + } catch (BufferUnderflowException e) { + // Not enough bytes to fully decode message... + return null; + } finally { + byteBuffer.compact(); } } @@ -209,7 +231,7 @@ public abstract class Message { public byte[] toBytes() throws MessageException { try { - ByteArrayOutputStream bytes = new ByteArrayOutputStream(); + ByteArrayOutputStream bytes = new ByteArrayOutputStream(256); // Magic bytes.write(Network.getInstance().getMessageMagic());