3
0
mirror of https://github.com/Qortal/altcoinj.git synced 2025-01-30 23:02:15 +00:00

Ask peers for filtered blocks when appropriate and handle them.

This commit is contained in:
Matt Corallo 2012-11-16 14:36:04 -05:00
parent db8afbdadf
commit 9577bd644e
9 changed files with 171 additions and 52 deletions

View File

@ -71,6 +71,7 @@ public class BitcoinSerializer {
names.put(GetAddrMessage.class, "getaddr"); names.put(GetAddrMessage.class, "getaddr");
names.put(HeadersMessage.class, "headers"); names.put(HeadersMessage.class, "headers");
names.put(BloomFilter.class, "filterload"); names.put(BloomFilter.class, "filterload");
names.put(FilteredBlock.class, "merkleblock");
} }
/** /**
@ -256,6 +257,8 @@ public class BitcoinSerializer {
message = new InventoryMessage(params, payloadBytes, parseLazy, parseRetain, length); message = new InventoryMessage(params, payloadBytes, parseLazy, parseRetain, length);
} else if (command.equals("block")) { } else if (command.equals("block")) {
message = new Block(params, payloadBytes, parseLazy, parseRetain, length); message = new Block(params, payloadBytes, parseLazy, parseRetain, length);
} else if (command.equals("merkleblock")) {
message = new FilteredBlock(params, payloadBytes);
} else if (command.equals("getdata")) { } else if (command.equals("getdata")) {
message = new GetDataMessage(params, payloadBytes, parseLazy, parseRetain, length); message = new GetDataMessage(params, payloadBytes, parseLazy, parseRetain, length);
} else if (command.equals("tx")) { } else if (command.equals("tx")) {

View File

@ -75,6 +75,11 @@ public class BloomFilter extends Message {
* <p>Keep in mind that a remote node can do a pretty good job estimating the order of magnitude of the false positive * <p>Keep in mind that a remote node can do a pretty good job estimating the order of magnitude of the false positive
* rate of a given filter you provide it when considering the anonymity of a given filter.</p> * rate of a given filter you provide it when considering the anonymity of a given filter.</p>
* *
* <p>In order for filtered block download to function efficiently, the number of matched transactions in any given
* block should be less than (with some headroom) the maximum size of the MemoryPool used by the Peer
* doing the downloading (default is {@link MemoryPool#MAX_SIZE}). See the comment in processBlock(FilteredBlock)
* for more information on this restriction.</p>
*
* <p>randomNonce is a tweak for the hash function used to prevent some theoretical DoS attacks. * <p>randomNonce is a tweak for the hash function used to prevent some theoretical DoS attacks.
* It should be a random value, however secureness of the random value is of no great consequence.</p> * It should be a random value, however secureness of the random value is of no great consequence.</p>
*/ */

View File

