diff --git a/AUTHORS b/AUTHORS index 0b9adaac..577bd4e2 100644 --- a/AUTHORS +++ b/AUTHORS @@ -1,5 +1,5 @@ Mike Hearn -Miron Cuperman +Miron Cuperman (a.k.a. devrandom) Xiaofeng Guo Thilo Planz Micheal Swiggs @@ -7,4 +7,4 @@ Gary Rowe Noa Resare John Sample Jan Møller -Wolfgang Nagele \ No newline at end of file +Wolfgang Nagele diff --git a/src/com/google/bitcoin/core/AbstractPeerEventListener.java b/src/com/google/bitcoin/core/AbstractPeerEventListener.java new file mode 100644 index 00000000..8e4fda63 --- /dev/null +++ b/src/com/google/bitcoin/core/AbstractPeerEventListener.java @@ -0,0 +1,35 @@ +/** + * Copyright 2011 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.bitcoin.core; + +/** + * Convenience abstract class for implmenting a PeerEventListener. + * + *

The default method implementations do nothing. + * + * @author miron@google.com (Miron Cuperman) + * + */ +public class AbstractPeerEventListener extends Object implements PeerEventListener { + @Override + public void onBlocksDownloaded(Peer peer, Block block, int blocksLeft) { + } + + @Override + public void onChainDownloadStarted(Peer peer, int blocksLeft) { + } +} diff --git a/src/com/google/bitcoin/core/DownloadListener.java b/src/com/google/bitcoin/core/DownloadListener.java new file mode 100644 index 00000000..a1848251 --- /dev/null +++ b/src/com/google/bitcoin/core/DownloadListener.java @@ -0,0 +1,101 @@ +/** + * Copyright 2011 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.bitcoin.core; + +import com.google.bitcoin.core.AbstractPeerEventListener; +import com.google.bitcoin.core.Block; +import com.google.bitcoin.core.Peer; + +import java.text.DateFormat; +import java.util.Date; +import java.util.concurrent.Semaphore; + +/** + * Listen to chain download events and print useful informational messages. + * + *

progress, startDownload, doneDownload maybe be overridden to change the way the user + * is notified. + * + *

Methods are called with the event listener object locked so your + * implementation does not have to be thread safe. + * + * @author miron@google.com (Miron Cuperman a.k.a. devrandom) + * + */ +public class DownloadListener extends AbstractPeerEventListener { + private int originalBlocksLeft = -1; + private int lastPercent = 0; + Semaphore done = new Semaphore(0); + + @Override + public void onChainDownloadStarted(Peer peer, int blocksLeft) { + startDownload(blocksLeft); + originalBlocksLeft = blocksLeft; + } + + @Override + public void onBlocksDownloaded(Peer peer, Block block, int blocksLeft) { + if (blocksLeft == 0) { + doneDownload(); + done.release(); + } + + if (blocksLeft < 0 || originalBlocksLeft <= 0) + return; + + double pct = 100.0 - (100.0 * (blocksLeft / (double) originalBlocksLeft)); + if ((int)pct != lastPercent) { + progress(pct, new Date(block.getTime())); + lastPercent = (int)pct; + } + } + + /** + * Called when download progress is made. + * + * @param pct the percentage of chain downloaded, estimated + * @param date the date of the last block downloaded + */ + protected void progress(double pct, Date date) { + System.out.println(String.format("Chain download %d%% done, block date %s", (int) pct, + DateFormat.getDateInstance().format(date))); + } + + /** + * Called when download is initiated. + * + * @param blocks the number of blocks to download, estimated + */ + protected void startDownload(int blocks) { + System.out.println("Downloading block chain of size " + blocks + ". " + + (blocks > 1000 ? "This may take a while." : "")); + } + + /** + * Called when we are done downloading the block chain. + */ + protected void doneDownload() { + System.out.println("Done downloading block chain"); + } + + /** + * Wait for the chain to be downloaded. + */ + public void await() throws InterruptedException { + done.acquire(); + } +} \ No newline at end of file diff --git a/src/com/google/bitcoin/core/NetworkConnection.java b/src/com/google/bitcoin/core/NetworkConnection.java index 54f649ba..c52adcc3 100644 --- a/src/com/google/bitcoin/core/NetworkConnection.java +++ b/src/com/google/bitcoin/core/NetworkConnection.java @@ -53,20 +53,22 @@ public class NetworkConnection { * Connect to the given IP address using the port specified as part of the network parameters. Once construction * is complete a functioning network channel is set up and running. * - * @param remoteIp IP address to connect to. IPv6 is not currently supported by BitCoin. + * @param peerAddress address to connect to. IPv6 is not currently supported by BitCoin. If + * port is not positive the default port from params is used. * @param params Defines which network to connect to and details of the protocol. * @param bestHeight How many blocks are in our best chain * @param connectTimeout Timeout in milliseconds when initially connecting to peer * @throws IOException if there is a network related failure. * @throws ProtocolException if the version negotiation failed. */ - public NetworkConnection(InetAddress remoteIp, NetworkParameters params, int bestHeight, int connectTimeout) + public NetworkConnection(PeerAddress peerAddress, NetworkParameters params, int bestHeight, int connectTimeout) throws IOException, ProtocolException { this.params = params; - this.remoteIp = remoteIp; + this.remoteIp = peerAddress.addr; + int port = (peerAddress.port > 0) ? peerAddress.port : params.port; - InetSocketAddress address = new InetSocketAddress(remoteIp, params.port); + InetSocketAddress address = new InetSocketAddress(remoteIp, port); socket = new Socket(); socket.connect(address, connectTimeout); @@ -105,6 +107,11 @@ public class NetworkConnection { // Handshake is done! } + public NetworkConnection(InetAddress inetAddress, NetworkParameters params, int bestHeight, int connectTimeout) + throws IOException, ProtocolException { + this(new PeerAddress(inetAddress), params, bestHeight, connectTimeout); + } + /** * Sends a "ping" message to the remote node. The protocol doesn't presently use this feature much. * @throws IOException diff --git a/src/com/google/bitcoin/core/Peer.java b/src/com/google/bitcoin/core/Peer.java index 9e4e751e..494e40f7 100644 --- a/src/com/google/bitcoin/core/Peer.java +++ b/src/com/google/bitcoin/core/Peer.java @@ -21,63 +21,101 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.ArrayList; -import java.util.Arrays; import java.util.LinkedList; import java.util.List; -import java.util.concurrent.*; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; /** - * A Peer handles the high level communication with a BitCoin node. It requires a NetworkConnection to be set up for - * it. After that it takes ownership of the connection, creates and manages its own thread used for communication - * with the network. All these threads synchronize on the block chain. + * A Peer handles the high level communication with a BitCoin node. + * + *

After making the connection with connect(), call run() to start the message handling loop. */ public class Peer { - private static final Logger log = LoggerFactory.getLogger(Peer.class); + private static final Logger log = LoggerFactory.getLogger(Peer.class); - private final NetworkConnection conn; + private NetworkConnection conn; private final NetworkParameters params; - private Thread thread; - // Whether the peer thread is supposed to be running or not. Set to false during shutdown so the peer thread + // Whether the peer loop is supposed to be running or not. Set to false during shutdown so the peer loop // knows to quit when the socket goes away. private boolean running; private final BlockChain blockChain; - // Used to notify clients when the initial block chain download is finished. - private CountDownLatch chainCompletionLatch; // When we want to download a block or transaction from a peer, the InventoryItem is put here whilst waiting for // the response. Synchronized on itself. private final List> pendingGetBlockFutures; + private int bestHeight; + + private PeerAddress address; + + private List eventListeners; + /** * Construct a peer that handles the given network connection and reads/writes from the given block chain. Note that - * communication won't occur until you call start(). + * communication won't occur until you call connect(). + * + * @param bestHeight our current best chain height, to facilitate downloading */ - public Peer(NetworkParameters params, NetworkConnection conn, BlockChain blockChain) { - this.conn = conn; + public Peer(NetworkParameters params, PeerAddress address, int bestHeight, BlockChain blockChain) { this.params = params; + this.address = address; + this.bestHeight = bestHeight; this.blockChain = blockChain; this.pendingGetBlockFutures = new ArrayList>(); - } - - /** Starts the background thread that processes messages. */ - public void start() { - this.thread = new Thread(new Runnable() { - public void run() { - Peer.this.run(); - } - }); - synchronized (this) { - running = true; - } - this.thread.setName("Bitcoin peer thread: " + conn.toString()); - this.thread.start(); + this.eventListeners = new ArrayList(); } /** - * Runs in the peers network thread and manages communication with the peer. + * Construct a peer that handles the given network connection and reads/writes from the given block chain. Note that + * communication won't occur until you call connect(). */ - private void run() { - assert Thread.currentThread() == thread; + public Peer(NetworkParameters params, PeerAddress address, BlockChain blockChain) { + this(params, address, 0, blockChain); + } + + public synchronized void addEventListener(PeerEventListener listener) { + eventListeners.add(listener); + } + + public synchronized void removeEventListener(PeerEventListener listener) { + eventListeners.remove(listener); + } + + @Override + public String toString() { + return "Peer(" + address.addr + ":" + address.port + ")"; + } + + /** + * Connects to the peer. + */ + public void connect() { + try { + conn = new NetworkConnection(address, params, bestHeight, 60000); + } catch (IOException ex) { + log.error("while trying to open connection", ex); + throw new RuntimeException(ex); + } catch (ProtocolException ex) { + log.error("while trying to negotiate connection", ex); + throw new RuntimeException(ex); + } + } + + /** + * Runs in the peers network loop and manages communication with the peer. + * + *

connect() must be called first + */ + public void run() { + // This should be called in the network loop thread for this peer + if (conn == null) + throw new RuntimeException("please call connect() first"); + + running = true; try { while (true) { Message m = conn.readMessage(); @@ -97,19 +135,26 @@ public class Peer { } catch (Exception e) { if (e instanceof IOException && !running) { // This exception was expected because we are tearing down the socket as part of quitting. - log.info("Shutting down peer thread"); + log.info("Shutting down peer loop"); } else { // We caught an unexpected exception. e.printStackTrace(); } } + + try { + conn.shutdown(); + } catch (IOException e) { + // Ignore exceptions on shutdown, socket might be dead + } + synchronized (this) { running = false; } } private void processBlock(Block m) throws IOException { - assert Thread.currentThread() == thread; + // This should called in the network loop thread for this peer try { // Was this block requested by getBlock()? synchronized (pendingGetBlockFutures) { @@ -128,11 +173,9 @@ public class Peer { // This call will synchronize on blockChain. if (blockChain.add(m)) { // The block was successfully linked into the chain. Notify the user of our progress. - if (chainCompletionLatch != null) { - chainCompletionLatch.countDown(); - if (chainCompletionLatch.getCount() == 0) { - // All blocks fetched, so we don't need this anymore. - chainCompletionLatch = null; + for (PeerEventListener listener : eventListeners) { + synchronized (listener) { + listener.onBlocksDownloaded(this, m, getPeerBlocksToGet()); } } } else { @@ -147,15 +190,16 @@ public class Peer { } } catch (VerificationException e) { // We don't want verification failures to kill the thread. - log.warn("block verification failed", e); + log.warn("Block verification failed", e); } catch (ScriptException e) { // We don't want script failures to kill the thread. - log.warn("script exception", e); + log.warn("Script exception", e); } } private void processInv(InventoryMessage inv) throws IOException { - assert Thread.currentThread() == thread; + // This should be called in the network loop thread for this peer + // The peer told us about some blocks or transactions they have. For now we only care about blocks. // Note that as we don't actually want to store the entire block chain or even the headers of the block // chain, we may end up requesting blocks we already requested before. This shouldn't (in theory) happen @@ -256,7 +300,7 @@ public class Peer { /** Called by the Peer when the result has arrived. Completes the task. */ void setResult(T result) { - assert Thread.currentThread() == thread; // Called from peer thread. + // This should be called in the network loop thread for this peer this.result = result; // Now release the thread that is waiting. We don't need to synchronize here as the latch establishes // a memory barrier. @@ -318,10 +362,24 @@ public class Peer { /** * Starts an asynchronous download of the block chain. The chain download is deemed to be complete once we've * downloaded the same number of blocks that the peer advertised having in its version handshake message. - * - * @return a {@link CountDownLatch} that can be used to track progress and wait for completion. */ - public CountDownLatch startBlockChainDownload() throws IOException { + public void startBlockChainDownload() throws IOException { + for (PeerEventListener listener : eventListeners) { + synchronized (listener) { + listener.onChainDownloadStarted(this, getPeerBlocksToGet()); + } + } + + if (getPeerBlocksToGet() > 0) { + // When we just want as many blocks as possible, we can set the target hash to zero. + blockChainDownload(Sha256Hash.ZERO_HASH); + } + } + + /** + * @return the number of blocks to get, based on our chain height and the peer reported height + */ + private int getPeerBlocksToGet() { // Chain will overflow signed int blocks in ~41,000 years. int chainHeight = (int) conn.getVersionMessage().bestHeight; if (chainHeight <= 0) { @@ -330,28 +388,18 @@ public class Peer { throw new RuntimeException("Peer does not have block chain"); } int blocksToGet = chainHeight - blockChain.getChainHead().getHeight(); - if (blocksToGet < 0) { - // This peer has fewer blocks than we do. It isn't usable. - // TODO: We can't do the right thing here until Mirons patch lands. For now just return a zero latch. - return new CountDownLatch(0); - } - chainCompletionLatch = new CountDownLatch(blocksToGet); - if (blocksToGet > 0) { - // When we just want as many blocks as possible, we can set the target hash to zero. - blockChainDownload(Sha256Hash.ZERO_HASH); - } - return chainCompletionLatch; + return blocksToGet; } /** - * Terminates the network connection and stops the background thread. + * Terminates the network connection and stops the message handling loop. */ public void disconnect() { synchronized (this) { running = false; } try { - // This will cause the background thread to die, but it's really ugly. We must do a better job of this. + // This is the correct way to stop an IO bound loop conn.shutdown(); } catch (IOException e) { // Don't care about this. diff --git a/src/com/google/bitcoin/core/PeerAddress.java b/src/com/google/bitcoin/core/PeerAddress.java index d83a825b..491c4b75 100644 --- a/src/com/google/bitcoin/core/PeerAddress.java +++ b/src/com/google/bitcoin/core/PeerAddress.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.io.OutputStream; import java.math.BigInteger; import java.net.InetAddress; +import java.net.InetSocketAddress; import java.net.UnknownHostException; import java.util.Date; @@ -38,10 +39,16 @@ public class PeerAddress extends Message { BigInteger services; long time; + /** + * Construct a peer address from a serialized payload. + */ public PeerAddress(NetworkParameters params, byte[] payload, int offset, int protocolVersion) throws ProtocolException { super(params, payload, offset, protocolVersion); } + /** + * Construct a peer address from a memorized or hardcoded address. + */ public PeerAddress(InetAddress addr, int port, int protocolVersion) { this.addr = addr; this.port = port; @@ -49,6 +56,19 @@ public class PeerAddress extends Message { this.services = BigInteger.ZERO; } + public PeerAddress(InetAddress addr, int port) { + this(addr, port, NetworkParameters.PROTOCOL_VERSION); + } + + public PeerAddress(InetAddress addr) { + this(addr, 0); + } + + public PeerAddress(InetSocketAddress addr) { + this(addr.getAddress(), addr.getPort()); + } + + @Override public void bitcoinSerializeToStream(OutputStream stream) throws IOException { if (protocolVersion >= 31402) { int secs = (int)(new Date().getTime() / 1000); @@ -71,7 +91,7 @@ public class PeerAddress extends Message { } @Override - protected void parse() throws ProtocolException { + protected void parse() { // Format of a serialized address: // uint32 timestamp // uint64 services (flags determining what the node can do) diff --git a/src/com/google/bitcoin/core/PeerEventListener.java b/src/com/google/bitcoin/core/PeerEventListener.java new file mode 100644 index 00000000..7f6ecc87 --- /dev/null +++ b/src/com/google/bitcoin/core/PeerEventListener.java @@ -0,0 +1,48 @@ +/** + * Copyright 2011 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.bitcoin.core; + +/** + * Implementing a PeerEventListener allows you to learn when significant Peer communication + * has occurred. + * + *

Methods are called with the event listener object locked so your + * implementation does not have to be thread safe. + * + * @author miron@google.com (Miron Cuperman a.k.a devrandom) + * + */ +public interface PeerEventListener { + /** + * Called on a Peer thread when a block is received. + * + *

The block may have transactions or may be a header only once getheaders is implemented. + * + * @param peer the peer receiving the block + * @param block the downloaded block + * @param blocksLeft the number of blocks left to download + */ + public void onBlocksDownloaded(Peer peer, Block block, int blocksLeft); + + /** + * Called when a download is started with the initial number of blocks to be downloaded. + * + * @param peer the peer receiving the block + * @param blocksLeft the number of blocks left to download + */ + public void onChainDownloadStarted(Peer peer, int blocksLeft); +} diff --git a/src/com/google/bitcoin/core/PeerGroup.java b/src/com/google/bitcoin/core/PeerGroup.java new file mode 100644 index 00000000..b9682424 --- /dev/null +++ b/src/com/google/bitcoin/core/PeerGroup.java @@ -0,0 +1,340 @@ +/** + * Copyright 2011 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package com.google.bitcoin.core; + +import com.google.bitcoin.discovery.PeerDiscovery; +import com.google.bitcoin.discovery.PeerDiscoveryException; +import com.google.bitcoin.store.BlockStore; +import com.google.bitcoin.store.BlockStoreException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Maintain a number of connections to peers. + * + *

PeerGroup tries to maintain a constant number of connections to a set of distinct peers. + * Each peer runs a network listener in its own thread. When a connection is lost, a new peer + * will be tried after a delay as long as the number of connections less than the maximum. + * + *

Connections are made to addresses from a provided list. When that list is exhausted, + * we start again from the head of the list. + * + *

The PeerGroup can broadcast a transaction to the currently connected set of peers. It can + * also handle download of the blockchain from peers, restarting the process when peers die. + * + * @author miron@google.com (Miron Cuperman a.k.a devrandom) + * + */ +public class PeerGroup { + private static final int DEFAULT_CONNECTIONS = 4; + + private static final Logger log = LoggerFactory.getLogger(PeerGroup.class); + + private static final int CONNECTION_DELAY_MILLIS = 5 * 1000; + private static final int CORE_THREADS = 1; + private static final int THREAD_KEEP_ALIVE_SECONDS = 1; + + // Maximum number of connections this peerGroup will make + private int maxConnections; + // Addresses to try to connect to, excluding active peers + private BlockingQueue inactives; + // Connection initiation thread + private Thread connectThread; + // True if the connection initiation thread should be running + private boolean running; + // A pool of threads for peers, of size maxConnection + private ThreadPoolExecutor peerPool; + // Currently active peers + private Set peers; + // The peer we are currently downloading the chain from + private Peer downloadPeer; + // Callback for events related to chain download + private PeerEventListener downloadListener; + + private NetworkParameters params; + private BlockStore blockStore; + private BlockChain chain; + + /** + * Create a PeerGroup + */ + public PeerGroup(BlockStore blockStore, NetworkParameters params, BlockChain chain) { + this.maxConnections = DEFAULT_CONNECTIONS; + this.blockStore = blockStore; + this.params = params; + this.chain = chain; + + inactives = new LinkedBlockingQueue(); + + peers = Collections.synchronizedSet(new HashSet()); + peerPool = new ThreadPoolExecutor(CORE_THREADS, this.maxConnections, + THREAD_KEEP_ALIVE_SECONDS, TimeUnit.SECONDS, + new LinkedBlockingQueue(1), + new PeerGroupThreadFactory()); + } + + /** + * Depending on the environment, this should normally be between 1 and 10, default is 4. + * + * @param maxConnections the maximum number of peer connections that this group will try to make. + */ + public void setMaxConnections(int maxConnections) { + this.maxConnections = maxConnections; + } + + public int getMaxConnections() { + return maxConnections; + } + + /** Add an address to the list of potential peers to connect to */ + public void addAddress(PeerAddress peerAddress) { + // TODO(miron) consider deduplication + inactives.add(peerAddress); + } + + /** Add addresses from a discovery source to the list of potential peers to connect to */ + public void addPeerDiscovery(PeerDiscovery peerDiscovery) { + // TODO(miron) consider remembering the discovery source and retrying occasionally + InetSocketAddress[] addresses; + try { + addresses = peerDiscovery.getPeers(); + } catch (PeerDiscoveryException e) { + log.error("Failed to discover peer addresses from discovery source", e); + return; + } + + for (int i = 0; i < addresses.length; i++) { + inactives.add(new PeerAddress(addresses[i])); + } + } + + /** Starts the background thread that makes connections. */ + public void start() { + this.connectThread = new Thread(new PeerExecutionRunnable(), "Peer group thread"); + running = true; + this.connectThread.start(); + } + + /** + * Stop this PeerGroup + * + *

The peer group will be asynchronously shut down. After it is shut down + * all peers will be disconnected and no threads will be running. + */ + public synchronized void stop() { + if (running) { + connectThread.interrupt(); + } + } + + /** + * Broadcast a transaction to all connected peers + * + * @return whether we sent to at least one peer + */ + public boolean broadcastTransaction(Transaction tx) { + boolean success = false; + for (Peer peer : peers) { + try { + peer.broadcastTransaction(tx); + success = true; + } catch (IOException e) { + log.error("failed to broadcast to " + peer, e); + } + } + return success; + } + + private final class PeerExecutionRunnable implements Runnable { + /** + * Repeatedly get the next peer address from the inactive queue + * and try to connect. + * + *

We can be terminated with Thread.interrupt. When an interrupt is received, + * we will ask the executor to shutdown and ask each peer to disconnect. At that point + * no threads or network connections will be active. + */ + @Override + public void run() { + try { + while (running) { + tryNextPeer(); + + // We started a new peer connection, delay before trying another one + Thread.sleep(CONNECTION_DELAY_MILLIS); + } + } catch (InterruptedException ex) { + synchronized (this) { + running = false; + } + } + + peerPool.shutdownNow(); + + for (Peer peer : peers) { + peer.disconnect(); + } + } + + /* + * Try connecting to a peer. If we exceed the number of connections, delay and try + * again. + */ + private void tryNextPeer() throws InterruptedException { + final PeerAddress address = inactives.take(); + while (true) { + try { + final Peer peer = new Peer(params, address, + blockStore.getChainHead().getHeight(), chain); + Runnable command = new Runnable() { + @Override + public void run() { + try { + log.info("connecting to " + peer); + peer.connect(); + peers.add(peer); + handleNewPeer(peer); + log.info("running " + peer); + peer.run(); + } + finally { + // In all cases, put the address back on the queue. + // We will retry this peer after all other peers have been tried. + inactives.add(address); + peers.remove(peer); + handlePeerDeath(peer); + } + } + }; + peerPool.execute(command); + break; + } catch (RejectedExecutionException e) { + // Reached maxConnections, try again after a delay + + // TODO - consider being smarter about retry. No need to retry + // if we reached maxConnections or if peer queue is empty. Also consider + // exponential backoff on peers and adjusting the sleep time according to the + // lowest backoff value in queue. + } catch (BlockStoreException e) { + // Fatal error + log.error("Block store corrupt?", e); + running = false; + break; + } + + // If we got here, we should retry this address because an error unrelated + // to the peer has occurred. + Thread.sleep(CONNECTION_DELAY_MILLIS); + } + } + } + + /** + * Start downloading the blockchain from the first available peer. + * + *

If no peers are currently connected, the download will be started + * once a peer starts. If the peer dies, the download will resume with another peer. + * + * @param listener a listener for chain download events, may not be null + */ + public synchronized void startBlockChainDownload(PeerEventListener listener) { + this.downloadListener = listener; + // TODO be more nuanced about which peer to download from. We can also try + // downloading from multiple peers and handle the case when a new peer comes along + // with a longer chain after we thought we were done. + if (!peers.isEmpty()) + startBlockChainDownloadFromPeer(peers.iterator().next()); + } + + /** + * Download the blockchain from peers. + * + *

This method wait until the download is complete. "Complete" is defined as downloading + * from at least one peer all the blocks that are in that peer's inventory. + */ + public void downloadBlockChain() { + DownloadListener listener = new DownloadListener(); + startBlockChainDownload(listener); + try { + listener.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + protected synchronized void handleNewPeer(Peer peer) { + if (downloadListener != null && downloadPeer == null) + startBlockChainDownloadFromPeer(peer); + } + + protected synchronized void handlePeerDeath(Peer peer) { + if (peer == downloadPeer) { + downloadPeer = null; + if (downloadListener != null && !peers.isEmpty()) + startBlockChainDownloadFromPeer(peers.iterator().next()); + } + } + + private synchronized void startBlockChainDownloadFromPeer(Peer peer) { + peer.addEventListener(downloadListener); + try { + peer.startBlockChainDownload(); + } catch (IOException e) { + log.error("failed to start block chain download from " + peer, e); + return; + } + downloadPeer = peer; + } + + static class PeerGroupThreadFactory implements ThreadFactory { + static final AtomicInteger poolNumber = new AtomicInteger(1); + final ThreadGroup group; + final AtomicInteger threadNumber = new AtomicInteger(1); + final String namePrefix; + + PeerGroupThreadFactory() { + group = Thread.currentThread().getThreadGroup(); + namePrefix = "PeerGroup-" + + poolNumber.getAndIncrement() + + "-thread-"; + } + + @Override + public Thread newThread(Runnable r) { + Thread t = new Thread(group, r, + namePrefix + threadNumber.getAndIncrement(), + 0); + t.setDaemon(true); + return t; + } + } +} diff --git a/src/com/google/bitcoin/core/Wallet.java b/src/com/google/bitcoin/core/Wallet.java index eeb66ec7..08d2355a 100644 --- a/src/com/google/bitcoin/core/Wallet.java +++ b/src/com/google/bitcoin/core/Wallet.java @@ -433,18 +433,44 @@ public class Wallet implements Serializable { } /** - * Sends coins to the given address, via the given {@link Peer}. Change is returned to the first key in the wallet. + * Sends coins to the given address, via the given {@link PeerGroup}. + * Change is returned to the first key in the wallet. + * * @param to Which address to send coins to. * @param nanocoins How many nanocoins to send. You can use Utils.toNanoCoins() to calculate this. * @return The {@link Transaction} that was created or null if there was insufficient balance to send the coins. * @throws IOException if there was a problem broadcasting the transaction */ - public synchronized Transaction sendCoins(Peer peer, Address to, BigInteger nanocoins) throws IOException { + public synchronized Transaction sendCoins(PeerGroup peerGroup, Address to, BigInteger nanocoins) throws IOException { + Transaction tx = createSend(to, nanocoins); + if (tx == null) // Not enough money! :-( + return null; + if (!peerGroup.broadcastTransaction(tx)) { + throw new IOException("Failed to broadcast tx to all connected peers"); + } + + // TODO - retry logic + confirmSend(tx); + return tx; + } + + /** + * Sends coins to the given address, via the given {@link Peer}. + * Change is returned to the first key in the wallet. + * + * @param to Which address to send coins to. + * @param nanocoins How many nanocoins to send. You can use Utils.toNanoCoins() to calculate this. + * @return The {@link Transaction} that was created or null if there was insufficient balance to send the coins. + * @throws IOException if there was a problem broadcasting the transaction + */ + public synchronized Transaction sendCoins(Peer peer, Address to, BigInteger nanocoins) + throws IOException { Transaction tx = createSend(to, nanocoins); if (tx == null) // Not enough money! :-( return null; peer.broadcastTransaction(tx); confirmSend(tx); + return tx; } diff --git a/src/com/google/bitcoin/examples/FetchBlock.java b/src/com/google/bitcoin/examples/FetchBlock.java index d7415bed..2bcfabb6 100644 --- a/src/com/google/bitcoin/examples/FetchBlock.java +++ b/src/com/google/bitcoin/examples/FetchBlock.java @@ -30,11 +30,17 @@ public class FetchBlock { public static void main(String[] args) throws Exception { System.out.println("Connecting to node"); final NetworkParameters params = NetworkParameters.prodNet(); - NetworkConnection conn = new NetworkConnection(InetAddress.getLocalHost(), params, 0, 60000); + BlockStore blockStore = new MemoryBlockStore(params); BlockChain chain = new BlockChain(params, blockStore); - Peer peer = new Peer(params, conn, chain); - peer.start(); + final Peer peer = new Peer(params, new PeerAddress(InetAddress.getLocalHost()), chain); + peer.connect(); + new Thread(new Runnable() { + @Override + public void run() { + peer.run(); + } + }).start(); Sha256Hash blockHash = new Sha256Hash(args[0]); Future future = peer.getBlock(blockHash); diff --git a/src/com/google/bitcoin/examples/PingService.java b/src/com/google/bitcoin/examples/PingService.java index 772ed4ea..68e5cca6 100644 --- a/src/com/google/bitcoin/examples/PingService.java +++ b/src/com/google/bitcoin/examples/PingService.java @@ -16,7 +16,19 @@ package com.google.bitcoin.examples; -import com.google.bitcoin.core.*; +import com.google.bitcoin.core.Address; +import com.google.bitcoin.core.BlockChain; +import com.google.bitcoin.core.DownloadListener; +import com.google.bitcoin.core.ECKey; +import com.google.bitcoin.core.NetworkParameters; +import com.google.bitcoin.core.PeerAddress; +import com.google.bitcoin.core.PeerGroup; +import com.google.bitcoin.core.ScriptException; +import com.google.bitcoin.core.Transaction; +import com.google.bitcoin.core.TransactionInput; +import com.google.bitcoin.core.Utils; +import com.google.bitcoin.core.Wallet; +import com.google.bitcoin.core.WalletEventListener; import com.google.bitcoin.store.BlockStore; import com.google.bitcoin.store.BoundedOverheadBlockStore; @@ -24,8 +36,6 @@ import java.io.File; import java.io.IOException; import java.math.BigInteger; import java.net.InetAddress; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; /** *

@@ -77,14 +87,15 @@ public class PingService { // Connect to the localhost node. One minute timeout since we won't try any other peers System.out.println("Connecting ..."); - NetworkConnection conn = new NetworkConnection(InetAddress.getByName("plan99.net"), params, - blockStore.getChainHead().getHeight(), 60000); BlockChain chain = new BlockChain(params, wallet, blockStore); - final Peer peer = new Peer(params, conn, chain); - peer.start(); + + final PeerGroup peerGroup = new PeerGroup(blockStore, params, chain); + peerGroup.addAddress(new PeerAddress(InetAddress.getLocalHost())); + peerGroup.start(); // We want to know when the balance changes. wallet.addEventListener(new WalletEventListener() { + @Override public void onCoinsReceived(Wallet w, Transaction tx, BigInteger prevBalance, BigInteger newBalance) { // Running on a peer thread. assert !newBalance.equals(BigInteger.ZERO); @@ -97,7 +108,7 @@ public class PingService { BigInteger value = tx.getValueSentToMe(w); System.out.println("Received " + Utils.bitcoinValueToFriendlyString(value) + " from " + from.toString()); // Now send the coins back! - Transaction sendTx = w.sendCoins(peer, from, value); + Transaction sendTx = w.sendCoins(peerGroup, from, value); assert sendTx != null; // We should never try to send more coins than we have! System.out.println("Sent coins back! Transaction hash is " + sendTx.getHashAsString()); w.saveToFile(walletFile); @@ -112,24 +123,9 @@ public class PingService { } }); - CountDownLatch progress = peer.startBlockChainDownload(); - long max = progress.getCount(); // Racy but no big deal. - if (max > 0) { - System.out.println("Downloading block chain. " + (max > 1000 ? "This may take a while." : "")); - long current = max; - int lastPercent = 0; - while (current > 0) { - double pct = 100.0 - (100.0 * (current / (double) max)); - if ((int)pct != lastPercent) { - System.out.println(String.format("Chain download %d%% done", (int) pct)); - lastPercent = (int) pct; - } - progress.await(1, TimeUnit.SECONDS); - current = progress.getCount(); - } - } + peerGroup.downloadBlockChain(); System.out.println("Send coins to: " + key.toAddress(params).toString()); System.out.println("Waiting for coins to arrive. Press Ctrl-C to quit."); - // The peer thread keeps us alive until something kills the process. + // The PeerGroup thread keeps us alive until something kills the process. } } diff --git a/src/com/google/bitcoin/examples/PrivateKeys.java b/src/com/google/bitcoin/examples/PrivateKeys.java index 9bda7896..de8789fc 100644 --- a/src/com/google/bitcoin/examples/PrivateKeys.java +++ b/src/com/google/bitcoin/examples/PrivateKeys.java @@ -55,18 +55,21 @@ public class PrivateKeys { wallet.addKey(key); // Find the transactions that involve those coins. - NetworkConnection conn = new NetworkConnection(InetAddress.getLocalHost(), params, 0, 60000); - BlockChain chain = new BlockChain(params, wallet, new MemoryBlockStore(params)); - Peer peer = new Peer(params, conn, chain); - peer.start(); - peer.startBlockChainDownload().await(); + final MemoryBlockStore blockStore = new MemoryBlockStore(params); + BlockChain chain = new BlockChain(params, wallet, blockStore); + + final PeerGroup peerGroup = new PeerGroup(blockStore, params, chain); + peerGroup.addAddress(new PeerAddress(InetAddress.getLocalHost())); + peerGroup.start(); + peerGroup.downloadBlockChain(); + peerGroup.stop(); // And take them! System.out.println("Claiming " + Utils.bitcoinValueToFriendlyString(wallet.getBalance()) + " coins"); - wallet.sendCoins(peer, destination, wallet.getBalance()); + wallet.sendCoins(peerGroup, destination, wallet.getBalance()); // Wait a few seconds to let the packets flush out to the network (ugly). Thread.sleep(5000); - peer.disconnect(); + System.exit(0); } catch (ArrayIndexOutOfBoundsException e) { System.out.println("First arg should be private key in Base58 format. Second argument should be address " + "to send to."); diff --git a/src/com/google/bitcoin/examples/RefreshWallet.java b/src/com/google/bitcoin/examples/RefreshWallet.java index 52905f9a..d31c35d9 100644 --- a/src/com/google/bitcoin/examples/RefreshWallet.java +++ b/src/com/google/bitcoin/examples/RefreshWallet.java @@ -16,14 +16,20 @@ package com.google.bitcoin.examples; -import com.google.bitcoin.core.*; -import com.google.bitcoin.store.*; +import com.google.bitcoin.core.BlockChain; +import com.google.bitcoin.core.DownloadListener; +import com.google.bitcoin.core.NetworkParameters; +import com.google.bitcoin.core.PeerAddress; +import com.google.bitcoin.core.PeerGroup; +import com.google.bitcoin.core.Transaction; +import com.google.bitcoin.core.Wallet; +import com.google.bitcoin.core.WalletEventListener; +import com.google.bitcoin.store.BlockStore; +import com.google.bitcoin.store.MemoryBlockStore; import java.io.File; import java.math.BigInteger; import java.net.InetAddress; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; /** * RefreshWallet loads a wallet, then processes the block chain to update the transaction pools within it. @@ -37,13 +43,14 @@ public class RefreshWallet { // Set up the components and link them together. final NetworkParameters params = NetworkParameters.testNet(); BlockStore blockStore = new MemoryBlockStore(params); - NetworkConnection conn = new NetworkConnection(InetAddress.getLocalHost(), params, - blockStore.getChainHead().getHeight(), 60000); BlockChain chain = new BlockChain(params, wallet, blockStore); - Peer peer = new Peer(params, conn, chain); - peer.start(); + + final PeerGroup peerGroup = new PeerGroup(blockStore, params, chain); + peerGroup.addAddress(new PeerAddress(InetAddress.getLocalHost())); + peerGroup.start(); wallet.addEventListener(new WalletEventListener() { + @Override public void onCoinsReceived(Wallet w, Transaction tx, BigInteger prevBalance, BigInteger newBalance) { System.out.println("\nReceived tx " + tx.getHashAsString()); System.out.println(tx.toString()); @@ -51,19 +58,8 @@ public class RefreshWallet { }); // Now download and process the block chain. - CountDownLatch progress = peer.startBlockChainDownload(); - long max = progress.getCount(); // Racy but no big deal. - if (max > 0) { - System.out.println("Downloading block chain. " + (max > 1000 ? "This may take a while." : "")); - long current = max; - while (current > 0) { - double pct = 100.0 - (100.0 * (current / (double) max)); - System.out.println(String.format("Chain download %d%% done", (int) pct)); - progress.await(1, TimeUnit.SECONDS); - current = progress.getCount(); - } - } - peer.disconnect(); + peerGroup.downloadBlockChain(); + peerGroup.stop(); wallet.saveToFile(file); System.out.println("\nDone!\n"); System.out.println(wallet.toString()); diff --git a/tests/com/google/bitcoin/core/BlockChainTest.java b/tests/com/google/bitcoin/core/BlockChainTest.java index 3867c2a3..5b9fdbd8 100644 --- a/tests/com/google/bitcoin/core/BlockChainTest.java +++ b/tests/com/google/bitcoin/core/BlockChainTest.java @@ -45,6 +45,7 @@ public class BlockChainTest { @Before public void setUp() { + testNetChain = new BlockChain(testNet, new Wallet(testNet), new MemoryBlockStore(testNet)); unitTestParams = NetworkParameters.unitTests();