Networking improvements: cached known peers, fewer DB accesses, EPC spawn-failure hook

This commit is contained in:
catbref 2020-04-24 16:57:20 +01:00
parent edb842f0d1
commit df4798e2a1
7 changed files with 373 additions and 253 deletions

View File

@ -15,6 +15,7 @@ public enum ApiError {
REPOSITORY_ISSUE(5, 500),
NON_PRODUCTION(6, 403),
BLOCKCHAIN_NEEDS_SYNC(7, 503),
NO_TIME_SYNC(8, 503),
// VALIDATION
INVALID_SIGNATURE(101, 400),

View File

@ -8,6 +8,7 @@ import io.swagger.v3.oas.annotations.parameters.RequestBody;
import io.swagger.v3.oas.annotations.responses.ApiResponse;
import io.swagger.v3.oas.annotations.tags.Tag;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
@ -29,9 +30,8 @@ import org.qortal.data.network.PeerData;
import org.qortal.network.Network;
import org.qortal.network.PeerAddress;
import org.qortal.repository.DataException;
import org.qortal.repository.Repository;
import org.qortal.repository.RepositoryManager;
import org.qortal.utils.ExecuteProduceConsume;
import org.qortal.utils.NTP;
@Path("/peers")
@Tag(name = "Peers")
@ -81,11 +81,7 @@ public class PeersResource {
ApiError.REPOSITORY_ISSUE
})
public List<PeerData> getKnownPeers() {
try (final Repository repository = RepositoryManager.getRepository()) {
return repository.getNetworkRepository().getAllPeers();
} catch (DataException e) {
throw ApiExceptionFactory.INSTANCE.createException(request, ApiError.REPOSITORY_ISSUE, e);
}
return Network.getInstance().getAllKnownPeers();
}
@GET
@ -166,12 +162,14 @@ public class PeersResource {
public String addPeer(String address) {
Security.checkApiCallAllowed(request);
try (final Repository repository = RepositoryManager.getRepository()) {
final Long addedWhen = NTP.getTime();
if (addedWhen == null)
throw ApiExceptionFactory.INSTANCE.createException(request, ApiError.NO_TIME_SYNC);
try {
PeerAddress peerAddress = PeerAddress.fromString(address);
PeerData peerData = new PeerData(peerAddress, System.currentTimeMillis(), "API");
repository.getNetworkRepository().save(peerData);
repository.saveChanges();
Network.getInstance().mergePeers("API", addedWhen, Arrays.asList(peerAddress));
return "true";
} catch (IllegalArgumentException e) {

View File

@ -336,7 +336,7 @@ public class Controller extends Thread {
try {
Network network = Network.getInstance();
network.start();
} catch (IOException e) {
} catch (IOException | DataException e) {
LOGGER.error("Unable to start networking", e);
Controller.getInstance().shutdown();
Gui.getInstance().fatalError("Networking failure", e);
@ -549,17 +549,7 @@ public class Controller extends Thread {
LOGGER.info(String.format("Failed to synchronize with peer %s (%s) - cooling off", peer, syncResult.name()));
// Don't use this peer again for a while
PeerData peerData = peer.getPeerData();
peerData.setLastMisbehaved(NTP.getTime());
// Only save to repository if outbound peer
if (peer.isOutbound())
try (final Repository repository = RepositoryManager.getRepository()) {
repository.getNetworkRepository().save(peerData);
repository.saveChanges();
} catch (DataException e) {
LOGGER.warn("Repository issue while updating peer synchronization info", e);
}
Network.getInstance().peerMisbehaved(peer);
break;
}

View File

@ -7,7 +7,6 @@ import java.net.InetSocketAddress;
import java.net.StandardSocketOptions;
import java.net.UnknownHostException;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
@ -56,6 +55,7 @@ import org.qortal.repository.Repository;
import org.qortal.repository.RepositoryManager;
import org.qortal.settings.Settings;
import org.qortal.utils.ExecuteProduceConsume;
import org.qortal.utils.ExecutorDumper;
import org.qortal.utils.ExecuteProduceConsume.StatsSnapshot;
import org.qortal.utils.NTP;
@ -104,6 +104,8 @@ public class Network {
private final byte[] ourPeerId;
private final int maxMessageSize;
private List<PeerData> allKnownPeers;
private List<Peer> connectedPeers;
private List<PeerAddress> selfPeers;
@ -152,7 +154,7 @@ public class Network {
networkEPC = new NetworkProcessor(networkExecutor);
}
public void start() throws IOException {
public void start() throws IOException, DataException {
// Grab P2P port from settings
int listenPort = Settings.getInstance().getListenPort();
@ -177,6 +179,11 @@ public class Network {
throw new IOException("Can't create listen socket", e);
}
// Load all known peers from repository
try (final Repository repository = RepositoryManager.getRepository()) {
allKnownPeers = repository.getNetworkRepository().getAllPeers();
}
// Start up first networking thread
networkEPC.start();
}
@ -209,6 +216,12 @@ public class Network {
// Peer lists
public List<PeerData> getAllKnownPeers() {
synchronized (this.allKnownPeers) {
return new ArrayList<>(this.allKnownPeers);
}
}
public List<Peer> getConnectedPeers() {
synchronized (this.connectedPeers) {
return new ArrayList<>(this.connectedPeers);
@ -223,24 +236,16 @@ public class Network {
/** Returns list of connected peers that have completed handshaking. */
public List<Peer> getHandshakedPeers() {
List<Peer> peers = new ArrayList<>();
synchronized (this.connectedPeers) {
peers = this.connectedPeers.stream().filter(peer -> peer.getHandshakeStatus() == Handshake.COMPLETED).collect(Collectors.toList());
return this.connectedPeers.stream().filter(peer -> peer.getHandshakeStatus() == Handshake.COMPLETED).collect(Collectors.toList());
}
return peers;
}
/** Returns list of connected peers that have completed handshaking, with inbound duplicates removed. */
public List<Peer> getUniqueHandshakedPeers() {
final List<Peer> peers;
List<Peer> peers = getHandshakedPeers();
synchronized (this.connectedPeers) {
peers = this.connectedPeers.stream().filter(peer -> peer.getHandshakeStatus() == Handshake.COMPLETED).collect(Collectors.toList());
}
// Returns true if this [inbound] peer has corresponding outbound peer with same ID
// Returns true if this peer is inbound and has corresponding outbound peer with same ID
Predicate<Peer> hasOutboundWithSameId = peer -> {
// Peer is outbound so return fast
if (peer.isOutbound())
@ -249,7 +254,7 @@ public class Network {
return peers.stream().anyMatch(otherPeer -> otherPeer.isOutbound() && Arrays.equals(otherPeer.getPeerId(), peer.getPeerId()));
};
// Filter out [inbound] peers that have corresponding outbound peer with the same ID
// Filter out inbound peers that have corresponding outbound peer with the same ID
peers.removeIf(hasOutboundWithSameId);
return peers;
@ -276,6 +281,32 @@ public class Network {
}
}
// Peer list filters
/** Must be inside <tt>synchronized (this.selfPeers) {...}</tt> */
private final Predicate<PeerData> isSelfPeer = peerData -> {
PeerAddress peerAddress = peerData.getAddress();
return this.selfPeers.stream().anyMatch(selfPeer -> selfPeer.equals(peerAddress));
};
/** Must be inside <tt>synchronized (this.connectedPeers) {...}</tt> */
private final Predicate<PeerData> isConnectedPeer = peerData -> {
PeerAddress peerAddress = peerData.getAddress();
return this.connectedPeers.stream().anyMatch(peer -> peer.getPeerData().getAddress().equals(peerAddress));
};
/** Must be inside <tt>synchronized (this.connectedPeers) {...}</tt> */
private final Predicate<PeerData> isResolvedAsConnectedPeer = peerData -> {
try {
InetSocketAddress resolvedSocketAddress = peerData.getAddress().toSocketAddress();
return this.connectedPeers.stream().anyMatch(peer -> peer.getResolvedAddress().equals(resolvedSocketAddress));
} catch (UnknownHostException e) {
// Can't resolve - no point even trying to connect
return true;
}
};
// Initial setup
public static void installInitialPeers(Repository repository) throws DataException {
@ -297,6 +328,12 @@ public class Network {
super(executor);
}
@Override
protected void onSpawnFailure() {
// For debugging:
// ExecutorDumper.dump(this.executor, 3, ExecuteProduceConsume.class);
}
@Override
protected Task produceTask(boolean canBlock) throws InterruptedException {
Task task;
@ -504,7 +541,7 @@ public class Network {
LOGGER.debug(() -> String.format("Connection accepted from peer %s", PeerAddress.fromSocket(socketChannel.socket())));
newPeer = new Peer(socketChannel);
newPeer = new Peer(socketChannel, channelSelector);
this.connectedPeers.add(newPeer);
}
} catch (IOException e) {
@ -512,105 +549,15 @@ public class Network {
try {
socketChannel.close();
} catch (IOException ce) {
// Couldn't close?
}
return;
}
try {
socketChannel.setOption(StandardSocketOptions.TCP_NODELAY, true);
socketChannel.configureBlocking(false);
socketChannel.register(channelSelector, SelectionKey.OP_READ);
} catch (IOException e) {
// Remove from connected peers
synchronized (this.connectedPeers) {
this.connectedPeers.remove(newPeer);
}
return;
}
this.onPeerReady(newPeer);
}
public void prunePeers() throws InterruptedException, DataException {
final Long now = NTP.getTime();
if (now == null)
return;
// Disconnect peers that are stuck during handshake
List<Peer> handshakePeers = this.getConnectedPeers();
// Disregard peers that have completed handshake or only connected recently
handshakePeers.removeIf(peer -> peer.getHandshakeStatus() == Handshake.COMPLETED || peer.getConnectionTimestamp() == null || peer.getConnectionTimestamp() > now - HANDSHAKE_TIMEOUT);
for (Peer peer : handshakePeers)
peer.disconnect(String.format("handshake timeout at %s", peer.getHandshakeStatus().name()));
// Prune 'old' peers from repository...
// Pruning peers isn't critical so no need to block for a repository instance.
try (final Repository repository = RepositoryManager.tryRepository()) {
if (repository == null)
return;
// Fetch all known peers
List<PeerData> peers = repository.getNetworkRepository().getAllPeers();
// 'Old' peers:
// we have attempted to connect within the last day
// we last managed to connect over a week ago
Predicate<PeerData> isNotOldPeer = peerData -> {
if (peerData.getLastAttempted() == null || peerData.getLastAttempted() < now - OLD_PEER_ATTEMPTED_PERIOD)
return true;
if (peerData.getLastConnected() == null || peerData.getLastConnected() > now - OLD_PEER_CONNECTION_PERIOD)
return true;
return false;
};
// Disregard peers that are NOT 'old'
peers.removeIf(isNotOldPeer);
// Don't consider already connected peers (simple address match)
Predicate<PeerData> isConnectedPeer = peerData -> {
PeerAddress peerAddress = peerData.getAddress();
return this.connectedPeers.stream().anyMatch(peer -> peer.getPeerData().getAddress().equals(peerAddress));
};
synchronized (this.connectedPeers) {
peers.removeIf(isConnectedPeer);
}
for (PeerData peerData : peers) {
LOGGER.debug(String.format("Deleting old peer %s from repository", peerData.getAddress().toString()));
repository.getNetworkRepository().delete(peerData.getAddress());
}
repository.saveChanges();
}
}
private final Predicate<PeerData> isSelfPeer = peerData -> {
PeerAddress peerAddress = peerData.getAddress();
return this.selfPeers.stream().anyMatch(selfPeer -> selfPeer.equals(peerAddress));
};
private final Predicate<PeerData> isConnectedPeer = peerData -> {
PeerAddress peerAddress = peerData.getAddress();
return this.connectedPeers.stream().anyMatch(peer -> peer.getPeerData().getAddress().equals(peerAddress));
};
private final Predicate<PeerData> isResolvedAsConnectedPeer = peerData -> {
try {
InetSocketAddress resolvedSocketAddress = peerData.getAddress().toSocketAddress();
return this.connectedPeers.stream().anyMatch(peer -> peer.getResolvedAddress().equals(resolvedSocketAddress));
} catch (UnknownHostException e) {
// Can't resolve - no point even trying to connect
return true;
}
};
private Peer getConnectablePeer(final Long now) throws InterruptedException {
// We can't block here so use tryRepository(). We don't NEED to connect a new peer.
try (final Repository repository = RepositoryManager.tryRepository()) {
@ -618,7 +565,7 @@ public class Network {
return null;
// Find an address to connect to
List<PeerData> peers = repository.getNetworkRepository().getAllPeers();
List<PeerData> peers = this.getAllKnownPeers();
// Don't consider peers with recent connection failures
final long lastAttemptedThreshold = now - CONNECT_FAILURE_BACKOFF;
@ -631,14 +578,12 @@ public class Network {
peers.removeIf(isSelfPeer);
}
// Don't consider already connected peers (simple address match)
synchronized (this.connectedPeers) {
// Don't consider already connected peers (simple address match)
peers.removeIf(isConnectedPeer);
}
// Don't consider already connected peers (resolved address match)
// XXX This might be too slow if we end up waiting a long time for hostnames to resolve via DNS
synchronized (this.connectedPeers) {
// Don't consider already connected peers (resolved address match)
// XXX This might be too slow if we end up waiting a long time for hostnames to resolve via DNS
peers.removeIf(isResolvedAsConnectedPeer);
}
@ -647,17 +592,18 @@ public class Network {
return null;
// Pick random peer
int peerIndex = new SecureRandom().nextInt(peers.size());
int peerIndex = new Random().nextInt(peers.size());
// Pick candidate
PeerData peerData = peers.get(peerIndex);
Peer newPeer = new Peer(peerData);
// Update connection attempt info
repository.discardChanges();
peerData.setLastAttempted(now);
repository.getNetworkRepository().save(peerData);
repository.saveChanges();
synchronized (this.allKnownPeers) {
repository.getNetworkRepository().save(peerData);
repository.saveChanges();
}
return newPeer;
} catch (DataException e) {
@ -667,7 +613,7 @@ public class Network {
}
private void connectPeer(Peer newPeer) throws InterruptedException {
SocketChannel socketChannel = newPeer.connect();
SocketChannel socketChannel = newPeer.connect(this.channelSelector);
if (socketChannel == null)
return;
@ -678,15 +624,6 @@ public class Network {
this.connectedPeers.add(newPeer);
}
try {
socketChannel.register(channelSelector, SelectionKey.OP_READ);
} catch (ClosedChannelException e) {
// If channel has somehow already closed then remove from connectedPeers
synchronized (this.connectedPeers) {
this.connectedPeers.remove(newPeer);
}
}
this.onPeerReady(newPeer);
}
@ -718,15 +655,21 @@ public class Network {
synchronized (this.connectedPeers) {
this.connectedPeers.remove(peer);
}
}
// If this is an inbound peer then remove from known peers list
// as remote port is not likely to be remote peer's listen port
if (!peer.isOutbound())
public void peerMisbehaved(Peer peer) {
PeerData peerData = peer.getPeerData();
peerData.setLastMisbehaved(NTP.getTime());
// Only update repository if outbound peer
if (peer.isOutbound())
try (final Repository repository = RepositoryManager.getRepository()) {
repository.getNetworkRepository().delete(peer.getPeerData().getAddress());
repository.saveChanges();
synchronized (this.allKnownPeers) {
repository.getNetworkRepository().save(peerData);
repository.saveChanges();
}
} catch (DataException e) {
LOGGER.error(String.format("Repository issue while trying to delete inbound peer %s", peer), e);
LOGGER.warn("Repository issue while updating peer synchronization info", e);
}
}
@ -828,7 +771,7 @@ public class Network {
private void onGetPeersMessage(Peer peer, Message message) {
// Send our known peers
if (!peer.sendMessage(buildPeersMessage(peer)))
if (!peer.sendMessage(this.buildPeersMessage(peer)))
peer.disconnect("failed to send peers list");
}
@ -845,7 +788,7 @@ public class Network {
// Also add peer's details
peerAddresses.add(PeerAddress.fromString(peer.getPeerData().getAddress().getHost()));
mergePeers(peer.toString(), peerAddresses);
opportunisticMergePeers(peer.toString(), peerAddresses);
}
private void onPingMessage(Peer peer, Message message) {
@ -875,7 +818,7 @@ public class Network {
peerV2Addresses.add(0, sendingPeerAddress);
}
mergePeers(peer.toString(), peerV2Addresses);
opportunisticMergePeers(peer.toString(), peerV2Addresses);
}
private void onPeerVerifyMessage(Peer peer, Message message) {
@ -925,8 +868,10 @@ public class Network {
// Update connection info for outbound peers only
if (peer.isOutbound())
try (final Repository repository = RepositoryManager.getRepository()) {
repository.getNetworkRepository().save(peer.getPeerData());
repository.saveChanges();
synchronized (this.allKnownPeers) {
repository.getNetworkRepository().save(peer.getPeerData());
repository.saveChanges();
}
} catch (DataException e) {
LOGGER.error(String.format("Repository issue while trying to update outbound peer %s", peer), e);
}
@ -964,77 +909,72 @@ public class Network {
/** Returns PEERS message made from peers we've connected to recently, and this node's details */
public Message buildPeersMessage(Peer peer) {
try (final Repository repository = RepositoryManager.getRepository()) {
List<PeerData> knownPeers = repository.getNetworkRepository().getAllPeers();
List<PeerData> knownPeers = this.getAllKnownPeers();
// Filter out peers that we've not connected to ever or within X milliseconds
final long connectionThreshold = NTP.getTime() - RECENT_CONNECTION_THRESHOLD;
Predicate<PeerData> notRecentlyConnected = peerData -> {
final Long lastAttempted = peerData.getLastAttempted();
final Long lastConnected = peerData.getLastConnected();
// Filter out peers that we've not connected to ever or within X milliseconds
final long connectionThreshold = NTP.getTime() - RECENT_CONNECTION_THRESHOLD;
Predicate<PeerData> notRecentlyConnected = peerData -> {
final Long lastAttempted = peerData.getLastAttempted();
final Long lastConnected = peerData.getLastConnected();
if (lastAttempted == null || lastConnected == null)
return true;
if (lastAttempted == null || lastConnected == null)
return true;
if (lastConnected < lastAttempted)
return true;
if (lastConnected < lastAttempted)
return true;
if (lastConnected < connectionThreshold)
return true;
if (lastConnected < connectionThreshold)
return true;
return false;
};
knownPeers.removeIf(notRecentlyConnected);
return false;
};
knownPeers.removeIf(notRecentlyConnected);
if (peer.getVersion() >= 2) {
List<PeerAddress> peerAddresses = new ArrayList<>();
if (peer.getVersion() >= 2) {
List<PeerAddress> peerAddresses = new ArrayList<>();
for (PeerData peerData : knownPeers) {
try {
InetAddress address = InetAddress.getByName(peerData.getAddress().getHost());
for (PeerData peerData : knownPeers) {
try {
InetAddress address = InetAddress.getByName(peerData.getAddress().getHost());
// Don't send 'local' addresses if peer is not 'local'. e.g. don't send localhost:9084 to node4.qortal.org
if (!peer.getIsLocal() && Peer.isAddressLocal(address))
continue;
// Don't send 'local' addresses if peer is not 'local'. e.g. don't send localhost:9084 to node4.qortal.org
if (!peer.getIsLocal() && Peer.isAddressLocal(address))
continue;
peerAddresses.add(peerData.getAddress());
} catch (UnknownHostException e) {
// Couldn't resolve hostname to IP address so discard
}
peerAddresses.add(peerData.getAddress());
} catch (UnknownHostException e) {
// Couldn't resolve hostname to IP address so discard
}
// New format PEERS_V2 message that supports hostnames, IPv6 and ports
return new PeersV2Message(peerAddresses);
} else {
// Map to socket addresses
List<InetAddress> peerAddresses = new ArrayList<>();
for (PeerData peerData : knownPeers) {
try {
// We have to resolve to literal IP address to check for IPv4-ness.
// This isn't great if hostnames have both IPv6 and IPv4 DNS entries.
InetAddress address = InetAddress.getByName(peerData.getAddress().getHost());
// Legacy PEERS message doesn't support IPv6
if (address instanceof Inet6Address)
continue;
// Don't send 'local' addresses if peer is not 'local'. e.g. don't send localhost:9084 to node4.qortal.org
if (!peer.getIsLocal() && !Peer.isAddressLocal(address))
continue;
peerAddresses.add(address);
} catch (UnknownHostException e) {
// Couldn't resolve hostname to IP address so discard
}
}
// Legacy PEERS message that only sends IPv4 addresses
return new PeersMessage(peerAddresses);
}
} catch (DataException e) {
LOGGER.error("Repository issue while building PEERS message", e);
return new PeersMessage(Collections.emptyList());
// New format PEERS_V2 message that supports hostnames, IPv6 and ports
return new PeersV2Message(peerAddresses);
} else {
// Map to socket addresses
List<InetAddress> peerAddresses = new ArrayList<>();
for (PeerData peerData : knownPeers) {
try {
// We have to resolve to literal IP address to check for IPv4-ness.
// This isn't great if hostnames have both IPv6 and IPv4 DNS entries.
InetAddress address = InetAddress.getByName(peerData.getAddress().getHost());
// Legacy PEERS message doesn't support IPv6
if (address instanceof Inet6Address)
continue;
// Don't send 'local' addresses if peer is not 'local'. e.g. don't send localhost:9084 to node4.qortal.org
if (!peer.getIsLocal() && !Peer.isAddressLocal(address))
continue;
peerAddresses.add(address);
} catch (UnknownHostException e) {
// Couldn't resolve hostname to IP address so discard
}
}
// Legacy PEERS message that only sends IPv4 addresses
return new PeersMessage(peerAddresses);
}
}
@ -1077,26 +1017,38 @@ public class Network {
}
public boolean forgetPeer(PeerAddress peerAddress) throws DataException {
try (final Repository repository = RepositoryManager.getRepository()) {
int numDeleted = repository.getNetworkRepository().delete(peerAddress);
repository.saveChanges();
int numDeleted;
disconnectPeer(peerAddress);
synchronized (this.allKnownPeers) {
this.allKnownPeers.removeIf(peerData -> peerData.getAddress().equals(peerAddress));
return numDeleted != 0;
try (final Repository repository = RepositoryManager.getRepository()) {
numDeleted = repository.getNetworkRepository().delete(peerAddress);
repository.saveChanges();
}
}
disconnectPeer(peerAddress);
return numDeleted != 0;
}
public int forgetAllPeers() throws DataException {
try (final Repository repository = RepositoryManager.getRepository()) {
int numDeleted = repository.getNetworkRepository().deleteAllPeers();
repository.saveChanges();
int numDeleted;
for (Peer peer : this.getConnectedPeers())
peer.disconnect("to be forgotten");
synchronized (this.allKnownPeers) {
this.allKnownPeers.clear();
return numDeleted;
try (final Repository repository = RepositoryManager.getRepository()) {
numDeleted = repository.getNetworkRepository().deleteAllPeers();
repository.saveChanges();
}
}
for (Peer peer : this.getConnectedPeers())
peer.disconnect("to be forgotten");
return numDeleted;
}
private void disconnectPeer(PeerAddress peerAddress) {
@ -1116,7 +1068,72 @@ public class Network {
// Network-wide calls
private void mergePeers(String addedBy, List<PeerAddress> peerAddresses) {
public void prunePeers() throws DataException {
final Long now = NTP.getTime();
if (now == null)
return;
// Disconnect peers that are stuck during handshake
List<Peer> handshakePeers = this.getConnectedPeers();
// Disregard peers that have completed handshake or only connected recently
handshakePeers.removeIf(peer -> peer.getHandshakeStatus() == Handshake.COMPLETED || peer.getConnectionTimestamp() == null || peer.getConnectionTimestamp() > now - HANDSHAKE_TIMEOUT);
for (Peer peer : handshakePeers)
peer.disconnect(String.format("handshake timeout at %s", peer.getHandshakeStatus().name()));
// Prune 'old' peers from repository...
// Pruning peers isn't critical so no need to block for a repository instance.
try (final Repository repository = RepositoryManager.tryRepository()) {
if (repository == null)
return;
synchronized (this.allKnownPeers) {
// Fetch all known peers
List<PeerData> peers = new ArrayList<>(this.allKnownPeers);
// 'Old' peers:
// We attempted to connect within the last day
// but we last managed to connect over a week ago.
Predicate<PeerData> isNotOldPeer = peerData -> {
if (peerData.getLastAttempted() == null || peerData.getLastAttempted() < now - OLD_PEER_ATTEMPTED_PERIOD)
return true;
if (peerData.getLastConnected() == null || peerData.getLastConnected() > now - OLD_PEER_CONNECTION_PERIOD)
return true;
return false;
};
// Disregard peers that are NOT 'old'
peers.removeIf(isNotOldPeer);
// Don't consider already connected peers (simple address match)
synchronized (this.connectedPeers) {
peers.removeIf(isConnectedPeer);
}
for (PeerData peerData : peers) {
LOGGER.debug(() -> String.format("Deleting old peer %s from repository", peerData.getAddress().toString()));
repository.getNetworkRepository().delete(peerData.getAddress());
}
repository.saveChanges();
}
}
}
public void mergePeers(String addedBy, long addedWhen, List<PeerAddress> peerAddresses) throws DataException {
mergePeersLock.lock();
try (final Repository repository = RepositoryManager.getRepository()) {
this.mergePeers(repository, addedBy, addedWhen, peerAddresses);
} finally {
mergePeersLock.unlock();
}
}
private void opportunisticMergePeers(String addedBy, List<PeerAddress> peerAddresses) {
final Long addedWhen = NTP.getTime();
if (addedWhen == null)
return;
@ -1131,27 +1148,42 @@ public class Network {
if (repository == null)
return;
List<PeerData> knownPeers = repository.getNetworkRepository().getAllPeers();
this.mergePeers(repository, addedBy, addedWhen, peerAddresses);
// Filter out duplicates
Predicate<PeerAddress> isKnownAddress = peerAddress -> knownPeers.stream().anyMatch(knownPeerData -> knownPeerData.getAddress().equals(peerAddress));
} catch (DataException e) {
// Already logged by this.mergePeers()
}
} finally {
mergePeersLock.unlock();
}
}
private void mergePeers(Repository repository, String addedBy, long addedWhen, List<PeerAddress> peerAddresses) throws DataException {
List<PeerData> newPeers;
synchronized (this.allKnownPeers) {
for (PeerData knownPeerData : this.allKnownPeers) {
// Filter out duplicates, without resolving via DNS
Predicate<PeerAddress> isKnownAddress = peerAddress -> knownPeerData.getAddress().equals(peerAddress);
peerAddresses.removeIf(isKnownAddress);
}
repository.discardChanges();
// Add leftover peer addresses to known peers list
newPeers = peerAddresses.stream().map(peerAddress -> new PeerData(peerAddress, addedWhen, addedBy)).collect(Collectors.toList());
// Save the rest into database
for (PeerAddress peerAddress : peerAddresses) {
PeerData peerData = new PeerData(peerAddress, addedWhen, addedBy);
LOGGER.info(String.format("Adding new peer %s to repository", peerAddress));
this.allKnownPeers.addAll(newPeers);
try {
// Save new peers into database
for (PeerData peerData : newPeers) {
LOGGER.info(String.format("Adding new peer %s to repository", peerData.getAddress()));
repository.getNetworkRepository().save(peerData);
}
repository.saveChanges();
} catch (DataException e) {
LOGGER.error("Repository issue while merging peers list from remote node", e);
LOGGER.error(String.format("Repository issue while merging peers list from %s", addedBy), e);
throw e;
}
} finally {
mergePeersLock.unlock();
}
}

View File

@ -7,6 +7,8 @@ import java.net.SocketTimeoutException;
import java.net.StandardSocketOptions;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.security.SecureRandom;
import java.util.Collections;
@ -108,10 +110,10 @@ public class Peer {
}
/** Construct Peer using existing, connected socket */
public Peer(SocketChannel socketChannel) throws IOException {
public Peer(SocketChannel socketChannel, Selector channelSelector) throws IOException {
this.isOutbound = false;
this.socketChannel = socketChannel;
sharedSetup();
sharedSetup(channelSelector);
this.resolvedAddress = ((InetSocketAddress) socketChannel.socket().getRemoteSocketAddress());
this.isLocal = isAddressLocal(this.resolvedAddress.getAddress());
@ -254,17 +256,18 @@ public class Peer {
new SecureRandom().nextBytes(verificationCodeExpected);
}
private void sharedSetup() throws IOException {
private void sharedSetup(Selector channelSelector) throws IOException {
this.connectionTimestamp = NTP.getTime();
this.socketChannel.setOption(StandardSocketOptions.TCP_NODELAY, true);
this.socketChannel.configureBlocking(false);
this.socketChannel.register(channelSelector, SelectionKey.OP_READ);
this.byteBuffer = null; // Defer allocation to when we need it, to save memory. Sorry GC!
this.replyQueues = Collections.synchronizedMap(new HashMap<Integer, BlockingQueue<Message>>());
this.pendingMessages = new LinkedBlockingQueue<>();
}
public SocketChannel connect() {
LOGGER.trace(String.format("Connecting to peer %s", this));
public SocketChannel connect(Selector channelSelector) {
LOGGER.trace(() -> String.format("Connecting to peer %s", this));
try {
this.resolvedAddress = this.peerData.getAddress().toSocketAddress();
@ -272,10 +275,6 @@ public class Peer {
this.socketChannel = SocketChannel.open();
this.socketChannel.socket().connect(resolvedAddress, CONNECT_TIMEOUT);
LOGGER.debug(String.format("Connected to peer %s", this));
sharedSetup();
return socketChannel;
} catch (SocketTimeoutException e) {
LOGGER.trace(String.format("Connection timed out to peer %s", this));
return null;
@ -286,6 +285,20 @@ public class Peer {
LOGGER.trace(String.format("Connection failed to peer %s", this));
return null;
}
try {
LOGGER.debug(() -> String.format("Connected to peer %s", this));
sharedSetup(channelSelector);
return socketChannel;
} catch (IOException e) {
LOGGER.trace(String.format("Post-connection setup failed, peer %s", this));
try {
socketChannel.close();
} catch (IOException ce) {
// Failed to close?
}
return null;
}
}
/**

View File

@ -30,7 +30,7 @@ public abstract class ExecuteProduceConsume implements Runnable {
private final Logger logger;
private final boolean isLoggerTraceEnabled;
private ExecutorService executor;
protected ExecutorService executor;
// These are volatile to prevent thread-local caching of values
// but all are updated inside synchronized blocks
@ -85,6 +85,10 @@ public abstract class ExecuteProduceConsume implements Runnable {
return snapshot;
}
protected void onSpawnFailure() {
/* Allow override in subclasses */
}
/**
* Returns a Task to be performed, possibly blocking.
*
@ -180,6 +184,7 @@ public abstract class ExecuteProduceConsume implements Runnable {
++this.spawnFailures;
this.hasThreadPending = false;
this.logger.trace(() -> String.format("[%d] failed to spawn another thread", Thread.currentThread().getId()));
this.onSpawnFailure();
}
} else {
this.logger.trace(() -> String.format("[%d] NOT spawning another thread", Thread.currentThread().getId()));

View File

@ -0,0 +1,81 @@
package org.qortal.utils;
import java.lang.reflect.Field;
import java.util.HashSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.locks.ReentrantLock;
public abstract class ExecutorDumper {
private static final String OUR_CLASS_NAME = ExecutorDumper.class.getName();
public static void dump(ExecutorService executor, int checkDepth, Class<?> skipClass) {
if (executor instanceof ThreadPoolExecutor)
dumpThreadPoolExecutor((ThreadPoolExecutor) executor, checkDepth, skipClass);
return;
}
private static void dumpThreadPoolExecutor(ThreadPoolExecutor executor, int checkDepth, Class<?> skipClass) {
try {
Field mainLockField = executor.getClass().getDeclaredField("mainLock");
mainLockField.setAccessible(true);
Field workersField = executor.getClass().getDeclaredField("workers");
workersField.setAccessible(true);
Class<?>[] declaredClasses = executor.getClass().getDeclaredClasses();
Class<?> workerClass = null;
for (int i = 0; i < declaredClasses.length; ++i)
if (declaredClasses[i].getSimpleName().equals("Worker")) {
workerClass = declaredClasses[i];
break;
}
if (workerClass == null)
return;
Field workerThreadField = workerClass.getDeclaredField("thread");
workerThreadField.setAccessible(true);
String skipClassName = skipClass.getName();
ReentrantLock mainLock = (ReentrantLock) mainLockField.get(executor);
mainLock.lock();
try {
@SuppressWarnings("unchecked")
HashSet<Object> workers = (HashSet<Object>) workersField.get(executor);
WORKER_LOOP:
for (Object workerObj : workers) {
Thread thread = (Thread) workerThreadField.get(workerObj);
StackTraceElement[] stackTrace = thread.getStackTrace();
if (stackTrace.length == 0)
continue;
for (int d = 0; d < checkDepth; ++d) {
String stackClassName = stackTrace[d].getClassName();
if (stackClassName.equals(skipClassName) || stackClassName.equals(OUR_CLASS_NAME))
continue WORKER_LOOP;
}
System.out.println(String.format("[%d] %s:", thread.getId(), thread.getName()));
for (int d = 0; d < stackTrace.length; ++d)
System.out.println(String.format("\t\t%s.%s at %s:%d",
stackTrace[d].getClassName(), stackTrace[d].getMethodName(),
stackTrace[d].getFileName(), stackTrace[d].getLineNumber()));
}
} finally {
mainLock.unlock();
}
} catch (Exception e) {
//
}
}
}