diff --git a/core/src/main/java/com/google/bitcoin/core/BitcoinSerializer.java b/core/src/main/java/com/google/bitcoin/core/BitcoinSerializer.java index 7d1f6ec3..462912e7 100644 --- a/core/src/main/java/com/google/bitcoin/core/BitcoinSerializer.java +++ b/core/src/main/java/com/google/bitcoin/core/BitcoinSerializer.java @@ -72,6 +72,7 @@ public class BitcoinSerializer { names.put(HeadersMessage.class, "headers"); names.put(BloomFilter.class, "filterload"); names.put(FilteredBlock.class, "merkleblock"); + names.put(NotFoundMessage.class, "notfound"); } /** @@ -280,6 +281,8 @@ public class BitcoinSerializer { return new AlertMessage(params, payloadBytes); } else if (command.equals("filterload")) { return new BloomFilter(params, payloadBytes); + } else if (command.equals("notfound")) { + return new NotFoundMessage(params, payloadBytes); } else { log.warn("No support for deserializing message with name {}", command); return new UnknownMessage(params, command, payloadBytes); diff --git a/core/src/main/java/com/google/bitcoin/core/NotFoundMessage.java b/core/src/main/java/com/google/bitcoin/core/NotFoundMessage.java new file mode 100644 index 00000000..9ac5b1f2 --- /dev/null +++ b/core/src/main/java/com/google/bitcoin/core/NotFoundMessage.java @@ -0,0 +1,31 @@ +/* + * Copyright 2012 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.bitcoin.core; + +/** + * Sent by a peer when a getdata request doesn't find the requested data in the mempool. It has the same format + * as an inventory message and lists the hashes of the missing items. + */ +public class NotFoundMessage extends InventoryMessage { + public NotFoundMessage(NetworkParameters params) { + super(params); + } + + public NotFoundMessage(NetworkParameters params, byte[] payloadBytes) throws ProtocolException { + super(params, payloadBytes); + } +} diff --git a/core/src/main/java/com/google/bitcoin/core/Peer.java b/core/src/main/java/com/google/bitcoin/core/Peer.java index e12aca9e..a3c1aff3 100644 --- a/core/src/main/java/com/google/bitcoin/core/Peer.java +++ b/core/src/main/java/com/google/bitcoin/core/Peer.java @@ -22,6 +22,9 @@ import com.google.bitcoin.store.BlockStoreException; import com.google.bitcoin.utils.EventListenerInvoker; import com.google.common.base.Objects; import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; import org.jboss.netty.channel.*; @@ -33,6 +36,7 @@ import java.net.ConnectException; import java.net.InetSocketAddress; import java.util.*; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CopyOnWriteArraySet; /** * A Peer handles the high level communication with a Bitcoin node. @@ -234,7 +238,11 @@ public class Peer { currentFilteredBlock = null; } - if (m instanceof InventoryMessage) { + if (m instanceof NotFoundMessage) { + // This is sent to us when we did a getdata on some transactions that aren't in the peers memory pool. + // Because NotFoundMessage is a subclass of InventoryMessage, the test for it must come before the next. + processNotFoundMessage((NotFoundMessage) m); + } else if (m instanceof InventoryMessage) { processInv((InventoryMessage) m); } else if (m instanceof Block) { processBlock((Block) m); @@ -258,7 +266,7 @@ public class Peer { } else if (m instanceof HeadersMessage) { processHeaders((HeadersMessage) m); } else if (m instanceof AlertMessage) { - processAlert((AlertMessage)m); + processAlert((AlertMessage) m); } else if (m instanceof VersionMessage) { synchronized (Peer.this) { peerVersionMessage = (VersionMessage)m; @@ -295,6 +303,24 @@ public class Peer { } } + private void processNotFoundMessage(NotFoundMessage m) { + // This is received when we previously did a getdata but the peer couldn't find what we requested in it's + // memory pool. Typically, because we are downloading dependencies of a relevant transaction and reached + // the bottom of the dependency tree (where the unconfirmed transactions connect to transactions that are + // in the chain). + // + // We go through and cancel the pending getdata futures for the items we were told weren't found. + for (ListIterator it = getDataFutures.listIterator(); it.hasNext();) { + GetDataRequest req = it.next(); + for (InventoryItem item : m.getItems()) { + if (item.hash.equals(req.hash)) { + req.future.cancel(true); + getDataFutures.remove(req); + } + } + } + } + private synchronized void processAlert(AlertMessage m) { try { if (m.isSignatureValid()) { @@ -383,6 +409,11 @@ public class Peer { // We may get back a different transaction object. tx = memoryPool.seen(tx, getAddress()); } + if (maybeHandleRequestedData(tx)) + return; + // Tell all listeners (like wallets) about this tx so they can decide whether to keep it or not. If no + // listener keeps a reference around then the memory pool will forget about it after a while too because + // it uses weak references. final Transaction ftx = tx; EventListenerInvoker.invoke(eventListeners, new EventListenerInvoker() { @Override @@ -392,6 +423,129 @@ public class Peer { }); } + /** + *

Returns a future that wraps a list of all transactions that the given transaction depends on, recursively. + * Only transactions in peers memory pools are included; the recursion stops at transactions that are in the + * current best chain. So it doesn't make much sense to provide a tx that was already in the best chain and + * a precondition checks this.

+ * + *

For example, if tx has 2 inputs that connect to transactions A and B, and transaction B is unconfirmed and + * has one input connecting to transaction C that is unconfirmed, and transaction C connects to transaction D + * that is in the chain, then this method will return either {B, C} or {C, B}.

+ * + *

This method is useful for apps that want to learn about how long an unconfirmed transaction might take + * to confirm, by checking for unexpectedly time locked transactions, unusually deep dependency trees or fee-paying + * transactions that depend on unconfirmed free transactions.

+ * + *

Note that dependencies downloaded this way will not trigger the onTransaction method of event listeners.

+ */ + public ListenableFuture> downloadDependencies(Transaction tx) { + TransactionConfidence.ConfidenceType txConfidence = tx.getConfidence().getConfidenceType(); + Preconditions.checkArgument(txConfidence != TransactionConfidence.ConfidenceType.BUILDING); + log.info("Downloading dependencies of {}", tx.getHashAsString()); + final LinkedList results = new LinkedList(); + // future will be invoked when the entire dependency tree has been walked and the results compiled. + final ListenableFuture future = downloadDependenciesInternal(tx, new Object(), results); + final SettableFuture> resultFuture = SettableFuture.create(); + Futures.addCallback(future, new FutureCallback() { + public void onSuccess(Object _) { + resultFuture.set(results); + } + + public void onFailure(Throwable throwable) { + resultFuture.setException(throwable); + } + }); + return resultFuture; + } + + // The marker object in the future returned is the same as the parameter. It is arbitrary and can be anything. + private ListenableFuture downloadDependenciesInternal(final Transaction tx, + final Object marker, + final List results) { + final SettableFuture resultFuture = SettableFuture.create(); + final Sha256Hash rootTxHash = tx.getHash(); + // We want to recursively grab its dependencies. This is so listeners can learn important information like + // whether a transaction is dependent on a timelocked transaction or has an unexpectedly deep dependency tree + // or depends on a no-fee transaction. + // + // Firstly find any that are already in the memory pool so if they weren't garbage collected yet, they won't + // be deleted. Use COW sets to make unit tests deterministic and because they are small. It's slower for + // the case of transactions with tons of inputs. + Set dependencies = new CopyOnWriteArraySet(); + Set needToRequest = new CopyOnWriteArraySet(); + for (TransactionInput input : tx.getInputs()) { + // There may be multiple inputs that connect to the same transaction. + Sha256Hash hash = input.getOutpoint().getHash(); + Transaction dep = memoryPool.get(hash); + if (dep == null) { + needToRequest.add(hash); + } else { + dependencies.add(dep); + } + } + results.addAll(dependencies); + try { + // Build the request for the missing dependencies. + List> futures = Lists.newArrayList(); + GetDataMessage getdata = new GetDataMessage(params); + for (Sha256Hash hash : needToRequest) { + getdata.addTransaction(hash); + GetDataRequest req = new GetDataRequest(); + req.hash = hash; + req.future = SettableFuture.create(); + futures.add(req.future); + getDataFutures.add(req); + } + // The transactions we already grabbed out of the mempool must still be considered by the code below. + for (Transaction dep : dependencies) { + futures.add(Futures.immediateFuture(dep)); + } + ListenableFuture> successful = Futures.successfulAsList(futures); + Futures.addCallback(successful, new FutureCallback>() { + public void onSuccess(List transactions) { + // Once all transactions either were received, or we know there are no more to come ... + // Note that transactions will contain "null" for any positions that weren't successful. + List> childFutures = Lists.newLinkedList(); + for (Transaction tx : transactions) { + if (tx == null) continue; + log.info("Downloaded dependency of {}: {}", rootTxHash, tx.getHashAsString()); + results.add(tx); + // Now recurse into the dependencies of this transaction too. + childFutures.add(downloadDependenciesInternal(tx, marker, results)); + } + if (childFutures.size() == 0) { + // Short-circuit: we're at the bottom of this part of the tree. + resultFuture.set(marker); + } else { + // There are some children to download. Wait until it's done (and their children and their + // children...) to inform the caller that we're finished. + Futures.addCallback(Futures.successfulAsList(childFutures), new FutureCallback>() { + public void onSuccess(List objects) { + resultFuture.set(marker); + } + + public void onFailure(Throwable throwable) { + resultFuture.setException(throwable); + } + }); + } + } + + public void onFailure(Throwable throwable) { + resultFuture.setException(throwable); + } + }); + // Start the operation. + sendMessage(getdata); + } catch (IOException e) { + log.error("Couldn't send getdata in downloadDependencies({})", tx.getHash()); + resultFuture.setException(e); + return resultFuture; + } + return resultFuture; + } + private synchronized void processBlock(Block m) throws IOException { log.debug("{}: Received broadcast block {}", address, m.getHashAsString()); try { @@ -481,7 +635,7 @@ public class Peer { } } - private boolean maybeHandleRequestedData(Block m) { + private boolean maybeHandleRequestedData(Message m) { boolean found = false; Sha256Hash hash = m.getHash(); for (ListIterator it = getDataFutures.listIterator(); it.hasNext();) { @@ -622,18 +776,33 @@ public class Peer { * Asks the connected peer for the block of the given hash, and returns a future representing the answer. * If you want the block right away and don't mind waiting for it, just call .get() on the result. Your thread * will block until the peer answers. - * - * @param blockHash Hash of the block you wareare requesting. - * @throws IOException */ public ListenableFuture getBlock(Sha256Hash blockHash) throws IOException { log.info("Request to fetch block {}", blockHash); GetDataMessage getdata = new GetDataMessage(params); - InventoryItem inventoryItem = new InventoryItem(InventoryItem.Type.Block, blockHash); - getdata.addItem(inventoryItem); + getdata.addBlock(blockHash); + return sendSingleGetData(getdata); + } + + /** + * Asks the connected peer for the given transaction from its memory pool. Transactions in the chain cannot be + * retrieved this way because peers don't have a transaction ID to transaction-pos-on-disk index, and besides, + * in future many peers will delete old transaction data they don't need. + */ + public ListenableFuture getPeerMempoolTransaction(Sha256Hash hash) throws IOException { + // TODO: Unit test this method. + log.info("Request to fetch peer mempool tx {}", hash); + GetDataMessage getdata = new GetDataMessage(params); + getdata.addTransaction(hash); + return sendSingleGetData(getdata); + } + + /** Sends a getdata with a single item in it. */ + private ListenableFuture sendSingleGetData(GetDataMessage getdata) throws IOException { + Preconditions.checkArgument(getdata.getItems().size() == 1); GetDataRequest req = new GetDataRequest(); req.future = SettableFuture.create(); - req.hash = blockHash; + req.hash = getdata.getItems().get(0).hash; getDataFutures.add(req); sendMessage(getdata); return req.future; diff --git a/core/src/main/java/com/google/bitcoin/core/Wallet.java b/core/src/main/java/com/google/bitcoin/core/Wallet.java index 884ca793..5b1bf688 100644 --- a/core/src/main/java/com/google/bitcoin/core/Wallet.java +++ b/core/src/main/java/com/google/bitcoin/core/Wallet.java @@ -2211,7 +2211,7 @@ public class Wallet implements Serializable, BlockChainListener { * The returned object can be used to connect the wallet to a {@link Peer} or {@link PeerGroup} in order to * receive and process blocks and transactions. */ - public synchronized PeerEventListener getPeerEventListener() { + synchronized PeerEventListener getPeerEventListener() { if (peerEventListener == null) { // Instantiate here to avoid issues with wallets resurrected from serialized copies. peerEventListener = new AbstractPeerEventListener() { diff --git a/core/src/test/java/com/google/bitcoin/core/PeerTest.java b/core/src/test/java/com/google/bitcoin/core/PeerTest.java index 9ad4075a..f8faa557 100644 --- a/core/src/test/java/com/google/bitcoin/core/PeerTest.java +++ b/core/src/test/java/com/google/bitcoin/core/PeerTest.java @@ -19,6 +19,7 @@ package com.google.bitcoin.core; import com.google.bitcoin.core.Peer.PeerHandler; import com.google.common.util.concurrent.ListenableFuture; import org.easymock.Capture; +import org.easymock.CaptureType; import org.jboss.netty.channel.*; import org.junit.Before; import org.junit.Test; @@ -38,6 +39,7 @@ public class PeerTest extends TestWithNetworkConnections { private Capture event; private PeerHandler handler; private static final int OTHER_PEER_CHAIN_HEIGHT = 110; + private MemoryPool memoryPool; @Override @Before @@ -47,8 +49,10 @@ public class PeerTest extends TestWithNetworkConnections { VersionMessage ver = new VersionMessage(unitTestParams, 100); peer = new Peer(unitTestParams, blockChain, ver); peer.addWallet(wallet); + memoryPool = new MemoryPool(); + peer.setMemoryPool(memoryPool); handler = peer.getHandler(); - event = new Capture(); + event = new Capture(CaptureType.ALL); pipeline.sendDownstream(capture(event)); expectLastCall().anyTimes(); } @@ -409,7 +413,7 @@ public class PeerTest extends TestWithNetworkConnections { // Request headers until the last 2 blocks. peer.setDownloadParameters((Utils.now().getTime() / 1000) - (600*2) + 1, false); peer.startBlockChainDownload(); - GetHeadersMessage getheaders = (GetHeadersMessage) event.getValue().getMessage(); + GetHeadersMessage getheaders = (GetHeadersMessage) outbound(); List expectedLocator = new ArrayList(); expectedLocator.add(b1.getHash()); expectedLocator.add(unitTestParams.genesisBlock.getHash()); @@ -424,7 +428,7 @@ public class PeerTest extends TestWithNetworkConnections { expectedLocator.add(b1.getHash()); expectedLocator.add(unitTestParams.genesisBlock.getHash()); inbound(peer, headers); - GetBlocksMessage getblocks = (GetBlocksMessage) event.getValue().getMessage(); + GetBlocksMessage getblocks = (GetBlocksMessage) outbound(); assertEquals(expectedLocator, getblocks.getLocator()); assertEquals(Sha256Hash.ZERO_HASH, getblocks.getStopHash()); // We're supposed to get an inv here. @@ -467,10 +471,100 @@ public class PeerTest extends TestWithNetworkConnections { assertEquals(elapsed, peer.getLastPingTime()); assertEquals(7250, peer.getPingTime()); } - + + @Test + public void recursiveDownload() throws Exception { + // Our peer announces a normal transaction that depends on a different transaction that is time locked such + // that it will never confirm. There's not currently any use case for doing that to us, so it's an attack. + // + // Peer should notice this by downloading all transaction dependencies and searching for timelocked ones. + // Also, if a dependency chain is absurdly deep, the wallet shouldn't hear about it because it may just be + // a different way to achieve the same thing (a payment that will not confirm for a very long time). + control.replay(); + connect(); + + final Transaction[] onTx = new Transaction[1]; + peer.addEventListener(new AbstractPeerEventListener() { + @Override + public void onTransaction(Peer peer, Transaction t) { + onTx[0] = t; + } + }); + + // Make the some fake transactions in the following graph: + // t1 -> t2 -> [t5] + // -> t3 -> t4 -> [t6] + // The ones in brackets are assumed to be in the chain and are represented only by hashes. + ECKey to = new ECKey(); + Transaction t2 = TestUtils.createFakeTx(unitTestParams, Utils.toNanoCoins(1, 0), to); + Sha256Hash t5 = t2.getInput(0).getOutpoint().getHash(); + Transaction t4 = TestUtils.createFakeTx(unitTestParams, Utils.toNanoCoins(1, 0), new ECKey()); + Sha256Hash t6 = t4.getInput(0).getOutpoint().getHash(); + t4.addOutput(Utils.toNanoCoins(1, 0), new ECKey()); + Transaction t3 = new Transaction(unitTestParams); + t3.addInput(t4.getOutput(0)); + t3.addOutput(Utils.toNanoCoins(1, 0), new ECKey()); + Transaction t1 = new Transaction(unitTestParams); + t1.addInput(t2.getOutput(0)); + t1.addInput(t3.getOutput(0)); + t1.addOutput(Utils.toNanoCoins(1, 0), to); + t1 = TestUtils.roundTripTransaction(unitTestParams, t1); + t2 = TestUtils.roundTripTransaction(unitTestParams, t2); + t3 = TestUtils.roundTripTransaction(unitTestParams, t3); + t4 = TestUtils.roundTripTransaction(unitTestParams, t4); + + // Announce the first one. Wait for it to be downloaded. + InventoryMessage inv = new InventoryMessage(unitTestParams); + inv.addTransaction(t1); + inbound(peer, inv); + GetDataMessage getdata = (GetDataMessage) outbound(); + assertEquals(t1.getHash(), getdata.getItems().get(0).hash); + inbound(peer, t1); + assertEquals(t1, onTx[0]); + // We want its dependencies so ask for them. + ListenableFuture> futures = peer.downloadDependencies(t1); + assertFalse(futures.isDone()); + // It will recursively ask for the dependencies of t1: t2 and t3. + getdata = (GetDataMessage) outbound(); + assertEquals(t2.getHash(), getdata.getItems().get(0).hash); + assertEquals(t3.getHash(), getdata.getItems().get(1).hash); + // For some random reason, t4 is delivered at this point before it's needed - perhaps it was a Bloom filter + // false positive. We do this to check that the mempool is being checked for seen transactions before + // requesting them. + inbound(peer, t4); + // Deliver the requested transactions. + inbound(peer, t2); + inbound(peer, t3); + assertFalse(futures.isDone()); + // It will recursively ask for the dependencies of t2: t5 and t4, but not t3 because it already found t4. + getdata = (GetDataMessage) outbound(); + assertEquals(getdata.getItems().get(0).hash, t2.getInput(0).getOutpoint().getHash()); + // t5 isn't found and t4 is. + NotFoundMessage notFound = new NotFoundMessage(unitTestParams); + notFound.addItem(new InventoryItem(InventoryItem.Type.Transaction, t5)); + inbound(peer, notFound); + assertFalse(futures.isDone()); + // Continue to explore the t4 branch and ask for t6, which is in the chain. + getdata = (GetDataMessage) outbound(); + assertEquals(t6, getdata.getItems().get(0).hash); + notFound = new NotFoundMessage(unitTestParams); + notFound.addItem(new InventoryItem(InventoryItem.Type.Transaction, t6)); + inbound(peer, notFound); + // That's it, we explored the entire tree. + assertTrue(futures.isDone()); + List results = futures.get(); + assertTrue(results.contains(t2)); + assertTrue(results.contains(t3)); + assertTrue(results.contains(t4)); + } + + // TODO: Use generics here to avoid unnecessary casting. private Message outbound() { - Message message = (Message)event.getValue().getMessage(); - event.reset(); + List messages = event.getValues(); + if (messages.isEmpty()) + throw new AssertionError("No messages sent when one was expected"); + Message message = (Message)messages.get(0).getMessage(); + messages.remove(0); return message; } } diff --git a/examples/src/main/java/com/google/bitcoin/examples/FetchBlock.java b/examples/src/main/java/com/google/bitcoin/examples/FetchBlock.java index 403450a1..97d6997f 100644 --- a/examples/src/main/java/com/google/bitcoin/examples/FetchBlock.java +++ b/examples/src/main/java/com/google/bitcoin/examples/FetchBlock.java @@ -37,7 +37,7 @@ public class FetchBlock { BlockChain chain = new BlockChain(params, blockStore); PeerGroup peerGroup = new PeerGroup(params, chain); peerGroup.startAndWait(); - PeerAddress addr = new PeerAddress(InetAddress.getLocalHost(), params.port, 31000); + PeerAddress addr = new PeerAddress(InetAddress.getLocalHost(), params.port); peerGroup.addAddress(addr); peerGroup.waitForPeers(1).get(); Peer peer = peerGroup.getConnectedPeers().get(0); diff --git a/examples/src/main/java/com/google/bitcoin/examples/FetchTransactions.java b/examples/src/main/java/com/google/bitcoin/examples/FetchTransactions.java new file mode 100644 index 00000000..bb7f0a6a --- /dev/null +++ b/examples/src/main/java/com/google/bitcoin/examples/FetchTransactions.java @@ -0,0 +1,60 @@ +/* + * Copyright 2012 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.bitcoin.examples; + +import com.google.bitcoin.core.*; +import com.google.bitcoin.store.BlockStore; +import com.google.bitcoin.store.MemoryBlockStore; +import com.google.bitcoin.utils.BriefLogFormatter; +import com.google.common.util.concurrent.ListenableFuture; + +import java.net.InetAddress; +import java.util.List; + +/** + * Downloads the given transaction and its dependencies from a peers memory pool then prints them out. + */ +public class FetchTransactions { + public static void main(String[] args) throws Exception { + BriefLogFormatter.init(); + System.out.println("Connecting to node"); + final NetworkParameters params = NetworkParameters.testNet(); + + BlockStore blockStore = new MemoryBlockStore(params); + BlockChain chain = new BlockChain(params, blockStore); + PeerGroup peerGroup = new PeerGroup(params, chain); + peerGroup.startAndWait(); + peerGroup.addAddress(new PeerAddress(InetAddress.getLocalHost(), params.port)); + peerGroup.waitForPeers(1).get(); + Peer peer = peerGroup.getConnectedPeers().get(0); + + Sha256Hash txHash = new Sha256Hash(args[0]); + ListenableFuture future = peer.getPeerMempoolTransaction(txHash); + System.out.println("Waiting for node to send us the requested transaction: " + txHash); + Transaction tx = future.get(); + System.out.println(tx); + + System.out.println("Waiting for node to send us the dependencies ..."); + List deps = peer.downloadDependencies(tx).get(); + for (Transaction dep : deps) { + System.out.println("Got dependency " + dep.getHashAsString()); + } + + System.out.println("Done."); + peerGroup.stopAndWait(); + } +}