peerMessageBuilder) {
+ this.targetPeers = targetPeers;
+ this.peerMessageBuilder = peerMessageBuilder;
+ }
+
+ @Override
+ public void run() {
+ Thread.currentThread().setName("Network Broadcast");
+
+ for (Peer peer : targetPeers) {
+ // Very short sleep to reduce strain, improve multi-threading and catch interrupts
+ try {
+ Thread.sleep(random.nextInt(20) + 20L);
+ } catch (InterruptedException e) {
+ break;
+ }
+
+ Message message = peerMessageBuilder.apply(peer);
+
+ if (message == null) {
+ continue;
+ }
+
+ if (!peer.sendMessage(message)) {
+ peer.disconnect("failed to broadcast message");
+ }
+ }
+
+ Thread.currentThread().setName("Network Broadcast (dormant)");
+ }
+ }
+
+ try {
+ broadcastExecutor.execute(new Broadcaster(this.getHandshakedPeers(), peerMessageBuilder));
+ } catch (RejectedExecutionException e) {
+ // Can't execute - probably because we're shutting down, so ignore
+ }
+ }
+
+ // Shutdown
+
+ public void shutdown() {
+ // Close listen socket to prevent more incoming connections
+ if (this.serverChannel.isOpen()) {
+ try {
+ this.serverChannel.close();
+ } catch (IOException e) {
+ // Not important
+ }
+ }
+
+ // Stop processing threads
+ try {
+ if (!this.networkEPC.shutdown(5000)) {
+ LOGGER.warn("Network threads failed to terminate");
+ }
+ } catch (InterruptedException e) {
+ LOGGER.warn("Interrupted while waiting for networking threads to terminate");
+ }
+
+ // Stop broadcasts
+ this.broadcastExecutor.shutdownNow();
+ try {
+ if (!this.broadcastExecutor.awaitTermination(1000, TimeUnit.MILLISECONDS)) {
+ LOGGER.warn("Broadcast threads failed to terminate");
+ }
+ } catch (InterruptedException e) {
+ LOGGER.warn("Interrupted while waiting for broadcast threads failed to terminate");
+ }
+
+ // Close all peer connections
+ for (Peer peer : this.getConnectedPeers()) {
+ peer.shutdown();
+ }
+ }
}
diff --git a/src/main/java/org/qortal/network/Peer.java b/src/main/java/org/qortal/network/Peer.java
index ffc90dc7..c84d1118 100644
--- a/src/main/java/org/qortal/network/Peer.java
+++ b/src/main/java/org/qortal/network/Peer.java
@@ -1,28 +1,8 @@
package org.qortal.network;
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.SocketTimeoutException;
-import java.net.StandardSocketOptions;
-import java.net.UnknownHostException;
-import java.nio.ByteBuffer;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
-import java.nio.channels.SocketChannel;
-import java.security.SecureRandom;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Random;
-import java.util.Arrays;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
+import com.google.common.hash.HashCode;
+import com.google.common.net.HostAndPort;
+import com.google.common.net.InetAddresses;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.qortal.controller.Controller;
@@ -31,684 +11,770 @@ import org.qortal.data.network.PeerChainTipData;
import org.qortal.data.network.PeerData;
import org.qortal.network.message.ChallengeMessage;
import org.qortal.network.message.Message;
-import org.qortal.network.message.PingMessage;
import org.qortal.network.message.Message.MessageException;
import org.qortal.network.message.Message.MessageType;
+import org.qortal.network.message.PingMessage;
import org.qortal.settings.Settings;
import org.qortal.utils.ExecuteProduceConsume;
import org.qortal.utils.NTP;
-import com.google.common.hash.HashCode;
-import com.google.common.net.HostAndPort;
-import com.google.common.net.InetAddresses;
+import java.io.IOException;
+import java.net.*;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+import java.security.SecureRandom;
+import java.util.*;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
// For managing one peer
public class Peer {
-
- private static final Logger LOGGER = LogManager.getLogger(Peer.class);
-
- /** Maximum time to allow connect() to remote peer to complete. (ms) */
- private static final int CONNECT_TIMEOUT = 2000; // ms
-
- /** Maximum time to wait for a message reply to arrive from peer. (ms) */
- private static final int RESPONSE_TIMEOUT = 3000; // ms
-
- /**
- * Interval between PING messages to a peer. (ms)
- *
- * Just under every 30s is usually ideal to keep NAT mappings refreshed.
- */
- private static final int PING_INTERVAL = 20_000; // ms
-
- private volatile boolean isStopping = false;
-
- private SocketChannel socketChannel = null;
- private InetSocketAddress resolvedAddress = null;
- /** True if remote address is loopback/link-local/site-local, false otherwise. */
- private boolean isLocal;
-
- private final Object byteBufferLock = new Object();
- private ByteBuffer byteBuffer;
-
- private Map> replyQueues;
- private LinkedBlockingQueue pendingMessages;
-
- /** True if we created connection to peer, false if we accepted incoming connection from peer. */
- private final boolean isOutbound;
-
- private final Object handshakingLock = new Object();
- private Handshake handshakeStatus = Handshake.STARTED;
- private volatile boolean handshakeMessagePending = false;
-
- /** Timestamp of when socket was accepted, or connected. */
- private Long connectionTimestamp = null;
-
- /** Last PING message round-trip time (ms). */
- private Long lastPing = null;
- /** When last PING message was sent, or null if pings not started yet. */
- private Long lastPingSent;
-
- byte[] ourChallenge;
-
- // Versioning
- public static final Pattern VERSION_PATTERN = Pattern.compile(Controller.VERSION_PREFIX + "(\\d{1,3})\\.(\\d{1,5})\\.(\\d{1,5})");
-
- // Peer info
-
- private final Object peerInfoLock = new Object();
-
- private String peersNodeId;
- private byte[] peersPublicKey;
- private byte[] peersChallenge;
-
- private PeerData peerData = null;
-
- /** Peer's value of connectionTimestamp. */
- private Long peersConnectionTimestamp = null;
-
- /** Version string as reported by peer. */
- private String peersVersionString = null;
- /** Numeric version of peer. */
- private Long peersVersion = null;
-
- /** Latest block info as reported by peer. */
- private PeerChainTipData peersChainTipData;
-
- /** Our common block with this peer */
- private CommonBlockData commonBlockData;
-
- // Constructors
-
- /** Construct unconnected, outbound Peer using socket address in peer data */
- public Peer(PeerData peerData) {
- this.isOutbound = true;
- this.peerData = peerData;
- }
-
- /** Construct Peer using existing, connected socket */
- public Peer(SocketChannel socketChannel, Selector channelSelector) throws IOException {
- this.isOutbound = false;
- this.socketChannel = socketChannel;
- sharedSetup(channelSelector);
-
- this.resolvedAddress = ((InetSocketAddress) socketChannel.socket().getRemoteSocketAddress());
- this.isLocal = isAddressLocal(this.resolvedAddress.getAddress());
-
- PeerAddress peerAddress = PeerAddress.fromSocket(socketChannel.socket());
- this.peerData = new PeerData(peerAddress);
- }
-
- // Getters / setters
-
- public boolean isStopping() {
- return this.isStopping;
- }
-
- public SocketChannel getSocketChannel() {
- return this.socketChannel;
- }
-
- public InetSocketAddress getResolvedAddress() {
- return this.resolvedAddress;
- }
-
- public boolean isLocal() {
- return this.isLocal;
- }
-
- public boolean isOutbound() {
- return this.isOutbound;
- }
-
- public Handshake getHandshakeStatus() {
- synchronized (this.handshakingLock) {
- return this.handshakeStatus;
- }
- }
-
- /*package*/ void setHandshakeStatus(Handshake handshakeStatus) {
- synchronized (this.handshakingLock) {
- this.handshakeStatus = handshakeStatus;
- }
- }
-
- /*package*/ void resetHandshakeMessagePending() {
- this.handshakeMessagePending = false;
- }
-
- public PeerData getPeerData() {
- synchronized (this.peerInfoLock) {
- return this.peerData;
- }
- }
-
- public Long getConnectionTimestamp() {
- synchronized (this.peerInfoLock) {
- return this.connectionTimestamp;
- }
- }
-
- public String getPeersVersionString() {
- synchronized (this.peerInfoLock) {
- return this.peersVersionString;
- }
- }
-
- public Long getPeersVersion() {
- synchronized (this.peerInfoLock) {
- return this.peersVersion;
- }
- }
-
- /*package*/ void setPeersVersion(String versionString, long version) {
- synchronized (this.peerInfoLock) {
- this.peersVersionString = versionString;
- this.peersVersion = version;
- }
- }
-
- public Long getPeersConnectionTimestamp() {
- synchronized (this.peerInfoLock) {
- return this.peersConnectionTimestamp;
- }
- }
-
- /*package*/ void setPeersConnectionTimestamp(Long peersConnectionTimestamp) {
- synchronized (this.peerInfoLock) {
- this.peersConnectionTimestamp = peersConnectionTimestamp;
- }
- }
-
- public Long getLastPing() {
- synchronized (this.peerInfoLock) {
- return this.lastPing;
- }
- }
-
- /*package*/ void setLastPing(long lastPing) {
- synchronized (this.peerInfoLock) {
- this.lastPing = lastPing;
- }
- }
-
- /*package*/ byte[] getOurChallenge() {
- return this.ourChallenge;
- }
-
- public String getPeersNodeId() {
- synchronized (this.peerInfoLock) {
- return this.peersNodeId;
- }
- }
-
- /*package*/ void setPeersNodeId(String peersNodeId) {
- synchronized (this.peerInfoLock) {
- this.peersNodeId = peersNodeId;
- }
- }
-
- public byte[] getPeersPublicKey() {
- synchronized (this.peerInfoLock) {
- return this.peersPublicKey;
- }
- }
-
- /*package*/ void setPeersPublicKey(byte[] peerPublicKey) {
- synchronized (this.peerInfoLock) {
- this.peersPublicKey = peerPublicKey;
- }
- }
-
- public byte[] getPeersChallenge() {
- synchronized (this.peerInfoLock) {
- return this.peersChallenge;
- }
- }
-
- /*package*/ void setPeersChallenge(byte[] peersChallenge) {
- synchronized (this.peerInfoLock) {
- this.peersChallenge = peersChallenge;
- }
- }
-
- public PeerChainTipData getChainTipData() {
- synchronized (this.peerInfoLock) {
- return this.peersChainTipData;
- }
- }
-
- public void setChainTipData(PeerChainTipData chainTipData) {
- synchronized (this.peerInfoLock) {
- this.peersChainTipData = chainTipData;
- }
- }
-
- public CommonBlockData getCommonBlockData() {
- synchronized (this.peerInfoLock) {
- return this.commonBlockData;
- }
- }
-
- public void setCommonBlockData(CommonBlockData commonBlockData) {
- synchronized (this.peerInfoLock) {
- this.commonBlockData = commonBlockData;
- }
- }
-
- /*package*/ void queueMessage(Message message) {
- if (!this.pendingMessages.offer(message))
- LOGGER.info(() -> String.format("No room to queue message from peer %s - discarding", this));
- }
-
- @Override
- public String toString() {
- // Easier, and nicer output, than peer.getRemoteSocketAddress()
- return this.peerData.getAddress().toString();
- }
-
- // Processing
-
- private void sharedSetup(Selector channelSelector) throws IOException {
- this.connectionTimestamp = NTP.getTime();
- this.socketChannel.setOption(StandardSocketOptions.TCP_NODELAY, true);
- this.socketChannel.configureBlocking(false);
- this.socketChannel.register(channelSelector, SelectionKey.OP_READ);
- this.byteBuffer = null; // Defer allocation to when we need it, to save memory. Sorry GC!
- this.replyQueues = Collections.synchronizedMap(new HashMap>());
- this.pendingMessages = new LinkedBlockingQueue<>();
-
- Random random = new SecureRandom();
- this.ourChallenge = new byte[ChallengeMessage.CHALLENGE_LENGTH];
- random.nextBytes(this.ourChallenge);
- }
-
- public SocketChannel connect(Selector channelSelector) {
- LOGGER.trace(() -> String.format("Connecting to peer %s", this));
-
- try {
- this.resolvedAddress = this.peerData.getAddress().toSocketAddress();
- this.isLocal = isAddressLocal(this.resolvedAddress.getAddress());
-
- this.socketChannel = SocketChannel.open();
- this.socketChannel.socket().connect(resolvedAddress, CONNECT_TIMEOUT);
- } catch (SocketTimeoutException e) {
- LOGGER.trace(String.format("Connection timed out to peer %s", this));
- return null;
- } catch (UnknownHostException e) {
- LOGGER.trace(String.format("Connection failed to unresolved peer %s", this));
- return null;
- } catch (IOException e) {
- LOGGER.trace(String.format("Connection failed to peer %s", this));
- return null;
- }
-
- try {
- LOGGER.debug(() -> String.format("Connected to peer %s", this));
- sharedSetup(channelSelector);
- return socketChannel;
- } catch (IOException e) {
- LOGGER.trace(String.format("Post-connection setup failed, peer %s", this));
- try {
- socketChannel.close();
- } catch (IOException ce) {
- // Failed to close?
- }
- return null;
- }
- }
-
- /**
- * Attempt to buffer bytes from socketChannel.
- *
- * @throws IOException
- */
- /* package */ void readChannel() throws IOException {
- synchronized (this.byteBufferLock) {
- while(true) {
- if (!this.socketChannel.isOpen() || this.socketChannel.socket().isClosed())
- return;
-
- // Do we need to allocate byteBuffer?
- if (this.byteBuffer == null)
- this.byteBuffer = ByteBuffer.allocate(Network.getInstance().getMaxMessageSize());
-
- final int priorPosition = this.byteBuffer.position();
- final int bytesRead = this.socketChannel.read(this.byteBuffer);
- if (bytesRead == -1) {
- this.disconnect("EOF");
- return;
- }
-
- LOGGER.trace(() -> {
- if (bytesRead > 0) {
- byte[] leadingBytes = new byte[Math.min(bytesRead, 8)];
- this.byteBuffer.asReadOnlyBuffer().position(priorPosition).get(leadingBytes);
- String leadingHex = HashCode.fromBytes(leadingBytes).toString();
-
- return String.format("Received %d bytes, starting %s, into byteBuffer[%d] from peer %s",
- bytesRead,
- leadingHex,
- priorPosition,
- this);
- } else {
- return String.format("Received %d bytes into byteBuffer[%d] from peer %s", bytesRead, priorPosition, this);
- }
- });
- final boolean wasByteBufferFull = !this.byteBuffer.hasRemaining();
-
- while (true) {
- final Message message;
-
- // Can we build a message from buffer now?
- ByteBuffer readOnlyBuffer = this.byteBuffer.asReadOnlyBuffer().flip();
- try {
- message = Message.fromByteBuffer(readOnlyBuffer);
- } catch (MessageException e) {
- LOGGER.debug(String.format("%s, from peer %s", e.getMessage(), this));
- this.disconnect(e.getMessage());
- return;
- }
-
- if (message == null && bytesRead == 0 && !wasByteBufferFull) {
- // No complete message in buffer, no more bytes to read from socket even though there was room to read bytes
-
- /* DISABLED
- // If byteBuffer is empty then we can deallocate it, to save memory, albeit costing GC
- if (this.byteBuffer.remaining() == this.byteBuffer.capacity())
- this.byteBuffer = null;
- */
-
- return;
- }
-
- if (message == null)
- // No complete message in buffer, but maybe more bytes to read from socket
- break;
-
- LOGGER.trace(() -> String.format("Received %s message with ID %d from peer %s", message.getType().name(), message.getId(), this));
-
- // Tidy up buffers:
- this.byteBuffer.flip();
- // Read-only, flipped buffer's position will be after end of message, so copy that
- this.byteBuffer.position(readOnlyBuffer.position());
- // Copy bytes after read message to front of buffer, adjusting position accordingly, reset limit to capacity
- this.byteBuffer.compact();
-
- BlockingQueue queue = this.replyQueues.get(message.getId());
- if (queue != null) {
- // Adding message to queue will unblock thread waiting for response
- this.replyQueues.get(message.getId()).add(message);
- // Consumed elsewhere
- continue;
- }
-
- // No thread waiting for message so we need to pass it up to network layer
-
- // Add message to pending queue
- if (!this.pendingMessages.offer(message)) {
- LOGGER.info(() -> String.format("No room to queue message from peer %s - discarding", this));
- return;
- }
-
- // Prematurely end any blocking channel select so that new messages can be processed.
- // This might cause this.socketChannel.read() above to return zero into bytesRead.
- Network.getInstance().wakeupChannelSelector();
- }
- }
- }
- }
-
- /* package */ ExecuteProduceConsume.Task getMessageTask() {
- /*
- * If we are still handshaking and there is a message yet to be processed then
- * don't produce another message task. This allows us to process handshake
- * messages sequentially.
- */
- if (this.handshakeMessagePending)
- return null;
-
- final Message nextMessage = this.pendingMessages.poll();
-
- if (nextMessage == null)
- return null;
-
- LOGGER.trace(() -> String.format("Produced %s message task from peer %s", nextMessage.getType().name(), this));
-
- if (this.handshakeStatus != Handshake.COMPLETED)
- this.handshakeMessagePending = true;
-
- // Return a task to process message in queue
- return () -> Network.getInstance().onMessage(this, nextMessage);
- }
-
- /**
- * Attempt to send Message to peer.
- *
- * @param message
- * @return true
if message successfully sent; false
otherwise
- */
- public boolean sendMessage(Message message) {
- if (!this.socketChannel.isOpen())
- return false;
-
- try {
- // Send message
- LOGGER.trace(() -> String.format("Sending %s message with ID %d to peer %s", message.getType().name(), message.getId(), this));
-
- ByteBuffer outputBuffer = ByteBuffer.wrap(message.toBytes());
-
- synchronized (this.socketChannel) {
- final long sendStart = System.currentTimeMillis();
-
- while (outputBuffer.hasRemaining()) {
- int bytesWritten = this.socketChannel.write(outputBuffer);
-
- LOGGER.trace(() -> String.format("Sent %d bytes of %s message with ID %d to peer %s",
- bytesWritten,
- message.getType().name(),
- message.getId(),
- this));
-
- if (bytesWritten == 0) {
- // Underlying socket's internal buffer probably full,
- // so wait a short while for bytes to actually be transmitted over the wire
-
- /*
- * NOSONAR squid:S2276 - we don't want to use this.socketChannel.wait()
- * as this releases the lock held by synchronized() above
- * and would allow another thread to send another message,
- * potentially interleaving them on-the-wire, causing checksum failures
- * and connection loss.
- */
- Thread.sleep(1L); //NOSONAR squid:S2276
-
- if (System.currentTimeMillis() - sendStart > RESPONSE_TIMEOUT)
- // We've taken too long to send this message
- return false;
- }
- }
- }
- } catch (MessageException e) {
- LOGGER.warn(String.format("Failed to send %s message with ID %d to peer %s: %s", message.getType().name(), message.getId(), this, e.getMessage()));
- return false;
- } catch (IOException e) {
- // Send failure
- return false;
- } catch (InterruptedException e) {
- // Likely shutdown scenario - so exit
- return false;
- }
-
- // Sent OK
- return true;
- }
-
- /**
- * Send message to peer and await response.
- *
- * Message is assigned a random ID and sent. If a response with matching ID is received then it is returned to caller.
- *
- * If no response with matching ID within timeout, or some other error/exception occurs, then return null
.
- * (Assume peer will be rapidly disconnected after this).
- *
- * @param message
- * @return Message
if valid response received; null
if not or error/exception occurs
- * @throws InterruptedException
- */
- public Message getResponse(Message message) throws InterruptedException {
- BlockingQueue blockingQueue = new ArrayBlockingQueue<>(1);
-
- // Assign random ID to this message
- Random random = new Random();
- int id;
- do {
- id = random.nextInt(Integer.MAX_VALUE - 1) + 1;
-
- // Put queue into map (keyed by message ID) so we can poll for a response
- // If putIfAbsent() doesn't return null, then this ID is already taken
- } while (this.replyQueues.putIfAbsent(id, blockingQueue) != null);
- message.setId(id);
-
- // Try to send message
- if (!this.sendMessage(message)) {
- this.replyQueues.remove(id);
- return null;
- }
-
- try {
- return blockingQueue.poll(RESPONSE_TIMEOUT, TimeUnit.MILLISECONDS);
- } finally {
- this.replyQueues.remove(id);
- }
- }
-
- /* package */ void startPings() {
- // Replacing initial null value allows getPingTask() to start sending pings.
- LOGGER.trace(() -> String.format("Enabling pings for peer %s", this));
- this.lastPingSent = NTP.getTime();
- }
-
- /* package */ ExecuteProduceConsume.Task getPingTask(Long now) {
- // Pings not enabled yet?
- if (now == null || this.lastPingSent == null)
- return null;
-
- // Time to send another ping?
- if (now < this.lastPingSent + PING_INTERVAL)
- return null; // Not yet
-
- // Not strictly true, but prevents this peer from being immediately chosen again
- this.lastPingSent = now;
-
- return () -> {
- PingMessage pingMessage = new PingMessage();
- Message message = this.getResponse(pingMessage);
-
- if (message == null || message.getType() != MessageType.PING) {
- LOGGER.debug(() -> String.format("Didn't receive reply from %s for PING ID %d", this, pingMessage.getId()));
- this.disconnect("no ping received");
- return;
- }
-
- this.setLastPing(NTP.getTime() - now);
- };
- }
-
- public void disconnect(String reason) {
- if (!isStopping)
- LOGGER.debug(() -> String.format("Disconnecting peer %s: %s", this, reason));
-
- this.shutdown();
-
- Network.getInstance().onDisconnect(this);
- }
-
- public void shutdown() {
- if (!isStopping)
- LOGGER.debug(() -> String.format("Shutting down peer %s", this));
-
- isStopping = true;
-
- if (this.socketChannel.isOpen()) {
- try {
- this.socketChannel.shutdownOutput();
- this.socketChannel.close();
- } catch (IOException e) {
- LOGGER.debug(String.format("IOException while trying to close peer %s", this));
- }
- }
- }
-
-
- // Minimum version
-
- public boolean isAtLeastVersion(String minVersionString) {
- if (minVersionString == null)
- return false;
-
- // Add the version prefix
- minVersionString = Controller.VERSION_PREFIX + minVersionString;
-
- Matcher matcher = VERSION_PATTERN.matcher(minVersionString);
- if (!matcher.lookingAt())
- return false;
-
- // We're expecting 3 positive shorts, so we can convert 1.2.3 into 0x0100020003
- long minVersion = 0;
- for (int g = 1; g <= 3; ++g) {
- long value = Long.parseLong(matcher.group(g));
-
- if (value < 0 || value > Short.MAX_VALUE)
- return false;
-
- minVersion <<= 16;
- minVersion |= value;
- }
-
- return this.getPeersVersion() >= minVersion;
- }
-
-
- // Common block data
-
- public boolean canUseCachedCommonBlockData() {
- PeerChainTipData peerChainTipData = this.getChainTipData();
- CommonBlockData commonBlockData = this.getCommonBlockData();
-
- if (peerChainTipData != null && commonBlockData != null) {
- PeerChainTipData commonBlockChainTipData = commonBlockData.getChainTipData();
- if (peerChainTipData.getLastBlockSignature() != null && commonBlockChainTipData != null && commonBlockChainTipData.getLastBlockSignature() != null) {
- if (Arrays.equals(peerChainTipData.getLastBlockSignature(), commonBlockChainTipData.getLastBlockSignature())) {
- return true;
- }
- }
- }
- return false;
- }
-
-
- // Utility methods
-
- /** Returns true if ports and addresses (or hostnames) match */
- public static boolean addressEquals(InetSocketAddress knownAddress, InetSocketAddress peerAddress) {
- if (knownAddress.getPort() != peerAddress.getPort())
- return false;
-
- return knownAddress.getHostString().equalsIgnoreCase(peerAddress.getHostString());
- }
-
- public static InetSocketAddress parsePeerAddress(String peerAddress) throws IllegalArgumentException {
- HostAndPort hostAndPort = HostAndPort.fromString(peerAddress).requireBracketsForIPv6();
-
- // HostAndPort doesn't try to validate host so we do extra checking here
- InetAddress address = InetAddresses.forString(hostAndPort.getHost());
-
- return new InetSocketAddress(address, hostAndPort.getPortOrDefault(Settings.getInstance().getDefaultListenPort()));
- }
-
- /** Returns true if address is loopback/link-local/site-local, false otherwise. */
- public static boolean isAddressLocal(InetAddress address) {
- return address.isLoopbackAddress() || address.isLinkLocalAddress() || address.isSiteLocalAddress();
- }
-
+ private static final Logger LOGGER = LogManager.getLogger(Peer.class);
+
+ /**
+ * Maximum time to allow connect() to remote peer to complete. (ms)
+ */
+ private static final int CONNECT_TIMEOUT = 2000; // ms
+
+ /**
+ * Maximum time to wait for a message reply to arrive from peer. (ms)
+ */
+ private static final int RESPONSE_TIMEOUT = 3000; // ms
+
+ /**
+ * Interval between PING messages to a peer. (ms)
+ *
+ * Just under every 30s is usually ideal to keep NAT mappings refreshed.
+ */
+ private static final int PING_INTERVAL = 20_000; // ms
+
+ private volatile boolean isStopping = false;
+
+ private SocketChannel socketChannel = null;
+ private InetSocketAddress resolvedAddress = null;
+ /**
+ * True if remote address is loopback/link-local/site-local, false otherwise.
+ */
+ private boolean isLocal;
+
+ private final UUID peerConnectionId = UUID.randomUUID();
+ private final Object byteBufferLock = new Object();
+ private ByteBuffer byteBuffer;
+
+ private Map> replyQueues;
+ private LinkedBlockingQueue pendingMessages;
+
+ /**
+ * True if we created connection to peer, false if we accepted incoming connection from peer.
+ */
+ private final boolean isOutbound;
+
+ private final Object handshakingLock = new Object();
+ private Handshake handshakeStatus = Handshake.STARTED;
+ private volatile boolean handshakeMessagePending = false;
+ private long handshakeComplete = -1L;
+
+ /**
+ * Timestamp of when socket was accepted, or connected.
+ */
+ private Long connectionTimestamp = null;
+
+ /**
+ * Last PING message round-trip time (ms).
+ */
+ private Long lastPing = null;
+ /**
+ * When last PING message was sent, or null if pings not started yet.
+ */
+ private Long lastPingSent;
+
+ byte[] ourChallenge;
+
+ // Versioning
+ public static final Pattern VERSION_PATTERN = Pattern.compile(Controller.VERSION_PREFIX
+ + "(\\d{1,3})\\.(\\d{1,5})\\.(\\d{1,5})");
+
+ // Peer info
+
+ private final Object peerInfoLock = new Object();
+
+ private String peersNodeId;
+ private byte[] peersPublicKey;
+ private byte[] peersChallenge;
+
+ private PeerData peerData = null;
+
+ /**
+ * Peer's value of connectionTimestamp.
+ */
+ private Long peersConnectionTimestamp = null;
+
+ /**
+ * Version string as reported by peer.
+ */
+ private String peersVersionString = null;
+ /**
+ * Numeric version of peer.
+ */
+ private Long peersVersion = null;
+
+ /**
+ * Latest block info as reported by peer.
+ */
+ private PeerChainTipData peersChainTipData;
+
+ /**
+ * Our common block with this peer
+ */
+ private CommonBlockData commonBlockData;
+
+ // Constructors
+
+ /**
+ * Construct unconnected, outbound Peer using socket address in peer data
+ */
+ public Peer(PeerData peerData) {
+ this.isOutbound = true;
+ this.peerData = peerData;
+ }
+
+ /**
+ * Construct Peer using existing, connected socket
+ */
+ public Peer(SocketChannel socketChannel, Selector channelSelector) throws IOException {
+ this.isOutbound = false;
+ this.socketChannel = socketChannel;
+ sharedSetup(channelSelector);
+
+ this.resolvedAddress = ((InetSocketAddress) socketChannel.socket().getRemoteSocketAddress());
+ this.isLocal = isAddressLocal(this.resolvedAddress.getAddress());
+
+ PeerAddress peerAddress = PeerAddress.fromSocket(socketChannel.socket());
+ this.peerData = new PeerData(peerAddress);
+ }
+
+ // Getters / setters
+
+ public boolean isStopping() {
+ return this.isStopping;
+ }
+
+ public SocketChannel getSocketChannel() {
+ return this.socketChannel;
+ }
+
+ public InetSocketAddress getResolvedAddress() {
+ return this.resolvedAddress;
+ }
+
+ public boolean isLocal() {
+ return this.isLocal;
+ }
+
+ public boolean isOutbound() {
+ return this.isOutbound;
+ }
+
+ public Handshake getHandshakeStatus() {
+ synchronized (this.handshakingLock) {
+ return this.handshakeStatus;
+ }
+ }
+
+ protected void setHandshakeStatus(Handshake handshakeStatus) {
+ synchronized (this.handshakingLock) {
+ this.handshakeStatus = handshakeStatus;
+ if (handshakeStatus.equals(Handshake.COMPLETED)) {
+ this.handshakeComplete = System.currentTimeMillis();
+ }
+ }
+ }
+
+ protected void resetHandshakeMessagePending() {
+ this.handshakeMessagePending = false;
+ }
+
+ public PeerData getPeerData() {
+ synchronized (this.peerInfoLock) {
+ return this.peerData;
+ }
+ }
+
+ public Long getConnectionTimestamp() {
+ synchronized (this.peerInfoLock) {
+ return this.connectionTimestamp;
+ }
+ }
+
+ public String getPeersVersionString() {
+ synchronized (this.peerInfoLock) {
+ return this.peersVersionString;
+ }
+ }
+
+ public Long getPeersVersion() {
+ synchronized (this.peerInfoLock) {
+ return this.peersVersion;
+ }
+ }
+
+ protected void setPeersVersion(String versionString, long version) {
+ synchronized (this.peerInfoLock) {
+ this.peersVersionString = versionString;
+ this.peersVersion = version;
+ }
+ }
+
+ public Long getPeersConnectionTimestamp() {
+ synchronized (this.peerInfoLock) {
+ return this.peersConnectionTimestamp;
+ }
+ }
+
+ protected void setPeersConnectionTimestamp(Long peersConnectionTimestamp) {
+ synchronized (this.peerInfoLock) {
+ this.peersConnectionTimestamp = peersConnectionTimestamp;
+ }
+ }
+
+ public Long getLastPing() {
+ synchronized (this.peerInfoLock) {
+ return this.lastPing;
+ }
+ }
+
+ protected void setLastPing(long lastPing) {
+ synchronized (this.peerInfoLock) {
+ this.lastPing = lastPing;
+ }
+ }
+
+ protected byte[] getOurChallenge() {
+ return this.ourChallenge;
+ }
+
+ public String getPeersNodeId() {
+ synchronized (this.peerInfoLock) {
+ return this.peersNodeId;
+ }
+ }
+
+ protected void setPeersNodeId(String peersNodeId) {
+ synchronized (this.peerInfoLock) {
+ this.peersNodeId = peersNodeId;
+ }
+ }
+
+ public byte[] getPeersPublicKey() {
+ synchronized (this.peerInfoLock) {
+ return this.peersPublicKey;
+ }
+ }
+
+ protected void setPeersPublicKey(byte[] peerPublicKey) {
+ synchronized (this.peerInfoLock) {
+ this.peersPublicKey = peerPublicKey;
+ }
+ }
+
+ public byte[] getPeersChallenge() {
+ synchronized (this.peerInfoLock) {
+ return this.peersChallenge;
+ }
+ }
+
+ protected void setPeersChallenge(byte[] peersChallenge) {
+ synchronized (this.peerInfoLock) {
+ this.peersChallenge = peersChallenge;
+ }
+ }
+
+ public PeerChainTipData getChainTipData() {
+ synchronized (this.peerInfoLock) {
+ return this.peersChainTipData;
+ }
+ }
+
+ public void setChainTipData(PeerChainTipData chainTipData) {
+ synchronized (this.peerInfoLock) {
+ this.peersChainTipData = chainTipData;
+ }
+ }
+
+ public CommonBlockData getCommonBlockData() {
+ synchronized (this.peerInfoLock) {
+ return this.commonBlockData;
+ }
+ }
+
+ public void setCommonBlockData(CommonBlockData commonBlockData) {
+ synchronized (this.peerInfoLock) {
+ this.commonBlockData = commonBlockData;
+ }
+ }
+
+ protected void queueMessage(Message message) {
+ if (!this.pendingMessages.offer(message)) {
+ LOGGER.info("[{}] No room to queue message from peer {} - discarding", this.peerConnectionId, this);
+ }
+ }
+
+ @Override
+ public String toString() {
+ // Easier, and nicer output, than peer.getRemoteSocketAddress()
+ return this.peerData.getAddress().toString();
+ }
+
+ // Processing
+
+ private void sharedSetup(Selector channelSelector) throws IOException {
+ this.connectionTimestamp = NTP.getTime();
+ this.socketChannel.setOption(StandardSocketOptions.TCP_NODELAY, true);
+ this.socketChannel.configureBlocking(false);
+ this.socketChannel.register(channelSelector, SelectionKey.OP_READ);
+ this.byteBuffer = null; // Defer allocation to when we need it, to save memory. Sorry GC!
+ this.replyQueues = Collections.synchronizedMap(new HashMap>());
+ this.pendingMessages = new LinkedBlockingQueue<>();
+
+ Random random = new SecureRandom();
+ this.ourChallenge = new byte[ChallengeMessage.CHALLENGE_LENGTH];
+ random.nextBytes(this.ourChallenge);
+ }
+
+ public SocketChannel connect(Selector channelSelector) {
+ LOGGER.trace("[{}] Connecting to peer {}", this.peerConnectionId, this);
+
+ try {
+ this.resolvedAddress = this.peerData.getAddress().toSocketAddress();
+ this.isLocal = isAddressLocal(this.resolvedAddress.getAddress());
+
+ this.socketChannel = SocketChannel.open();
+ this.socketChannel.socket().connect(resolvedAddress, CONNECT_TIMEOUT);
+ } catch (SocketTimeoutException e) {
+ LOGGER.trace("[{}] Connection timed out to peer {}", this.peerConnectionId, this);
+ return null;
+ } catch (UnknownHostException e) {
+ LOGGER.trace("[{}] Connection failed to unresolved peer {}", this.peerConnectionId, this);
+ return null;
+ } catch (IOException e) {
+ LOGGER.trace("[{}] Connection failed to peer {}", this.peerConnectionId, this);
+ return null;
+ }
+
+ try {
+ LOGGER.debug("[{}] Connected to peer {}", this.peerConnectionId, this);
+ sharedSetup(channelSelector);
+ return socketChannel;
+ } catch (IOException e) {
+ LOGGER.trace("[{}] Post-connection setup failed, peer {}", this.peerConnectionId, this);
+ try {
+ socketChannel.close();
+ } catch (IOException ce) {
+ // Failed to close?
+ }
+ return null;
+ }
+ }
+
+ /**
+ * Attempt to buffer bytes from socketChannel.
+ *
+ * @throws IOException If this channel is not yet connected
+ */
+ protected void readChannel() throws IOException {
+ synchronized (this.byteBufferLock) {
+ while (true) {
+ if (!this.socketChannel.isOpen() || this.socketChannel.socket().isClosed()) {
+ return;
+ }
+
+ // Do we need to allocate byteBuffer?
+ if (this.byteBuffer == null) {
+ this.byteBuffer = ByteBuffer.allocate(Network.getInstance().getMaxMessageSize());
+ }
+
+ final int priorPosition = this.byteBuffer.position();
+ final int bytesRead = this.socketChannel.read(this.byteBuffer);
+ if (bytesRead == -1) {
+ if (priorPosition > 0) {
+ this.disconnect("EOF - read " + priorPosition + " bytes");
+ } else {
+ this.disconnect("EOF - failed to read any data");
+ }
+ return;
+ }
+
+ if (bytesRead > 0) {
+ byte[] leadingBytes = new byte[Math.min(bytesRead, 8)];
+ this.byteBuffer.asReadOnlyBuffer().position(priorPosition).get(leadingBytes);
+ String leadingHex = HashCode.fromBytes(leadingBytes).toString();
+
+ LOGGER.trace("[{}] Received {} bytes, starting {}, into byteBuffer[{}] from peer {}",
+ this.peerConnectionId, bytesRead, leadingHex, priorPosition, this);
+ } else {
+ LOGGER.trace("[{}] Received {} bytes into byteBuffer[{}] from peer {}", this.peerConnectionId,
+ bytesRead, priorPosition, this);
+ }
+ final boolean wasByteBufferFull = !this.byteBuffer.hasRemaining();
+
+ while (true) {
+ final Message message;
+
+ // Can we build a message from buffer now?
+ ByteBuffer readOnlyBuffer = this.byteBuffer.asReadOnlyBuffer().flip();
+ try {
+ message = Message.fromByteBuffer(readOnlyBuffer);
+ } catch (MessageException e) {
+ LOGGER.debug("[{}] {}, from peer {}", this.peerConnectionId, e.getMessage(), this);
+ this.disconnect(e.getMessage());
+ return;
+ }
+
+ if (message == null && bytesRead == 0 && !wasByteBufferFull) {
+ // No complete message in buffer, no more bytes to read from socket
+ // even though there was room to read bytes
+
+ /* DISABLED
+ // If byteBuffer is empty then we can deallocate it, to save memory, albeit costing GC
+ if (this.byteBuffer.remaining() == this.byteBuffer.capacity()) {
+ this.byteBuffer = null;
+ }
+ */
+
+ return;
+ }
+
+ if (message == null) {
+ // No complete message in buffer, but maybe more bytes to read from socket
+ break;
+ }
+
+ LOGGER.trace("[{}] Received {} message with ID {} from peer {}", this.peerConnectionId,
+ message.getType().name(), message.getId(), this);
+
+ // Tidy up buffers:
+ this.byteBuffer.flip();
+ // Read-only, flipped buffer's position will be after end of message, so copy that
+ this.byteBuffer.position(readOnlyBuffer.position());
+ // Copy bytes after read message to front of buffer,
+ // adjusting position accordingly, reset limit to capacity
+ this.byteBuffer.compact();
+
+ BlockingQueue queue = this.replyQueues.get(message.getId());
+ if (queue != null) {
+ // Adding message to queue will unblock thread waiting for response
+ this.replyQueues.get(message.getId()).add(message);
+ // Consumed elsewhere
+ continue;
+ }
+
+ // No thread waiting for message so we need to pass it up to network layer
+
+ // Add message to pending queue
+ if (!this.pendingMessages.offer(message)) {
+ LOGGER.info("[{}] No room to queue message from peer {} - discarding",
+ this.peerConnectionId, this);
+ return;
+ }
+
+ // Prematurely end any blocking channel select so that new messages can be processed.
+ // This might cause this.socketChannel.read() above to return zero into bytesRead.
+ Network.getInstance().wakeupChannelSelector();
+ }
+ }
+ }
+ }
+
+ protected ExecuteProduceConsume.Task getMessageTask() {
+ /*
+ * If we are still handshaking and there is a message yet to be processed then
+ * don't produce another message task. This allows us to process handshake
+ * messages sequentially.
+ */
+ if (this.handshakeMessagePending) {
+ return null;
+ }
+
+ final Message nextMessage = this.pendingMessages.poll();
+
+ if (nextMessage == null) {
+ return null;
+ }
+
+ LOGGER.trace("[{}] Produced {} message task from peer {}", this.peerConnectionId,
+ nextMessage.getType().name(), this);
+
+ if (this.handshakeStatus != Handshake.COMPLETED) {
+ this.handshakeMessagePending = true;
+ }
+
+ // Return a task to process message in queue
+ return () -> Network.getInstance().onMessage(this, nextMessage);
+ }
+
+ /**
+ * Attempt to send Message to peer.
+ *
+ * @param message message to be sent
+ * @return true
if message successfully sent; false
otherwise
+ */
+ public boolean sendMessage(Message message) {
+ if (!this.socketChannel.isOpen()) {
+ return false;
+ }
+
+ try {
+ // Send message
+ LOGGER.trace("[{}] Sending {} message with ID {} to peer {}", this.peerConnectionId,
+ message.getType().name(), message.getId(), this);
+
+ ByteBuffer outputBuffer = ByteBuffer.wrap(message.toBytes());
+
+ synchronized (this.socketChannel) {
+ final long sendStart = System.currentTimeMillis();
+
+ while (outputBuffer.hasRemaining()) {
+ int bytesWritten = this.socketChannel.write(outputBuffer);
+
+ LOGGER.trace("[{}] Sent {} bytes of {} message with ID {} to peer {}", this.peerConnectionId,
+ bytesWritten, message.getType().name(), message.getId(), this);
+
+ if (bytesWritten == 0) {
+ // Underlying socket's internal buffer probably full,
+ // so wait a short while for bytes to actually be transmitted over the wire
+
+ /*
+ * NOSONAR squid:S2276 - we don't want to use this.socketChannel.wait()
+ * as this releases the lock held by synchronized() above
+ * and would allow another thread to send another message,
+ * potentially interleaving them on-the-wire, causing checksum failures
+ * and connection loss.
+ */
+ Thread.sleep(1L); //NOSONAR squid:S2276
+
+ if (System.currentTimeMillis() - sendStart > RESPONSE_TIMEOUT) {
+ // We've taken too long to send this message
+ return false;
+ }
+ }
+ }
+ }
+ } catch (MessageException e) {
+ LOGGER.warn("[{}] Failed to send {} message with ID {} to peer {}: {}", this.peerConnectionId,
+ message.getType().name(), message.getId(), this, e.getMessage());
+ return false;
+ } catch (IOException | InterruptedException e) {
+ // Send failure
+ return false;
+ }
+
+ // Sent OK
+ return true;
+ }
+
+ /**
+ * Send message to peer and await response.
+ *
+ * Message is assigned a random ID and sent.
+ * If a response with matching ID is received then it is returned to caller.
+ *
+ * If no response with matching ID within timeout, or some other error/exception occurs,
+ * then return null
.
+ * (Assume peer will be rapidly disconnected after this).
+ *
+ * @param message message to send
+ * @return Message
if valid response received; null
if not or error/exception occurs
+ * @throws InterruptedException if interrupted while waiting
+ */
+ public Message getResponse(Message message) throws InterruptedException {
+ BlockingQueue blockingQueue = new ArrayBlockingQueue<>(1);
+
+ // Assign random ID to this message
+ Random random = new Random();
+ int id;
+ do {
+ id = random.nextInt(Integer.MAX_VALUE - 1) + 1;
+
+ // Put queue into map (keyed by message ID) so we can poll for a response
+ // If putIfAbsent() doesn't return null, then this ID is already taken
+ } while (this.replyQueues.putIfAbsent(id, blockingQueue) != null);
+ message.setId(id);
+
+ // Try to send message
+ if (!this.sendMessage(message)) {
+ this.replyQueues.remove(id);
+ return null;
+ }
+
+ try {
+ return blockingQueue.poll(RESPONSE_TIMEOUT, TimeUnit.MILLISECONDS);
+ } finally {
+ this.replyQueues.remove(id);
+ }
+ }
+
+ protected void startPings() {
+ // Replacing initial null value allows getPingTask() to start sending pings.
+ LOGGER.trace("[{}] Enabling pings for peer {}", this.peerConnectionId, this);
+ this.lastPingSent = NTP.getTime();
+ }
+
+ protected ExecuteProduceConsume.Task getPingTask(Long now) {
+ // Pings not enabled yet?
+ if (now == null || this.lastPingSent == null) {
+ return null;
+ }
+
+ // Time to send another ping?
+ if (now < this.lastPingSent + PING_INTERVAL) {
+ return null; // Not yet
+ }
+
+ // Not strictly true, but prevents this peer from being immediately chosen again
+ this.lastPingSent = now;
+
+ return () -> {
+ PingMessage pingMessage = new PingMessage();
+ Message message = this.getResponse(pingMessage);
+
+ if (message == null || message.getType() != MessageType.PING) {
+ LOGGER.debug("[{}] Didn't receive reply from {} for PING ID {}", this.peerConnectionId, this,
+ pingMessage.getId());
+ this.disconnect("no ping received");
+ return;
+ }
+
+ this.setLastPing(NTP.getTime() - now);
+ };
+ }
+
+ public void disconnect(String reason) {
+ if (!isStopping) {
+ LOGGER.debug("[{}] Disconnecting peer {} after {}: {}", this.peerConnectionId, this,
+ getConnectionAge(), reason);
+ }
+ this.shutdown();
+
+ Network.getInstance().onDisconnect(this);
+ }
+
+ public void shutdown() {
+ if (!isStopping) {
+ LOGGER.debug("[{}] Shutting down peer {}", this.peerConnectionId, this);
+ }
+ isStopping = true;
+
+ if (this.socketChannel.isOpen()) {
+ try {
+ this.socketChannel.shutdownOutput();
+ this.socketChannel.close();
+ } catch (IOException e) {
+ LOGGER.debug("[{}] IOException while trying to close peer {}", this.peerConnectionId, this);
+ }
+ }
+ }
+
+
+ // Minimum version
+
+ public boolean isAtLeastVersion(String minVersionString) {
+ if (minVersionString == null) {
+ return false;
+ }
+
+ // Add the version prefix
+ minVersionString = Controller.VERSION_PREFIX + minVersionString;
+
+ Matcher matcher = VERSION_PATTERN.matcher(minVersionString);
+ if (!matcher.lookingAt()) {
+ return false;
+ }
+
+ // We're expecting 3 positive shorts, so we can convert 1.2.3 into 0x0100020003
+ long minVersion = 0;
+ for (int g = 1; g <= 3; ++g) {
+ long value = Long.parseLong(matcher.group(g));
+
+ if (value < 0 || value > Short.MAX_VALUE) {
+ return false;
+ }
+
+ minVersion <<= 16;
+ minVersion |= value;
+ }
+
+ return this.getPeersVersion() >= minVersion;
+ }
+
+
+ // Common block data
+
+ public boolean canUseCachedCommonBlockData() {
+ PeerChainTipData peerChainTipData = this.getChainTipData();
+ CommonBlockData commonBlockData = this.getCommonBlockData();
+
+ if (peerChainTipData != null && commonBlockData != null) {
+ PeerChainTipData commonBlockChainTipData = commonBlockData.getChainTipData();
+ if (peerChainTipData.getLastBlockSignature() != null && commonBlockChainTipData != null
+ && commonBlockChainTipData.getLastBlockSignature() != null) {
+ if (Arrays.equals(peerChainTipData.getLastBlockSignature(),
+ commonBlockChainTipData.getLastBlockSignature())) {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
+
+ // Utility methods
+
+ /**
+ * Returns true if ports and addresses (or hostnames) match
+ */
+ public static boolean addressEquals(InetSocketAddress knownAddress, InetSocketAddress peerAddress) {
+ if (knownAddress.getPort() != peerAddress.getPort()) {
+ return false;
+ }
+
+ return knownAddress.getHostString().equalsIgnoreCase(peerAddress.getHostString());
+ }
+
+ public static InetSocketAddress parsePeerAddress(String peerAddress) throws IllegalArgumentException {
+ HostAndPort hostAndPort = HostAndPort.fromString(peerAddress).requireBracketsForIPv6();
+
+ // HostAndPort doesn't try to validate host so we do extra checking here
+ InetAddress address = InetAddresses.forString(hostAndPort.getHost());
+
+ int defaultPort = Settings.getInstance().getDefaultListenPort();
+ return new InetSocketAddress(address, hostAndPort.getPortOrDefault(defaultPort));
+ }
+
+ /**
+ * Returns true if address is loopback/link-local/site-local, false otherwise.
+ */
+ public static boolean isAddressLocal(InetAddress address) {
+ return address.isLoopbackAddress() || address.isLinkLocalAddress() || address.isSiteLocalAddress();
+ }
+
+ public UUID getPeerConnectionId() {
+ return peerConnectionId;
+ }
+
+ public long getConnectionEstablishedTime() {
+ return handshakeComplete;
+ }
+
+ public long getConnectionAge() {
+ if (handshakeComplete > 0L) {
+ return System.currentTimeMillis() - handshakeComplete;
+ }
+ return handshakeComplete;
+ }
}