3
0
mirror of https://github.com/Qortal/altcoinj.git synced 2025-01-31 07:12:17 +00:00

Fast shutdown for PeerGroup.

Concurrency safe peerDiscoverers

Resolves issue 118.
This commit is contained in:
Miron Cuperman 2012-02-09 14:52:48 -08:00
parent 6597f01874
commit af826772c5
6 changed files with 62 additions and 18 deletions

View File

@ -110,7 +110,7 @@ public class PeerGroup {
inactives = new LinkedBlockingQueue<PeerAddress>();
// TODO: Remove usage of synchronized sets here in favor of simple coarse-grained locking.
peers = Collections.synchronizedSet(new HashSet<Peer>());
peerDiscoverers = Collections.synchronizedSet(new HashSet<PeerDiscovery>());
peerDiscoverers = new CopyOnWriteArraySet<PeerDiscovery>();
peerPool = new ThreadPoolExecutor(
DEFAULT_CONNECTIONS,
DEFAULT_CONNECTIONS,
@ -397,13 +397,15 @@ public class PeerGroup {
while (running) {
// Modify the peer group under its lock, always.
int numPeers;
synchronized (PeerGroup.this) {
numPeers = peers.size();
if (inactives.size() == 0) {
discoverPeers();
} else if (numPeers < getMaxConnections()) {
tryNextPeer();
}
}
if (inactives.size() == 0) {
discoverPeers();
} else if (numPeers < getMaxConnections()) {
tryNextPeer();
}
// Wait for a task or the connection polling timeout to elapse. Tasks are only eligible to run
@ -427,6 +429,7 @@ public class PeerGroup {
synchronized (PeerGroup.this) {
running = false;
peerPool.shutdown();
shutdownPeerDiscovery();
synchronized (peers) {
for (Peer peer : peers) {
peer.disconnect();
@ -454,12 +457,18 @@ public class PeerGroup {
if (inactives.size() > 0) break;
}
}
private void shutdownPeerDiscovery() {
for (PeerDiscovery peerDiscovery : peerDiscoverers) {
peerDiscovery.shutdown();
}
}
/**
* Try connecting to a peer. If we exceed the number of connections, delay and try again.
*/
private void tryNextPeer() throws InterruptedException {
final PeerAddress address = inactives.take();
PeerAddress address = inactives.take();
while (true) {
try {
VersionMessage ver = versionMessage.duplicate();
@ -477,6 +486,12 @@ public class PeerGroup {
// lowest backoff value in queue.
}
synchronized (PeerGroup.this) {
// Check if we are shutting down before next try
if (!running)
break;
}
// If we got here, we should retry this address because an error unrelated
// to the peer has occurred.
Thread.sleep(connectionDelayMillis);
@ -534,21 +549,28 @@ public class PeerGroup {
log.error("Unexpected exception whilst talking to " + peer, ex);
}
} finally {
// We may be terminating because of a controlled shutdown. If so, don't inform the user of individual
// peer connections or select a new download peer. Disconnection is the responsibility of the controlling
// thread in this case.
if (!running)
return;
boolean needHandleDeath;
synchronized (PeerGroup.this) {
// We may be terminating because of a controlled shutdown. If so, don't inform the user of individual
// peer connections or select a new download peer. Disconnection is the responsibility of the controlling
// thread in this case.
if (!running)
return;
// Disconnect and put the address back on the queue. We will retry this peer after all
// other peers have been tried.
peer.disconnect();
// Disconnect and put the address back on the queue. We will retry this peer after all
// other peers have been tried.
peer.disconnect();
needHandleDeath = peers.remove(peer);
}
// This is unsynchronized since it can take awhile
if (needHandleDeath)
handlePeerDeath(peer);
// We may not know the address if the peer was added directly.
if (address != null)
inactives.add(address);
if (peers.remove(peer))
handlePeerDeath(peer);
}
}
});

View File

@ -112,4 +112,7 @@ public class DnsDiscovery implements PeerDiscovery {
return defaultHosts;
}
/** We don't have a way to abort a DNS lookup, so this does nothing */
public void shutdown() {
}
}

View File

@ -24,6 +24,7 @@ import org.slf4j.LoggerFactory;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.net.InetAddress;
@ -45,6 +46,8 @@ public class IrcDiscovery implements PeerDiscovery {
private BufferedWriter writer = null;
private Socket connection;
/**
* Finds a list of peers by connecting to an IRC network, joining a channel, decoding the nicks and then
* disconnecting.
@ -75,13 +78,22 @@ public class IrcDiscovery implements PeerDiscovery {
protected void onIRCReceive(String message) {
}
public void shutdown() {
try {
if (connection != null) {
connection.close();
}
} catch (IOException ex) {
// ignore
}
}
/**
* Returns a list of peers that were found in the IRC channel. Note that just because a peer appears in the list
* does not mean it is accepting connections.
*/
public InetSocketAddress[] getPeers() throws PeerDiscoveryException {
ArrayList<InetSocketAddress> addresses = new ArrayList<InetSocketAddress>();
Socket connection = null;
connection = null;
try {
connection = new Socket(server, port);
writer = new BufferedWriter(new OutputStreamWriter(connection.getOutputStream()));

View File

@ -27,4 +27,6 @@ public interface PeerDiscovery {
/** Returns an array of addresses. This method may block. */
InetSocketAddress[] getPeers() throws PeerDiscoveryException;
/** Stops any discovery in progress when we want to shut down quickly. */
void shutdown();
}

View File

@ -126,4 +126,7 @@ public class SeedPeers implements PeerDiscovery {
0x0f097059, 0x69ac957c, 0x366d8453, 0xb1ba2844, 0x8857f081, 0x70b5be63, 0xc545454b, 0xaf36ded1,
0xb5a4b052, 0x21f062d1, 0x72ab89b2, 0x74a45318, 0x8312e6bc, 0xb916965f, 0x8aa7c858, 0xfe7effad,
};
public void shutdown() {
}
}

View File

@ -89,6 +89,8 @@ public class PeerGroupTest extends TestWithNetworkConnections {
return new InetSocketAddress[]{new InetSocketAddress("localhost", 0)};
}
}
public void shutdown() {
}
});
peerGroup.start();
sem.acquire();