From be496b95a316dbca35b7ef229ebee42e5eac4583 Mon Sep 17 00:00:00 2001 From: Mike Rosseel Date: Fri, 26 Sep 2014 17:54:59 +0200 Subject: [PATCH] Implement better support for multiple peer discoveries. Resolves issue 302. --- .../core/AbstractPeerEventListener.java | 5 +++ .../org/bitcoinj/core/PeerEventListener.java | 9 +++++ .../java/org/bitcoinj/core/PeerGroup.java | 35 +++++++++++++++++-- .../bitcoinj/jni/NativePeerEventListener.java | 4 +++ .../java/org/bitcoinj/core/PeerGroupTest.java | 34 ++++++++++++++++++ 5 files changed, 85 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/org/bitcoinj/core/AbstractPeerEventListener.java b/core/src/main/java/org/bitcoinj/core/AbstractPeerEventListener.java index add09511..65866b18 100644 --- a/core/src/main/java/org/bitcoinj/core/AbstractPeerEventListener.java +++ b/core/src/main/java/org/bitcoinj/core/AbstractPeerEventListener.java @@ -17,11 +17,16 @@ package org.bitcoinj.core; import java.util.List; +import java.util.Set; /** * Convenience implementation of {@link PeerEventListener}. */ public class AbstractPeerEventListener implements PeerEventListener { + @Override + public void onPeersDiscovered(Set peerAddresses) { + } + @Override public void onBlocksDownloaded(Peer peer, Block block, int blocksLeft) { } diff --git a/core/src/main/java/org/bitcoinj/core/PeerEventListener.java b/core/src/main/java/org/bitcoinj/core/PeerEventListener.java index 962fd01a..31f4e538 100644 --- a/core/src/main/java/org/bitcoinj/core/PeerEventListener.java +++ b/core/src/main/java/org/bitcoinj/core/PeerEventListener.java @@ -18,6 +18,7 @@ package org.bitcoinj.core; import javax.annotation.Nullable; import java.util.List; +import java.util.Set; /** *

Implementors can listen to events like blocks being downloaded/transactions being broadcast/connect/disconnects, @@ -25,6 +26,14 @@ import java.util.List; * provide transactions to remote peers when they ask for them.

*/ public interface PeerEventListener { + /** + *

Called when peers are discovered, this happens at startup of {@link PeerGroup} or if we run out of + * suitable {@link Peer}s to connect to.

+ * + * @param peerAddresses the set of discovered {@link PeerAddress}es + */ + public void onPeersDiscovered(Set peerAddresses); + /** * Called on a Peer thread when a block is received.

* diff --git a/core/src/main/java/org/bitcoinj/core/PeerGroup.java b/core/src/main/java/org/bitcoinj/core/PeerGroup.java index 37631332..7c88ec02 100644 --- a/core/src/main/java/org/bitcoinj/core/PeerGroup.java +++ b/core/src/main/java/org/bitcoinj/core/PeerGroup.java @@ -32,6 +32,7 @@ import org.bitcoinj.utils.Threading; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; import com.google.common.collect.Sets; import com.google.common.net.InetAddresses; import com.google.common.primitives.Ints; @@ -80,6 +81,7 @@ public class PeerGroup extends AbstractExecutionThreadService implements Transac private static final Logger log = LoggerFactory.getLogger(PeerGroup.class); private static final int DEFAULT_CONNECTIONS = 4; private static final int TOR_TIMEOUT_SECONDS = 60; + private int maxPeersToDiscoverCount = 100; protected final ReentrantLock lock = Threading.lock("peergroup"); @@ -661,12 +663,12 @@ public class PeerGroup extends AbstractExecutionThreadService implements Transac if (peerDiscoverers.isEmpty()) throw new PeerDiscoveryException("No peer discoverers registered"); long start = System.currentTimeMillis(); - Set addressSet = Sets.newHashSet(); + final Set addressSet = Sets.newHashSet(); for (PeerDiscovery peerDiscovery : peerDiscoverers) { InetSocketAddress[] addresses; addresses = peerDiscovery.getPeers(5, TimeUnit.SECONDS); for (InetSocketAddress address : addresses) addressSet.add(new PeerAddress(address)); - if (addressSet.size() > 0) break; + if (addressSet.size() >= maxPeersToDiscoverCount) break; } lock.lock(); try { @@ -676,6 +678,17 @@ public class PeerGroup extends AbstractExecutionThreadService implements Transac } finally { lock.unlock(); } + + final ImmutableSet peersDiscoveredSet = ImmutableSet.copyOf(addressSet); + for (final ListenerRegistration registration : peerEventListeners) { + registration.executor.execute(new Runnable() { + @Override + public void run() { + registration.listener.onPeersDiscovered(peersDiscoveredSet); + } + }); + } + log.info("Peer discovery took {}msec and returned {} items", System.currentTimeMillis() - start, addressSet.size()); } @@ -1649,6 +1662,24 @@ public class PeerGroup extends AbstractExecutionThreadService implements Transac return torClient; } + /** + * Returns the maximum number of {@link Peer}s to discover. This maximum is checked after + * each {@link PeerDiscovery} so this max number can be surpassed. + * @return the maximum number of peers to discover + */ + public int getMaxPeersToDiscoverCount() { + return maxPeersToDiscoverCount; + } + + /** + * Sets the maximum number of {@link Peer}s to discover. This maximum is checked after + * each {@link PeerDiscovery} so this max number can be surpassed. + * @param maxPeersToDiscoverCount the maximum number of peers to discover + */ + public void setMaxPeersToDiscoverCount(int maxPeersToDiscoverCount) { + this.maxPeersToDiscoverCount = maxPeersToDiscoverCount; + } + /** See {@link #setUseLocalhostPeerWhenPossible(boolean)} */ public boolean getUseLocalhostPeerWhenPossible() { lock.lock(); diff --git a/core/src/main/java/org/bitcoinj/jni/NativePeerEventListener.java b/core/src/main/java/org/bitcoinj/jni/NativePeerEventListener.java index 474c6f43..5010ae2d 100644 --- a/core/src/main/java/org/bitcoinj/jni/NativePeerEventListener.java +++ b/core/src/main/java/org/bitcoinj/jni/NativePeerEventListener.java @@ -19,6 +19,7 @@ package org.bitcoinj.jni; import org.bitcoinj.core.*; import java.util.List; +import java.util.Set; /** * An event listener that relays events to a native C++ object. A pointer to that object is stored in @@ -28,6 +29,9 @@ import java.util.List; public class NativePeerEventListener implements PeerEventListener { public long ptr; + @Override + public native void onPeersDiscovered(Set peerAddresses); + @Override public native void onBlocksDownloaded(Peer peer, Block block, int blocksLeft); diff --git a/core/src/test/java/org/bitcoinj/core/PeerGroupTest.java b/core/src/test/java/org/bitcoinj/core/PeerGroupTest.java index 5d97fa8f..073faf81 100644 --- a/core/src/test/java/org/bitcoinj/core/PeerGroupTest.java +++ b/core/src/test/java/org/bitcoinj/core/PeerGroupTest.java @@ -166,6 +166,40 @@ public class PeerGroupTest extends TestWithPeerGroup { assertTrue(result.get()); } + // Utility method to create a PeerDiscovery with a certain number of addresses. + private PeerDiscovery createPeerDiscovery(int nrOfAddressesWanted, int port) { + final InetSocketAddress[] addresses = new InetSocketAddress[nrOfAddressesWanted]; + for (int addressNr = 0; addressNr < nrOfAddressesWanted; addressNr++) { + // make each address unique by using the counter to increment the port. + addresses[addressNr] = new InetSocketAddress("localhost", port + addressNr); + } + return new PeerDiscovery() { + public InetSocketAddress[] getPeers(long unused, TimeUnit unused2) throws PeerDiscoveryException { + return addresses; + } + public void shutdown() { + } + }; + } + + @Test + public void multiplePeerDiscovery() throws InterruptedException { + peerGroup.setMaxPeersToDiscoverCount(98); + peerGroup.addPeerDiscovery(createPeerDiscovery(1, 0)); + peerGroup.addPeerDiscovery(createPeerDiscovery(2, 100)); + peerGroup.addPeerDiscovery(createPeerDiscovery(96, 200)); + peerGroup.addPeerDiscovery(createPeerDiscovery(3, 300)); + peerGroup.addPeerDiscovery(createPeerDiscovery(1, 400)); + peerGroup.addEventListener(new AbstractPeerEventListener() { + @Override + public void onPeersDiscovered(Set peerAddresses) { + assertEquals(99, peerAddresses.size()); + } + }); + peerGroup.startAsync(); + peerGroup.awaitRunning(); + } + @Test public void receiveTxBroadcast() throws Exception { // Check that when we receive transactions on all our peers, we do the right thing.