mirror of
https://github.com/Qortal/altcoinj.git
synced 2025-11-03 14:07:14 +00:00
Fix over-eager locking in PeerGroup and ensure the resulting code doesn't race, resolving a lock inversion. Also, take into account the possibility of a block being mined before it's fully propagated and add some discussion.
Resolves issue 353.
This commit is contained in:
@@ -1124,7 +1124,7 @@ public class PeerGroup extends AbstractIdleService {
|
|||||||
public void run() {
|
public void run() {
|
||||||
// We now have enough connected peers to send the transaction.
|
// We now have enough connected peers to send the transaction.
|
||||||
// This can be called immediately if we already have enough. Otherwise it'll be called from a peer
|
// This can be called immediately if we already have enough. Otherwise it'll be called from a peer
|
||||||
// thread. TODO: Fix the race that exists here.
|
// thread.
|
||||||
|
|
||||||
// Pick a peer to be the lucky recipient of our tx.
|
// Pick a peer to be the lucky recipient of our tx.
|
||||||
final Peer somePeer = peers.get(0);
|
final Peer somePeer = peers.get(0);
|
||||||
@@ -1135,47 +1135,47 @@ public class PeerGroup extends AbstractIdleService {
|
|||||||
// Only bother with this if we might actually hear back:
|
// Only bother with this if we might actually hear back:
|
||||||
if (minConnections > 1) tx.getConfidence().addEventListener(new TransactionConfidence.Listener() {
|
if (minConnections > 1) tx.getConfidence().addEventListener(new TransactionConfidence.Listener() {
|
||||||
public void onConfidenceChanged(Transaction tx) {
|
public void onConfidenceChanged(Transaction tx) {
|
||||||
// The number of peers that announced this tx has gone up. This will run in a peer thread.
|
// The number of peers that announced this tx has gone up.
|
||||||
final int numSeenPeers = tx.getConfidence().numBroadcastPeers();
|
// Thread safe - this can run in parallel.
|
||||||
boolean done = false;
|
final TransactionConfidence conf = tx.getConfidence();
|
||||||
log.info("broadcastTransaction: TX {} seen by {} peers", pinnedTx.getHashAsString(),
|
int numSeenPeers = conf.numBroadcastPeers();
|
||||||
numSeenPeers);
|
boolean mined = conf.getConfidenceType() != TransactionConfidence.ConfidenceType.NOT_SEEN_IN_CHAIN;
|
||||||
lock.lock();
|
log.info("broadcastTransaction: TX {} seen by {} peers{}",
|
||||||
try {
|
new Object[]{pinnedTx.getHashAsString(), numSeenPeers, mined ? " and mined" : ""});
|
||||||
if (numSeenPeers >= minConnections) {
|
if (!(numSeenPeers >= minConnections || mined))
|
||||||
// We've seen the min required number of peers announce the transaction. Note that we
|
return;
|
||||||
// can't wait for the current number of connected peers right now because we could have
|
// We've seen the min required number of peers announce the transaction, or it was included
|
||||||
// added more peers after the broadcast took place, which means they won't have seen
|
// in a block. Normally we'd expect to see it fully propagate before it gets mined, but
|
||||||
// the transaction. In future when peers sync up their memory pools after they connect
|
// it can be that a block is solved very soon after broadcast, and it's also possible that
|
||||||
// we could come back and change this.
|
// due to version skew and changes in the relay rules our transaction is not going to
|
||||||
|
// fully propagate yet can get mined anyway.
|
||||||
|
//
|
||||||
|
// Note that we can't wait for the current number of connected peers right now because we
|
||||||
|
// could have added more peers after the broadcast took place, which means they won't
|
||||||
|
// have seen the transaction. In future when peers sync up their memory pools after they
|
||||||
|
// connect we could come back and change this.
|
||||||
|
//
|
||||||
|
// OK, now tell the wallet about the transaction. If the wallet created the transaction then
|
||||||
|
// it already knows and will ignore this. If it's a transaction we received from
|
||||||
|
// somebody else via a side channel and are now broadcasting, this will put it into the
|
||||||
|
// wallet now we know it's valid.
|
||||||
|
for (Wallet wallet : wallets) {
|
||||||
|
try {
|
||||||
|
// Assumption here is there are no dependencies of the created transaction.
|
||||||
//
|
//
|
||||||
// Now tell the wallet about the transaction. If the wallet created the transaction then
|
// We may end up with two threads trying to do this in parallel - the wallet will
|
||||||
// it already knows and will ignore this. If it's a transaction we received from
|
// ignore whichever one loses the race.
|
||||||
// somebody else via a side channel and are now broadcasting, this will put it into the
|
wallet.receivePending(pinnedTx, null);
|
||||||
// wallet now we know it's valid.
|
} catch (Throwable t) {
|
||||||
for (Wallet wallet : wallets) {
|
future.setException(t); // RE-ENTRANCY POINT
|
||||||
try {
|
return;
|
||||||
if (wallet.isPendingTransactionRelevant(pinnedTx)) {
|
|
||||||
// Assumption here is there are no dependencies of the created transaction.
|
|
||||||
wallet.receivePending(pinnedTx, null);
|
|
||||||
}
|
|
||||||
} catch (Throwable t) {
|
|
||||||
future.setException(t);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
done = true;
|
|
||||||
}
|
}
|
||||||
} finally {
|
|
||||||
lock.unlock();
|
|
||||||
}
|
|
||||||
if (done) {
|
|
||||||
// We're done! Run this outside of the peer group lock as setting the future may immediately
|
|
||||||
// invoke any listeners associated with it and it's simpler if the PeerGroup isn't locked.
|
|
||||||
log.info("broadcastTransaction: {} complete", pinnedTx.getHashAsString());
|
|
||||||
tx.getConfidence().removeEventListener(this);
|
|
||||||
future.set(pinnedTx);
|
|
||||||
}
|
}
|
||||||
|
// We're done! It's important that the PeerGroup lock is not held (by this thread) at this
|
||||||
|
// point to avoid triggering inversions when the Future completes.
|
||||||
|
log.info("broadcastTransaction: {} complete", pinnedTx.getHashAsString());
|
||||||
|
tx.getConfidence().removeEventListener(this);
|
||||||
|
future.set(pinnedTx); // RE-ENTRANCY POINT
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
@@ -1187,6 +1187,8 @@ public class PeerGroup extends AbstractIdleService {
|
|||||||
// transaction or not. However, we are not a fully validating node and this is advertised in
|
// transaction or not. However, we are not a fully validating node and this is advertised in
|
||||||
// our version message, as SPV nodes cannot relay it doesn't give away any additional information
|
// our version message, as SPV nodes cannot relay it doesn't give away any additional information
|
||||||
// to skip the inv here - we wouldn't send invs anyway.
|
// to skip the inv here - we wouldn't send invs anyway.
|
||||||
|
//
|
||||||
|
// TODO: The peer we picked might be dead by now. If we can't write the message, pick again and retry.
|
||||||
ChannelFuture sendComplete = somePeer.sendMessage(pinnedTx);
|
ChannelFuture sendComplete = somePeer.sendMessage(pinnedTx);
|
||||||
// If we've been limited to talk to only one peer, we can't wait to hear back because the
|
// If we've been limited to talk to only one peer, we can't wait to hear back because the
|
||||||
// remote peer won't tell us about transactions we just announced to it for obvious reasons.
|
// remote peer won't tell us about transactions we just announced to it for obvious reasons.
|
||||||
@@ -1195,22 +1197,14 @@ public class PeerGroup extends AbstractIdleService {
|
|||||||
if (minConnections == 1) {
|
if (minConnections == 1) {
|
||||||
sendComplete.addListener(new ChannelFutureListener() {
|
sendComplete.addListener(new ChannelFutureListener() {
|
||||||
public void operationComplete(ChannelFuture _) throws Exception {
|
public void operationComplete(ChannelFuture _) throws Exception {
|
||||||
lock.lock();
|
for (Wallet wallet : wallets) {
|
||||||
try {
|
try {
|
||||||
for (Wallet wallet : wallets) {
|
// Assumption here is there are no dependencies of the created transaction.
|
||||||
try {
|
wallet.receivePending(pinnedTx, null);
|
||||||
if (wallet.isPendingTransactionRelevant(pinnedTx)) {
|
} catch (Throwable t) {
|
||||||
// Assumption here is there are no dependencies of the created
|
future.setException(t);
|
||||||
// transaction.
|
return;
|
||||||
wallet.receivePending(pinnedTx, null);
|
|
||||||
}
|
|
||||||
} catch (Throwable t) {
|
|
||||||
future.setException(t);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
} finally {
|
|
||||||
lock.unlock();
|
|
||||||
}
|
}
|
||||||
future.set(pinnedTx);
|
future.set(pinnedTx);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -885,6 +885,10 @@ public class Wallet implements Serializable, BlockChainListener {
|
|||||||
// Do a brief risk analysis of the transaction and its dependencies to check for any possible attacks.
|
// Do a brief risk analysis of the transaction and its dependencies to check for any possible attacks.
|
||||||
lock.lock();
|
lock.lock();
|
||||||
try {
|
try {
|
||||||
|
// Repeat the check of relevancy here, even though the caller may have already done so - this is to avoid
|
||||||
|
// race conditions where receivePending may be being called in parallel.
|
||||||
|
if (!isPendingTransactionRelevant(tx))
|
||||||
|
return;
|
||||||
AnalysisResult analysis = analyzeTransactionAndDependencies(tx, dependencies);
|
AnalysisResult analysis = analyzeTransactionAndDependencies(tx, dependencies);
|
||||||
if (analysis.timeLocked != null && !doesAcceptTimeLockedTransactions()) {
|
if (analysis.timeLocked != null && !doesAcceptTimeLockedTransactions()) {
|
||||||
log.warn("Transaction {}, dependency of {} has a time lock value of {}", new Object[]{
|
log.warn("Transaction {}, dependency of {} has a time lock value of {}", new Object[]{
|
||||||
|
|||||||
Reference in New Issue
Block a user