diff --git a/src/com/google/bitcoin/core/PeerGroup.java b/src/com/google/bitcoin/core/PeerGroup.java index 3eb0dc1c..f2972776 100644 --- a/src/com/google/bitcoin/core/PeerGroup.java +++ b/src/com/google/bitcoin/core/PeerGroup.java @@ -21,7 +21,6 @@ import com.google.bitcoin.discovery.PeerDiscovery; import com.google.bitcoin.discovery.PeerDiscoveryException; import com.google.bitcoin.store.BlockStore; import com.google.bitcoin.store.BlockStoreException; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -29,36 +28,32 @@ import java.io.IOException; import java.net.ConnectException; import java.net.InetSocketAddress; import java.net.SocketTimeoutException; -import java.util.*; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; /** * Maintain a number of connections to peers. - * + *

*

PeerGroup tries to maintain a constant number of connections to a set of distinct peers. * Each peer runs a network listener in its own thread. When a connection is lost, a new peer * will be tried after a delay as long as the number of connections less than the maximum. - * + *

*

Connections are made to addresses from a provided list. When that list is exhausted, * we start again from the head of the list. - * + *

*

The PeerGroup can broadcast a transaction to the currently connected set of peers. It can * also handle download of the blockchain from peers, restarting the process when peers die. - * - * @author miron@google.com (Miron Cuperman a.k.a devrandom) * + * @author miron@google.com (Miron Cuperman a.k.a devrandom) */ public class PeerGroup { private static final int DEFAULT_CONNECTIONS = 4; private static final Logger log = LoggerFactory.getLogger(PeerGroup.class); - + public static final int DEFAULT_CONNECTION_DELAY_MILLIS = 5 * 1000; private static final int CORE_THREADS = 1; private static final int THREAD_KEEP_ALIVE_SECONDS = 1; @@ -81,7 +76,7 @@ public class PeerGroup { private Set peerEventListeners; // Peer discovery sources, will be polled occasionally if there aren't enough inactives. private Set peerDiscoverers; - + private NetworkParameters params; private BlockStore blockStore; private BlockChain chain; @@ -121,36 +116,42 @@ public class PeerGroup { public void addEventListener(PeerEventListener listener) { peerEventListeners.add(listener); } - + public boolean removeEventListener(PeerEventListener listener) { return peerEventListeners.remove(listener); } - + /** * Depending on the environment, this should normally be between 1 and 10, default is 4. - * + * * @param maxConnections the maximum number of peer connections that this group will try to make. */ public void setMaxConnections(int maxConnections) { peerPool.setMaximumPoolSize(maxConnections); } - + public int getMaxConnections() { return peerPool.getMaximumPoolSize(); } - - /** Add an address to the list of potential peers to connect to */ + + /** + * Add an address to the list of potential peers to connect to + */ public void addAddress(PeerAddress peerAddress) { // TODO(miron) consider deduplication inactives.add(peerAddress); } - - /** Add addresses from a discovery source to the list of potential peers to connect to */ + + /** + * Add addresses from a discovery source to the list of potential peers to connect to + */ public void addPeerDiscovery(PeerDiscovery peerDiscovery) { peerDiscoverers.add(peerDiscovery); } - - /** Starts the background thread that makes connections. */ + + /** + * Starts the background thread that makes connections. + */ public void start() { this.connectThread = new Thread(new PeerExecutionRunnable(), "Peer group thread"); running = true; @@ -159,7 +160,7 @@ public class PeerGroup { /** * Stop this PeerGroup - * + *

*

The peer group will be asynchronously shut down. After it is shut down * all peers will be disconnected and no threads will be running. */ @@ -171,7 +172,7 @@ public class PeerGroup { /** * Broadcast a transaction to all connected peers - * + * * @return whether we sent to at least one peer */ public boolean broadcastTransaction(Transaction tx) { @@ -193,7 +194,7 @@ public class PeerGroup { /** * Repeatedly get the next peer address from the inactive queue * and try to connect. - * + *

*

We can be terminated with Thread.interrupt. When an interrupt is received, * we will ask the executor to shutdown and ask each peer to disconnect. At that point * no threads or network connections will be active. @@ -206,7 +207,7 @@ public class PeerGroup { } else { tryNextPeer(); } - + // We started a new peer connection, delay before trying another one Thread.sleep(connectionDelayMillis); } @@ -242,7 +243,9 @@ public class PeerGroup { } } - /** Try connecting to a peer. If we exceed the number of connections, delay and try again. */ + /** + * Try connecting to a peer. If we exceed the number of connections, delay and try again. + */ private void tryNextPeer() throws InterruptedException { final PeerAddress address = inactives.take(); while (true) { @@ -295,7 +298,7 @@ public class PeerGroup { running = false; throw new RuntimeException(e); } - + // If we got here, we should retry this address because an error unrelated // to the peer has occurred. Thread.sleep(connectionDelayMillis); @@ -305,10 +308,10 @@ public class PeerGroup { /** * Start downloading the blockchain from the first available peer. - * + *

*

If no peers are currently connected, the download will be started * once a peer starts. If the peer dies, the download will resume with another peer. - * + * * @param listener a listener for chain download events, may not be null */ public synchronized void startBlockChainDownload(PeerEventListener listener) { @@ -317,16 +320,15 @@ public class PeerGroup { // downloading from multiple peers and handle the case when a new peer comes along // with a longer chain after we thought we were done. synchronized (peers) { - if (!peers.isEmpty()) - { + if (!peers.isEmpty()) { startBlockChainDownloadFromPeer(peers.iterator().next()); } } } - + /** * Download the blockchain from peers.

- * + *

* This method waits until the download is complete. "Complete" is defined as downloading * from at least one peer all the blocks that are in that peer's inventory. */ @@ -339,7 +341,7 @@ public class PeerGroup { throw new RuntimeException(e); } } - + protected synchronized void handleNewPeer(Peer peer) { if (downloadListener != null && downloadPeer == null) startBlockChainDownloadFromPeer(peer); @@ -351,7 +353,7 @@ public class PeerGroup { } } } - + protected synchronized void handlePeerDeath(Peer peer) { if (peer == downloadPeer) { downloadPeer = null; @@ -381,7 +383,7 @@ public class PeerGroup { } downloadPeer = peer; } - + static class PeerGroupThreadFactory implements ThreadFactory { static final AtomicInteger poolNumber = new AtomicInteger(1); final ThreadGroup group; @@ -391,14 +393,14 @@ public class PeerGroup { PeerGroupThreadFactory() { group = Thread.currentThread().getThreadGroup(); namePrefix = "PeerGroup-" + - poolNumber.getAndIncrement() + - "-thread-"; + poolNumber.getAndIncrement() + + "-thread-"; } public Thread newThread(Runnable r) { Thread t = new Thread(group, r, - namePrefix + threadNumber.getAndIncrement(), - 0); + namePrefix + threadNumber.getAndIncrement(), + 0); // Lower the priority of the peer threads. This is to avoid competing with UI threads created by the API // user when doing lots of work, like downloading the block chain. We select a priority level one lower // than the parent thread, or the minimum.