diff --git a/src/main/java/org/qortal/api/ApiError.java b/src/main/java/org/qortal/api/ApiError.java index f4cfdfb5..8a0b6078 100644 --- a/src/main/java/org/qortal/api/ApiError.java +++ b/src/main/java/org/qortal/api/ApiError.java @@ -15,6 +15,7 @@ public enum ApiError { REPOSITORY_ISSUE(5, 500), NON_PRODUCTION(6, 403), BLOCKCHAIN_NEEDS_SYNC(7, 503), + NO_TIME_SYNC(8, 503), // VALIDATION INVALID_SIGNATURE(101, 400), diff --git a/src/main/java/org/qortal/api/resource/PeersResource.java b/src/main/java/org/qortal/api/resource/PeersResource.java index 962f5e14..486eb2cb 100644 --- a/src/main/java/org/qortal/api/resource/PeersResource.java +++ b/src/main/java/org/qortal/api/resource/PeersResource.java @@ -8,6 +8,7 @@ import io.swagger.v3.oas.annotations.parameters.RequestBody; import io.swagger.v3.oas.annotations.responses.ApiResponse; import io.swagger.v3.oas.annotations.tags.Tag; +import java.util.Arrays; import java.util.List; import java.util.stream.Collectors; @@ -29,9 +30,8 @@ import org.qortal.data.network.PeerData; import org.qortal.network.Network; import org.qortal.network.PeerAddress; import org.qortal.repository.DataException; -import org.qortal.repository.Repository; -import org.qortal.repository.RepositoryManager; import org.qortal.utils.ExecuteProduceConsume; +import org.qortal.utils.NTP; @Path("/peers") @Tag(name = "Peers") @@ -81,11 +81,7 @@ public class PeersResource { ApiError.REPOSITORY_ISSUE }) public List getKnownPeers() { - try (final Repository repository = RepositoryManager.getRepository()) { - return repository.getNetworkRepository().getAllPeers(); - } catch (DataException e) { - throw ApiExceptionFactory.INSTANCE.createException(request, ApiError.REPOSITORY_ISSUE, e); - } + return Network.getInstance().getAllKnownPeers(); } @GET @@ -166,12 +162,14 @@ public class PeersResource { public String addPeer(String address) { Security.checkApiCallAllowed(request); - try (final Repository repository = RepositoryManager.getRepository()) { + final Long addedWhen = NTP.getTime(); + if (addedWhen == null) + throw ApiExceptionFactory.INSTANCE.createException(request, ApiError.NO_TIME_SYNC); + + try { PeerAddress peerAddress = PeerAddress.fromString(address); - PeerData peerData = new PeerData(peerAddress, System.currentTimeMillis(), "API"); - repository.getNetworkRepository().save(peerData); - repository.saveChanges(); + Network.getInstance().mergePeers("API", addedWhen, Arrays.asList(peerAddress)); return "true"; } catch (IllegalArgumentException e) { diff --git a/src/main/java/org/qortal/controller/Controller.java b/src/main/java/org/qortal/controller/Controller.java index 08fdc953..169fcda6 100644 --- a/src/main/java/org/qortal/controller/Controller.java +++ b/src/main/java/org/qortal/controller/Controller.java @@ -336,7 +336,7 @@ public class Controller extends Thread { try { Network network = Network.getInstance(); network.start(); - } catch (IOException e) { + } catch (IOException | DataException e) { LOGGER.error("Unable to start networking", e); Controller.getInstance().shutdown(); Gui.getInstance().fatalError("Networking failure", e); @@ -549,17 +549,7 @@ public class Controller extends Thread { LOGGER.info(String.format("Failed to synchronize with peer %s (%s) - cooling off", peer, syncResult.name())); // Don't use this peer again for a while - PeerData peerData = peer.getPeerData(); - peerData.setLastMisbehaved(NTP.getTime()); - - // Only save to repository if outbound peer - if (peer.isOutbound()) - try (final Repository repository = RepositoryManager.getRepository()) { - repository.getNetworkRepository().save(peerData); - repository.saveChanges(); - } catch (DataException e) { - LOGGER.warn("Repository issue while updating peer synchronization info", e); - } + Network.getInstance().peerMisbehaved(peer); break; } diff --git a/src/main/java/org/qortal/network/Network.java b/src/main/java/org/qortal/network/Network.java index 119e66d6..e4c05ee3 100644 --- a/src/main/java/org/qortal/network/Network.java +++ b/src/main/java/org/qortal/network/Network.java @@ -7,7 +7,6 @@ import java.net.InetSocketAddress; import java.net.StandardSocketOptions; import java.net.UnknownHostException; import java.nio.channels.CancelledKeyException; -import java.nio.channels.ClosedChannelException; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; @@ -56,6 +55,7 @@ import org.qortal.repository.Repository; import org.qortal.repository.RepositoryManager; import org.qortal.settings.Settings; import org.qortal.utils.ExecuteProduceConsume; +import org.qortal.utils.ExecutorDumper; import org.qortal.utils.ExecuteProduceConsume.StatsSnapshot; import org.qortal.utils.NTP; @@ -104,6 +104,8 @@ public class Network { private final byte[] ourPeerId; private final int maxMessageSize; + + private List allKnownPeers; private List connectedPeers; private List selfPeers; @@ -152,7 +154,7 @@ public class Network { networkEPC = new NetworkProcessor(networkExecutor); } - public void start() throws IOException { + public void start() throws IOException, DataException { // Grab P2P port from settings int listenPort = Settings.getInstance().getListenPort(); @@ -177,6 +179,11 @@ public class Network { throw new IOException("Can't create listen socket", e); } + // Load all known peers from repository + try (final Repository repository = RepositoryManager.getRepository()) { + allKnownPeers = repository.getNetworkRepository().getAllPeers(); + } + // Start up first networking thread networkEPC.start(); } @@ -209,6 +216,12 @@ public class Network { // Peer lists + public List getAllKnownPeers() { + synchronized (this.allKnownPeers) { + return new ArrayList<>(this.allKnownPeers); + } + } + public List getConnectedPeers() { synchronized (this.connectedPeers) { return new ArrayList<>(this.connectedPeers); @@ -223,24 +236,16 @@ public class Network { /** Returns list of connected peers that have completed handshaking. */ public List getHandshakedPeers() { - List peers = new ArrayList<>(); - synchronized (this.connectedPeers) { - peers = this.connectedPeers.stream().filter(peer -> peer.getHandshakeStatus() == Handshake.COMPLETED).collect(Collectors.toList()); + return this.connectedPeers.stream().filter(peer -> peer.getHandshakeStatus() == Handshake.COMPLETED).collect(Collectors.toList()); } - - return peers; } /** Returns list of connected peers that have completed handshaking, with inbound duplicates removed. */ public List getUniqueHandshakedPeers() { - final List peers; + List peers = getHandshakedPeers(); - synchronized (this.connectedPeers) { - peers = this.connectedPeers.stream().filter(peer -> peer.getHandshakeStatus() == Handshake.COMPLETED).collect(Collectors.toList()); - } - - // Returns true if this [inbound] peer has corresponding outbound peer with same ID + // Returns true if this peer is inbound and has corresponding outbound peer with same ID Predicate hasOutboundWithSameId = peer -> { // Peer is outbound so return fast if (peer.isOutbound()) @@ -249,7 +254,7 @@ public class Network { return peers.stream().anyMatch(otherPeer -> otherPeer.isOutbound() && Arrays.equals(otherPeer.getPeerId(), peer.getPeerId())); }; - // Filter out [inbound] peers that have corresponding outbound peer with the same ID + // Filter out inbound peers that have corresponding outbound peer with the same ID peers.removeIf(hasOutboundWithSameId); return peers; @@ -276,6 +281,32 @@ public class Network { } } + + // Peer list filters + + /** Must be inside synchronized (this.selfPeers) {...} */ + private final Predicate isSelfPeer = peerData -> { + PeerAddress peerAddress = peerData.getAddress(); + return this.selfPeers.stream().anyMatch(selfPeer -> selfPeer.equals(peerAddress)); + }; + + /** Must be inside synchronized (this.connectedPeers) {...} */ + private final Predicate isConnectedPeer = peerData -> { + PeerAddress peerAddress = peerData.getAddress(); + return this.connectedPeers.stream().anyMatch(peer -> peer.getPeerData().getAddress().equals(peerAddress)); + }; + + /** Must be inside synchronized (this.connectedPeers) {...} */ + private final Predicate isResolvedAsConnectedPeer = peerData -> { + try { + InetSocketAddress resolvedSocketAddress = peerData.getAddress().toSocketAddress(); + return this.connectedPeers.stream().anyMatch(peer -> peer.getResolvedAddress().equals(resolvedSocketAddress)); + } catch (UnknownHostException e) { + // Can't resolve - no point even trying to connect + return true; + } + }; + // Initial setup public static void installInitialPeers(Repository repository) throws DataException { @@ -297,6 +328,12 @@ public class Network { super(executor); } + @Override + protected void onSpawnFailure() { + // For debugging: + // ExecutorDumper.dump(this.executor, 3, ExecuteProduceConsume.class); + } + @Override protected Task produceTask(boolean canBlock) throws InterruptedException { Task task; @@ -504,7 +541,7 @@ public class Network { LOGGER.debug(() -> String.format("Connection accepted from peer %s", PeerAddress.fromSocket(socketChannel.socket()))); - newPeer = new Peer(socketChannel); + newPeer = new Peer(socketChannel, channelSelector); this.connectedPeers.add(newPeer); } } catch (IOException e) { @@ -512,105 +549,15 @@ public class Network { try { socketChannel.close(); } catch (IOException ce) { + // Couldn't close? } return; } - try { - socketChannel.setOption(StandardSocketOptions.TCP_NODELAY, true); - socketChannel.configureBlocking(false); - socketChannel.register(channelSelector, SelectionKey.OP_READ); - } catch (IOException e) { - // Remove from connected peers - synchronized (this.connectedPeers) { - this.connectedPeers.remove(newPeer); - } - - return; - } - this.onPeerReady(newPeer); } - public void prunePeers() throws InterruptedException, DataException { - final Long now = NTP.getTime(); - if (now == null) - return; - - // Disconnect peers that are stuck during handshake - List handshakePeers = this.getConnectedPeers(); - - // Disregard peers that have completed handshake or only connected recently - handshakePeers.removeIf(peer -> peer.getHandshakeStatus() == Handshake.COMPLETED || peer.getConnectionTimestamp() == null || peer.getConnectionTimestamp() > now - HANDSHAKE_TIMEOUT); - - for (Peer peer : handshakePeers) - peer.disconnect(String.format("handshake timeout at %s", peer.getHandshakeStatus().name())); - - // Prune 'old' peers from repository... - // Pruning peers isn't critical so no need to block for a repository instance. - try (final Repository repository = RepositoryManager.tryRepository()) { - if (repository == null) - return; - - // Fetch all known peers - List peers = repository.getNetworkRepository().getAllPeers(); - - // 'Old' peers: - // we have attempted to connect within the last day - // we last managed to connect over a week ago - Predicate isNotOldPeer = peerData -> { - if (peerData.getLastAttempted() == null || peerData.getLastAttempted() < now - OLD_PEER_ATTEMPTED_PERIOD) - return true; - - if (peerData.getLastConnected() == null || peerData.getLastConnected() > now - OLD_PEER_CONNECTION_PERIOD) - return true; - - return false; - }; - - // Disregard peers that are NOT 'old' - peers.removeIf(isNotOldPeer); - - // Don't consider already connected peers (simple address match) - Predicate isConnectedPeer = peerData -> { - PeerAddress peerAddress = peerData.getAddress(); - return this.connectedPeers.stream().anyMatch(peer -> peer.getPeerData().getAddress().equals(peerAddress)); - }; - - synchronized (this.connectedPeers) { - peers.removeIf(isConnectedPeer); - } - - for (PeerData peerData : peers) { - LOGGER.debug(String.format("Deleting old peer %s from repository", peerData.getAddress().toString())); - repository.getNetworkRepository().delete(peerData.getAddress()); - } - - repository.saveChanges(); - } - } - - private final Predicate isSelfPeer = peerData -> { - PeerAddress peerAddress = peerData.getAddress(); - return this.selfPeers.stream().anyMatch(selfPeer -> selfPeer.equals(peerAddress)); - }; - - private final Predicate isConnectedPeer = peerData -> { - PeerAddress peerAddress = peerData.getAddress(); - return this.connectedPeers.stream().anyMatch(peer -> peer.getPeerData().getAddress().equals(peerAddress)); - }; - - private final Predicate isResolvedAsConnectedPeer = peerData -> { - try { - InetSocketAddress resolvedSocketAddress = peerData.getAddress().toSocketAddress(); - return this.connectedPeers.stream().anyMatch(peer -> peer.getResolvedAddress().equals(resolvedSocketAddress)); - } catch (UnknownHostException e) { - // Can't resolve - no point even trying to connect - return true; - } - }; - private Peer getConnectablePeer(final Long now) throws InterruptedException { // We can't block here so use tryRepository(). We don't NEED to connect a new peer. try (final Repository repository = RepositoryManager.tryRepository()) { @@ -618,7 +565,7 @@ public class Network { return null; // Find an address to connect to - List peers = repository.getNetworkRepository().getAllPeers(); + List peers = this.getAllKnownPeers(); // Don't consider peers with recent connection failures final long lastAttemptedThreshold = now - CONNECT_FAILURE_BACKOFF; @@ -631,14 +578,12 @@ public class Network { peers.removeIf(isSelfPeer); } - // Don't consider already connected peers (simple address match) synchronized (this.connectedPeers) { + // Don't consider already connected peers (simple address match) peers.removeIf(isConnectedPeer); - } - // Don't consider already connected peers (resolved address match) - // XXX This might be too slow if we end up waiting a long time for hostnames to resolve via DNS - synchronized (this.connectedPeers) { + // Don't consider already connected peers (resolved address match) + // XXX This might be too slow if we end up waiting a long time for hostnames to resolve via DNS peers.removeIf(isResolvedAsConnectedPeer); } @@ -647,17 +592,18 @@ public class Network { return null; // Pick random peer - int peerIndex = new SecureRandom().nextInt(peers.size()); + int peerIndex = new Random().nextInt(peers.size()); // Pick candidate PeerData peerData = peers.get(peerIndex); Peer newPeer = new Peer(peerData); // Update connection attempt info - repository.discardChanges(); peerData.setLastAttempted(now); - repository.getNetworkRepository().save(peerData); - repository.saveChanges(); + synchronized (this.allKnownPeers) { + repository.getNetworkRepository().save(peerData); + repository.saveChanges(); + } return newPeer; } catch (DataException e) { @@ -667,7 +613,7 @@ public class Network { } private void connectPeer(Peer newPeer) throws InterruptedException { - SocketChannel socketChannel = newPeer.connect(); + SocketChannel socketChannel = newPeer.connect(this.channelSelector); if (socketChannel == null) return; @@ -678,15 +624,6 @@ public class Network { this.connectedPeers.add(newPeer); } - try { - socketChannel.register(channelSelector, SelectionKey.OP_READ); - } catch (ClosedChannelException e) { - // If channel has somehow already closed then remove from connectedPeers - synchronized (this.connectedPeers) { - this.connectedPeers.remove(newPeer); - } - } - this.onPeerReady(newPeer); } @@ -718,15 +655,21 @@ public class Network { synchronized (this.connectedPeers) { this.connectedPeers.remove(peer); } + } - // If this is an inbound peer then remove from known peers list - // as remote port is not likely to be remote peer's listen port - if (!peer.isOutbound()) + public void peerMisbehaved(Peer peer) { + PeerData peerData = peer.getPeerData(); + peerData.setLastMisbehaved(NTP.getTime()); + + // Only update repository if outbound peer + if (peer.isOutbound()) try (final Repository repository = RepositoryManager.getRepository()) { - repository.getNetworkRepository().delete(peer.getPeerData().getAddress()); - repository.saveChanges(); + synchronized (this.allKnownPeers) { + repository.getNetworkRepository().save(peerData); + repository.saveChanges(); + } } catch (DataException e) { - LOGGER.error(String.format("Repository issue while trying to delete inbound peer %s", peer), e); + LOGGER.warn("Repository issue while updating peer synchronization info", e); } } @@ -828,7 +771,7 @@ public class Network { private void onGetPeersMessage(Peer peer, Message message) { // Send our known peers - if (!peer.sendMessage(buildPeersMessage(peer))) + if (!peer.sendMessage(this.buildPeersMessage(peer))) peer.disconnect("failed to send peers list"); } @@ -845,7 +788,7 @@ public class Network { // Also add peer's details peerAddresses.add(PeerAddress.fromString(peer.getPeerData().getAddress().getHost())); - mergePeers(peer.toString(), peerAddresses); + opportunisticMergePeers(peer.toString(), peerAddresses); } private void onPingMessage(Peer peer, Message message) { @@ -875,7 +818,7 @@ public class Network { peerV2Addresses.add(0, sendingPeerAddress); } - mergePeers(peer.toString(), peerV2Addresses); + opportunisticMergePeers(peer.toString(), peerV2Addresses); } private void onPeerVerifyMessage(Peer peer, Message message) { @@ -925,8 +868,10 @@ public class Network { // Update connection info for outbound peers only if (peer.isOutbound()) try (final Repository repository = RepositoryManager.getRepository()) { - repository.getNetworkRepository().save(peer.getPeerData()); - repository.saveChanges(); + synchronized (this.allKnownPeers) { + repository.getNetworkRepository().save(peer.getPeerData()); + repository.saveChanges(); + } } catch (DataException e) { LOGGER.error(String.format("Repository issue while trying to update outbound peer %s", peer), e); } @@ -964,77 +909,72 @@ public class Network { /** Returns PEERS message made from peers we've connected to recently, and this node's details */ public Message buildPeersMessage(Peer peer) { - try (final Repository repository = RepositoryManager.getRepository()) { - List knownPeers = repository.getNetworkRepository().getAllPeers(); + List knownPeers = this.getAllKnownPeers(); - // Filter out peers that we've not connected to ever or within X milliseconds - final long connectionThreshold = NTP.getTime() - RECENT_CONNECTION_THRESHOLD; - Predicate notRecentlyConnected = peerData -> { - final Long lastAttempted = peerData.getLastAttempted(); - final Long lastConnected = peerData.getLastConnected(); + // Filter out peers that we've not connected to ever or within X milliseconds + final long connectionThreshold = NTP.getTime() - RECENT_CONNECTION_THRESHOLD; + Predicate notRecentlyConnected = peerData -> { + final Long lastAttempted = peerData.getLastAttempted(); + final Long lastConnected = peerData.getLastConnected(); - if (lastAttempted == null || lastConnected == null) - return true; + if (lastAttempted == null || lastConnected == null) + return true; - if (lastConnected < lastAttempted) - return true; + if (lastConnected < lastAttempted) + return true; - if (lastConnected < connectionThreshold) - return true; + if (lastConnected < connectionThreshold) + return true; - return false; - }; - knownPeers.removeIf(notRecentlyConnected); + return false; + }; + knownPeers.removeIf(notRecentlyConnected); - if (peer.getVersion() >= 2) { - List peerAddresses = new ArrayList<>(); + if (peer.getVersion() >= 2) { + List peerAddresses = new ArrayList<>(); - for (PeerData peerData : knownPeers) { - try { - InetAddress address = InetAddress.getByName(peerData.getAddress().getHost()); + for (PeerData peerData : knownPeers) { + try { + InetAddress address = InetAddress.getByName(peerData.getAddress().getHost()); - // Don't send 'local' addresses if peer is not 'local'. e.g. don't send localhost:9084 to node4.qortal.org - if (!peer.getIsLocal() && Peer.isAddressLocal(address)) - continue; + // Don't send 'local' addresses if peer is not 'local'. e.g. don't send localhost:9084 to node4.qortal.org + if (!peer.getIsLocal() && Peer.isAddressLocal(address)) + continue; - peerAddresses.add(peerData.getAddress()); - } catch (UnknownHostException e) { - // Couldn't resolve hostname to IP address so discard - } + peerAddresses.add(peerData.getAddress()); + } catch (UnknownHostException e) { + // Couldn't resolve hostname to IP address so discard } - - // New format PEERS_V2 message that supports hostnames, IPv6 and ports - return new PeersV2Message(peerAddresses); - } else { - // Map to socket addresses - List peerAddresses = new ArrayList<>(); - - for (PeerData peerData : knownPeers) { - try { - // We have to resolve to literal IP address to check for IPv4-ness. - // This isn't great if hostnames have both IPv6 and IPv4 DNS entries. - InetAddress address = InetAddress.getByName(peerData.getAddress().getHost()); - - // Legacy PEERS message doesn't support IPv6 - if (address instanceof Inet6Address) - continue; - - // Don't send 'local' addresses if peer is not 'local'. e.g. don't send localhost:9084 to node4.qortal.org - if (!peer.getIsLocal() && !Peer.isAddressLocal(address)) - continue; - - peerAddresses.add(address); - } catch (UnknownHostException e) { - // Couldn't resolve hostname to IP address so discard - } - } - - // Legacy PEERS message that only sends IPv4 addresses - return new PeersMessage(peerAddresses); } - } catch (DataException e) { - LOGGER.error("Repository issue while building PEERS message", e); - return new PeersMessage(Collections.emptyList()); + + // New format PEERS_V2 message that supports hostnames, IPv6 and ports + return new PeersV2Message(peerAddresses); + } else { + // Map to socket addresses + List peerAddresses = new ArrayList<>(); + + for (PeerData peerData : knownPeers) { + try { + // We have to resolve to literal IP address to check for IPv4-ness. + // This isn't great if hostnames have both IPv6 and IPv4 DNS entries. + InetAddress address = InetAddress.getByName(peerData.getAddress().getHost()); + + // Legacy PEERS message doesn't support IPv6 + if (address instanceof Inet6Address) + continue; + + // Don't send 'local' addresses if peer is not 'local'. e.g. don't send localhost:9084 to node4.qortal.org + if (!peer.getIsLocal() && !Peer.isAddressLocal(address)) + continue; + + peerAddresses.add(address); + } catch (UnknownHostException e) { + // Couldn't resolve hostname to IP address so discard + } + } + + // Legacy PEERS message that only sends IPv4 addresses + return new PeersMessage(peerAddresses); } } @@ -1077,26 +1017,38 @@ public class Network { } public boolean forgetPeer(PeerAddress peerAddress) throws DataException { - try (final Repository repository = RepositoryManager.getRepository()) { - int numDeleted = repository.getNetworkRepository().delete(peerAddress); - repository.saveChanges(); + int numDeleted; - disconnectPeer(peerAddress); + synchronized (this.allKnownPeers) { + this.allKnownPeers.removeIf(peerData -> peerData.getAddress().equals(peerAddress)); - return numDeleted != 0; + try (final Repository repository = RepositoryManager.getRepository()) { + numDeleted = repository.getNetworkRepository().delete(peerAddress); + repository.saveChanges(); + } } + + disconnectPeer(peerAddress); + + return numDeleted != 0; } public int forgetAllPeers() throws DataException { - try (final Repository repository = RepositoryManager.getRepository()) { - int numDeleted = repository.getNetworkRepository().deleteAllPeers(); - repository.saveChanges(); + int numDeleted; - for (Peer peer : this.getConnectedPeers()) - peer.disconnect("to be forgotten"); + synchronized (this.allKnownPeers) { + this.allKnownPeers.clear(); - return numDeleted; + try (final Repository repository = RepositoryManager.getRepository()) { + numDeleted = repository.getNetworkRepository().deleteAllPeers(); + repository.saveChanges(); + } } + + for (Peer peer : this.getConnectedPeers()) + peer.disconnect("to be forgotten"); + + return numDeleted; } private void disconnectPeer(PeerAddress peerAddress) { @@ -1116,7 +1068,72 @@ public class Network { // Network-wide calls - private void mergePeers(String addedBy, List peerAddresses) { + public void prunePeers() throws DataException { + final Long now = NTP.getTime(); + if (now == null) + return; + + // Disconnect peers that are stuck during handshake + List handshakePeers = this.getConnectedPeers(); + + // Disregard peers that have completed handshake or only connected recently + handshakePeers.removeIf(peer -> peer.getHandshakeStatus() == Handshake.COMPLETED || peer.getConnectionTimestamp() == null || peer.getConnectionTimestamp() > now - HANDSHAKE_TIMEOUT); + + for (Peer peer : handshakePeers) + peer.disconnect(String.format("handshake timeout at %s", peer.getHandshakeStatus().name())); + + // Prune 'old' peers from repository... + // Pruning peers isn't critical so no need to block for a repository instance. + try (final Repository repository = RepositoryManager.tryRepository()) { + if (repository == null) + return; + + synchronized (this.allKnownPeers) { + // Fetch all known peers + List peers = new ArrayList<>(this.allKnownPeers); + + // 'Old' peers: + // We attempted to connect within the last day + // but we last managed to connect over a week ago. + Predicate isNotOldPeer = peerData -> { + if (peerData.getLastAttempted() == null || peerData.getLastAttempted() < now - OLD_PEER_ATTEMPTED_PERIOD) + return true; + + if (peerData.getLastConnected() == null || peerData.getLastConnected() > now - OLD_PEER_CONNECTION_PERIOD) + return true; + + return false; + }; + + // Disregard peers that are NOT 'old' + peers.removeIf(isNotOldPeer); + + // Don't consider already connected peers (simple address match) + synchronized (this.connectedPeers) { + peers.removeIf(isConnectedPeer); + } + + for (PeerData peerData : peers) { + LOGGER.debug(() -> String.format("Deleting old peer %s from repository", peerData.getAddress().toString())); + repository.getNetworkRepository().delete(peerData.getAddress()); + } + + repository.saveChanges(); + } + } + } + + public void mergePeers(String addedBy, long addedWhen, List peerAddresses) throws DataException { + mergePeersLock.lock(); + + try (final Repository repository = RepositoryManager.getRepository()) { + this.mergePeers(repository, addedBy, addedWhen, peerAddresses); + } finally { + mergePeersLock.unlock(); + } + } + + private void opportunisticMergePeers(String addedBy, List peerAddresses) { final Long addedWhen = NTP.getTime(); if (addedWhen == null) return; @@ -1131,27 +1148,42 @@ public class Network { if (repository == null) return; - List knownPeers = repository.getNetworkRepository().getAllPeers(); + this.mergePeers(repository, addedBy, addedWhen, peerAddresses); - // Filter out duplicates - Predicate isKnownAddress = peerAddress -> knownPeers.stream().anyMatch(knownPeerData -> knownPeerData.getAddress().equals(peerAddress)); + } catch (DataException e) { + // Already logged by this.mergePeers() + } + } finally { + mergePeersLock.unlock(); + } + } + + private void mergePeers(Repository repository, String addedBy, long addedWhen, List peerAddresses) throws DataException { + List newPeers; + synchronized (this.allKnownPeers) { + for (PeerData knownPeerData : this.allKnownPeers) { + // Filter out duplicates, without resolving via DNS + Predicate isKnownAddress = peerAddress -> knownPeerData.getAddress().equals(peerAddress); peerAddresses.removeIf(isKnownAddress); + } - repository.discardChanges(); + // Add leftover peer addresses to known peers list + newPeers = peerAddresses.stream().map(peerAddress -> new PeerData(peerAddress, addedWhen, addedBy)).collect(Collectors.toList()); - // Save the rest into database - for (PeerAddress peerAddress : peerAddresses) { - PeerData peerData = new PeerData(peerAddress, addedWhen, addedBy); - LOGGER.info(String.format("Adding new peer %s to repository", peerAddress)); + this.allKnownPeers.addAll(newPeers); + + try { + // Save new peers into database + for (PeerData peerData : newPeers) { + LOGGER.info(String.format("Adding new peer %s to repository", peerData.getAddress())); repository.getNetworkRepository().save(peerData); } repository.saveChanges(); } catch (DataException e) { - LOGGER.error("Repository issue while merging peers list from remote node", e); + LOGGER.error(String.format("Repository issue while merging peers list from %s", addedBy), e); + throw e; } - } finally { - mergePeersLock.unlock(); } } diff --git a/src/main/java/org/qortal/network/Peer.java b/src/main/java/org/qortal/network/Peer.java index 83f8cfe0..419aa415 100644 --- a/src/main/java/org/qortal/network/Peer.java +++ b/src/main/java/org/qortal/network/Peer.java @@ -7,6 +7,8 @@ import java.net.SocketTimeoutException; import java.net.StandardSocketOptions; import java.net.UnknownHostException; import java.nio.ByteBuffer; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.security.SecureRandom; import java.util.Collections; @@ -108,10 +110,10 @@ public class Peer { } /** Construct Peer using existing, connected socket */ - public Peer(SocketChannel socketChannel) throws IOException { + public Peer(SocketChannel socketChannel, Selector channelSelector) throws IOException { this.isOutbound = false; this.socketChannel = socketChannel; - sharedSetup(); + sharedSetup(channelSelector); this.resolvedAddress = ((InetSocketAddress) socketChannel.socket().getRemoteSocketAddress()); this.isLocal = isAddressLocal(this.resolvedAddress.getAddress()); @@ -254,17 +256,18 @@ public class Peer { new SecureRandom().nextBytes(verificationCodeExpected); } - private void sharedSetup() throws IOException { + private void sharedSetup(Selector channelSelector) throws IOException { this.connectionTimestamp = NTP.getTime(); this.socketChannel.setOption(StandardSocketOptions.TCP_NODELAY, true); this.socketChannel.configureBlocking(false); + this.socketChannel.register(channelSelector, SelectionKey.OP_READ); this.byteBuffer = null; // Defer allocation to when we need it, to save memory. Sorry GC! this.replyQueues = Collections.synchronizedMap(new HashMap>()); this.pendingMessages = new LinkedBlockingQueue<>(); } - public SocketChannel connect() { - LOGGER.trace(String.format("Connecting to peer %s", this)); + public SocketChannel connect(Selector channelSelector) { + LOGGER.trace(() -> String.format("Connecting to peer %s", this)); try { this.resolvedAddress = this.peerData.getAddress().toSocketAddress(); @@ -272,10 +275,6 @@ public class Peer { this.socketChannel = SocketChannel.open(); this.socketChannel.socket().connect(resolvedAddress, CONNECT_TIMEOUT); - - LOGGER.debug(String.format("Connected to peer %s", this)); - sharedSetup(); - return socketChannel; } catch (SocketTimeoutException e) { LOGGER.trace(String.format("Connection timed out to peer %s", this)); return null; @@ -286,6 +285,20 @@ public class Peer { LOGGER.trace(String.format("Connection failed to peer %s", this)); return null; } + + try { + LOGGER.debug(() -> String.format("Connected to peer %s", this)); + sharedSetup(channelSelector); + return socketChannel; + } catch (IOException e) { + LOGGER.trace(String.format("Post-connection setup failed, peer %s", this)); + try { + socketChannel.close(); + } catch (IOException ce) { + // Failed to close? + } + return null; + } } /** diff --git a/src/main/java/org/qortal/utils/ExecuteProduceConsume.java b/src/main/java/org/qortal/utils/ExecuteProduceConsume.java index 8be654e5..334322bb 100644 --- a/src/main/java/org/qortal/utils/ExecuteProduceConsume.java +++ b/src/main/java/org/qortal/utils/ExecuteProduceConsume.java @@ -30,7 +30,7 @@ public abstract class ExecuteProduceConsume implements Runnable { private final Logger logger; private final boolean isLoggerTraceEnabled; - private ExecutorService executor; + protected ExecutorService executor; // These are volatile to prevent thread-local caching of values // but all are updated inside synchronized blocks @@ -85,6 +85,10 @@ public abstract class ExecuteProduceConsume implements Runnable { return snapshot; } + protected void onSpawnFailure() { + /* Allow override in subclasses */ + } + /** * Returns a Task to be performed, possibly blocking. * @@ -180,6 +184,7 @@ public abstract class ExecuteProduceConsume implements Runnable { ++this.spawnFailures; this.hasThreadPending = false; this.logger.trace(() -> String.format("[%d] failed to spawn another thread", Thread.currentThread().getId())); + this.onSpawnFailure(); } } else { this.logger.trace(() -> String.format("[%d] NOT spawning another thread", Thread.currentThread().getId())); diff --git a/src/main/java/org/qortal/utils/ExecutorDumper.java b/src/main/java/org/qortal/utils/ExecutorDumper.java new file mode 100644 index 00000000..838fe2db --- /dev/null +++ b/src/main/java/org/qortal/utils/ExecutorDumper.java @@ -0,0 +1,81 @@ +package org.qortal.utils; + +import java.lang.reflect.Field; +import java.util.HashSet; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.locks.ReentrantLock; + +public abstract class ExecutorDumper { + + private static final String OUR_CLASS_NAME = ExecutorDumper.class.getName(); + + public static void dump(ExecutorService executor, int checkDepth, Class skipClass) { + if (executor instanceof ThreadPoolExecutor) + dumpThreadPoolExecutor((ThreadPoolExecutor) executor, checkDepth, skipClass); + + return; + } + + private static void dumpThreadPoolExecutor(ThreadPoolExecutor executor, int checkDepth, Class skipClass) { + try { + Field mainLockField = executor.getClass().getDeclaredField("mainLock"); + mainLockField.setAccessible(true); + + Field workersField = executor.getClass().getDeclaredField("workers"); + workersField.setAccessible(true); + + Class[] declaredClasses = executor.getClass().getDeclaredClasses(); + + Class workerClass = null; + for (int i = 0; i < declaredClasses.length; ++i) + if (declaredClasses[i].getSimpleName().equals("Worker")) { + workerClass = declaredClasses[i]; + break; + } + + if (workerClass == null) + return; + + Field workerThreadField = workerClass.getDeclaredField("thread"); + workerThreadField.setAccessible(true); + + String skipClassName = skipClass.getName(); + + ReentrantLock mainLock = (ReentrantLock) mainLockField.get(executor); + mainLock.lock(); + + try { + @SuppressWarnings("unchecked") + HashSet workers = (HashSet) workersField.get(executor); + + WORKER_LOOP: + for (Object workerObj : workers) { + Thread thread = (Thread) workerThreadField.get(workerObj); + + StackTraceElement[] stackTrace = thread.getStackTrace(); + if (stackTrace.length == 0) + continue; + + for (int d = 0; d < checkDepth; ++d) { + String stackClassName = stackTrace[d].getClassName(); + if (stackClassName.equals(skipClassName) || stackClassName.equals(OUR_CLASS_NAME)) + continue WORKER_LOOP; + } + + System.out.println(String.format("[%d] %s:", thread.getId(), thread.getName())); + + for (int d = 0; d < stackTrace.length; ++d) + System.out.println(String.format("\t\t%s.%s at %s:%d", + stackTrace[d].getClassName(), stackTrace[d].getMethodName(), + stackTrace[d].getFileName(), stackTrace[d].getLineNumber())); + } + } finally { + mainLock.unlock(); + } + } catch (Exception e) { + // + } + } + +}