diff --git a/src/main/java/org/qortal/api/model/ConnectedPeer.java b/src/main/java/org/qortal/api/model/ConnectedPeer.java index 3209ee6a..a98ead76 100644 --- a/src/main/java/org/qortal/api/model/ConnectedPeer.java +++ b/src/main/java/org/qortal/api/model/ConnectedPeer.java @@ -1,61 +1,64 @@ package org.qortal.api.model; -import javax.xml.bind.annotation.XmlAccessType; -import javax.xml.bind.annotation.XmlAccessorType; - +import io.swagger.v3.oas.annotations.media.Schema; import org.qortal.data.network.PeerChainTipData; import org.qortal.data.network.PeerData; import org.qortal.network.Handshake; import org.qortal.network.Peer; -import io.swagger.v3.oas.annotations.media.Schema; +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import java.util.UUID; @XmlAccessorType(XmlAccessType.FIELD) public class ConnectedPeer { - public enum Direction { - INBOUND, - OUTBOUND; - } - public Direction direction; - public Handshake handshakeStatus; - public Long lastPing; - public Long connectedWhen; - public Long peersConnectedWhen; + public enum Direction { + INBOUND, + OUTBOUND; + } - public String address; - public String version; + public Direction direction; + public Handshake handshakeStatus; + public Long lastPing; + public Long connectedWhen; + public Long peersConnectedWhen; - public String nodeId; + public String address; + public String version; - public Integer lastHeight; - @Schema(example = "base58") - public byte[] lastBlockSignature; - public Long lastBlockTimestamp; + public String nodeId; - protected ConnectedPeer() { - } + public Integer lastHeight; + @Schema(example = "base58") + public byte[] lastBlockSignature; + public Long lastBlockTimestamp; + public UUID connectionId; - public ConnectedPeer(Peer peer) { - this.direction = peer.isOutbound() ? Direction.OUTBOUND : Direction.INBOUND; - this.handshakeStatus = peer.getHandshakeStatus(); - this.lastPing = peer.getLastPing(); + protected ConnectedPeer() { + } - PeerData peerData = peer.getPeerData(); - this.connectedWhen = peer.getConnectionTimestamp(); - this.peersConnectedWhen = peer.getPeersConnectionTimestamp(); + public ConnectedPeer(Peer peer) { + this.direction = peer.isOutbound() ? Direction.OUTBOUND : Direction.INBOUND; + this.handshakeStatus = peer.getHandshakeStatus(); + this.lastPing = peer.getLastPing(); - this.address = peerData.getAddress().toString(); + PeerData peerData = peer.getPeerData(); + this.connectedWhen = peer.getConnectionTimestamp(); + this.peersConnectedWhen = peer.getPeersConnectionTimestamp(); - this.version = peer.getPeersVersionString(); - this.nodeId = peer.getPeersNodeId(); + this.address = peerData.getAddress().toString(); - PeerChainTipData peerChainTipData = peer.getChainTipData(); - if (peerChainTipData != null) { - this.lastHeight = peerChainTipData.getLastHeight(); - this.lastBlockSignature = peerChainTipData.getLastBlockSignature(); - this.lastBlockTimestamp = peerChainTipData.getLastBlockTimestamp(); - } - } + this.version = peer.getPeersVersionString(); + this.nodeId = peer.getPeersNodeId(); + this.connectionId = peer.getPeerConnectionId(); + + PeerChainTipData peerChainTipData = peer.getChainTipData(); + if (peerChainTipData != null) { + this.lastHeight = peerChainTipData.getLastHeight(); + this.lastBlockSignature = peerChainTipData.getLastBlockSignature(); + this.lastBlockTimestamp = peerChainTipData.getLastBlockTimestamp(); + } + } } diff --git a/src/main/java/org/qortal/network/Network.java b/src/main/java/org/qortal/network/Network.java index 0e9ac32b..635c2dfc 100644 --- a/src/main/java/org/qortal/network/Network.java +++ b/src/main/java/org/qortal/network/Network.java @@ -1,34 +1,5 @@ package org.qortal.network; -import java.io.IOException; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.StandardSocketOptions; -import java.net.UnknownHostException; -import java.nio.channels.CancelledKeyException; -import java.nio.channels.SelectionKey; -import java.nio.channels.Selector; -import java.nio.channels.ServerSocketChannel; -import java.nio.channels.SocketChannel; -import java.security.SecureRandom; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; -import java.util.Random; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; -import java.util.function.Function; -import java.util.function.Predicate; -import java.util.stream.Collectors; - import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.bouncycastle.crypto.params.Ed25519PrivateKeyParameters; @@ -39,1106 +10,1237 @@ import org.qortal.crypto.Crypto; import org.qortal.data.block.BlockData; import org.qortal.data.network.PeerData; import org.qortal.data.transaction.TransactionData; -import org.qortal.network.message.GetPeersMessage; -import org.qortal.network.message.GetUnconfirmedTransactionsMessage; -import org.qortal.network.message.HeightV2Message; -import org.qortal.network.message.Message; -import org.qortal.network.message.PeersV2Message; -import org.qortal.network.message.PingMessage; -import org.qortal.network.message.TransactionSignaturesMessage; +import org.qortal.network.message.*; import org.qortal.repository.DataException; import org.qortal.repository.Repository; import org.qortal.repository.RepositoryManager; import org.qortal.settings.Settings; import org.qortal.utils.ExecuteProduceConsume; -// import org.qortal.utils.ExecutorDumper; import org.qortal.utils.ExecuteProduceConsume.StatsSnapshot; import org.qortal.utils.NTP; import org.qortal.utils.NamedThreadFactory; +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.StandardSocketOptions; +import java.net.UnknownHostException; +import java.nio.channels.*; +import java.security.SecureRandom; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Function; +import java.util.function.Predicate; +import java.util.stream.Collectors; + +import static org.qortal.controller.Controller.MISBEHAVIOUR_COOLOFF; + // For managing peers public class Network { - - private static final Logger LOGGER = LogManager.getLogger(Network.class); - private static Network instance; - - private static final int LISTEN_BACKLOG = 10; - /** How long before retrying after a connection failure, in milliseconds. */ - private static final long CONNECT_FAILURE_BACKOFF = 5 * 60 * 1000L; // ms - /** How long between informational broadcasts to all connected peers, in milliseconds. */ - private static final long BROADCAST_INTERVAL = 60 * 1000L; // ms - /** Maximum time since last successful connection for peer info to be propagated, in milliseconds. */ - private static final long RECENT_CONNECTION_THRESHOLD = 24 * 60 * 60 * 1000L; // ms - /** Maximum time since last connection attempt before a peer is potentially considered "old", in milliseconds. */ - private static final long OLD_PEER_ATTEMPTED_PERIOD = 24 * 60 * 60 * 1000L; // ms - /** Maximum time since last successful connection before a peer is potentially considered "old", in milliseconds. */ - private static final long OLD_PEER_CONNECTION_PERIOD = 7 * 24 * 60 * 60 * 1000L; // ms - /** Maximum time allowed for handshake to complete, in milliseconds. */ - private static final long HANDSHAKE_TIMEOUT = 60 * 1000L; // ms - - private static final byte[] MAINNET_MESSAGE_MAGIC = new byte[] { 0x51, 0x4f, 0x52, 0x54 }; // QORT - private static final byte[] TESTNET_MESSAGE_MAGIC = new byte[] { 0x71, 0x6f, 0x72, 0x54 }; // qorT - - private static final String[] INITIAL_PEERS = new String[] { - "node1.qortal.org", "node2.qortal.org", "node3.qortal.org", "node4.qortal.org", "node5.qortal.org", - "node6.qortal.org", "node7.qortal.org", "node8.qortal.org", "node9.qortal.org", "node10.qortal.org", - "node.qortal.ru", "node2.qortal.ru", "node3.qortal.ru", "node.qortal.uk" - }; - - private static final long NETWORK_EPC_KEEPALIVE = 10L; // seconds - - public static final int MAX_SIGNATURES_PER_REPLY = 500; - public static final int MAX_BLOCK_SUMMARIES_PER_REPLY = 500; - - // Generate our node keys / ID - private final Ed25519PrivateKeyParameters edPrivateKeyParams = new Ed25519PrivateKeyParameters(new SecureRandom()); - private final Ed25519PublicKeyParameters edPublicKeyParams = edPrivateKeyParams.generatePublicKey(); - private final String ourNodeId = Crypto.toNodeAddress(edPublicKeyParams.getEncoded()); - - private final int maxMessageSize; - private final int minOutboundPeers; - private final int maxPeers; - - private final List allKnownPeers = new ArrayList<>(); - private final List connectedPeers = new ArrayList<>(); - private final List selfPeers = new ArrayList<>(); - - private final ExecuteProduceConsume networkEPC; - private Selector channelSelector; - private ServerSocketChannel serverChannel; - private Iterator channelIterator = null; - - // volatile because value is updated inside any one of the EPC threads - private volatile long nextConnectTaskTimestamp = 0L; // ms - try first connect once NTP syncs - - private ExecutorService broadcastExecutor = Executors.newCachedThreadPool(); - // volatile because value is updated inside any one of the EPC threads - private volatile long nextBroadcastTimestamp = 0L; // ms - try first broadcast once NTP syncs - - private final Lock mergePeersLock = new ReentrantLock(); - - // Constructors - - private Network() { - maxMessageSize = 4 + 1 + 4 + BlockChain.getInstance().getMaxBlockSize(); - - minOutboundPeers = Settings.getInstance().getMinOutboundPeers(); - maxPeers = Settings.getInstance().getMaxPeers(); - - // We'll use a cached thread pool but with more aggressive timeout. - ExecutorService networkExecutor = new ThreadPoolExecutor(1, - Settings.getInstance().getMaxNetworkThreadPoolSize(), - NETWORK_EPC_KEEPALIVE, TimeUnit.SECONDS, - new SynchronousQueue(), - new NamedThreadFactory("Network-EPC")); - networkEPC = new NetworkProcessor(networkExecutor); - } - - public void start() throws IOException, DataException { - // Grab P2P port from settings - int listenPort = Settings.getInstance().getListenPort(); - - // Grab P2P bind address from settings - try { - InetAddress bindAddr = InetAddress.getByName(Settings.getInstance().getBindAddress()); - InetSocketAddress endpoint = new InetSocketAddress(bindAddr, listenPort); - - channelSelector = Selector.open(); - - // Set up listen socket - serverChannel = ServerSocketChannel.open(); - serverChannel.configureBlocking(false); - serverChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true); - serverChannel.bind(endpoint, LISTEN_BACKLOG); - serverChannel.register(channelSelector, SelectionKey.OP_ACCEPT); - } catch (UnknownHostException e) { - LOGGER.error(String.format("Can't bind listen socket to address %s", Settings.getInstance().getBindAddress())); - throw new IOException("Can't bind listen socket to address", e); - } catch (IOException e) { - LOGGER.error(String.format("Can't create listen socket: %s", e.getMessage())); - throw new IOException("Can't create listen socket", e); - } - - // Load all known peers from repository - try (final Repository repository = RepositoryManager.getRepository()) { - synchronized (this.allKnownPeers) { - this.allKnownPeers.addAll(repository.getNetworkRepository().getAllPeers()); - } - } - - // Start up first networking thread - networkEPC.start(); - } - - // Getters / setters - - public static synchronized Network getInstance() { - if (instance == null) - instance = new Network(); - - return instance; - } - - public byte[] getMessageMagic() { - return Settings.getInstance().isTestNet() ? TESTNET_MESSAGE_MAGIC : MAINNET_MESSAGE_MAGIC; - } - - public String getOurNodeId() { - return this.ourNodeId; - } - - /*package*/ byte[] getOurPublicKey() { - return this.edPublicKeyParams.getEncoded(); - } - - /** Maximum message size (bytes). Needs to be at least maximum block size + MAGIC + message type, etc. */ - /* package */ int getMaxMessageSize() { - return this.maxMessageSize; - } - - public StatsSnapshot getStatsSnapshot() { - return this.networkEPC.getStatsSnapshot(); - } - - // Peer lists - - public List getAllKnownPeers() { - synchronized (this.allKnownPeers) { - return new ArrayList<>(this.allKnownPeers); - } - } - - public List getConnectedPeers() { - synchronized (this.connectedPeers) { - return new ArrayList<>(this.connectedPeers); - } - } - - public List getSelfPeers() { - synchronized (this.selfPeers) { - return new ArrayList<>(this.selfPeers); - } - } - - /** Returns list of connected peers that have completed handshaking. */ - public List getHandshakedPeers() { - synchronized (this.connectedPeers) { - return this.connectedPeers.stream().filter(peer -> peer.getHandshakeStatus() == Handshake.COMPLETED).collect(Collectors.toList()); - } - } - - /** Returns list of peers we connected to that have completed handshaking. */ - public List getOutboundHandshakedPeers() { - synchronized (this.connectedPeers) { - return this.connectedPeers.stream().filter(peer -> peer.isOutbound() && peer.getHandshakeStatus() == Handshake.COMPLETED).collect(Collectors.toList()); - } - } - - /** Returns first peer that has completed handshaking and has matching public key. */ - public Peer getHandshakedPeerWithPublicKey(byte[] publicKey) { - synchronized (this.connectedPeers) { - return this.connectedPeers.stream().filter(peer -> peer.getHandshakeStatus() == Handshake.COMPLETED && Arrays.equals(peer.getPeersPublicKey(), publicKey)).findFirst().orElse(null); - } - } - - // Peer list filters - - /** Must be inside synchronized (this.selfPeers) {...} */ - private final Predicate isSelfPeer = peerData -> { - PeerAddress peerAddress = peerData.getAddress(); - return this.selfPeers.stream().anyMatch(selfPeer -> selfPeer.equals(peerAddress)); - }; - - /** Must be inside synchronized (this.connectedPeers) {...} */ - private final Predicate isConnectedPeer = peerData -> { - PeerAddress peerAddress = peerData.getAddress(); - return this.connectedPeers.stream().anyMatch(peer -> peer.getPeerData().getAddress().equals(peerAddress)); - }; - - /** Must be inside synchronized (this.connectedPeers) {...} */ - private final Predicate isResolvedAsConnectedPeer = peerData -> { - try { - InetSocketAddress resolvedSocketAddress = peerData.getAddress().toSocketAddress(); - return this.connectedPeers.stream().anyMatch(peer -> peer.getResolvedAddress().equals(resolvedSocketAddress)); - } catch (UnknownHostException e) { - // Can't resolve - no point even trying to connect - return true; - } - }; - - // Initial setup - - public static void installInitialPeers(Repository repository) throws DataException { - for (String address : INITIAL_PEERS) { - PeerAddress peerAddress = PeerAddress.fromString(address); - - PeerData peerData = new PeerData(peerAddress, System.currentTimeMillis(), "INIT"); - repository.getNetworkRepository().save(peerData); - } - - repository.saveChanges(); - } - - // Main thread - - class NetworkProcessor extends ExecuteProduceConsume { - - public NetworkProcessor(ExecutorService executor) { - super(executor); - } - - @Override - protected void onSpawnFailure() { - // For debugging: - // ExecutorDumper.dump(this.executor, 3, ExecuteProduceConsume.class); - } - - @Override - protected Task produceTask(boolean canBlock) throws InterruptedException { - Task task; - - task = maybeProducePeerMessageTask(); - if (task != null) - return task; - - final Long now = NTP.getTime(); - - task = maybeProducePeerPingTask(now); - if (task != null) - return task; - - task = maybeProduceConnectPeerTask(now); - if (task != null) - return task; - - task = maybeProduceBroadcastTask(now); - if (task != null) - return task; - - // Only this method can block to reduce CPU spin - task = maybeProduceChannelTask(canBlock); - if (task != null) - return task; - - // Really nothing to do - return null; - } - - private Task maybeProducePeerMessageTask() { - for (Peer peer : getConnectedPeers()) { - Task peerTask = peer.getMessageTask(); - if (peerTask != null) - return peerTask; - } - - return null; - } - - private Task maybeProducePeerPingTask(Long now) { - // Ask connected peers whether they need a ping - for (Peer peer : getHandshakedPeers()) { - Task peerTask = peer.getPingTask(now); - if (peerTask != null) - return peerTask; - } - - return null; - } - - class PeerConnectTask implements ExecuteProduceConsume.Task { - private final Peer peer; - - public PeerConnectTask(Peer peer) { - this.peer = peer; - } - - @Override - public void perform() throws InterruptedException { - connectPeer(peer); - } - } - - private Task maybeProduceConnectPeerTask(Long now) throws InterruptedException { - if (now == null || now < nextConnectTaskTimestamp) - return null; - - if (getOutboundHandshakedPeers().size() >= minOutboundPeers) - return null; - - nextConnectTaskTimestamp = now + 1000L; - - Peer targetPeer = getConnectablePeer(now); - if (targetPeer == null) - return null; - - // Create connection task - return new PeerConnectTask(targetPeer); - } - - private Task maybeProduceBroadcastTask(Long now) { - if (now == null || now < nextBroadcastTimestamp) - return null; - - nextBroadcastTimestamp = now + BROADCAST_INTERVAL; - return () -> Controller.getInstance().doNetworkBroadcast(); - } - - class ChannelTask implements ExecuteProduceConsume.Task { - private final SelectionKey selectionKey; - - public ChannelTask(SelectionKey selectionKey) { - this.selectionKey = selectionKey; - } - - @Override - public void perform() throws InterruptedException { - try { - LOGGER.trace(() -> String.format("Thread %d has pending channel: %s, with ops %d", - Thread.currentThread().getId(), selectionKey.channel(), selectionKey.readyOps())); - - // process pending channel task - if (selectionKey.isReadable()) { - connectionRead((SocketChannel) selectionKey.channel()); - } else if (selectionKey.isAcceptable()) { - acceptConnection((ServerSocketChannel) selectionKey.channel()); - } - - LOGGER.trace(() -> String.format("Thread %d processed channel: %s", Thread.currentThread().getId(), selectionKey.channel())); - } catch (CancelledKeyException e) { - LOGGER.trace(() -> String.format("Thread %s encountered cancelled channel: %s", Thread.currentThread().getId(), selectionKey.channel())); - } - } - - private void connectionRead(SocketChannel socketChannel) { - Peer peer = getPeerFromChannel(socketChannel); - if (peer == null) - return; - - try { - peer.readChannel(); - } catch (IOException e) { - if (e.getMessage() != null && e.getMessage().toLowerCase().contains("onnection reset")) { - peer.disconnect("Connection reset"); - return; - } - - LOGGER.trace(() -> String.format("Network thread %s encountered I/O error: %s", Thread.currentThread().getId(), e.getMessage()), e); - peer.disconnect("I/O error"); - } - } - } - - private Task maybeProduceChannelTask(boolean canBlock) throws InterruptedException { - final SelectionKey nextSelectionKey; - - // Synchronization here to enforce thread-safety on channelIterator - synchronized (channelSelector) { - // anything to do? - if (channelIterator == null) { - try { - if (canBlock) - channelSelector.select(1000L); - else - channelSelector.selectNow(); - } catch (IOException e) { - LOGGER.warn(String.format("Channel selection threw IOException: %s", e.getMessage())); - return null; - } - - if (Thread.currentThread().isInterrupted()) - throw new InterruptedException(); - - channelIterator = channelSelector.selectedKeys().iterator(); - } - - if (channelIterator.hasNext()) { - nextSelectionKey = channelIterator.next(); - channelIterator.remove(); - } else { - nextSelectionKey = null; - channelIterator = null; // Nothing to do so reset iterator to cause new select - } - - LOGGER.trace(() -> String.format("Thread %d, nextSelectionKey %s, channelIterator now %s", - Thread.currentThread().getId(), nextSelectionKey, channelIterator)); - } - - if (nextSelectionKey == null) - return null; - - return new ChannelTask(nextSelectionKey); - } - } - - private void acceptConnection(ServerSocketChannel serverSocketChannel) throws InterruptedException { - SocketChannel socketChannel; - - try { - socketChannel = serverSocketChannel.accept(); - } catch (IOException e) { - return; - } - - // No connection actually accepted? - if (socketChannel == null) - return; - - final Long now = NTP.getTime(); - Peer newPeer; - - try { - if (now == null) { - LOGGER.debug(() -> String.format("Connection discarded from peer %s due to lack of NTP sync", PeerAddress.fromSocket(socketChannel.socket()))); - socketChannel.close(); - return; - } - - synchronized (this.connectedPeers) { - if (connectedPeers.size() >= maxPeers) { - // We have enough peers - LOGGER.debug(() -> String.format("Connection discarded from peer %s", PeerAddress.fromSocket(socketChannel.socket()))); - socketChannel.close(); - return; - } - - LOGGER.debug(() -> String.format("Connection accepted from peer %s", PeerAddress.fromSocket(socketChannel.socket()))); - - newPeer = new Peer(socketChannel, channelSelector); - this.connectedPeers.add(newPeer); - } - } catch (IOException e) { - if (socketChannel.isOpen()) - try { - socketChannel.close(); - } catch (IOException ce) { - // Couldn't close? - } - - return; - } - - this.onPeerReady(newPeer); - } - - private Peer getConnectablePeer(final Long now) throws InterruptedException { - // We can't block here so use tryRepository(). We don't NEED to connect a new peer. - try (final Repository repository = RepositoryManager.tryRepository()) { - if (repository == null) - return null; - - // Find an address to connect to - List peers = this.getAllKnownPeers(); - - // Don't consider peers with recent connection failures - final long lastAttemptedThreshold = now - CONNECT_FAILURE_BACKOFF; - peers.removeIf(peerData -> peerData.getLastAttempted() != null && - (peerData.getLastConnected() == null || peerData.getLastConnected() < peerData.getLastAttempted()) && - peerData.getLastAttempted() > lastAttemptedThreshold); - - // Don't consider peers that we know loop back to ourself - synchronized (this.selfPeers) { - peers.removeIf(isSelfPeer); - } - - synchronized (this.connectedPeers) { - // Don't consider already connected peers (simple address match) - peers.removeIf(isConnectedPeer); - - // Don't consider already connected peers (resolved address match) - // XXX This might be too slow if we end up waiting a long time for hostnames to resolve via DNS - peers.removeIf(isResolvedAsConnectedPeer); - } - - // Any left? - if (peers.isEmpty()) - return null; - - // Pick random peer - int peerIndex = new Random().nextInt(peers.size()); - - // Pick candidate - PeerData peerData = peers.get(peerIndex); - Peer newPeer = new Peer(peerData); - - // Update connection attempt info - peerData.setLastAttempted(now); - synchronized (this.allKnownPeers) { - repository.getNetworkRepository().save(peerData); - repository.saveChanges(); - } - - return newPeer; - } catch (DataException e) { - LOGGER.error("Repository issue while finding a connectable peer", e); - return null; - } - } - - private void connectPeer(Peer newPeer) throws InterruptedException { - SocketChannel socketChannel = newPeer.connect(this.channelSelector); - if (socketChannel == null) - return; - - if (Thread.currentThread().isInterrupted()) - return; - - synchronized (this.connectedPeers) { - this.connectedPeers.add(newPeer); - } - - this.onPeerReady(newPeer); - } - - private Peer getPeerFromChannel(SocketChannel socketChannel) { - synchronized (this.connectedPeers) { - for (Peer peer : this.connectedPeers) - if (peer.getSocketChannel() == socketChannel) - return peer; - } - - return null; - } - - // Peer callbacks - - /*package*/ void wakeupChannelSelector() { - this.channelSelector.wakeup(); - } - - /*package*/ boolean verify(byte[] signature, byte[] message) { - return Crypto.verify(this.edPublicKeyParams.getEncoded(), signature, message); - } - - /*package*/ byte[] sign(byte[] message) { - return Crypto.sign(this.edPrivateKeyParams, message); - } - - /*package*/ byte[] getSharedSecret(byte[] publicKey) { - return Crypto.getSharedSecret(this.edPrivateKeyParams.getEncoded(), publicKey); - } - - /** Called when Peer's thread has setup and is ready to process messages */ - public void onPeerReady(Peer peer) { - onHandshakingMessage(peer, null, Handshake.STARTED); - } - - public void onDisconnect(Peer peer) { - // Notify Controller - Controller.getInstance().onPeerDisconnect(peer); - - synchronized (this.connectedPeers) { - this.connectedPeers.remove(peer); - } - } - - public void peerMisbehaved(Peer peer) { - PeerData peerData = peer.getPeerData(); - peerData.setLastMisbehaved(NTP.getTime()); - - // Only update repository if outbound peer - if (peer.isOutbound()) - try (final Repository repository = RepositoryManager.getRepository()) { - synchronized (this.allKnownPeers) { - repository.getNetworkRepository().save(peerData); - repository.saveChanges(); - } - } catch (DataException e) { - LOGGER.warn("Repository issue while updating peer synchronization info", e); - } - } - - /** Called when a new message arrives for a peer. message can be null if called after connection */ - public void onMessage(Peer peer, Message message) { - if (message != null) - LOGGER.trace(() -> String.format("Processing %s message with ID %d from peer %s", message.getType().name(), message.getId(), peer)); - - Handshake handshakeStatus = peer.getHandshakeStatus(); - if (handshakeStatus != Handshake.COMPLETED) { - onHandshakingMessage(peer, message, handshakeStatus); - return; - } - - // Should be non-handshaking messages from now on - - // Ordered by message type value - switch (message.getType()) { - case GET_PEERS: - onGetPeersMessage(peer, message); - break; - - case PING: - onPingMessage(peer, message); - break; - - case HELLO: - case CHALLENGE: - case RESPONSE: - LOGGER.debug(() -> String.format("Unexpected handshaking message %s from peer %s", message.getType().name(), peer)); - peer.disconnect("unexpected handshaking message"); - return; - - case PEERS_V2: - onPeersV2Message(peer, message); - break; - - default: - // Bump up to controller for possible action - Controller.getInstance().onNetworkMessage(peer, message); - break; - } - } - - private void onHandshakingMessage(Peer peer, Message message, Handshake handshakeStatus) { - try { - // Still handshaking - LOGGER.trace(() -> String.format("Handshake status %s, message %s from peer %s", handshakeStatus.name(), (message != null ? message.getType().name() : "null"), peer)); - - // Check message type is as expected - if (handshakeStatus.expectedMessageType != null && message.getType() != handshakeStatus.expectedMessageType) { - LOGGER.debug(() -> String.format("Unexpected %s message from %s, expected %s", message.getType().name(), peer, handshakeStatus.expectedMessageType)); - peer.disconnect("unexpected message"); - return; - } - - Handshake newHandshakeStatus = handshakeStatus.onMessage(peer, message); - - if (newHandshakeStatus == null) { - // Handshake failure - LOGGER.debug(() -> String.format("Handshake failure with peer %s message %s", peer, message.getType().name())); - peer.disconnect("handshake failure"); - return; - } - - if (peer.isOutbound()) - // If we made outbound connection then we need to act first - newHandshakeStatus.action(peer); - else - // We have inbound connection so we need to respond in kind with what we just received - handshakeStatus.action(peer); - - peer.setHandshakeStatus(newHandshakeStatus); - - if (newHandshakeStatus == Handshake.COMPLETED) - this.onHandshakeCompleted(peer); - } finally { - peer.resetHandshakeMessagePending(); - } - } - - private void onGetPeersMessage(Peer peer, Message message) { - // Send our known peers - if (!peer.sendMessage(this.buildPeersMessage(peer))) - peer.disconnect("failed to send peers list"); - } - - private void onPingMessage(Peer peer, Message message) { - PingMessage pingMessage = (PingMessage) message; - - // Generate 'pong' using same ID - PingMessage pongMessage = new PingMessage(); - pongMessage.setId(pingMessage.getId()); - - if (!peer.sendMessage(pongMessage)) - peer.disconnect("failed to send ping reply"); - } - - private void onPeersV2Message(Peer peer, Message message) { - PeersV2Message peersV2Message = (PeersV2Message) message; - - List peerV2Addresses = peersV2Message.getPeerAddresses(); - - // First entry contains remote peer's listen port but empty address. - int peerPort = peerV2Addresses.get(0).getPort(); - peerV2Addresses.remove(0); - - // If inbound peer, use listen port and socket address to recreate first entry - if (!peer.isOutbound()) { - PeerAddress sendingPeerAddress = PeerAddress.fromString(peer.getPeerData().getAddress().getHost() + ":" + peerPort); - LOGGER.trace(() -> String.format("PEERS_V2 sending peer's listen address: %s", sendingPeerAddress.toString())); - peerV2Addresses.add(0, sendingPeerAddress); - } - - opportunisticMergePeers(peer.toString(), peerV2Addresses); - } - - /*pacakge*/ void onHandshakeCompleted(Peer peer) { - LOGGER.debug(String.format("Handshake completed with peer %s", peer)); - - // Are we already connected to this peer? - Peer existingPeer = getHandshakedPeerWithPublicKey(peer.getPeersPublicKey()); - // NOTE: actual object reference compare, not Peer.equals() - if (existingPeer != peer) { - LOGGER.info(() -> String.format("We already have a connection with peer %s - discarding", peer)); - peer.disconnect("existing connection"); - return; - } - - // Make a note that we've successfully completed handshake (and when) - peer.getPeerData().setLastConnected(NTP.getTime()); - - // Update connection info for outbound peers only - if (peer.isOutbound()) - try (final Repository repository = RepositoryManager.getRepository()) { - synchronized (this.allKnownPeers) { - repository.getNetworkRepository().save(peer.getPeerData()); - repository.saveChanges(); - } - } catch (DataException e) { - LOGGER.error(String.format("Repository issue while trying to update outbound peer %s", peer), e); - } - - // Start regular pings - peer.startPings(); - - // Only the outbound side needs to send anything (after we've received handshake-completing response). - // (If inbound sent anything here, it's possible it could be processed out-of-order with handshake message). - - if (peer.isOutbound()) { - // Send our height - Message heightMessage = buildHeightMessage(peer, Controller.getInstance().getChainTip()); - if (!peer.sendMessage(heightMessage)) { - peer.disconnect("failed to send height/info"); - return; - } - - // Send our peers list - Message peersMessage = this.buildPeersMessage(peer); - if (!peer.sendMessage(peersMessage)) - peer.disconnect("failed to send peers list"); - - // Request their peers list - Message getPeersMessage = new GetPeersMessage(); - if (!peer.sendMessage(getPeersMessage)) - peer.disconnect("failed to request peers list"); - } - - // Ask Controller if they want to do anything - Controller.getInstance().onPeerHandshakeCompleted(peer); - } - - // Message-building calls - - /** Returns PEERS message made from peers we've connected to recently, and this node's details */ - public Message buildPeersMessage(Peer peer) { - List knownPeers = this.getAllKnownPeers(); - - // Filter out peers that we've not connected to ever or within X milliseconds - final long connectionThreshold = NTP.getTime() - RECENT_CONNECTION_THRESHOLD; - Predicate notRecentlyConnected = peerData -> { - final Long lastAttempted = peerData.getLastAttempted(); - final Long lastConnected = peerData.getLastConnected(); - - if (lastAttempted == null || lastConnected == null) - return true; - - if (lastConnected < lastAttempted) - return true; - - if (lastConnected < connectionThreshold) - return true; - - return false; - }; - knownPeers.removeIf(notRecentlyConnected); - - List peerAddresses = new ArrayList<>(); - - for (PeerData peerData : knownPeers) { - try { - InetAddress address = InetAddress.getByName(peerData.getAddress().getHost()); - - // Don't send 'local' addresses if peer is not 'local'. e.g. don't send localhost:9084 to node4.qortal.org - if (!peer.isLocal() && Peer.isAddressLocal(address)) - continue; - - peerAddresses.add(peerData.getAddress()); - } catch (UnknownHostException e) { - // Couldn't resolve hostname to IP address so discard - } - } - - // New format PEERS_V2 message that supports hostnames, IPv6 and ports - return new PeersV2Message(peerAddresses); - } - - public Message buildHeightMessage(Peer peer, BlockData blockData) { - // HEIGHT_V2 contains way more useful info - return new HeightV2Message(blockData.getHeight(), blockData.getSignature(), blockData.getTimestamp(), blockData.getMinterPublicKey()); - } - - public Message buildNewTransactionMessage(Peer peer, TransactionData transactionData) { - // In V2 we send out transaction signature only and peers can decide whether to request the full transaction - return new TransactionSignaturesMessage(Collections.singletonList(transactionData.getSignature())); - } - - public Message buildGetUnconfirmedTransactionsMessage(Peer peer) { - return new GetUnconfirmedTransactionsMessage(); - } - - // Peer-management calls - - public void noteToSelf(Peer peer) { - LOGGER.info(() -> String.format("No longer considering peer address %s as it connects to self", peer)); - - synchronized (this.selfPeers) { - this.selfPeers.add(peer.getPeerData().getAddress()); - } - } - - public boolean forgetPeer(PeerAddress peerAddress) throws DataException { - int numDeleted; - - synchronized (this.allKnownPeers) { - this.allKnownPeers.removeIf(peerData -> peerData.getAddress().equals(peerAddress)); - - try (final Repository repository = RepositoryManager.getRepository()) { - numDeleted = repository.getNetworkRepository().delete(peerAddress); - repository.saveChanges(); - } - } - - disconnectPeer(peerAddress); - - return numDeleted != 0; - } - - public int forgetAllPeers() throws DataException { - int numDeleted; - - synchronized (this.allKnownPeers) { - this.allKnownPeers.clear(); - - try (final Repository repository = RepositoryManager.getRepository()) { - numDeleted = repository.getNetworkRepository().deleteAllPeers(); - repository.saveChanges(); - } - } - - for (Peer peer : this.getConnectedPeers()) - peer.disconnect("to be forgotten"); - - return numDeleted; - } - - private void disconnectPeer(PeerAddress peerAddress) { - // Disconnect peer - try { - InetSocketAddress knownAddress = peerAddress.toSocketAddress(); - - List peers = this.getConnectedPeers(); - peers.removeIf(peer -> !Peer.addressEquals(knownAddress, peer.getResolvedAddress())); - - for (Peer peer : peers) - peer.disconnect("to be forgotten"); - } catch (UnknownHostException e) { - // Unknown host isn't going to match any of our connected peers so ignore - } - } - - // Network-wide calls - - public void prunePeers() throws DataException { - final Long now = NTP.getTime(); - if (now == null) - return; - - // Disconnect peers that are stuck during handshake - List handshakePeers = this.getConnectedPeers(); - - // Disregard peers that have completed handshake or only connected recently - handshakePeers.removeIf(peer -> peer.getHandshakeStatus() == Handshake.COMPLETED || peer.getConnectionTimestamp() == null || peer.getConnectionTimestamp() > now - HANDSHAKE_TIMEOUT); - - for (Peer peer : handshakePeers) - peer.disconnect(String.format("handshake timeout at %s", peer.getHandshakeStatus().name())); - - // Prune 'old' peers from repository... - // Pruning peers isn't critical so no need to block for a repository instance. - try (final Repository repository = RepositoryManager.tryRepository()) { - if (repository == null) - return; - - synchronized (this.allKnownPeers) { - // Fetch all known peers - List peers = new ArrayList<>(this.allKnownPeers); - - // 'Old' peers: - // We attempted to connect within the last day - // but we last managed to connect over a week ago. - Predicate isNotOldPeer = peerData -> { - if (peerData.getLastAttempted() == null || peerData.getLastAttempted() < now - OLD_PEER_ATTEMPTED_PERIOD) - return true; - - if (peerData.getLastConnected() == null || peerData.getLastConnected() > now - OLD_PEER_CONNECTION_PERIOD) - return true; - - return false; - }; - - // Disregard peers that are NOT 'old' - peers.removeIf(isNotOldPeer); - - // Don't consider already connected peers (simple address match) - synchronized (this.connectedPeers) { - peers.removeIf(isConnectedPeer); - } - - for (PeerData peerData : peers) { - LOGGER.debug(() -> String.format("Deleting old peer %s from repository", peerData.getAddress().toString())); - repository.getNetworkRepository().delete(peerData.getAddress()); - - // Delete from known peer cache too - this.allKnownPeers.remove(peerData); - } - - repository.saveChanges(); - } - } - } - - public boolean mergePeers(String addedBy, long addedWhen, List peerAddresses) throws DataException { - mergePeersLock.lock(); - - try (final Repository repository = RepositoryManager.getRepository()) { - return this.mergePeers(repository, addedBy, addedWhen, peerAddresses); - } finally { - mergePeersLock.unlock(); - } - } - - private void opportunisticMergePeers(String addedBy, List peerAddresses) { - final Long addedWhen = NTP.getTime(); - if (addedWhen == null) - return; - - // Serialize using lock to prevent repository deadlocks - if (!mergePeersLock.tryLock()) - return; - - try { - // Merging peers isn't critical so don't block for a repository instance. - try (final Repository repository = RepositoryManager.tryRepository()) { - if (repository == null) - return; - - this.mergePeers(repository, addedBy, addedWhen, peerAddresses); - - } catch (DataException e) { - // Already logged by this.mergePeers() - } - } finally { - mergePeersLock.unlock(); - } - } - - private boolean mergePeers(Repository repository, String addedBy, long addedWhen, List peerAddresses) throws DataException { - List newPeers; - synchronized (this.allKnownPeers) { - for (PeerData knownPeerData : this.allKnownPeers) { - // Filter out duplicates, without resolving via DNS - Predicate isKnownAddress = peerAddress -> knownPeerData.getAddress().equals(peerAddress); - peerAddresses.removeIf(isKnownAddress); - } - - if (peerAddresses.isEmpty()) - return false; - - // Add leftover peer addresses to known peers list - newPeers = peerAddresses.stream().map(peerAddress -> new PeerData(peerAddress, addedWhen, addedBy)).collect(Collectors.toList()); - - this.allKnownPeers.addAll(newPeers); - - try { - // Save new peers into database - for (PeerData peerData : newPeers) { - LOGGER.info(String.format("Adding new peer %s to repository", peerData.getAddress())); - repository.getNetworkRepository().save(peerData); - } - - repository.saveChanges(); - } catch (DataException e) { - LOGGER.error(String.format("Repository issue while merging peers list from %s", addedBy), e); - throw e; - } - - return true; - } - } - - public void broadcast(Function peerMessageBuilder) { - class Broadcaster implements Runnable { - private final Random random = new Random(); - - private List targetPeers; - private Function peerMessageBuilder; - - public Broadcaster(List targetPeers, Function peerMessageBuilder) { - this.targetPeers = targetPeers; - this.peerMessageBuilder = peerMessageBuilder; - } - - @Override - public void run() { - Thread.currentThread().setName("Network Broadcast"); - - for (Peer peer : targetPeers) { - // Very short sleep to reduce strain, improve multi-threading and catch interrupts - try { - Thread.sleep(random.nextInt(20) + 20L); - } catch (InterruptedException e) { - break; - } - - Message message = peerMessageBuilder.apply(peer); - - if (message == null) - continue; - - if (!peer.sendMessage(message)) - peer.disconnect("failed to broadcast message"); - } - - Thread.currentThread().setName("Network Broadcast (dormant)"); - } - } - - try { - broadcastExecutor.execute(new Broadcaster(this.getHandshakedPeers(), peerMessageBuilder)); - } catch (RejectedExecutionException e) { - // Can't execute - probably because we're shutting down, so ignore - } - } - - // Shutdown - - public void shutdown() { - // Close listen socket to prevent more incoming connections - if (this.serverChannel.isOpen()) - try { - this.serverChannel.close(); - } catch (IOException e) { - // Not important - } - - // Stop processing threads - try { - if (!this.networkEPC.shutdown(5000)) - LOGGER.warn("Network threads failed to terminate"); - } catch (InterruptedException e) { - LOGGER.warn("Interrupted while waiting for networking threads to terminate"); - } - - // Stop broadcasts - this.broadcastExecutor.shutdownNow(); - try { - if (!this.broadcastExecutor.awaitTermination(1000, TimeUnit.MILLISECONDS)) - LOGGER.warn("Broadcast threads failed to terminate"); - } catch (InterruptedException e) { - LOGGER.warn("Interrupted while waiting for broadcast threads failed to terminate"); - } - - // Close all peer connections - for (Peer peer : this.getConnectedPeers()) - peer.shutdown(); - } + private static final Logger LOGGER = LogManager.getLogger(Network.class); + private static Network instance; + + private static final int LISTEN_BACKLOG = 10; + /** + * How long before retrying after a connection failure, in milliseconds. + */ + private static final long CONNECT_FAILURE_BACKOFF = 5 * 60 * 1000L; // ms + /** + * How long between informational broadcasts to all connected peers, in milliseconds. + */ + private static final long BROADCAST_INTERVAL = 60 * 1000L; // ms + /** + * Maximum time since last successful connection for peer info to be propagated, in milliseconds. + */ + private static final long RECENT_CONNECTION_THRESHOLD = 24 * 60 * 60 * 1000L; // ms + /** + * Maximum time since last connection attempt before a peer is potentially considered "old", in milliseconds. + */ + private static final long OLD_PEER_ATTEMPTED_PERIOD = 24 * 60 * 60 * 1000L; // ms + /** + * Maximum time since last successful connection before a peer is potentially considered "old", in milliseconds. + */ + private static final long OLD_PEER_CONNECTION_PERIOD = 7 * 24 * 60 * 60 * 1000L; // ms + /** + * Maximum time allowed for handshake to complete, in milliseconds. + */ + private static final long HANDSHAKE_TIMEOUT = 60 * 1000L; // ms + + private static final byte[] MAINNET_MESSAGE_MAGIC = new byte[]{0x51, 0x4f, 0x52, 0x54}; // QORT + private static final byte[] TESTNET_MESSAGE_MAGIC = new byte[]{0x71, 0x6f, 0x72, 0x54}; // qorT + + private static final String[] INITIAL_PEERS = new String[]{ + "node1.qortal.org", "node2.qortal.org", "node3.qortal.org", "node4.qortal.org", "node5.qortal.org", + "node6.qortal.org", "node7.qortal.org", "node8.qortal.org", "node9.qortal.org", "node10.qortal.org", + "node.qortal.ru", "node2.qortal.ru", "node3.qortal.ru", "node.qortal.uk" + }; + + private static final long NETWORK_EPC_KEEPALIVE = 10L; // seconds + + public static final int MAX_SIGNATURES_PER_REPLY = 500; + public static final int MAX_BLOCK_SUMMARIES_PER_REPLY = 500; + + // Generate our node keys / ID + private final Ed25519PrivateKeyParameters edPrivateKeyParams = new Ed25519PrivateKeyParameters(new SecureRandom()); + private final Ed25519PublicKeyParameters edPublicKeyParams = edPrivateKeyParams.generatePublicKey(); + private final String ourNodeId = Crypto.toNodeAddress(edPublicKeyParams.getEncoded()); + + private final int maxMessageSize; + private final int minOutboundPeers; + private final int maxPeers; + + private final List allKnownPeers = new ArrayList<>(); + private final List connectedPeers = new ArrayList<>(); + private final List selfPeers = new ArrayList<>(); + + private final ExecuteProduceConsume networkEPC; + private Selector channelSelector; + private ServerSocketChannel serverChannel; + private Iterator channelIterator = null; + + // volatile because value is updated inside any one of the EPC threads + private volatile long nextConnectTaskTimestamp = 0L; // ms - try first connect once NTP syncs + + private final ExecutorService broadcastExecutor = Executors.newCachedThreadPool(); + // volatile because value is updated inside any one of the EPC threads + private volatile long nextBroadcastTimestamp = 0L; // ms - try first broadcast once NTP syncs + + private final Lock mergePeersLock = new ReentrantLock(); + + // Constructors + + private Network() { + maxMessageSize = 4 + 1 + 4 + BlockChain.getInstance().getMaxBlockSize(); + + minOutboundPeers = Settings.getInstance().getMinOutboundPeers(); + maxPeers = Settings.getInstance().getMaxPeers(); + + // We'll use a cached thread pool but with more aggressive timeout. + ExecutorService networkExecutor = new ThreadPoolExecutor(1, + Settings.getInstance().getMaxNetworkThreadPoolSize(), + NETWORK_EPC_KEEPALIVE, TimeUnit.SECONDS, + new SynchronousQueue(), + new NamedThreadFactory("Network-EPC")); + networkEPC = new NetworkProcessor(networkExecutor); + } + + public void start() throws IOException, DataException { + // Grab P2P port from settings + int listenPort = Settings.getInstance().getListenPort(); + + // Grab P2P bind address from settings + try { + InetAddress bindAddr = InetAddress.getByName(Settings.getInstance().getBindAddress()); + InetSocketAddress endpoint = new InetSocketAddress(bindAddr, listenPort); + + channelSelector = Selector.open(); + + // Set up listen socket + serverChannel = ServerSocketChannel.open(); + serverChannel.configureBlocking(false); + serverChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true); + serverChannel.bind(endpoint, LISTEN_BACKLOG); + serverChannel.register(channelSelector, SelectionKey.OP_ACCEPT); + } catch (UnknownHostException e) { + LOGGER.error("Can't bind listen socket to address {}", Settings.getInstance().getBindAddress()); + throw new IOException("Can't bind listen socket to address", e); + } catch (IOException e) { + LOGGER.error("Can't create listen socket: {}", e.getMessage()); + throw new IOException("Can't create listen socket", e); + } + + // Load all known peers from repository + try (Repository repository = RepositoryManager.getRepository()) { + synchronized (this.allKnownPeers) { + this.allKnownPeers.addAll(repository.getNetworkRepository().getAllPeers()); + } + } + + // Start up first networking thread + networkEPC.start(); + } + + // Getters / setters + + public static synchronized Network getInstance() { + if (instance == null) { + instance = new Network(); + } + + return instance; + } + + public byte[] getMessageMagic() { + return Settings.getInstance().isTestNet() ? TESTNET_MESSAGE_MAGIC : MAINNET_MESSAGE_MAGIC; + } + + public String getOurNodeId() { + return this.ourNodeId; + } + + protected byte[] getOurPublicKey() { + return this.edPublicKeyParams.getEncoded(); + } + + /** + * Maximum message size (bytes). Needs to be at least maximum block size + MAGIC + message type, etc. + */ + protected int getMaxMessageSize() { + return this.maxMessageSize; + } + + public StatsSnapshot getStatsSnapshot() { + return this.networkEPC.getStatsSnapshot(); + } + + // Peer lists + + public List getAllKnownPeers() { + synchronized (this.allKnownPeers) { + return new ArrayList<>(this.allKnownPeers); + } + } + + public List getConnectedPeers() { + synchronized (this.connectedPeers) { + return new ArrayList<>(this.connectedPeers); + } + } + + public List getSelfPeers() { + synchronized (this.selfPeers) { + return new ArrayList<>(this.selfPeers); + } + } + + /** + * Returns list of connected peers that have completed handshaking. + */ + public List getHandshakedPeers() { + synchronized (this.connectedPeers) { + return this.connectedPeers.stream() + .filter(peer -> peer.getHandshakeStatus() == Handshake.COMPLETED) + .collect(Collectors.toList()); + } + } + + /** + * Returns list of peers we connected to that have completed handshaking. + */ + public List getOutboundHandshakedPeers() { + synchronized (this.connectedPeers) { + return this.connectedPeers.stream() + .filter(peer -> peer.isOutbound() && peer.getHandshakeStatus() == Handshake.COMPLETED) + .collect(Collectors.toList()); + } + } + + /** + * Returns first peer that has completed handshaking and has matching public key. + */ + public Peer getHandshakedPeerWithPublicKey(byte[] publicKey) { + synchronized (this.connectedPeers) { + return this.connectedPeers.stream() + .filter(peer -> peer.getHandshakeStatus() == Handshake.COMPLETED + && Arrays.equals(peer.getPeersPublicKey(), publicKey)) + .findFirst().orElse(null); + } + } + + // Peer list filters + + /** + * Must be inside synchronized (this.selfPeers) {...} + */ + private final Predicate isSelfPeer = peerData -> { + PeerAddress peerAddress = peerData.getAddress(); + return this.selfPeers.stream().anyMatch(selfPeer -> selfPeer.equals(peerAddress)); + }; + + /** + * Must be inside synchronized (this.connectedPeers) {...} + */ + private final Predicate isConnectedPeer = peerData -> { + PeerAddress peerAddress = peerData.getAddress(); + return this.connectedPeers.stream().anyMatch(peer -> peer.getPeerData().getAddress().equals(peerAddress)); + }; + + private static final Predicate hasMisbehaved = peerData -> { + final Long lastMisbehaved = peerData.getLastMisbehaved(); + return lastMisbehaved != null && lastMisbehaved > NTP.getTime() - MISBEHAVIOUR_COOLOFF; + }; + + /** + * Must be inside synchronized (this.connectedPeers) {...} + */ + private final Predicate isResolvedAsConnectedPeer = peerData -> { + try { + InetSocketAddress resolvedSocketAddress = peerData.getAddress().toSocketAddress(); + return this.connectedPeers.stream() + .anyMatch(peer -> peer.getResolvedAddress().equals(resolvedSocketAddress)); + } catch (UnknownHostException e) { + // Can't resolve - no point even trying to connect + return true; + } + }; + + // Initial setup + + public static void installInitialPeers(Repository repository) throws DataException { + for (String address : INITIAL_PEERS) { + PeerAddress peerAddress = PeerAddress.fromString(address); + + PeerData peerData = new PeerData(peerAddress, System.currentTimeMillis(), "INIT"); + repository.getNetworkRepository().save(peerData); + } + + repository.saveChanges(); + } + + // Main thread + + class NetworkProcessor extends ExecuteProduceConsume { + + NetworkProcessor(ExecutorService executor) { + super(executor); + } + + @Override + protected void onSpawnFailure() { + // For debugging: + // ExecutorDumper.dump(this.executor, 3, ExecuteProduceConsume.class); + } + + @Override + protected Task produceTask(boolean canBlock) throws InterruptedException { + Task task; + + task = maybeProducePeerMessageTask(); + if (task != null) { + return task; + } + + final Long now = NTP.getTime(); + + task = maybeProducePeerPingTask(now); + if (task != null) { + return task; + } + + task = maybeProduceConnectPeerTask(now); + if (task != null) { + return task; + } + + task = maybeProduceBroadcastTask(now); + if (task != null) { + return task; + } + + // Only this method can block to reduce CPU spin + return maybeProduceChannelTask(canBlock); + } + + private Task maybeProducePeerMessageTask() { + for (Peer peer : getConnectedPeers()) { + Task peerTask = peer.getMessageTask(); + if (peerTask != null) { + return peerTask; + } + } + + return null; + } + + private Task maybeProducePeerPingTask(Long now) { + // Ask connected peers whether they need a ping + for (Peer peer : getHandshakedPeers()) { + Task peerTask = peer.getPingTask(now); + if (peerTask != null) { + return peerTask; + } + } + + return null; + } + + class PeerConnectTask implements ExecuteProduceConsume.Task { + private final Peer peer; + + PeerConnectTask(Peer peer) { + this.peer = peer; + } + + @Override + public void perform() throws InterruptedException { + connectPeer(peer); + } + } + + private Task maybeProduceConnectPeerTask(Long now) throws InterruptedException { + if (now == null || now < nextConnectTaskTimestamp) { + return null; + } + + if (getOutboundHandshakedPeers().size() >= minOutboundPeers) { + return null; + } + + nextConnectTaskTimestamp = now + 1000L; + + Peer targetPeer = getConnectablePeer(now); + if (targetPeer == null) { + return null; + } + + // Create connection task + return new PeerConnectTask(targetPeer); + } + + private Task maybeProduceBroadcastTask(Long now) { + if (now == null || now < nextBroadcastTimestamp) { + return null; + } + + nextBroadcastTimestamp = now + BROADCAST_INTERVAL; + return () -> Controller.getInstance().doNetworkBroadcast(); + } + + class ChannelTask implements ExecuteProduceConsume.Task { + private final SelectionKey selectionKey; + + ChannelTask(SelectionKey selectionKey) { + this.selectionKey = selectionKey; + } + + @Override + public void perform() throws InterruptedException { + try { + LOGGER.trace("Thread {} has pending channel: {}, with ops {}", + Thread.currentThread().getId(), selectionKey.channel(), selectionKey.readyOps()); + + // process pending channel task + if (selectionKey.isReadable()) { + connectionRead((SocketChannel) selectionKey.channel()); + } else if (selectionKey.isAcceptable()) { + acceptConnection((ServerSocketChannel) selectionKey.channel()); + } + + LOGGER.trace("Thread {} processed channel: {}", + Thread.currentThread().getId(), selectionKey.channel()); + } catch (CancelledKeyException e) { + LOGGER.trace("Thread {} encountered cancelled channel: {}", + Thread.currentThread().getId(), selectionKey.channel()); + } + } + + private void connectionRead(SocketChannel socketChannel) { + Peer peer = getPeerFromChannel(socketChannel); + if (peer == null) { + return; + } + + try { + peer.readChannel(); + } catch (IOException e) { + if (e.getMessage() != null && e.getMessage().toLowerCase().contains("connection reset")) { + peer.disconnect("Connection reset"); + return; + } + + LOGGER.trace("[{}] Network thread {} encountered I/O error: {}", peer.getPeerConnectionId(), + Thread.currentThread().getId(), e.getMessage(), e); + peer.disconnect("I/O error"); + } + } + } + + private Task maybeProduceChannelTask(boolean canBlock) throws InterruptedException { + final SelectionKey nextSelectionKey; + + // Synchronization here to enforce thread-safety on channelIterator + synchronized (channelSelector) { + // anything to do? + if (channelIterator == null) { + try { + if (canBlock) { + channelSelector.select(1000L); + } else { + channelSelector.selectNow(); + } + } catch (IOException e) { + LOGGER.warn("Channel selection threw IOException: {}", e.getMessage()); + return null; + } + + if (Thread.currentThread().isInterrupted()) { + throw new InterruptedException(); + } + + channelIterator = channelSelector.selectedKeys().iterator(); + } + + if (channelIterator.hasNext()) { + nextSelectionKey = channelIterator.next(); + channelIterator.remove(); + } else { + nextSelectionKey = null; + channelIterator = null; // Nothing to do so reset iterator to cause new select + } + + LOGGER.trace("Thread {}, nextSelectionKey {}, channelIterator now {}", + Thread.currentThread().getId(), nextSelectionKey, channelIterator); + } + + if (nextSelectionKey == null) { + return null; + } + + return new ChannelTask(nextSelectionKey); + } + } + + private void acceptConnection(ServerSocketChannel serverSocketChannel) throws InterruptedException { + SocketChannel socketChannel; + + try { + socketChannel = serverSocketChannel.accept(); + } catch (IOException e) { + return; + } + + // No connection actually accepted? + if (socketChannel == null) { + return; + } + + final Long now = NTP.getTime(); + Peer newPeer; + + try { + if (now == null) { + LOGGER.debug("Connection discarded from peer {} due to lack of NTP sync", + PeerAddress.fromSocket(socketChannel.socket())); + socketChannel.close(); + return; + } + + synchronized (this.connectedPeers) { + if (connectedPeers.size() >= maxPeers) { + // We have enough peers + LOGGER.debug("Connection discarded from peer {}", PeerAddress.fromSocket(socketChannel.socket())); + socketChannel.close(); + return; + } + + LOGGER.debug("Connection accepted from peer {}", PeerAddress.fromSocket(socketChannel.socket())); + + newPeer = new Peer(socketChannel, channelSelector); + this.connectedPeers.add(newPeer); + } + } catch (IOException e) { + if (socketChannel.isOpen()) { + try { + socketChannel.close(); + } catch (IOException ce) { + // Couldn't close? + } + } + return; + } + + this.onPeerReady(newPeer); + } + + private Peer getConnectablePeer(final Long now) throws InterruptedException { + // We can't block here so use tryRepository(). We don't NEED to connect a new peer. + try (Repository repository = RepositoryManager.tryRepository()) { + if (repository == null) { + return null; + } + + // Find an address to connect to + List peers = this.getAllKnownPeers(); + + // Don't consider peers with recent connection failures + final long lastAttemptedThreshold = now - CONNECT_FAILURE_BACKOFF; + peers.removeIf(peerData -> peerData.getLastAttempted() != null + && (peerData.getLastConnected() == null + || peerData.getLastConnected() < peerData.getLastAttempted()) + && peerData.getLastAttempted() > lastAttemptedThreshold); + + peers.removeIf(hasMisbehaved); + + // Don't consider peers that we know loop back to ourself + synchronized (this.selfPeers) { + peers.removeIf(isSelfPeer); + } + + synchronized (this.connectedPeers) { + // Don't consider already connected peers (simple address match) + peers.removeIf(isConnectedPeer); + + // Don't consider already connected peers (resolved address match) + // XXX This might be too slow if we end up waiting a long time for hostnames to resolve via DNS + peers.removeIf(isResolvedAsConnectedPeer); + } + + // Any left? + if (peers.isEmpty()) { + return null; + } + + // Pick random peer + int peerIndex = new Random().nextInt(peers.size()); + + // Pick candidate + PeerData peerData = peers.get(peerIndex); + Peer newPeer = new Peer(peerData); + + // Update connection attempt info + peerData.setLastAttempted(now); + synchronized (this.allKnownPeers) { + repository.getNetworkRepository().save(peerData); + repository.saveChanges(); + } + + return newPeer; + } catch (DataException e) { + LOGGER.error("Repository issue while finding a connectable peer", e); + return null; + } + } + + private void connectPeer(Peer newPeer) throws InterruptedException { + SocketChannel socketChannel = newPeer.connect(this.channelSelector); + if (socketChannel == null) { + return; + } + + if (Thread.currentThread().isInterrupted()) { + return; + } + + synchronized (this.connectedPeers) { + this.connectedPeers.add(newPeer); + } + + this.onPeerReady(newPeer); + } + + private Peer getPeerFromChannel(SocketChannel socketChannel) { + synchronized (this.connectedPeers) { + for (Peer peer : this.connectedPeers) { + if (peer.getSocketChannel() == socketChannel) { + return peer; + } + } + } + + return null; + } + + // Peer callbacks + + protected void wakeupChannelSelector() { + this.channelSelector.wakeup(); + } + + protected boolean verify(byte[] signature, byte[] message) { + return Crypto.verify(this.edPublicKeyParams.getEncoded(), signature, message); + } + + protected byte[] sign(byte[] message) { + return Crypto.sign(this.edPrivateKeyParams, message); + } + + protected byte[] getSharedSecret(byte[] publicKey) { + return Crypto.getSharedSecret(this.edPrivateKeyParams.getEncoded(), publicKey); + } + + /** + * Called when Peer's thread has setup and is ready to process messages + */ + public void onPeerReady(Peer peer) { + onHandshakingMessage(peer, null, Handshake.STARTED); + } + + public void onDisconnect(Peer peer) { + // Notify Controller + Controller.getInstance().onPeerDisconnect(peer); + if (peer.getConnectionEstablishedTime() > 0L) { + LOGGER.info("[{}] Disconnected from peer {}", peer.getPeerConnectionId(), peer); + } else { + LOGGER.warn("[{}] Failed to connect to peer {}", peer.getPeerConnectionId(), peer); + } + + synchronized (this.connectedPeers) { + this.connectedPeers.remove(peer); + } + } + + public void peerMisbehaved(Peer peer) { + PeerData peerData = peer.getPeerData(); + peerData.setLastMisbehaved(NTP.getTime()); + + // Only update repository if outbound peer + if (peer.isOutbound()) { + try (Repository repository = RepositoryManager.getRepository()) { + synchronized (this.allKnownPeers) { + repository.getNetworkRepository().save(peerData); + repository.saveChanges(); + } + } catch (DataException e) { + LOGGER.warn("Repository issue while updating peer synchronization info", e); + } + } + } + + /** + * Called when a new message arrives for a peer. message can be null if called after connection + */ + public void onMessage(Peer peer, Message message) { + if (message != null) { + LOGGER.trace("[{}} Processing {} message with ID {} from peer {}", peer.getPeerConnectionId(), + message.getType().name(), message.getId(), peer); + } + + Handshake handshakeStatus = peer.getHandshakeStatus(); + if (handshakeStatus != Handshake.COMPLETED) { + onHandshakingMessage(peer, message, handshakeStatus); + return; + } + + // Should be non-handshaking messages from now on + + // Ordered by message type value + switch (message.getType()) { + case GET_PEERS: + onGetPeersMessage(peer, message); + break; + + case PING: + onPingMessage(peer, message); + break; + + case HELLO: + case CHALLENGE: + case RESPONSE: + LOGGER.debug("[{}] Unexpected handshaking message {} from peer {}", peer.getPeerConnectionId(), + message.getType().name(), peer); + peer.disconnect("unexpected handshaking message"); + return; + + case PEERS_V2: + onPeersV2Message(peer, message); + break; + + default: + // Bump up to controller for possible action + Controller.getInstance().onNetworkMessage(peer, message); + break; + } + } + + private void onHandshakingMessage(Peer peer, Message message, Handshake handshakeStatus) { + try { + // Still handshaking + LOGGER.trace("[{}] Handshake status {}, message {} from peer {}", peer.getPeerConnectionId(), + handshakeStatus.name(), (message != null ? message.getType().name() : "null"), peer); + + // Check message type is as expected + if (handshakeStatus.expectedMessageType != null + && message.getType() != handshakeStatus.expectedMessageType) { + LOGGER.debug("[{}] Unexpected {} message from {}, expected {}", peer.getPeerConnectionId(), + message.getType().name(), peer, handshakeStatus.expectedMessageType); + peer.disconnect("unexpected message"); + return; + } + + Handshake newHandshakeStatus = handshakeStatus.onMessage(peer, message); + + if (newHandshakeStatus == null) { + // Handshake failure + LOGGER.debug("[{}] Handshake failure with peer {} message {}", peer.getPeerConnectionId(), peer, + message.getType().name()); + peer.disconnect("handshake failure"); + return; + } + + if (peer.isOutbound()) { + // If we made outbound connection then we need to act first + newHandshakeStatus.action(peer); + } else { + // We have inbound connection so we need to respond in kind with what we just received + handshakeStatus.action(peer); + } + peer.setHandshakeStatus(newHandshakeStatus); + + if (newHandshakeStatus == Handshake.COMPLETED) { + this.onHandshakeCompleted(peer); + } + } finally { + peer.resetHandshakeMessagePending(); + } + } + + private void onGetPeersMessage(Peer peer, Message message) { + // Send our known peers + if (!peer.sendMessage(this.buildPeersMessage(peer))) { + peer.disconnect("failed to send peers list"); + } + } + + private void onPingMessage(Peer peer, Message message) { + PingMessage pingMessage = (PingMessage) message; + + // Generate 'pong' using same ID + PingMessage pongMessage = new PingMessage(); + pongMessage.setId(pingMessage.getId()); + + if (!peer.sendMessage(pongMessage)) { + peer.disconnect("failed to send ping reply"); + } + } + + private void onPeersV2Message(Peer peer, Message message) { + PeersV2Message peersV2Message = (PeersV2Message) message; + + List peerV2Addresses = peersV2Message.getPeerAddresses(); + + // First entry contains remote peer's listen port but empty address. + int peerPort = peerV2Addresses.get(0).getPort(); + peerV2Addresses.remove(0); + + // If inbound peer, use listen port and socket address to recreate first entry + if (!peer.isOutbound()) { + String host = peer.getPeerData().getAddress().getHost(); + PeerAddress sendingPeerAddress = PeerAddress.fromString(host + ":" + peerPort); + LOGGER.trace("PEERS_V2 sending peer's listen address: {}", sendingPeerAddress.toString()); + peerV2Addresses.add(0, sendingPeerAddress); + } + + opportunisticMergePeers(peer.toString(), peerV2Addresses); + } + + protected void onHandshakeCompleted(Peer peer) { + LOGGER.info("[{}] Handshake completed with peer {} on {}", peer.getPeerConnectionId(), peer, + peer.getPeersVersionString()); + + // Are we already connected to this peer? + Peer existingPeer = getHandshakedPeerWithPublicKey(peer.getPeersPublicKey()); + // NOTE: actual object reference compare, not Peer.equals() + if (existingPeer != peer) { + LOGGER.info("[{}] We already have a connection with peer {} - discarding", + peer.getPeerConnectionId(), peer); + peer.disconnect("existing connection"); + return; + } + + // Make a note that we've successfully completed handshake (and when) + peer.getPeerData().setLastConnected(NTP.getTime()); + + // Update connection info for outbound peers only + if (peer.isOutbound()) { + try (Repository repository = RepositoryManager.getRepository()) { + synchronized (this.allKnownPeers) { + repository.getNetworkRepository().save(peer.getPeerData()); + repository.saveChanges(); + } + } catch (DataException e) { + LOGGER.error("[{}] Repository issue while trying to update outbound peer {}", + peer.getPeerConnectionId(), peer, e); + } + } + + // Start regular pings + peer.startPings(); + + // Only the outbound side needs to send anything (after we've received handshake-completing response). + // (If inbound sent anything here, it's possible it could be processed out-of-order with handshake message). + + if (peer.isOutbound()) { + // Send our height + Message heightMessage = buildHeightMessage(peer, Controller.getInstance().getChainTip()); + if (!peer.sendMessage(heightMessage)) { + peer.disconnect("failed to send height/info"); + return; + } + + // Send our peers list + Message peersMessage = this.buildPeersMessage(peer); + if (!peer.sendMessage(peersMessage)) { + peer.disconnect("failed to send peers list"); + } + + // Request their peers list + Message getPeersMessage = new GetPeersMessage(); + if (!peer.sendMessage(getPeersMessage)) { + peer.disconnect("failed to request peers list"); + } + } + + // Ask Controller if they want to do anything + Controller.getInstance().onPeerHandshakeCompleted(peer); + } + + // Message-building calls + + /** + * Returns PEERS message made from peers we've connected to recently, and this node's details + */ + public Message buildPeersMessage(Peer peer) { + List knownPeers = this.getAllKnownPeers(); + + // Filter out peers that we've not connected to ever or within X milliseconds + final long connectionThreshold = NTP.getTime() - RECENT_CONNECTION_THRESHOLD; + Predicate notRecentlyConnected = peerData -> { + final Long lastAttempted = peerData.getLastAttempted(); + final Long lastConnected = peerData.getLastConnected(); + + if (lastAttempted == null || lastConnected == null) { + return true; + } + + if (lastConnected < lastAttempted) { + return true; + } + + if (lastConnected < connectionThreshold) { + return true; + } + + return false; + }; + knownPeers.removeIf(notRecentlyConnected); + + List peerAddresses = new ArrayList<>(); + + for (PeerData peerData : knownPeers) { + try { + InetAddress address = InetAddress.getByName(peerData.getAddress().getHost()); + + // Don't send 'local' addresses if peer is not 'local'. + // e.g. don't send localhost:9084 to node4.qortal.org + if (!peer.isLocal() && Peer.isAddressLocal(address)) { + continue; + } + + peerAddresses.add(peerData.getAddress()); + } catch (UnknownHostException e) { + // Couldn't resolve hostname to IP address so discard + } + } + + // New format PEERS_V2 message that supports hostnames, IPv6 and ports + return new PeersV2Message(peerAddresses); + } + + public Message buildHeightMessage(Peer peer, BlockData blockData) { + // HEIGHT_V2 contains way more useful info + return new HeightV2Message(blockData.getHeight(), blockData.getSignature(), + blockData.getTimestamp(), blockData.getMinterPublicKey()); + } + + public Message buildNewTransactionMessage(Peer peer, TransactionData transactionData) { + // In V2 we send out transaction signature only and peers can decide whether to request the full transaction + return new TransactionSignaturesMessage(Collections.singletonList(transactionData.getSignature())); + } + + public Message buildGetUnconfirmedTransactionsMessage(Peer peer) { + return new GetUnconfirmedTransactionsMessage(); + } + + // Peer-management calls + + public void noteToSelf(Peer peer) { + LOGGER.info("[{}] No longer considering peer address {} as it connects to self", + peer.getPeerConnectionId(), peer); + + synchronized (this.selfPeers) { + this.selfPeers.add(peer.getPeerData().getAddress()); + } + } + + public boolean forgetPeer(PeerAddress peerAddress) throws DataException { + int numDeleted; + + synchronized (this.allKnownPeers) { + this.allKnownPeers.removeIf(peerData -> peerData.getAddress().equals(peerAddress)); + + try (Repository repository = RepositoryManager.getRepository()) { + numDeleted = repository.getNetworkRepository().delete(peerAddress); + repository.saveChanges(); + } + } + + disconnectPeer(peerAddress); + + return numDeleted != 0; + } + + public int forgetAllPeers() throws DataException { + int numDeleted; + + synchronized (this.allKnownPeers) { + this.allKnownPeers.clear(); + + try (Repository repository = RepositoryManager.getRepository()) { + numDeleted = repository.getNetworkRepository().deleteAllPeers(); + repository.saveChanges(); + } + } + + for (Peer peer : this.getConnectedPeers()) { + peer.disconnect("to be forgotten"); + } + + return numDeleted; + } + + private void disconnectPeer(PeerAddress peerAddress) { + // Disconnect peer + try { + InetSocketAddress knownAddress = peerAddress.toSocketAddress(); + + List peers = this.getConnectedPeers(); + peers.removeIf(peer -> !Peer.addressEquals(knownAddress, peer.getResolvedAddress())); + + for (Peer peer : peers) { + peer.disconnect("to be forgotten"); + } + } catch (UnknownHostException e) { + // Unknown host isn't going to match any of our connected peers so ignore + } + } + + // Network-wide calls + + public void prunePeers() throws DataException { + final Long now = NTP.getTime(); + if (now == null) { + return; + } + + // Disconnect peers that are stuck during handshake + List handshakePeers = this.getConnectedPeers(); + + // Disregard peers that have completed handshake or only connected recently + handshakePeers.removeIf(peer -> peer.getHandshakeStatus() == Handshake.COMPLETED + || peer.getConnectionTimestamp() == null || peer.getConnectionTimestamp() > now - HANDSHAKE_TIMEOUT); + + for (Peer peer : handshakePeers) { + peer.disconnect(String.format("handshake timeout at %s", peer.getHandshakeStatus().name())); + } + + // Prune 'old' peers from repository... + // Pruning peers isn't critical so no need to block for a repository instance. + try (Repository repository = RepositoryManager.tryRepository()) { + if (repository == null) { + return; + } + + synchronized (this.allKnownPeers) { + // Fetch all known peers + List peers = new ArrayList<>(this.allKnownPeers); + + // 'Old' peers: + // We attempted to connect within the last day + // but we last managed to connect over a week ago. + Predicate isNotOldPeer = peerData -> { + if (peerData.getLastAttempted() == null + || peerData.getLastAttempted() < now - OLD_PEER_ATTEMPTED_PERIOD) { + return true; + } + + if (peerData.getLastConnected() == null + || peerData.getLastConnected() > now - OLD_PEER_CONNECTION_PERIOD) { + return true; + } + + return false; + }; + + // Disregard peers that are NOT 'old' + peers.removeIf(isNotOldPeer); + + // Don't consider already connected peers (simple address match) + synchronized (this.connectedPeers) { + peers.removeIf(isConnectedPeer); + } + + for (PeerData peerData : peers) { + LOGGER.debug("Deleting old peer {} from repository", peerData.getAddress().toString()); + repository.getNetworkRepository().delete(peerData.getAddress()); + + // Delete from known peer cache too + this.allKnownPeers.remove(peerData); + } + + repository.saveChanges(); + } + } + } + + public boolean mergePeers(String addedBy, long addedWhen, List peerAddresses) throws DataException { + mergePeersLock.lock(); + + try (Repository repository = RepositoryManager.getRepository()) { + return this.mergePeers(repository, addedBy, addedWhen, peerAddresses); + } finally { + mergePeersLock.unlock(); + } + } + + private void opportunisticMergePeers(String addedBy, List peerAddresses) { + final Long addedWhen = NTP.getTime(); + if (addedWhen == null) { + return; + } + + // Serialize using lock to prevent repository deadlocks + if (!mergePeersLock.tryLock()) { + return; + } + + try { + // Merging peers isn't critical so don't block for a repository instance. + try (Repository repository = RepositoryManager.tryRepository()) { + if (repository == null) { + return; + } + + this.mergePeers(repository, addedBy, addedWhen, peerAddresses); + + } catch (DataException e) { + // Already logged by this.mergePeers() + } + } finally { + mergePeersLock.unlock(); + } + } + + private boolean mergePeers(Repository repository, String addedBy, long addedWhen, List peerAddresses) + throws DataException { + List newPeers; + synchronized (this.allKnownPeers) { + for (PeerData knownPeerData : this.allKnownPeers) { + // Filter out duplicates, without resolving via DNS + Predicate isKnownAddress = peerAddress -> knownPeerData.getAddress().equals(peerAddress); + peerAddresses.removeIf(isKnownAddress); + } + + if (peerAddresses.isEmpty()) { + return false; + } + + // Add leftover peer addresses to known peers list + newPeers = peerAddresses.stream() + .map(peerAddress -> new PeerData(peerAddress, addedWhen, addedBy)) + .collect(Collectors.toList()); + + this.allKnownPeers.addAll(newPeers); + + try { + // Save new peers into database + for (PeerData peerData : newPeers) { + LOGGER.info("Adding new peer {} to repository", peerData.getAddress()); + repository.getNetworkRepository().save(peerData); + } + + repository.saveChanges(); + } catch (DataException e) { + LOGGER.error("Repository issue while merging peers list from {}", addedBy, e); + throw e; + } + + return true; + } + } + + public void broadcast(Function peerMessageBuilder) { + class Broadcaster implements Runnable { + private final Random random = new Random(); + + private List targetPeers; + private Function peerMessageBuilder; + + Broadcaster(List targetPeers, Function peerMessageBuilder) { + this.targetPeers = targetPeers; + this.peerMessageBuilder = peerMessageBuilder; + } + + @Override + public void run() { + Thread.currentThread().setName("Network Broadcast"); + + for (Peer peer : targetPeers) { + // Very short sleep to reduce strain, improve multi-threading and catch interrupts + try { + Thread.sleep(random.nextInt(20) + 20L); + } catch (InterruptedException e) { + break; + } + + Message message = peerMessageBuilder.apply(peer); + + if (message == null) { + continue; + } + + if (!peer.sendMessage(message)) { + peer.disconnect("failed to broadcast message"); + } + } + + Thread.currentThread().setName("Network Broadcast (dormant)"); + } + } + + try { + broadcastExecutor.execute(new Broadcaster(this.getHandshakedPeers(), peerMessageBuilder)); + } catch (RejectedExecutionException e) { + // Can't execute - probably because we're shutting down, so ignore + } + } + + // Shutdown + + public void shutdown() { + // Close listen socket to prevent more incoming connections + if (this.serverChannel.isOpen()) { + try { + this.serverChannel.close(); + } catch (IOException e) { + // Not important + } + } + + // Stop processing threads + try { + if (!this.networkEPC.shutdown(5000)) { + LOGGER.warn("Network threads failed to terminate"); + } + } catch (InterruptedException e) { + LOGGER.warn("Interrupted while waiting for networking threads to terminate"); + } + + // Stop broadcasts + this.broadcastExecutor.shutdownNow(); + try { + if (!this.broadcastExecutor.awaitTermination(1000, TimeUnit.MILLISECONDS)) { + LOGGER.warn("Broadcast threads failed to terminate"); + } + } catch (InterruptedException e) { + LOGGER.warn("Interrupted while waiting for broadcast threads failed to terminate"); + } + + // Close all peer connections + for (Peer peer : this.getConnectedPeers()) { + peer.shutdown(); + } + } } diff --git a/src/main/java/org/qortal/network/Peer.java b/src/main/java/org/qortal/network/Peer.java index ffc90dc7..c84d1118 100644 --- a/src/main/java/org/qortal/network/Peer.java +++ b/src/main/java/org/qortal/network/Peer.java @@ -1,28 +1,8 @@ package org.qortal.network; -import java.io.IOException; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.SocketTimeoutException; -import java.net.StandardSocketOptions; -import java.net.UnknownHostException; -import java.nio.ByteBuffer; -import java.nio.channels.SelectionKey; -import java.nio.channels.Selector; -import java.nio.channels.SocketChannel; -import java.security.SecureRandom; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import java.util.Random; -import java.util.Arrays; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - +import com.google.common.hash.HashCode; +import com.google.common.net.HostAndPort; +import com.google.common.net.InetAddresses; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.qortal.controller.Controller; @@ -31,684 +11,770 @@ import org.qortal.data.network.PeerChainTipData; import org.qortal.data.network.PeerData; import org.qortal.network.message.ChallengeMessage; import org.qortal.network.message.Message; -import org.qortal.network.message.PingMessage; import org.qortal.network.message.Message.MessageException; import org.qortal.network.message.Message.MessageType; +import org.qortal.network.message.PingMessage; import org.qortal.settings.Settings; import org.qortal.utils.ExecuteProduceConsume; import org.qortal.utils.NTP; -import com.google.common.hash.HashCode; -import com.google.common.net.HostAndPort; -import com.google.common.net.InetAddresses; +import java.io.IOException; +import java.net.*; +import java.nio.ByteBuffer; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.SocketChannel; +import java.security.SecureRandom; +import java.util.*; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.regex.Matcher; +import java.util.regex.Pattern; // For managing one peer public class Peer { - - private static final Logger LOGGER = LogManager.getLogger(Peer.class); - - /** Maximum time to allow connect() to remote peer to complete. (ms) */ - private static final int CONNECT_TIMEOUT = 2000; // ms - - /** Maximum time to wait for a message reply to arrive from peer. (ms) */ - private static final int RESPONSE_TIMEOUT = 3000; // ms - - /** - * Interval between PING messages to a peer. (ms) - *

