From 93893e10ad0c4dc5ee10effb58e36d189eb3ede8 Mon Sep 17 00:00:00 2001 From: Mike Hearn Date: Fri, 21 Dec 2012 22:37:12 +0100 Subject: [PATCH] Make PeerGroup use the Guava services framework. Makes start/stop optionally non-blocking. Resolves issue 258. --- .../bitcoin/core/PeerEventListener.java | 3 +- .../com/google/bitcoin/core/PeerGroup.java | 86 ++++++++----------- .../google/bitcoin/examples/PeerMonitor.java | 15 +++- 3 files changed, 49 insertions(+), 55 deletions(-) diff --git a/core/src/main/java/com/google/bitcoin/core/PeerEventListener.java b/core/src/main/java/com/google/bitcoin/core/PeerEventListener.java index ff1bae22..b7c6314b 100644 --- a/core/src/main/java/com/google/bitcoin/core/PeerEventListener.java +++ b/core/src/main/java/com/google/bitcoin/core/PeerEventListener.java @@ -56,7 +56,8 @@ public interface PeerEventListener { public void onPeerConnected(Peer peer, int peerCount); /** - * Called when a peer is disconnected + * Called when a peer is disconnected. Note that this won't be called if the listener is registered on a + * {@link PeerGroup} and the group is in the process of shutting down. * * @param peer * @param peerCount the total number of connected peers diff --git a/core/src/main/java/com/google/bitcoin/core/PeerGroup.java b/core/src/main/java/com/google/bitcoin/core/PeerGroup.java index d013f570..07ce14c9 100644 --- a/core/src/main/java/com/google/bitcoin/core/PeerGroup.java +++ b/core/src/main/java/com/google/bitcoin/core/PeerGroup.java @@ -22,10 +22,7 @@ import com.google.bitcoin.discovery.PeerDiscovery; import com.google.bitcoin.discovery.PeerDiscoveryException; import com.google.bitcoin.utils.EventListenerInvoker; import com.google.common.base.Preconditions; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.MoreExecutors; -import com.google.common.util.concurrent.SettableFuture; +import com.google.common.util.concurrent.*; import org.jboss.netty.bootstrap.ClientBootstrap; import org.jboss.netty.channel.*; import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory; @@ -43,26 +40,31 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; /** - * Maintain a number of connections to peers.

+ *

Maintain a number of connections to peers.

* - * PeerGroup tries to maintain a constant number of connections to a set of distinct peers. + *

PeerGroup tries to maintain a constant number of connections to a set of distinct peers. * Each peer runs a network listener in its own thread. When a connection is lost, a new peer - * will be tried after a delay as long as the number of connections less than the maximum.

+ * will be tried after a delay as long as the number of connections less than the maximum.

* - * Connections are made to addresses from a provided list. When that list is exhausted, - * we start again from the head of the list.

+ *

Connections are made to addresses from a provided list. When that list is exhausted, + * we start again from the head of the list.

* - * The PeerGroup can broadcast a transaction to the currently connected set of peers. It can - * also handle download of the blockchain from peers, restarting the process when peers die. + *

The PeerGroup can broadcast a transaction to the currently connected set of peers. It can + * also handle download of the blockchain from peers, restarting the process when peers die.

+ * + *

PeerGroup implements the {@link Service} interface. This means before it will do anything, + * you must call the {@link com.google.common.util.concurrent.Service#start()} method (which returns + * a future) or {@link com.google.common.util.concurrent.Service#startAndWait()} method, which will block + * until peer discovery is completed and some outbound connections have been initiated (it will return + * before handshaking is done, however). You should call {@link com.google.common.util.concurrent.Service#stop()} + * when finished. Note that not all methods of PeerGroup are safe to call from a UI thread as some may do + * network IO, but starting and stopping the service should be fine.

*/ -public class PeerGroup { +public class PeerGroup extends AbstractIdleService { private static final int DEFAULT_CONNECTIONS = 4; private static final Logger log = LoggerFactory.getLogger(PeerGroup.class); - // True if start() has been called. - private boolean running; - // These lists are all thread-safe so do not have to be accessed under the PeerGroup lock. // Addresses to try to connect to, excluding active peers. private List inactives; @@ -250,7 +252,7 @@ public class PeerGroup { int adjustment; synchronized (this) { this.maxConnections = maxConnections; - if (!running) return; + if (!isRunning()) return; } // We may now have too many or too few open connections. Adding the sizes together here is a race condition. adjustment = maxConnections - (peers.size() + pendingPeers.size()); @@ -428,7 +430,7 @@ public class PeerGroup { protected void connectToAnyPeer() throws PeerDiscoveryException { // Do not call this method whilst synchronized on the PeerGroup lock. final PeerAddress addr; - if (!isRunning()) return; + if (!(state() == State.STARTING || state() == State.RUNNING)) return; synchronized (inactives) { if (inactives.size() == 0) { discoverPeers(); @@ -449,44 +451,30 @@ public class PeerGroup { connectTo(addr.toSocketAddress(), false); } - /** - * Starts the PeerGroup. This may block whilst peer discovery takes place. - * @throws IllegalStateException if the PeerGroup is already running. - */ - public void start() { + @Override + protected void startUp() throws Exception { + // This is run in a background thread by the AbstractIdleService implementation. synchronized (this) { - if (running) - throw new IllegalStateException("PeerGroup is already running."); pingTimer = new Timer("Peer pinging thread", true); - running = true; } - // Bring up the requested number of connections. If a connect attempt fails, new peers will be tried until - // there is a success, so just calling connectToAnyPeer for the wanted number of peers is sufficient. + // Bring up the requested number of connections. If a connect attempt fails, + // new peers will be tried until there is a success, so just calling connectToAnyPeer for the wanted number + // of peers is sufficient. for (int i = 0; i < getMaxConnections(); i++) { try { connectToAnyPeer(); } catch (PeerDiscoveryException e) { + if (e.getCause() instanceof InterruptedException) return; log.error(e.getMessage()); } } } - /** - *

Stop this PeerGroup.

- * - *

The peer group will be shut down and all background threads and resources terminated. After a PeerGroup is - * stopped it can't be restarted again, create a new one instead.

- * - * @throws IllegalStateException if the PeerGroup wasn't started. - */ - public synchronized void stop() { - if (!running) - throw new IllegalStateException("PeerGroup not started"); - - running = false; - pingTimer.cancel(); - for (PeerDiscovery peerDiscovery : peerDiscoverers) { - peerDiscovery.shutdown(); + @Override + protected void shutDown() throws Exception { + // This is run on a separate thread by the AbstractIdleService implementation. + synchronized (this) { + pingTimer.cancel(); } // TODO: Make this shutdown process use a ChannelGroup. LinkedList futures; @@ -498,6 +486,9 @@ public class PeerGroup { future.getChannel().close(); } bootstrap.releaseExternalResources(); + for (PeerDiscovery peerDiscovery : peerDiscoverers) { + peerDiscovery.shutdown(); + } } /** @@ -558,10 +549,6 @@ public class PeerGroup { return peers.size(); } - public synchronized boolean isRunning() { - return running; - } - /** * Connect to a peer by creating a Netty channel to the destination address. * @@ -804,10 +791,7 @@ public class PeerGroup { // This can run on any Netty worker thread. Because connectToAnyPeer() must run unlocked to avoid circular // deadlock, this method must run largely unlocked too. Some members are thread-safe and others aren't, so // we synchronize only the parts that need it. - if (!isRunning()) { - log.info("Peer death while shutting down"); - return; - } + if (!isRunning()) return; checkArgument(!peers.contains(peer)); final Peer downloadPeer; final PeerEventListener downloadListener; diff --git a/examples/src/main/java/com/google/bitcoin/examples/PeerMonitor.java b/examples/src/main/java/com/google/bitcoin/examples/PeerMonitor.java index b03a1f14..de5a6e73 100644 --- a/examples/src/main/java/com/google/bitcoin/examples/PeerMonitor.java +++ b/examples/src/main/java/com/google/bitcoin/examples/PeerMonitor.java @@ -30,8 +30,9 @@ import javax.swing.table.AbstractTableModel; import java.awt.*; import java.awt.event.ActionEvent; import java.awt.event.ActionListener; +import java.awt.event.WindowAdapter; +import java.awt.event.WindowEvent; import java.util.List; -import java.util.concurrent.ScheduledThreadPoolExecutor; /** * Shows connected peers in a table view, so you can watch as they come and go. @@ -40,7 +41,6 @@ public class PeerMonitor { private NetworkParameters params; private PeerGroup peerGroup; private PeerTableModel peerTableModel; - private ScheduledThreadPoolExecutor pingService; public static void main(String[] args) throws Exception { BriefLogFormatter.init(); @@ -83,7 +83,16 @@ public class PeerMonitor { private void setupGUI() { JFrame window = new JFrame("Network monitor"); - window.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE); + window.setDefaultCloseOperation(JFrame.DO_NOTHING_ON_CLOSE); + window.addWindowListener(new WindowAdapter() { + @Override + public void windowClosing(WindowEvent windowEvent) { + System.out.println("Shutting down ..."); + peerGroup.stopAndWait(); + System.out.println("Shutdown complete."); + System.exit(0); + } + }); JPanel panel = new JPanel(); JLabel instructions = new JLabel("Number of peers to connect to: ");