diff --git a/core/src/main/java/com/google/bitcoin/core/PeerGroup.java b/core/src/main/java/com/google/bitcoin/core/PeerGroup.java index 9b2690a6..72a407f0 100644 --- a/core/src/main/java/com/google/bitcoin/core/PeerGroup.java +++ b/core/src/main/java/com/google/bitcoin/core/PeerGroup.java @@ -24,6 +24,7 @@ import com.google.bitcoin.utils.Locks; import com.google.common.base.Preconditions; import com.google.common.collect.Sets; import com.google.common.util.concurrent.*; +import net.jcip.annotations.GuardedBy; import org.jboss.netty.bootstrap.ClientBootstrap; import org.jboss.netty.channel.*; import org.jboss.netty.channel.group.ChannelGroup; @@ -77,15 +78,15 @@ public class PeerGroup extends AbstractIdleService { // Addresses to try to connect to, excluding active peers. private final List inactives; // Currently active peers. This is an ordered list rather than a set to make unit tests predictable. - private final List peers; + @GuardedBy("lock") private final List peers; // Currently connecting peers. - private final List pendingPeers; + @GuardedBy("lock") private final List pendingPeers; private final ChannelGroup channels; // The peer that has been selected for the purposes of downloading announced data. - private Peer downloadPeer; + @GuardedBy("lock") private Peer downloadPeer; // Callback for events related to chain download - private PeerEventListener downloadListener; + @GuardedBy("lock") private PeerEventListener downloadListener; // Callbacks for events related to peer connection/disconnection private final CopyOnWriteArrayList peerEventListeners; // Peer discovery sources, will be polled occasionally if there aren't enough inactives. @@ -97,7 +98,7 @@ public class PeerGroup extends AbstractIdleService { private final MemoryPool memoryPool; // How many connections we want to have open at the current time. If we lose connections, we'll try opening more // until we reach this count. - private int maxConnections; + @GuardedBy("lock") private int maxConnections; // Runs a background thread that we use for scheduling pings to our peers, so we can measure their performance // and network latency. We ping peers every pingIntervalMsec milliseconds. @@ -136,15 +137,11 @@ public class PeerGroup extends AbstractIdleService { private class PeerStartupListener implements Peer.PeerLifecycleListener { public void onPeerConnected(Peer peer) { - pendingPeers.remove(peer); - peers.add(peer); handleNewPeer(peer); } public void onPeerDisconnected(Peer peer) { // The channel will be automatically removed from channels. - pendingPeers.remove(peer); - peers.remove(peer); handlePeerDeath(peer); } } @@ -228,8 +225,8 @@ public class PeerGroup extends AbstractIdleService { } inactives = Collections.synchronizedList(new ArrayList()); - peers = Collections.synchronizedList(new ArrayList()); - pendingPeers = Collections.synchronizedList(new ArrayList()); + peers = new ArrayList(); + pendingPeers = new ArrayList(); channels = new DefaultChannelGroup(); peerDiscoverers = new CopyOnWriteArraySet(); peerEventListeners = new CopyOnWriteArrayList(); @@ -449,8 +446,11 @@ public class PeerGroup extends AbstractIdleService { * use numConnectedPeers(). */ public List getConnectedPeers() { - synchronized (peers) { + lock.lock(); + try { return new ArrayList(peers); + } finally { + lock.unlock(); } } @@ -458,8 +458,11 @@ public class PeerGroup extends AbstractIdleService { * Returns a list containing Peers that did not complete connection yet. */ public List getPendingPeers() { - synchronized (pendingPeers) { + lock.lock(); + try { return new ArrayList(pendingPeers); + } finally { + lock.unlock(); } } @@ -563,6 +566,7 @@ public class PeerGroup extends AbstractIdleService { @Override protected void shutDown() throws Exception { // This is run on a separate thread by the AbstractIdleService implementation. + pingTimer.cancel(); // Blocking close of all sockets. TODO: there is a race condition here, for the solution see: // http://biasedbit.com/netty-releaseexternalresources-hangs/ channels.close().await(); @@ -571,7 +575,6 @@ public class PeerGroup extends AbstractIdleService { for (PeerDiscovery peerDiscovery : peerDiscoverers) { peerDiscovery.shutdown(); } - pingTimer.cancel(); } /** @@ -746,10 +749,8 @@ public class PeerGroup extends AbstractIdleService { // TODO: be more nuanced about which peer to download from. We can also try // downloading from multiple peers and handle the case when a new peer comes along // with a longer chain after we thought we were done. - synchronized (peers) { - if (!peers.isEmpty()) { - startBlockChainDownloadFromPeer(peers.iterator().next()); - } + if (!peers.isEmpty()) { + startBlockChainDownloadFromPeer(peers.iterator().next()); } } finally { lock.unlock(); @@ -773,11 +774,15 @@ public class PeerGroup extends AbstractIdleService { } protected void handleNewPeer(final Peer peer) { + int newSize = -1; lock.lock(); try { // Runs on a netty worker thread for every peer that is newly connected. Peer is not locked at this point. // Sets up the newly connected peer so it can do everything it needs to. log.info("{}: New peer", peer); + pendingPeers.remove(peer); + peers.add(peer); + newSize = peers.size(); // Give the peer a filter that can be used to probabilistically drop transactions that // aren't relevant to our wallet. We may still receive some false positives, which is // OK because it helps improve wallet privacy. Old nodes will just ignore the message. @@ -816,11 +821,11 @@ public class PeerGroup extends AbstractIdleService { peer.addEventListener(listener); } setupPingingForNewPeer(peer); - for (PeerEventListener listener : peerEventListeners) - listener.onPeerConnected(peer, peers.size()); } finally { lock.unlock(); } + for (PeerEventListener listener : peerEventListeners) + listener.onPeerConnected(peer, newSize); } private void setupPingingForNewPeer(final Peer peer) { @@ -868,8 +873,8 @@ public class PeerGroup extends AbstractIdleService { } /** Returns true if at least one peer received an inv. */ - private synchronized boolean announcePendingWalletTransactions(List announceWallets, - List announceToPeers) { + private boolean announcePendingWalletTransactions(List announceWallets, + List announceToPeers) { checkState(lock.isLocked()); // Build up an inv announcing the hashes of all pending transactions in all our wallets. InventoryMessage inv = new InventoryMessage(params); @@ -966,25 +971,21 @@ public class PeerGroup extends AbstractIdleService { // we synchronize only the parts that need it. // Peer deaths can occur during startup if a connect attempt after peer discovery aborts immediately. - if (state() != State.RUNNING && state() != State.STARTING) return; + final State state = state(); + if (state != State.RUNNING && state != State.STARTING) return; - checkArgument(!peers.contains(peer)); - final Peer downloadPeer; - final PeerEventListener downloadListener; + int numPeers = 0; + int numConnectedPeers = 0; lock.lock(); try { - downloadPeer = this.downloadPeer; - downloadListener = this.downloadListener; - } finally { - lock.unlock(); - } - if (peer == downloadPeer) { - log.info("Download peer died. Picking a new one."); - setDownloadPeer(null); - // Pick a new one and possibly tell it to download the chain. - // TODO: Fix lock inversion here. - synchronized (peers) { - Peer newDownloadPeer = selectDownloadPeer(peers); + pendingPeers.remove(peer); + peers.remove(peer); + log.info("{}: Peer died", peer.getAddress()); + if (peer == downloadPeer) { + log.info("Download peer died. Picking a new one."); + setDownloadPeer(null); + // Pick a new one and possibly tell it to download the chain. + final Peer newDownloadPeer = selectDownloadPeer(peers); if (newDownloadPeer != null) { setDownloadPeer(newDownloadPeer); if (downloadListener != null) { @@ -992,10 +993,13 @@ public class PeerGroup extends AbstractIdleService { } } } + numPeers = peers.size() + pendingPeers.size(); + numConnectedPeers = peers.size(); + } finally { + lock.unlock(); } // Replace this peer with a new one to keep our connection count up, if necessary. - // The calculation is a little racy. - if (peers.size() + pendingPeers.size() < getMaxConnections()) { + if (numPeers < getMaxConnections()) { try { connectToAnyPeer(); } catch (PeerDiscoveryException e) { @@ -1007,7 +1011,7 @@ public class PeerGroup extends AbstractIdleService { peer.removeWallet(wallet); } for (PeerEventListener listener : peerEventListeners) { - listener.onPeerDisconnected(peer, peers.size()); + listener.onPeerDisconnected(peer, numConnectedPeers); peer.removeEventListener(listener); } } @@ -1035,8 +1039,13 @@ public class PeerGroup extends AbstractIdleService { * @return a future that will be triggered when the number of connected peers >= numPeers */ public ListenableFuture waitForPeers(final int numPeers) { - if (peers.size() >= numPeers) { - return Futures.immediateFuture(this); + lock.lock(); + try { + if (peers.size() >= numPeers) { + return Futures.immediateFuture(this); + } + } finally { + lock.unlock(); } final SettableFuture future = SettableFuture.create(); addEventListener(new AbstractPeerEventListener() { @@ -1124,8 +1133,14 @@ public class PeerGroup extends AbstractIdleService { // This can be called immediately if we already have enough. Otherwise it'll be called from a peer // thread. - // Pick a peer to be the lucky recipient of our tx. - final Peer somePeer = peers.get(0); + // Pick a peer to be the lucky recipient of our tx. This can race if the peer we pick dies immediately. + final Peer somePeer; + lock.lock(); + try { + somePeer = peers.get(0); + } finally { + lock.unlock(); + } log.info("broadcastTransaction: Enough peers, adding {} to the memory pool and sending to {}", tx.getHashAsString(), somePeer); final Transaction pinnedTx = memoryPool.seen(tx, somePeer.getAddress()); @@ -1248,13 +1263,19 @@ public class PeerGroup extends AbstractIdleService { * If no peers are connected, returns zero. */ public int getMostCommonChainHeight() { - // Copy the peers list so we can calculate on it without violating lock ordering. - ArrayList peers; - synchronized (this.peers) { - peers = new ArrayList(this.peers); + lock.lock(); + try { + return getMostCommonChainHeight(this.peers); + } finally { + lock.unlock(); } - if (peers.isEmpty()) - return 0; + } + + /** + * Returns most commonly reported chain height from the given list of {@link Peer}s. + * If multiple heights are tied, the highest is returned. If no peers are connected, returns zero. + */ + public static int getMostCommonChainHeight(final List peers) { int s = peers.size(); int[] heights = new int[s]; int[] counts = new int[s]; @@ -1269,7 +1290,7 @@ public class PeerGroup extends AbstractIdleService { break; } else if (heights[cursor] == 0) { // A new height we didn't see before. - Preconditions.checkState(counts[cursor] == 0); + checkState(counts[cursor] == 0); heights[cursor] = h; counts[cursor] = 1; maxCount = Math.max(maxCount, 1); @@ -1296,19 +1317,15 @@ public class PeerGroup extends AbstractIdleService { } /** Given a list of Peers, return a Peer to be used as the download peer. */ - protected Peer selectDownloadPeer(List origPeers) { + protected static Peer selectDownloadPeer(List peers) { // Characteristics to select for in order of importance: // - Chain height is reasonable (majority of nodes) // - High enough protocol version for the features we want (but we'll settle for less) // - Ping time. - List peers; - synchronized (origPeers) { - peers = new ArrayList(origPeers); - } if (peers.isEmpty()) return null; // Make sure we don't select a peer that is behind/synchronizing itself. - int mostCommonChainHeight = getMostCommonChainHeight(); + int mostCommonChainHeight = getMostCommonChainHeight(peers); List candidates = new ArrayList(); for (Peer peer : peers) { if (peer.getBestHeight() == mostCommonChainHeight) candidates.add(peer);