Optimized peer lists

- Removed synchronization from connectedPeers, and replaced it with an unmodifiableList.
- Added additional immuatable caches: handshakedPeers and outboundHandshakedPeers

This should greatly reduce the amount of time spent waiting around for access to the connectedPeers array, since it is now immediately accessible without needing to obtain a lock. It also removes calls to stream() which were consuming large amounts of CPU to constantly filter the connected peers down to a list of handshaked peers.

Thanks to @catbref for these great suggestions.
This commit is contained in:
CalDescent 2022-03-04 15:14:12 +00:00
parent 84b42210f1
commit 924aa05681
5 changed files with 107 additions and 70 deletions

View File

@ -148,7 +148,8 @@ public class BlockMinter extends Thread {
} }
} }
List<Peer> peers = Network.getInstance().getHandshakedPeers(); // Needs a mutable copy of the unmodifiableList
List<Peer> peers = new ArrayList<>(Network.getInstance().getHandshakedPeers());
BlockData lastBlockData = blockRepository.getLastBlock(); BlockData lastBlockData = blockRepository.getLastBlock();
// Disregard peers that have "misbehaved" recently // Disregard peers that have "misbehaved" recently

View File

@ -2108,7 +2108,8 @@ public class Controller extends Thread {
if (minLatestBlockTimestamp == null) if (minLatestBlockTimestamp == null)
return null; return null;
List<Peer> peers = Network.getInstance().getHandshakedPeers(); // Needs a mutable copy of the unmodifiableList
List<Peer> peers = new ArrayList<>(Network.getInstance().getHandshakedPeers());
// Filter out unsuitable peers // Filter out unsuitable peers
Iterator<Peer> iterator = peers.iterator(); Iterator<Peer> iterator = peers.iterator();
@ -2157,7 +2158,8 @@ public class Controller extends Thread {
if (latestBlockData == null || latestBlockData.getTimestamp() < minLatestBlockTimestamp) if (latestBlockData == null || latestBlockData.getTimestamp() < minLatestBlockTimestamp)
return false; return false;
List<Peer> peers = Network.getInstance().getHandshakedPeers(); // Needs a mutable copy of the unmodifiableList
List<Peer> peers = new ArrayList<>(Network.getInstance().getHandshakedPeers());
if (peers == null) if (peers == null)
return false; return false;

View File

@ -195,7 +195,8 @@ public class Synchronizer extends Thread {
if (this.isSynchronizing) if (this.isSynchronizing)
return true; return true;
List<Peer> peers = Network.getInstance().getHandshakedPeers(); // Needs a mutable copy of the unmodifiableList
List<Peer> peers = new ArrayList<>(Network.getInstance().getHandshakedPeers());
// Disregard peers that have "misbehaved" recently // Disregard peers that have "misbehaved" recently
peers.removeIf(Controller.hasMisbehaved); peers.removeIf(Controller.hasMisbehaved);
@ -211,7 +212,8 @@ public class Synchronizer extends Thread {
checkRecoveryModeForPeers(peers); checkRecoveryModeForPeers(peers);
if (recoveryMode) { if (recoveryMode) {
peers = Network.getInstance().getHandshakedPeers(); // Needs a mutable copy of the unmodifiableList
peers = new ArrayList<>(Network.getInstance().getHandshakedPeers());
peers.removeIf(Controller.hasOnlyGenesisBlock); peers.removeIf(Controller.hasOnlyGenesisBlock);
peers.removeIf(Controller.hasMisbehaved); peers.removeIf(Controller.hasMisbehaved);
peers.removeIf(Controller.hasOldVersion); peers.removeIf(Controller.hasOldVersion);

View File

@ -93,7 +93,8 @@ public class ArbitraryDataManager extends Thread {
continue; continue;
} }
List<Peer> peers = Network.getInstance().getHandshakedPeers(); // Needs a mutable copy of the unmodifiableList
List<Peer> peers = new ArrayList<>(Network.getInstance().getHandshakedPeers());
// Disregard peers that have "misbehaved" recently // Disregard peers that have "misbehaved" recently
peers.removeIf(Controller.hasMisbehaved); peers.removeIf(Controller.hasMisbehaved);

View File

@ -100,7 +100,9 @@ public class Network {
private long nextDisconnectionCheck = 0L; private long nextDisconnectionCheck = 0L;
private final List<PeerData> allKnownPeers = new ArrayList<>(); private final List<PeerData> allKnownPeers = new ArrayList<>();
private final List<Peer> connectedPeers = new ArrayList<>(); private List<Peer> connectedPeers = Collections.unmodifiableList(new ArrayList<>());
private List<Peer> handshakedPeers = Collections.unmodifiableList(new ArrayList<>());
private List<Peer> outboundHandshakedPeers = Collections.unmodifiableList(new ArrayList<>());
private final List<PeerAddress> selfPeers = new ArrayList<>(); private final List<PeerAddress> selfPeers = new ArrayList<>();
private final ExecuteProduceConsume networkEPC; private final ExecuteProduceConsume networkEPC;
@ -237,9 +239,23 @@ public class Network {
} }
public List<Peer> getConnectedPeers() { public List<Peer> getConnectedPeers() {
synchronized (this.connectedPeers) { return this.connectedPeers;
return new ArrayList<>(this.connectedPeers); }
}
public void addConnectedPeer(Peer peer) {
List<Peer> mutableConnectedPeers = new ArrayList<>(this.connectedPeers);
mutableConnectedPeers.add(peer);
this.connectedPeers = Collections.unmodifiableList(mutableConnectedPeers);
}
public void removeConnectedPeer(Peer peer) {
// Firstly remove from handshaked peers
this.removeHandshakedPeer(peer);
// Now remove from connected peers
List<Peer> mutableConnectedPeers = new ArrayList<>(this.connectedPeers);
mutableConnectedPeers.remove(peer);
this.connectedPeers = Collections.unmodifiableList(mutableConnectedPeers);
} }
public List<PeerAddress> getSelfPeers() { public List<PeerAddress> getSelfPeers() {
@ -274,13 +290,11 @@ public class Network {
} }
// Check if we're already connected to and handshaked with this peer // Check if we're already connected to and handshaked with this peer
Peer connectedPeer = null; Peer connectedPeer = this.connectedPeers.stream()
synchronized (this.connectedPeers) {
connectedPeer = this.connectedPeers.stream()
.filter(p -> p.getPeerData().getAddress().equals(peerAddress)) .filter(p -> p.getPeerData().getAddress().equals(peerAddress))
.findFirst() .findFirst()
.orElse(null); .orElse(null);
}
boolean isConnected = (connectedPeer != null); boolean isConnected = (connectedPeer != null);
boolean isHandshaked = this.getHandshakedPeers().stream() boolean isHandshaked = this.getHandshakedPeers().stream()
@ -328,10 +342,28 @@ public class Network {
* Returns list of connected peers that have completed handshaking. * Returns list of connected peers that have completed handshaking.
*/ */
public List<Peer> getHandshakedPeers() { public List<Peer> getHandshakedPeers() {
synchronized (this.connectedPeers) { return this.handshakedPeers;
return this.connectedPeers.stream() }
.filter(peer -> peer.getHandshakeStatus() == Handshake.COMPLETED)
.collect(Collectors.toList()); public void addHandshakedPeer(Peer peer) {
List<Peer> mutableHandshakedPeers = new ArrayList<>(this.handshakedPeers);
mutableHandshakedPeers.add(peer);
this.handshakedPeers = Collections.unmodifiableList(mutableHandshakedPeers);
// Also add to outbound handshaked peers cache
if (peer.isOutbound()) {
this.addOutboundHandshakedPeer(peer);
}
}
public void removeHandshakedPeer(Peer peer) {
List<Peer> mutableHandshakedPeers = new ArrayList<>(this.handshakedPeers);
mutableHandshakedPeers.remove(peer);
this.handshakedPeers = Collections.unmodifiableList(mutableHandshakedPeers);
// Also remove from outbound handshaked peers cache
if (peer.isOutbound()) {
this.removeOutboundHandshakedPeer(peer);
} }
} }
@ -339,23 +371,36 @@ public class Network {
* Returns list of peers we connected to that have completed handshaking. * Returns list of peers we connected to that have completed handshaking.
*/ */
public List<Peer> getOutboundHandshakedPeers() { public List<Peer> getOutboundHandshakedPeers() {
synchronized (this.connectedPeers) { return this.outboundHandshakedPeers;
return this.connectedPeers.stream() }
.filter(peer -> peer.isOutbound() && peer.getHandshakeStatus() == Handshake.COMPLETED)
.collect(Collectors.toList()); public void addOutboundHandshakedPeer(Peer peer) {
if (!peer.isOutbound()) {
return;
} }
List<Peer> mutableOutboundHandshakedPeers = new ArrayList<>(this.outboundHandshakedPeers);
mutableOutboundHandshakedPeers.add(peer);
this.outboundHandshakedPeers = Collections.unmodifiableList(mutableOutboundHandshakedPeers);
}
public void removeOutboundHandshakedPeer(Peer peer) {
if (!peer.isOutbound()) {
return;
}
List<Peer> mutableOutboundHandshakedPeers = new ArrayList<>(this.outboundHandshakedPeers);
mutableOutboundHandshakedPeers.remove(peer);
this.outboundHandshakedPeers = Collections.unmodifiableList(mutableOutboundHandshakedPeers);
} }
/** /**
* Returns first peer that has completed handshaking and has matching public key. * Returns first peer that has completed handshaking and has matching public key.
*/ */
public Peer getHandshakedPeerWithPublicKey(byte[] publicKey) { public Peer getHandshakedPeerWithPublicKey(byte[] publicKey) {
synchronized (this.connectedPeers) { return this.connectedPeers.stream()
return this.connectedPeers.stream() .filter(peer -> peer.getHandshakeStatus() == Handshake.COMPLETED
.filter(peer -> peer.getHandshakeStatus() == Handshake.COMPLETED && Arrays.equals(peer.getPeersPublicKey(), publicKey))
&& Arrays.equals(peer.getPeersPublicKey(), publicKey)) .findFirst().orElse(null);
.findFirst().orElse(null);
}
} }
// Peer list filters // Peer list filters
@ -368,17 +413,11 @@ public class Network {
return this.selfPeers.stream().anyMatch(selfPeer -> selfPeer.equals(peerAddress)); return this.selfPeers.stream().anyMatch(selfPeer -> selfPeer.equals(peerAddress));
}; };
/**
* Must be inside <tt>synchronized (this.connectedPeers) {...}</tt>
*/
private final Predicate<PeerData> isConnectedPeer = peerData -> { private final Predicate<PeerData> isConnectedPeer = peerData -> {
PeerAddress peerAddress = peerData.getAddress(); PeerAddress peerAddress = peerData.getAddress();
return this.connectedPeers.stream().anyMatch(peer -> peer.getPeerData().getAddress().equals(peerAddress)); return this.connectedPeers.stream().anyMatch(peer -> peer.getPeerData().getAddress().equals(peerAddress));
}; };
/**
* Must be inside <tt>synchronized (this.connectedPeers) {...}</tt>
*/
private final Predicate<PeerData> isResolvedAsConnectedPeer = peerData -> { private final Predicate<PeerData> isResolvedAsConnectedPeer = peerData -> {
try { try {
InetSocketAddress resolvedSocketAddress = peerData.getAddress().toSocketAddress(); InetSocketAddress resolvedSocketAddress = peerData.getAddress().toSocketAddress();
@ -641,19 +680,18 @@ public class Network {
return; return;
} }
synchronized (this.connectedPeers) { if (connectedPeers.size() >= maxPeers) {
if (connectedPeers.size() >= maxPeers) { // We have enough peers
// We have enough peers LOGGER.debug("Connection discarded from peer {} because the server is full", address);
LOGGER.debug("Connection discarded from peer {} because the server is full", address); socketChannel.close();
socketChannel.close(); return;
return;
}
LOGGER.debug("Connection accepted from peer {}", address);
newPeer = new Peer(socketChannel, channelSelector);
this.connectedPeers.add(newPeer);
} }
LOGGER.debug("Connection accepted from peer {}", address);
newPeer = new Peer(socketChannel, channelSelector);
this.addConnectedPeer(newPeer);
} catch (IOException e) { } catch (IOException e) {
if (socketChannel.isOpen()) { if (socketChannel.isOpen()) {
try { try {
@ -701,16 +739,14 @@ public class Network {
peers.removeIf(isSelfPeer); peers.removeIf(isSelfPeer);
} }
synchronized (this.connectedPeers) { // Don't consider already connected peers (simple address match)
// Don't consider already connected peers (simple address match) peers.removeIf(isConnectedPeer);
peers.removeIf(isConnectedPeer);
// Don't consider already connected peers (resolved address match) // Don't consider already connected peers (resolved address match)
// XXX This might be too slow if we end up waiting a long time for hostnames to resolve via DNS // XXX This might be too slow if we end up waiting a long time for hostnames to resolve via DNS
peers.removeIf(isResolvedAsConnectedPeer); peers.removeIf(isResolvedAsConnectedPeer);
this.checkLongestConnection(now); this.checkLongestConnection(now);
}
// Any left? // Any left?
if (peers.isEmpty()) { if (peers.isEmpty()) {
@ -748,21 +784,16 @@ public class Network {
return false; return false;
} }
synchronized (this.connectedPeers) { this.addConnectedPeer(newPeer);
this.connectedPeers.add(newPeer);
}
this.onPeerReady(newPeer); this.onPeerReady(newPeer);
return true; return true;
} }
private Peer getPeerFromChannel(SocketChannel socketChannel) { private Peer getPeerFromChannel(SocketChannel socketChannel) {
synchronized (this.connectedPeers) { for (Peer peer : this.connectedPeers) {
for (Peer peer : this.connectedPeers) { if (peer.getSocketChannel() == socketChannel) {
if (peer.getSocketChannel() == socketChannel) { return peer;
return peer;
}
} }
} }
@ -826,9 +857,7 @@ public class Network {
LOGGER.debug("[{}] Failed to connect to peer {}", peer.getPeerConnectionId(), peer); LOGGER.debug("[{}] Failed to connect to peer {}", peer.getPeerConnectionId(), peer);
} }
synchronized (this.connectedPeers) { this.removeConnectedPeer(peer);
this.connectedPeers.remove(peer);
}
} }
public void peerMisbehaved(Peer peer) { public void peerMisbehaved(Peer peer) {
@ -989,6 +1018,9 @@ public class Network {
return; return;
} }
// Add to handshaked peers cache
this.addHandshakedPeer(peer);
// Make a note that we've successfully completed handshake (and when) // Make a note that we've successfully completed handshake (and when)
peer.getPeerData().setLastConnected(NTP.getTime()); peer.getPeerData().setLastConnected(NTP.getTime());
@ -1273,7 +1305,8 @@ public class Network {
} }
// Disconnect peers that are stuck during handshake // Disconnect peers that are stuck during handshake
List<Peer> handshakePeers = this.getConnectedPeers(); // Needs a mutable copy of the unmodifiableList
List<Peer> handshakePeers = new ArrayList<>(this.getConnectedPeers());
// Disregard peers that have completed handshake or only connected recently // Disregard peers that have completed handshake or only connected recently
handshakePeers.removeIf(peer -> peer.getHandshakeStatus() == Handshake.COMPLETED handshakePeers.removeIf(peer -> peer.getHandshakeStatus() == Handshake.COMPLETED
@ -1315,9 +1348,7 @@ public class Network {
peers.removeIf(isNotOldPeer); peers.removeIf(isNotOldPeer);
// Don't consider already connected peers (simple address match) // Don't consider already connected peers (simple address match)
synchronized (this.connectedPeers) { peers.removeIf(isConnectedPeer);
peers.removeIf(isConnectedPeer);
}
for (PeerData peerData : peers) { for (PeerData peerData : peers) {
LOGGER.debug("Deleting old peer {} from repository", peerData.getAddress().toString()); LOGGER.debug("Deleting old peer {} from repository", peerData.getAddress().toString());