3
0
mirror of https://github.com/Qortal/altcoinj.git synced 2025-02-07 06:44:16 +00:00

Add support for recursively downloading mempool dependencies of a given mempool transaction.

This isn't currently used anywhere, but can be used as the first step of doing some basic risk analysis of unconfirmed transactions.
This commit is contained in:
Mike Hearn 2013-01-19 18:57:10 +01:00
parent 43e1d084cb
commit b7b52c3fc9
7 changed files with 374 additions and 17 deletions

View File

@ -72,6 +72,7 @@ public class BitcoinSerializer {
names.put(HeadersMessage.class, "headers");
names.put(BloomFilter.class, "filterload");
names.put(FilteredBlock.class, "merkleblock");
names.put(NotFoundMessage.class, "notfound");
}
/**
@ -280,6 +281,8 @@ public class BitcoinSerializer {
return new AlertMessage(params, payloadBytes);
} else if (command.equals("filterload")) {
return new BloomFilter(params, payloadBytes);
} else if (command.equals("notfound")) {
return new NotFoundMessage(params, payloadBytes);
} else {
log.warn("No support for deserializing message with name {}", command);
return new UnknownMessage(params, command, payloadBytes);

View File

@ -0,0 +1,31 @@
/*
* Copyright 2012 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.bitcoin.core;
/**
* Sent by a peer when a getdata request doesn't find the requested data in the mempool. It has the same format
* as an inventory message and lists the hashes of the missing items.
*/
public class NotFoundMessage extends InventoryMessage {
public NotFoundMessage(NetworkParameters params) {
super(params);
}
public NotFoundMessage(NetworkParameters params, byte[] payloadBytes) throws ProtocolException {
super(params, payloadBytes);
}
}

View File