- * Just under every 30s is usually ideal to keep NAT mappings refreshed. - */ - private static final int PING_INTERVAL = 20_000; // ms - - private volatile boolean isStopping = false; - - private SocketChannel socketChannel = null; - private InetSocketAddress resolvedAddress = null; - /** True if remote address is loopback/link-local/site-local, false otherwise. */ - private boolean isLocal; - - private final Object byteBufferLock = new Object(); - private ByteBuffer byteBuffer; - - private Map> replyQueues; - private LinkedBlockingQueue pendingMessages; - - /** True if we created connection to peer, false if we accepted incoming connection from peer. */ - private final boolean isOutbound; - - private final Object handshakingLock = new Object(); - private Handshake handshakeStatus = Handshake.STARTED; - private volatile boolean handshakeMessagePending = false; - - /** Timestamp of when socket was accepted, or connected. */ - private Long connectionTimestamp = null; - - /** Last PING message round-trip time (ms). */ - private Long lastPing = null; - /** When last PING message was sent, or null if pings not started yet. */ - private Long lastPingSent; - - byte[] ourChallenge; - - // Versioning - public static final Pattern VERSION_PATTERN = Pattern.compile(Controller.VERSION_PREFIX + "(\\d{1,3})\\.(\\d{1,5})\\.(\\d{1,5})"); - - // Peer info - - private final Object peerInfoLock = new Object(); - - private String peersNodeId; - private byte[] peersPublicKey; - private byte[] peersChallenge; - - private PeerData peerData = null; - - /** Peer's value of connectionTimestamp. */ - private Long peersConnectionTimestamp = null; - - /** Version string as reported by peer. */ - private String peersVersionString = null; - /** Numeric version of peer. */ - private Long peersVersion = null; - - /** Latest block info as reported by peer. */ - private PeerChainTipData peersChainTipData; - - /** Our common block with this peer */ - private CommonBlockData commonBlockData; - - // Constructors - - /** Construct unconnected, outbound Peer using socket address in peer data */ - public Peer(PeerData peerData) { - this.isOutbound = true; - this.peerData = peerData; - } - - /** Construct Peer using existing, connected socket */ - public Peer(SocketChannel socketChannel, Selector channelSelector) throws IOException { - this.isOutbound = false; - this.socketChannel = socketChannel; - sharedSetup(channelSelector); - - this.resolvedAddress = ((InetSocketAddress) socketChannel.socket().getRemoteSocketAddress()); - this.isLocal = isAddressLocal(this.resolvedAddress.getAddress()); - - PeerAddress peerAddress = PeerAddress.fromSocket(socketChannel.socket()); - this.peerData = new PeerData(peerAddress); - } - - // Getters / setters - - public boolean isStopping() { - return this.isStopping; - } - - public SocketChannel getSocketChannel() { - return this.socketChannel; - } - - public InetSocketAddress getResolvedAddress() { - return this.resolvedAddress; - } - - public boolean isLocal() { - return this.isLocal; - } - - public boolean isOutbound() { - return this.isOutbound; - } - - public Handshake getHandshakeStatus() { - synchronized (this.handshakingLock) { - return this.handshakeStatus; - } - } - - /*package*/ void setHandshakeStatus(Handshake handshakeStatus) { - synchronized (this.handshakingLock) { - this.handshakeStatus = handshakeStatus; - } - } - - /*package*/ void resetHandshakeMessagePending() { - this.handshakeMessagePending = false; - } - - public PeerData getPeerData() { - synchronized (this.peerInfoLock) { - return this.peerData; - } - } - - public Long getConnectionTimestamp() { - synchronized (this.peerInfoLock) { - return this.connectionTimestamp; - } - } - - public String getPeersVersionString() { - synchronized (this.peerInfoLock) { - return this.peersVersionString; - } - } - - public Long getPeersVersion() { - synchronized (this.peerInfoLock) { - return this.peersVersion; - } - } - - /*package*/ void setPeersVersion(String versionString, long version) { - synchronized (this.peerInfoLock) { - this.peersVersionString = versionString; - this.peersVersion = version; - } - } - - public Long getPeersConnectionTimestamp() { - synchronized (this.peerInfoLock) { - return this.peersConnectionTimestamp; - } - } - - /*package*/ void setPeersConnectionTimestamp(Long peersConnectionTimestamp) { - synchronized (this.peerInfoLock) { - this.peersConnectionTimestamp = peersConnectionTimestamp; - } - } - - public Long getLastPing() { - synchronized (this.peerInfoLock) { - return this.lastPing; - } - } - - /*package*/ void setLastPing(long lastPing) { - synchronized (this.peerInfoLock) { - this.lastPing = lastPing; - } - } - - /*package*/ byte[] getOurChallenge() { - return this.ourChallenge; - } - - public String getPeersNodeId() { - synchronized (this.peerInfoLock) { - return this.peersNodeId; - } - } - - /*package*/ void setPeersNodeId(String peersNodeId) { - synchronized (this.peerInfoLock) { - this.peersNodeId = peersNodeId; - } - } - - public byte[] getPeersPublicKey() { - synchronized (this.peerInfoLock) { - return this.peersPublicKey; - } - } - - /*package*/ void setPeersPublicKey(byte[] peerPublicKey) { - synchronized (this.peerInfoLock) { - this.peersPublicKey = peerPublicKey; - } - } - - public byte[] getPeersChallenge() { - synchronized (this.peerInfoLock) { - return this.peersChallenge; - } - } - - /*package*/ void setPeersChallenge(byte[] peersChallenge) { - synchronized (this.peerInfoLock) { - this.peersChallenge = peersChallenge; - } - } - - public PeerChainTipData getChainTipData() { - synchronized (this.peerInfoLock) { - return this.peersChainTipData; - } - } - - public void setChainTipData(PeerChainTipData chainTipData) { - synchronized (this.peerInfoLock) { - this.peersChainTipData = chainTipData; - } - } - - public CommonBlockData getCommonBlockData() { - synchronized (this.peerInfoLock) { - return this.commonBlockData; - } - } - - public void setCommonBlockData(CommonBlockData commonBlockData) { - synchronized (this.peerInfoLock) { - this.commonBlockData = commonBlockData; - } - } - - /*package*/ void queueMessage(Message message) { - if (!this.pendingMessages.offer(message)) - LOGGER.info(() -> String.format("No room to queue message from peer %s - discarding", this)); - } - - @Override - public String toString() { - // Easier, and nicer output, than peer.getRemoteSocketAddress() - return this.peerData.getAddress().toString(); - } - - // Processing - - private void sharedSetup(Selector channelSelector) throws IOException { - this.connectionTimestamp = NTP.getTime(); - this.socketChannel.setOption(StandardSocketOptions.TCP_NODELAY, true); - this.socketChannel.configureBlocking(false); - this.socketChannel.register(channelSelector, SelectionKey.OP_READ); - this.byteBuffer = null; // Defer allocation to when we need it, to save memory. Sorry GC! - this.replyQueues = Collections.synchronizedMap(new HashMap>()); - this.pendingMessages = new LinkedBlockingQueue<>(); - - Random random = new SecureRandom(); - this.ourChallenge = new byte[ChallengeMessage.CHALLENGE_LENGTH]; - random.nextBytes(this.ourChallenge); - } - - public SocketChannel connect(Selector channelSelector) { - LOGGER.trace(() -> String.format("Connecting to peer %s", this)); - - try { - this.resolvedAddress = this.peerData.getAddress().toSocketAddress(); - this.isLocal = isAddressLocal(this.resolvedAddress.getAddress()); - - this.socketChannel = SocketChannel.open(); - this.socketChannel.socket().connect(resolvedAddress, CONNECT_TIMEOUT); - } catch (SocketTimeoutException e) { - LOGGER.trace(String.format("Connection timed out to peer %s", this)); - return null; - } catch (UnknownHostException e) { - LOGGER.trace(String.format("Connection failed to unresolved peer %s", this)); - return null; - } catch (IOException e) { - LOGGER.trace(String.format("Connection failed to peer %s", this)); - return null; - } - - try { - LOGGER.debug(() -> String.format("Connected to peer %s", this)); - sharedSetup(channelSelector); - return socketChannel; - } catch (IOException e) { - LOGGER.trace(String.format("Post-connection setup failed, peer %s", this)); - try { - socketChannel.close(); - } catch (IOException ce) { - // Failed to close? - } - return null; - } - } - - /** - * Attempt to buffer bytes from socketChannel. - * - * @throws IOException - */ - /* package */ void readChannel() throws IOException { - synchronized (this.byteBufferLock) { - while(true) { - if (!this.socketChannel.isOpen() || this.socketChannel.socket().isClosed()) - return; - - // Do we need to allocate byteBuffer? - if (this.byteBuffer == null) - this.byteBuffer = ByteBuffer.allocate(Network.getInstance().getMaxMessageSize()); - - final int priorPosition = this.byteBuffer.position(); - final int bytesRead = this.socketChannel.read(this.byteBuffer); - if (bytesRead == -1) { - this.disconnect("EOF"); - return; - } - - LOGGER.trace(() -> { - if (bytesRead > 0) { - byte[] leadingBytes = new byte[Math.min(bytesRead, 8)]; - this.byteBuffer.asReadOnlyBuffer().position(priorPosition).get(leadingBytes); - String leadingHex = HashCode.fromBytes(leadingBytes).toString(); - - return String.format("Received %d bytes, starting %s, into byteBuffer[%d] from peer %s", - bytesRead, - leadingHex, - priorPosition, - this); - } else { - return String.format("Received %d bytes into byteBuffer[%d] from peer %s", bytesRead, priorPosition, this); - } - }); - final boolean wasByteBufferFull = !this.byteBuffer.hasRemaining(); - - while (true) { - final Message message; - - // Can we build a message from buffer now? - ByteBuffer readOnlyBuffer = this.byteBuffer.asReadOnlyBuffer().flip(); - try { - message = Message.fromByteBuffer(readOnlyBuffer); - } catch (MessageException e) { - LOGGER.debug(String.format("%s, from peer %s", e.getMessage(), this)); - this.disconnect(e.getMessage()); - return; - } - - if (message == null && bytesRead == 0 && !wasByteBufferFull) { - // No complete message in buffer, no more bytes to read from socket even though there was room to read bytes - - /* DISABLED - // If byteBuffer is empty then we can deallocate it, to save memory, albeit costing GC - if (this.byteBuffer.remaining() == this.byteBuffer.capacity()) - this.byteBuffer = null; - */ - - return; - } - - if (message == null) - // No complete message in buffer, but maybe more bytes to read from socket - break; - - LOGGER.trace(() -> String.format("Received %s message with ID %d from peer %s", message.getType().name(), message.getId(), this)); - - // Tidy up buffers: - this.byteBuffer.flip(); - // Read-only, flipped buffer's position will be after end of message, so copy that - this.byteBuffer.position(readOnlyBuffer.position()); - // Copy bytes after read message to front of buffer, adjusting position accordingly, reset limit to capacity - this.byteBuffer.compact(); - - BlockingQueue queue = this.replyQueues.get(message.getId()); - if (queue != null) { - // Adding message to queue will unblock thread waiting for response - this.replyQueues.get(message.getId()).add(message); - // Consumed elsewhere - continue; - } - - // No thread waiting for message so we need to pass it up to network layer - - // Add message to pending queue - if (!this.pendingMessages.offer(message)) { - LOGGER.info(() -> String.format("No room to queue message from peer %s - discarding", this)); - return; - } - - // Prematurely end any blocking channel select so that new messages can be processed. - // This might cause this.socketChannel.read() above to return zero into bytesRead. - Network.getInstance().wakeupChannelSelector(); - } - } - } - } - - /* package */ ExecuteProduceConsume.Task getMessageTask() { - /* - * If we are still handshaking and there is a message yet to be processed then - * don't produce another message task. This allows us to process handshake - * messages sequentially. - */ - if (this.handshakeMessagePending) - return null; - - final Message nextMessage = this.pendingMessages.poll(); - - if (nextMessage == null) - return null; - - LOGGER.trace(() -> String.format("Produced %s message task from peer %s", nextMessage.getType().name(), this)); - - if (this.handshakeStatus != Handshake.COMPLETED) - this.handshakeMessagePending = true; - - // Return a task to process message in queue - return () -> Network.getInstance().onMessage(this, nextMessage); - } - - /** - * Attempt to send Message to peer. - * - * @param message - * @return true if message successfully sent; false otherwise - */ - public boolean sendMessage(Message message) { - if (!this.socketChannel.isOpen()) - return false; - - try { - // Send message - LOGGER.trace(() -> String.format("Sending %s message with ID %d to peer %s", message.getType().name(), message.getId(), this)); - - ByteBuffer outputBuffer = ByteBuffer.wrap(message.toBytes()); - - synchronized (this.socketChannel) { - final long sendStart = System.currentTimeMillis(); - - while (outputBuffer.hasRemaining()) { - int bytesWritten = this.socketChannel.write(outputBuffer); - - LOGGER.trace(() -> String.format("Sent %d bytes of %s message with ID %d to peer %s", - bytesWritten, - message.getType().name(), - message.getId(), - this)); - - if (bytesWritten == 0) { - // Underlying socket's internal buffer probably full, - // so wait a short while for bytes to actually be transmitted over the wire - - /* - * NOSONAR squid:S2276 - we don't want to use this.socketChannel.wait() - * as this releases the lock held by synchronized() above - * and would allow another thread to send another message, - * potentially interleaving them on-the-wire, causing checksum failures - * and connection loss. - */ - Thread.sleep(1L); //NOSONAR squid:S2276 - - if (System.currentTimeMillis() - sendStart > RESPONSE_TIMEOUT) - // We've taken too long to send this message - return false; - } - } - } - } catch (MessageException e) { - LOGGER.warn(String.format("Failed to send %s message with ID %d to peer %s: %s", message.getType().name(), message.getId(), this, e.getMessage())); - return false; - } catch (IOException e) { - // Send failure - return false; - } catch (InterruptedException e) { - // Likely shutdown scenario - so exit - return false; - } - - // Sent OK - return true; - } - - /** - * Send message to peer and await response. - *

