From 9577bd644efe0b1a8cdc35cf5e6305a0ad05252b Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Fri, 16 Nov 2012 14:36:04 -0500 Subject: [PATCH] Ask peers for filtered blocks when appropriate and handle them. --- .../bitcoin/core/BitcoinSerializer.java | 3 + .../com/google/bitcoin/core/BloomFilter.java | 5 ++ .../google/bitcoin/core/InventoryItem.java | 3 +- .../java/com/google/bitcoin/core/Peer.java | 81 ++++++++++++++++++- .../com/google/bitcoin/core/PeerGroup.java | 7 +- .../google/bitcoin/core/PeerGroupTest.java | 47 ++--------- .../com/google/bitcoin/core/PeerTest.java | 2 +- .../core/TestWithNetworkConnections.java | 9 ++- .../bitcoin/core/TestWithPeerGroup.java | 66 +++++++++++++++ 9 files changed, 171 insertions(+), 52 deletions(-) create mode 100644 core/src/test/java/com/google/bitcoin/core/TestWithPeerGroup.java diff --git a/core/src/main/java/com/google/bitcoin/core/BitcoinSerializer.java b/core/src/main/java/com/google/bitcoin/core/BitcoinSerializer.java index 4210f043..7d1f6ec3 100644 --- a/core/src/main/java/com/google/bitcoin/core/BitcoinSerializer.java +++ b/core/src/main/java/com/google/bitcoin/core/BitcoinSerializer.java @@ -71,6 +71,7 @@ public class BitcoinSerializer { names.put(GetAddrMessage.class, "getaddr"); names.put(HeadersMessage.class, "headers"); names.put(BloomFilter.class, "filterload"); + names.put(FilteredBlock.class, "merkleblock"); } /** @@ -256,6 +257,8 @@ public class BitcoinSerializer { message = new InventoryMessage(params, payloadBytes, parseLazy, parseRetain, length); } else if (command.equals("block")) { message = new Block(params, payloadBytes, parseLazy, parseRetain, length); + } else if (command.equals("merkleblock")) { + message = new FilteredBlock(params, payloadBytes); } else if (command.equals("getdata")) { message = new GetDataMessage(params, payloadBytes, parseLazy, parseRetain, length); } else if (command.equals("tx")) { diff --git a/core/src/main/java/com/google/bitcoin/core/BloomFilter.java b/core/src/main/java/com/google/bitcoin/core/BloomFilter.java index 812c7d88..904c5a34 100644 --- a/core/src/main/java/com/google/bitcoin/core/BloomFilter.java +++ b/core/src/main/java/com/google/bitcoin/core/BloomFilter.java @@ -75,6 +75,11 @@ public class BloomFilter extends Message { *

Keep in mind that a remote node can do a pretty good job estimating the order of magnitude of the false positive * rate of a given filter you provide it when considering the anonymity of a given filter.

* + *

In order for filtered block download to function efficiently, the number of matched transactions in any given + * block should be less than (with some headroom) the maximum size of the MemoryPool used by the Peer + * doing the downloading (default is {@link MemoryPool#MAX_SIZE}). See the comment in processBlock(FilteredBlock) + * for more information on this restriction.

+ * *

randomNonce is a tweak for the hash function used to prevent some theoretical DoS attacks. * It should be a random value, however secureness of the random value is of no great consequence.

*/ diff --git a/core/src/main/java/com/google/bitcoin/core/InventoryItem.java b/core/src/main/java/com/google/bitcoin/core/InventoryItem.java index 99dafa2e..dc7aeadb 100644 --- a/core/src/main/java/com/google/bitcoin/core/InventoryItem.java +++ b/core/src/main/java/com/google/bitcoin/core/InventoryItem.java @@ -26,7 +26,8 @@ public class InventoryItem { public enum Type { Error, Transaction, - Block + Block, + FilteredBlock } public final Type type; diff --git a/core/src/main/java/com/google/bitcoin/core/Peer.java b/core/src/main/java/com/google/bitcoin/core/Peer.java index 7ebf93b3..e12aca9e 100644 --- a/core/src/main/java/com/google/bitcoin/core/Peer.java +++ b/core/src/main/java/com/google/bitcoin/core/Peer.java @@ -16,6 +16,7 @@ package com.google.bitcoin.core; +import com.google.bitcoin.core.InventoryItem.Type; import com.google.bitcoin.store.BlockStore; import com.google.bitcoin.store.BlockStoreException; import com.google.bitcoin.utils.EventListenerInvoker; @@ -73,6 +74,8 @@ public class Peer { // set AND our best block is before that date, switch to false until block headers beyond that point have been // received at which point it gets set to true again. This isn't relevant unless downloadData is true. private boolean downloadBlockBodies = true; + // Whether to request filtered blocks instead of full blocks if the protocol version allows for them. + private boolean useFilteredBlocks = false; // Keeps track of things we requested internally with getdata but didn't receive yet, so we can avoid re-requests. // It's not quite the same as getDataFutures, as this is used only for getdatas done as part of downloading // the chain and so is lighter weight (we just keep a bunch of hashes not futures). @@ -206,6 +209,8 @@ public class Peer { e.getChannel().close(); } + private FilteredBlock currentFilteredBlock = null; + /** Handle incoming Bitcoin messages */ @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { @@ -223,13 +228,27 @@ public class Peer { } if (m == null) return; + + if (currentFilteredBlock != null && !(m instanceof Transaction)) { + processBlock(currentFilteredBlock); + currentFilteredBlock = null; + } if (m instanceof InventoryMessage) { processInv((InventoryMessage) m); } else if (m instanceof Block) { processBlock((Block) m); + } else if (m instanceof FilteredBlock) { + currentFilteredBlock = (FilteredBlock)m; } else if (m instanceof Transaction) { - processTransaction((Transaction) m); + if (currentFilteredBlock != null) { + if (!currentFilteredBlock.provideTransaction((Transaction)m)) { + processBlock(currentFilteredBlock); + currentFilteredBlock = null; + processTransaction((Transaction) m); + } + } else + processTransaction((Transaction) m); } else if (m instanceof GetDataMessage) { processGetData((GetDataMessage) m); } else if (m instanceof AddressMessage) { @@ -407,6 +426,50 @@ public class Peer { // no matter how many blocks are solved, and therefore that the (2) duplicate filtering can work. blockChainDownload(blockChain.getOrphanRoot(m.getHash()).getHash()); } + } catch (VerificationException e) { + // We don't want verification failures to kill the thread. + log.warn("FilteredBlock verification failed", e); + } catch (PrunedException e) { + // Unreachable when in SPV mode. + throw new RuntimeException(e); + } + } + + private synchronized void processBlock(FilteredBlock m) throws IOException { + log.debug("{}: Received broadcast filtered block {}", address, m.getHash().toString()); + try { + if (!downloadData) { + log.warn("Received block we did not ask for: {}", m.getHash().toString()); + return; + } + + // Note that we currently do nothing about peers which do not include transactions which + // actually match our filter or which do not send us all the transactions (TODO: Do something about that). + + pendingBlockDownloads.remove(m.getBlockHeader().getHash()); + // Otherwise it's a block sent to us because the peer thought we needed it, so add it to the block chain. + // This call will synchronize on blockChain. + if (blockChain.add(m)) { + // The block was successfully linked into the chain. Notify the user of our progress. + invokeOnBlocksDownloaded(m.getBlockHeader()); + } else { + // This block is an orphan - we don't know how to get from it back to the genesis block yet. That + // must mean that there are blocks we are missing, so do another getblocks with a new block locator + // to ask the peer to send them to us. This can happen during the initial block chain download where + // the peer will only send us 500 at a time and then sends us the head block expecting us to request + // the others. + // + // We must do two things here: + // (1) Request from current top of chain to the oldest ancestor of the received block in the orphan set + // (2) Filter out duplicate getblock requests (done in blockChainDownload). + // + // The reason for (1) is that otherwise if new blocks were solved during the middle of chain download + // we'd do a blockChainDownload() on the new best chain head, which would cause us to try and grab the + // chain twice (or more!) on the same connection! The block chain would filter out the duplicates but + // only at a huge speed penalty. By finding the orphan root we ensure every getblocks looks the same + // no matter how many blocks are solved, and therefore that the (2) duplicate filtering can work. + blockChainDownload(blockChain.getOrphanRoot(m.getHash()).getHash()); + } } catch (VerificationException e) { // We don't want verification failures to kill the thread. log.warn("Block verification failed", e); @@ -503,6 +566,10 @@ public class Peer { memoryPool.seen(item.hash, this.getAddress()); } } + + // If we are requesting filteredblocks we have to send a ping after the getdata so that we have a clear + // end to the final FilteredBlock's transactions (in the form of a pong) sent to us + boolean pingAfterGetData = false; if (blocks.size() > 0 && downloadData && blockChain != null) { // Ideally, we'd only ask for the data here if we actually needed it. However that can imply a lot of @@ -526,7 +593,11 @@ public class Peer { // the duplicate check in blockChainDownload(). But the satoshi client may change in future so // it's better to be safe here. if (!pendingBlockDownloads.contains(item.hash)) { - getdata.addItem(item); + if (getPeerVersionMessage().clientVersion > 70000 && useFilteredBlocks) { + getdata.addItem(new InventoryItem(InventoryItem.Type.FilteredBlock, item.hash)); + pingAfterGetData = true; + } else + getdata.addItem(item); pendingBlockDownloads.add(item.hash); } } @@ -542,6 +613,9 @@ public class Peer { // This will cause us to receive a bunch of block or tx messages. sendMessage(getdata); } + + if (pingAfterGetData) + sendMessage(new Ping((long) Math.random() * Long.MAX_VALUE)); } /** @@ -574,7 +648,7 @@ public class Peer { * * @param secondsSinceEpoch Time in seconds since the epoch or 0 to reset to always downloading block bodies. */ - public synchronized void setFastCatchupTime(long secondsSinceEpoch) { + public synchronized void setDownloadParameters(long secondsSinceEpoch, boolean useFilteredBlocks) { Preconditions.checkNotNull(blockChain); if (secondsSinceEpoch == 0) { fastCatchupTimeSecs = params.genesisBlock.getTimeSeconds(); @@ -587,6 +661,7 @@ public class Peer { downloadBlockBodies = false; } } + this.useFilteredBlocks = useFilteredBlocks; } /** diff --git a/core/src/main/java/com/google/bitcoin/core/PeerGroup.java b/core/src/main/java/com/google/bitcoin/core/PeerGroup.java index 0f4ca829..d119169e 100644 --- a/core/src/main/java/com/google/bitcoin/core/PeerGroup.java +++ b/core/src/main/java/com/google/bitcoin/core/PeerGroup.java @@ -559,7 +559,6 @@ public class PeerGroup extends AbstractIdleService { earliestKeyTime = Math.min(earliestKeyTime, w.getEarliestKeyCreationTime()); elements += w.getBloomFilterElementCount(); } - setFastCatchupTimeSecs(earliestKeyTime); if (chain == null || !chain.shouldVerifyTransactions()) { long nTweak = new Random().nextLong(); @@ -574,6 +573,8 @@ public class PeerGroup extends AbstractIdleService { peer.sendMessage(filter); } catch (IOException e) { } } + //Do this last so that bloomFilter is already set when it gets called + setFastCatchupTimeSecs(earliestKeyTime); } /** @@ -826,7 +827,7 @@ public class PeerGroup extends AbstractIdleService { log.info("Setting download peer: {}", downloadPeer); downloadPeer.setDownloadData(true); if (chain != null) - downloadPeer.setFastCatchupTime(fastCatchupTimeSecs); + downloadPeer.setDownloadParameters(fastCatchupTimeSecs, bloomFilter != null); } } @@ -849,7 +850,7 @@ public class PeerGroup extends AbstractIdleService { Preconditions.checkState(chain == null || !chain.shouldVerifyTransactions(), "Fast catchup is incompatible with fully verifying"); fastCatchupTimeSecs = secondsSinceEpoch; if (downloadPeer != null) { - downloadPeer.setFastCatchupTime(secondsSinceEpoch); + downloadPeer.setDownloadParameters(secondsSinceEpoch, bloomFilter != null); } } diff --git a/core/src/test/java/com/google/bitcoin/core/PeerGroupTest.java b/core/src/test/java/com/google/bitcoin/core/PeerGroupTest.java index 841772dc..7c41063c 100644 --- a/core/src/test/java/com/google/bitcoin/core/PeerGroupTest.java +++ b/core/src/test/java/com/google/bitcoin/core/PeerGroupTest.java @@ -18,6 +18,8 @@ package com.google.bitcoin.core; import com.google.bitcoin.discovery.PeerDiscovery; import com.google.bitcoin.discovery.PeerDiscoveryException; +import com.google.bitcoin.store.MemoryBlockStore; + import org.jboss.netty.bootstrap.ClientBootstrap; import org.jboss.netty.channel.*; import org.junit.After; @@ -34,42 +36,15 @@ import java.util.concurrent.TimeUnit; import static org.junit.Assert.*; -public class PeerGroupTest extends TestWithNetworkConnections { +public class PeerGroupTest extends TestWithPeerGroup { static final NetworkParameters params = NetworkParameters.unitTests(); - - private PeerGroup peerGroup; - - private VersionMessage remoteVersionMessage; - + @Override @Before public void setUp() throws Exception { - super.setUp(); - - remoteVersionMessage = new VersionMessage(params, 1); + super.setUp(new MemoryBlockStore(NetworkParameters.unitTests())); - ClientBootstrap bootstrap = new ClientBootstrap(new ChannelFactory() { - public void releaseExternalResources() {} - public Channel newChannel(ChannelPipeline pipeline) { - ChannelSink sink = new FakeChannelSink(); - return new FakeChannel(this, pipeline, sink); - } - }); - bootstrap.setPipelineFactory(new ChannelPipelineFactory() { - public ChannelPipeline getPipeline() throws Exception { - VersionMessage ver = new VersionMessage(params, 1); - ChannelPipeline p = Channels.pipeline(); - - Peer peer = new Peer(params, blockChain, ver); - peer.addLifecycleListener(peerGroup.startupListener); - p.addLast("peer", peer.getHandler()); - return p; - } - - }); - peerGroup = new PeerGroup(params, blockChain, bootstrap); peerGroup.addWallet(wallet); - peerGroup.setPingIntervalMsec(0); // Disable the pings as they just get in the way of most tests. } @After @@ -148,18 +123,6 @@ public class PeerGroupTest extends TestWithNetworkConnections { peerGroup.stop(); } - private FakeChannel connectPeer(int id) { - return connectPeer(id, remoteVersionMessage); - } - - private FakeChannel connectPeer(int id, VersionMessage versionMessage) { - InetSocketAddress remoteAddress = new InetSocketAddress("127.0.0.1", 2000 + id); - FakeChannel p = (FakeChannel) peerGroup.connectTo(remoteAddress).getChannel(); - assertTrue(p.nextEvent() instanceof ChannelStateEvent); - inbound(p, versionMessage); - return p; - } - @Test public void singleDownloadPeer1() throws Exception { // Check that we don't attempt to retrieve blocks on multiple peers. diff --git a/core/src/test/java/com/google/bitcoin/core/PeerTest.java b/core/src/test/java/com/google/bitcoin/core/PeerTest.java index 7f3185bf..9ad4075a 100644 --- a/core/src/test/java/com/google/bitcoin/core/PeerTest.java +++ b/core/src/test/java/com/google/bitcoin/core/PeerTest.java @@ -407,7 +407,7 @@ public class PeerTest extends TestWithNetworkConnections { Block b4 = makeSolvedTestBlock(unitTestParams, b3); // Request headers until the last 2 blocks. - peer.setFastCatchupTime((Utils.now().getTime() / 1000) - (600*2) + 1); + peer.setDownloadParameters((Utils.now().getTime() / 1000) - (600*2) + 1, false); peer.startBlockChainDownload(); GetHeadersMessage getheaders = (GetHeadersMessage) event.getValue().getMessage(); List expectedLocator = new ArrayList(); diff --git a/core/src/test/java/com/google/bitcoin/core/TestWithNetworkConnections.java b/core/src/test/java/com/google/bitcoin/core/TestWithNetworkConnections.java index 3101702c..3f1ac37e 100644 --- a/core/src/test/java/com/google/bitcoin/core/TestWithNetworkConnections.java +++ b/core/src/test/java/com/google/bitcoin/core/TestWithNetworkConnections.java @@ -16,6 +16,7 @@ package com.google.bitcoin.core; +import com.google.bitcoin.store.BlockStore; import com.google.bitcoin.store.MemoryBlockStore; import com.google.bitcoin.utils.BriefLogFormatter; import org.easymock.EasyMock; @@ -36,7 +37,7 @@ import static org.easymock.EasyMock.expect; public class TestWithNetworkConnections { protected IMocksControl control; protected NetworkParameters unitTestParams; - protected MemoryBlockStore blockStore; + protected BlockStore blockStore; protected BlockChain blockChain; protected Wallet wallet; protected ECKey key; @@ -48,13 +49,17 @@ public class TestWithNetworkConnections { protected ChannelPipeline pipeline; public void setUp() throws Exception { + setUp(new MemoryBlockStore(NetworkParameters.unitTests())); + } + + public void setUp(BlockStore blockStore) throws Exception { BriefLogFormatter.init(); control = createStrictControl(); control.checkOrder(false); unitTestParams = NetworkParameters.unitTests(); - blockStore = new MemoryBlockStore(unitTestParams); + this.blockStore = blockStore; wallet = new Wallet(unitTestParams); key = new ECKey(); address = key.toAddress(unitTestParams); diff --git a/core/src/test/java/com/google/bitcoin/core/TestWithPeerGroup.java b/core/src/test/java/com/google/bitcoin/core/TestWithPeerGroup.java new file mode 100644 index 00000000..287e1397 --- /dev/null +++ b/core/src/test/java/com/google/bitcoin/core/TestWithPeerGroup.java @@ -0,0 +1,66 @@ +package com.google.bitcoin.core; + +import static org.junit.Assert.assertTrue; + +import java.net.InetSocketAddress; + +import org.jboss.netty.bootstrap.ClientBootstrap; +import org.jboss.netty.channel.Channel; +import org.jboss.netty.channel.ChannelFactory; +import org.jboss.netty.channel.ChannelPipeline; +import org.jboss.netty.channel.ChannelPipelineFactory; +import org.jboss.netty.channel.ChannelSink; +import org.jboss.netty.channel.ChannelStateEvent; +import org.jboss.netty.channel.Channels; +import org.junit.Before; + +import com.google.bitcoin.store.BlockStore; + +/** + * Utility class that makes it easy to work with mock NetworkConnections in PeerGroups. + */ +public class TestWithPeerGroup extends TestWithNetworkConnections { + protected PeerGroup peerGroup; + + protected VersionMessage remoteVersionMessage; + + public void setUp(BlockStore blockStore) throws Exception { + super.setUp(blockStore); + + remoteVersionMessage = new VersionMessage(unitTestParams, 1); + + ClientBootstrap bootstrap = new ClientBootstrap(new ChannelFactory() { + public void releaseExternalResources() {} + public Channel newChannel(ChannelPipeline pipeline) { + ChannelSink sink = new FakeChannelSink(); + return new FakeChannel(this, pipeline, sink); + } + }); + bootstrap.setPipelineFactory(new ChannelPipelineFactory() { + public ChannelPipeline getPipeline() throws Exception { + VersionMessage ver = new VersionMessage(unitTestParams, 1); + ChannelPipeline p = Channels.pipeline(); + + Peer peer = new Peer(unitTestParams, blockChain, ver); + peer.addLifecycleListener(peerGroup.startupListener); + p.addLast("peer", peer.getHandler()); + return p; + } + + }); + peerGroup = new PeerGroup(unitTestParams, blockChain, bootstrap); + peerGroup.setPingIntervalMsec(0); // Disable the pings as they just get in the way of most tests. + } + + protected FakeChannel connectPeer(int id) { + return connectPeer(id, remoteVersionMessage); + } + + protected FakeChannel connectPeer(int id, VersionMessage versionMessage) { + InetSocketAddress remoteAddress = new InetSocketAddress("127.0.0.1", 2000 + id); + FakeChannel p = (FakeChannel) peerGroup.connectTo(remoteAddress).getChannel(); + assertTrue(p.nextEvent() instanceof ChannelStateEvent); + inbound(p, versionMessage); + return p; + } +}