3
0
mirror of https://github.com/Qortal/altcoinj.git synced 2025-02-07 14:54:15 +00:00

Refactor bloom filter calculations out of PeerGroup into a separate FilterMerger class.

This commit is contained in:
Mike Hearn 2014-05-07 21:53:34 +02:00
parent 26823d1bf7
commit 46ad86a9af
2 changed files with 106 additions and 57 deletions

View File

@ -19,6 +19,7 @@ package com.google.bitcoin.core;
import com.google.bitcoin.net.BlockingClientManager;
import com.google.bitcoin.net.ClientConnectionManager;
import com.google.bitcoin.net.FilterMerger;
import com.google.bitcoin.net.NioClientManager;
import com.google.bitcoin.net.discovery.PeerDiscovery;
import com.google.bitcoin.net.discovery.PeerDiscoveryException;
@ -29,6 +30,7 @@ import com.google.bitcoin.utils.ListenerRegistration;
import com.google.bitcoin.utils.Threading;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Sets;
import com.google.common.primitives.Ints;
import com.google.common.primitives.Longs;
@ -138,7 +140,7 @@ public class PeerGroup extends AbstractExecutionThreadService implements Transac
@Override
public void onBlocksDownloaded(Peer peer, Block block, int blocksLeft) {
double rate = checkNotNull(chain).getFalsePositiveRate();
if (rate > bloomFilterFPRate * MAX_FP_RATE_INCREASE) {
if (rate > bloomFilterMerger.getBloomFilterFPRate() * MAX_FP_RATE_INCREASE) {
log.info("Force update Bloom filter due to high false positive rate");
recalculateFastCatchupAndFilter(FilterRecalculateMode.FORCE_SEND);
}
@ -240,8 +242,6 @@ public class PeerGroup extends AbstractExecutionThreadService implements Transac
// Visible for testing
PeerEventListener startupListener = new PeerStartupListener();
// A bloom filter generated from all connected wallets that is given to new peers
private BloomFilter bloomFilter;
/**
* <p>A reasonable default for the bloom filter false positive rate on mainnet. FP rates are values between 0.0 and 1.0
* where 1.0 is "all transactions" i.e. 100%.</p>
@ -252,11 +252,9 @@ public class PeerGroup extends AbstractExecutionThreadService implements Transac
public static final double DEFAULT_BLOOM_FILTER_FP_RATE = 0.0005;
/** Maximum increase in FP rate before forced refresh of the bloom filter */
public static final double MAX_FP_RATE_INCREASE = 2.0f;
// The false positive rate for bloomFilter
private double bloomFilterFPRate = DEFAULT_BLOOM_FILTER_FP_RATE;
// We use a constant tweak to avoid giving up privacy when we regenerate our filter with new keys
private final long bloomFilterTweak = (long) (Math.random() * Long.MAX_VALUE);
private int lastBloomFilterElementCount;
// An object that calculates bloom filters given a list of filter providers, whilst tracking some state useful
// for privacy purposes.
private FilterMerger bloomFilterMerger;
/** The default timeout between when a connection attempt begins and version message exchange completes */
public static final int DEFAULT_CONNECT_TIMEOUT_MILLIS = 5000;
@ -357,6 +355,7 @@ public class PeerGroup extends AbstractExecutionThreadService implements Transac
peerDiscoverers = new CopyOnWriteArraySet<PeerDiscovery>();
peerEventListeners = new CopyOnWriteArrayList<ListenerRegistration<PeerEventListener>>();
runningBroadcasts = Collections.synchronizedSet(new HashSet<TransactionBroadcast>());
bloomFilterMerger = new FilterMerger(DEFAULT_BLOOM_FILTER_FP_RATE);
}
/**
@ -864,54 +863,24 @@ public class PeerGroup extends AbstractExecutionThreadService implements Transac
if (chain != null && chain.shouldVerifyTransactions())
return;
log.info("Recalculating filter in mode {}", mode);
long earliestKeyTimeSecs = Long.MAX_VALUE;
int elements = 0;
boolean requiresUpdateAll = false;
for (PeerFilterProvider p : peerFilterProviders) {
earliestKeyTimeSecs = Math.min(earliestKeyTimeSecs, p.getEarliestKeyCreationTime());
elements += p.getBloomFilterElementCount();
requiresUpdateAll = requiresUpdateAll || p.isRequiringUpdateAllBloomFilter();
FilterMerger.Result result = bloomFilterMerger.calculate(ImmutableList.copyOf(peerFilterProviders));
boolean send;
switch (mode) {
case SEND_IF_CHANGED: send = result.changed; break;
case DONT_SEND: send = false; break;
case FORCE_SEND: send = true; break;
default: throw new UnsupportedOperationException();
}
if (elements > 0) {
// We stair-step our element count so that we avoid creating a filter with different parameters
// as much as possible as that results in a loss of privacy.
// The constant 100 here is somewhat arbitrary, but makes sense for small to medium wallets -
// it will likely mean we never need to create a filter with different parameters.
lastBloomFilterElementCount = elements > lastBloomFilterElementCount ? elements + 100 : lastBloomFilterElementCount;
BloomFilter.BloomUpdate bloomFlags =
requiresUpdateAll ? BloomFilter.BloomUpdate.UPDATE_ALL : BloomFilter.BloomUpdate.UPDATE_P2PUBKEY_ONLY;
BloomFilter filter = new BloomFilter(lastBloomFilterElementCount, bloomFilterFPRate, bloomFilterTweak, bloomFlags);
for (PeerFilterProvider p : peerFilterProviders)
filter.merge(p.getBloomFilter(lastBloomFilterElementCount, bloomFilterFPRate, bloomFilterTweak));
boolean changed = !filter.equals(bloomFilter);
boolean send = false;
bloomFilter = filter;
switch (mode) {
case SEND_IF_CHANGED: send = changed; break;
case DONT_SEND: send = false; break;
case FORCE_SEND: send = true; break;
}
if (send) {
for (Peer peer : peers)
peer.setBloomFilter(filter);
// Reset the false positive estimate so that we don't send a flood of filter updates
// if the estimate temporarily overshoots our threshold.
if (chain != null)
chain.resetFalsePositiveEstimate();
}
if (send) {
for (Peer peer : peers)
peer.setBloomFilter(result.filter);
// Reset the false positive estimate so that we don't send a flood of filter updates
// if the estimate temporarily overshoots our threshold.
if (chain != null)
chain.resetFalsePositiveEstimate();
}
// Now adjust the earliest key time backwards by a week to handle the case of clock drift. This can occur
// both in block header timestamps and if the users clock was out of sync when the key was first created
// (to within a small amount of tolerance).
earliestKeyTimeSecs -= 86400 * 7;
// Do this last so that bloomFilter is already set when it gets called.
setFastCatchupTimeSecs(earliestKeyTimeSecs);
setFastCatchupTimeSecs(result.earliestKeyTimeSecs);
} finally {
lock.unlock();
}
@ -929,7 +898,7 @@ public class PeerGroup extends AbstractExecutionThreadService implements Transac
public void setBloomFilterFalsePositiveRate(double bloomFilterFPRate) {
lock.lock();
try {
this.bloomFilterFPRate = bloomFilterFPRate;
bloomFilterMerger.setBloomFilterFPRate(bloomFilterFPRate);
recalculateFastCatchupAndFilter(FilterRecalculateMode.SEND_IF_CHANGED);
} finally {
lock.unlock();
@ -1070,7 +1039,7 @@ public class PeerGroup extends AbstractExecutionThreadService implements Transac
// Give the peer a filter that can be used to probabilistically drop transactions that
// aren't relevant to our wallet. We may still receive some false positives, which is
// OK because it helps improve wallet privacy. Old nodes will just ignore the message.
if (bloomFilter != null) peer.setBloomFilter(bloomFilter);
if (bloomFilterMerger.getLastFilter() != null) peer.setBloomFilter(bloomFilterMerger.getLastFilter());
// Link the peer to the memory pool so broadcast transactions have their confidence levels updated.
peer.setDownloadData(false);
// TODO: The peer should calculate the fast catchup time from the added wallets here.
@ -1183,7 +1152,7 @@ public class PeerGroup extends AbstractExecutionThreadService implements Transac
if (downloadListener != null)
peer.addEventListener(downloadListener, Threading.SAME_THREAD);
downloadPeer.setDownloadData(true);
downloadPeer.setDownloadParameters(fastCatchupTimeSecs, bloomFilter != null);
downloadPeer.setDownloadParameters(fastCatchupTimeSecs, bloomFilterMerger.getLastFilter() != null);
}
} finally {
lock.unlock();
@ -1211,7 +1180,7 @@ public class PeerGroup extends AbstractExecutionThreadService implements Transac
Preconditions.checkState(chain == null || !chain.shouldVerifyTransactions(), "Fast catchup is incompatible with fully verifying");
fastCatchupTimeSecs = secondsSinceEpoch;
if (downloadPeer != null) {
downloadPeer.setDownloadParameters(secondsSinceEpoch, bloomFilter != null);
downloadPeer.setDownloadParameters(secondsSinceEpoch, bloomFilterMerger.getLastFilter() != null);
}
} finally {
lock.unlock();

View File

@ -0,0 +1,80 @@
package com.google.bitcoin.net;
import com.google.bitcoin.core.BloomFilter;
import com.google.bitcoin.core.PeerFilterProvider;
import com.google.common.collect.ImmutableList;
// This code is unit tested by the PeerGroup tests.
/**
* <p>A reusable object that will calculate, given a list of {@link com.google.bitcoin.core.PeerFilterProvider}s, a merged
* {@link com.google.bitcoin.core.BloomFilter} and earliest key time for all of them.
* Used by the {@link com.google.bitcoin.core.PeerGroup} class internally.</p>
*
* <p>Thread safety: this class tracks the element count of the last filter it calculated and so must be synchronised
* externally or used from only one thread. It will acquire a lock on each filter in turn before performing the
* calculation because the providers may be mutated in other threads in parallel, but global consistency is required
* to produce a merged filter.</p>
*/
public class FilterMerger {
// We use a constant tweak to avoid giving up privacy when we regenerate our filter with new keys
private final long bloomFilterTweak = (long) (Math.random() * Long.MAX_VALUE);
private double bloomFilterFPRate;
private int lastBloomFilterElementCount;
private BloomFilter lastFilter;
public FilterMerger(double bloomFilterFPRate) {
this.bloomFilterFPRate = bloomFilterFPRate;
}
public static class Result {
public BloomFilter filter;
public long earliestKeyTimeSecs;
public boolean changed;
}
public Result calculate(ImmutableList<PeerFilterProvider> providers) {
Result result = new Result();
result.earliestKeyTimeSecs = Long.MAX_VALUE;
int elements = 0;
boolean requiresUpdateAll = false;
for (PeerFilterProvider p : providers) {
result.earliestKeyTimeSecs = Math.min(result.earliestKeyTimeSecs, p.getEarliestKeyCreationTime());
elements += p.getBloomFilterElementCount();
requiresUpdateAll = requiresUpdateAll || p.isRequiringUpdateAllBloomFilter();
}
if (elements > 0) {
// We stair-step our element count so that we avoid creating a filter with different parameters
// as much as possible as that results in a loss of privacy.
// The constant 100 here is somewhat arbitrary, but makes sense for small to medium wallets -
// it will likely mean we never need to create a filter with different parameters.
lastBloomFilterElementCount = elements > lastBloomFilterElementCount ? elements + 100 : lastBloomFilterElementCount;
BloomFilter.BloomUpdate bloomFlags =
requiresUpdateAll ? BloomFilter.BloomUpdate.UPDATE_ALL : BloomFilter.BloomUpdate.UPDATE_P2PUBKEY_ONLY;
BloomFilter filter = new BloomFilter(lastBloomFilterElementCount, bloomFilterFPRate, bloomFilterTweak, bloomFlags);
for (PeerFilterProvider p : providers)
filter.merge(p.getBloomFilter(lastBloomFilterElementCount, bloomFilterFPRate, bloomFilterTweak));
result.changed = !filter.equals(lastFilter);
result.filter = lastFilter = filter;
}
// Now adjust the earliest key time backwards by a week to handle the case of clock drift. This can occur
// both in block header timestamps and if the users clock was out of sync when the key was first created
// (to within a small amount of tolerance).
result.earliestKeyTimeSecs -= 86400 * 7;
return result;
}
public void setBloomFilterFPRate(double bloomFilterFPRate) {
this.bloomFilterFPRate = bloomFilterFPRate;
}
public double getBloomFilterFPRate() {
return bloomFilterFPRate;
}
public BloomFilter getLastFilter() {
return lastFilter;
}
}