From 4254e276fb38222c26a5d8531f2b0c6cdd40bef1 Mon Sep 17 00:00:00 2001 From: Mike Hearn Date: Mon, 14 Jan 2013 22:30:32 +0100 Subject: [PATCH] Clean up Peer.java a bit - remove the custom future implementation and use Guava futures instead. --- .../java/com/google/bitcoin/core/Peer.java | 119 ++++++------------ 1 file changed, 36 insertions(+), 83 deletions(-) diff --git a/core/src/main/java/com/google/bitcoin/core/Peer.java b/core/src/main/java/com/google/bitcoin/core/Peer.java index 074f0f47..9d397373 100644 --- a/core/src/main/java/com/google/bitcoin/core/Peer.java +++ b/core/src/main/java/com/google/bitcoin/core/Peer.java @@ -31,7 +31,7 @@ import java.io.IOException; import java.net.ConnectException; import java.net.InetSocketAddress; import java.util.*; -import java.util.concurrent.*; +import java.util.concurrent.CopyOnWriteArrayList; /** * A Peer handles the high level communication with a Bitcoin node. @@ -50,10 +50,6 @@ public class Peer { private final NetworkParameters params; private final AbstractBlockChain blockChain; - // When an API user explicitly requests a block or transaction from a peer, the InventoryItem is put here - // whilst waiting for the response. Synchronized on itself. Is not used for downloads Peer generates itself. - // TODO: Make this work for transactions as well. - private final List> pendingGetBlockFutures; private PeerAddress address; private List eventListeners; private List lifecycleListeners; @@ -78,13 +74,21 @@ public class Peer { // received at which point it gets set to true again. This isn't relevant unless downloadData is true. private boolean downloadBlockBodies = true; // Keeps track of things we requested internally with getdata but didn't receive yet, so we can avoid re-requests. - // It's not quite the same as pendingGetBlockFutures, as this is used only for getdatas done as part of downloading + // It's not quite the same as getDataFutures, as this is used only for getdatas done as part of downloading // the chain and so is lighter weight (we just keep a bunch of hashes not futures). // // It is important to avoid a nasty edge case where we can end up with parallel chain downloads proceeding // simultaneously if we were to receive a newly solved block whilst parts of the chain are streaming to us. private HashSet pendingBlockDownloads = new HashSet(); + // When an API user explicitly requests a block or transaction from a peer, the InventoryItem is put here + // whilst waiting for the response. Synchronized on itself. Is not used for downloads Peer generates itself. + private static class GetDataRequest { + Sha256Hash hash; + SettableFuture future; + } + private final List getDataFutures; + // Outstanding pings against this peer and how long the last one took to complete. private CopyOnWriteArrayList pendingPings; private long[] lastPingTimes; @@ -103,7 +107,7 @@ public class Peer { this.versionMessage = Preconditions.checkNotNull(ver); this.blockChain = chain; // Allowed to be null. this.downloadData = chain != null; - this.pendingGetBlockFutures = new ArrayList>(); + this.getDataFutures = new CopyOnWriteArrayList(); this.eventListeners = new CopyOnWriteArrayList(); this.lifecycleListeners = new CopyOnWriteArrayList(); this.fastCatchupTimeSecs = params.genesisBlock.getTimeSeconds(); @@ -372,18 +376,8 @@ public class Peer { log.debug("{}: Received broadcast block {}", address, m.getHashAsString()); try { // Was this block requested by getBlock()? - synchronized (pendingGetBlockFutures) { - for (int i = 0; i < pendingGetBlockFutures.size(); i++) { - GetDataFuture f = pendingGetBlockFutures.get(i); - if (f.getItem().hash.equals(m.getHash())) { - // Yes, it was. So pass it through the future. - f.setResult(m); - // Blocks explicitly requested don't get sent to the block chain. - pendingGetBlockFutures.remove(i); - return; - } - } - } + if (maybeHandleRequestedData(m)) return; + if (!downloadData) { log.warn("Received block we did not ask for: {}", m.getHashAsString()); return; @@ -423,6 +417,21 @@ public class Peer { } } + private boolean maybeHandleRequestedData(Block m) { + boolean found = false; + Sha256Hash hash = m.getHash(); + for (ListIterator it = getDataFutures.listIterator(); it.hasNext();) { + GetDataRequest req = it.next(); + if (hash.equals(req.hash)) { + req.future.set(m); + getDataFutures.remove(req); + found = true; + // Keep going in case there are more. + } + } + return found; + } + private synchronized void invokeOnBlocksDownloaded(final Block m) { // It is possible for the peer block height difference to be negative when blocks have been solved and broadcast // since the time we first connected to the peer. However, it's weird and unexpected to receive a callback @@ -535,28 +544,24 @@ public class Peer { } /** - * Asks the connected peer for the block of the given hash, and returns a Future representing the answer. + * Asks the connected peer for the block of the given hash, and returns a future representing the answer. * If you want the block right away and don't mind waiting for it, just call .get() on the result. Your thread - * will block until the peer answers. You can also use the Future object to wait with a timeout, or just check - * whether it's done later. + * will block until the peer answers. * * @param blockHash Hash of the block you wareare requesting. * @throws IOException */ - public Future getBlock(Sha256Hash blockHash) throws IOException { + public ListenableFuture getBlock(Sha256Hash blockHash) throws IOException { log.info("Request to fetch block {}", blockHash); GetDataMessage getdata = new GetDataMessage(params); InventoryItem inventoryItem = new InventoryItem(InventoryItem.Type.Block, blockHash); getdata.addItem(inventoryItem); - GetDataFuture future = new GetDataFuture(inventoryItem); - // Add to the list of things we're waiting for. It's important this come before the network send to avoid - // race conditions. - synchronized (pendingGetBlockFutures) { - pendingGetBlockFutures.add(future); - } - + GetDataRequest req = new GetDataRequest(); + req.future = SettableFuture.create(); + req.hash = blockHash; + getDataFutures.add(req); sendMessage(getdata); - return future; + return req.future; } /** @@ -597,58 +602,6 @@ public class Peer { removeEventListener(wallet.getPeerEventListener()); } - // A GetDataFuture wraps the result of a getBlock or (in future) getTransaction so the owner of the object can - // decide whether to wait forever, wait for a short while or check later after doing other work. - private static class GetDataFuture implements Future { - private boolean cancelled; - private final InventoryItem item; - private final CountDownLatch latch; - private T result; - - GetDataFuture(InventoryItem item) { - this.item = item; - this.latch = new CountDownLatch(1); - } - - public boolean cancel(boolean b) { - // Cannot cancel a getdata - once sent, it's sent. - cancelled = true; - return false; - } - - public boolean isCancelled() { - return cancelled; - } - - public boolean isDone() { - return result != null || cancelled; - } - - public T get() throws InterruptedException, ExecutionException { - latch.await(); - return Preconditions.checkNotNull(result); - } - - public T get(long l, TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException { - if (!latch.await(l, timeUnit)) - throw new TimeoutException(); - return Preconditions.checkNotNull(result); - } - - InventoryItem getItem() { - return item; - } - - /** Called by the Peer when the result has arrived. Completes the task. */ - void setResult(T result) { - // This should be called in the network loop thread for this peer - this.result = result; - // Now release the thread that is waiting. We don't need to synchronize here as the latch establishes - // a memory barrier. - latch.countDown(); - } - } - /** * Sends the given message on the peers Channel. */