diff --git a/src/com/google/bitcoin/core/PeerGroup.java b/src/com/google/bitcoin/core/PeerGroup.java index 1f5fc6bb..2374355c 100644 --- a/src/com/google/bitcoin/core/PeerGroup.java +++ b/src/com/google/bitcoin/core/PeerGroup.java @@ -25,13 +25,12 @@ import com.google.bitcoin.store.BlockStoreException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.xml.bind.annotation.XmlElementRef; import java.io.IOException; import java.net.ConnectException; import java.net.InetSocketAddress; import java.net.SocketTimeoutException; -import java.util.Collections; -import java.util.HashSet; -import java.util.Set; +import java.util.*; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; @@ -61,7 +60,7 @@ public class PeerGroup { private static final Logger log = LoggerFactory.getLogger(PeerGroup.class); - private static final int CONNECTION_DELAY_MILLIS = 5 * 1000; + public static final int DEFAULT_CONNECTION_DELAY_MILLIS = 5 * 1000; private static final int CORE_THREADS = 1; private static final int THREAD_KEEP_ALIVE_SECONDS = 1; @@ -79,28 +78,37 @@ public class PeerGroup { private Peer downloadPeer; // Callback for events related to chain download private PeerEventListener downloadListener; - // Callbacks for events related to peer connection/disconnection private Set peerEventListeners; + // Peer discovery sources, will be polled occasionally if there aren't enough inactives. + private Set peerDiscoverers; private NetworkParameters params; private BlockStore blockStore; private BlockChain chain; + private int connectionDelayMillis; /** - * Create a PeerGroup + * Creates a PeerGroup with the given parameters and a default 5 second connection timeout. */ public PeerGroup(BlockStore blockStore, NetworkParameters params, BlockChain chain) { + this(blockStore, params, chain, DEFAULT_CONNECTION_DELAY_MILLIS); + } + + /** + * Creates a PeerGroup with the given parameters. The connectionDelayMillis parameter controls how long the + * PeerGroup will wait between attempts to connect to nodes or read from any added peer discovery sources. + */ + public PeerGroup(BlockStore blockStore, NetworkParameters params, BlockChain chain, int connectionDelayMillis) { this.blockStore = blockStore; this.params = params; this.chain = chain; - + this.connectionDelayMillis = connectionDelayMillis; + inactives = new LinkedBlockingQueue(); - peers = Collections.synchronizedSet(new HashSet()); - peerEventListeners = Collections.synchronizedSet(new HashSet()); - + peerDiscoverers = Collections.synchronizedSet(new HashSet()); peerPool = new ThreadPoolExecutor(CORE_THREADS, DEFAULT_CONNECTIONS, THREAD_KEEP_ALIVE_SECONDS, TimeUnit.SECONDS, new LinkedBlockingQueue(1), @@ -140,18 +148,7 @@ public class PeerGroup { /** Add addresses from a discovery source to the list of potential peers to connect to */ public void addPeerDiscovery(PeerDiscovery peerDiscovery) { - // TODO(miron) consider remembering the discovery source and retrying occasionally - InetSocketAddress[] addresses; - try { - addresses = peerDiscovery.getPeers(); - } catch (PeerDiscoveryException e) { - log.error("Failed to discover peer addresses from discovery source", e); - return; - } - - for (int i = 0; i < addresses.length; i++) { - inactives.add(new PeerAddress(addresses[i])); - } + peerDiscoverers.add(peerDiscovery); } /** Starts the background thread that makes connections. */ @@ -205,19 +202,21 @@ public class PeerGroup { public void run() { try { while (running) { - tryNextPeer(); + if (inactives.size() == 0) { + discoverPeers(); + } else { + tryNextPeer(); + } // We started a new peer connection, delay before trying another one - Thread.sleep(CONNECTION_DELAY_MILLIS); + Thread.sleep(connectionDelayMillis); } } catch (InterruptedException ex) { synchronized (this) { running = false; } } - peerPool.shutdownNow(); - synchronized (peers) { for (Peer peer : peers) { peer.disconnect(); @@ -225,16 +224,31 @@ public class PeerGroup { } } - /* - * Try connecting to a peer. If we exceed the number of connections, delay and try - * again. - */ + private void discoverPeers() { + for (PeerDiscovery peerDiscovery : peerDiscoverers) { + InetSocketAddress[] addresses; + try { + addresses = peerDiscovery.getPeers(); + } catch (PeerDiscoveryException e) { + // Will try again later. + log.error("Failed to discover peer addresses from discovery source", e); + return; + } + + for (int i = 0; i < addresses.length; i++) { + inactives.add(new PeerAddress(addresses[i])); + } + + if (inactives.size() > 0) break; + } + } + + /** Try connecting to a peer. If we exceed the number of connections, delay and try again. */ private void tryNextPeer() throws InterruptedException { final PeerAddress address = inactives.take(); while (true) { try { - final Peer peer = new Peer(params, address, - blockStore.getChainHead().getHeight(), chain); + final Peer peer = new Peer(params, address, blockStore.getChainHead().getHeight(), chain); Runnable command = new Runnable() { public void run() { try { @@ -285,7 +299,7 @@ public class PeerGroup { // If we got here, we should retry this address because an error unrelated // to the peer has occurred. - Thread.sleep(CONNECTION_DELAY_MILLIS); + Thread.sleep(connectionDelayMillis); } } } diff --git a/tests/com/google/bitcoin/core/PeerGroupTest.java b/tests/com/google/bitcoin/core/PeerGroupTest.java index 6ad59284..8789f5d5 100644 --- a/tests/com/google/bitcoin/core/PeerGroupTest.java +++ b/tests/com/google/bitcoin/core/PeerGroupTest.java @@ -16,14 +16,20 @@ package com.google.bitcoin.core; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import com.google.bitcoin.discovery.PeerDiscovery; +import com.google.bitcoin.discovery.PeerDiscoveryException; import com.google.bitcoin.store.BlockStore; import com.google.bitcoin.store.MemoryBlockStore; import org.junit.Before; import org.junit.Test; +import java.net.InetSocketAddress; +import java.util.concurrent.Semaphore; + public class PeerGroupTest { static final NetworkParameters params = NetworkParameters.unitTests(); @@ -36,7 +42,7 @@ public class PeerGroupTest { wallet = new Wallet(params); blockStore = new MemoryBlockStore(params); BlockChain chain = new BlockChain(params, wallet, blockStore); - peerGroup = new PeerGroup(blockStore, params, chain); + peerGroup = new PeerGroup(blockStore, params, chain, 1000); } @Test @@ -46,4 +52,31 @@ public class PeerGroupTest { peerGroup.addEventListener(listener); assertTrue(peerGroup.removeEventListener(listener)); } + + @Test + public void peerDiscoveryPolling() throws Exception { + // Check that if peer discovery fails, we keep trying until we have some nodes to talk with. + final Semaphore sem = new Semaphore(0); + final boolean[] result = new boolean[1]; + result[0] = false; + peerGroup.addPeerDiscovery(new PeerDiscovery() { + public InetSocketAddress[] getPeers() throws PeerDiscoveryException { + if (result[0] == false) { + // Pretend we are not connected to the internet. + result[0] = true; + throw new PeerDiscoveryException("test failure"); + } else { + // Return a bogus address. + sem.release(); + return new InetSocketAddress[]{new InetSocketAddress("localhost", 0)}; + } + } + }); + peerGroup.start(); + sem.acquire(); + // Check that we did indeed throw an exception. If we got here it means we threw and then PeerGroup tried + // again a bit later. + assertTrue(result[0]); + peerGroup.stop(); + } }