3
0
mirror of https://github.com/Qortal/altcoinj.git synced 2025-01-31 07:12:17 +00:00

Change PeerGroup.broadcastTransaction() to wait for propagation.

It means a send won't be considered completed until it's accepted by the net.
Also is for the case where you receive a transaction (eg, via Bluetooth) and
you want to broadcast it such that you can know it's valid.

Make WalletTool use --peers instead of --peer, a comma separated list of
addresses to use. Remove the crappy sleep after send now we can wait.

Resolves issue 167.
This commit is contained in:
Mike Hearn 2012-07-21 23:07:00 +02:00
parent 42152c2483
commit fd9eba1697
7 changed files with 234 additions and 133 deletions

View File

@ -309,7 +309,7 @@ public class Peer {
} }
} }
private void processGetData(GetDataMessage getdata) throws IOException { private synchronized void processGetData(GetDataMessage getdata) throws IOException {
log.info("Received getdata message: {}", getdata.toString()); log.info("Received getdata message: {}", getdata.toString());
ArrayList<Message> items = new ArrayList<Message>(); ArrayList<Message> items = new ArrayList<Message>();
for (PeerEventListener listener : eventListeners) { for (PeerEventListener listener : eventListeners) {
@ -322,7 +322,7 @@ public class Peer {
if (items.size() == 0) { if (items.size() == 0) {
return; return;
} }
log.info("Sending {} items gathered from listeners to peer", items.size()); log.info("{}: Sending {} items gathered from listeners to peer", this, items.size());
for (Message item : items) { for (Message item : items) {
sendMessage(item); sendMessage(item);
} }

View File

@ -22,6 +22,10 @@ import com.google.bitcoin.discovery.PeerDiscovery;
import com.google.bitcoin.discovery.PeerDiscoveryException; import com.google.bitcoin.discovery.PeerDiscoveryException;
import com.google.bitcoin.utils.EventListenerInvoker; import com.google.bitcoin.utils.EventListenerInvoker;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import org.jboss.netty.bootstrap.ClientBootstrap; import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.*; import org.jboss.netty.channel.*;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory; import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
@ -65,11 +69,14 @@ public class PeerGroup {
private PeerGroupThread peerGroupThread; private PeerGroupThread peerGroupThread;
// True if the connection initiation thread should be running // True if the connection initiation thread should be running
private boolean running; private boolean running;
// Currently active peers
private Set<Peer> peers; // TODO: Rationalize the data structures used here.
// Currently active peers. This is a linked list rather than a set to make unit tests predictable.
private LinkedList<Peer> peers;
// Currently connecting peers // Currently connecting peers
private Set<Peer> pendingPeers; private Set<Peer> pendingPeers;
private Map<Peer, ChannelFuture> channelFutures; private Map<Peer, ChannelFuture> channelFutures;
// The peer we are currently downloading the chain from // The peer we are currently downloading the chain from
private Peer downloadPeer; private Peer downloadPeer;
// Callback for events related to chain download // Callback for events related to chain download
@ -147,7 +154,11 @@ public class PeerGroup {
this.connectionDelayMillis = connectionDelayMillis; this.connectionDelayMillis = connectionDelayMillis;
this.fastCatchupTimeSecs = params.genesisBlock.getTimeSeconds(); this.fastCatchupTimeSecs = params.genesisBlock.getTimeSeconds();
this.wallets = new ArrayList<Wallet>(1); this.wallets = new ArrayList<Wallet>(1);
this.maxConnections = DEFAULT_CONNECTIONS;
// This default sentinel value will be overridden by one of two actions:
// - adding a peer discovery source sets it to the default
// - using connectTo() will increment it by one
this.maxConnections = 0;
// Set up a default template version message that doesn't tell the other side what kind of BitCoinJ user // Set up a default template version message that doesn't tell the other side what kind of BitCoinJ user
// this is. // this is.
@ -157,10 +168,9 @@ public class PeerGroup {
this.bootstrap = bootstrap; this.bootstrap = bootstrap;
inactives = new LinkedBlockingQueue<PeerAddress>(); inactives = new LinkedBlockingQueue<PeerAddress>();
// TODO: Remove usage of synchronized sets here in favor of simple coarse-grained locking. peers = new LinkedList<Peer>();
peers = Collections.synchronizedSet(new HashSet<Peer>()); pendingPeers = new HashSet<Peer>();
pendingPeers = Collections.synchronizedSet(new HashSet<Peer>()); channelFutures = new HashMap<Peer, ChannelFuture>();
channelFutures = Collections.synchronizedMap(new HashMap<Peer, ChannelFuture>());
peerDiscoverers = new CopyOnWriteArraySet<PeerDiscovery>(); peerDiscoverers = new CopyOnWriteArraySet<PeerDiscovery>();
peerEventListeners = new ArrayList<PeerEventListener>(); peerEventListeners = new ArrayList<PeerEventListener>();
// This event listener is added to every peer. It's here so when we announce transactions via an "inv", every // This event listener is added to every peer. It's here so when we announce transactions via an "inv", every
@ -211,19 +221,32 @@ public class PeerGroup {
} }
private synchronized List<Message> handleGetData(GetDataMessage m) { private synchronized List<Message> handleGetData(GetDataMessage m) {
// Scans the wallets for transactions in the getdata message and returns them. Invoked in parallel // Scans the wallets and memory pool for transactions in the getdata message and returns them.
// on peer threads. // Runs on peer threads.
HashMap<Sha256Hash, Message> transactions = new HashMap<Sha256Hash, Message>(); LinkedList<Message> transactions = new LinkedList<Message>();
LinkedList<InventoryItem> items = new LinkedList<InventoryItem>(m.getItems());
Iterator<InventoryItem> it = items.iterator();
while (it.hasNext()) {
InventoryItem item = it.next();
// Check the mempool first.
Transaction tx = memoryPool.get(item.hash);
if (tx != null) {
transactions.add(tx);
it.remove();
} else {
// Check the wallets.
for (Wallet w : wallets) { for (Wallet w : wallets) {
synchronized (w) { synchronized (w) {
for (InventoryItem item : m.getItems()) { tx = w.getTransaction(item.hash);
Transaction tx = w.getTransaction(item.hash);
if (tx == null) continue; if (tx == null) continue;
transactions.put(tx.getHash(), tx); transactions.add(tx);
it.remove();
break;
} }
} }
} }
return new LinkedList<Message>(transactions.values()); }
return transactions;
} }
/** /**
@ -310,17 +333,25 @@ public class PeerGroup {
} }
/** /**
* Add an address to the list of potential peers to connect to * Add an address to the list of potential peers to connect to. This will increment the total number of max
* connections by one, so if all you use is addAddress, it is guaranteed to be attempted. If you're using a
* mix of peer discovery and addAddress, there's no guarantee this address will be picked in preference to
* those found via discovery.
*
* @param peerAddress IP/port to use.
*/ */
public void addAddress(PeerAddress peerAddress) { public synchronized void addAddress(PeerAddress peerAddress) {
// TODO(miron) consider deduplication // TODO(miron) consider deduplication
inactives.add(peerAddress); inactives.add(peerAddress);
maxConnections++;
} }
/** /**
* Add addresses from a discovery source to the list of potential peers to connect to * Add addresses from a discovery source to the list of potential peers to connect to
*/ */
public void addPeerDiscovery(PeerDiscovery peerDiscovery) { public synchronized void addPeerDiscovery(PeerDiscovery peerDiscovery) {
if (getMaxConnections() == 0)
setMaxConnections(DEFAULT_CONNECTIONS);
peerDiscoverers.add(peerDiscovery); peerDiscoverers.add(peerDiscovery);
} }
@ -354,35 +385,6 @@ public class PeerGroup {
} }
} }
/**
* Queues a transaction for asynchronous broadcast. The transaction will be considered broadcast and forgotten
* about (by the PeerGroup) once it's been written out to at least one node, but that does not guarantee inclusion
* in the chain - incorrect fees or a flaky remote node can cause this as well. Wallets attached with
* {@link PeerGroup#addWallet(Wallet)} will have their pending transactions announced to every newly connected
* node.
*
* @return a Future that can be used to wait for the async broadcast to complete.
*/
public synchronized Future<Transaction> broadcastTransaction(final Transaction tx) {
FutureTask<Transaction> future = new FutureTask<Transaction>(new Runnable() {
public void run() {
// This is run with the peer group already locked.
synchronized (peers) {
for (Peer peer : peers) {
try {
log.info("{}: Sending transaction {}", peer.getAddress(), tx.getHashAsString());
peer.sendMessage(tx);
} catch (IOException e) {
log.warn("Caught IOException whilst sending transaction: {}", e.getMessage());
}
}
}
}
}, tx);
peerGroupThread.addTask(future);
return future;
}
/** /**
* <p>Link the given wallet to this PeerGroup. This is used for three purposes:</p> * <p>Link the given wallet to this PeerGroup. This is used for three purposes:</p>
* <ol> * <ol>
@ -560,8 +562,7 @@ public class PeerGroup {
/** /**
* Connect to a peer by creating a Netty channel to the destination address. * Connect to a peer by creating a Netty channel to the destination address.
* *
* @param address destination IP and port * @param address destination IP and port.
*
* @return a ChannelFuture that can be used to wait for the socket to connect. A socket * @return a ChannelFuture that can be used to wait for the socket to connect. A socket
* connection does not mean that protocol handshake has occured. * connection does not mean that protocol handshake has occured.
*/ */
@ -573,8 +574,11 @@ public class PeerGroup {
// This can be null in unit tests or apps that don't use TCP connections. // This can be null in unit tests or apps that don't use TCP connections.
networkHandler.getOwnerObject().setRemoteAddress(address); networkHandler.getOwnerObject().setRemoteAddress(address);
} }
synchronized (this) {
Peer peer = peerFromChannelFuture(future); Peer peer = peerFromChannelFuture(future);
channelFutures.put(peer, future); channelFutures.put(peer, future);
setMaxConnections(getMaxConnections() + 1);
}
return future; return future;
} }
@ -647,7 +651,7 @@ public class PeerGroup {
// in the Satoshi client will increase and we'll get disconnected. // in the Satoshi client will increase and we'll get disconnected.
// //
// TODO: Find a way to balance the desire to propagate useful transactions against obscure DoS attacks. // TODO: Find a way to balance the desire to propagate useful transactions against obscure DoS attacks.
announcePendingWalletTransactions(wallets, Collections.singleton(peer)); announcePendingWalletTransactions(wallets, Collections.singletonList(peer));
// And set up event listeners for clients. This will allow them to find out about new transactions and blocks. // And set up event listeners for clients. This will allow them to find out about new transactions and blocks.
for (PeerEventListener listener : peerEventListeners) { for (PeerEventListener listener : peerEventListeners) {
peer.addEventListener(listener); peer.addEventListener(listener);
@ -662,7 +666,7 @@ public class PeerGroup {
/** Returns true if at least one peer received an inv. */ /** Returns true if at least one peer received an inv. */
private synchronized boolean announcePendingWalletTransactions(List<Wallet> announceWallets, private synchronized boolean announcePendingWalletTransactions(List<Wallet> announceWallets,
Set<Peer> announceToPeers) { List<Peer> announceToPeers) {
// Build up an inv announcing the hashes of all pending transactions in all our wallets. // Build up an inv announcing the hashes of all pending transactions in all our wallets.
InventoryMessage inv = new InventoryMessage(params); InventoryMessage inv = new InventoryMessage(params);
for (Wallet w : announceWallets) { for (Wallet w : announceWallets) {
@ -744,7 +748,7 @@ public class PeerGroup {
// Pick a new one and possibly tell it to download the chain. // Pick a new one and possibly tell it to download the chain.
synchronized (peers) { synchronized (peers) {
if (!peers.isEmpty()) { if (!peers.isEmpty()) {
Peer next = peers.iterator().next(); Peer next = peers.peekFirst();
setDownloadPeer(next); setDownloadPeer(next);
if (downloadListener != null) { if (downloadListener != null) {
startBlockChainDownloadFromPeer(next); startBlockChainDownloadFromPeer(next);
@ -772,4 +776,120 @@ public class PeerGroup {
return; return;
} }
} }
/**
* Returns a future that is triggered when the number of connected peers is equal to the given number of connected
* peers. By using this with {@link com.google.bitcoin.core.PeerGroup#getMaxConnections()} you can wait until the
* network is fully online. To block immediately, just call get() on the result.
*
* @param numPeers How many peers to wait for.
* @return a future that will be triggered when the number of connected peers >= numPeers
*/
public synchronized ListenableFuture<PeerGroup> waitForPeers(final int numPeers) {
if (peers.size() >= numPeers) {
return Futures.immediateFuture(this);
}
final SettableFuture<PeerGroup> future = SettableFuture.create();
addEventListener(new AbstractPeerEventListener() {
@Override public void onPeerConnected(Peer peer, int peerCount) {
if (peerCount >= numPeers) {
future.set(PeerGroup.this);
removeEventListener(this);
}
}
});
return future;
}
/**
* <p>Given a transaction, sends it un-announced to one peer and then waits for it to be received back from other
* peers. Once all connected peers have announced the transaction, the future will be completed. If anything goes
* wrong the exception will be thrown when get() is called, or you can receive it via a callback on the
* {@link ListenableFuture}. This method returns immediately, so if you want it to block just call get() on the
* result.</p>
*
* <p>Note that if the PeerGroup is limited to only one connection (discovery is not activated) then the future
* will complete as soon as the transaction was successfully written to that peer.</p>
*
* <p>Other than for sending your own transactions, this method is useful if you have received a transaction from
* someone and want to know that it's valid. It's a bit of a weird hack because the current version of the Bitcoin
* protocol does not inform you if you send an invalid transaction. Because sending bad transactions counts towards
* your DoS limit, be careful with relaying lots of unknown transactions. Otherwise you might get kicked off the
* network.</p>
*
* <p>The transaction won't be sent until there are at least {@link com.google.bitcoin.core.PeerGroup#getMaxConnections()}
* active connections available.</p>
*/
public synchronized ListenableFuture<Transaction> broadcastTransaction(final Transaction tx) {
final SettableFuture<Transaction> future = SettableFuture.create();
final int maxConnections = getMaxConnections();
log.info("Waiting for {} peers ...", maxConnections);
ListenableFuture<PeerGroup> peerAvailabilityFuture = waitForPeers(maxConnections);
peerAvailabilityFuture.addListener(new Runnable() {
public void run() {
// This can be called immediately if we already have enough peers. Otherwise it'll be called from a
// peer thread.
final Peer somePeer = peers.getFirst();
log.info("broadcastTransaction: Enough peers, adding {} to the memory pool and sending to {}",
tx.getHashAsString(), somePeer);
final Transaction pinnedTx = memoryPool.seen(tx, somePeer.getAddress());
try {
// Satoshis code sends an inv in this case and then lets the peer request the tx data. We just
// blast out the TX here for a couple of reasons. Firstly it's simpler: in the case where we have
// just a single connection we don't have to wait for getdata to be received and handled before
// completing the future in the code immediately below. Secondly, it's faster. The reason the
// Satoshi client sends an inv is privacy - it means you can't tell if the peer originated the
// transaction or not. However, we are not a fully validating node and this is advertised in
// our version message, as SPV nodes cannot relay it doesn't give away any additional information
// to skip the inv here - we wouldn't send invs anyway.
somePeer.sendMessage(pinnedTx);
} catch (IOException e) {
future.setException(e);
return;
}
// If we've been limited to talk to only one peer, we can't wait to hear back because the remote peer
// won't tell us about transactions we just announced to it for obvious reasons. So we just have to
// assume we're done, at that point. This happens when we're not given any peer discovery source and
// the user just calls connectTo() once.
if (maxConnections == 1) {
future.set(pinnedTx);
return;
}
tx.getConfidence().addEventListener(new TransactionConfidence.Listener() {
public void onConfidenceChanged(Transaction tx) {
// This will run in a peer thread.
final int numSeenPeers = tx.getConfidence().getBroadcastBy().size();
boolean done = false;
log.info("broadcastTransaction: TX {} seen by {} peers", pinnedTx.getHashAsString(), numSeenPeers);
synchronized (PeerGroup.this) {
if (numSeenPeers >= PeerGroup.this.peers.size()) {
// We've seen at least the number of connected peers announce the tx. So now we have
// some confidence that the network accepted it, assuming an un-hijacked internet
// connection. As the wallets were never informed about the transaction (because it was
// never downloaded) do that now.
for (Wallet wallet : wallets) {
try {
wallet.receivePending(pinnedTx);
} catch (Throwable t) {
future.setException(t);
return;
}
}
done = true;
}
}
if (done) {
// We're done! Run this outside of the peer group lock as setting the future may immediately
// invoke any listeners associated with it and it's simpler if the PeerGroup isn't locked.
log.info("broadcastTransaction: {} complete", pinnedTx.getHashAsString());
future.set(pinnedTx);
}
}
});
}
}, MoreExecutors.sameThreadExecutor());
return future;
}
} }

View File

@ -21,6 +21,7 @@ import com.google.bitcoin.core.WalletTransaction.Pool;
import com.google.bitcoin.store.WalletProtobufSerializer; import com.google.bitcoin.store.WalletProtobufSerializer;
import com.google.bitcoin.utils.EventListenerInvoker; import com.google.bitcoin.utils.EventListenerInvoker;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ListenableFuture;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -28,7 +29,6 @@ import java.io.*;
import java.lang.ref.WeakReference; import java.lang.ref.WeakReference;
import java.math.BigInteger; import java.math.BigInteger;
import java.util.*; import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import static com.google.bitcoin.core.Utils.bitcoinValueToFriendlyString; import static com.google.bitcoin.core.Utils.bitcoinValueToFriendlyString;
@ -554,7 +554,7 @@ public class Wallet implements Serializable {
// between pools. // between pools.
EnumSet<Pool> containingPools = getContainingPools(tx); EnumSet<Pool> containingPools = getContainingPools(tx);
if (!containingPools.equals(EnumSet.noneOf(Pool.class))) { if (!containingPools.equals(EnumSet.noneOf(Pool.class))) {
log.info("Received tx we already saw in a block or created ourselves: " + tx.getHashAsString()); log.debug("Received tx we already saw in a block or created ourselves: " + tx.getHashAsString());
return; return;
} }
@ -1220,49 +1220,31 @@ public class Wallet implements Serializable {
return tx; return tx;
} }
/** public static class SendResult {
* Sends coins to the given address, via the given {@link PeerGroup}. Change is returned to {@link Wallet#getChangeAddress()}. public Transaction tx;
* The transaction will be announced to any connected nodes asynchronously. If you would like to know when public ListenableFuture<Transaction> broadcastComplete;
* the transaction was successfully sent to at least one node, use
* {@link Wallet#sendCoinsOffline(Address, java.math.BigInteger)} and then {@link PeerGroup#broadcastTransaction(Transaction)}
* on the result to obtain a {@link java.util.concurrent.Future<Transaction>}.
*
* @param peerGroup a PeerGroup to use for broadcast.
* @param to Which address to send coins to.
* @param nanocoins How many nanocoins to send. You can use Utils.toNanoCoins() to calculate this.
* @return the Transaction
* @throws IOException if there was a problem broadcasting the transaction
*/
public synchronized Transaction sendCoinsAsync(PeerGroup peerGroup, Address to, BigInteger nanocoins) throws IOException {
Transaction tx = sendCoinsOffline(to, nanocoins);
if (tx == null)
return null; // Not enough money.
// Just throw away the Future here. If the user wants it, they can call sendCoinsOffline/broadcastTransaction
// themselves.
peerGroup.broadcastTransaction(tx);
return tx;
} }
/** /**
* Sends coins to the given address, via the given {@link PeerGroup}. Change is returned to {@link Wallet#getChangeAddress()}. * Sends coins to the given address, via the given {@link PeerGroup}. Change is returned to
* The method will block until the transaction has been announced to at least one node. * {@link Wallet#getChangeAddress()}. The returned object provides both the transaction, and a future that can
* be used to learn when the broadcast is complete. Complete means, if the PeerGroup is limited to only one
* connection, when it was written out to the socket. Otherwise when the transaction is written out and we heard
* it back from a different peer.
* *
* @param peerGroup a PeerGroup to use for broadcast or null. * @param peerGroup a PeerGroup to use for broadcast or null.
* @param to Which address to send coins to. * @param to Which address to send coins to.
* @param nanocoins How many nanocoins to send. You can use Utils.toNanoCoins() to calculate this. * @param value How much value to send. You can use Utils.toNanoCoins() to calculate this.
* @return The {@link Transaction} that was created or null if there was insufficient balance to send the coins. * @return An object containing the transaction that was created, and a future for the broadcast of it.
*/ */
public synchronized Transaction sendCoins(PeerGroup peerGroup, Address to, BigInteger nanocoins) { public SendResult sendCoins(PeerGroup peerGroup, Address to, BigInteger value) {
Transaction tx = sendCoinsOffline(to, nanocoins); Transaction tx = sendCoinsOffline(to, value);
if (tx == null) if (tx == null)
return null; // Not enough money. return null; // Not enough money.
try { SendResult result = new SendResult();
return peerGroup.broadcastTransaction(tx).get(); result.tx = tx;
} catch (InterruptedException e) { result.broadcastComplete = peerGroup.broadcastTransaction(tx);
throw new RuntimeException(e); return result;
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
} }
/** /**

View File

@ -299,14 +299,6 @@ public class PeerGroupTest extends TestWithNetworkConnections {
PeerGroupThread peerGroupThread = control.createMock(PeerGroupThread.class); PeerGroupThread peerGroupThread = control.createMock(PeerGroupThread.class);
peerGroup.mockStart(peerGroupThread); peerGroup.mockStart(peerGroupThread);
peerGroupThread.addTask((FutureTask<Transaction>) anyObject());
EasyMock.expectLastCall().andAnswer(new IAnswer<Void>() {
@SuppressWarnings("unchecked")
public Void answer() throws Throwable {
((FutureTask<Transaction>)EasyMock.getCurrentArguments()[0]).run();
return null;
}
});
peerGroupThread.interrupt(); peerGroupThread.interrupt();
EasyMock.expectLastCall(); EasyMock.expectLastCall();
@ -319,15 +311,21 @@ public class PeerGroupTest extends TestWithNetworkConnections {
assertEquals(Utils.toNanoCoins(50, 0), wallet.getBalance()); assertEquals(Utils.toNanoCoins(50, 0), wallet.getBalance());
// Now create a spend, and expect the announcement. // Now create a spend, and expect the announcement on p1.
Address dest = new ECKey().toAddress(params); Address dest = new ECKey().toAddress(params);
assertNotNull(wallet.sendCoins(peerGroup, dest, Utils.toNanoCoins(1, 0))); Wallet.SendResult sendResult = wallet.sendCoins(peerGroup, dest, Utils.toNanoCoins(1, 0));
assertNotNull(sendResult.tx);
assertFalse(sendResult.broadcastComplete.isDone());
Transaction t1 = (Transaction) outbound(p1); Transaction t1 = (Transaction) outbound(p1);
assertNotNull(t1); assertNotNull(t1);
// 49 BTC in change. // 49 BTC in change.
assertEquals(Utils.toNanoCoins(49, 0), t1.getValueSentToMe(wallet)); assertEquals(Utils.toNanoCoins(49, 0), t1.getValueSentToMe(wallet));
Transaction t2 = (Transaction) outbound(p2); // The future won't complete until it's heard back from the network on p2.
assertEquals(t1, t2); InventoryMessage inv = new InventoryMessage(params);
inv.addTransaction(t1);
inbound(p2, inv);
assertTrue(sendResult.broadcastComplete.isDone());
// Confirm it.
Block b2 = TestUtils.createFakeBlock(params, blockStore, t1).block; Block b2 = TestUtils.createFakeBlock(params, blockStore, t1).block;
inbound(p1, b2); inbound(p1, b2);
assertNull(outbound(p1)); assertNull(outbound(p1));
@ -338,19 +336,15 @@ public class PeerGroupTest extends TestWithNetworkConnections {
assertNull(outbound(p1)); // Nothing sent. assertNull(outbound(p1)); // Nothing sent.
// Add the wallet to the peer group (simulate initialization). Transactions should be announced. // Add the wallet to the peer group (simulate initialization). Transactions should be announced.
peerGroup.addWallet(wallet); peerGroup.addWallet(wallet);
// Transaction announced on the peers. // Transaction announced to the first peer.
InventoryMessage inv1 = (InventoryMessage) outbound(p1); InventoryMessage inv1 = (InventoryMessage) outbound(p1);
InventoryMessage inv2 = (InventoryMessage) outbound(p2);
assertEquals(t3.getHash(), inv1.getItems().get(0).hash); assertEquals(t3.getHash(), inv1.getItems().get(0).hash);
assertEquals(t3.getHash(), inv2.getItems().get(0).hash); // Peer asks for the transaction, and get it.
// Peers ask for the transaction, and get it.
GetDataMessage getdata = new GetDataMessage(params); GetDataMessage getdata = new GetDataMessage(params);
getdata.addItem(inv1.getItems().get(0)); getdata.addItem(inv1.getItems().get(0));
inbound(p1, getdata); inbound(p1, getdata);
Transaction t4 = (Transaction) outbound(p1); Transaction t4 = (Transaction) outbound(p1);
assertEquals(t3, t4); assertEquals(t3, t4);
inbound(p2, getdata);
assertEquals(t3, outbound(p2));
FakeChannel p3 = connectPeer(3); FakeChannel p3 = connectPeer(3);
assertTrue(outbound(p3) instanceof InventoryMessage); assertTrue(outbound(p3) instanceof InventoryMessage);

View File

@ -24,6 +24,7 @@ import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.math.BigInteger; import java.math.BigInteger;
import java.net.InetAddress; import java.net.InetAddress;
import java.util.concurrent.ExecutionException;
/** /**
* PingService demonstrates basic usage of the library. It sits on the network and when it receives coins, simply * PingService demonstrates basic usage of the library. It sits on the network and when it receives coins, simply
@ -82,9 +83,10 @@ public class DerbyPingService {
BigInteger value = tx.getValueSentToMe(w); BigInteger value = tx.getValueSentToMe(w);
System.out.println("Received " + Utils.bitcoinValueToFriendlyString(value) + " from " + from.toString()); System.out.println("Received " + Utils.bitcoinValueToFriendlyString(value) + " from " + from.toString());
// Now send the coins back! // Now send the coins back!
Transaction sendTx = w.sendCoins(peerGroup, from, value); Wallet.SendResult sendTx = w.sendCoins(peerGroup, from, value);
assert sendTx != null; // We should never try to send more coins than we have! assert sendTx.tx != null; // We should never try to send more coins than we have!
System.out.println("Sent coins back! Transaction hash is " + sendTx.getHashAsString()); System.out.println("Sent coins back! Transaction hash is " + sendTx.tx.getHashAsString());
sendTx.broadcastComplete.get();
w.saveToFile(walletFile); w.saveToFile(walletFile);
} catch (ScriptException e) { } catch (ScriptException e) {
// If we didn't understand the scriptSig, just crash. // If we didn't understand the scriptSig, just crash.
@ -93,6 +95,10 @@ public class DerbyPingService {
} catch (IOException e) { } catch (IOException e) {
e.printStackTrace(); e.printStackTrace();
throw new RuntimeException(e); throw new RuntimeException(e);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (ExecutionException e) {
throw new RuntimeException(e);
} }
} }
}); });

View File

@ -180,7 +180,7 @@ public class PingService {
} }
private void bounceCoins(Transaction tx) { private void bounceCoins(Transaction tx) {
// It's impossible to pick one specific identity that you receive coins from in BitCoin as there // It's impossible to pick one specific identity that you receive coins from in Bitcoin as there
// could be inputs from many addresses. So instead we just pick the first and assume they were all // could be inputs from many addresses. So instead we just pick the first and assume they were all
// owned by the same person. // owned by the same person.
try { try {
@ -189,7 +189,7 @@ public class PingService {
Address from = input.getFromAddress(); Address from = input.getFromAddress();
System.out.println("Received " + Utils.bitcoinValueToFriendlyString(value) + " from " + from.toString()); System.out.println("Received " + Utils.bitcoinValueToFriendlyString(value) + " from " + from.toString());
// Now send the coins back! // Now send the coins back!
Transaction sendTx = w.sendCoins(peerGroup, from, value); Transaction sendTx = w.sendCoins(peerGroup, from, value).tx;
assert sendTx != null; // We should never try to send more coins than we have! assert sendTx != null; // We should never try to send more coins than we have!
System.out.println("Sent coins back! Transaction hash is " + sendTx.getHashAsString()); System.out.println("Sent coins back! Transaction hash is " + sendTx.getHashAsString());
w.saveToFile(walletFile); w.saveToFile(walletFile);

View File

@ -64,7 +64,7 @@ public class WalletTool {
" --chain=<file> Specifies the name of the file that stores the block chain.\n" + " --chain=<file> Specifies the name of the file that stores the block chain.\n" +
" --force Overrides any safety checks on the requested action.\n" + " --force Overrides any safety checks on the requested action.\n" +
" --date Provide a date in form YYYY/MM/DD to any action that requires one.\n" + " --date Provide a date in form YYYY/MM/DD to any action that requires one.\n" +
" --peer=1.2.3.4 Use the given IP address for connections instead of peer discovery.\n" + " --peers=1.2.3.4 Comma separaterd IP addresses/domain names for connections instead of peer discovery.\n" +
" --condition=... Allows you to specify a numeric condition for other commands. The format is\n" + " --condition=... Allows you to specify a numeric condition for other commands. The format is\n" +
" one of the following operators = < > <= >= immediately followed by a number.\n" + " one of the following operators = < > <= >= immediately followed by a number.\n" +
" For example --condition=\">5.10\" or --condition=\"<=1\"\n" + " For example --condition=\">5.10\" or --condition=\"<=1\"\n" +
@ -226,7 +226,7 @@ public class WalletTool {
parser.accepts("pubkey").withRequiredArg(); parser.accepts("pubkey").withRequiredArg();
parser.accepts("privkey").withRequiredArg(); parser.accepts("privkey").withRequiredArg();
parser.accepts("addr").withRequiredArg(); parser.accepts("addr").withRequiredArg();
parser.accepts("peer").withRequiredArg(); parser.accepts("peers").withRequiredArg();
OptionSpec<String> outputFlag = parser.accepts("output").withRequiredArg(); OptionSpec<String> outputFlag = parser.accepts("output").withRequiredArg();
parser.accepts("value").withRequiredArg(); parser.accepts("value").withRequiredArg();
conditionFlag = parser.accepts("condition").withRequiredArg(); conditionFlag = parser.accepts("condition").withRequiredArg();
@ -383,17 +383,13 @@ public class WalletTool {
} }
setup(); setup();
peers.start(); peers.start();
// Wait for peers to connect, the tx to be sent to one of them and for it to be propagated across the
// network. Once propagation is complete and we heard the transaction back from all our peers, it will
// be committed to the wallet.
peers.broadcastTransaction(t).get(); peers.broadcastTransaction(t).get();
// Horrible hack to ensure we have time to fully broadcast to every peer. Will go away when we resolve
// issue 167.
Thread.sleep(2000);
wallet.commitTx(t);
System.out.println(t.getHashAsString()); System.out.println(t.getHashAsString());
} catch (BlockStoreException e) { } catch (BlockStoreException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} catch (VerificationException e) {
// Cannot happen, created transaction ourselves.
throw new RuntimeException(e);
} catch (InterruptedException e) { } catch (InterruptedException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} catch (ExecutionException e) { } catch (ExecutionException e) {
@ -497,14 +493,17 @@ public class WalletTool {
peers.setUserAgent("WalletTool", "1.0"); peers.setUserAgent("WalletTool", "1.0");
peers.addWallet(wallet); peers.addWallet(wallet);
peers.setFastCatchupTimeSecs(wallet.getEarliestKeyCreationTime()); peers.setFastCatchupTimeSecs(wallet.getEarliestKeyCreationTime());
if (options.has("peer")) { if (options.has("peers")) {
String peer = (String) options.valueOf("peer"); String peersFlag = (String) options.valueOf("peers");
String[] peerAddrs = peersFlag.split(",");
for (String peer : peerAddrs) {
try { try {
peers.addAddress(new PeerAddress(InetAddress.getByName(peer), params.port)); peers.addAddress(new PeerAddress(InetAddress.getByName(peer), params.port));
} catch (UnknownHostException e) { } catch (UnknownHostException e) {
System.err.println("Could not understand peer domain name/IP address: " + peer + ": " + e.getMessage()); System.err.println("Could not understand peer domain name/IP address: " + peer + ": " + e.getMessage());
System.exit(1); System.exit(1);
} }
}
} else { } else {
peers.addPeerDiscovery(discovery); peers.addPeerDiscovery(discovery);
} }