diff --git a/core/src/main/java/com/google/bitcoin/core/PeerGroup.java b/core/src/main/java/com/google/bitcoin/core/PeerGroup.java index 39da7035..7c592d85 100644 --- a/core/src/main/java/com/google/bitcoin/core/PeerGroup.java +++ b/core/src/main/java/com/google/bitcoin/core/PeerGroup.java @@ -26,6 +26,8 @@ import com.google.common.collect.Sets; import com.google.common.util.concurrent.*; import org.jboss.netty.bootstrap.ClientBootstrap; import org.jboss.netty.channel.*; +import org.jboss.netty.channel.group.ChannelGroup; +import org.jboss.netty.channel.group.DefaultChannelGroup; import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -74,7 +76,7 @@ public class PeerGroup extends AbstractIdleService { private List peers; // Currently connecting peers. private List pendingPeers; - private Map channelFutures; + private ChannelGroup channels; // The peer that has been selected for the purposes of downloading announced data. private Peer downloadPeer; @@ -117,9 +119,9 @@ public class PeerGroup extends AbstractIdleService { } public void onPeerDisconnected(Peer peer) { + // The channel will be automatically removed from channels. pendingPeers.remove(peer); peers.remove(peer); - channelFutures.remove(peer); handlePeerDeath(peer); } } @@ -193,7 +195,7 @@ public class PeerGroup extends AbstractIdleService { inactives = Collections.synchronizedList(new ArrayList()); peers = Collections.synchronizedList(new ArrayList()); pendingPeers = Collections.synchronizedList(new ArrayList()); - channelFutures = Collections.synchronizedMap(new HashMap()); + channels = new DefaultChannelGroup(); peerDiscoverers = new CopyOnWriteArraySet(); peerEventListeners = new ArrayList(); // This event listener is added to every peer. It's here so when we announce transactions via an "inv", every @@ -256,8 +258,8 @@ public class PeerGroup extends AbstractIdleService { this.maxConnections = maxConnections; if (!isRunning()) return; } - // We may now have too many or too few open connections. Adding the sizes together here is a race condition. - adjustment = maxConnections - (peers.size() + pendingPeers.size()); + // We may now have too many or too few open connections. Add more or drop some to get to the right amount. + adjustment = maxConnections - channels.size(); while (adjustment > 0) { try { connectToAnyPeer(); @@ -267,11 +269,7 @@ public class PeerGroup extends AbstractIdleService { adjustment--; } while (adjustment < 0) { - Channel channel; - synchronized (peers) { - channel = channelFutures.get(peers.remove(peers.size() - 1)).getChannel(); - } - channel.close(); + channels.iterator().next().close(); adjustment++; } } @@ -486,15 +484,10 @@ public class PeerGroup extends AbstractIdleService { synchronized (this) { pingTimer.cancel(); } - // TODO: Make this shutdown process use a ChannelGroup. - LinkedList futures; - synchronized (channelFutures) { - // Copy the list here because the act of closing the channel modifies the channelFutures map. - futures = new LinkedList(channelFutures.values()); - } - for (ChannelFuture future : futures) { - future.getChannel().close(); - } + // Blocking close of all sockets. TODO: there is a race condition here, for the solution see: + // http://biasedbit.com/netty-releaseexternalresources-hangs/ + channels.close().await(); + // All thread pools should be stopped by this call. bootstrap.releaseExternalResources(); for (PeerDiscovery peerDiscovery : peerDiscoverers) { peerDiscovery.shutdown(); @@ -573,6 +566,14 @@ public class PeerGroup extends AbstractIdleService { // Internal version. Do not call whilst holding the PeerGroup lock. protected ChannelFuture connectTo(SocketAddress address, boolean incrementMaxConnections) { ChannelFuture future = bootstrap.connect(address); + // Make sure that the channel group gets access to the channel only if it connects successfully (otherwise + // it cannot be closed and trying to do so will cause problems). + future.addListener(new ChannelFutureListener() { + public void operationComplete(ChannelFuture future) throws Exception { + if (future.isSuccess()) + channels.add(future.getChannel()); + } + }); // When the channel has connected and version negotiated successfully, handleNewPeer will end up being called on // a worker thread. @@ -585,8 +586,6 @@ public class PeerGroup extends AbstractIdleService { networkHandler.getOwnerObject().setRemoteAddress(address); } synchronized (this) { - Peer peer = peerFromChannelFuture(future); - channelFutures.put(peer, future); if (incrementMaxConnections) { // We don't use setMaxConnections here as that would trigger a recursive attempt to establish a new // outbound connection. diff --git a/core/src/test/java/com/google/bitcoin/core/FakeChannel.java b/core/src/test/java/com/google/bitcoin/core/FakeChannel.java index f40a8c8b..3dbc6260 100644 --- a/core/src/test/java/com/google/bitcoin/core/FakeChannel.java +++ b/core/src/test/java/com/google/bitcoin/core/FakeChannel.java @@ -53,4 +53,9 @@ public class FakeChannel extends AbstractChannel { public ChannelEvent nextEventBlocking() throws InterruptedException { return events.take(); } + + @Override + public boolean setClosed() { + return super.setClosed(); + } } diff --git a/core/src/test/java/com/google/bitcoin/core/FakeChannelSink.java b/core/src/test/java/com/google/bitcoin/core/FakeChannelSink.java index 9d545a80..ff3bd578 100644 --- a/core/src/test/java/com/google/bitcoin/core/FakeChannelSink.java +++ b/core/src/test/java/com/google/bitcoin/core/FakeChannelSink.java @@ -20,7 +20,7 @@ public class FakeChannelSink extends AbstractChannelSink { switch (state) { case OPEN: if (Boolean.FALSE.equals(value)) { - // Close + channel.setClosed(); } break; case BOUND: diff --git a/core/src/test/java/com/google/bitcoin/core/PeerGroupTest.java b/core/src/test/java/com/google/bitcoin/core/PeerGroupTest.java index 61b56e00..c19d2a77 100644 --- a/core/src/test/java/com/google/bitcoin/core/PeerGroupTest.java +++ b/core/src/test/java/com/google/bitcoin/core/PeerGroupTest.java @@ -111,7 +111,6 @@ public class PeerGroupTest extends TestWithNetworkConnections { // Check that we did indeed throw an exception. If we got here it means we threw and then PeerGroup tried // again a bit later. assertTrue(result[0]); - peerGroup.stop(); } @Test