3
0
mirror of https://github.com/Qortal/altcoinj.git synced 2025-02-07 14:54:15 +00:00

Unlock TransactionConfidence event listeners, make sure MemoryPool is not locked when a tx is marked broadcast by a peer to avoid inversions via listeners.

Unfortunately this introduces some new FindBugs warnings because it doesn't understand the inside-out locking pattern used here, despite that it's correct.
Update issue 233.
This commit is contained in:
Mike Hearn 2013-03-07 17:38:13 +01:00
parent 0534231de9
commit c8c1e68152
3 changed files with 183 additions and 130 deletions

View File

@ -16,7 +16,7 @@
package com.google.bitcoin.core; package com.google.bitcoin.core;
import com.google.common.base.Preconditions; import com.google.bitcoin.utils.Locks;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -27,6 +27,10 @@ import java.util.HashSet;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.locks.ReentrantLock;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
/** /**
* <p>Tracks transactions that are being announced across the network. Typically one is created for you by a * <p>Tracks transactions that are being announced across the network. Typically one is created for you by a
@ -41,6 +45,7 @@ import java.util.Set;
*/ */
public class MemoryPool { public class MemoryPool {
private static final Logger log = LoggerFactory.getLogger(MemoryPool.class); private static final Logger log = LoggerFactory.getLogger(MemoryPool.class);
protected ReentrantLock lock = Locks.lock("mempool");
// For each transaction we may have seen: // For each transaction we may have seen:
// - only its hash in an inv packet // - only its hash in an inv packet
@ -108,41 +113,51 @@ public class MemoryPool {
* which bothered to keep a reference. Typically, this is because the transaction does not involve any keys that * which bothered to keep a reference. Typically, this is because the transaction does not involve any keys that
* are relevant to any of our wallets. * are relevant to any of our wallets.
*/ */
private synchronized void cleanPool() { private void cleanPool() {
Reference<? extends Transaction> ref; lock.lock();
while ((ref = referenceQueue.poll()) != null) { try {
// Find which transaction got deleted by the GC. Reference<? extends Transaction> ref;
WeakTransactionReference txRef = (WeakTransactionReference) ref; while ((ref = referenceQueue.poll()) != null) {
// And remove the associated map entry so the other bits of memory can also be reclaimed. // Find which transaction got deleted by the GC.
memoryPool.remove(txRef.hash); WeakTransactionReference txRef = (WeakTransactionReference) ref;
// And remove the associated map entry so the other bits of memory can also be reclaimed.
memoryPool.remove(txRef.hash);
}
} finally {
lock.unlock();
} }
} }
/** /**
* Returns the number of peers that have seen the given hash recently. * Returns the number of peers that have seen the given hash recently.
*/ */
public synchronized int numBroadcastPeers(Sha256Hash txHash) { public int numBroadcastPeers(Sha256Hash txHash) {
cleanPool(); lock.lock();
Entry entry = memoryPool.get(txHash); try {
if (entry == null) { cleanPool();
// No such TX known. Entry entry = memoryPool.get(txHash);
return 0; if (entry == null) {
} else if (entry.tx == null) { // No such TX known.
// We've seen at least one peer announce with an inv.
Preconditions.checkNotNull(entry.addresses);
return entry.addresses.size();
} else {
final Transaction tx = entry.tx.get();
if (tx == null) {
// We previously downloaded this transaction, but nothing cared about it so the garbage collector threw
// it away. We also deleted the set that tracked which peers had seen it. Treat this case as a zero and
// just delete it from the map.
memoryPool.remove(txHash);
return 0; return 0;
} else if (entry.tx == null) {
// We've seen at least one peer announce with an inv.
checkNotNull(entry.addresses);
return entry.addresses.size();
} else { } else {
Preconditions.checkState(entry.addresses == null); final Transaction tx = entry.tx.get();
return tx.getConfidence().numBroadcastPeers(); if (tx == null) {
// We previously downloaded this transaction, but nothing cared about it so the garbage collector threw
// it away. We also deleted the set that tracked which peers had seen it. Treat this case as a zero and
// just delete it from the map.
memoryPool.remove(txHash);
return 0;
} else {
checkState(entry.addresses == null);
return tx.getConfidence().numBroadcastPeers();
}
} }
} finally {
lock.unlock();
} }
} }
@ -152,54 +167,65 @@ public class MemoryPool {
* @param byPeer The Peer that received it. * @param byPeer The Peer that received it.
* @return An object that is semantically the same TX but may be a different object instance. * @return An object that is semantically the same TX but may be a different object instance.
*/ */
public synchronized Transaction seen(Transaction tx, PeerAddress byPeer) { public Transaction seen(Transaction tx, PeerAddress byPeer) {
cleanPool(); lock.lock();
Entry entry = memoryPool.get(tx.getHash()); try {
if (entry != null) { cleanPool();
// This TX or its hash have been previously announced. Entry entry = memoryPool.get(tx.getHash());
if (entry.tx != null) { if (entry != null) {
// We already downloaded it. // This TX or its hash have been previously announced.
Preconditions.checkState(entry.addresses == null); if (entry.tx != null) {
// We only want one canonical object instance for a transaction no matter how many times it is // We already downloaded it.
// deserialized. checkState(entry.addresses == null);
Transaction transaction = entry.tx.get(); // We only want one canonical object instance for a transaction no matter how many times it is
if (transaction == null) { // deserialized.
// We previously downloaded this transaction, but the garbage collector threw it away because Transaction transaction = entry.tx.get();
// no other part of the system cared enough to keep it around (it's not relevant to us). if (transaction == null) {
// Given the lack of interest last time we probably don't need to track it this time either. // We previously downloaded this transaction, but the garbage collector threw it away because
log.info("{}: Provided with a transaction that we previously threw away: {}", byPeer, tx.getHash()); // no other part of the system cared enough to keep it around (it's not relevant to us).
// Given the lack of interest last time we probably don't need to track it this time either.
log.info("{}: Provided with a transaction that we previously threw away: {}", byPeer, tx.getHash());
} else {
// We saw it before and kept it around. Hand back the canonical copy.
tx = transaction;
log.info("{}: Provided with a transaction downloaded before: [{}] {}",
new Object[]{byPeer, tx.getConfidence().numBroadcastPeers(), tx.getHash()});
}
markBroadcast(byPeer, tx);
return tx;
} else { } else {
// We saw it before and kept it around. Hand back the canonical copy. // We received a transaction that we have previously seen announced but not downloaded until now.
tx = transaction; checkNotNull(entry.addresses);
log.info("{}: Provided with a transaction downloaded before: [{}] {}", entry.tx = new WeakTransactionReference(tx, referenceQueue);
new Object[] { byPeer, tx.getConfidence().numBroadcastPeers(), tx.getHash() }); // Copy the previously announced peers into the confidence and then clear it out. Unlock here
// because markBroadcastBy can trigger event listeners and thus inversions.
lock.unlock();
try {
TransactionConfidence confidence = tx.getConfidence();
for (PeerAddress a : entry.addresses) {
confidence.markBroadcastBy(a);
}
entry.addresses = null;
log.debug("{}: Adding tx [{}] {} to the memory pool",
new Object[]{byPeer, confidence.numBroadcastPeers(), tx.getHashAsString()});
} finally {
lock.lock();
}
return tx;
} }
tx.getConfidence().markBroadcastBy(byPeer);
return tx;
} else { } else {
// We received a transaction that we have previously seen announced but not downloaded until now. // This often happens when we are downloading a Bloom filtered chain, or recursively downloading
Preconditions.checkNotNull(entry.addresses); // dependencies of a relevant transaction (see Peer.downloadDependencies).
log.debug("{}: Provided with a downloaded transaction we didn't see announced yet: {}",
byPeer, tx.getHashAsString());
entry = new Entry();
entry.tx = new WeakTransactionReference(tx, referenceQueue); entry.tx = new WeakTransactionReference(tx, referenceQueue);
// Copy the previously announced peers into the confidence and then clear it out. memoryPool.put(tx.getHash(), entry);
TransactionConfidence confidence = tx.getConfidence(); markBroadcast(byPeer, tx);
for (PeerAddress a : entry.addresses) {
confidence.markBroadcastBy(a);
}
entry.addresses = null;
log.debug("{}: Adding tx [{}] {} to the memory pool",
new Object[] { byPeer, confidence.numBroadcastPeers(), tx.getHashAsString() });
return tx; return tx;
} }
} else { } finally {
// This often happens when we are downloading a Bloom filtered chain, or recursively downloading lock.unlock();
// dependencies of a relevant transaction (see Peer.downloadDependencies).
log.debug("{}: Provided with a downloaded transaction we didn't see announced yet: {}",
byPeer, tx.getHashAsString());
entry = new Entry();
entry.tx = new WeakTransactionReference(tx, referenceQueue);
memoryPool.put(tx.getHash(), entry);
tx.getConfidence().markBroadcastBy(byPeer);
return tx;
} }
} }
@ -207,36 +233,53 @@ public class MemoryPool {
* Called by peers when they see a transaction advertised in an "inv" message. It either will increase the * Called by peers when they see a transaction advertised in an "inv" message. It either will increase the
* confidence of the pre-existing transaction or will just keep a record of the address for future usage. * confidence of the pre-existing transaction or will just keep a record of the address for future usage.
*/ */
public synchronized void seen(Sha256Hash hash, PeerAddress byPeer) { public void seen(Sha256Hash hash, PeerAddress byPeer) {
cleanPool(); lock.lock();
Entry entry = memoryPool.get(hash); try {
if (entry != null) { cleanPool();
// This TX or its hash have been previously announced. Entry entry = memoryPool.get(hash);
if (entry.tx != null) { if (entry != null) {
Preconditions.checkState(entry.addresses == null); // This TX or its hash have been previously announced.
Transaction tx = entry.tx.get(); if (entry.tx != null) {
if (tx != null) { checkState(entry.addresses == null);
tx.getConfidence().markBroadcastBy(byPeer); Transaction tx = entry.tx.get();
log.debug("{}: Announced transaction we have seen before [{}] {}", if (tx != null) {
new Object[] { byPeer, tx.getConfidence().numBroadcastPeers(), tx.getHashAsString() }); markBroadcast(byPeer, tx);
log.debug("{}: Announced transaction we have seen before [{}] {}",
new Object[]{byPeer, tx.getConfidence().numBroadcastPeers(), tx.getHashAsString()});
} else {
// The inv is telling us about a transaction that we previously downloaded, and threw away because
// nothing found it interesting enough to keep around. So do nothing.
}
} else { } else {
// The inv is telling us about a transaction that we previously downloaded, and threw away because checkNotNull(entry.addresses);
// nothing found it interesting enough to keep around. So do nothing. entry.addresses.add(byPeer);
log.debug("{}: Announced transaction we have seen announced before [{}] {}",
new Object[]{byPeer, entry.addresses.size(), hash});
} }
} else { } else {
Preconditions.checkNotNull(entry.addresses); // This TX has never been seen before.
entry = new Entry();
// TODO: Using hashsets here is inefficient compared to just having an array.
entry.addresses = new HashSet<PeerAddress>();
entry.addresses.add(byPeer); entry.addresses.add(byPeer);
log.debug("{}: Announced transaction we have seen announced before [{}] {}", memoryPool.put(hash, entry);
new Object[] { byPeer, entry.addresses.size(), hash }); log.info("{}: Announced new transaction [1] {}", byPeer, hash);
} }
} else { } finally {
// This TX has never been seen before. lock.unlock();
entry = new Entry(); }
// TODO: Using hashsets here is inefficient compared to just having an array. }
entry.addresses = new HashSet<PeerAddress>();
entry.addresses.add(byPeer); private void markBroadcast(PeerAddress byPeer, Transaction tx) {
memoryPool.put(hash, entry); // Marking a TX as broadcast by a peer can run event listeners that might call back into Peer or PeerGroup.
log.info("{}: Announced new transaction [1] {}", byPeer, hash); // Thus we unlock ourselves here to avoid potential inversions.
checkState(lock.isLocked());
lock.unlock();
try {
tx.getConfidence().markBroadcastBy(byPeer);
} finally {
lock.lock();
} }
} }
@ -245,14 +288,19 @@ public class MemoryPool {
* we only saw advertisements for it yet or it has been downloaded but garbage collected due to nowhere else * we only saw advertisements for it yet or it has been downloaded but garbage collected due to nowhere else
* holding a reference to it. * holding a reference to it.
*/ */
public synchronized Transaction get(Sha256Hash hash) { public Transaction get(Sha256Hash hash) {
Entry entry = memoryPool.get(hash); lock.lock();
if (entry == null) return null; // Unknown. try {
if (entry.tx == null) return null; // Seen but only in advertisements. Entry entry = memoryPool.get(hash);
if (entry.tx.get() == null) return null; // Was downloaded but garbage collected. if (entry == null) return null; // Unknown.
Transaction tx = entry.tx.get(); if (entry.tx == null) return null; // Seen but only in advertisements.
Preconditions.checkNotNull(tx); if (entry.tx.get() == null) return null; // Was downloaded but garbage collected.
return tx; Transaction tx = entry.tx.get();
checkNotNull(tx);
return tx;
} finally {
lock.unlock();
}
} }
/** /**
@ -260,8 +308,13 @@ public class MemoryPool {
* was broadcast, downloaded and nothing kept a reference to it will eventually be cleared out by the garbage * was broadcast, downloaded and nothing kept a reference to it will eventually be cleared out by the garbage
* collector and wasSeen() will return false - it does not keep a permanent record of every hash ever broadcast. * collector and wasSeen() will return false - it does not keep a permanent record of every hash ever broadcast.
*/ */
public synchronized boolean maybeWasSeen(Sha256Hash hash) { public boolean maybeWasSeen(Sha256Hash hash) {
Entry entry = memoryPool.get(hash); lock.lock();
return entry != null; try {
Entry entry = memoryPool.get(hash);
return entry != null;
} finally {
lock.unlock();
}
} }
} }

View File

@ -65,7 +65,7 @@ public class TransactionConfidence implements Serializable {
/** The Transaction that this confidence object is associated with. */ /** The Transaction that this confidence object is associated with. */
private Transaction transaction; private Transaction transaction;
// Lazily created listeners array. // Lazily created listeners array.
private transient ArrayList<Listener> listeners; private transient CopyOnWriteArrayList<Listener> listeners;
// The depth of the transaction on the best chain in blocks. An unconfirmed block has depth 0. // The depth of the transaction on the best chain in blocks. An unconfirmed block has depth 0.
private int depth; private int depth;
@ -152,6 +152,7 @@ public class TransactionConfidence implements Serializable {
public TransactionConfidence(Transaction tx) { public TransactionConfidence(Transaction tx) {
// Assume a default number of peers for our set. // Assume a default number of peers for our set.
broadcastBy = new CopyOnWriteArrayList<PeerAddress>(); broadcastBy = new CopyOnWriteArrayList<PeerAddress>();
listeners = new CopyOnWriteArrayList<Listener>();
transaction = tx; transaction = tx;
} }
@ -178,18 +179,13 @@ public class TransactionConfidence implements Serializable {
* {@link BlockChainListener}, attach it to a {@link BlockChain} and then use the getters on the * {@link BlockChainListener}, attach it to a {@link BlockChain} and then use the getters on the
* confidence object to determine the new depth.</p> * confidence object to determine the new depth.</p>
*/ */
public synchronized void addEventListener(Listener listener) { public void addEventListener(Listener listener) {
Preconditions.checkNotNull(listener); Preconditions.checkNotNull(listener);
if (listeners == null) listeners.addIfAbsent(listener);
listeners = new ArrayList<Listener>(2);
// Dedupe registrations. This makes the wallet code simpler.
if (!listeners.contains(listener))
listeners.add(listener);
} }
public synchronized void removeEventListener(Listener listener) { public void removeEventListener(Listener listener) {
Preconditions.checkNotNull(listener); Preconditions.checkNotNull(listener);
Preconditions.checkNotNull(listeners);
listeners.remove(listener); listeners.remove(listener);
} }
@ -225,11 +221,13 @@ public class TransactionConfidence implements Serializable {
* Called by other objects in the system, like a {@link Wallet}, when new information about the confidence of a * Called by other objects in the system, like a {@link Wallet}, when new information about the confidence of a
* transaction becomes available. * transaction becomes available.
*/ */
public synchronized void setConfidenceType(ConfidenceType confidenceType) { public void setConfidenceType(ConfidenceType confidenceType) {
// Don't inform the event listeners if the confidence didn't really change. // Don't inform the event listeners if the confidence didn't really change.
if (confidenceType == this.confidenceType) synchronized (this) {
return; if (confidenceType == this.confidenceType)
this.confidenceType = confidenceType; return;
this.confidenceType = confidenceType;
}
runListeners(); runListeners();
} }
@ -261,7 +259,7 @@ public class TransactionConfidence implements Serializable {
} }
/** /**
* Returns a synchronized set of {@link PeerAddress}es that announced the transaction. * Returns a snapshot of {@link PeerAddress}es that announced the transaction.
*/ */
public ListIterator<PeerAddress> getBroadcastBy() { public ListIterator<PeerAddress> getBroadcastBy() {
return broadcastBy.listIterator(); return broadcastBy.listIterator();
@ -310,12 +308,17 @@ public class TransactionConfidence implements Serializable {
* Updates the internal counter that tracks how deeply buried the block is. * Updates the internal counter that tracks how deeply buried the block is.
* Work is the value of block.getWork(). * Work is the value of block.getWork().
*/ */
public synchronized void notifyWorkDone(Block block) throws VerificationException { public void notifyWorkDone(Block block) throws VerificationException {
if (getConfidenceType() == ConfidenceType.BUILDING) { boolean notify = false;
this.depth++; synchronized (this) {
this.workDone = this.workDone.add(block.getWork()); if (getConfidenceType() == ConfidenceType.BUILDING) {
runListeners(); this.depth++;
this.workDone = this.workDone.add(block.getWork());
notify = true;
}
} }
if (notify)
runListeners();
} }
/** /**
@ -405,12 +408,8 @@ public class TransactionConfidence implements Serializable {
} }
private void runListeners() { private void runListeners() {
EventListenerInvoker.invoke(listeners, new EventListenerInvoker<Listener>() { for (Listener listener : listeners)
@Override listener.onConfidenceChanged(transaction);
public void invoke(Listener listener) {
listener.onConfidenceChanged(transaction);
}
});
} }
/** /**

View File

@ -120,6 +120,7 @@ public class PingService {
": " + tx); ": " + tx);
tx.getConfidence().addEventListener(new TransactionConfidence.Listener() { tx.getConfidence().addEventListener(new TransactionConfidence.Listener() {
public void onConfidenceChanged(Transaction tx2) { public void onConfidenceChanged(Transaction tx2) {
// Must be thread safe.
if (tx2.getConfidence().getConfidenceType() == TransactionConfidence.ConfidenceType.BUILDING) { if (tx2.getConfidence().getConfidenceType() == TransactionConfidence.ConfidenceType.BUILDING) {
// Coins were confirmed (appeared in a block). // Coins were confirmed (appeared in a block).
tx2.getConfidence().removeEventListener(this); tx2.getConfidence().removeEventListener(this);