(API CHANGE) Return a TransactionBroadcast object from PeerGroup.broadcastTransaction.

Old code can be updated by simply calling future() on the returned object to get the previous result.

TransactionBroadcast now has a progress reporting interface, which is ideal for connection to progress bars, pie charts, whatever else you want to use in the UI for showing the progress of sending money/broadcasting a tx.
This commit is contained in:
Mike Hearn
2015-02-20 15:01:54 +01:00
parent 298cccfe4f
commit bef4980abd
11 changed files with 129 additions and 34 deletions

View File

@@ -1660,13 +1660,14 @@ public class PeerGroup implements TransactionBroadcaster {
* of connections to wait for before commencing broadcast. * of connections to wait for before commencing broadcast.
*/ */
@Override @Override
public ListenableFuture<Transaction> broadcastTransaction(final Transaction tx) { public TransactionBroadcast broadcastTransaction(final Transaction tx) {
return broadcastTransaction(tx, Math.max(1, getMinBroadcastConnections())); return broadcastTransaction(tx, Math.max(1, getMinBroadcastConnections()));
} }
/** /**
* <p>Given a transaction, sends it un-announced to one peer and then waits for it to be received back from other * <p>Given a transaction, sends it un-announced to one peer and then waits for it to be received back from other
* peers. Once all connected peers have announced the transaction, the future will be completed. If anything goes * peers. Once all connected peers have announced the transaction, the future available via the
* {@link org.bitcoinj.core.TransactionBroadcast#future()} method will be completed. If anything goes
* wrong the exception will be thrown when get() is called, or you can receive it via a callback on the * wrong the exception will be thrown when get() is called, or you can receive it via a callback on the
* {@link ListenableFuture}. This method returns immediately, so if you want it to block just call get() on the * {@link ListenableFuture}. This method returns immediately, so if you want it to block just call get() on the
* result.</p> * result.</p>
@@ -1674,17 +1675,14 @@ public class PeerGroup implements TransactionBroadcaster {
* <p>Note that if the PeerGroup is limited to only one connection (discovery is not activated) then the future * <p>Note that if the PeerGroup is limited to only one connection (discovery is not activated) then the future
* will complete as soon as the transaction was successfully written to that peer.</p> * will complete as soon as the transaction was successfully written to that peer.</p>
* *
* <p>Other than for sending your own transactions, this method is useful if you have received a transaction from
* someone and want to know that it's valid. It's a bit of a weird hack because the current version of the Bitcoin
* protocol does not inform you if you send an invalid transaction. Because sending bad transactions counts towards
* your DoS limit, be careful with relaying lots of unknown transactions. Otherwise you might get kicked off the
* network.</p>
*
* <p>The transaction won't be sent until there are at least minConnections active connections available. * <p>The transaction won't be sent until there are at least minConnections active connections available.
* A good choice for proportion would be between 0.5 and 0.8 but if you want faster transmission during initial * A good choice for proportion would be between 0.5 and 0.8 but if you want faster transmission during initial
* bringup of the peer group you can lower it.</p> * bringup of the peer group you can lower it.</p>
*
* <p>The returned {@link org.bitcoinj.core.TransactionBroadcast} object can be used to get progress feedback,
* which is calculated by watching the transaction propagate across the network and be announced by peers.</p>
*/ */
public ListenableFuture<Transaction> broadcastTransaction(final Transaction tx, final int minConnections) { public TransactionBroadcast broadcastTransaction(final Transaction tx, final int minConnections) {
// TODO: Context being owned by BlockChain isn't right w.r.t future intentions so it shouldn't really be optional here. // TODO: Context being owned by BlockChain isn't right w.r.t future intentions so it shouldn't really be optional here.
final TransactionBroadcast broadcast = new TransactionBroadcast(this, chain != null ? chain.getContext() : null, tx); final TransactionBroadcast broadcast = new TransactionBroadcast(this, chain != null ? chain.getContext() : null, tx);
broadcast.setMinConnections(minConnections); broadcast.setMinConnections(minConnections);
@@ -1722,7 +1720,7 @@ public class PeerGroup implements TransactionBroadcaster {
// at all. // at all.
runningBroadcasts.add(broadcast); runningBroadcasts.add(broadcast);
broadcast.broadcast(); broadcast.broadcast();
return broadcast.future(); return broadcast;
} }
/** /**

View File

@@ -24,6 +24,9 @@ import org.slf4j.*;
import javax.annotation.*; import javax.annotation.*;
import java.util.*; import java.util.*;
import java.util.concurrent.*;
import static com.google.common.base.Preconditions.checkState;
/** /**
* Represents a single transaction broadcast that we are performing. A broadcast occurs after a new transaction is created * Represents a single transaction broadcast that we are performing. A broadcast occurs after a new transaction is created
@@ -59,6 +62,28 @@ public class TransactionBroadcast {
this.minConnections = Math.max(1, peerGroup.getMinBroadcastConnections()); this.minConnections = Math.max(1, peerGroup.getMinBroadcastConnections());
} }
// Only for mock broadcasts.
private TransactionBroadcast(Transaction tx) {
this.peerGroup = null;
this.context = null;
this.tx = tx;
}
@VisibleForTesting
public static TransactionBroadcast createMockBroadcast(Transaction tx, final SettableFuture<Transaction> future) {
return new TransactionBroadcast(tx) {
@Override
public ListenableFuture<Transaction> broadcast() {
return future;
}
@Override
public ListenableFuture<Transaction> future() {
return future;
}
};
}
public ListenableFuture<Transaction> future() { public ListenableFuture<Transaction> future() {
return future; return future;
} }
@@ -157,6 +182,10 @@ public class TransactionBroadcast {
boolean mined = tx.getAppearsInHashes() != null; boolean mined = tx.getAppearsInHashes() != null;
log.info("broadcastTransaction: {}: TX {} seen by {} peers{}", reason, pinnedTx.getHashAsString(), log.info("broadcastTransaction: {}: TX {} seen by {} peers{}", reason, pinnedTx.getHashAsString(),
numSeenPeers, mined ? " and mined" : ""); numSeenPeers, mined ? " and mined" : "");
// Progress callback on the requested thread.
invokeProgressCallback(numSeenPeers, mined);
if (numSeenPeers >= numWaitingFor || mined) { if (numSeenPeers >= numWaitingFor || mined) {
// We've seen the min required number of peers announce the transaction, or it was included // We've seen the min required number of peers announce the transaction, or it was included
// in a block. Normally we'd expect to see it fully propagate before it gets mined, but // in a block. Normally we'd expect to see it fully propagate before it gets mined, but
@@ -178,4 +207,64 @@ public class TransactionBroadcast {
} }
} }
} }
private void invokeProgressCallback(int numSeenPeers, boolean mined) {
final ProgressCallback callback;
Executor executor;
synchronized (this) {
callback = this.callback;
executor = this.progressCallbackExecutor;
}
if (callback != null) {
final double progress = Math.min(1.0, mined ? 1.0 : numSeenPeers / (double) numWaitingFor);
checkState(progress >= 0.0 && progress <= 1.0, progress);
try {
if (executor == null)
callback.onBroadcastProgress(progress);
else
executor.execute(new Runnable() {
@Override
public void run() {
callback.onBroadcastProgress(progress);
}
});
} catch (Throwable e) {
log.error("Exception during progress callback", e);
}
}
}
//////////////////////////////////////////////////////////////////////////////////////////////////////////////
/** An interface for receiving progress information on the propagation of the tx, from 0.0 to 1.0 */
public interface ProgressCallback {
/**
* onBroadcastProgress will be invoked on the provided executor when the progress of the transaction
* broadcast has changed, because the transaction has been announced by another peer or because the transaction
* was found inside a mined block (in this case progress will go to 1.0 immediately). Any exceptions thrown
* by this callback will be logged and ignored.
*/
void onBroadcastProgress(double progress);
}
@Nullable private ProgressCallback callback;
@Nullable private Executor progressCallbackExecutor;
/**
* Sets the given callback for receiving progress values, which will run on the user thread. See
* {@link org.bitcoinj.utils.Threading} for details.
*/
public void setProgressCallback(ProgressCallback callback) {
setProgressCallback(callback, Threading.USER_THREAD);
}
/**
* Sets the given callback for receiving progress values, which will run on the given executor. If the executor
* is null then the callback will run on a network thread and may be invoked multiple times in parallel. You
* probably want to provide your UI thread or Threading.USER_THREAD for the second parameter.
*/
public synchronized void setProgressCallback(ProgressCallback callback, @Nullable Executor executor) {
this.callback = callback;
this.progressCallbackExecutor = executor;
}
} }

