3
0
mirror of https://github.com/Qortal/altcoinj.git synced 2025-01-31 23:32:16 +00:00

More changes/simplifications to PeerGroup locking.

This commit is contained in:
Mike Hearn 2013-03-27 17:44:51 +01:00
parent 61c8c07468
commit 3c6f435fde

View File

@ -24,6 +24,7 @@ import com.google.bitcoin.utils.Locks;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import com.google.common.util.concurrent.*; import com.google.common.util.concurrent.*;
import net.jcip.annotations.GuardedBy;
import org.jboss.netty.bootstrap.ClientBootstrap; import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.*; import org.jboss.netty.channel.*;
import org.jboss.netty.channel.group.ChannelGroup; import org.jboss.netty.channel.group.ChannelGroup;
@ -77,15 +78,15 @@ public class PeerGroup extends AbstractIdleService {
// Addresses to try to connect to, excluding active peers. // Addresses to try to connect to, excluding active peers.
private final List<PeerAddress> inactives; private final List<PeerAddress> inactives;
// Currently active peers. This is an ordered list rather than a set to make unit tests predictable. // Currently active peers. This is an ordered list rather than a set to make unit tests predictable.
private final List<Peer> peers; @GuardedBy("lock") private final List<Peer> peers;
// Currently connecting peers. // Currently connecting peers.
private final List<Peer> pendingPeers; @GuardedBy("lock") private final List<Peer> pendingPeers;
private final ChannelGroup channels; private final ChannelGroup channels;
// The peer that has been selected for the purposes of downloading announced data. // The peer that has been selected for the purposes of downloading announced data.
private Peer downloadPeer; @GuardedBy("lock") private Peer downloadPeer;
// Callback for events related to chain download // Callback for events related to chain download
private PeerEventListener downloadListener; @GuardedBy("lock") private PeerEventListener downloadListener;
// Callbacks for events related to peer connection/disconnection // Callbacks for events related to peer connection/disconnection
private final CopyOnWriteArrayList<PeerEventListener> peerEventListeners; private final CopyOnWriteArrayList<PeerEventListener> peerEventListeners;
// Peer discovery sources, will be polled occasionally if there aren't enough inactives. // Peer discovery sources, will be polled occasionally if there aren't enough inactives.
@ -97,7 +98,7 @@ public class PeerGroup extends AbstractIdleService {
private final MemoryPool memoryPool; private final MemoryPool memoryPool;
// How many connections we want to have open at the current time. If we lose connections, we'll try opening more // How many connections we want to have open at the current time. If we lose connections, we'll try opening more
// until we reach this count. // until we reach this count.
private int maxConnections; @GuardedBy("lock") private int maxConnections;
// Runs a background thread that we use for scheduling pings to our peers, so we can measure their performance // Runs a background thread that we use for scheduling pings to our peers, so we can measure their performance
// and network latency. We ping peers every pingIntervalMsec milliseconds. // and network latency. We ping peers every pingIntervalMsec milliseconds.
@ -136,15 +137,11 @@ public class PeerGroup extends AbstractIdleService {
private class PeerStartupListener implements Peer.PeerLifecycleListener { private class PeerStartupListener implements Peer.PeerLifecycleListener {
public void onPeerConnected(Peer peer) { public void onPeerConnected(Peer peer) {
pendingPeers.remove(peer);
peers.add(peer);
handleNewPeer(peer); handleNewPeer(peer);
} }
public void onPeerDisconnected(Peer peer) { public void onPeerDisconnected(Peer peer) {
// The channel will be automatically removed from channels. // The channel will be automatically removed from channels.
pendingPeers.remove(peer);
peers.remove(peer);
handlePeerDeath(peer); handlePeerDeath(peer);
} }
} }
@ -228,8 +225,8 @@ public class PeerGroup extends AbstractIdleService {
} }
inactives = Collections.synchronizedList(new ArrayList<PeerAddress>()); inactives = Collections.synchronizedList(new ArrayList<PeerAddress>());
peers = Collections.synchronizedList(new ArrayList<Peer>()); peers = new ArrayList<Peer>();
pendingPeers = Collections.synchronizedList(new ArrayList<Peer>()); pendingPeers = new ArrayList<Peer>();
channels = new DefaultChannelGroup(); channels = new DefaultChannelGroup();
peerDiscoverers = new CopyOnWriteArraySet<PeerDiscovery>(); peerDiscoverers = new CopyOnWriteArraySet<PeerDiscovery>();
peerEventListeners = new CopyOnWriteArrayList<PeerEventListener>(); peerEventListeners = new CopyOnWriteArrayList<PeerEventListener>();
@ -449,8 +446,11 @@ public class PeerGroup extends AbstractIdleService {
* use numConnectedPeers(). * use numConnectedPeers().
*/ */
public List<Peer> getConnectedPeers() { public List<Peer> getConnectedPeers() {
synchronized (peers) { lock.lock();
try {
return new ArrayList<Peer>(peers); return new ArrayList<Peer>(peers);
} finally {
lock.unlock();
} }
} }
@ -458,8 +458,11 @@ public class PeerGroup extends AbstractIdleService {
* Returns a list containing Peers that did not complete connection yet. * Returns a list containing Peers that did not complete connection yet.
*/ */
public List<Peer> getPendingPeers() { public List<Peer> getPendingPeers() {
synchronized (pendingPeers) { lock.lock();
try {
return new ArrayList<Peer>(pendingPeers); return new ArrayList<Peer>(pendingPeers);
} finally {
lock.unlock();
} }
} }
@ -563,6 +566,7 @@ public class PeerGroup extends AbstractIdleService {
@Override @Override
protected void shutDown() throws Exception { protected void shutDown() throws Exception {
// This is run on a separate thread by the AbstractIdleService implementation. // This is run on a separate thread by the AbstractIdleService implementation.
pingTimer.cancel();
// Blocking close of all sockets. TODO: there is a race condition here, for the solution see: // Blocking close of all sockets. TODO: there is a race condition here, for the solution see:
// http://biasedbit.com/netty-releaseexternalresources-hangs/ // http://biasedbit.com/netty-releaseexternalresources-hangs/
channels.close().await(); channels.close().await();
@ -571,7 +575,6 @@ public class PeerGroup extends AbstractIdleService {
for (PeerDiscovery peerDiscovery : peerDiscoverers) { for (PeerDiscovery peerDiscovery : peerDiscoverers) {
peerDiscovery.shutdown(); peerDiscovery.shutdown();
} }
pingTimer.cancel();
} }
/** /**
@ -746,10 +749,8 @@ public class PeerGroup extends AbstractIdleService {
// TODO: be more nuanced about which peer to download from. We can also try // TODO: be more nuanced about which peer to download from. We can also try
// downloading from multiple peers and handle the case when a new peer comes along // downloading from multiple peers and handle the case when a new peer comes along
// with a longer chain after we thought we were done. // with a longer chain after we thought we were done.
synchronized (peers) { if (!peers.isEmpty()) {
if (!peers.isEmpty()) { startBlockChainDownloadFromPeer(peers.iterator().next());
startBlockChainDownloadFromPeer(peers.iterator().next());
}
} }
} finally { } finally {
lock.unlock(); lock.unlock();
@ -773,11 +774,15 @@ public class PeerGroup extends AbstractIdleService {
} }
protected void handleNewPeer(final Peer peer) { protected void handleNewPeer(final Peer peer) {
int newSize = -1;
lock.lock(); lock.lock();
try { try {
// Runs on a netty worker thread for every peer that is newly connected. Peer is not locked at this point. // Runs on a netty worker thread for every peer that is newly connected. Peer is not locked at this point.
// Sets up the newly connected peer so it can do everything it needs to. // Sets up the newly connected peer so it can do everything it needs to.
log.info("{}: New peer", peer); log.info("{}: New peer", peer);
pendingPeers.remove(peer);
peers.add(peer);
newSize = peers.size();
// Give the peer a filter that can be used to probabilistically drop transactions that // Give the peer a filter that can be used to probabilistically drop transactions that
// aren't relevant to our wallet. We may still receive some false positives, which is // aren't relevant to our wallet. We may still receive some false positives, which is
// OK because it helps improve wallet privacy. Old nodes will just ignore the message. // OK because it helps improve wallet privacy. Old nodes will just ignore the message.
@ -816,11 +821,11 @@ public class PeerGroup extends AbstractIdleService {
peer.addEventListener(listener); peer.addEventListener(listener);
} }
setupPingingForNewPeer(peer); setupPingingForNewPeer(peer);
for (PeerEventListener listener : peerEventListeners)
listener.onPeerConnected(peer, peers.size());
} finally { } finally {
lock.unlock(); lock.unlock();
} }
for (PeerEventListener listener : peerEventListeners)
listener.onPeerConnected(peer, newSize);
} }
private void setupPingingForNewPeer(final Peer peer) { private void setupPingingForNewPeer(final Peer peer) {
@ -868,8 +873,8 @@ public class PeerGroup extends AbstractIdleService {
} }
/** Returns true if at least one peer received an inv. */ /** Returns true if at least one peer received an inv. */
private synchronized boolean announcePendingWalletTransactions(List<Wallet> announceWallets, private boolean announcePendingWalletTransactions(List<Wallet> announceWallets,
List<Peer> announceToPeers) { List<Peer> announceToPeers) {
checkState(lock.isLocked()); checkState(lock.isLocked());
// Build up an inv announcing the hashes of all pending transactions in all our wallets. // Build up an inv announcing the hashes of all pending transactions in all our wallets.
InventoryMessage inv = new InventoryMessage(params); InventoryMessage inv = new InventoryMessage(params);
@ -966,25 +971,21 @@ public class PeerGroup extends AbstractIdleService {
// we synchronize only the parts that need it. // we synchronize only the parts that need it.
// Peer deaths can occur during startup if a connect attempt after peer discovery aborts immediately. // Peer deaths can occur during startup if a connect attempt after peer discovery aborts immediately.
if (state() != State.RUNNING && state() != State.STARTING) return; final State state = state();
if (state != State.RUNNING && state != State.STARTING) return;
checkArgument(!peers.contains(peer)); int numPeers = 0;
final Peer downloadPeer; int numConnectedPeers = 0;
final PeerEventListener downloadListener;
lock.lock(); lock.lock();
try { try {
downloadPeer = this.downloadPeer; pendingPeers.remove(peer);
downloadListener = this.downloadListener; peers.remove(peer);
} finally { log.info("{}: Peer died", peer.getAddress());
lock.unlock(); if (peer == downloadPeer) {
} log.info("Download peer died. Picking a new one.");
if (peer == downloadPeer) { setDownloadPeer(null);
log.info("Download peer died. Picking a new one."); // Pick a new one and possibly tell it to download the chain.
setDownloadPeer(null); final Peer newDownloadPeer = selectDownloadPeer(peers);
// Pick a new one and possibly tell it to download the chain.
// TODO: Fix lock inversion here.
synchronized (peers) {
Peer newDownloadPeer = selectDownloadPeer(peers);
if (newDownloadPeer != null) { if (newDownloadPeer != null) {
setDownloadPeer(newDownloadPeer); setDownloadPeer(newDownloadPeer);
if (downloadListener != null) { if (downloadListener != null) {
@ -992,10 +993,13 @@ public class PeerGroup extends AbstractIdleService {
} }
} }
} }
numPeers = peers.size() + pendingPeers.size();
numConnectedPeers = peers.size();
} finally {
lock.unlock();
} }
// Replace this peer with a new one to keep our connection count up, if necessary. // Replace this peer with a new one to keep our connection count up, if necessary.
// The calculation is a little racy. if (numPeers < getMaxConnections()) {
if (peers.size() + pendingPeers.size() < getMaxConnections()) {
try { try {
connectToAnyPeer(); connectToAnyPeer();
} catch (PeerDiscoveryException e) { } catch (PeerDiscoveryException e) {
@ -1007,7 +1011,7 @@ public class PeerGroup extends AbstractIdleService {
peer.removeWallet(wallet); peer.removeWallet(wallet);
} }
for (PeerEventListener listener : peerEventListeners) { for (PeerEventListener listener : peerEventListeners) {
listener.onPeerDisconnected(peer, peers.size()); listener.onPeerDisconnected(peer, numConnectedPeers);
peer.removeEventListener(listener); peer.removeEventListener(listener);
} }
} }
@ -1035,8 +1039,13 @@ public class PeerGroup extends AbstractIdleService {
* @return a future that will be triggered when the number of connected peers >= numPeers * @return a future that will be triggered when the number of connected peers >= numPeers
*/ */
public ListenableFuture<PeerGroup> waitForPeers(final int numPeers) { public ListenableFuture<PeerGroup> waitForPeers(final int numPeers) {
if (peers.size() >= numPeers) { lock.lock();
return Futures.immediateFuture(this); try {
if (peers.size() >= numPeers) {
return Futures.immediateFuture(this);
}
} finally {
lock.unlock();
} }
final SettableFuture<PeerGroup> future = SettableFuture.create(); final SettableFuture<PeerGroup> future = SettableFuture.create();
addEventListener(new AbstractPeerEventListener() { addEventListener(new AbstractPeerEventListener() {
@ -1124,8 +1133,14 @@ public class PeerGroup extends AbstractIdleService {
// This can be called immediately if we already have enough. Otherwise it'll be called from a peer // This can be called immediately if we already have enough. Otherwise it'll be called from a peer
// thread. // thread.
// Pick a peer to be the lucky recipient of our tx. // Pick a peer to be the lucky recipient of our tx. This can race if the peer we pick dies immediately.
final Peer somePeer = peers.get(0); final Peer somePeer;
lock.lock();
try {
somePeer = peers.get(0);
} finally {
lock.unlock();
}
log.info("broadcastTransaction: Enough peers, adding {} to the memory pool and sending to {}", log.info("broadcastTransaction: Enough peers, adding {} to the memory pool and sending to {}",
tx.getHashAsString(), somePeer); tx.getHashAsString(), somePeer);
final Transaction pinnedTx = memoryPool.seen(tx, somePeer.getAddress()); final Transaction pinnedTx = memoryPool.seen(tx, somePeer.getAddress());
@ -1248,13 +1263,19 @@ public class PeerGroup extends AbstractIdleService {
* If no peers are connected, returns zero. * If no peers are connected, returns zero.
*/ */
public int getMostCommonChainHeight() { public int getMostCommonChainHeight() {
// Copy the peers list so we can calculate on it without violating lock ordering. lock.lock();
ArrayList<Peer> peers; try {
synchronized (this.peers) { return getMostCommonChainHeight(this.peers);
peers = new ArrayList<Peer>(this.peers); } finally {
lock.unlock();
} }
if (peers.isEmpty()) }
return 0;
/**
* Returns most commonly reported chain height from the given list of {@link Peer}s.
* If multiple heights are tied, the highest is returned. If no peers are connected, returns zero.
*/
public static int getMostCommonChainHeight(final List<Peer> peers) {
int s = peers.size(); int s = peers.size();
int[] heights = new int[s]; int[] heights = new int[s];
int[] counts = new int[s]; int[] counts = new int[s];
@ -1269,7 +1290,7 @@ public class PeerGroup extends AbstractIdleService {
break; break;
} else if (heights[cursor] == 0) { } else if (heights[cursor] == 0) {
// A new height we didn't see before. // A new height we didn't see before.
Preconditions.checkState(counts[cursor] == 0); checkState(counts[cursor] == 0);
heights[cursor] = h; heights[cursor] = h;
counts[cursor] = 1; counts[cursor] = 1;
maxCount = Math.max(maxCount, 1); maxCount = Math.max(maxCount, 1);
@ -1296,19 +1317,15 @@ public class PeerGroup extends AbstractIdleService {
} }
/** Given a list of Peers, return a Peer to be used as the download peer. */ /** Given a list of Peers, return a Peer to be used as the download peer. */
protected Peer selectDownloadPeer(List<Peer> origPeers) { protected static Peer selectDownloadPeer(List<Peer> peers) {
// Characteristics to select for in order of importance: // Characteristics to select for in order of importance:
// - Chain height is reasonable (majority of nodes) // - Chain height is reasonable (majority of nodes)
// - High enough protocol version for the features we want (but we'll settle for less) // - High enough protocol version for the features we want (but we'll settle for less)
// - Ping time. // - Ping time.
List<Peer> peers;
synchronized (origPeers) {
peers = new ArrayList<Peer>(origPeers);
}
if (peers.isEmpty()) if (peers.isEmpty())
return null; return null;
// Make sure we don't select a peer that is behind/synchronizing itself. // Make sure we don't select a peer that is behind/synchronizing itself.
int mostCommonChainHeight = getMostCommonChainHeight(); int mostCommonChainHeight = getMostCommonChainHeight(peers);
List<Peer> candidates = new ArrayList<Peer>(); List<Peer> candidates = new ArrayList<Peer>();
for (Peer peer : peers) { for (Peer peer : peers) {
if (peer.getBestHeight() == mostCommonChainHeight) candidates.add(peer); if (peer.getBestHeight() == mostCommonChainHeight) candidates.add(peer);