- * Message is assigned a random ID and sent. If a response with matching ID is received then it is returned to caller. - *

- * If no response with matching ID within timeout, or some other error/exception occurs, then return null.
- * (Assume peer will be rapidly disconnected after this). - * - * @param message - * @return Message if valid response received; null if not or error/exception occurs - * @throws InterruptedException - */ - public Message getResponse(Message message) throws InterruptedException { - BlockingQueue blockingQueue = new ArrayBlockingQueue<>(1); - - // Assign random ID to this message - Random random = new Random(); - int id; - do { - id = random.nextInt(Integer.MAX_VALUE - 1) + 1; - - // Put queue into map (keyed by message ID) so we can poll for a response - // If putIfAbsent() doesn't return null, then this ID is already taken - } while (this.replyQueues.putIfAbsent(id, blockingQueue) != null); - message.setId(id); - - // Try to send message - if (!this.sendMessage(message)) { - this.replyQueues.remove(id); - return null; - } - - try { - return blockingQueue.poll(RESPONSE_TIMEOUT, TimeUnit.MILLISECONDS); - } finally { - this.replyQueues.remove(id); - } - } - - /* package */ void startPings() { - // Replacing initial null value allows getPingTask() to start sending pings. - LOGGER.trace(() -> String.format("Enabling pings for peer %s", this)); - this.lastPingSent = NTP.getTime(); - } - - /* package */ ExecuteProduceConsume.Task getPingTask(Long now) { - // Pings not enabled yet? - if (now == null || this.lastPingSent == null) - return null; - - // Time to send another ping? - if (now < this.lastPingSent + PING_INTERVAL) - return null; // Not yet - - // Not strictly true, but prevents this peer from being immediately chosen again - this.lastPingSent = now; - - return () -> { - PingMessage pingMessage = new PingMessage(); - Message message = this.getResponse(pingMessage); - - if (message == null || message.getType() != MessageType.PING) { - LOGGER.debug(() -> String.format("Didn't receive reply from %s for PING ID %d", this, pingMessage.getId())); - this.disconnect("no ping received"); - return; - } - - this.setLastPing(NTP.getTime() - now); - }; - } - - public void disconnect(String reason) { - if (!isStopping) - LOGGER.debug(() -> String.format("Disconnecting peer %s: %s", this, reason)); - - this.shutdown(); - - Network.getInstance().onDisconnect(this); - } - - public void shutdown() { - if (!isStopping) - LOGGER.debug(() -> String.format("Shutting down peer %s", this)); - - isStopping = true; - - if (this.socketChannel.isOpen()) { - try { - this.socketChannel.shutdownOutput(); - this.socketChannel.close(); - } catch (IOException e) { - LOGGER.debug(String.format("IOException while trying to close peer %s", this)); - } - } - } - - - // Minimum version - - public boolean isAtLeastVersion(String minVersionString) { - if (minVersionString == null) - return false; - - // Add the version prefix - minVersionString = Controller.VERSION_PREFIX + minVersionString; - - Matcher matcher = VERSION_PATTERN.matcher(minVersionString); - if (!matcher.lookingAt()) - return false; - - // We're expecting 3 positive shorts, so we can convert 1.2.3 into 0x0100020003 - long minVersion = 0; - for (int g = 1; g <= 3; ++g) { - long value = Long.parseLong(matcher.group(g)); - - if (value < 0 || value > Short.MAX_VALUE) - return false; - - minVersion <<= 16; - minVersion |= value; - } - - return this.getPeersVersion() >= minVersion; - } - - - // Common block data - - public boolean canUseCachedCommonBlockData() { - PeerChainTipData peerChainTipData = this.getChainTipData(); - CommonBlockData commonBlockData = this.getCommonBlockData(); - - if (peerChainTipData != null && commonBlockData != null) { - PeerChainTipData commonBlockChainTipData = commonBlockData.getChainTipData(); - if (peerChainTipData.getLastBlockSignature() != null && commonBlockChainTipData != null && commonBlockChainTipData.getLastBlockSignature() != null) { - if (Arrays.equals(peerChainTipData.getLastBlockSignature(), commonBlockChainTipData.getLastBlockSignature())) { - return true; - } - } - } - return false; - } - - - // Utility methods - - /** Returns true if ports and addresses (or hostnames) match */ - public static boolean addressEquals(InetSocketAddress knownAddress, InetSocketAddress peerAddress) { - if (knownAddress.getPort() != peerAddress.getPort()) - return false; - - return knownAddress.getHostString().equalsIgnoreCase(peerAddress.getHostString()); - } - - public static InetSocketAddress parsePeerAddress(String peerAddress) throws IllegalArgumentException { - HostAndPort hostAndPort = HostAndPort.fromString(peerAddress).requireBracketsForIPv6(); - - // HostAndPort doesn't try to validate host so we do extra checking here - InetAddress address = InetAddresses.forString(hostAndPort.getHost()); - - return new InetSocketAddress(address, hostAndPort.getPortOrDefault(Settings.getInstance().getDefaultListenPort())); - } - - /** Returns true if address is loopback/link-local/site-local, false otherwise. */ - public static boolean isAddressLocal(InetAddress address) { - return address.isLoopbackAddress() || address.isLinkLocalAddress() || address.isSiteLocalAddress(); - } - + private static final Logger LOGGER = LogManager.getLogger(Peer.class); + + /** + * Maximum time to allow connect() to remote peer to complete. (ms) + */ + private static final int CONNECT_TIMEOUT = 2000; // ms + + /** + * Maximum time to wait for a message reply to arrive from peer. (ms) + */ + private static final int RESPONSE_TIMEOUT = 3000; // ms + + /** + * Interval between PING messages to a peer. (ms) + *

+ * Just under every 30s is usually ideal to keep NAT mappings refreshed. + */ + private static final int PING_INTERVAL = 20_000; // ms + + private volatile boolean isStopping = false; + + private SocketChannel socketChannel = null; + private InetSocketAddress resolvedAddress = null; + /** + * True if remote address is loopback/link-local/site-local, false otherwise. + */ + private boolean isLocal; + + private final UUID peerConnectionId = UUID.randomUUID(); + private final Object byteBufferLock = new Object(); + private ByteBuffer byteBuffer; + + private Map> replyQueues; + private LinkedBlockingQueue pendingMessages; + + /** + * True if we created connection to peer, false if we accepted incoming connection from peer. + */ + private final boolean isOutbound; + + private final Object handshakingLock = new Object(); + private Handshake handshakeStatus = Handshake.STARTED; + private volatile boolean handshakeMessagePending = false; + private long handshakeComplete = -1L; + + /** + * Timestamp of when socket was accepted, or connected. + */ + private Long connectionTimestamp = null; + + /** + * Last PING message round-trip time (ms). + */ + private Long lastPing = null; + /** + * When last PING message was sent, or null if pings not started yet. + */ + private Long lastPingSent; + + byte[] ourChallenge; + + // Versioning + public static final Pattern VERSION_PATTERN = Pattern.compile(Controller.VERSION_PREFIX + + "(\\d{1,3})\\.(\\d{1,5})\\.(\\d{1,5})"); + + // Peer info + + private final Object peerInfoLock = new Object(); + + private String peersNodeId; + private byte[] peersPublicKey; + private byte[] peersChallenge; + + private PeerData peerData = null; + + /** + * Peer's value of connectionTimestamp. + */ + private Long peersConnectionTimestamp = null; + + /** + * Version string as reported by peer. + */ + private String peersVersionString = null; + /** + * Numeric version of peer. + */ + private Long peersVersion = null; + + /** + * Latest block info as reported by peer. + */ + private PeerChainTipData peersChainTipData; + + /** + * Our common block with this peer + */ + private CommonBlockData commonBlockData; + + // Constructors + + /** + * Construct unconnected, outbound Peer using socket address in peer data + */ + public Peer(PeerData peerData) { + this.isOutbound = true; + this.peerData = peerData; + } + + /** + * Construct Peer using existing, connected socket + */ + public Peer(SocketChannel socketChannel, Selector channelSelector) throws IOException { + this.isOutbound = false; + this.socketChannel = socketChannel; + sharedSetup(channelSelector); + + this.resolvedAddress = ((InetSocketAddress) socketChannel.socket().getRemoteSocketAddress()); + this.isLocal = isAddressLocal(this.resolvedAddress.getAddress()); + + PeerAddress peerAddress = PeerAddress.fromSocket(socketChannel.socket()); + this.peerData = new PeerData(peerAddress); + } + + // Getters / setters + + public boolean isStopping() { + return this.isStopping; + } + + public SocketChannel getSocketChannel() { + return this.socketChannel; + } + + public InetSocketAddress getResolvedAddress() { + return this.resolvedAddress; + } + + public boolean isLocal() { + return this.isLocal; + } + + public boolean isOutbound() { + return this.isOutbound; + } + + public Handshake getHandshakeStatus() { + synchronized (this.handshakingLock) { + return this.handshakeStatus; + } + } + + protected void setHandshakeStatus(Handshake handshakeStatus) { + synchronized (this.handshakingLock) { + this.handshakeStatus = handshakeStatus; + if (handshakeStatus.equals(Handshake.COMPLETED)) { + this.handshakeComplete = System.currentTimeMillis(); + } + } + } + + protected void resetHandshakeMessagePending() { + this.handshakeMessagePending = false; + } + + public PeerData getPeerData() { + synchronized (this.peerInfoLock) { + return this.peerData; + } + } + + public Long getConnectionTimestamp() { + synchronized (this.peerInfoLock) { + return this.connectionTimestamp; + } + } + + public String getPeersVersionString() { + synchronized (this.peerInfoLock) { + return this.peersVersionString; + } + } + + public Long getPeersVersion() { + synchronized (this.peerInfoLock) { + return this.peersVersion; + } + } + + protected void setPeersVersion(String versionString, long version) { + synchronized (this.peerInfoLock) { + this.peersVersionString = versionString; + this.peersVersion = version; + } + } + + public Long getPeersConnectionTimestamp() { + synchronized (this.peerInfoLock) { + return this.peersConnectionTimestamp; + } + } + + protected void setPeersConnectionTimestamp(Long peersConnectionTimestamp) { + synchronized (this.peerInfoLock) { + this.peersConnectionTimestamp = peersConnectionTimestamp; + } + } + + public Long getLastPing() { + synchronized (this.peerInfoLock) { + return this.lastPing; + } + } + + protected void setLastPing(long lastPing) { + synchronized (this.peerInfoLock) { + this.lastPing = lastPing; + } + } + + protected byte[] getOurChallenge() { + return this.ourChallenge; + } + + public String getPeersNodeId() { + synchronized (this.peerInfoLock) { + return this.peersNodeId; + } + } + + protected void setPeersNodeId(String peersNodeId) { + synchronized (this.peerInfoLock) { + this.peersNodeId = peersNodeId; + } + } + + public byte[] getPeersPublicKey() { + synchronized (this.peerInfoLock) { + return this.peersPublicKey; + } + } + + protected void setPeersPublicKey(byte[] peerPublicKey) { + synchronized (this.peerInfoLock) { + this.peersPublicKey = peerPublicKey; + } + } + + public byte[] getPeersChallenge() { + synchronized (this.peerInfoLock) { + return this.peersChallenge; + } + } + + protected void setPeersChallenge(byte[] peersChallenge) { + synchronized (this.peerInfoLock) { + this.peersChallenge = peersChallenge; + } + } + + public PeerChainTipData getChainTipData() { + synchronized (this.peerInfoLock) { + return this.peersChainTipData; + } + } + + public void setChainTipData(PeerChainTipData chainTipData) { + synchronized (this.peerInfoLock) { + this.peersChainTipData = chainTipData; + } + } + + public CommonBlockData getCommonBlockData() { + synchronized (this.peerInfoLock) { + return this.commonBlockData; + } + } + + public void setCommonBlockData(CommonBlockData commonBlockData) { + synchronized (this.peerInfoLock) { + this.commonBlockData = commonBlockData; + } + } + + protected void queueMessage(Message message) { + if (!this.pendingMessages.offer(message)) { + LOGGER.info("[{}] No room to queue message from peer {} - discarding", this.peerConnectionId, this); + } + } + + @Override + public String toString() { + // Easier, and nicer output, than peer.getRemoteSocketAddress() + return this.peerData.getAddress().toString(); + } + + // Processing + + private void sharedSetup(Selector channelSelector) throws IOException { + this.connectionTimestamp = NTP.getTime(); + this.socketChannel.setOption(StandardSocketOptions.TCP_NODELAY, true); + this.socketChannel.configureBlocking(false); + this.socketChannel.register(channelSelector, SelectionKey.OP_READ); + this.byteBuffer = null; // Defer allocation to when we need it, to save memory. Sorry GC! + this.replyQueues = Collections.synchronizedMap(new HashMap>()); + this.pendingMessages = new LinkedBlockingQueue<>(); + + Random random = new SecureRandom(); + this.ourChallenge = new byte[ChallengeMessage.CHALLENGE_LENGTH]; + random.nextBytes(this.ourChallenge); + } + + public SocketChannel connect(Selector channelSelector) { + LOGGER.trace("[{}] Connecting to peer {}", this.peerConnectionId, this); + + try { + this.resolvedAddress = this.peerData.getAddress().toSocketAddress(); + this.isLocal = isAddressLocal(this.resolvedAddress.getAddress()); + + this.socketChannel = SocketChannel.open(); + this.socketChannel.socket().connect(resolvedAddress, CONNECT_TIMEOUT); + } catch (SocketTimeoutException e) { + LOGGER.trace("[{}] Connection timed out to peer {}", this.peerConnectionId, this); + return null; + } catch (UnknownHostException e) { + LOGGER.trace("[{}] Connection failed to unresolved peer {}", this.peerConnectionId, this); + return null; + } catch (IOException e) { + LOGGER.trace("[{}] Connection failed to peer {}", this.peerConnectionId, this); + return null; + } + + try { + LOGGER.debug("[{}] Connected to peer {}", this.peerConnectionId, this); + sharedSetup(channelSelector); + return socketChannel; + } catch (IOException e) { + LOGGER.trace("[{}] Post-connection setup failed, peer {}", this.peerConnectionId, this); + try { + socketChannel.close(); + } catch (IOException ce) { + // Failed to close? + } + return null; + } + } + + /** + * Attempt to buffer bytes from socketChannel. + * + * @throws IOException If this channel is not yet connected + */ + protected void readChannel() throws IOException { + synchronized (this.byteBufferLock) { + while (true) { + if (!this.socketChannel.isOpen() || this.socketChannel.socket().isClosed()) { + return; + } + + // Do we need to allocate byteBuffer? + if (this.byteBuffer == null) { + this.byteBuffer = ByteBuffer.allocate(Network.getInstance().getMaxMessageSize()); + } + + final int priorPosition = this.byteBuffer.position(); + final int bytesRead = this.socketChannel.read(this.byteBuffer); + if (bytesRead == -1) { + if (priorPosition > 0) { + this.disconnect("EOF - read " + priorPosition + " bytes"); + } else { + this.disconnect("EOF - failed to read any data"); + } + return; + } + + if (bytesRead > 0) { + byte[] leadingBytes = new byte[Math.min(bytesRead, 8)]; + this.byteBuffer.asReadOnlyBuffer().position(priorPosition).get(leadingBytes); + String leadingHex = HashCode.fromBytes(leadingBytes).toString(); + + LOGGER.trace("[{}] Received {} bytes, starting {}, into byteBuffer[{}] from peer {}", + this.peerConnectionId, bytesRead, leadingHex, priorPosition, this); + } else { + LOGGER.trace("[{}] Received {} bytes into byteBuffer[{}] from peer {}", this.peerConnectionId, + bytesRead, priorPosition, this); + } + final boolean wasByteBufferFull = !this.byteBuffer.hasRemaining(); + + while (true) { + final Message message; + + // Can we build a message from buffer now? + ByteBuffer readOnlyBuffer = this.byteBuffer.asReadOnlyBuffer().flip(); + try { + message = Message.fromByteBuffer(readOnlyBuffer); + } catch (MessageException e) { + LOGGER.debug("[{}] {}, from peer {}", this.peerConnectionId, e.getMessage(), this); + this.disconnect(e.getMessage()); + return; + } + + if (message == null && bytesRead == 0 && !wasByteBufferFull) { + // No complete message in buffer, no more bytes to read from socket + // even though there was room to read bytes + + /* DISABLED + // If byteBuffer is empty then we can deallocate it, to save memory, albeit costing GC + if (this.byteBuffer.remaining() == this.byteBuffer.capacity()) { + this.byteBuffer = null; + } + */ + + return; + } + + if (message == null) { + // No complete message in buffer, but maybe more bytes to read from socket + break; + } + + LOGGER.trace("[{}] Received {} message with ID {} from peer {}", this.peerConnectionId, + message.getType().name(), message.getId(), this); + + // Tidy up buffers: + this.byteBuffer.flip(); + // Read-only, flipped buffer's position will be after end of message, so copy that + this.byteBuffer.position(readOnlyBuffer.position()); + // Copy bytes after read message to front of buffer, + // adjusting position accordingly, reset limit to capacity + this.byteBuffer.compact(); + + BlockingQueue queue = this.replyQueues.get(message.getId()); + if (queue != null) { + // Adding message to queue will unblock thread waiting for response + this.replyQueues.get(message.getId()).add(message); + // Consumed elsewhere + continue; + } + + // No thread waiting for message so we need to pass it up to network layer + + // Add message to pending queue + if (!this.pendingMessages.offer(message)) { + LOGGER.info("[{}] No room to queue message from peer {} - discarding", + this.peerConnectionId, this); + return; + } + + // Prematurely end any blocking channel select so that new messages can be processed. + // This might cause this.socketChannel.read() above to return zero into bytesRead. + Network.getInstance().wakeupChannelSelector(); + } + } + } + } + + protected ExecuteProduceConsume.Task getMessageTask() { + /* + * If we are still handshaking and there is a message yet to be processed then + * don't produce another message task. This allows us to process handshake + * messages sequentially. + */ + if (this.handshakeMessagePending) { + return null; + } + + final Message nextMessage = this.pendingMessages.poll(); + + if (nextMessage == null) { + return null; + } + + LOGGER.trace("[{}] Produced {} message task from peer {}", this.peerConnectionId, + nextMessage.getType().name(), this); + + if (this.handshakeStatus != Handshake.COMPLETED) { + this.handshakeMessagePending = true; + } + + // Return a task to process message in queue + return () -> Network.getInstance().onMessage(this, nextMessage); + } + + /** + * Attempt to send Message to peer. + * + * @param message message to be sent + * @return true if message successfully sent; false otherwise + */ + public boolean sendMessage(Message message) { + if (!this.socketChannel.isOpen()) { + return false; + } + + try { + // Send message + LOGGER.trace("[{}] Sending {} message with ID {} to peer {}", this.peerConnectionId, + message.getType().name(), message.getId(), this); + + ByteBuffer outputBuffer = ByteBuffer.wrap(message.toBytes()); + + synchronized (this.socketChannel) { + final long sendStart = System.currentTimeMillis(); + + while (outputBuffer.hasRemaining()) { + int bytesWritten = this.socketChannel.write(outputBuffer); + + LOGGER.trace("[{}] Sent {} bytes of {} message with ID {} to peer {}", this.peerConnectionId, + bytesWritten, message.getType().name(), message.getId(), this); + + if (bytesWritten == 0) { + // Underlying socket's internal buffer probably full, + // so wait a short while for bytes to actually be transmitted over the wire + + /* + * NOSONAR squid:S2276 - we don't want to use this.socketChannel.wait() + * as this releases the lock held by synchronized() above + * and would allow another thread to send another message, + * potentially interleaving them on-the-wire, causing checksum failures + * and connection loss. + */ + Thread.sleep(1L); //NOSONAR squid:S2276 + + if (System.currentTimeMillis() - sendStart > RESPONSE_TIMEOUT) { + // We've taken too long to send this message + return false; + } + } + } + } + } catch (MessageException e) { + LOGGER.warn("[{}] Failed to send {} message with ID {} to peer {}: {}", this.peerConnectionId, + message.getType().name(), message.getId(), this, e.getMessage()); + return false; + } catch (IOException | InterruptedException e) { + // Send failure + return false; + } + + // Sent OK + return true; + } + + /** + * Send message to peer and await response. + *

+ * Message is assigned a random ID and sent. + * If a response with matching ID is received then it is returned to caller. + *

+ * If no response with matching ID within timeout, or some other error/exception occurs, + * then return null.
+ * (Assume peer will be rapidly disconnected after this). + * + * @param message message to send + * @return Message if valid response received; null if not or error/exception occurs + * @throws InterruptedException if interrupted while waiting + */ + public Message getResponse(Message message) throws InterruptedException { + BlockingQueue blockingQueue = new ArrayBlockingQueue<>(1); + + // Assign random ID to this message + Random random = new Random(); + int id; + do { + id = random.nextInt(Integer.MAX_VALUE - 1) + 1; + + // Put queue into map (keyed by message ID) so we can poll for a response + // If putIfAbsent() doesn't return null, then this ID is already taken + } while (this.replyQueues.putIfAbsent(id, blockingQueue) != null); + message.setId(id); + + // Try to send message + if (!this.sendMessage(message)) { + this.replyQueues.remove(id); + return null; + } + + try { + return blockingQueue.poll(RESPONSE_TIMEOUT, TimeUnit.MILLISECONDS); + } finally { + this.replyQueues.remove(id); + } + } + + protected void startPings() { + // Replacing initial null value allows getPingTask() to start sending pings. + LOGGER.trace("[{}] Enabling pings for peer {}", this.peerConnectionId, this); + this.lastPingSent = NTP.getTime(); + } + + protected ExecuteProduceConsume.Task getPingTask(Long now) { + // Pings not enabled yet? + if (now == null || this.lastPingSent == null) { + return null; + } + + // Time to send another ping? + if (now < this.lastPingSent + PING_INTERVAL) { + return null; // Not yet + } + + // Not strictly true, but prevents this peer from being immediately chosen again + this.lastPingSent = now; + + return () -> { + PingMessage pingMessage = new PingMessage(); + Message message = this.getResponse(pingMessage); + + if (message == null || message.getType() != MessageType.PING) { + LOGGER.debug("[{}] Didn't receive reply from {} for PING ID {}", this.peerConnectionId, this, + pingMessage.getId()); + this.disconnect("no ping received"); + return; + } + + this.setLastPing(NTP.getTime() - now); + }; + } + + public void disconnect(String reason) { + if (!isStopping) { + LOGGER.debug("[{}] Disconnecting peer {} after {}: {}", this.peerConnectionId, this, + getConnectionAge(), reason); + } + this.shutdown(); + + Network.getInstance().onDisconnect(this); + } + + public void shutdown() { + if (!isStopping) { + LOGGER.debug("[{}] Shutting down peer {}", this.peerConnectionId, this); + } + isStopping = true; + + if (this.socketChannel.isOpen()) { + try { + this.socketChannel.shutdownOutput(); + this.socketChannel.close(); + } catch (IOException e) { + LOGGER.debug("[{}] IOException while trying to close peer {}", this.peerConnectionId, this); + } + } + } + + + // Minimum version + + public boolean isAtLeastVersion(String minVersionString) { + if (minVersionString == null) { + return false; + } + + // Add the version prefix + minVersionString = Controller.VERSION_PREFIX + minVersionString; + + Matcher matcher = VERSION_PATTERN.matcher(minVersionString); + if (!matcher.lookingAt()) { + return false; + } + + // We're expecting 3 positive shorts, so we can convert 1.2.3 into 0x0100020003 + long minVersion = 0; + for (int g = 1; g <= 3; ++g) { + long value = Long.parseLong(matcher.group(g)); + + if (value < 0 || value > Short.MAX_VALUE) { + return false; + } + + minVersion <<= 16; + minVersion |= value; + } + + return this.getPeersVersion() >= minVersion; + } + + + // Common block data + + public boolean canUseCachedCommonBlockData() { + PeerChainTipData peerChainTipData = this.getChainTipData(); + CommonBlockData commonBlockData = this.getCommonBlockData(); + + if (peerChainTipData != null && commonBlockData != null) { + PeerChainTipData commonBlockChainTipData = commonBlockData.getChainTipData(); + if (peerChainTipData.getLastBlockSignature() != null && commonBlockChainTipData != null + && commonBlockChainTipData.getLastBlockSignature() != null) { + if (Arrays.equals(peerChainTipData.getLastBlockSignature(), + commonBlockChainTipData.getLastBlockSignature())) { + return true; + } + } + } + return false; + } + + + // Utility methods + + /** + * Returns true if ports and addresses (or hostnames) match + */ + public static boolean addressEquals(InetSocketAddress knownAddress, InetSocketAddress peerAddress) { + if (knownAddress.getPort() != peerAddress.getPort()) { + return false; + } + + return knownAddress.getHostString().equalsIgnoreCase(peerAddress.getHostString()); + } + + public static InetSocketAddress parsePeerAddress(String peerAddress) throws IllegalArgumentException { + HostAndPort hostAndPort = HostAndPort.fromString(peerAddress).requireBracketsForIPv6(); + + // HostAndPort doesn't try to validate host so we do extra checking here + InetAddress address = InetAddresses.forString(hostAndPort.getHost()); + + int defaultPort = Settings.getInstance().getDefaultListenPort(); + return new InetSocketAddress(address, hostAndPort.getPortOrDefault(defaultPort)); + } + + /** + * Returns true if address is loopback/link-local/site-local, false otherwise. + */ + public static boolean isAddressLocal(InetAddress address) { + return address.isLoopbackAddress() || address.isLinkLocalAddress() || address.isSiteLocalAddress(); + } + + public UUID getPeerConnectionId() { + return peerConnectionId; + } + + public long getConnectionEstablishedTime() { + return handshakeComplete; + } + + public long getConnectionAge() { + if (handshakeComplete > 0L) { + return System.currentTimeMillis() - handshakeComplete; + } + return handshakeComplete; + } }