3
0
mirror of https://github.com/Qortal/altcoinj.git synced 2025-02-07 14:54:15 +00:00

PeerGroup: return the peers that match the requested protocol version in waitForPeersOfVersion.

This commit is contained in:
Mike Hearn 2014-05-20 17:51:32 +02:00
parent dfb3a763ce
commit ee2a91010e
3 changed files with 19 additions and 16 deletions

View File

@ -1283,7 +1283,7 @@ public class PeerGroup extends AbstractExecutionThreadService implements Transac
* @param numPeers How many peers to wait for. * @param numPeers How many peers to wait for.
* @return a future that will be triggered when the number of connected peers >= numPeers * @return a future that will be triggered when the number of connected peers >= numPeers
*/ */
public ListenableFuture<PeerGroup> waitForPeers(final int numPeers) { public ListenableFuture<List<Peer>> waitForPeers(final int numPeers) {
return waitForPeersOfVersion(numPeers, 0); return waitForPeersOfVersion(numPeers, 0);
} }
@ -1295,16 +1295,17 @@ public class PeerGroup extends AbstractExecutionThreadService implements Transac
* @param protocolVersion The protocol version the awaited peers must implement (or better). * @param protocolVersion The protocol version the awaited peers must implement (or better).
* @return a future that will be triggered when the number of connected peers implementing protocolVersion or higher >= numPeers * @return a future that will be triggered when the number of connected peers implementing protocolVersion or higher >= numPeers
*/ */
public ListenableFuture<PeerGroup> waitForPeersOfVersion(final int numPeers, final long protocolVersion) { public ListenableFuture<List<Peer>> waitForPeersOfVersion(final int numPeers, final long protocolVersion) {
int foundPeers = countPeersOfAtLeastVersion(protocolVersion); List<Peer> foundPeers = findPeersOfAtLeastVersion(protocolVersion);
if (foundPeers >= numPeers) { if (foundPeers.size() >= numPeers) {
return Futures.immediateFuture(this); return Futures.immediateFuture(foundPeers);
} }
final SettableFuture<PeerGroup> future = SettableFuture.create(); final SettableFuture<List<Peer>> future = SettableFuture.create();
addEventListener(new AbstractPeerEventListener() { addEventListener(new AbstractPeerEventListener() {
@Override public void onPeerConnected(Peer peer, int peerCount) { @Override public void onPeerConnected(Peer peer, int peerCount) {
if (countPeersOfAtLeastVersion(protocolVersion) >= numPeers) { final List<Peer> peers = findPeersOfAtLeastVersion(protocolVersion);
future.set(PeerGroup.this); if (peers.size() >= numPeers) {
future.set(peers);
removeEventListener(this); removeEventListener(this);
} }
} }
@ -1312,14 +1313,17 @@ public class PeerGroup extends AbstractExecutionThreadService implements Transac
return future; return future;
} }
private int countPeersOfAtLeastVersion(long protocolVersion) { /**
* Returns a mutable array list of peers that implement the given protocol version or better.
*/
public List<Peer> findPeersOfAtLeastVersion(long protocolVersion) {
lock.lock(); lock.lock();
try { try {
int foundPeers = 0; ArrayList<Peer> results = new ArrayList<Peer>(peers.size());
for (Peer peer : peers) for (Peer peer : peers)
if (peer.getPeerVersionMessage().clientVersion >= protocolVersion) if (peer.getPeerVersionMessage().clientVersion >= protocolVersion)
foundPeers++; results.add(peer);
return foundPeers; return results;
} finally { } finally {
lock.unlock(); lock.unlock();
} }

View File

@ -65,8 +65,7 @@ public class TransactionBroadcast {
public ListenableFuture<Transaction> broadcast() { public ListenableFuture<Transaction> broadcast() {
log.info("Waiting for {} peers required for broadcast ...", minConnections); log.info("Waiting for {} peers required for broadcast ...", minConnections);
ListenableFuture<PeerGroup> peerAvailabilityFuture = peerGroup.waitForPeers(minConnections); peerGroup.waitForPeers(minConnections).addListener(new EnoughAvailablePeers(), Threading.SAME_THREAD);
peerAvailabilityFuture.addListener(new EnoughAvailablePeers(), Threading.SAME_THREAD);
return future; return future;
} }

View File

@ -585,7 +585,7 @@ public class PeerGroupTest extends TestWithPeerGroup {
@Test @Test
public void waitForNumPeers1() throws Exception { public void waitForNumPeers1() throws Exception {
ListenableFuture<PeerGroup> future = peerGroup.waitForPeers(3); ListenableFuture<List<Peer>> future = peerGroup.waitForPeers(3);
peerGroup.startAsync(); peerGroup.startAsync();
peerGroup.awaitRunning(); peerGroup.awaitRunning();
assertFalse(future.isDone()); assertFalse(future.isDone());
@ -604,7 +604,7 @@ public class PeerGroupTest extends TestWithPeerGroup {
final int baseVer = peerGroup.getMinRequiredProtocolVersion() + 3000; final int baseVer = peerGroup.getMinRequiredProtocolVersion() + 3000;
final int newVer = baseVer + 1000; final int newVer = baseVer + 1000;
ListenableFuture<PeerGroup> future = peerGroup.waitForPeersOfVersion(2, newVer); ListenableFuture<List<Peer>> future = peerGroup.waitForPeersOfVersion(2, newVer);
VersionMessage ver1 = new VersionMessage(params, 10); VersionMessage ver1 = new VersionMessage(params, 10);
ver1.clientVersion = baseVer; ver1.clientVersion = baseVer;