@ -26,7 +26,8 @@ public class InventoryItem {
public enum Type { public enum Type {
Error, Error,
Transaction, Transaction,
Block Block,
FilteredBlock
} }
public final Type type; public final Type type;

View File

@ -16,6 +16,7 @@
package com.google.bitcoin.core; package com.google.bitcoin.core;
import com.google.bitcoin.core.InventoryItem.Type;
import com.google.bitcoin.store.BlockStore; import com.google.bitcoin.store.BlockStore;
import com.google.bitcoin.store.BlockStoreException; import com.google.bitcoin.store.BlockStoreException;
import com.google.bitcoin.utils.EventListenerInvoker; import com.google.bitcoin.utils.EventListenerInvoker;
@ -73,6 +74,8 @@ public class Peer {
// set AND our best block is before that date, switch to false until block headers beyond that point have been // set AND our best block is before that date, switch to false until block headers beyond that point have been
// received at which point it gets set to true again. This isn't relevant unless downloadData is true. // received at which point it gets set to true again. This isn't relevant unless downloadData is true.
private boolean downloadBlockBodies = true; private boolean downloadBlockBodies = true;
// Whether to request filtered blocks instead of full blocks if the protocol version allows for them.
private boolean useFilteredBlocks = false;
// Keeps track of things we requested internally with getdata but didn't receive yet, so we can avoid re-requests. // Keeps track of things we requested internally with getdata but didn't receive yet, so we can avoid re-requests.
// It's not quite the same as getDataFutures, as this is used only for getdatas done as part of downloading // It's not quite the same as getDataFutures, as this is used only for getdatas done as part of downloading
// the chain and so is lighter weight (we just keep a bunch of hashes not futures). // the chain and so is lighter weight (we just keep a bunch of hashes not futures).
@ -206,6 +209,8 @@ public class Peer {
e.getChannel().close(); e.getChannel().close();
} }
private FilteredBlock currentFilteredBlock = null;
/** Handle incoming Bitcoin messages */ /** Handle incoming Bitcoin messages */
@Override @Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
@ -223,13 +228,27 @@ public class Peer {
} }
if (m == null) return; if (m == null) return;
if (currentFilteredBlock != null && !(m instanceof Transaction)) {
processBlock(currentFilteredBlock);
currentFilteredBlock = null;
}
if (m instanceof InventoryMessage) { if (m instanceof InventoryMessage) {
processInv((InventoryMessage) m); processInv((InventoryMessage) m);
} else if (m instanceof Block) { } else if (m instanceof Block) {
processBlock((Block) m); processBlock((Block) m);
} else if (m instanceof FilteredBlock) {
currentFilteredBlock = (FilteredBlock)m;
} else if (m instanceof Transaction) { } else if (m instanceof Transaction) {
processTransaction((Transaction) m); if (currentFilteredBlock != null) {
if (!currentFilteredBlock.provideTransaction((Transaction)m)) {
processBlock(currentFilteredBlock);
currentFilteredBlock = null;
processTransaction((Transaction) m);
}
} else
processTransaction((Transaction) m);
} else if (m instanceof GetDataMessage) { } else if (m instanceof GetDataMessage) {
processGetData((GetDataMessage) m); processGetData((GetDataMessage) m);
} else if (m instanceof AddressMessage) { } else if (m instanceof AddressMessage) {
@ -407,6 +426,50 @@ public class Peer {
// no matter how many blocks are solved, and therefore that the (2) duplicate filtering can work. // no matter how many blocks are solved, and therefore that the (2) duplicate filtering can work.
blockChainDownload(blockChain.getOrphanRoot(m.getHash()).getHash()); blockChainDownload(blockChain.getOrphanRoot(m.getHash()).getHash());
} }
} catch (VerificationException e) {
// We don't want verification failures to kill the thread.
log.warn("FilteredBlock verification failed", e);
} catch (PrunedException e) {
// Unreachable when in SPV mode.
throw new RuntimeException(e);
}
}
private synchronized void processBlock(FilteredBlock m) throws IOException {
log.debug("{}: Received broadcast filtered block {}", address, m.getHash().toString());
try {
if (!downloadData) {
log.warn("Received block we did not ask for: {}", m.getHash().toString());
return;
}
// Note that we currently do nothing about peers which do not include transactions which
// actually match our filter or which do not send us all the transactions (TODO: Do something about that).
pendingBlockDownloads.remove(m.getBlockHeader().getHash());
// Otherwise it's a block sent to us because the peer thought we needed it, so add it to the block chain.
// This call will synchronize on blockChain.
if (blockChain.add(m)) {
// The block was successfully linked into the chain. Notify the user of our progress.
invokeOnBlocksDownloaded(m.getBlockHeader());
} else {
// This block is an orphan - we don't know how to get from it back to the genesis block yet. That
// must mean that there are blocks we are missing, so do another getblocks with a new block locator
// to ask the peer to send them to us. This can happen during the initial block chain download where
// the peer will only send us 500 at a time and then sends us the head block expecting us to request
// the others.
//
// We must do two things here:
// (1) Request from current top of chain to the oldest ancestor of the received block in the orphan set
// (2) Filter out duplicate getblock requests (done in blockChainDownload).
//
// The reason for (1) is that otherwise if new blocks were solved during the middle of chain download
// we'd do a blockChainDownload() on the new best chain head, which would cause us to try and grab the
// chain twice (or more!) on the same connection! The block chain would filter out the duplicates but
// only at a huge speed penalty. By finding the orphan root we ensure every getblocks looks the same
// no matter how many blocks are solved, and therefore that the (2) duplicate filtering can work.
blockChainDownload(blockChain.getOrphanRoot(m.getHash()).getHash());
}
} catch (VerificationException e) { } catch (VerificationException e) {
// We don't want verification failures to kill the thread. // We don't want verification failures to kill the thread.
log.warn("Block verification failed", e); log.warn("Block verification failed", e);
@ -503,6 +566,10 @@ public class Peer {
memoryPool.seen(item.hash, this.getAddress()); memoryPool.seen(item.hash, this.getAddress());
} }
} }
// If we are requesting filteredblocks we have to send a ping after the getdata so that we have a clear
// end to the final FilteredBlock's transactions (in the form of a pong) sent to us
boolean pingAfterGetData = false;
if (blocks.size() > 0 && downloadData && blockChain != null) { if (blocks.size() > 0 && downloadData && blockChain != null) {
// Ideally, we'd only ask for the data here if we actually needed it. However that can imply a lot of // Ideally, we'd only ask for the data here if we actually needed it. However that can imply a lot of
@ -526,7 +593,11 @@ public class Peer {
// the duplicate check in blockChainDownload(). But the satoshi client may change in future so // the duplicate check in blockChainDownload(). But the satoshi client may change in future so
// it's better to be safe here. // it's better to be safe here.
if (!pendingBlockDownloads.contains(item.hash)) { if (!pendingBlockDownloads.contains(item.hash)) {
getdata.addItem(item); if (getPeerVersionMessage().clientVersion > 70000 && useFilteredBlocks) {
getdata.addItem(new InventoryItem(InventoryItem.Type.FilteredBlock, item.hash));
pingAfterGetData = true;
} else
getdata.addItem(item);
pendingBlockDownloads.add(item.hash); pendingBlockDownloads.add(item.hash);
} }
} }
@ -542,6 +613,9 @@ public class Peer {
// This will cause us to receive a bunch of block or tx messages. // This will cause us to receive a bunch of block or tx messages.
sendMessage(getdata); sendMessage(getdata);
} }
if (pingAfterGetData)
sendMessage(new Ping((long) Math.random() * Long.MAX_VALUE));
} }
/** /**
@ -574,7 +648,7 @@ public class Peer {
* *
* @param secondsSinceEpoch Time in seconds since the epoch or 0 to reset to always downloading block bodies. * @param secondsSinceEpoch Time in seconds since the epoch or 0 to reset to always downloading block bodies.
*/ */
public synchronized void setFastCatchupTime(long secondsSinceEpoch) { public synchronized void setDownloadParameters(long secondsSinceEpoch, boolean useFilteredBlocks) {
Preconditions.checkNotNull(blockChain); Preconditions.checkNotNull(blockChain);
if (secondsSinceEpoch == 0) { if (secondsSinceEpoch == 0) {
fastCatchupTimeSecs = params.genesisBlock.getTimeSeconds(); fastCatchupTimeSecs = params.genesisBlock.getTimeSeconds();
@ -587,6 +661,7 @@ public class Peer {
downloadBlockBodies = false; downloadBlockBodies = false;
} }
} }
this.useFilteredBlocks = useFilteredBlocks;
} }
/** /**

View File

@ -559,7 +559,6 @@ public class PeerGroup extends AbstractIdleService {
earliestKeyTime = Math.min(earliestKeyTime, w.getEarliestKeyCreationTime()); earliestKeyTime = Math.min(earliestKeyTime, w.getEarliestKeyCreationTime());
elements += w.getBloomFilterElementCount(); elements += w.getBloomFilterElementCount();
} }
setFastCatchupTimeSecs(earliestKeyTime);
if (chain == null || !chain.shouldVerifyTransactions()) { if (chain == null || !chain.shouldVerifyTransactions()) {
long nTweak = new Random().nextLong(); long nTweak = new Random().nextLong();
@ -574,6 +573,8 @@ public class PeerGroup extends AbstractIdleService {
peer.sendMessage(filter); peer.sendMessage(filter);
} catch (IOException e) { } } catch (IOException e) { }
} }
//Do this last so that bloomFilter is already set when it gets called
setFastCatchupTimeSecs(earliestKeyTime);
} }
/** /**
@ -826,7 +827,7 @@ public class PeerGroup extends AbstractIdleService {
log.info("Setting download peer: {}", downloadPeer); log.info("Setting download peer: {}", downloadPeer);
downloadPeer.setDownloadData(true); downloadPeer.setDownloadData(true);
if (chain != null) if (chain != null)
downloadPeer.setFastCatchupTime(fastCatchupTimeSecs); downloadPeer.setDownloadParameters(fastCatchupTimeSecs, bloomFilter != null);
} }
} }
@ -849,7 +850,7 @@ public class PeerGroup extends AbstractIdleService {
Preconditions.checkState(chain == null || !chain.shouldVerifyTransactions(), "Fast catchup is incompatible with fully verifying"); Preconditions.checkState(chain == null || !chain.shouldVerifyTransactions(), "Fast catchup is incompatible with fully verifying");
fastCatchupTimeSecs = secondsSinceEpoch; fastCatchupTimeSecs = secondsSinceEpoch;
if (downloadPeer != null) { if (downloadPeer != null) {
downloadPeer.setFastCatchupTime(secondsSinceEpoch); downloadPeer.setDownloadParameters(secondsSinceEpoch, bloomFilter != null);
} }
} }

View File

@ -18,6 +18,8 @@ package com.google.bitcoin.core;
import com.google.bitcoin.discovery.PeerDiscovery; import com.google.bitcoin.discovery.PeerDiscovery;
import com.google.bitcoin.discovery.PeerDiscoveryException; import com.google.bitcoin.discovery.PeerDiscoveryException;
import com.google.bitcoin.store.MemoryBlockStore;
import org.jboss.netty.bootstrap.ClientBootstrap; import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.*; import org.jboss.netty.channel.*;
import org.junit.After; import org.junit.After;
@ -34,42 +36,15 @@ import java.util.concurrent.TimeUnit;
import static org.junit.Assert.*; import static org.junit.Assert.*;
public class PeerGroupTest extends TestWithNetworkConnections { public class PeerGroupTest extends TestWithPeerGroup {
static final NetworkParameters params = NetworkParameters.unitTests(); static final NetworkParameters params = NetworkParameters.unitTests();
private PeerGroup peerGroup;
private VersionMessage remoteVersionMessage;
@Override @Override
@Before @Before
public void setUp() throws Exception { public void setUp() throws Exception {
super.setUp(); super.setUp(new MemoryBlockStore(NetworkParameters.unitTests()));
remoteVersionMessage = new VersionMessage(params, 1);
ClientBootstrap bootstrap = new ClientBootstrap(new ChannelFactory() {
public void releaseExternalResources() {}
public Channel newChannel(ChannelPipeline pipeline) {
ChannelSink sink = new FakeChannelSink();
return new FakeChannel(this, pipeline, sink);
}
});
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() throws Exception {
VersionMessage ver = new VersionMessage(params, 1);
ChannelPipeline p = Channels.pipeline();
Peer peer = new Peer(params, blockChain, ver);
peer.addLifecycleListener(peerGroup.startupListener);
p.addLast("peer", peer.getHandler());
return p;
}
});
peerGroup = new PeerGroup(params, blockChain, bootstrap);
peerGroup.addWallet(wallet); peerGroup.addWallet(wallet);
peerGroup.setPingIntervalMsec(0); // Disable the pings as they just get in the way of most tests.
} }
@After @After
@ -148,18 +123,6 @@ public class PeerGroupTest extends TestWithNetworkConnections {
peerGroup.stop(); peerGroup.stop();
} }
private FakeChannel connectPeer(int id) {
return connectPeer(id, remoteVersionMessage);
}
private FakeChannel connectPeer(int id, VersionMessage versionMessage) {
InetSocketAddress remoteAddress = new InetSocketAddress("127.0.0.1", 2000 + id);
FakeChannel p = (FakeChannel) peerGroup.connectTo(remoteAddress).getChannel();
assertTrue(p.nextEvent() instanceof ChannelStateEvent);
inbound(p, versionMessage);
return p;
}
@Test @Test
public void singleDownloadPeer1() throws Exception { public void singleDownloadPeer1() throws Exception {
// Check that we don't attempt to retrieve blocks on multiple peers. // Check that we don't attempt to retrieve blocks on multiple peers.

View File

@ -407,7 +407,7 @@ public class PeerTest extends TestWithNetworkConnections {
Block b4 = makeSolvedTestBlock(unitTestParams, b3); Block b4 = makeSolvedTestBlock(unitTestParams, b3);
// Request headers until the last 2 blocks. // Request headers until the last 2 blocks.
peer.setFastCatchupTime((Utils.now().getTime() / 1000) - (600*2) + 1); peer.setDownloadParameters((Utils.now().getTime() / 1000) - (600*2) + 1, false);
peer.startBlockChainDownload(); peer.startBlockChainDownload();
GetHeadersMessage getheaders = (GetHeadersMessage) event.getValue().getMessage(); GetHeadersMessage getheaders = (GetHeadersMessage) event.getValue().getMessage();
List<Sha256Hash> expectedLocator = new ArrayList<Sha256Hash>(); List<Sha256Hash> expectedLocator = new ArrayList<Sha256Hash>();

View File

@ -16,6 +16,7 @@
package com.google.bitcoin.core; package com.google.bitcoin.core;
import com.google.bitcoin.store.BlockStore;
import com.google.bitcoin.store.MemoryBlockStore; import com.google.bitcoin.store.MemoryBlockStore;
import com.google.bitcoin.utils.BriefLogFormatter; import com.google.bitcoin.utils.BriefLogFormatter;
import org.easymock.EasyMock; import org.easymock.EasyMock;
@ -36,7 +37,7 @@ import static org.easymock.EasyMock.expect;
public class TestWithNetworkConnections { public class TestWithNetworkConnections {
protected IMocksControl control; protected IMocksControl control;
protected NetworkParameters unitTestParams; protected NetworkParameters unitTestParams;
protected MemoryBlockStore blockStore; protected BlockStore blockStore;
protected BlockChain blockChain; protected BlockChain blockChain;
protected Wallet wallet; protected Wallet wallet;
protected ECKey key; protected ECKey key;
@ -48,13 +49,17 @@ public class TestWithNetworkConnections {
protected ChannelPipeline pipeline; protected ChannelPipeline pipeline;
public void setUp() throws Exception { public void setUp() throws Exception {
setUp(new MemoryBlockStore(NetworkParameters.unitTests()));
}
public void setUp(BlockStore blockStore) throws Exception {
BriefLogFormatter.init(); BriefLogFormatter.init();
control = createStrictControl(); control = createStrictControl();
control.checkOrder(false); control.checkOrder(false);
unitTestParams = NetworkParameters.unitTests(); unitTestParams = NetworkParameters.unitTests();
blockStore = new MemoryBlockStore(unitTestParams); this.blockStore = blockStore;
wallet = new Wallet(unitTestParams); wallet = new Wallet(unitTestParams);
key = new ECKey(); key = new ECKey();
address = key.toAddress(unitTestParams); address = key.toAddress(unitTestParams);

View File

@ -0,0 +1,66 @@
package com.google.bitcoin.core;
import static org.junit.Assert.assertTrue;
import java.net.InetSocketAddress;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.ChannelSink;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.Channels;
import org.junit.Before;
import com.google.bitcoin.store.BlockStore;
/**
* Utility class that makes it easy to work with mock NetworkConnections in PeerGroups.
*/
public class TestWithPeerGroup extends TestWithNetworkConnections {
protected PeerGroup peerGroup;
protected VersionMessage remoteVersionMessage;
public void setUp(BlockStore blockStore) throws Exception {
super.setUp(blockStore);
remoteVersionMessage = new VersionMessage(unitTestParams, 1);
ClientBootstrap bootstrap = new ClientBootstrap(new ChannelFactory() {
public void releaseExternalResources() {}
public Channel newChannel(ChannelPipeline pipeline) {
ChannelSink sink = new FakeChannelSink();
return new FakeChannel(this, pipeline, sink);
}
});
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() throws Exception {
VersionMessage ver = new VersionMessage(unitTestParams, 1);
ChannelPipeline p = Channels.pipeline();
Peer peer = new Peer(unitTestParams, blockChain, ver);
peer.addLifecycleListener(peerGroup.startupListener);
p.addLast("peer", peer.getHandler());
return p;
}
});
peerGroup = new PeerGroup(unitTestParams, blockChain, bootstrap);
peerGroup.setPingIntervalMsec(0); // Disable the pings as they just get in the way of most tests.
}
protected FakeChannel connectPeer(int id) {
return connectPeer(id, remoteVersionMessage);
}
protected FakeChannel connectPeer(int id, VersionMessage versionMessage) {
InetSocketAddress remoteAddress = new InetSocketAddress("127.0.0.1", 2000 + id);
FakeChannel p = (FakeChannel) peerGroup.connectTo(remoteAddress).getChannel();
assertTrue(p.nextEvent() instanceof ChannelStateEvent);
inbound(p, versionMessage);
return p;
}
}