3
0
mirror of https://github.com/Qortal/altcoinj.git synced 2025-01-30 23:02:15 +00:00

Query the memory pool of nodes that support Bloom filtering.

This commit is contained in:
Mike Hearn 2013-02-19 15:51:34 +01:00
parent f4033076e8
commit 3c606516be
9 changed files with 85 additions and 34 deletions

View File

@ -68,6 +68,7 @@ public class BitcoinSerializer {
names.put(BloomFilter.class, "filterload"); names.put(BloomFilter.class, "filterload");
names.put(FilteredBlock.class, "merkleblock"); names.put(FilteredBlock.class, "merkleblock");
names.put(NotFoundMessage.class, "notfound"); names.put(NotFoundMessage.class, "notfound");
names.put(MemoryPoolMessage.class, "mempool");
} }
/** /**
@ -250,6 +251,8 @@ public class BitcoinSerializer {
return new BloomFilter(params, payloadBytes); return new BloomFilter(params, payloadBytes);
} else if (command.equals("notfound")) { } else if (command.equals("notfound")) {
return new NotFoundMessage(params, payloadBytes); return new NotFoundMessage(params, payloadBytes);
} else if (command.equals("mempool")) {
return new MemoryPoolMessage();
} else { } else {
log.warn("No support for deserializing message with name {}", command); log.warn("No support for deserializing message with name {}", command);
return new UnknownMessage(params, command, payloadBytes); return new UnknownMessage(params, command, payloadBytes);

View File

@ -25,6 +25,8 @@ import java.util.*;
* of the block header and a {@link PartialMerkleTree} which contains the transactions which matched the filter.</p> * of the block header and a {@link PartialMerkleTree} which contains the transactions which matched the filter.</p>
*/ */
public class FilteredBlock extends Message { public class FilteredBlock extends Message {
/** The protocol version at which Bloom filtering started to be supported. */
public static final int MIN_PROTOCOL_VERSION = 70000;
private Block header; private Block header;
// The PartialMerkleTree of transactions // The PartialMerkleTree of transactions

View File

@ -0,0 +1,32 @@
/*
* Copyright 2012 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.bitcoin.core;
/**
* The "mempool" message asks a remote peer to announce all transactions in its memory pool, possibly restricted by
* any Bloom filter set on the connection. The list of transaction hashes comes back in an inv message. Note that
* this is different to the {@link MemoryPool} object which doesn't try to keep track of all pending transactions,
* it's just a holding area for transactions that a part of the app may find interesting. The mempool message has
* no fields.
*/
public class MemoryPoolMessage extends Message {
@Override
void parse() throws ProtocolException {}
@Override
protected void parseLite() throws ProtocolException {}
}

View File

@ -835,11 +835,12 @@ public class Peer {
// the duplicate check in blockChainDownload(). But the satoshi client may change in future so // the duplicate check in blockChainDownload(). But the satoshi client may change in future so
// it's better to be safe here. // it's better to be safe here.
if (!pendingBlockDownloads.contains(item.hash)) { if (!pendingBlockDownloads.contains(item.hash)) {
if (getPeerVersionMessage().clientVersion > 70000 && useFilteredBlocks) { if (getPeerVersionMessage().isBloomFilteringSupported() && useFilteredBlocks) {
getdata.addItem(new InventoryItem(InventoryItem.Type.FilteredBlock, item.hash)); getdata.addItem(new InventoryItem(InventoryItem.Type.FilteredBlock, item.hash));
pingAfterGetData = true; pingAfterGetData = true;
} else } else {
getdata.addItem(item); getdata.addItem(item);
}
pendingBlockDownloads.add(item.hash); pendingBlockDownloads.add(item.hash);
} }
} }
@ -1108,9 +1109,8 @@ public class Peer {
} }
protected synchronized ListenableFuture<Long> ping(long nonce) throws IOException, ProtocolException { protected synchronized ListenableFuture<Long> ping(long nonce) throws IOException, ProtocolException {
int peerVersion = getPeerVersionMessage().clientVersion; if (!getPeerVersionMessage().isPingPongSupported())
if (peerVersion < Pong.MIN_PROTOCOL_VERSION) throw new ProtocolException("Peer version is too low for measurable pings: " + getPeerVersionMessage());
throw new ProtocolException("Peer version is too low for measurable pings: " + peerVersion);
PendingPing pendingPing = new PendingPing(nonce); PendingPing pendingPing = new PendingPing(nonce);
pendingPings.add(pendingPing); pendingPings.add(pendingPing);
sendMessage(new Ping(pendingPing.nonce)); sendMessage(new Ping(pendingPing.nonce));
@ -1194,7 +1194,8 @@ public class Peer {
/** /**
* If set to false, the peer won't try and fetch blocks and transactions it hears about. Normally, only one * If set to false, the peer won't try and fetch blocks and transactions it hears about. Normally, only one
* peer should download missing blocks. Defaults to true. * peer should download missing blocks. Defaults to true. Changing this value from false to true may trigger
* a request to the remote peer for the contents of its memory pool, if Bloom filtering is active.
*/ */
public void setDownloadData(boolean downloadData) { public void setDownloadData(boolean downloadData) {
this.downloadData.set(downloadData); this.downloadData.set(downloadData);

View File

@ -580,12 +580,16 @@ public class PeerGroup extends AbstractIdleService {
for (Wallet w : wallets) for (Wallet w : wallets)
filter.merge(w.getBloomFilter(lastBloomFilterElementCount, bloomFilterFPRate, bloomFilterTweak)); filter.merge(w.getBloomFilter(lastBloomFilterElementCount, bloomFilterFPRate, bloomFilterTweak));
bloomFilter = filter; bloomFilter = filter;
log.info("Sending all peers an updated Bloom Filter."); for (Peer peer : peers) {
for (Peer peer : peers) if (peer.getPeerVersionMessage().isBloomFilteringSupported()) {
try { try {
// peers of a low version will simply ignore filterload messages log.info("{}: Sending peer an updated Bloom Filter.", peer);
peer.sendMessage(filter); peer.sendMessage(filter);
} catch (IOException e) { } } catch (IOException e) {
throw new RuntimeException(e);
}
}
}
} }
// Do this last so that bloomFilter is already set when it gets called. // Do this last so that bloomFilter is already set when it gets called.
setFastCatchupTimeSecs(earliestKeyTime); setFastCatchupTimeSecs(earliestKeyTime);
@ -709,13 +713,17 @@ public class PeerGroup extends AbstractIdleService {
protected synchronized void handleNewPeer(final Peer peer) { protected synchronized void handleNewPeer(final Peer peer) {
// Runs on a netty worker thread for every peer that is newly connected. Peer is not locked at this point. // Runs on a netty worker thread for every peer that is newly connected. Peer is not locked at this point.
// Sets up the newly connected peer so it can do everything it needs to.
log.info("{}: New peer", peer); log.info("{}: New peer", peer);
// Give the peer a filter that can be used to probabilistically drop transactions that // Give the peer a filter that can be used to probabilistically drop transactions that
// aren't relevant to our wallet. We may still receive some false positives, which is // aren't relevant to our wallet. We may still receive some false positives, which is
// OK because it helps improve wallet privacy. // OK because it helps improve wallet privacy. Old nodes will just ignore the message.
try { try {
if (bloomFilter != null) if (bloomFilter != null && peer.getPeerVersionMessage().isBloomFilteringSupported()) {
log.info("{}: Sending Bloom filter and querying memory pool", peer);
peer.sendMessage(bloomFilter); peer.sendMessage(bloomFilter);
peer.sendMessage(new MemoryPoolMessage());
}
} catch (IOException e) { } // That was quick...already disconnected } catch (IOException e) { } // That was quick...already disconnected
// Link the peer to the memory pool so broadcast transactions have their confidence levels updated. // Link the peer to the memory pool so broadcast transactions have their confidence levels updated.
peer.setMemoryPool(memoryPool); peer.setMemoryPool(memoryPool);
@ -742,7 +750,7 @@ public class PeerGroup extends AbstractIdleService {
// then sends us fake relevant transactions. We'll attempt to relay the bad transactions, our badness score // then sends us fake relevant transactions. We'll attempt to relay the bad transactions, our badness score
// in the Satoshi client will increase and we'll get disconnected. // in the Satoshi client will increase and we'll get disconnected.
// //
// TODO: Find a way to balance the desire to propagate useful transactions against obscure DoS attacks. // TODO: Find a way to balance the desire to propagate useful transactions against DoS attacks.
announcePendingWalletTransactions(wallets, Collections.singletonList(peer)); announcePendingWalletTransactions(wallets, Collections.singletonList(peer));
// And set up event listeners for clients. This will allow them to find out about new transactions and blocks. // And set up event listeners for clients. This will allow them to find out about new transactions and blocks.
for (PeerEventListener listener : peerEventListeners) { for (PeerEventListener listener : peerEventListeners) {
@ -756,6 +764,7 @@ public class PeerGroup extends AbstractIdleService {
} }
}); });
final PeerGroup thisGroup = this; final PeerGroup thisGroup = this;
// TODO: Move this into the Peer object itself.
peer.addEventListener(new AbstractPeerEventListener() { peer.addEventListener(new AbstractPeerEventListener() {
int filteredBlocksReceivedFromPeer = 0; int filteredBlocksReceivedFromPeer = 0;
@Override @Override

View File

@ -293,4 +293,19 @@ public class VersionMessage extends Message {
if (component.contains("/") || component.contains("(") || component.contains(")")) if (component.contains("/") || component.contains("(") || component.contains(")"))
throw new IllegalArgumentException("name contains invalid characters"); throw new IllegalArgumentException("name contains invalid characters");
} }
/**
* Returns true if the clientVersion field is >= Pong.MIN_PROTOCOL_VERSION. If it is then ping() is usable.
*/
public boolean isPingPongSupported() {
return clientVersion >= Pong.MIN_PROTOCOL_VERSION;
}
/**
* Returns true if the clientVersion field is >= FilteredBlock.MIN_PROTOCOL_VERSION. If it is then Bloom filtering
* is available and the memory pool of the remote peer will be queried when the downloadData property is true.
*/
public boolean isBloomFilteringSupported() {
return clientVersion >= FilteredBlock.MIN_PROTOCOL_VERSION;
}
} }

View File

@ -90,16 +90,11 @@ public class FilteredBlockAndPartialMerkleTreeTests extends TestWithPeerGroup {
// Create a peer. // Create a peer.
FakeChannel p1 = connectPeer(1); FakeChannel p1 = connectPeer(1);
assertEquals(1, peerGroup.numConnectedPeers()); assertEquals(1, peerGroup.numConnectedPeers());
// Send an inv for block 100001 // Send an inv for block 100001
InventoryMessage inv = new InventoryMessage(unitTestParams); InventoryMessage inv = new InventoryMessage(unitTestParams);
inv.addBlock(block); inv.addBlock(block);
inbound(p1, inv); inbound(p1, inv);
// First thing sent is a copy of the generated bloom filter
//TODO assertTrue(outbound(p1).equals(filter)); (need to set the nTweak to 0xDEADBEEF)
assertTrue(outbound(p1) instanceof BloomFilter);
// Check that we properly requested the correct FilteredBlock // Check that we properly requested the correct FilteredBlock
Object getData = outbound(p1); Object getData = outbound(p1);
assertTrue(getData instanceof GetDataMessage); assertTrue(getData instanceof GetDataMessage);
@ -107,7 +102,7 @@ public class FilteredBlockAndPartialMerkleTreeTests extends TestWithPeerGroup {
assertTrue(((GetDataMessage)getData).getItems().get(0).hash.equals(block.getHash())); assertTrue(((GetDataMessage)getData).getItems().get(0).hash.equals(block.getHash()));
assertTrue(((GetDataMessage)getData).getItems().get(0).type == InventoryItem.Type.FilteredBlock); assertTrue(((GetDataMessage)getData).getItems().get(0).type == InventoryItem.Type.FilteredBlock);
//Check that we then immediately pinged // Check that we then immediately pinged.
Object ping = outbound(p1); Object ping = outbound(p1);
assertTrue(ping instanceof Ping); assertTrue(ping instanceof Ping);

View File

@ -110,10 +110,8 @@ public class PeerGroupTest extends TestWithPeerGroup {
// Note: we start with p2 here to verify that transactions are downloaded from whichever peer announces first // Note: we start with p2 here to verify that transactions are downloaded from whichever peer announces first
// which does not have to be the same as the download peer (which is really the "block download peer"). // which does not have to be the same as the download peer (which is really the "block download peer").
inbound(p2, inv); inbound(p2, inv);
assertTrue(outbound(p2) instanceof BloomFilter);
assertTrue(outbound(p2) instanceof GetDataMessage); assertTrue(outbound(p2) instanceof GetDataMessage);
inbound(p1, inv); inbound(p1, inv);
assertTrue(outbound(p1) instanceof BloomFilter);
assertNull(outbound(p1)); // Only one peer is used to download. assertNull(outbound(p1)); // Only one peer is used to download.
inbound(p2, t1); inbound(p2, t1);
assertNull(outbound(p1)); assertNull(outbound(p1));
@ -148,11 +146,9 @@ public class PeerGroupTest extends TestWithPeerGroup {
// Only peer 1 tries to download it. // Only peer 1 tries to download it.
inbound(p1, inv); inbound(p1, inv);
assertTrue(outbound(p1) instanceof BloomFilter);
assertTrue(outbound(p1) instanceof GetDataMessage); assertTrue(outbound(p1) instanceof GetDataMessage);
assertTrue(outbound(p2) instanceof BloomFilter);
assertNull(outbound(p2)); assertNull(outbound(p2));
// Peer 1 goes away. // Peer 1 goes away, peer 2 becomes the download peer and thus queries the remote mempool.
closePeer(peerOf(p1)); closePeer(peerOf(p1));
// Peer 2 fetches it next time it hears an inv (should it fetch immediately?). // Peer 2 fetches it next time it hears an inv (should it fetch immediately?).
inbound(p2, inv); inbound(p2, inv);
@ -178,7 +174,6 @@ public class PeerGroupTest extends TestWithPeerGroup {
// Expect a zero hash getblocks on p1. This is how the process starts. // Expect a zero hash getblocks on p1. This is how the process starts.
peerGroup.startBlockChainDownload(new AbstractPeerEventListener() { peerGroup.startBlockChainDownload(new AbstractPeerEventListener() {
}); });
assertTrue(outbound(p1) instanceof BloomFilter);
GetBlocksMessage getblocks = (GetBlocksMessage) outbound(p1); GetBlocksMessage getblocks = (GetBlocksMessage) outbound(p1);
assertEquals(Sha256Hash.ZERO_HASH, getblocks.getStopHash()); assertEquals(Sha256Hash.ZERO_HASH, getblocks.getStopHash());
// We give back an inv with some blocks in it. // We give back an inv with some blocks in it.
@ -193,7 +188,6 @@ public class PeerGroupTest extends TestWithPeerGroup {
inbound(p1, b1); inbound(p1, b1);
// Now we successfully connect to another peer. There should be no messages sent. // Now we successfully connect to another peer. There should be no messages sent.
FakeChannel p2 = connectPeer(2); FakeChannel p2 = connectPeer(2);
assertTrue(outbound(p2) instanceof BloomFilter);
Message message = (Message)outbound(p2); Message message = (Message)outbound(p2);
assertNull(message == null ? "" : message.toString(), message); assertNull(message == null ? "" : message.toString(), message);
peerGroup.stop(); peerGroup.stop();
@ -221,14 +215,12 @@ public class PeerGroupTest extends TestWithPeerGroup {
// Peer 2 advertises the tx but does not receive it yet. // Peer 2 advertises the tx but does not receive it yet.
inbound(p2, inv); inbound(p2, inv);
assertTrue(outbound(p2) instanceof BloomFilter);
assertTrue(outbound(p2) instanceof GetDataMessage); assertTrue(outbound(p2) instanceof GetDataMessage);
assertEquals(0, tx.getConfidence().numBroadcastPeers()); assertEquals(0, tx.getConfidence().numBroadcastPeers());
assertTrue(peerGroup.getMemoryPool().maybeWasSeen(tx.getHash())); assertTrue(peerGroup.getMemoryPool().maybeWasSeen(tx.getHash()));
assertNull(event[0]); assertNull(event[0]);
// Peer 1 advertises the tx, we don't do anything as it's already been requested. // Peer 1 advertises the tx, we don't do anything as it's already been requested.
inbound(p1, inv); inbound(p1, inv);
assertTrue(outbound(p1) instanceof BloomFilter);
assertNull(outbound(p1)); assertNull(outbound(p1));
// Peer 2 gets sent the tx and requests the dependency. // Peer 2 gets sent the tx and requests the dependency.
inbound(p2, tx); inbound(p2, tx);
@ -274,7 +266,6 @@ public class PeerGroupTest extends TestWithPeerGroup {
// Send ourselves a bit of money. // Send ourselves a bit of money.
Block b1 = TestUtils.makeSolvedTestBlock(blockStore, address); Block b1 = TestUtils.makeSolvedTestBlock(blockStore, address);
inbound(p1, b1); inbound(p1, b1);
assertTrue(outbound(p1) instanceof BloomFilter);
assertNull(outbound(p1)); assertNull(outbound(p1));
assertEquals(Utils.toNanoCoins(50, 0), wallet.getBalance()); assertEquals(Utils.toNanoCoins(50, 0), wallet.getBalance());
@ -325,12 +316,11 @@ public class PeerGroupTest extends TestWithPeerGroup {
GetDataMessage getdata = new GetDataMessage(params); GetDataMessage getdata = new GetDataMessage(params);
getdata.addItem(inv1.getItems().get(0)); getdata.addItem(inv1.getItems().get(0));
inbound(p1, getdata); inbound(p1, getdata);
assertTrue(outbound(p1) instanceof BloomFilter); assertTrue(outbound(p1) instanceof BloomFilter); // Filter is recalculated.
Transaction t4 = (Transaction) outbound(p1); Transaction t4 = (Transaction) outbound(p1);
assertEquals(t3, t4); assertEquals(t3, t4);
FakeChannel p3 = connectPeer(3); FakeChannel p3 = connectPeer(3);
assertTrue(outbound(p3) instanceof BloomFilter);
assertTrue(outbound(p3) instanceof InventoryMessage); assertTrue(outbound(p3) instanceof InventoryMessage);
control.verify(); control.verify();
} }
@ -371,7 +361,6 @@ public class PeerGroupTest extends TestWithPeerGroup {
VersionMessage versionMessage = new VersionMessage(params, 2); VersionMessage versionMessage = new VersionMessage(params, 2);
versionMessage.clientVersion = Pong.MIN_PROTOCOL_VERSION; versionMessage.clientVersion = Pong.MIN_PROTOCOL_VERSION;
FakeChannel p1 = connectPeer(1, versionMessage); FakeChannel p1 = connectPeer(1, versionMessage);
assertTrue(outbound(p1) instanceof BloomFilter);
Ping ping = (Ping) outbound(p1); Ping ping = (Ping) outbound(p1);
inbound(p1, new Pong(ping.getNonce())); inbound(p1, new Pong(ping.getNonce()));
assertTrue(peerGroup.getConnectedPeers().get(0).getLastPingTime() < Long.MAX_VALUE); assertTrue(peerGroup.getConnectedPeers().get(0).getLastPingTime() < Long.MAX_VALUE);

View File

@ -36,6 +36,7 @@ public class TestWithPeerGroup extends TestWithNetworkConnections {
super.setUp(blockStore); super.setUp(blockStore);
remoteVersionMessage = new VersionMessage(unitTestParams, 1); remoteVersionMessage = new VersionMessage(unitTestParams, 1);
remoteVersionMessage.clientVersion = FilteredBlock.MIN_PROTOCOL_VERSION;
ClientBootstrap bootstrap = new ClientBootstrap(new ChannelFactory() { ClientBootstrap bootstrap = new ClientBootstrap(new ChannelFactory() {
public void releaseExternalResources() {} public void releaseExternalResources() {}
@ -69,6 +70,10 @@ public class TestWithPeerGroup extends TestWithNetworkConnections {
FakeChannel p = (FakeChannel) peerGroup.connectTo(remoteAddress).getChannel(); FakeChannel p = (FakeChannel) peerGroup.connectTo(remoteAddress).getChannel();
assertTrue(p.nextEvent() instanceof ChannelStateEvent); assertTrue(p.nextEvent() instanceof ChannelStateEvent);
inbound(p, versionMessage); inbound(p, versionMessage);
if (versionMessage.isBloomFilteringSupported()) {
assertTrue(outbound(p) instanceof BloomFilter);
assertTrue(outbound(p) instanceof MemoryPoolMessage);
}
return p; return p;
} }
} }