PeerGroup: Locking fix in newly added method. A bit more logging.

This commit is contained in:
Mike Hearn
2015-01-10 00:51:00 +01:00
parent 11463e729f
commit 7970b52504

View File

@@ -884,6 +884,7 @@ public class PeerGroup implements TransactionBroadcaster {
torClient.stop(); torClient.stop();
} }
vRunning = false; vRunning = false;
log.info("Stopped.");
} catch (Throwable e) { } catch (Throwable e) {
log.error("Exception when shutting down", e); // The executor swallows exceptions :( log.error("Exception when shutting down", e); // The executor swallows exceptions :(
} }
@@ -897,6 +898,7 @@ public class PeerGroup implements TransactionBroadcaster {
public void stop() { public void stop() {
try { try {
stopAsync(); stopAsync();
log.info("Awaiting PeerGroup shutdown ...");
executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS); executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
} catch (InterruptedException e) { } catch (InterruptedException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
@@ -1525,21 +1527,26 @@ public class PeerGroup implements TransactionBroadcaster {
* @return a future that will be triggered when the number of connected peers implementing protocolVersion or higher >= numPeers * @return a future that will be triggered when the number of connected peers implementing protocolVersion or higher >= numPeers
*/ */
public ListenableFuture<List<Peer>> waitForPeersWithServiceMask(final int numPeers, final int mask) { public ListenableFuture<List<Peer>> waitForPeersWithServiceMask(final int numPeers, final int mask) {
List<Peer> foundPeers = findPeersWithServiceMask(mask); lock.lock();
if (foundPeers.size() >= numPeers) try {
return Futures.immediateFuture(foundPeers); List<Peer> foundPeers = findPeersWithServiceMask(mask);
final SettableFuture<List<Peer>> future = SettableFuture.create(); if (foundPeers.size() >= numPeers)
addEventListener(new AbstractPeerEventListener() { return Futures.immediateFuture(foundPeers);
@Override final SettableFuture<List<Peer>> future = SettableFuture.create();
public void onPeerConnected(Peer peer, int peerCount) { addEventListener(new AbstractPeerEventListener() {
final List<Peer> peers = findPeersWithServiceMask(mask); @Override
if (peers.size() >= numPeers) { public void onPeerConnected(Peer peer, int peerCount) {
future.set(peers); final List<Peer> peers = findPeersWithServiceMask(mask);
removeEventListener(this); if (peers.size() >= numPeers) {
future.set(peers);
removeEventListener(this);
}
} }
} });
}); return future;
return future; } finally {
lock.unlock();
}
} }
/** /**