3
0
mirror of https://github.com/Qortal/altcoinj.git synced 2025-02-12 10:15:52 +00:00

Use ChannelGroups for shutdowns. This cleans up the code and makes the PeerGroup shutdown futures work correctly.

This commit is contained in:
Mike Hearn 2013-01-04 18:29:47 +01:00
parent 972c19a95f
commit 7e8ed913ec
4 changed files with 26 additions and 23 deletions

View File

@ -26,6 +26,8 @@ import com.google.common.collect.Sets;
import com.google.common.util.concurrent.*; import com.google.common.util.concurrent.*;
import org.jboss.netty.bootstrap.ClientBootstrap; import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.*; import org.jboss.netty.channel.*;
import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.channel.group.DefaultChannelGroup;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory; import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -74,7 +76,7 @@ public class PeerGroup extends AbstractIdleService {
private List<Peer> peers; private List<Peer> peers;
// Currently connecting peers. // Currently connecting peers.
private List<Peer> pendingPeers; private List<Peer> pendingPeers;
private Map<Peer, ChannelFuture> channelFutures; private ChannelGroup channels;
// The peer that has been selected for the purposes of downloading announced data. // The peer that has been selected for the purposes of downloading announced data.
private Peer downloadPeer; private Peer downloadPeer;
@ -117,9 +119,9 @@ public class PeerGroup extends AbstractIdleService {
} }
public void onPeerDisconnected(Peer peer) { public void onPeerDisconnected(Peer peer) {
// The channel will be automatically removed from channels.
pendingPeers.remove(peer); pendingPeers.remove(peer);
peers.remove(peer); peers.remove(peer);
channelFutures.remove(peer);
handlePeerDeath(peer); handlePeerDeath(peer);
} }
} }
@ -193,7 +195,7 @@ public class PeerGroup extends AbstractIdleService {
inactives = Collections.synchronizedList(new ArrayList<PeerAddress>()); inactives = Collections.synchronizedList(new ArrayList<PeerAddress>());
peers = Collections.synchronizedList(new ArrayList<Peer>()); peers = Collections.synchronizedList(new ArrayList<Peer>());
pendingPeers = Collections.synchronizedList(new ArrayList<Peer>()); pendingPeers = Collections.synchronizedList(new ArrayList<Peer>());
channelFutures = Collections.synchronizedMap(new HashMap<Peer, ChannelFuture>()); channels = new DefaultChannelGroup();
peerDiscoverers = new CopyOnWriteArraySet<PeerDiscovery>(); peerDiscoverers = new CopyOnWriteArraySet<PeerDiscovery>();
peerEventListeners = new ArrayList<PeerEventListener>(); peerEventListeners = new ArrayList<PeerEventListener>();
// This event listener is added to every peer. It's here so when we announce transactions via an "inv", every // This event listener is added to every peer. It's here so when we announce transactions via an "inv", every
@ -256,8 +258,8 @@ public class PeerGroup extends AbstractIdleService {
this.maxConnections = maxConnections; this.maxConnections = maxConnections;
if (!isRunning()) return; if (!isRunning()) return;
} }
// We may now have too many or too few open connections. Adding the sizes together here is a race condition. // We may now have too many or too few open connections. Add more or drop some to get to the right amount.
adjustment = maxConnections - (peers.size() + pendingPeers.size()); adjustment = maxConnections - channels.size();
while (adjustment > 0) { while (adjustment > 0) {
try { try {
connectToAnyPeer(); connectToAnyPeer();
@ -267,11 +269,7 @@ public class PeerGroup extends AbstractIdleService {
adjustment--; adjustment--;
} }
while (adjustment < 0) { while (adjustment < 0) {
Channel channel; channels.iterator().next().close();
synchronized (peers) {
channel = channelFutures.get(peers.remove(peers.size() - 1)).getChannel();
}
channel.close();
adjustment++; adjustment++;
} }
} }
@ -486,15 +484,10 @@ public class PeerGroup extends AbstractIdleService {
synchronized (this) { synchronized (this) {
pingTimer.cancel(); pingTimer.cancel();
} }
// TODO: Make this shutdown process use a ChannelGroup. // Blocking close of all sockets. TODO: there is a race condition here, for the solution see:
LinkedList<ChannelFuture> futures; // http://biasedbit.com/netty-releaseexternalresources-hangs/
synchronized (channelFutures) { channels.close().await();
// Copy the list here because the act of closing the channel modifies the channelFutures map. // All thread pools should be stopped by this call.
futures = new LinkedList<ChannelFuture>(channelFutures.values());
}
for (ChannelFuture future : futures) {
future.getChannel().close();
}
bootstrap.releaseExternalResources(); bootstrap.releaseExternalResources();
for (PeerDiscovery peerDiscovery : peerDiscoverers) { for (PeerDiscovery peerDiscovery : peerDiscoverers) {
peerDiscovery.shutdown(); peerDiscovery.shutdown();
@ -573,6 +566,14 @@ public class PeerGroup extends AbstractIdleService {
// Internal version. Do not call whilst holding the PeerGroup lock. // Internal version. Do not call whilst holding the PeerGroup lock.
protected ChannelFuture connectTo(SocketAddress address, boolean incrementMaxConnections) { protected ChannelFuture connectTo(SocketAddress address, boolean incrementMaxConnections) {
ChannelFuture future = bootstrap.connect(address); ChannelFuture future = bootstrap.connect(address);
// Make sure that the channel group gets access to the channel only if it connects successfully (otherwise
// it cannot be closed and trying to do so will cause problems).
future.addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess())
channels.add(future.getChannel());
}
});
// When the channel has connected and version negotiated successfully, handleNewPeer will end up being called on // When the channel has connected and version negotiated successfully, handleNewPeer will end up being called on
// a worker thread. // a worker thread.
@ -585,8 +586,6 @@ public class PeerGroup extends AbstractIdleService {
networkHandler.getOwnerObject().setRemoteAddress(address); networkHandler.getOwnerObject().setRemoteAddress(address);
} }
synchronized (this) { synchronized (this) {
Peer peer = peerFromChannelFuture(future);
channelFutures.put(peer, future);
if (incrementMaxConnections) { if (incrementMaxConnections) {
// We don't use setMaxConnections here as that would trigger a recursive attempt to establish a new // We don't use setMaxConnections here as that would trigger a recursive attempt to establish a new
// outbound connection. // outbound connection.

View File

@ -53,4 +53,9 @@ public class FakeChannel extends AbstractChannel {
public ChannelEvent nextEventBlocking() throws InterruptedException { public ChannelEvent nextEventBlocking() throws InterruptedException {
return events.take(); return events.take();
} }
@Override
public boolean setClosed() {
return super.setClosed();
}
} }

View File

@ -20,7 +20,7 @@ public class FakeChannelSink extends AbstractChannelSink {
switch (state) { switch (state) {
case OPEN: case OPEN:
if (Boolean.FALSE.equals(value)) { if (Boolean.FALSE.equals(value)) {
// Close channel.setClosed();
} }
break; break;
case BOUND: case BOUND:

View File

@ -111,7 +111,6 @@ public class PeerGroupTest extends TestWithNetworkConnections {
// Check that we did indeed throw an exception. If we got here it means we threw and then PeerGroup tried // Check that we did indeed throw an exception. If we got here it means we threw and then PeerGroup tried
// again a bit later. // again a bit later.
assertTrue(result[0]); assertTrue(result[0]);
peerGroup.stop();
} }
@Test @Test