mirror of
https://github.com/Qortal/altcoinj.git
synced 2025-02-07 06:44:16 +00:00
Re-arrange the actions in PeerGroup.broadcastTransaction()
This fixes a bug where Netty complains about blocking an IO thread due to the await call on send and resolves a potential race condition.
This commit is contained in:
parent
8c488a1687
commit
6ac8eb54cb
@ -1046,14 +1046,61 @@ public class PeerGroup extends AbstractIdleService {
|
||||
ListenableFuture<PeerGroup> peerAvailabilityFuture = waitForPeers(minConnections);
|
||||
peerAvailabilityFuture.addListener(new Runnable() {
|
||||
public void run() {
|
||||
// This can be called immediately if we already have enough peers. Otherwise it'll be called from a
|
||||
// peer thread.
|
||||
// TODO: Fix the race that exists here.
|
||||
// We now have enough connected peers to send the transaction.
|
||||
// This can be called immediately if we already have enough. Otherwise it'll be called from a peer
|
||||
// thread. TODO: Fix the race that exists here.
|
||||
|
||||
// Pick a peer to be the lucky recipient of our tx.
|
||||
final Peer somePeer = peers.get(0);
|
||||
log.info("broadcastTransaction: Enough peers, adding {} to the memory pool and sending to {}",
|
||||
tx.getHashAsString(), somePeer);
|
||||
final Transaction pinnedTx = memoryPool.seen(tx, somePeer.getAddress());
|
||||
try {
|
||||
// Prepare to send the transaction by adding a listener that'll be called when confidence changes.
|
||||
// Only bother with this if we might actually hear back:
|
||||
if (minConnections > 1) tx.getConfidence().addEventListener(new TransactionConfidence.Listener() {
|
||||
public void onConfidenceChanged(Transaction tx) {
|
||||
// The number of peers that announced this tx has gone up. This will run in a peer thread.
|
||||
final int numSeenPeers = tx.getConfidence().numBroadcastPeers();
|
||||
boolean done = false;
|
||||
log.info("broadcastTransaction: TX {} seen by {} peers", pinnedTx.getHashAsString(),
|
||||
numSeenPeers);
|
||||
synchronized (PeerGroup.this) {
|
||||
if (numSeenPeers >= minConnections) {
|
||||
// We've seen the min required number of peers announce the transaction. Note that we
|
||||
// can't wait for the current number of connected peers right now because we could have
|
||||
// added more peers after the broadcast took place, which means they won't have seen
|
||||
// the transaction. In future when peers sync up their memory pools after they connect
|
||||
// we could come back and change this.
|
||||
//
|
||||
// Now tell the wallet about the transaction. If the wallet created the transaction then
|
||||
// it already knows and will ignore this. If it's a transaction we received from
|
||||
// somebody else via a side channel and are now broadcasting, this will put it into the
|
||||
// wallet now we know it's valid.
|
||||
for (Wallet wallet : wallets) {
|
||||
try {
|
||||
if (wallet.isPendingTransactionRelevant(pinnedTx)) {
|
||||
// Assumption here is there are no dependencies of the created transaction.
|
||||
wallet.receivePending(pinnedTx, null);
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
future.setException(t);
|
||||
return;
|
||||
}
|
||||
}
|
||||
done = true;
|
||||
}
|
||||
}
|
||||
if (done) {
|
||||
// We're done! Run this outside of the peer group lock as setting the future may immediately
|
||||
// invoke any listeners associated with it and it's simpler if the PeerGroup isn't locked.
|
||||
log.info("broadcastTransaction: {} complete", pinnedTx.getHashAsString());
|
||||
tx.getConfidence().removeEventListener(this);
|
||||
future.set(pinnedTx);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
// Satoshis code sends an inv in this case and then lets the peer request the tx data. We just
|
||||
// blast out the TX here for a couple of reasons. Firstly it's simpler: in the case where we have
|
||||
// just a single connection we don't have to wait for getdata to be received and handled before
|
||||
@ -1062,76 +1109,37 @@ public class PeerGroup extends AbstractIdleService {
|
||||
// transaction or not. However, we are not a fully validating node and this is advertised in
|
||||
// our version message, as SPV nodes cannot relay it doesn't give away any additional information
|
||||
// to skip the inv here - we wouldn't send invs anyway.
|
||||
somePeer.sendMessage(pinnedTx).awaitUninterruptibly();
|
||||
ChannelFuture sendComplete = somePeer.sendMessage(pinnedTx);
|
||||
// If we've been limited to talk to only one peer, we can't wait to hear back because the
|
||||
// remote peer won't tell us about transactions we just announced to it for obvious reasons.
|
||||
// So we just have to assume we're done, at that point. This happens when we're not given
|
||||
// any peer discovery source and the user just calls connectTo() once.
|
||||
if (minConnections == 1) {
|
||||
sendComplete.addListener(new ChannelFutureListener() {
|
||||
public void operationComplete(ChannelFuture _) throws Exception {
|
||||
synchronized (PeerGroup.this) {
|
||||
for (Wallet wallet : wallets) {
|
||||
try {
|
||||
if (wallet.isPendingTransactionRelevant(pinnedTx)) {
|
||||
// Assumption here is there are no dependencies of the created
|
||||
// transaction.
|
||||
wallet.receivePending(pinnedTx, null);
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
future.setException(t);
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
future.set(pinnedTx);
|
||||
return;
|
||||
}
|
||||
});
|
||||
}
|
||||
} catch (IOException e) {
|
||||
future.setException(e);
|
||||
return;
|
||||
}
|
||||
|
||||
// If we've been limited to talk to only one peer, we can't wait to hear back because the remote peer
|
||||
// won't tell us about transactions we just announced to it for obvious reasons. So we just have to
|
||||
// assume we're done, at that point. This happens when we're not given any peer discovery source and
|
||||
// the user just calls connectTo() once.
|
||||
if (minConnections == 1) {
|
||||
synchronized (PeerGroup.this) {
|
||||
for (Wallet wallet : wallets) {
|
||||
try {
|
||||
if (wallet.isPendingTransactionRelevant(pinnedTx)) {
|
||||
// Assumption here is there are no dependencies of the created transaction.
|
||||
wallet.receivePending(pinnedTx, null);
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
future.setException(t);
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
future.set(pinnedTx);
|
||||
return;
|
||||
}
|
||||
|
||||
tx.getConfidence().addEventListener(new TransactionConfidence.Listener() {
|
||||
public void onConfidenceChanged(Transaction tx) {
|
||||
// This will run in a peer thread.
|
||||
final int numSeenPeers = tx.getConfidence().numBroadcastPeers();
|
||||
boolean done = false;
|
||||
log.info("broadcastTransaction: TX {} seen by {} peers", pinnedTx.getHashAsString(),
|
||||
numSeenPeers);
|
||||
synchronized (PeerGroup.this) {
|
||||
if (numSeenPeers >= minConnections) {
|
||||
// We've seen the min required number of peers announce the transaction. Note that we
|
||||
// can't wait for the current number of connected peers right now because we could have
|
||||
// added more peers after the broadcast took place, which means they won't have seen
|
||||
// the transaction. In future when peers sync up their memory pools after they connect
|
||||
// we could come back and change this.
|
||||
//
|
||||
// Now tell the wallet about the transaction. If the wallet created the transaction then
|
||||
// it already knows and will ignore this. If it's a transaction we received from
|
||||
// somebody else via a side channel and are now broadcasting, this will put it into the
|
||||
// wallet now we know it's valid.
|
||||
for (Wallet wallet : wallets) {
|
||||
try {
|
||||
if (wallet.isPendingTransactionRelevant(pinnedTx)) {
|
||||
// Assumption here is there are no dependencies of the created transaction.
|
||||
wallet.receivePending(pinnedTx, null);
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
future.setException(t);
|
||||
return;
|
||||
}
|
||||
}
|
||||
done = true;
|
||||
}
|
||||
}
|
||||
if (done) {
|
||||
// We're done! Run this outside of the peer group lock as setting the future may immediately
|
||||
// invoke any listeners associated with it and it's simpler if the PeerGroup isn't locked.
|
||||
log.info("broadcastTransaction: {} complete", pinnedTx.getHashAsString());
|
||||
tx.getConfidence().removeEventListener(this);
|
||||
future.set(pinnedTx);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
}, MoreExecutors.sameThreadExecutor());
|
||||
return future;
|
||||
|
Loading…
Reference in New Issue
Block a user