From fc82bcaf496f454f125aacc849db64a887ed52ff Mon Sep 17 00:00:00 2001 From: catbref Date: Wed, 19 Jun 2019 16:55:02 +0100 Subject: [PATCH] Tidying up peer info. lastHeight/blockSig/blockTimestamp, etc. moved from PeerData/repository to Peer as it's transient so no need to store in repository. Repository now keeps track of when/who added peer, e.g. API/INIT/another peer. API calls DELETE /peers and DELETE /peers/known also disconnect peer as well as deleting from repository. Connection timestamp now reported by API call GET /peers Some repository-updating code removed from Network/Controller as no longer needed. Removed obsolete Controller.hasShorterBlockchain predicate. --- .../org/qora/api/model/ConnectedPeer.java | 8 +- .../org/qora/api/resource/PeersResource.java | 20 +- .../java/org/qora/block/BlockGenerator.java | 2 +- .../java/org/qora/controller/Controller.java | 63 +---- .../org/qora/controller/Synchronizer.java | 14 +- .../java/org/qora/data/network/PeerData.java | 61 ++--- src/main/java/org/qora/network/Network.java | 252 +++++++++++------- src/main/java/org/qora/network/Peer.java | 93 +++++-- .../qora/repository/NetworkRepository.java | 3 +- .../hsqldb/HSQLDBDatabaseUpdates.java | 10 + .../hsqldb/HSQLDBNetworkRepository.java | 27 +- 11 files changed, 304 insertions(+), 249 deletions(-) diff --git a/src/main/java/org/qora/api/model/ConnectedPeer.java b/src/main/java/org/qora/api/model/ConnectedPeer.java index f15f372c..ced54ff7 100644 --- a/src/main/java/org/qora/api/model/ConnectedPeer.java +++ b/src/main/java/org/qora/api/model/ConnectedPeer.java @@ -19,6 +19,7 @@ public class ConnectedPeer { public Direction direction; public Handshake handshakeStatus; public Long lastPing; + public Long connectedWhen; public String address; public String version; @@ -38,6 +39,7 @@ public class ConnectedPeer { this.lastPing = peer.getLastPing(); PeerData peerData = peer.getPeerData(); + this.connectedWhen = peerData.getLastConnected(); this.address = peerData.getAddress().toString(); if (peer.getVersionMessage() != null) { @@ -45,9 +47,9 @@ public class ConnectedPeer { this.buildTimestamp = peer.getVersionMessage().getBuildTimestamp(); } - this.lastHeight = peerData.getLastHeight(); - this.lastBlockSignature = peerData.getLastBlockSignature(); - this.lastBlockTimestamp = peerData.getLastBlockTimestamp(); + this.lastHeight = peer.getLastHeight(); + this.lastBlockSignature = peer.getLastBlockSignature(); + this.lastBlockTimestamp = peer.getLastBlockTimestamp(); } } diff --git a/src/main/java/org/qora/api/resource/PeersResource.java b/src/main/java/org/qora/api/resource/PeersResource.java index 5da8b818..e6280e11 100644 --- a/src/main/java/org/qora/api/resource/PeersResource.java +++ b/src/main/java/org/qora/api/resource/PeersResource.java @@ -31,6 +31,7 @@ import org.qora.network.PeerAddress; import org.qora.repository.DataException; import org.qora.repository.Repository; import org.qora.repository.RepositoryManager; +import org.qora.utils.NTP; @Path("/peers") @Tag(name = "Peers") @@ -145,7 +146,7 @@ public class PeersResource { try (final Repository repository = RepositoryManager.getRepository()) { PeerAddress peerAddress = PeerAddress.fromString(address); - PeerData peerData = new PeerData(peerAddress); + PeerData peerData = new PeerData(peerAddress, NTP.getTime(), "API"); repository.getNetworkRepository().save(peerData); repository.saveChanges(); @@ -193,15 +194,11 @@ public class PeersResource { public String removePeer(String address) { Security.checkApiCallAllowed(request); - try (final Repository repository = RepositoryManager.getRepository()) { + try { PeerAddress peerAddress = PeerAddress.fromString(address); - PeerData peerData = new PeerData(peerAddress); - - int numDeleted = repository.getNetworkRepository().delete(peerData); - repository.saveChanges(); - - return numDeleted != 0 ? "true" : "false"; + boolean wasKnown = Network.getInstance().forgetPeer(peerAddress); + return wasKnown ? "true" : "false"; } catch (IllegalArgumentException e) { throw ApiExceptionFactory.INSTANCE.createException(request, ApiError.INVALID_DATA); } catch (ApiException e) { @@ -214,7 +211,7 @@ public class PeersResource { @DELETE @Path("/known") @Operation( - summary = "Remove any known peers from database", + summary = "Remove all known peers from database", responses = { @ApiResponse( description = "true if any peers were removed, false if there were no peers to delete", @@ -232,9 +229,8 @@ public class PeersResource { public String removeKnownPeers(String address) { Security.checkApiCallAllowed(request); - try (final Repository repository = RepositoryManager.getRepository()) { - int numDeleted = repository.getNetworkRepository().deleteAllPeers(); - repository.saveChanges(); + try { + int numDeleted = Network.getInstance().forgetAllPeers(); return numDeleted != 0 ? "true" : "false"; } catch (ApiException e) { diff --git a/src/main/java/org/qora/block/BlockGenerator.java b/src/main/java/org/qora/block/BlockGenerator.java index 4cf06371..d2deab5c 100644 --- a/src/main/java/org/qora/block/BlockGenerator.java +++ b/src/main/java/org/qora/block/BlockGenerator.java @@ -92,7 +92,7 @@ public class BlockGenerator extends Thread { final long minLatestBlockTimestamp = Controller.getMinimumLatestBlockTimestamp(); // Disregard peers that don't have a recent block - peers.removeIf(peer -> peer.getPeerData().getLastBlockTimestamp() == null || peer.getPeerData().getLastBlockTimestamp() < minLatestBlockTimestamp); + peers.removeIf(peer -> peer.getLastBlockTimestamp() == null || peer.getLastBlockTimestamp() < minLatestBlockTimestamp); // If we have any peers with a recent block, but our latest block isn't recent // then we need to synchronize instead of generating. diff --git a/src/main/java/org/qora/controller/Controller.java b/src/main/java/org/qora/controller/Controller.java index 7bdc5d6f..1e5afe92 100644 --- a/src/main/java/org/qora/controller/Controller.java +++ b/src/main/java/org/qora/controller/Controller.java @@ -314,12 +314,12 @@ public class Controller extends Thread { // Disregard peers that don't have a recent block final long minLatestBlockTimestamp = getMinimumLatestBlockTimestamp(); - peers.removeIf(peer -> peer.getPeerData().getLastBlockTimestamp() == null || peer.getPeerData().getLastBlockTimestamp() < minLatestBlockTimestamp); + peers.removeIf(peer -> peer.getLastBlockTimestamp() == null || peer.getLastBlockTimestamp() < minLatestBlockTimestamp); BlockData latestBlockData = getChainTip(); // Disregard peers that have no block signature or the same block signature as us - peers.removeIf(peer -> peer.getPeerData().getLastBlockSignature() == null || Arrays.equals(latestBlockData.getSignature(), peer.getPeerData().getLastBlockSignature())); + peers.removeIf(peer -> peer.getLastBlockSignature() == null || Arrays.equals(latestBlockData.getSignature(), peer.getLastBlockSignature())); if (!peers.isEmpty()) { // Pick random peer to sync with @@ -522,17 +522,7 @@ public class Controller extends Thread { if (connectedPeer.getPeerId() == null || !Arrays.equals(connectedPeer.getPeerId(), peer.getPeerId())) continue; - PeerData peerData = connectedPeer.getPeerData(); - peerData.setLastHeight(heightMessage.getHeight()); - - // Only save to repository if outbound peer - if (connectedPeer.isOutbound()) - try (final Repository repository = RepositoryManager.getRepository()) { - repository.getNetworkRepository().save(peerData); - repository.saveChanges(); - } catch (DataException e) { - LOGGER.error(String.format("Repository issue while updating height of peer %s", connectedPeer), e); - } + connectedPeer.setLastHeight(heightMessage.getHeight()); } // Potentially synchronize @@ -551,28 +541,17 @@ public class Controller extends Thread { if (connectedPeer.getPeerId() == null || !Arrays.equals(connectedPeer.getPeerId(), peer.getPeerId())) continue; - PeerData peerData = connectedPeer.getPeerData(); - // We want to update atomically so use lock - ReentrantLock peerDataLock = connectedPeer.getPeerDataLock(); - peerDataLock.lock(); + ReentrantLock peerLock = connectedPeer.getPeerLock(); + peerLock.lock(); try { - peerData.setLastHeight(heightV2Message.getHeight()); - peerData.setLastBlockSignature(heightV2Message.getSignature()); - peerData.setLastBlockTimestamp(heightV2Message.getTimestamp()); - peerData.setLastBlockGenerator(heightV2Message.getGenerator()); + peer.setLastHeight(heightV2Message.getHeight()); + peer.setLastBlockSignature(heightV2Message.getSignature()); + peer.setLastBlockTimestamp(heightV2Message.getTimestamp()); + peer.setLastBlockGenerator(heightV2Message.getGenerator()); } finally { - peerDataLock.unlock(); + peerLock.unlock(); } - - // Only save to repository if outbound peer - if (connectedPeer.isOutbound()) - try (final Repository repository = RepositoryManager.getRepository()) { - repository.getNetworkRepository().save(peerData); - repository.saveChanges(); - } catch (DataException e) { - LOGGER.error(String.format("Repository issue while updating info of peer %s", connectedPeer), e); - } } // Potentially synchronize @@ -997,26 +976,6 @@ public class Controller extends Thread { return lastMisbehaved != null && lastMisbehaved > NTP.getTime() - MISBEHAVIOUR_COOLOFF; }; - /** True if peer has unknown height, lower height or same height and same block signature (unless we don't have their block signature). */ - public static Predicate hasShorterBlockchain() { - BlockData highestBlockData = getInstance().getChainTip(); - int ourHeight = highestBlockData.getHeight(); - - return peer -> { - PeerData peerData = peer.getPeerData(); - - Integer peerHeight = peerData.getLastHeight(); - if (peerHeight == null || peerHeight < ourHeight) - return true; - - if (peerHeight > ourHeight || peerData.getLastBlockSignature() == null) - return false; - - // Remove if signatures match - return Arrays.equals(peerData.getLastBlockSignature(), highestBlockData.getSignature()); - }; - } - /** Returns whether we think our node has up-to-date blockchain based on our info about other peers. */ public boolean isUpToDate() { final long minLatestBlockTimestamp = getMinimumLatestBlockTimestamp(); @@ -1036,7 +995,7 @@ public class Controller extends Thread { return false; // Disregard peers that don't have a recent block - peers.removeIf(peer -> peer.getPeerData().getLastBlockTimestamp() == null || peer.getPeerData().getLastBlockTimestamp() < minLatestBlockTimestamp); + peers.removeIf(peer -> peer.getLastBlockTimestamp() == null || peer.getLastBlockTimestamp() < minLatestBlockTimestamp); // If we don't have any peers left then can't synchronize, therefore consider ourself not up to date return !peers.isEmpty(); diff --git a/src/main/java/org/qora/controller/Synchronizer.java b/src/main/java/org/qora/controller/Synchronizer.java index 62d980b0..e835e236 100644 --- a/src/main/java/org/qora/controller/Synchronizer.java +++ b/src/main/java/org/qora/controller/Synchronizer.java @@ -83,7 +83,18 @@ public class Synchronizer { final BlockData ourLatestBlockData = this.repository.getBlockRepository().getLastBlock(); final int ourInitialHeight = ourLatestBlockData.getHeight(); int ourHeight = ourInitialHeight; - int peerHeight = peer.getPeerData().getLastHeight(); + + int peerHeight; + byte[] peersLastBlockSignature; + + ReentrantLock peerLock = peer.getPeerLock(); + peerLock.lockInterruptibly(); + try { + peerHeight = peer.getLastHeight(); + peersLastBlockSignature = peer.getLastBlockSignature(); + } finally { + peerLock.unlock(); + } // If peer is at genesis block then peer has no blocks so ignore them for a while if (peerHeight == 1) @@ -97,7 +108,6 @@ public class Synchronizer { return SynchronizationResult.TOO_FAR_BEHIND; } - byte[] peersLastBlockSignature = peer.getPeerData().getLastBlockSignature(); byte[] ourLastBlockSignature = ourLatestBlockData.getSignature(); if (peerHeight == ourHeight && (peersLastBlockSignature == null || !Arrays.equals(peersLastBlockSignature, ourLastBlockSignature))) LOGGER.debug(String.format("Synchronizing with peer %s at height %d, our height %d, signatures differ", peer, peerHeight, ourHeight)); diff --git a/src/main/java/org/qora/data/network/PeerData.java b/src/main/java/org/qora/data/network/PeerData.java index 98e75874..56f36e19 100644 --- a/src/main/java/org/qora/data/network/PeerData.java +++ b/src/main/java/org/qora/data/network/PeerData.java @@ -19,13 +19,12 @@ public class PeerData { @XmlTransient @Schema(hidden = true) private PeerAddress peerAddress; + private Long lastAttempted; private Long lastConnected; - private Integer lastHeight; - private byte[] lastBlockSignature; - private Long lastBlockTimestamp; - private byte[] lastBlockGenerator; private Long lastMisbehaved; + private Long addedWhen; + private String addedBy; // Constructors @@ -33,19 +32,21 @@ public class PeerData { protected PeerData() { } - public PeerData(PeerAddress peerAddress, Long lastAttempted, Long lastConnected, Integer lastHeight, byte[] lastBlockSignature, Long lastBlockTimestamp, byte[] lastBlockGenerator, Long lastMisbehaved) { + public PeerData(PeerAddress peerAddress, Long lastAttempted, Long lastConnected, Long lastMisbehaved, Long addedWhen, String addedBy) { this.peerAddress = peerAddress; this.lastAttempted = lastAttempted; this.lastConnected = lastConnected; - this.lastHeight = lastHeight; - this.lastBlockSignature = lastBlockSignature; - this.lastBlockTimestamp = lastBlockTimestamp; - this.lastBlockGenerator = lastBlockGenerator; this.lastMisbehaved = lastMisbehaved; + this.addedWhen = addedWhen; + this.addedBy = addedBy; + } + + public PeerData(PeerAddress peerAddress, Long addedWhen, String addedBy) { + this(peerAddress, null, null, null, addedWhen, addedBy); } public PeerData(PeerAddress peerAddress) { - this(peerAddress, null, null, null, null, null, null, null); + this(peerAddress, null, null, null, null, null); } // Getters / setters @@ -73,38 +74,6 @@ public class PeerData { this.lastConnected = lastConnected; } - public Integer getLastHeight() { - return this.lastHeight; - } - - public void setLastHeight(Integer lastHeight) { - this.lastHeight = lastHeight; - } - - public byte[] getLastBlockSignature() { - return lastBlockSignature; - } - - public void setLastBlockSignature(byte[] lastBlockSignature) { - this.lastBlockSignature = lastBlockSignature; - } - - public Long getLastBlockTimestamp() { - return lastBlockTimestamp; - } - - public void setLastBlockTimestamp(Long lastBlockTimestamp) { - this.lastBlockTimestamp = lastBlockTimestamp; - } - - public byte[] getLastBlockGenerator() { - return lastBlockGenerator; - } - - public void setLastBlockGenerator(byte[] lastBlockGenerator) { - this.lastBlockGenerator = lastBlockGenerator; - } - public Long getLastMisbehaved() { return this.lastMisbehaved; } @@ -113,6 +82,14 @@ public class PeerData { this.lastMisbehaved = lastMisbehaved; } + public Long getAddedWhen() { + return this.addedWhen; + } + + public String getAddedBy() { + return this.addedBy; + } + // Pretty peerAddress getter for JAXB @XmlElement(name = "address") protected String getPrettyAddress() { diff --git a/src/main/java/org/qora/network/Network.java b/src/main/java/org/qora/network/Network.java index 1cf21c24..617d8c60 100644 --- a/src/main/java/org/qora/network/Network.java +++ b/src/main/java/org/qora/network/Network.java @@ -148,10 +148,16 @@ public class Network extends Thread { return instance; } + public byte[] getMessageMagic() { + return Settings.getInstance().isTestNet() ? TESTNET_MESSAGE_MAGIC : MAINNET_MESSAGE_MAGIC; + } + public byte[] getOurPeerId() { return this.ourPeerId; } + // Peer lists + public List getConnectedPeers() { synchronized (this.connectedPeers) { return new ArrayList<>(this.connectedPeers); @@ -164,16 +170,64 @@ public class Network extends Thread { } } - public void noteToSelf(Peer peer) { - LOGGER.info(String.format("No longer considering peer address %s as it connects to self", peer)); + /** Returns list of connected peers that have completed handshaking. */ + public List getHandshakedPeers() { + List peers = new ArrayList<>(); - synchronized (this.selfPeers) { - this.selfPeers.add(peer.getPeerData().getAddress()); + synchronized (this.connectedPeers) { + peers = this.connectedPeers.stream().filter(peer -> peer.getHandshakeStatus() == Handshake.COMPLETED).collect(Collectors.toList()); + } + + return peers; + } + + /** Returns list of connected peers that have completed handshaking, with inbound duplicates removed. */ + public List getUniqueHandshakedPeers() { + final List peers; + + synchronized (this.connectedPeers) { + peers = this.connectedPeers.stream().filter(peer -> peer.getHandshakeStatus() == Handshake.COMPLETED).collect(Collectors.toList()); + } + + // Returns true if this [inbound] peer has corresponding outbound peer with same ID + Predicate hasOutboundWithSameId = peer -> { + // Peer is outbound so return fast + if (peer.isOutbound()) + return false; + + return peers.stream().anyMatch(otherPeer -> otherPeer.isOutbound() && Arrays.equals(otherPeer.getPeerId(), peer.getPeerId())); + }; + + // Filter out [inbound] peers that have corresponding outbound peer with the same ID + peers.removeIf(hasOutboundWithSameId); + + return peers; + } + + /** Returns list of peers we connected to that have completed handshaking. */ + public List getOutboundHandshakedPeers() { + List peers = new ArrayList<>(); + + synchronized (this.connectedPeers) { + peers = this.connectedPeers.stream().filter(peer -> peer.isOutbound() && peer.getHandshakeStatus() == Handshake.COMPLETED) + .collect(Collectors.toList()); + } + + return peers; + } + + /** Returns Peer with inbound connection and matching ID, or null if none found. */ + public Peer getInboundPeerWithId(byte[] peerId) { + synchronized (this.connectedPeers) { + return this.connectedPeers.stream().filter(peer -> !peer.isOutbound() && peer.getPeerId() != null && Arrays.equals(peer.getPeerId(), peerId)).findAny().orElse(null); } } - public byte[] getMessageMagic() { - return Settings.getInstance().isTestNet() ? TESTNET_MESSAGE_MAGIC : MAINNET_MESSAGE_MAGIC; + /** Returns handshake-completed Peer with outbound connection and matching ID, or null if none found. */ + public Peer getOutboundHandshakedPeerWithId(byte[] peerId) { + synchronized (this.connectedPeers) { + return this.connectedPeers.stream().filter(peer -> peer.isOutbound() && peer.getHandshakeStatus() == Handshake.COMPLETED && peer.getPeerId() != null && Arrays.equals(peer.getPeerId(), peerId)).findAny().orElse(null); + } } // Initial setup @@ -182,7 +236,7 @@ public class Network extends Thread { for (String address : INITIAL_PEERS) { PeerAddress peerAddress = PeerAddress.fromString(address); - PeerData peerData = new PeerData(peerAddress); + PeerData peerData = new PeerData(peerAddress, NTP.getTime(), "INIT"); repository.getNetworkRepository().save(peerData); } @@ -288,7 +342,7 @@ public class Network extends Thread { for (PeerData peerData : peers) { LOGGER.debug(String.format("Deleting old peer %s from repository", peerData.getAddress().toString())); - repository.getNetworkRepository().delete(peerData); + repository.getNetworkRepository().delete(peerData.getAddress()); } repository.saveChanges(); @@ -398,7 +452,7 @@ public class Network extends Thread { // as remote port is not likely to be remote peer's listen port if (!peer.isOutbound()) try (final Repository repository = RepositoryManager.getRepository()) { - repository.getNetworkRepository().delete(peer.getPeerData()); + repository.getNetworkRepository().delete(peer.getPeerData().getAddress()); repository.saveChanges(); } catch (DataException e) { LOGGER.warn(String.format("Repository issue while trying to delete inbound peer %s", peer)); @@ -499,7 +553,7 @@ public class Network extends Thread { // Also add peer's details peerAddresses.add(PeerAddress.fromString(peer.getPeerData().getAddress().getHost())); - mergePeers(peerAddresses); + mergePeers(peer.toString(), peerAddresses); break; case PEERS_V2: @@ -518,7 +572,7 @@ public class Network extends Thread { peerV2Addresses.add(0, sendingPeerAddress); } - mergePeers(peerV2Addresses); + mergePeers(peer.toString(), peerV2Addresses); break; case GET_PEERS: @@ -587,6 +641,8 @@ public class Network extends Thread { Controller.getInstance().onPeerHandshakeCompleted(peer); } + // Message-building calls + /** Returns PEERS message made from peers we've connected to recently, and this node's details */ public Message buildPeersMessage(Peer peer) { try (final Repository repository = RepositoryManager.getRepository()) { @@ -691,117 +747,115 @@ public class Network extends Thread { return new GetUnconfirmedTransactionsMessage(); } + // Peer-management calls + + public void noteToSelf(Peer peer) { + LOGGER.info(String.format("No longer considering peer address %s as it connects to self", peer)); + + synchronized (this.selfPeers) { + this.selfPeers.add(peer.getPeerData().getAddress()); + } + } + + public boolean forgetPeer(PeerAddress peerAddress) throws DataException { + try (final Repository repository = RepositoryManager.getRepository()) { + int numDeleted = repository.getNetworkRepository().delete(peerAddress); + repository.saveChanges(); + + disconnectPeer(peerAddress); + + return numDeleted != 0; + } + } + + public int forgetAllPeers() throws DataException { + try (final Repository repository = RepositoryManager.getRepository()) { + int numDeleted = repository.getNetworkRepository().deleteAllPeers(); + repository.saveChanges(); + + for (Peer peer : this.getConnectedPeers()) + peer.disconnect("to be forgotten"); + + return numDeleted; + } + } + + private void disconnectPeer(PeerAddress peerAddress) { + // Disconnect peer + try { + InetSocketAddress knownAddress = peerAddress.toSocketAddress(); + + List peers = this.getConnectedPeers(); + peers.removeIf(peer -> !Peer.addressEquals(knownAddress, peer.getResolvedAddress())); + + for (Peer peer : peers) + peer.disconnect("to be forgotten"); + } catch (UnknownHostException e) { + // Unknown host isn't going to match any of our connected peers so ignore + } + } + // Network-wide calls - /** Returns list of connected peers that have completed handshaking. */ - public List getHandshakedPeers() { - List peers = new ArrayList<>(); - - synchronized (this.connectedPeers) { - peers = this.connectedPeers.stream().filter(peer -> peer.getHandshakeStatus() == Handshake.COMPLETED).collect(Collectors.toList()); - } - - return peers; - } - - /** Returns list of connected peers that have completed handshaking, with inbound duplicates removed. */ - public List getUniqueHandshakedPeers() { - final List peers; - - synchronized (this.connectedPeers) { - peers = this.connectedPeers.stream().filter(peer -> peer.getHandshakeStatus() == Handshake.COMPLETED).collect(Collectors.toList()); - } - - // Returns true if this [inbound] peer has corresponding outbound peer with same ID - Predicate hasOutboundWithSameId = peer -> { - // Peer is outbound so return fast - if (peer.isOutbound()) - return false; - - return peers.stream().anyMatch(otherPeer -> otherPeer.isOutbound() && Arrays.equals(otherPeer.getPeerId(), peer.getPeerId())); - }; - - // Filter out [inbound] peers that have corresponding outbound peer with the same ID - peers.removeIf(hasOutboundWithSameId); - - return peers; - } - - /** Returns list of peers we connected to that have completed handshaking. */ - public List getOutboundHandshakedPeers() { - List peers = new ArrayList<>(); - - synchronized (this.connectedPeers) { - peers = this.connectedPeers.stream().filter(peer -> peer.isOutbound() && peer.getHandshakeStatus() == Handshake.COMPLETED) - .collect(Collectors.toList()); - } - - return peers; - } - - /** Returns Peer with inbound connection and matching ID, or null if none found. */ - public Peer getInboundPeerWithId(byte[] peerId) { - synchronized (this.connectedPeers) { - return this.connectedPeers.stream().filter(peer -> !peer.isOutbound() && peer.getPeerId() != null && Arrays.equals(peer.getPeerId(), peerId)).findAny().orElse(null); - } - } - - /** Returns handshake-completed Peer with outbound connection and matching ID, or null if none found. */ - public Peer getOutboundHandshakedPeerWithId(byte[] peerId) { - synchronized (this.connectedPeers) { - return this.connectedPeers.stream().filter(peer -> peer.isOutbound() && peer.getHandshakeStatus() == Handshake.COMPLETED && peer.getPeerId() != null && Arrays.equals(peer.getPeerId(), peerId)).findAny().orElse(null); - } - } - - private void mergePeers(List peerAddresses) { + private void mergePeers(String addedBy, List peerAddresses) { // This can block (due to lock) so fire off in separate thread class PeersMerger implements Runnable { + private String addedBy; private List peerAddresses; - public PeersMerger(List peerAddresses) { + public PeersMerger(String addedBy, List peerAddresses) { + this.addedBy = addedBy; this.peerAddresses = peerAddresses; } @Override public void run() { - Thread.currentThread().setName("Merging peers"); + Thread.currentThread().setName(String.format("Merging peers from %s", this.addedBy)); // Serialize using lock to prevent repository deadlocks - mergePeersLock.lock(); - try { - try (final Repository repository = RepositoryManager.getRepository()) { - List knownPeers = repository.getNetworkRepository().getAllPeers(); + mergePeersLock.lockInterruptibly(); - for (PeerData peerData : knownPeers) - LOGGER.trace(String.format("Known peer %s", peerData.getAddress())); + final long addedWhen = NTP.getTime(); - // Filter out duplicates - Predicate isKnownAddress = peerAddress -> { - return knownPeers.stream().anyMatch(knownPeerData -> knownPeerData.getAddress().equals(peerAddress)); - }; + try { + try (final Repository repository = RepositoryManager.getRepository()) { + List knownPeers = repository.getNetworkRepository().getAllPeers(); - peerAddresses.removeIf(isKnownAddress); + for (PeerData peerData : knownPeers) + LOGGER.trace(String.format("Known peer %s", peerData.getAddress())); - // Save the rest into database - for (PeerAddress peerAddress : peerAddresses) { - PeerData peerData = new PeerData(peerAddress); - LOGGER.info(String.format("Adding new peer %s to repository", peerAddress)); - repository.getNetworkRepository().save(peerData); + // Filter out duplicates + Predicate isKnownAddress = peerAddress -> { + return knownPeers.stream().anyMatch(knownPeerData -> knownPeerData.getAddress().equals(peerAddress)); + }; + + peerAddresses.removeIf(isKnownAddress); + + // Save the rest into database + for (PeerAddress peerAddress : peerAddresses) { + PeerData peerData = new PeerData(peerAddress, addedWhen, addedBy); + LOGGER.info(String.format("Adding new peer %s to repository", peerAddress)); + repository.getNetworkRepository().save(peerData); + } + + repository.saveChanges(); + } catch (DataException e) { + LOGGER.error("Repository issue while merging peers list from remote node", e); } - - repository.saveChanges(); - } catch (DataException e) { - LOGGER.error("Repository issue while merging peers list from remote node", e); + } finally { + mergePeersLock.unlock(); } - } finally { - mergePeersLock.unlock(); + } catch (InterruptedException e1) { + // We're exiting anyway... } + + Thread.currentThread().setName("Merging peers (dormant)"); } } try { - mergePeersExecutor.execute(new PeersMerger(peerAddresses)); + mergePeersExecutor.execute(new PeersMerger(addedBy, peerAddresses)); } catch (RejectedExecutionException e) { // Can't execute - probably because we're shutting down, so ignore } @@ -849,6 +903,8 @@ public class Network extends Thread { } } + // Shutdown + public void shutdown() { this.isStopping = true; @@ -891,6 +947,6 @@ public class Network extends Thread { for (Peer peer : this.connectedPeers) peer.shutdown(); } -} + } } diff --git a/src/main/java/org/qora/network/Peer.java b/src/main/java/org/qora/network/Peer.java index d1212874..3cab08de 100644 --- a/src/main/java/org/qora/network/Peer.java +++ b/src/main/java/org/qora/network/Peer.java @@ -47,30 +47,54 @@ public class Peer extends Thread { private static final int SOCKET_TIMEOUT = 10000; // ms private static final int UNSOLICITED_MESSAGE_QUEUE_CAPACITY = 10; - private final boolean isOutbound; private volatile boolean isStopping = false; + private Socket socket = null; - private PeerData peerData = null; - private final ReentrantLock peerDataLock = new ReentrantLock(); - private Long connectionTimestamp = null; + private InetSocketAddress resolvedAddress = null; + /** True if remote address is loopback/link-local/site-local, false otherwise. */ + private boolean isLocal; private OutputStream out; - private Handshake handshakeStatus = Handshake.STARTED; + private Map> replyQueues; + private BlockingQueue unsolicitedQueue; private ExecutorService messageExecutor; - private VersionMessage versionMessage = null; - private Integer version; + private ScheduledExecutorService pingExecutor; - private Long lastPing = null; - private InetSocketAddress resolvedAddress = null; - private boolean isLocal; + + /** True if we created connection to peer, false if we accepted incoming connection from peer. */ + private final boolean isOutbound; + /** Numeric protocol version, typically 1 or 2. */ + private Integer version; private byte[] peerId; + private Handshake handshakeStatus = Handshake.STARTED; + private byte[] pendingPeerId; private byte[] verificationCodeSent; private byte[] verificationCodeExpected; - /** Construct unconnected outbound Peer using socket address in peer data */ + private PeerData peerData = null; + private final ReentrantLock peerLock = new ReentrantLock(); + + /** Timestamp of when socket was accepted, or connected. */ + private Long connectionTimestamp = null; + /** Version info as reported by peer. */ + private VersionMessage versionMessage = null; + /** Last PING message round-trip time (ms). */ + private Long lastPing = null; + /** Latest block height as reported by peer. */ + private Integer lastHeight; + /** Latest block signature as reported by peer. */ + private byte[] lastBlockSignature; + /** Latest block timestamp as reported by peer. */ + private Long lastBlockTimestamp; + /** Latest block generator public key as reported by peer. */ + private byte[] lastBlockGenerator; + + // Constructors + + /** Construct unconnected, outbound Peer using socket address in peer data */ public Peer(PeerData peerData) { this.isOutbound = true; this.peerData = peerData; @@ -91,13 +115,7 @@ public class Peer extends Thread { // Getters / setters public PeerData getPeerData() { - this.peerDataLock.lock(); - - try { - return this.peerData; - } finally { - this.peerDataLock.unlock(); - } + return this.peerData; } public boolean isOutbound() { @@ -179,9 +197,41 @@ public class Peer extends Thread { this.verificationCodeExpected = expected; } - /** Returns the lock used for synchronizing access to peer's PeerData. */ - public ReentrantLock getPeerDataLock() { - return this.peerDataLock; + public Integer getLastHeight() { + return this.lastHeight; + } + + public void setLastHeight(Integer lastHeight) { + this.lastHeight = lastHeight; + } + + public byte[] getLastBlockSignature() { + return lastBlockSignature; + } + + public void setLastBlockSignature(byte[] lastBlockSignature) { + this.lastBlockSignature = lastBlockSignature; + } + + public Long getLastBlockTimestamp() { + return lastBlockTimestamp; + } + + public void setLastBlockTimestamp(Long lastBlockTimestamp) { + this.lastBlockTimestamp = lastBlockTimestamp; + } + + public byte[] getLastBlockGenerator() { + return lastBlockGenerator; + } + + public void setLastBlockGenerator(byte[] lastBlockGenerator) { + this.lastBlockGenerator = lastBlockGenerator; + } + + /** Returns the lock used for synchronizing access to peer info. */ + public ReentrantLock getPeerLock() { + return this.peerLock; } // Easier, and nicer output, than peer.getRemoteSocketAddress() @@ -488,6 +538,7 @@ public class Peer extends Thread { return new InetSocketAddress(address, hostAndPort.getPortOrDefault(Settings.DEFAULT_LISTEN_PORT)); } + /** Returns true if address is loopback/link-local/site-local, false otherwise. */ public static boolean isAddressLocal(InetAddress address) { return address.isLoopbackAddress() || address.isLinkLocalAddress() || address.isSiteLocalAddress(); } diff --git a/src/main/java/org/qora/repository/NetworkRepository.java b/src/main/java/org/qora/repository/NetworkRepository.java index 8b52fbea..823a47d6 100644 --- a/src/main/java/org/qora/repository/NetworkRepository.java +++ b/src/main/java/org/qora/repository/NetworkRepository.java @@ -3,6 +3,7 @@ package org.qora.repository; import java.util.List; import org.qora.data.network.PeerData; +import org.qora.network.PeerAddress; public interface NetworkRepository { @@ -10,7 +11,7 @@ public interface NetworkRepository { public void save(PeerData peerData) throws DataException; - public int delete(PeerData peerData) throws DataException; + public int delete(PeerAddress peerAddress) throws DataException; public int deleteAllPeers() throws DataException; diff --git a/src/main/java/org/qora/repository/hsqldb/HSQLDBDatabaseUpdates.java b/src/main/java/org/qora/repository/hsqldb/HSQLDBDatabaseUpdates.java index ae80ed03..e5b6a0ae 100644 --- a/src/main/java/org/qora/repository/hsqldb/HSQLDBDatabaseUpdates.java +++ b/src/main/java/org/qora/repository/hsqldb/HSQLDBDatabaseUpdates.java @@ -766,6 +766,16 @@ public class HSQLDBDatabaseUpdates { stmt.execute("ALTER TABLE ArbitraryTransactions ALTER COLUMN data_hash RENAME TO data"); break; + case 53: + // Change what we store about peers (again) + stmt.execute("ALTER TABLE Peers DROP COLUMN last_block_signature"); + stmt.execute("ALTER TABLE Peers DROP COLUMN last_block_timestamp"); + stmt.execute("ALTER TABLE Peers DROP COLUMN last_block_generator"); + stmt.execute("ALTER TABLE Peers DROP COLUMN last_height"); + stmt.execute("ALTER TABLE Peers ADD COLUMN added_when TIMESTAMP WITH TIME ZONE"); + stmt.execute("ALTER TABLE Peers ADD COLUMN added_by VARCHAR(255)"); + break; + default: // nothing to do return false; diff --git a/src/main/java/org/qora/repository/hsqldb/HSQLDBNetworkRepository.java b/src/main/java/org/qora/repository/hsqldb/HSQLDBNetworkRepository.java index f3d10ecf..928e718a 100644 --- a/src/main/java/org/qora/repository/hsqldb/HSQLDBNetworkRepository.java +++ b/src/main/java/org/qora/repository/hsqldb/HSQLDBNetworkRepository.java @@ -20,7 +20,7 @@ public class HSQLDBNetworkRepository implements NetworkRepository { @Override public List getAllPeers() throws DataException { - String sql = "SELECT address, last_connected, last_attempted, last_height, last_block_signature, last_block_timestamp, last_block_generator, last_misbehaved FROM Peers"; + String sql = "SELECT address, last_connected, last_attempted, last_misbehaved, added_when, added_by FROM Peers"; List peers = new ArrayList<>(); @@ -37,19 +37,13 @@ public class HSQLDBNetworkRepository implements NetworkRepository { Long lastAttempted = HSQLDBRepository.getZonedTimestampMilli(resultSet, 3); - Integer lastHeight = resultSet.getInt(4); - if (lastHeight == 0 && resultSet.wasNull()) - lastHeight = null; + Long lastMisbehaved = HSQLDBRepository.getZonedTimestampMilli(resultSet, 4); - byte[] lastBlockSignature = resultSet.getBytes(5); + Long addedWhen = HSQLDBRepository.getZonedTimestampMilli(resultSet, 5); - Long lastBlockTimestamp = HSQLDBRepository.getZonedTimestampMilli(resultSet, 6); + String addedBy = resultSet.getString(6); - byte[] lastBlockGenerator = resultSet.getBytes(7); - - Long lastMisbehaved = HSQLDBRepository.getZonedTimestampMilli(resultSet, 8); - - peers.add(new PeerData(peerAddress, lastAttempted, lastConnected, lastHeight, lastBlockSignature, lastBlockTimestamp, lastBlockGenerator, lastMisbehaved)); + peers.add(new PeerData(peerAddress, lastAttempted, lastConnected, lastMisbehaved, addedWhen, addedBy)); } while (resultSet.next()); return peers; @@ -66,10 +60,9 @@ public class HSQLDBNetworkRepository implements NetworkRepository { saveHelper.bind("address", peerData.getAddress().toString()).bind("last_connected", HSQLDBRepository.toOffsetDateTime(peerData.getLastConnected())) .bind("last_attempted", HSQLDBRepository.toOffsetDateTime(peerData.getLastAttempted())) - .bind("last_height", peerData.getLastHeight()).bind("last_block_signature", peerData.getLastBlockSignature()) - .bind("last_block_timestamp", HSQLDBRepository.toOffsetDateTime(peerData.getLastBlockTimestamp())) - .bind("last_block_generator", peerData.getLastBlockGenerator()) - .bind("last_misbehaved", HSQLDBRepository.toOffsetDateTime(peerData.getLastMisbehaved())); + .bind("last_misbehaved", HSQLDBRepository.toOffsetDateTime(peerData.getLastMisbehaved())) + .bind("added_when", HSQLDBRepository.toOffsetDateTime(peerData.getAddedWhen())) + .bind("added_by", peerData.getAddedBy()); try { saveHelper.execute(this.repository); @@ -79,9 +72,9 @@ public class HSQLDBNetworkRepository implements NetworkRepository { } @Override - public int delete(PeerData peerData) throws DataException { + public int delete(PeerAddress peerAddress) throws DataException { try { - return this.repository.delete("Peers", "address = ?", peerData.getAddress().toString()); + return this.repository.delete("Peers", "address = ?", peerAddress.toString()); } catch (SQLException e) { throw new DataException("Unable to delete peer from repository", e); }