3
0
mirror of https://github.com/Qortal/altcoinj.git synced 2025-01-31 07:12:17 +00:00

Major rework of how confidence listeners are called:

* API change: TransactionConfidence.Listener now takes a reason enum describing the general class of change.
* Confidence listeners are now invoked in the user code thread as well, thus eliminating any chance of unexpected re-entrancy.
* The wallet batches up confidence changes and executes them all at the end of major operations, avoiding confusing intermediate transitions that could occur in the previous design.
* Much code has been simplified as a result and it's now harder to screw up.
This commit is contained in:
Mike Hearn 2013-06-20 14:47:16 +02:00
parent 5de80dfedf
commit 6b7d653614
10 changed files with 187 additions and 177 deletions

View File

@ -168,7 +168,6 @@ public class MemoryPool {
* @return An object that is semantically the same TX but may be a different object instance.
*/
public Transaction seen(Transaction tx, PeerAddress byPeer) {
boolean skipUnlock = false;
lock.lock();
try {
cleanPool();
@ -203,13 +202,8 @@ public class MemoryPool {
TransactionConfidence confidence = tx.getConfidence();
log.debug("{}: Adding tx [{}] {} to the memory pool",
new Object[]{byPeer, confidence.numBroadcastPeers(), tx.getHashAsString()});
// Copy the previously announced peers into the confidence and then clear it out. Unlock here
// because markBroadcastBy can trigger event listeners and thus inversions. After the lock is
// released "entry" may be changing arbitrarily and isn't usable.
skipUnlock = true;
lock.unlock();
for (PeerAddress a : addrs) {
confidence.markBroadcastBy(a);
markBroadcast(a, tx);
}
return tx;
}
@ -225,7 +219,7 @@ public class MemoryPool {
return tx;
}
} finally {
if (!skipUnlock) lock.unlock();
lock.unlock();
}
}
@ -272,15 +266,10 @@ public class MemoryPool {
}
private void markBroadcast(PeerAddress byPeer, Transaction tx) {
// Marking a TX as broadcast by a peer can run event listeners that might call back into Peer or PeerGroup.
// Thus we unlock ourselves here to avoid potential inversions.
checkState(lock.isLocked());
lock.unlock();
try {
tx.getConfidence().markBroadcastBy(byPeer);
} finally {
lock.lock();
}
final TransactionConfidence confidence = tx.getConfidence();
confidence.markBroadcastBy(byPeer);
confidence.queueListeners(TransactionConfidence.Listener.ChangeReason.SEEN_PEERS);
}
/**

View File

@ -1147,9 +1147,8 @@ public class PeerGroup extends AbstractIdleService implements TransactionBroadca
// Prepare to send the transaction by adding a listener that'll be called when confidence changes.
// Only bother with this if we might actually hear back:
if (minConnections > 1) tx.getConfidence().addEventListener(new TransactionConfidence.Listener() {
public void onConfidenceChanged(Transaction tx) {
public void onConfidenceChanged(Transaction tx, TransactionConfidence.Listener.ChangeReason reason) {
// The number of peers that announced this tx has gone up.
// Thread safe - this can run in parallel.
final TransactionConfidence conf = tx.getConfidence();
int numSeenPeers = conf.numBroadcastPeers();
boolean mined = tx.getAppearsInHashes() != null;

View File

@ -272,8 +272,6 @@ public class Transaction extends ChildMessage implements Serializable {
addBlockAppearance(block.getHeader().getHash());
if (bestChain) {
// This can cause event listeners on TransactionConfidence to run. After these lines complete, the wallets
// state may have changed!
TransactionConfidence transactionConfidence = getConfidence();
// Reset the work done.
try {

View File

@ -16,6 +16,7 @@
package com.google.bitcoin.core;
import com.google.bitcoin.utils.Threading;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
@ -63,7 +64,7 @@ public class TransactionConfidence implements Serializable {
*/
private CopyOnWriteArrayList<PeerAddress> broadcastBy;
/** The Transaction that this confidence object is associated with. */
private Transaction transaction;
private final Transaction transaction;
// Lazily created listeners array.
private transient CopyOnWriteArrayList<Listener> listeners;
@ -158,7 +159,13 @@ public class TransactionConfidence implements Serializable {
* <p>During listener execution, it's safe to remove the current listener but not others.</p>
*/
public interface Listener {
public void onConfidenceChanged(Transaction tx);
/** An enum that describes why a transaction confidence listener is being invoked (i.e. the class of change). */
public enum ChangeReason {
TYPE,
DEPTH,
SEEN_PEERS,
}
public void onConfidenceChanged(Transaction tx, ChangeReason reason);
}
/**
@ -214,19 +221,16 @@ public class TransactionConfidence implements Serializable {
* Called by other objects in the system, like a {@link Wallet}, when new information about the confidence of a
* transaction becomes available.
*/
public void setConfidenceType(ConfidenceType confidenceType) {
public synchronized void setConfidenceType(ConfidenceType confidenceType) {
// Don't inform the event listeners if the confidence didn't really change.
synchronized (this) {
if (confidenceType == this.confidenceType)
return;
this.confidenceType = confidenceType;
if (confidenceType == ConfidenceType.PENDING) {
depth = 0;
appearedAtChainHeight = -1;
workDone = BigInteger.ZERO;
}
if (confidenceType == this.confidenceType)
return;
this.confidenceType = confidenceType;
if (confidenceType == ConfidenceType.PENDING) {
depth = 0;
appearedAtChainHeight = -1;
workDone = BigInteger.ZERO;
}
runListeners();
}
@ -238,15 +242,12 @@ public class TransactionConfidence implements Serializable {
*
* @param address IP address of the peer, used as a proxy for identity.
*/
public void markBroadcastBy(PeerAddress address) {
public synchronized void markBroadcastBy(PeerAddress address) {
if (!broadcastBy.addIfAbsent(address))
return; // Duplicate.
synchronized (this) {
if (getConfidenceType() == ConfidenceType.UNKNOWN) {
this.confidenceType = ConfidenceType.PENDING;
}
if (getConfidenceType() == ConfidenceType.UNKNOWN) {
this.confidenceType = ConfidenceType.PENDING;
}
runListeners();
}
/**
@ -303,17 +304,13 @@ public class TransactionConfidence implements Serializable {
* Updates the internal counter that tracks how deeply buried the block is.
* Work is the value of block.getWork().
*/
public void notifyWorkDone(Block block) throws VerificationException {
boolean notify = false;
synchronized (this) {
if (getConfidenceType() == ConfidenceType.BUILDING) {
this.depth++;
this.workDone = this.workDone.add(block.getWork());
notify = true;
}
}
if (notify)
runListeners();
public synchronized boolean notifyWorkDone(Block block) throws VerificationException {
if (getConfidenceType() != ConfidenceType.BUILDING)
return false; // Should this be an assert?
this.depth++;
this.workDone = this.workDone.add(block.getWork());
return true;
}
/**
@ -390,9 +387,20 @@ public class TransactionConfidence implements Serializable {
}
}
private void runListeners() {
for (Listener listener : listeners)
listener.onConfidenceChanged(transaction);
/**
* Call this after adjusting the confidence, for cases where listeners should be notified. This has to be done
* explicitly rather than being done automatically because sometimes complex changes to transaction states can
* result in a series of confidence changes that are not really useful to see separately. By invoking listeners
* explicitly, more precise control is available. Note that this will run the listeners on the user code thread.
*/
public void queueListeners(final Listener.ChangeReason reason) {
for (final Listener listener : listeners) {
Threading.userCode.execute(new Runnable() {
@Override public void run() {
listener.onConfidenceChanged(transaction, reason);
}
});
}
}
/**
@ -420,21 +428,20 @@ public class TransactionConfidence implements Serializable {
* depth to one will wait until it appears in a block on the best chain, and zero will wait until it has been seen
* on the network.
*/
public ListenableFuture<Transaction> getDepthFuture(final int depth) {
public synchronized ListenableFuture<Transaction> getDepthFuture(final int depth) {
final SettableFuture<Transaction> result = SettableFuture.create();
synchronized (this) {
if (getDepthInBlocks() >= depth) {
result.set(transaction);
}
addEventListener(new Listener() {
@Override public void onConfidenceChanged(Transaction tx) {
if (getDepthInBlocks() >= depth) {
removeEventListener(this);
result.set(transaction);
}
}
});
if (getDepthInBlocks() >= depth) {
result.set(transaction);
}
addEventListener(new Listener() {
@Override public void onConfidenceChanged(Transaction tx, ChangeReason reason) {
// Runs in user code thread.
if (getDepthInBlocks() >= depth) {
removeEventListener(this);
result.set(transaction);
}
}
});
return result;
}
}

View File

@ -28,7 +28,6 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.*;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.google.common.util.concurrent.Uninterruptibles;
import org.bitcoinj.wallet.Protos.Wallet.EncryptionType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -143,6 +142,13 @@ public class Wallet implements Serializable, BlockChainListener {
// Whether or not to ignore nLockTime > 0 transactions that are received to the mempool.
private boolean acceptTimeLockedTransactions;
// Stuff for notifying transaction objects that we changed their confidences. The purpose of this is to avoid
// spuriously sending lots of repeated notifications to listeners that API users aren't really interested in as a
// side effect of how the code is written (e.g. during re-orgs confidence data gets adjusted multiple times).
private int onWalletChangedSuppressions;
private boolean insideReorg;
private Map<Transaction, TransactionConfidence.Listener.ChangeReason> confidenceChanged;
/** Represents the results of a {@link CoinSelector#select(java.math.BigInteger, java.util.LinkedList)} operation */
public static class CoinSelection {
public BigInteger valueGathered;
@ -287,6 +293,7 @@ public class Wallet implements Serializable, BlockChainListener {
dead = new HashMap<Sha256Hash, Transaction>();
eventListeners = new CopyOnWriteArrayList<WalletEventListener>();
extensions = new HashMap<String, WalletExtension>();
confidenceChanged = new HashMap<Transaction, TransactionConfidence.Listener.ChangeReason>();
createTransientState();
}
@ -294,22 +301,23 @@ public class Wallet implements Serializable, BlockChainListener {
ignoreNextNewBlock = new HashSet<Sha256Hash>();
txConfidenceListener = new TransactionConfidence.Listener() {
@Override
public void onConfidenceChanged(Transaction tx) {
lock.lock();
// The invokers unlock us immediately so if an exception is thrown, the lock will be already open.
checkBalanceFuturesLocked(null);
queueOnTransactionConfidenceChanged(tx);
// Many onWalletChanged events will not occur because they are suppressed, eg, because:
// - we are inside a re-org
// - we are in the middle of processing a block
// - the confidence is changing because a new best block was accepted
// It will run in cases like:
// - the tx is pending and another peer announced it
// - the tx is pending and was killed by a detected double spend that was not in a block
// The latter case cannot happen today because we won't hear about it, but in future this may
// become more common if conflict notices are implemented.
maybeQueueOnWalletChanged();
lock.unlock();
public void onConfidenceChanged(Transaction tx, TransactionConfidence.Listener.ChangeReason reason) {
// This will run on the user code thread so we shouldn't do anything too complicated here.
// We only want to queue a wallet changed event and auto-save if the number of peers announcing
// the transaction has changed, as that confidence change is made by the networking code which
// doesn't necessarily know at that point which wallets contain which transactions, so it's up
// to us to listen for that. Other types of confidence changes (type, etc) are triggered by us,
// so we'll queue up a wallet change event in other parts of the code.
if (reason == ChangeReason.SEEN_PEERS) {
lock.lock();
try {
checkBalanceFuturesLocked(null);
queueOnTransactionConfidenceChanged(tx);
maybeQueueOnWalletChanged();
} finally {
lock.unlock();
}
}
}
};
acceptTimeLockedTransactions = false;
@ -799,7 +807,7 @@ public class Wallet implements Serializable, BlockChainListener {
Transaction tx = pending.get(txHash);
if (tx == null)
return;
receive(tx, block, blockType, false);
receive(tx, block, blockType);
} finally {
lock.unlock();
}
@ -850,24 +858,11 @@ public class Wallet implements Serializable, BlockChainListener {
Utils.bitcoinValueToFriendlyString(valueSentToMe)));
}
if (tx.getConfidence().getSource().equals(TransactionConfidence.Source.UNKNOWN)) {
log.warn("Wallet received transaction with an unknown source. Consider tagging tx!");
}
// Mark the tx as having been seen but is not yet in the chain. This will normally have been done already by
// the Peer before we got to this point, but in some cases (unit tests, other sources of transactions) it may
// have been missed out.
ConfidenceType currentConfidence = tx.getConfidence().getConfidenceType();
if (currentConfidence == ConfidenceType.UNKNOWN) {
tx.getConfidence().setConfidenceType(ConfidenceType.PENDING);
// Manually invoke the wallet tx confidence listener here as we didn't yet commit therefore the
// txConfidenceListener wasn't added.
checkBalanceFuturesLocked(null);
queueOnTransactionConfidenceChanged(tx);
log.warn("Wallet received transaction with an unknown source. Consider tagging it!");
}
// If this tx spends any of our unspent outputs, mark them as spent now, then add to the pending pool. This
// ensures that if some other client that has our keys broadcasts a spend we stay in sync. Also updates the
// timestamp on the transaction and registers/runs event listeners.
//
// Note that after we return from this function, the wallet may have been modified.
commitTx(tx);
} finally {
lock.unlock();
@ -999,13 +994,13 @@ public class Wallet implements Serializable, BlockChainListener {
BlockChain.NewBlockType blockType) throws VerificationException {
lock.lock();
try {
receive(tx, block, blockType, false);
receive(tx, block, blockType);
} finally {
lock.unlock();
}
}
private void receive(Transaction tx, StoredBlock block, BlockChain.NewBlockType blockType, boolean reorg) throws VerificationException {
private void receive(Transaction tx, StoredBlock block, BlockChain.NewBlockType blockType) throws VerificationException {
// Runs in a peer thread.
checkState(lock.isLocked());
BigInteger prevBalance = getBalance();
@ -1042,8 +1037,6 @@ public class Wallet implements Serializable, BlockChainListener {
if (spentBy != null) spentBy.disconnect();
}
}
// TODO: This can trigger tx confidence listeners to be run in the case of double spends.
// We should delay the execution of the listeners until the bottom to avoid the wallet mutating.
processTxFromBestChain(tx);
} else {
checkState(sideChain);
@ -1051,7 +1044,7 @@ public class Wallet implements Serializable, BlockChainListener {
// some miners are also trying to include the transaction into the current best chain too, so let's treat
// it as pending, except we don't need to do any risk analysis on it.
if (wasPending) {
// Just put it back in without touching the connections.
// Just put it back in without touching the connections or confidence.
addWalletTransaction(Pool.PENDING, tx);
} else {
// Ignore the case where a tx appears on a side chain at the same time as the best chain (this is
@ -1060,7 +1053,6 @@ public class Wallet implements Serializable, BlockChainListener {
if (!unspent.containsKey(hash) && !spent.containsKey(hash)) {
// Otherwise put it (possibly back) into pending.
// Committing it updates the spent flags and inserts into the pool as well.
tx.getConfidence().setConfidenceType(ConfidenceType.PENDING);
commitTx(tx);
}
}
@ -1069,7 +1061,6 @@ public class Wallet implements Serializable, BlockChainListener {
if (block != null) {
// Mark the tx as appearing in this block so we can find it later after a re-org. This also tells the tx
// confidence object about the block and sets its work done/depth appropriately.
// TODO: This can trigger re-entrancy: delay running confidence listeners.
tx.setBlockAppearance(block, bestChain);
if (bestChain) {
// Don't notify this tx of work done in notifyNewBestBlock which will be called immediately after
@ -1079,6 +1070,16 @@ public class Wallet implements Serializable, BlockChainListener {
}
}
onWalletChangedSuppressions--;
// Side chains don't affect confidence.
if (bestChain) {
// notifyNewBestBlock will be invoked next and will then call maybeQueueOnWalletChanged for us.
confidenceChanged.put(tx, TransactionConfidence.Listener.ChangeReason.TYPE);
} else {
maybeQueueOnWalletChanged();
}
// Inform anyone interested that we have received or sent coins but only if:
// - This is not due to a re-org.
// - The coins appeared on the best chain.
@ -1086,30 +1087,38 @@ public class Wallet implements Serializable, BlockChainListener {
// - We have not already informed the user about the coins when we received the tx broadcast, or for our
// own spends. If users want to know when a broadcast tx becomes confirmed, they need to use tx confidence
// listeners.
if (!reorg && bestChain && !wasPending) {
if (!insideReorg && bestChain) {
BigInteger newBalance = getBalance(); // This is slow.
log.info("Balance is now: " + bitcoinValueToFriendlyString(newBalance));
int diff = valueDifference.compareTo(BigInteger.ZERO);
// We pick one callback based on the value difference, though a tx can of course both send and receive
// coins from the wallet.
if (diff > 0) {
checkBalanceFuturesLocked(newBalance);
queueOnCoinsReceived(tx, prevBalance, newBalance);
} else if (diff < 0) {
queueOnCoinsSent(tx, prevBalance, newBalance);
} else {
// We have a transaction that didn't change our balance. Probably we sent coins between our own keys.
maybeQueueOnWalletChanged();
if (!wasPending) {
int diff = valueDifference.compareTo(BigInteger.ZERO);
// We pick one callback based on the value difference, though a tx can of course both send and receive
// coins from the wallet.
if (diff > 0) {
queueOnCoinsReceived(tx, prevBalance, newBalance);
} else if (diff < 0) {
queueOnCoinsSent(tx, prevBalance, newBalance);
}
}
checkBalanceFuturesLocked(newBalance);
}
// Wallet change notification will be sent shortly after the block is finished processing, in notifyNewBestBlock
onWalletChangedSuppressions--;
informConfidenceListenersIfNotReorganizing();
checkState(isConsistent());
queueAutoSave();
}
private void informConfidenceListenersIfNotReorganizing() {
if (insideReorg)
return;
for (Map.Entry<Transaction, TransactionConfidence.Listener.ChangeReason> entry : confidenceChanged.entrySet()) {
final Transaction tx = entry.getKey();
tx.getConfidence().queueListeners(entry.getValue());
queueOnTransactionConfidenceChanged(tx);
}
confidenceChanged.clear();
}
/**
* <p>Called by the {@link BlockChain} when a new block on the best chain is seen, AFTER relevant wallet
* transactions are extracted and sent to us UNLESS the new block caused a re-org, in which case this will
@ -1132,7 +1141,6 @@ public class Wallet implements Serializable, BlockChainListener {
// TODO: Clarify the code below.
// Notify all the BUILDING transactions of the new block.
// This is so that they can update their work done and depth.
onWalletChangedSuppressions++;
Set<Transaction> transactions = getTransactions(true);
for (Transaction tx : transactions) {
if (ignoreNextNewBlock.contains(tx.getHash())) {
@ -1141,11 +1149,13 @@ public class Wallet implements Serializable, BlockChainListener {
ignoreNextNewBlock.remove(tx.getHash());
} else {
tx.getConfidence().notifyWorkDone(block.getHeader());
confidenceChanged.put(tx, TransactionConfidence.Listener.ChangeReason.DEPTH);
}
}
queueAutoSave();
onWalletChangedSuppressions--;
informConfidenceListenersIfNotReorganizing();
maybeQueueOnWalletChanged();
queueAutoSave();
} finally {
lock.unlock();
}
@ -1281,6 +1291,7 @@ public class Wallet implements Serializable, BlockChainListener {
if (overridingTx == null) {
// killedTx depended on a transaction that died because it was double spent or a coinbase that got re-orgd.
killedTx.getConfidence().setOverridingTransaction(null);
confidenceChanged.put(killedTx, TransactionConfidence.Listener.ChangeReason.TYPE);
pending.remove(killedTxHash);
unspent.remove(killedTxHash);
spent.remove(killedTxHash);
@ -1316,8 +1327,8 @@ public class Wallet implements Serializable, BlockChainListener {
maybeMovePool(overridingInput.getOutpoint().fromTx, "kill");
}
}
log.info("Informing tx listeners of double spend event");
killedTx.getConfidence().setOverridingTransaction(overridingTx); // RE-ENTRY POINT
killedTx.getConfidence().setOverridingTransaction(overridingTx);
confidenceChanged.put(killedTx, TransactionConfidence.Listener.ChangeReason.TYPE);
// TODO: Recursively kill other transactions that were double spent.
}
@ -1382,9 +1393,10 @@ public class Wallet implements Serializable, BlockChainListener {
// Add to the pending pool. It'll be moved out once we receive this transaction on the best chain.
// This also registers txConfidenceListener so wallet listeners get informed.
log.info("->pending: {}", tx.getHashAsString());
tx.getConfidence().setConfidenceType(ConfidenceType.PENDING);
confidenceChanged.put(tx, TransactionConfidence.Listener.ChangeReason.TYPE);
addWalletTransaction(Pool.PENDING, tx);
// Event listeners may re-enter so we cannot make assumptions about wallet state after this loop completes.
try {
BigInteger valueSentFromMe = tx.getValueSentFromMe(this);
BigInteger valueSentToMe = tx.getValueSentToMe(this);
@ -1403,6 +1415,7 @@ public class Wallet implements Serializable, BlockChainListener {
}
checkState(isConsistent());
informConfidenceListenersIfNotReorganizing();
queueAutoSave();
} finally {
lock.unlock();
@ -1757,7 +1770,7 @@ public class Wallet implements Serializable, BlockChainListener {
* likely be rejected by the network in this case.</p>
*/
public static SendRequest to(Address destination, BigInteger value) {
SendRequest req = new Wallet.SendRequest();
SendRequest req = new SendRequest();
req.tx = new Transaction(destination.getParameters());
req.tx.addOutput(value, destination);
return req;
@ -2020,9 +2033,11 @@ public class Wallet implements Serializable, BlockChainListener {
}
// Label the transaction as being self created. We can use this later to spend its change output even before
// the transaction is confirmed.
req.tx.getConfidence().setConfidenceType(ConfidenceType.PENDING);
// the transaction is confirmed. We deliberately won't bother notifying listeners here as there's not much
// point - the user isn't interested in a confidence transition they made themselves.
req.tx.getConfidence().setSource(TransactionConfidence.Source.SELF);
// TODO: Remove this - a newly completed tx isn't really pending, nothing was done with it yet.
req.tx.getConfidence().setConfidenceType(ConfidenceType.PENDING);
req.completed = true;
req.fee = calculatedFee;
log.info(" completed {} with {} inputs", req.tx.getHashAsString(), req.tx.getInputs().size());
@ -2340,6 +2355,13 @@ public class Wallet implements Serializable, BlockChainListener {
// to try and corrupt the internal data structures. We should try harder to avoid this but it's tricky
// because there are so many ways the block can be invalid.
// Avoid spuriously informing the user of wallet/tx confidence changes whilst we're re-organizing.
checkState(confidenceChanged.size() == 0);
checkState(!insideReorg);
insideReorg = true;
checkState(onWalletChangedSuppressions == 0);
onWalletChangedSuppressions++;
// Map block hash to transactions that appear in it.
Multimap<Sha256Hash, Transaction> mapBlockTx = ArrayListMultimap.create();
for (Transaction tx : getTransactions(true)) {
@ -2360,10 +2382,6 @@ public class Wallet implements Serializable, BlockChainListener {
log.info(" {}", b.getHeader().getHashAsString());
}
// Avoid spuriously informing the user of wallet changes whilst we're re-organizing. This also prevents the
// user from modifying wallet contents (eg, trying to spend) whilst we're in the middle of the process.
onWalletChangedSuppressions++;
Collections.reverse(newBlocks); // Need bottom-to-top but we get top-to-bottom.
// For each block in the old chain, disconnect the transactions. It doesn't matter if
@ -2410,6 +2428,7 @@ public class Wallet implements Serializable, BlockChainListener {
if (tx.isCoinBase()) continue;
log.info(" ->pending {}", tx.getHash());
tx.getConfidence().setConfidenceType(ConfidenceType.PENDING); // Wipe height/depth/work data.
confidenceChanged.put(tx, TransactionConfidence.Listener.ChangeReason.TYPE);
addWalletTransaction(Pool.PENDING, tx);
updateForSpends(tx, false);
}
@ -2445,21 +2464,24 @@ public class Wallet implements Serializable, BlockChainListener {
for (Transaction tx : mapBlockTx.get(block.getHeader().getHash())) {
log.info(" tx {}", tx.getHash());
try {
receive(tx, block, BlockChain.NewBlockType.BEST_CHAIN, true);
receive(tx, block, BlockChain.NewBlockType.BEST_CHAIN);
} catch (ScriptException e) {
throw new RuntimeException(e); // Cannot happen as these blocks were already verified.
}
}
notifyNewBestBlock(block);
}
checkState(isConsistent());
final BigInteger balance = getBalance();
log.info("post-reorg balance is {}", Utils.bitcoinValueToFriendlyString(balance));
// Inform event listeners that a re-org took place. They should save the wallet at this point.
checkBalanceFuturesLocked(balance);
// Inform event listeners that a re-org took place.
queueOnReorganize();
insideReorg = false;
onWalletChangedSuppressions--;
maybeQueueOnWalletChanged();
checkState(isConsistent());
checkBalanceFuturesLocked(balance);
informConfidenceListenersIfNotReorganizing();
queueAutoSave();
} finally {
lock.unlock();
}
@ -2468,12 +2490,13 @@ public class Wallet implements Serializable, BlockChainListener {
/**
* Subtract the supplied depth and work done from the given transactions.
*/
private static void subtractDepthAndWorkDone(int depthToSubtract, BigInteger workDoneToSubtract,
Collection<Transaction> transactions) {
private void subtractDepthAndWorkDone(int depthToSubtract, BigInteger workDoneToSubtract,
Collection<Transaction> transactions) {
for (Transaction tx : transactions) {
if (tx.getConfidence().getConfidenceType() == ConfidenceType.BUILDING) {
tx.getConfidence().setDepthInBlocks(tx.getConfidence().getDepthInBlocks() - depthToSubtract);
tx.getConfidence().setWorkDone(tx.getConfidence().getWorkDone().subtract(workDoneToSubtract));
confidenceChanged.put(tx, TransactionConfidence.Listener.ChangeReason.DEPTH);
}
}
}
@ -3100,7 +3123,6 @@ public class Wallet implements Serializable, BlockChainListener {
}
}
private int onWalletChangedSuppressions = 0;
private void maybeQueueOnWalletChanged() {
// Don't invoke the callback in some circumstances, eg, whilst we are re-organizing or fiddling with
// transactions due to a new block arriving. It will be called later instead.
@ -3140,6 +3162,7 @@ public class Wallet implements Serializable, BlockChainListener {
private void queueOnReorganize() {
checkState(lock.isLocked());
checkState(insideReorg);
for (final WalletEventListener listener : eventListeners) {
Threading.userCode.execute(new Runnable() {
@Override public void run() {

View File

@ -43,7 +43,10 @@ public class Threading {
/**
* Put a dummy task into the queue and wait for it to be run. Because it's single threaded, this means all
* tasks submitted before this point are now completed.
* tasks submitted before this point are now completed. Usually you won't want to use this method - it's a
* convenience primarily used in unit testing. If you want to wait for an event to be called the right thing
* to do is usually to create a {@link com.google.common.util.concurrent.SettableFuture} and then call set
* on it. You can then either block on that future, compose it, add listeners to it and so on.
*/
public static void waitForUserCode() {
// If this assert fires it means you have a bug in your code - you can't call this method inside your own

View File

@ -29,6 +29,8 @@ import org.slf4j.LoggerFactory;
import java.math.BigInteger;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.Assert.*;
@ -60,17 +62,17 @@ public class ChainSplitTest {
public void testForking1() throws Exception {
// Check that if the block chain forks, we end up using the right chain. Only tests inbound transactions
// (receiving coins). Checking that we understand reversed spends is in testForking2.
final boolean[] reorgHappened = new boolean[1];
final int[] walletChanged = new int[1];
final AtomicBoolean reorgHappened = new AtomicBoolean();
final AtomicInteger walletChanged = new AtomicInteger();
wallet.addEventListener(new AbstractWalletEventListener() {
@Override
public void onReorganize(Wallet wallet) {
reorgHappened[0] = true;
reorgHappened.set(true);
}
@Override
public void onWalletChanged(Wallet wallet) {
walletChanged[0]++;
walletChanged.incrementAndGet();
}
});
@ -80,8 +82,8 @@ public class ChainSplitTest {
assertTrue(chain.add(b1));
assertTrue(chain.add(b2));
Threading.waitForUserCode();
assertFalse(reorgHappened[0]);
assertEquals(2, walletChanged[0]);
assertFalse(reorgHappened.get());
assertEquals(2, walletChanged.get());
// We got two blocks which generated 50 coins each, to us.
assertEquals("100.00", Utils.bitcoinValueToFriendlyString(wallet.getBalance()));
// We now have the following chain:
@ -96,8 +98,8 @@ public class ChainSplitTest {
Block b3 = b1.createNextBlock(someOtherGuy);
assertTrue(chain.add(b3));
Threading.waitForUserCode();
assertFalse(reorgHappened[0]); // No re-org took place.
assertEquals(2, walletChanged[0]);
assertFalse(reorgHappened.get()); // No re-org took place.
assertEquals(2, walletChanged.get());
assertEquals("100.00", Utils.bitcoinValueToFriendlyString(wallet.getBalance()));
// Check we can handle multi-way splits: this is almost certainly going to be extremely rare, but we have to
// handle it anyway. The same transaction appears in b7/b8 (side chain) but not b2 or b3.
@ -112,15 +114,15 @@ public class ChainSplitTest {
b8.solve();
assertTrue(chain.add(b8));
Threading.waitForUserCode();
assertFalse(reorgHappened[0]); // No re-org took place.
assertEquals(2, walletChanged[0]);
assertFalse(reorgHappened.get()); // No re-org took place.
assertEquals(5, walletChanged.get());
assertEquals("100.00", Utils.bitcoinValueToFriendlyString(wallet.getBalance()));
// Now we add another block to make the alternative chain longer.
assertTrue(chain.add(b3.createNextBlock(someOtherGuy)));
Threading.waitForUserCode();
assertTrue(reorgHappened[0]); // Re-org took place.
assertEquals(3, walletChanged[0]);
reorgHappened[0] = false;
assertTrue(reorgHappened.get()); // Re-org took place.
assertEquals(6, walletChanged.get());
reorgHappened.set(false);
//
// genesis -> b1 -> b2
// \-> b3 -> b4
@ -137,8 +139,8 @@ public class ChainSplitTest {
// \-> b3 -> b4
//
Threading.waitForUserCode();
assertTrue(reorgHappened[0]);
assertEquals(4, walletChanged[0]);
assertTrue(reorgHappened.get());
assertEquals(9, walletChanged.get());
assertEquals("200.00", Utils.bitcoinValueToFriendlyString(wallet.getBalance()));
}

View File

@ -241,7 +241,7 @@ public class PeerGroupTest extends TestWithPeerGroup {
assertTrue(tx.getConfidence().wasBroadcastBy(peerOf(p2).getAddress()));
tx.getConfidence().addEventListener(new TransactionConfidence.Listener() {
public void onConfidenceChanged(Transaction tx) {
public void onConfidenceChanged(Transaction tx, TransactionConfidence.Listener.ChangeReason reason) {
event[1] = tx;
}
});
@ -299,8 +299,8 @@ public class PeerGroupTest extends TestWithPeerGroup {
InventoryMessage inv = new InventoryMessage(params);
inv.addTransaction(t1);
inbound(p2, inv);
assertTrue(sendResult.broadcastComplete.isDone());
Threading.waitForUserCode();
assertTrue(sendResult.broadcastComplete.isDone());
assertEquals(transactions[0], sendResult.tx);
assertEquals(transactions[0].getConfidence().numBroadcastPeers(), 2);
// Confirm it.

View File

@ -179,6 +179,7 @@ public class WalletTest extends TestWithWallet {
final ListenableFuture<BigInteger> estimatedFuture = wallet.getBalanceFuture(v1, Wallet.BalanceType.ESTIMATED);
assertFalse(availFuture.isDone());
assertFalse(estimatedFuture.isDone());
// Send some pending coins to the wallet.
Transaction t1 = sendMoneyToWallet(wallet, v1, toAddress, null);
Threading.waitForUserCode();
final ListenableFuture<Transaction> depthFuture = t1.getConfidence().getDepthFuture(1);
@ -186,9 +187,11 @@ public class WalletTest extends TestWithWallet {
assertEquals(BigInteger.ZERO, wallet.getBalance());
assertEquals(v1, wallet.getBalance(Wallet.BalanceType.ESTIMATED));
assertFalse(availFuture.isDone());
// Our estimated balance has reached the requested level.
assertTrue(estimatedFuture.isDone());
assertEquals(1, wallet.getPoolSize(Pool.PENDING));
assertEquals(0, wallet.getPoolSize(WalletTransaction.Pool.UNSPENT));
// Confirm the coins.
sendMoneyToWallet(wallet, t1, AbstractBlockChain.NewBlockType.BEST_CHAIN);
assertEquals("Incorrect confirmed tx balance", v1, wallet.getBalance());
assertEquals("Incorrect confirmed tx PENDING pool size", 0, wallet.getPoolSize(WalletTransaction.Pool.PENDING));
@ -652,7 +655,7 @@ public class WalletTest extends TestWithWallet {
// Make a fresh copy of the tx to ensure we're testing realistically.
flags[0] = flags[1] = false;
notifiedTx[0].getConfidence().addEventListener(new TransactionConfidence.Listener() {
public void onConfidenceChanged(Transaction tx) {
public void onConfidenceChanged(Transaction tx, TransactionConfidence.Listener.ChangeReason reason) {
flags[1] = true;
}
});

View File

@ -138,7 +138,6 @@ public class PingService {
wallet.addEventListener(new AbstractWalletEventListener() {
@Override
public void onCoinsReceived(Wallet w, Transaction tx, BigInteger prevBalance, BigInteger newBalance) {
// MUST BE THREAD SAFE
assert !newBalance.equals(BigInteger.ZERO);
if (!tx.isPending()) return;
// It was broadcast, but we can't really verify it's valid until it appears in a block.
@ -146,23 +145,12 @@ public class PingService {
System.out.println("Received pending tx for " + Utils.bitcoinValueToFriendlyString(value) +
": " + tx);
tx.getConfidence().addEventListener(new TransactionConfidence.Listener() {
public void onConfidenceChanged(final Transaction tx2) {
// Must be thread safe.
public void onConfidenceChanged(final Transaction tx2, TransactionConfidence.Listener.ChangeReason reason) {
if (tx2.getConfidence().getConfidenceType() == TransactionConfidence.ConfidenceType.BUILDING) {
// Coins were confirmed (appeared in a block).
tx2.getConfidence().removeEventListener(this);
// Run the process of sending the coins back on a separate thread. This is a temp hack
// until the threading changes in 0.9 are completed ... TX confidence listeners run
// with the wallet lock held and re-entering the wallet isn't always safe. We can solve
// this by just interacting with the wallet from a separate thread, which will wait until
// this thread is finished. It's a dumb API requirement and will go away soon.
new Thread() {
@Override
public void run() {
bounceCoins(wallet, tx2);
}
}.start();
forwardCoins(wallet, tx2);
} else {
System.out.println(String.format("Confidence of %s changed, is now: %s",
tx2.getHashAsString(), tx2.getConfidence().toString()));
@ -190,15 +178,13 @@ public class PingService {
peerGroup.downloadBlockChain();
System.out.println("Send coins to: " + key.toAddress(params).toString());
System.out.println("Waiting for coins to arrive. Press Ctrl-C to quit.");
try {
Thread.sleep(Long.MAX_VALUE);
} catch (InterruptedException e) {}
}
private void bounceCoins(final Wallet wallet, Transaction tx) {
// It's impossible to pick one specific identity that you receive coins from in Bitcoin as there
// could be inputs from many addresses. So instead we just pick the first and assume they were all
// owned by the same person.
private void forwardCoins(final Wallet wallet, Transaction tx) {
try {
BigInteger value = tx.getValueSentToMe(wallet);
TransactionInput input = tx.getInputs().get(0);