@ -22,6 +22,9 @@ import com.google.bitcoin.store.BlockStoreException;
import com.google.bitcoin.utils.EventListenerInvoker;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import org.jboss.netty.channel.*;
@ -33,6 +36,7 @@ import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet;
/**
* A Peer handles the high level communication with a Bitcoin node.
@ -234,7 +238,11 @@ public class Peer {
currentFilteredBlock = null;
}
if (m instanceof InventoryMessage) {
if (m instanceof NotFoundMessage) {
// This is sent to us when we did a getdata on some transactions that aren't in the peers memory pool.
// Because NotFoundMessage is a subclass of InventoryMessage, the test for it must come before the next.
processNotFoundMessage((NotFoundMessage) m);
} else if (m instanceof InventoryMessage) {
processInv((InventoryMessage) m);
} else if (m instanceof Block) {
processBlock((Block) m);
@ -258,7 +266,7 @@ public class Peer {
} else if (m instanceof HeadersMessage) {
processHeaders((HeadersMessage) m);
} else if (m instanceof AlertMessage) {
processAlert((AlertMessage)m);
processAlert((AlertMessage) m);
} else if (m instanceof VersionMessage) {
synchronized (Peer.this) {
peerVersionMessage = (VersionMessage)m;
@ -295,6 +303,24 @@ public class Peer {
}
}
private void processNotFoundMessage(NotFoundMessage m) {
// This is received when we previously did a getdata but the peer couldn't find what we requested in it's
// memory pool. Typically, because we are downloading dependencies of a relevant transaction and reached
// the bottom of the dependency tree (where the unconfirmed transactions connect to transactions that are
// in the chain).
//
// We go through and cancel the pending getdata futures for the items we were told weren't found.
for (ListIterator<GetDataRequest> it = getDataFutures.listIterator(); it.hasNext();) {
GetDataRequest req = it.next();
for (InventoryItem item : m.getItems()) {
if (item.hash.equals(req.hash)) {
req.future.cancel(true);
getDataFutures.remove(req);
}
}
}
}
private synchronized void processAlert(AlertMessage m) {
try {
if (m.isSignatureValid()) {
@ -383,6 +409,11 @@ public class Peer {
// We may get back a different transaction object.
tx = memoryPool.seen(tx, getAddress());
}
if (maybeHandleRequestedData(tx))
return;
// Tell all listeners (like wallets) about this tx so they can decide whether to keep it or not. If no
// listener keeps a reference around then the memory pool will forget about it after a while too because
// it uses weak references.
final Transaction ftx = tx;
EventListenerInvoker.invoke(eventListeners, new EventListenerInvoker<PeerEventListener>() {
@Override
@ -392,6 +423,129 @@ public class Peer {
});
}
/**
* <p>Returns a future that wraps a list of all transactions that the given transaction depends on, recursively.
* Only transactions in peers memory pools are included; the recursion stops at transactions that are in the
* current best chain. So it doesn't make much sense to provide a tx that was already in the best chain and
* a precondition checks this.</p>
*
* <p>For example, if tx has 2 inputs that connect to transactions A and B, and transaction B is unconfirmed and
* has one input connecting to transaction C that is unconfirmed, and transaction C connects to transaction D
* that is in the chain, then this method will return either {B, C} or {C, B}.</p>
*
* <p>This method is useful for apps that want to learn about how long an unconfirmed transaction might take
* to confirm, by checking for unexpectedly time locked transactions, unusually deep dependency trees or fee-paying
* transactions that depend on unconfirmed free transactions.</p>
*
* <p>Note that dependencies downloaded this way will not trigger the onTransaction method of event listeners.</p>
*/
public ListenableFuture<List<Transaction>> downloadDependencies(Transaction tx) {
TransactionConfidence.ConfidenceType txConfidence = tx.getConfidence().getConfidenceType();
Preconditions.checkArgument(txConfidence != TransactionConfidence.ConfidenceType.BUILDING);
log.info("Downloading dependencies of {}", tx.getHashAsString());
final LinkedList<Transaction> results = new LinkedList<Transaction>();
// future will be invoked when the entire dependency tree has been walked and the results compiled.
final ListenableFuture future = downloadDependenciesInternal(tx, new Object(), results);
final SettableFuture<List<Transaction>> resultFuture = SettableFuture.create();
Futures.addCallback(future, new FutureCallback() {
public void onSuccess(Object _) {
resultFuture.set(results);
}
public void onFailure(Throwable throwable) {
resultFuture.setException(throwable);
}
});
return resultFuture;
}
// The marker object in the future returned is the same as the parameter. It is arbitrary and can be anything.
private ListenableFuture<Object> downloadDependenciesInternal(final Transaction tx,
final Object marker,
final List<Transaction> results) {
final SettableFuture<Object> resultFuture = SettableFuture.create();
final Sha256Hash rootTxHash = tx.getHash();
// We want to recursively grab its dependencies. This is so listeners can learn important information like
// whether a transaction is dependent on a timelocked transaction or has an unexpectedly deep dependency tree
// or depends on a no-fee transaction.
//
// Firstly find any that are already in the memory pool so if they weren't garbage collected yet, they won't
// be deleted. Use COW sets to make unit tests deterministic and because they are small. It's slower for
// the case of transactions with tons of inputs.
Set<Transaction> dependencies = new CopyOnWriteArraySet<Transaction>();
Set<Sha256Hash> needToRequest = new CopyOnWriteArraySet<Sha256Hash>();
for (TransactionInput input : tx.getInputs()) {
// There may be multiple inputs that connect to the same transaction.
Sha256Hash hash = input.getOutpoint().getHash();
Transaction dep = memoryPool.get(hash);
if (dep == null) {
needToRequest.add(hash);
} else {
dependencies.add(dep);
}
}
results.addAll(dependencies);
try {
// Build the request for the missing dependencies.
List<ListenableFuture<Transaction>> futures = Lists.newArrayList();
GetDataMessage getdata = new GetDataMessage(params);
for (Sha256Hash hash : needToRequest) {
getdata.addTransaction(hash);
GetDataRequest req = new GetDataRequest();
req.hash = hash;
req.future = SettableFuture.create();
futures.add(req.future);
getDataFutures.add(req);
}
// The transactions we already grabbed out of the mempool must still be considered by the code below.
for (Transaction dep : dependencies) {
futures.add(Futures.immediateFuture(dep));
}
ListenableFuture<List<Transaction>> successful = Futures.successfulAsList(futures);
Futures.addCallback(successful, new FutureCallback<List<Transaction>>() {
public void onSuccess(List<Transaction> transactions) {
// Once all transactions either were received, or we know there are no more to come ...
// Note that transactions will contain "null" for any positions that weren't successful.
List<ListenableFuture<Object>> childFutures = Lists.newLinkedList();
for (Transaction tx : transactions) {
if (tx == null) continue;
log.info("Downloaded dependency of {}: {}", rootTxHash, tx.getHashAsString());
results.add(tx);
// Now recurse into the dependencies of this transaction too.
childFutures.add(downloadDependenciesInternal(tx, marker, results));
}
if (childFutures.size() == 0) {
// Short-circuit: we're at the bottom of this part of the tree.
resultFuture.set(marker);
} else {
// There are some children to download. Wait until it's done (and their children and their
// children...) to inform the caller that we're finished.
Futures.addCallback(Futures.successfulAsList(childFutures), new FutureCallback<List<Object>>() {
public void onSuccess(List<Object> objects) {
resultFuture.set(marker);
}
public void onFailure(Throwable throwable) {
resultFuture.setException(throwable);
}
});
}
}
public void onFailure(Throwable throwable) {
resultFuture.setException(throwable);
}
});
// Start the operation.
sendMessage(getdata);
} catch (IOException e) {
log.error("Couldn't send getdata in downloadDependencies({})", tx.getHash());
resultFuture.setException(e);
return resultFuture;
}
return resultFuture;
}
private synchronized void processBlock(Block m) throws IOException {
log.debug("{}: Received broadcast block {}", address, m.getHashAsString());
try {
@ -481,7 +635,7 @@ public class Peer {
}
}
private boolean maybeHandleRequestedData(Block m) {
private boolean maybeHandleRequestedData(Message m) {
boolean found = false;
Sha256Hash hash = m.getHash();
for (ListIterator<GetDataRequest> it = getDataFutures.listIterator(); it.hasNext();) {
@ -622,18 +776,33 @@ public class Peer {
* Asks the connected peer for the block of the given hash, and returns a future representing the answer.
* If you want the block right away and don't mind waiting for it, just call .get() on the result. Your thread
* will block until the peer answers.
*
* @param blockHash Hash of the block you wareare requesting.
* @throws IOException
*/
public ListenableFuture<Block> getBlock(Sha256Hash blockHash) throws IOException {
log.info("Request to fetch block {}", blockHash);
GetDataMessage getdata = new GetDataMessage(params);
InventoryItem inventoryItem = new InventoryItem(InventoryItem.Type.Block, blockHash);
getdata.addItem(inventoryItem);
getdata.addBlock(blockHash);
return sendSingleGetData(getdata);
}
/**
* Asks the connected peer for the given transaction from its memory pool. Transactions in the chain cannot be
* retrieved this way because peers don't have a transaction ID to transaction-pos-on-disk index, and besides,
* in future many peers will delete old transaction data they don't need.
*/
public ListenableFuture<Transaction> getPeerMempoolTransaction(Sha256Hash hash) throws IOException {
// TODO: Unit test this method.
log.info("Request to fetch peer mempool tx {}", hash);
GetDataMessage getdata = new GetDataMessage(params);
getdata.addTransaction(hash);
return sendSingleGetData(getdata);
}
/** Sends a getdata with a single item in it. */
private ListenableFuture sendSingleGetData(GetDataMessage getdata) throws IOException {
Preconditions.checkArgument(getdata.getItems().size() == 1);
GetDataRequest req = new GetDataRequest();
req.future = SettableFuture.create();
req.hash = blockHash;
req.hash = getdata.getItems().get(0).hash;
getDataFutures.add(req);
sendMessage(getdata);
return req.future;

View File

@ -2211,7 +2211,7 @@ public class Wallet implements Serializable, BlockChainListener {
* The returned object can be used to connect the wallet to a {@link Peer} or {@link PeerGroup} in order to
* receive and process blocks and transactions.
*/
public synchronized PeerEventListener getPeerEventListener() {
synchronized PeerEventListener getPeerEventListener() {
if (peerEventListener == null) {
// Instantiate here to avoid issues with wallets resurrected from serialized copies.
peerEventListener = new AbstractPeerEventListener() {

View File

@ -19,6 +19,7 @@ package com.google.bitcoin.core;
import com.google.bitcoin.core.Peer.PeerHandler;
import com.google.common.util.concurrent.ListenableFuture;
import org.easymock.Capture;
import org.easymock.CaptureType;
import org.jboss.netty.channel.*;
import org.junit.Before;
import org.junit.Test;
@ -38,6 +39,7 @@ public class PeerTest extends TestWithNetworkConnections {
private Capture<DownstreamMessageEvent> event;
private PeerHandler handler;
private static final int OTHER_PEER_CHAIN_HEIGHT = 110;
private MemoryPool memoryPool;
@Override
@Before
@ -47,8 +49,10 @@ public class PeerTest extends TestWithNetworkConnections {
VersionMessage ver = new VersionMessage(unitTestParams, 100);
peer = new Peer(unitTestParams, blockChain, ver);
peer.addWallet(wallet);
memoryPool = new MemoryPool();
peer.setMemoryPool(memoryPool);
handler = peer.getHandler();
event = new Capture<DownstreamMessageEvent>();
event = new Capture<DownstreamMessageEvent>(CaptureType.ALL);
pipeline.sendDownstream(capture(event));
expectLastCall().anyTimes();
}
@ -409,7 +413,7 @@ public class PeerTest extends TestWithNetworkConnections {
// Request headers until the last 2 blocks.
peer.setDownloadParameters((Utils.now().getTime() / 1000) - (600*2) + 1, false);
peer.startBlockChainDownload();
GetHeadersMessage getheaders = (GetHeadersMessage) event.getValue().getMessage();
GetHeadersMessage getheaders = (GetHeadersMessage) outbound();
List<Sha256Hash> expectedLocator = new ArrayList<Sha256Hash>();
expectedLocator.add(b1.getHash());
expectedLocator.add(unitTestParams.genesisBlock.getHash());
@ -424,7 +428,7 @@ public class PeerTest extends TestWithNetworkConnections {
expectedLocator.add(b1.getHash());
expectedLocator.add(unitTestParams.genesisBlock.getHash());
inbound(peer, headers);
GetBlocksMessage getblocks = (GetBlocksMessage) event.getValue().getMessage();
GetBlocksMessage getblocks = (GetBlocksMessage) outbound();
assertEquals(expectedLocator, getblocks.getLocator());
assertEquals(Sha256Hash.ZERO_HASH, getblocks.getStopHash());
// We're supposed to get an inv here.
@ -468,9 +472,99 @@ public class PeerTest extends TestWithNetworkConnections {
assertEquals(7250, peer.getPingTime());
}
@Test
public void recursiveDownload() throws Exception {
// Our peer announces a normal transaction that depends on a different transaction that is time locked such
// that it will never confirm. There's not currently any use case for doing that to us, so it's an attack.
//
// Peer should notice this by downloading all transaction dependencies and searching for timelocked ones.
// Also, if a dependency chain is absurdly deep, the wallet shouldn't hear about it because it may just be
// a different way to achieve the same thing (a payment that will not confirm for a very long time).
control.replay();
connect();
final Transaction[] onTx = new Transaction[1];
peer.addEventListener(new AbstractPeerEventListener() {
@Override
public void onTransaction(Peer peer, Transaction t) {
onTx[0] = t;
}
});
// Make the some fake transactions in the following graph:
// t1 -> t2 -> [t5]
// -> t3 -> t4 -> [t6]
// The ones in brackets are assumed to be in the chain and are represented only by hashes.
ECKey to = new ECKey();
Transaction t2 = TestUtils.createFakeTx(unitTestParams, Utils.toNanoCoins(1, 0), to);
Sha256Hash t5 = t2.getInput(0).getOutpoint().getHash();
Transaction t4 = TestUtils.createFakeTx(unitTestParams, Utils.toNanoCoins(1, 0), new ECKey());
Sha256Hash t6 = t4.getInput(0).getOutpoint().getHash();
t4.addOutput(Utils.toNanoCoins(1, 0), new ECKey());
Transaction t3 = new Transaction(unitTestParams);
t3.addInput(t4.getOutput(0));
t3.addOutput(Utils.toNanoCoins(1, 0), new ECKey());
Transaction t1 = new Transaction(unitTestParams);
t1.addInput(t2.getOutput(0));
t1.addInput(t3.getOutput(0));
t1.addOutput(Utils.toNanoCoins(1, 0), to);
t1 = TestUtils.roundTripTransaction(unitTestParams, t1);
t2 = TestUtils.roundTripTransaction(unitTestParams, t2);
t3 = TestUtils.roundTripTransaction(unitTestParams, t3);
t4 = TestUtils.roundTripTransaction(unitTestParams, t4);
// Announce the first one. Wait for it to be downloaded.
InventoryMessage inv = new InventoryMessage(unitTestParams);
inv.addTransaction(t1);
inbound(peer, inv);
GetDataMessage getdata = (GetDataMessage) outbound();
assertEquals(t1.getHash(), getdata.getItems().get(0).hash);
inbound(peer, t1);
assertEquals(t1, onTx[0]);
// We want its dependencies so ask for them.
ListenableFuture<List<Transaction>> futures = peer.downloadDependencies(t1);
assertFalse(futures.isDone());
// It will recursively ask for the dependencies of t1: t2 and t3.
getdata = (GetDataMessage) outbound();
assertEquals(t2.getHash(), getdata.getItems().get(0).hash);
assertEquals(t3.getHash(), getdata.getItems().get(1).hash);
// For some random reason, t4 is delivered at this point before it's needed - perhaps it was a Bloom filter
// false positive. We do this to check that the mempool is being checked for seen transactions before
// requesting them.
inbound(peer, t4);
// Deliver the requested transactions.
inbound(peer, t2);
inbound(peer, t3);
assertFalse(futures.isDone());
// It will recursively ask for the dependencies of t2: t5 and t4, but not t3 because it already found t4.
getdata = (GetDataMessage) outbound();
assertEquals(getdata.getItems().get(0).hash, t2.getInput(0).getOutpoint().getHash());
// t5 isn't found and t4 is.
NotFoundMessage notFound = new NotFoundMessage(unitTestParams);
notFound.addItem(new InventoryItem(InventoryItem.Type.Transaction, t5));
inbound(peer, notFound);
assertFalse(futures.isDone());
// Continue to explore the t4 branch and ask for t6, which is in the chain.
getdata = (GetDataMessage) outbound();
assertEquals(t6, getdata.getItems().get(0).hash);
notFound = new NotFoundMessage(unitTestParams);
notFound.addItem(new InventoryItem(InventoryItem.Type.Transaction, t6));
inbound(peer, notFound);
// That's it, we explored the entire tree.
assertTrue(futures.isDone());
List<Transaction> results = futures.get();
assertTrue(results.contains(t2));
assertTrue(results.contains(t3));
assertTrue(results.contains(t4));
}
// TODO: Use generics here to avoid unnecessary casting.
private Message outbound() {
Message message = (Message)event.getValue().getMessage();
event.reset();
List<DownstreamMessageEvent> messages = event.getValues();
if (messages.isEmpty())
throw new AssertionError("No messages sent when one was expected");
Message message = (Message)messages.get(0).getMessage();
messages.remove(0);
return message;
}
}

View File

@ -37,7 +37,7 @@ public class FetchBlock {
BlockChain chain = new BlockChain(params, blockStore);
PeerGroup peerGroup = new PeerGroup(params, chain);
peerGroup.startAndWait();
PeerAddress addr = new PeerAddress(InetAddress.getLocalHost(), params.port, 31000);
PeerAddress addr = new PeerAddress(InetAddress.getLocalHost(), params.port);
peerGroup.addAddress(addr);
peerGroup.waitForPeers(1).get();
Peer peer = peerGroup.getConnectedPeers().get(0);

View File

@ -0,0 +1,60 @@
/*
* Copyright 2012 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.bitcoin.examples;
import com.google.bitcoin.core.*;
import com.google.bitcoin.store.BlockStore;
import com.google.bitcoin.store.MemoryBlockStore;
import com.google.bitcoin.utils.BriefLogFormatter;
import com.google.common.util.concurrent.ListenableFuture;
import java.net.InetAddress;
import java.util.List;
/**
* Downloads the given transaction and its dependencies from a peers memory pool then prints them out.
*/
public class FetchTransactions {
public static void main(String[] args) throws Exception {
BriefLogFormatter.init();
System.out.println("Connecting to node");
final NetworkParameters params = NetworkParameters.testNet();
BlockStore blockStore = new MemoryBlockStore(params);
BlockChain chain = new BlockChain(params, blockStore);
PeerGroup peerGroup = new PeerGroup(params, chain);
peerGroup.startAndWait();
peerGroup.addAddress(new PeerAddress(InetAddress.getLocalHost(), params.port));
peerGroup.waitForPeers(1).get();
Peer peer = peerGroup.getConnectedPeers().get(0);
Sha256Hash txHash = new Sha256Hash(args[0]);
ListenableFuture<Transaction> future = peer.getPeerMempoolTransaction(txHash);
System.out.println("Waiting for node to send us the requested transaction: " + txHash);
Transaction tx = future.get();
System.out.println(tx);
System.out.println("Waiting for node to send us the dependencies ...");
List<Transaction> deps = peer.downloadDependencies(tx).get();
for (Transaction dep : deps) {
System.out.println("Got dependency " + dep.getHashAsString());
}
System.out.println("Done.");
peerGroup.stopAndWait();
}
}