diff --git a/core/src/main/java/com/google/bitcoin/core/MemoryPool.java b/core/src/main/java/com/google/bitcoin/core/MemoryPool.java index 4d802895..15f7c560 100644 --- a/core/src/main/java/com/google/bitcoin/core/MemoryPool.java +++ b/core/src/main/java/com/google/bitcoin/core/MemoryPool.java @@ -234,4 +234,29 @@ public class MemoryPool { log.info("{}: Announced new transaction [1] {}", byPeer, hash); } } + + /** + * Returns the {@link Transaction} for the given hash if we have downloaded it, or null if that hash is unknown or + * we only saw advertisements for it yet or it has been downloaded but garbage collected due to nowhere else + * holding a reference to it. + */ + public synchronized Transaction get(Sha256Hash hash) { + Entry entry = memoryPool.get(hash); + if (entry == null) return null; // Unknown. + if (entry.tx == null) return null; // Seen but only in advertisements. + if (entry.tx.get() == null) return null; // Was downloaded but garbage collected. + Transaction tx = entry.tx.get(); + Preconditions.checkNotNull(tx); + return tx; + } + + /** + * Returns true if the TX identified by hash has been seen before (ie, in an inv). Note that a transaction that + * was broadcast, downloaded and nothing kept a reference to it will eventually be cleared out by the garbage + * collector and wasSeen() will return false - it does not keep a permanent record of every hash ever broadcast. + */ + public synchronized boolean maybeWasSeen(Sha256Hash hash) { + Entry entry = memoryPool.get(hash); + return entry != null; + } } diff --git a/core/src/main/java/com/google/bitcoin/core/Peer.java b/core/src/main/java/com/google/bitcoin/core/Peer.java index f5c4e6ef..6f21a25f 100644 --- a/core/src/main/java/com/google/bitcoin/core/Peer.java +++ b/core/src/main/java/com/google/bitcoin/core/Peer.java @@ -25,6 +25,8 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.ArrayList; +import java.util.Iterator; +import java.util.LinkedList; import java.util.List; import java.util.concurrent.*; @@ -372,39 +374,61 @@ public class Peer { private void processInv(InventoryMessage inv) throws IOException { // This should be called in the network loop thread for this peer. List items = inv.getItems(); - if (memoryPool != null) - updateTransactionConfidenceLevels(items); - // If this peer isn't responsible for downloading stuff, don't go further. - if (!downloadData) - return; + // Separate out the blocks and transactions, we'll handle them differently + List transactions = new LinkedList(); + List blocks = new LinkedList(); - // The peer told us about some blocks or transactions they have. - Block topBlock = blockChain.getUnconnectedBlock(); - Sha256Hash topHash = (topBlock != null ? topBlock.getHash() : null); - if (isNewBlockTickle(topHash, items)) { - // An inv with a single hash containing our most recent unconnected block is a special inv, - // it's kind of like a tickle from the peer telling us that it's time to download more blocks to catch up to - // the block chain. We could just ignore this and treat it as a regular inv but then we'd download the head - // block over and over again after each batch of 500 blocks, which is wasteful. - blockChainDownload(topHash); - return; + for (InventoryItem item : items) { + switch (item.type) { + case Transaction: transactions.add(item); break; + case Block: blocks.add(item); break; + } } - // Just copy the message contents across - request whatever we're told about. - // TODO: Don't re-request items that were already fetched. + GetDataMessage getdata = new GetDataMessage(params); - for (InventoryItem item : items) { - getdata.addItem(item); - } - // This will cause us to receive a bunch of block or tx messages. - conn.writeMessage(getdata); - } - private void updateTransactionConfidenceLevels(List items) { - Preconditions.checkNotNull(memoryPool); - for (InventoryItem item : items) { - if (item.type != InventoryItem.Type.Transaction) continue; - memoryPool.seen(item.hash, this.getAddress()); + Iterator it = transactions.iterator(); + while (it.hasNext()) { + InventoryItem item = it.next(); + if (memoryPool == null && downloadData) { + // If there's no memory pool only download transactions if we're configured to. + getdata.addItem(item); + } else { + // Only download the transaction if we are the first peer that saw it be advertised. Other peers will also + // see it be advertised in inv packets asynchronously, they co-ordinate via the memory pool. We could + // potentially download transactions faster by always asking every peer for a tx when advertised, as remote + // peers run at different speeds. However to conserve bandwidth on mobile devices we try to only download a + // transaction once. This means we can miss broadcasts if the peer disconnects between sending us an inv and + // sending us the transaction: currently we'll never try to re-fetch after a timeout. + if (memoryPool.maybeWasSeen(item.hash)) { + // Some other peer already announced this so don't download. + it.remove(); + } else { + getdata.addItem(item); + } + memoryPool.seen(item.hash, this.getAddress()); + } + } + + if (blocks.size() > 0 && downloadData) { + Block topBlock = blockChain.getUnconnectedBlock(); + Sha256Hash topHash = (topBlock != null ? topBlock.getHash() : null); + if (isNewBlockTickle(topHash, blocks)) { + // An inv with a single hash containing our most recent unconnected block is a special inv, + // it's kind of like a tickle from the peer telling us that it's time to download more blocks to catch up to + // the block chain. We could just ignore this and treat it as a regular inv but then we'd download the head + // block over and over again after each batch of 500 blocks, which is wasteful. + blockChainDownload(topHash); + return; + } + // Request the advertised blocks only if we're the download peer. + for (InventoryItem item : blocks) getdata.addItem(item); + } + + if (!getdata.getItems().isEmpty()) { + // This will cause us to receive a bunch of block or tx messages. + conn.writeMessage(getdata); } } diff --git a/core/src/main/java/com/google/bitcoin/core/PeerGroup.java b/core/src/main/java/com/google/bitcoin/core/PeerGroup.java index 11ec0e81..8bb682d6 100644 --- a/core/src/main/java/com/google/bitcoin/core/PeerGroup.java +++ b/core/src/main/java/com/google/bitcoin/core/PeerGroup.java @@ -82,7 +82,7 @@ public class PeerGroup { private VersionMessage versionMessage; // A class that tracks recent transactions that have been broadcast across the network, counts how many // peers announced them and updates the transaction confidence data. It is passed to each Peer. - private MemoryPool memoryPool; + private final MemoryPool memoryPool; private NetworkParameters params; private BlockChain chain; @@ -142,7 +142,8 @@ public class PeerGroup { } private synchronized List handleGetData(GetDataMessage m) { - // Scans the wallets for transactions in the getdata message and returns them. Invoked in parallel on peer threads. + // Scans the wallets for transactions in the getdata message and returns them. Invoked in parallel + // on peer threads. HashMap transactions = new HashMap(); for (Wallet w : wallets) { synchronized (w) { @@ -715,6 +716,17 @@ public class PeerGroup { } } + /** + * Returns the {@link MemoryPool} created by this peer group to synchronize its peers. The pool tracks advertised + * and downloaded transactions so their confidence can be measured as a proportion of how many peers announced it. + * With an un-tampered with internet connection, the more peers announce a transaction the more confidence you can + * have that it's really valid. + */ + public MemoryPool getMemoryPool() { + // Locking unneeded as memoryPool is final. + return memoryPool; + } + /** * Tells the PeerGroup to download only block headers before a certain time and bodies after that. See * {@link Peer#setFastCatchupTime(long)} for further explanation. Call this before starting block chain download. diff --git a/core/src/test/java/com/google/bitcoin/core/MemoryPoolTest.java b/core/src/test/java/com/google/bitcoin/core/MemoryPoolTest.java index 2b23aed2..23681755 100644 --- a/core/src/test/java/com/google/bitcoin/core/MemoryPoolTest.java +++ b/core/src/test/java/com/google/bitcoin/core/MemoryPoolTest.java @@ -23,6 +23,7 @@ import org.junit.Test; import java.net.InetAddress; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; public class MemoryPoolTest { private NetworkParameters params = NetworkParameters.unitTests(); @@ -52,6 +53,7 @@ public class MemoryPoolTest { assertEquals(tx1, pool.seen(tx2, address2)); assertEquals(2, tx1.getConfidence().numBroadcastPeers()); assertEquals(2, pool.numBroadcastPeers(tx1.getHash())); + assertEquals(tx1, pool.get(tx1.getHash())); } @Test @@ -61,6 +63,7 @@ public class MemoryPoolTest { assertEquals(0, pool.numBroadcastPeers(tx1.getHash())); pool.seen(tx1.getHash(), address1); assertEquals(1, pool.numBroadcastPeers(tx1.getHash())); + assertTrue(pool.maybeWasSeen(tx1.getHash())); pool.seen(tx1.getHash(), address2); assertEquals(2, pool.numBroadcastPeers(tx1.getHash())); Transaction t = pool.seen(tx1, address1); diff --git a/core/src/test/java/com/google/bitcoin/core/PeerGroupTest.java b/core/src/test/java/com/google/bitcoin/core/PeerGroupTest.java index 02ba91ef..e3f0509d 100644 --- a/core/src/test/java/com/google/bitcoin/core/PeerGroupTest.java +++ b/core/src/test/java/com/google/bitcoin/core/PeerGroupTest.java @@ -209,7 +209,7 @@ public class PeerGroupTest extends TestWithNetworkConnections { @Test public void transactionConfidence() throws Exception { // Checks that we correctly count how many peers broadcast a transaction, so we can establish some measure of - // its trustworthyness assuming an untampered with internet connection. + // its trustworthyness assuming an untampered with internet connection. This is done via the MemoryPool class. MockNetworkConnection n1 = createMockNetworkConnection(); Peer p1 = new Peer(params, blockChain, n1); MockNetworkConnection n2 = createMockNetworkConnection(); @@ -231,13 +231,13 @@ public class PeerGroupTest extends TestWithNetworkConnections { } }); - // Peer 2 advertises the tx but does not download it. - assertNull(n2.exchange(inv)); - assertEquals(0, tx.getConfidence().numBroadcastPeers()); + // Peer 2 advertises the tx and requests a download of it, because it came first. + assertTrue(n2.exchange(inv) instanceof GetDataMessage); + assertTrue(peerGroup.getMemoryPool().maybeWasSeen(tx.getHash())); assertEquals(null, event[0]); - // Peer 1 (the download peer) advertises the tx, we download it. - n1.exchange(inv); // returns getdata - n1.exchange(tx); // returns nothing after a queue drain. + // Peer 1 advertises the tx, we don't do anything as it's already been requested. + assertNull(n1.exchange(inv)); + assertNull(n2.exchange(tx)); // Two peers saw this tx hash. assertEquals(2, tx.getConfidence().numBroadcastPeers()); assertEquals(tx, event[0]); diff --git a/core/src/test/java/com/google/bitcoin/core/PeerTest.java b/core/src/test/java/com/google/bitcoin/core/PeerTest.java index 9caabfda..2d6fb3b4 100644 --- a/core/src/test/java/com/google/bitcoin/core/PeerTest.java +++ b/core/src/test/java/com/google/bitcoin/core/PeerTest.java @@ -150,7 +150,7 @@ public class PeerTest extends TestWithNetworkConnections { @Test public void invDownloadTx() throws Exception { peer.setDownloadData(true); - // Make a transaction and tell the peer we have it.; + // Make a transaction and tell the peer we have it. BigInteger value = Utils.toNanoCoins(1, 0); Transaction tx = TestUtils.createFakeTx(unitTestParams, value, address); InventoryMessage inv = new InventoryMessage(unitTestParams); @@ -169,6 +169,40 @@ public class PeerTest extends TestWithNetworkConnections { assertEquals(value, wallet.getBalance(Wallet.BalanceType.ESTIMATED)); } + @Test + public void invDownloadTxMultiPeer() throws Exception { + // Check co-ordination of which peer to download via the memory pool. + MemoryPool pool = new MemoryPool(); + peer.setMemoryPool(pool); + + MockNetworkConnection conn2 = createMockNetworkConnection(); + Peer peer2 = new Peer(unitTestParams, blockChain, conn2); + peer2.addWallet(wallet); + peer2.setMemoryPool(pool); + + // Make a tx and advertise it to one of the peers. + BigInteger value = Utils.toNanoCoins(1, 0); + Transaction tx = TestUtils.createFakeTx(unitTestParams, value, address); + InventoryMessage inv = new InventoryMessage(unitTestParams); + InventoryItem item = new InventoryItem(InventoryItem.Type.Transaction, tx.getHash()); + inv.addItem(item); + + conn.inbound(inv); + runPeer(peer, conn); + conn.popInbound(); // Remove the disconnect marker. + + // We got a getdata message. + GetDataMessage message = (GetDataMessage) conn.popOutbound(); + assertEquals(1, message.getItems().size()); + assertEquals(tx.getHash(), message.getItems().get(0).hash); + assertTrue(pool.maybeWasSeen(tx.getHash())); + + // Advertising to peer2 results in no getdata message. + conn2.inbound(inv); + runPeer(peer2, conn2); + assertNull(conn.popOutbound()); + } + // Check that inventory message containing blocks we want is processed correctly. @Test public void newBlock() throws Exception {