mirror of
https://github.com/Qortal/altcoinj.git
synced 2025-02-12 10:15:52 +00:00
Move refreshing of the bloom filter out of the PeerGroup and into the Peer, where it belongs.
This commit is contained in:
parent
bffc85fa24
commit
c21f182d78
@ -88,6 +88,15 @@ public class Peer {
|
|||||||
private boolean useFilteredBlocks = false;
|
private boolean useFilteredBlocks = false;
|
||||||
// The last filtered block we received, we're waiting to fill it out with transactions.
|
// The last filtered block we received, we're waiting to fill it out with transactions.
|
||||||
private FilteredBlock currentFilteredBlock = null;
|
private FilteredBlock currentFilteredBlock = null;
|
||||||
|
// The current Bloom filter set on the connection, used to tell the remote peer what transactions to send us.
|
||||||
|
private BloomFilter bloomFilter;
|
||||||
|
// How many filtered blocks have been received during the lifetime of this connection. Used to decide when to
|
||||||
|
// refresh the server-side side filter by sending a new one (it degrades over time as false positives are added
|
||||||
|
// on the remote side, see BIP 37 for a discussion of this).
|
||||||
|
private int filteredBlocksReceived;
|
||||||
|
// How frequently to refresh the filter. This should become dynamic in future and calculated depending on the
|
||||||
|
// actual false positive rate. For now a good value was determined empirically around January 2013.
|
||||||
|
private static final int RESEND_BLOOM_FILTER_BLOCK_COUNT = 25000;
|
||||||
// Keeps track of things we requested internally with getdata but didn't receive yet, so we can avoid re-requests.
|
// Keeps track of things we requested internally with getdata but didn't receive yet, so we can avoid re-requests.
|
||||||
// It's not quite the same as getDataFutures, as this is used only for getdatas done as part of downloading
|
// It's not quite the same as getDataFutures, as this is used only for getdatas done as part of downloading
|
||||||
// the chain and so is lighter weight (we just keep a bunch of hashes not futures).
|
// the chain and so is lighter weight (we just keep a bunch of hashes not futures).
|
||||||
@ -253,7 +262,7 @@ public class Peer {
|
|||||||
lock.lock();
|
lock.lock();
|
||||||
try {
|
try {
|
||||||
if (currentFilteredBlock != null && !(m instanceof Transaction)) {
|
if (currentFilteredBlock != null && !(m instanceof Transaction)) {
|
||||||
processFilteredBlock(currentFilteredBlock);
|
endFilteredBlock(currentFilteredBlock);
|
||||||
currentFilteredBlock = null;
|
currentFilteredBlock = null;
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
@ -269,16 +278,7 @@ public class Peer {
|
|||||||
} else if (m instanceof Block) {
|
} else if (m instanceof Block) {
|
||||||
processBlock((Block) m);
|
processBlock((Block) m);
|
||||||
} else if (m instanceof FilteredBlock) {
|
} else if (m instanceof FilteredBlock) {
|
||||||
// Filtered blocks come before the data that they refer to, so stash it here and then fill it out as
|
startFilteredBlock((FilteredBlock) m);
|
||||||
// messages stream in. We'll call processFilteredBlock when a non-tx message arrives (eg, another
|
|
||||||
// FilteredBlock) or when a tx that isn't needed by that block is found. A ping message is sent after
|
|
||||||
// a getblocks, to force the non-tx message path.
|
|
||||||
lock.lock();
|
|
||||||
try {
|
|
||||||
currentFilteredBlock = (FilteredBlock) m;
|
|
||||||
} finally {
|
|
||||||
lock.unlock();
|
|
||||||
}
|
|
||||||
} else if (m instanceof Transaction) {
|
} else if (m instanceof Transaction) {
|
||||||
processTransaction((Transaction) m);
|
processTransaction((Transaction) m);
|
||||||
} else if (m instanceof GetDataMessage) {
|
} else if (m instanceof GetDataMessage) {
|
||||||
@ -318,9 +318,28 @@ public class Peer {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void processNotFoundMessage(NotFoundMessage m) {
|
private void startFilteredBlock(FilteredBlock m) throws IOException {
|
||||||
// This does not need to be locked.
|
// Filtered blocks come before the data that they refer to, so stash it here and then fill it out as
|
||||||
|
// messages stream in. We'll call endFilteredBlock when a non-tx message arrives (eg, another
|
||||||
|
// FilteredBlock) or when a tx that isn't needed by that block is found. A ping message is sent after
|
||||||
|
// a getblocks, to force the non-tx message path.
|
||||||
|
lock.lock();
|
||||||
|
try {
|
||||||
|
currentFilteredBlock = m;
|
||||||
|
// Potentially refresh the server side filter. Because the remote node adds hits back into the filter
|
||||||
|
// to save round-tripping back through us, the filter degrades over time as false positives get added,
|
||||||
|
// triggering yet more false positives. We refresh it every so often to get the FP rate back down.
|
||||||
|
filteredBlocksReceived++;
|
||||||
|
if (filteredBlocksReceived % RESEND_BLOOM_FILTER_BLOCK_COUNT == RESEND_BLOOM_FILTER_BLOCK_COUNT - 1) {
|
||||||
|
sendMessage(bloomFilter);
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
lock.unlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void processNotFoundMessage(NotFoundMessage m) {
|
||||||
|
checkNotLocked(lock);
|
||||||
// This is received when we previously did a getdata but the peer couldn't find what we requested in it's
|
// This is received when we previously did a getdata but the peer couldn't find what we requested in it's
|
||||||
// memory pool. Typically, because we are downloading dependencies of a relevant transaction and reached
|
// memory pool. Typically, because we are downloading dependencies of a relevant transaction and reached
|
||||||
// the bottom of the dependency tree (where the unconfirmed transactions connect to transactions that are
|
// the bottom of the dependency tree (where the unconfirmed transactions connect to transactions that are
|
||||||
@ -340,7 +359,7 @@ public class Peer {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private void processAlert(AlertMessage m) {
|
private void processAlert(AlertMessage m) {
|
||||||
// This does not need to be locked.
|
checkNotLocked(lock);
|
||||||
try {
|
try {
|
||||||
if (m.isSignatureValid()) {
|
if (m.isSignatureValid()) {
|
||||||
log.info("Received alert from peer {}: {}", toString(), m.getStatusBar());
|
log.info("Received alert from peer {}: {}", toString(), m.getStatusBar());
|
||||||
@ -411,7 +430,7 @@ public class Peer {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private void processGetData(GetDataMessage getdata) throws IOException {
|
private void processGetData(GetDataMessage getdata) throws IOException {
|
||||||
// This does not need to be locked.
|
checkNotLocked(lock);
|
||||||
log.info("{}: Received getdata message: {}", address.get(), getdata.toString());
|
log.info("{}: Received getdata message: {}", address.get(), getdata.toString());
|
||||||
ArrayList<Message> items = new ArrayList<Message>();
|
ArrayList<Message> items = new ArrayList<Message>();
|
||||||
for (PeerEventListener listener : eventListeners) {
|
for (PeerEventListener listener : eventListeners) {
|
||||||
@ -446,7 +465,7 @@ public class Peer {
|
|||||||
if (currentFilteredBlock != null) {
|
if (currentFilteredBlock != null) {
|
||||||
if (!currentFilteredBlock.provideTransaction(tx)) {
|
if (!currentFilteredBlock.provideTransaction(tx)) {
|
||||||
// Got a tx that didn't fit into the filtered block, so we must have received everything.
|
// Got a tx that didn't fit into the filtered block, so we must have received everything.
|
||||||
processFilteredBlock(currentFilteredBlock);
|
endFilteredBlock(currentFilteredBlock);
|
||||||
currentFilteredBlock = null;
|
currentFilteredBlock = null;
|
||||||
}
|
}
|
||||||
// Don't tell wallets or listeners about this tx as they'll learn about it when the filtered block is
|
// Don't tell wallets or listeners about this tx as they'll learn about it when the filtered block is
|
||||||
@ -706,7 +725,7 @@ public class Peer {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// TODO: Fix this duplication.
|
// TODO: Fix this duplication.
|
||||||
private void processFilteredBlock(FilteredBlock m) throws IOException {
|
private void endFilteredBlock(FilteredBlock m) throws IOException {
|
||||||
if (log.isDebugEnabled())
|
if (log.isDebugEnabled())
|
||||||
log.debug("{}: Received broadcast filtered block {}", address.get(), m.getHash().toString());
|
log.debug("{}: Received broadcast filtered block {}", address.get(), m.getHash().toString());
|
||||||
lock.lock();
|
lock.lock();
|
||||||
@ -1320,9 +1339,9 @@ public class Peer {
|
|||||||
* downloadData property is true, a {@link MemoryPoolMessage} is sent as well to trigger downloading of any
|
* downloadData property is true, a {@link MemoryPoolMessage} is sent as well to trigger downloading of any
|
||||||
* pending transactions that may be relevant.</p>
|
* pending transactions that may be relevant.</p>
|
||||||
*
|
*
|
||||||
* <p>The Peer does not keep a reference to the BloomFilter. Also, it will not automatically request filters
|
* <p>The Peer does not automatically request filters from any wallets added using {@link Peer#addWallet(Wallet)}.
|
||||||
* from any wallets added using {@link Peer#addWallet(Wallet)}. This is to allow callers to avoid redundantly
|
* This is to allow callers to avoid redundantly recalculating the same filter repeatedly when using multiple peers
|
||||||
* recalculating the same filter repeatedly when using multiple peers together.</p>
|
* and multiple wallets together.</p>
|
||||||
*
|
*
|
||||||
* <p>Therefore, you should not use this method if your app uses a {@link PeerGroup}. It is called for you.</p>
|
* <p>Therefore, you should not use this method if your app uses a {@link PeerGroup}. It is called for you.</p>
|
||||||
*
|
*
|
||||||
@ -1335,6 +1354,7 @@ public class Peer {
|
|||||||
lock.lock();
|
lock.lock();
|
||||||
try {
|
try {
|
||||||
shouldQueryMemPool = memoryPool != null || downloadData.get();
|
shouldQueryMemPool = memoryPool != null || downloadData.get();
|
||||||
|
bloomFilter = filter;
|
||||||
} finally {
|
} finally {
|
||||||
lock.unlock();
|
lock.unlock();
|
||||||
}
|
}
|
||||||
@ -1347,4 +1367,17 @@ public class Peer {
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the last {@link BloomFilter} set by {@link Peer#setBloomFilter(BloomFilter)}. Bloom filters tell
|
||||||
|
* the remote node what transactions to send us, in a compact manner.
|
||||||
|
*/
|
||||||
|
public BloomFilter getBloomFilter() {
|
||||||
|
lock.lock();
|
||||||
|
try {
|
||||||
|
return bloomFilter;
|
||||||
|
} finally {
|
||||||
|
lock.unlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -164,12 +164,6 @@ public class PeerGroup extends AbstractIdleService {
|
|||||||
private final long bloomFilterTweak = (long) (Math.random() * Long.MAX_VALUE);
|
private final long bloomFilterTweak = (long) (Math.random() * Long.MAX_VALUE);
|
||||||
private int lastBloomFilterElementCount;
|
private int lastBloomFilterElementCount;
|
||||||
|
|
||||||
/**
|
|
||||||
* Every RESEND_BLOOM_FILTER_BLOCK_COUNT FilteredBlocks received, the bloom filter is refreshed.
|
|
||||||
* This prevents the actual false positive rate from ballooning as the filter gets elements added to it automatically.
|
|
||||||
*/
|
|
||||||
public static final int RESEND_BLOOM_FILTER_BLOCK_COUNT = 25000;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a PeerGroup with the given parameters. No chain is provided so this node will report its chain height
|
* Creates a PeerGroup with the given parameters. No chain is provided so this node will report its chain height
|
||||||
* as zero to other peers. This constructor is useful if you just want to explore the network but aren't interested
|
* as zero to other peers. This constructor is useful if you just want to explore the network but aren't interested
|
||||||
@ -829,28 +823,6 @@ public class PeerGroup extends AbstractIdleService {
|
|||||||
setupPingingForNewPeer(peer);
|
setupPingingForNewPeer(peer);
|
||||||
for (PeerEventListener listener : peerEventListeners)
|
for (PeerEventListener listener : peerEventListeners)
|
||||||
listener.onPeerConnected(peer, peers.size());
|
listener.onPeerConnected(peer, peers.size());
|
||||||
// TODO: Move this into the Peer object itself.
|
|
||||||
peer.addEventListener(new AbstractPeerEventListener() {
|
|
||||||
int filteredBlocksReceivedFromPeer = 0;
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Message onPreMessageReceived(Peer peer, Message m) {
|
|
||||||
if (m instanceof FilteredBlock) {
|
|
||||||
filteredBlocksReceivedFromPeer++;
|
|
||||||
if (filteredBlocksReceivedFromPeer % RESEND_BLOOM_FILTER_BLOCK_COUNT == RESEND_BLOOM_FILTER_BLOCK_COUNT - 1) {
|
|
||||||
lock.lock();
|
|
||||||
try {
|
|
||||||
peer.sendMessage(bloomFilter);
|
|
||||||
} catch (IOException e) {
|
|
||||||
throw new RuntimeException(e);
|
|
||||||
} finally {
|
|
||||||
lock.unlock();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return m;
|
|
||||||
}
|
|
||||||
});
|
|
||||||
} finally {
|
} finally {
|
||||||
lock.unlock();
|
lock.unlock();
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user