View File

@@ -16,13 +16,11 @@
package org.bitcoinj.core; package org.bitcoinj.core;
import com.google.common.util.concurrent.ListenableFuture;
/** /**
* A general interface which declares the ability to broadcast transactions. This is implemented * A general interface which declares the ability to broadcast transactions. This is implemented
* by {@link org.bitcoinj.core.PeerGroup}. * by {@link org.bitcoinj.core.PeerGroup}.
*/ */
public interface TransactionBroadcaster { public interface TransactionBroadcaster {
/** Broadcast the given transaction on the network */ /** Broadcast the given transaction on the network */
public ListenableFuture<Transaction> broadcastTransaction(final Transaction tx); public TransactionBroadcast broadcastTransaction(final Transaction tx);
} }

View File

@@ -3067,8 +3067,10 @@ public class Wallet extends BaseTaggableObject implements Serializable, BlockCha
public static class SendResult { public static class SendResult {
/** The Bitcoin transaction message that moves the money. */ /** The Bitcoin transaction message that moves the money. */
public Transaction tx; public Transaction tx;
/** A future that will complete once the tx message has been successfully broadcast to the network. */ /** A future that will complete once the tx message has been successfully broadcast to the network. This is just the result of calling broadcast.future() */
public ListenableFuture<Transaction> broadcastComplete; public ListenableFuture<Transaction> broadcastComplete;
/** The broadcast object returned by the linked TransactionBroadcaster */
public TransactionBroadcast broadcast;
} }
/** /**
@@ -3423,7 +3425,8 @@ public class Wallet extends BaseTaggableObject implements Serializable, BlockCha
// count of seen peers, the memory pool will update the transaction confidence object, that will invoke the // count of seen peers, the memory pool will update the transaction confidence object, that will invoke the
// txConfidenceListener which will in turn invoke the wallets event listener onTransactionConfidenceChanged // txConfidenceListener which will in turn invoke the wallets event listener onTransactionConfidenceChanged
// method. // method.
result.broadcastComplete = broadcaster.broadcastTransaction(tx); result.broadcast = broadcaster.broadcastTransaction(tx);
result.broadcastComplete = result.broadcast.future();
return result; return result;
} }
@@ -4674,7 +4677,7 @@ public class Wallet extends BaseTaggableObject implements Serializable, BlockCha
TransactionBroadcaster broadcaster = vTransactionBroadcaster; TransactionBroadcaster broadcaster = vTransactionBroadcaster;
for (Transaction tx : txns) { for (Transaction tx : txns) {
try { try {
final ListenableFuture<Transaction> future = broadcaster.broadcastTransaction(tx); final ListenableFuture<Transaction> future = broadcaster.broadcastTransaction(tx).future();
futures.add(future); futures.add(future);
Futures.addCallback(future, new FutureCallback<Transaction>() { Futures.addCallback(future, new FutureCallback<Transaction>() {
@Override @Override

View File

@@ -236,7 +236,7 @@ public class PaymentChannelServerState {
log.info("Broadcasting multisig contract: {}", multisigContract); log.info("Broadcasting multisig contract: {}", multisigContract);
state = State.WAITING_FOR_MULTISIG_ACCEPTANCE; state = State.WAITING_FOR_MULTISIG_ACCEPTANCE;
final SettableFuture<PaymentChannelServerState> future = SettableFuture.create(); final SettableFuture<PaymentChannelServerState> future = SettableFuture.create();
Futures.addCallback(broadcaster.broadcastTransaction(multisigContract), new FutureCallback<Transaction>() { Futures.addCallback(broadcaster.broadcastTransaction(multisigContract).future(), new FutureCallback<Transaction>() {
@Override public void onSuccess(Transaction transaction) { @Override public void onSuccess(Transaction transaction) {
log.info("Successfully broadcast multisig contract {}. Channel now open.", transaction.getHashAsString()); log.info("Successfully broadcast multisig contract {}. Channel now open.", transaction.getHashAsString());
try { try {
@@ -418,7 +418,7 @@ public class PaymentChannelServerState {
state = State.CLOSING; state = State.CLOSING;
log.info("Closing channel, broadcasting tx {}", tx); log.info("Closing channel, broadcasting tx {}", tx);
// The act of broadcasting the transaction will add it to the wallet. // The act of broadcasting the transaction will add it to the wallet.
ListenableFuture<Transaction> future = broadcaster.broadcastTransaction(tx); ListenableFuture<Transaction> future = broadcaster.broadcastTransaction(tx).future();
Futures.addCallback(future, new FutureCallback<Transaction>() { Futures.addCallback(future, new FutureCallback<Transaction>() {
@Override public void onSuccess(Transaction transaction) { @Override public void onSuccess(Transaction transaction) {
log.info("TX {} propagated, channel successfully closed.", transaction.getHash()); log.info("TX {} propagated, channel successfully closed.", transaction.getHash());

View File

@@ -16,10 +16,7 @@
package org.bitcoinj.testing; package org.bitcoinj.testing;
import org.bitcoinj.core.Transaction; import org.bitcoinj.core.*;
import org.bitcoinj.core.TransactionBroadcaster;
import org.bitcoinj.core.VerificationException;
import org.bitcoinj.core.Wallet;
import org.bitcoinj.utils.Threading; import org.bitcoinj.utils.Threading;
import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.Futures;
@@ -70,7 +67,7 @@ public class MockTransactionBroadcaster implements TransactionBroadcaster {
} }
@Override @Override
public SettableFuture<Transaction> broadcastTransaction(Transaction tx) { public TransactionBroadcast broadcastTransaction(Transaction tx) {
// Use a lock just to catch lock ordering inversions e.g. wallet->broadcaster. // Use a lock just to catch lock ordering inversions e.g. wallet->broadcaster.
lock.lock(); lock.lock();
try { try {
@@ -90,7 +87,7 @@ public class MockTransactionBroadcaster implements TransactionBroadcaster {
public void onFailure(Throwable t) { public void onFailure(Throwable t) {
} }
}); });
return result; return TransactionBroadcast.createMockBroadcast(tx, result);
} catch (InterruptedException e) { } catch (InterruptedException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} finally { } finally {

View File

@@ -17,12 +17,12 @@
package org.bitcoinj.core; package org.bitcoinj.core;
import com.google.common.util.concurrent.*;
import org.bitcoinj.params.UnitTestParams; import org.bitcoinj.params.UnitTestParams;
import org.bitcoinj.testing.FakeTxBuilder; import org.bitcoinj.testing.FakeTxBuilder;
import org.bitcoinj.testing.InboundMessageQueuer; import org.bitcoinj.testing.InboundMessageQueuer;
import org.bitcoinj.testing.TestWithPeerGroup; import org.bitcoinj.testing.TestWithPeerGroup;
import org.bitcoinj.utils.Threading; import org.bitcoinj.utils.Threading;
import com.google.common.util.concurrent.ListenableFuture;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
@@ -74,8 +74,16 @@ public class TransactionBroadcastTest extends TestWithPeerGroup {
InboundMessageQueuer[] channels = { connectPeer(1), connectPeer(2), connectPeer(3), connectPeer(4) }; InboundMessageQueuer[] channels = { connectPeer(1), connectPeer(2), connectPeer(3), connectPeer(4) };
Transaction tx = new Transaction(params); Transaction tx = new Transaction(params);
TransactionBroadcast broadcast = new TransactionBroadcast(peerGroup, blockChain.getContext(), tx); TransactionBroadcast broadcast = new TransactionBroadcast(peerGroup, blockChain.getContext(), tx);
final AtomicDouble lastProgress = new AtomicDouble();
broadcast.setProgressCallback(new TransactionBroadcast.ProgressCallback() {
@Override
public void onBroadcastProgress(double progress) {
lastProgress.set(progress);
}
});
ListenableFuture<Transaction> future = broadcast.broadcast(); ListenableFuture<Transaction> future = broadcast.broadcast();
assertFalse(future.isDone()); assertFalse(future.isDone());
assertEquals(0.0, lastProgress.get(), 0.0);
// We expect two peers to receive a tx message, and at least one of the others must announce for the future to // We expect two peers to receive a tx message, and at least one of the others must announce for the future to
// complete successfully. // complete successfully.
Message[] messages = { Message[] messages = {
@@ -91,11 +99,13 @@ public class TransactionBroadcastTest extends TestWithPeerGroup {
assertNull(messages[2]); assertNull(messages[2]);
Threading.waitForUserCode(); Threading.waitForUserCode();
assertFalse(future.isDone()); assertFalse(future.isDone());
assertEquals(0.0, lastProgress.get(), 0.0);
inbound(channels[1], InventoryMessage.with(tx)); inbound(channels[1], InventoryMessage.with(tx));
pingAndWait(channels[1]); pingAndWait(channels[1]);
Threading.waitForUserCode(); Threading.waitForUserCode();
// FIXME flaky test - future is not handled on user thread // FIXME flaky test - future is not handled on user thread
assertTrue(future.isDone()); assertTrue(future.isDone());
assertEquals(1.0, lastProgress.get(), 0.0);
} }
@Test @Test

View File

@@ -58,7 +58,7 @@ public class ChannelConnectionTest extends TestWithWallet {
private static final TransactionBroadcaster failBroadcaster = new TransactionBroadcaster() { private static final TransactionBroadcaster failBroadcaster = new TransactionBroadcaster() {
@Override @Override
public ListenableFuture<Transaction> broadcastTransaction(Transaction tx) { public TransactionBroadcast broadcastTransaction(Transaction tx) {
fail(); fail();
return null; return null;
} }
@@ -85,12 +85,12 @@ public class ChannelConnectionTest extends TestWithWallet {
broadcastTxPause = new Semaphore(0); broadcastTxPause = new Semaphore(0);
mockBroadcaster = new TransactionBroadcaster() { mockBroadcaster = new TransactionBroadcaster() {
@Override @Override
public ListenableFuture<Transaction> broadcastTransaction(Transaction tx) { public TransactionBroadcast broadcastTransaction(Transaction tx) {
broadcastTxPause.acquireUninterruptibly(); broadcastTxPause.acquireUninterruptibly();
SettableFuture<Transaction> future = SettableFuture.create(); SettableFuture<Transaction> future = SettableFuture.create();
future.set(tx); future.set(tx);
broadcasts.add(tx); broadcasts.add(tx);
return future; return TransactionBroadcast.createMockBroadcast(tx, future);
} }
}; };

View File

@@ -64,7 +64,7 @@ public class PaymentChannelStateTest extends TestWithWallet {
super.setUp(); super.setUp();
wallet.addExtension(new StoredPaymentChannelClientStates(wallet, new TransactionBroadcaster() { wallet.addExtension(new StoredPaymentChannelClientStates(wallet, new TransactionBroadcaster() {
@Override @Override
public ListenableFuture<Transaction> broadcastTransaction(Transaction tx) { public TransactionBroadcast broadcastTransaction(Transaction tx) {
fail(); fail();
return null; return null;
} }
@@ -79,10 +79,10 @@ public class PaymentChannelStateTest extends TestWithWallet {
broadcasts = new LinkedBlockingQueue<TxFuturePair>(); broadcasts = new LinkedBlockingQueue<TxFuturePair>();
mockBroadcaster = new TransactionBroadcaster() { mockBroadcaster = new TransactionBroadcaster() {
@Override @Override
public ListenableFuture<Transaction> broadcastTransaction(Transaction tx) { public TransactionBroadcast broadcastTransaction(Transaction tx) {
SettableFuture<Transaction> future = SettableFuture.create(); SettableFuture<Transaction> future = SettableFuture.create();
broadcasts.add(new TxFuturePair(tx, future)); broadcasts.add(new TxFuturePair(tx, future));
return future; return TransactionBroadcast.createMockBroadcast(tx, future);
} }
}; };
} }

View File

@@ -58,7 +58,7 @@ public class TestFeeLevel {
kit.wallet().completeTx(request); kit.wallet().completeTx(request);
log.info("Fee paid is {}", request.fee); log.info("Fee paid is {}", request.fee);
log.info("TX is {}", request.tx); log.info("TX is {}", request.tx);
kit.peerGroup().broadcastTransaction(request.tx).get(); kit.peerGroup().broadcastTransaction(request.tx).future().get();
log.info("Send complete, waiting for confirmation"); log.info("Send complete, waiting for confirmation");
request.tx.getConfidence().getDepthFuture(1).get(); request.tx.getConfidence().getDepthFuture(1).get();

View File

@@ -588,7 +588,7 @@ public class WalletTool {
// Wait for peers to connect, the tx to be sent to one of them and for it to be propagated across the // Wait for peers to connect, the tx to be sent to one of them and for it to be propagated across the
// network. Once propagation is complete and we heard the transaction back from all our peers, it will // network. Once propagation is complete and we heard the transaction back from all our peers, it will
// be committed to the wallet. // be committed to the wallet.
peers.broadcastTransaction(t).get(); peers.broadcastTransaction(t).future().get();
// Hack for regtest/single peer mode, as we're about to shut down and won't get an ACK from the remote end. // Hack for regtest/single peer mode, as we're about to shut down and won't get an ACK from the remote end.
List<Peer> peerList = peers.getConnectedPeers(); List<Peer> peerList = peers.getConnectedPeers();
if (peerList.size() == 1) if (peerList.size() == 1)
@@ -701,7 +701,7 @@ public class WalletTool {
if (future == null) { if (future == null) {
// No payment_url for submission so, broadcast and wait. // No payment_url for submission so, broadcast and wait.
peers.start(); peers.start();
peers.broadcastTransaction(req.tx).get(); peers.broadcastTransaction(req.tx).future().get();
} else { } else {
PaymentProtocol.Ack ack = future.get(); PaymentProtocol.Ack ack = future.get();
wallet.commitTx(req.tx); wallet.commitTx(req.tx);