3
0
mirror of https://github.com/Qortal/altcoinj.git synced 2025-02-12 10:15:52 +00:00

Payment protocol: Make PaymentSession use a global thread pool that uses daemon threads.

Fixes a 60 second hang that could occur in wallet-tool once the send request was processed. Fixing this revealed another bug - WalletTool was depending on this VM shutdown delay caused by the worker pool timeout, so clean up the code and replace with a blocking get of the future.

Also support the --offline and --password flags when using the payment protocol.
This commit is contained in:
Mike Hearn 2014-01-28 13:46:25 +01:00
parent ddec4f9106
commit a1562836be
3 changed files with 118 additions and 116 deletions

View File

@ -18,24 +18,21 @@ package com.google.bitcoin.protocols.payments;
import com.google.bitcoin.core.*; import com.google.bitcoin.core.*;
import com.google.bitcoin.params.MainNetParams; import com.google.bitcoin.params.MainNetParams;
import com.google.bitcoin.params.TestNet3Params;
import com.google.bitcoin.script.ScriptBuilder; import com.google.bitcoin.script.ScriptBuilder;
import com.google.bitcoin.uri.BitcoinURI; import com.google.bitcoin.uri.BitcoinURI;
import com.google.bitcoin.utils.Threading;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.ByteString; import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.InvalidProtocolBufferException;
import org.bitcoin.protocols.payments.Protos; import org.bitcoin.protocols.payments.Protos;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.spongycastle.asn1.ASN1String; import org.spongycastle.asn1.ASN1String;
import org.spongycastle.asn1.x500.AttributeTypeAndValue; import org.spongycastle.asn1.x500.AttributeTypeAndValue;
import org.spongycastle.asn1.x500.RDN; import org.spongycastle.asn1.x500.RDN;
import org.spongycastle.asn1.x500.style.RFC4519Style;
import org.spongycastle.asn1.x500.X500Name; import org.spongycastle.asn1.x500.X500Name;
import org.spongycastle.asn1.x500.style.RFC4519Style;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import javax.security.auth.x500.X500Principal; import javax.security.auth.x500.X500Principal;
@ -47,7 +44,6 @@ import java.security.cert.*;
import java.util.Date; import java.util.Date;
import java.util.List; import java.util.List;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
/** /**
* <p>Provides a standard implementation of the Payment Protocol (BIP 0070)</p> * <p>Provides a standard implementation of the Payment Protocol (BIP 0070)</p>
@ -79,8 +75,7 @@ import java.util.concurrent.Executors;
* @see <a href="https://github.com/bitcoin/bips/blob/master/bip-0070.mediawiki">BIP 0070</a> * @see <a href="https://github.com/bitcoin/bips/blob/master/bip-0070.mediawiki">BIP 0070</a>
*/ */
public class PaymentSession { public class PaymentSession {
private static final Logger log = LoggerFactory.getLogger(PaymentSession.class); private static ListeningExecutorService executor = Threading.THREAD_POOL;
private ListeningExecutorService executor = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
private NetworkParameters params; private NetworkParameters params;
private String trustStorePath; private String trustStorePath;
private Protos.PaymentRequest paymentRequest; private Protos.PaymentRequest paymentRequest;
@ -184,7 +179,6 @@ public class PaymentSession {
} }
private static ListenableFuture<PaymentSession> fetchPaymentRequest(final URI uri, final boolean verifyPki, @Nullable final String trustStorePath) { private static ListenableFuture<PaymentSession> fetchPaymentRequest(final URI uri, final boolean verifyPki, @Nullable final String trustStorePath) {
ListeningExecutorService executor = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
return executor.submit(new Callable<PaymentSession>() { return executor.submit(new Callable<PaymentSession>() {
@Override @Override
public PaymentSession call() throws Exception { public PaymentSession call() throws Exception {

View File

@ -17,16 +17,15 @@
package com.google.bitcoin.utils; package com.google.bitcoin.utils;
import com.google.common.util.concurrent.CycleDetectingLockFactory; import com.google.common.util.concurrent.CycleDetectingLockFactory;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.Uninterruptibles; import com.google.common.util.concurrent.Uninterruptibles;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import javax.annotation.Nonnull; import javax.annotation.Nonnull;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.*;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
/** /**
@ -35,6 +34,13 @@ import java.util.concurrent.locks.ReentrantLock;
* Also provides a worker thread that is designed for event listeners to be dispatched on. * Also provides a worker thread that is designed for event listeners to be dispatched on.
*/ */
public class Threading { public class Threading {
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////
//
// User thread/event handling utilities
//
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////
/** /**
* An executor with one thread that is intended for running event listeners on. This ensures all event listener code * An executor with one thread that is intended for running event listeners on. This ensures all event listener code
* runs without any locks being held. It's intended for the API user to run things on. Callbacks registered by * runs without any locks being held. It's intended for the API user to run things on. Callbacks registered by
@ -78,8 +84,6 @@ public class Threading {
@Nullable @Nullable
public static volatile Thread.UncaughtExceptionHandler uncaughtExceptionHandler; public static volatile Thread.UncaughtExceptionHandler uncaughtExceptionHandler;
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////
public static class UserThread extends Thread implements Executor { public static class UserThread extends Thread implements Executor {
private static final Logger log = LoggerFactory.getLogger(UserThread.class); private static final Logger log = LoggerFactory.getLogger(UserThread.class);
private LinkedBlockingQueue<Runnable> tasks; private LinkedBlockingQueue<Runnable> tasks;
@ -132,6 +136,12 @@ public class Threading {
}; };
} }
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////
//
// Cycle detecting lock factories
//
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////
private static CycleDetectingLockFactory.Policy policy; private static CycleDetectingLockFactory.Policy policy;
public static CycleDetectingLockFactory factory; public static CycleDetectingLockFactory factory;
@ -159,4 +169,23 @@ public class Threading {
public static CycleDetectingLockFactory.Policy getPolicy() { public static CycleDetectingLockFactory.Policy getPolicy() {
return policy; return policy;
} }
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////
//
// Generic worker pool.
//
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////
/** A caching thread pool that creates daemon threads, which won't keep the JVM alive waiting for more work. */
public static ListeningExecutorService THREAD_POOL = MoreExecutors.listeningDecorator(
Executors.newCachedThreadPool(new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("Threading.THREAD_POOL worker");
t.setDaemon(true);
return t;
}
})
);
} }

View File

@ -32,8 +32,6 @@ import com.google.bitcoin.utils.BriefLogFormatter;
import com.google.common.base.Charsets; import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.io.Resources; import com.google.common.io.Resources;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
import joptsimple.OptionParser; import joptsimple.OptionParser;
import joptsimple.OptionSet; import joptsimple.OptionSet;
@ -444,8 +442,6 @@ public class WalletTool {
return; return;
} catch (ScriptException e) { } catch (ScriptException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} catch (KeyCrypterException e) {
throw new RuntimeException(e);
} }
t = req.tx; // Not strictly required today. t = req.tx; // Not strictly required today.
System.out.println(t.getHashAsString()); System.out.println(t.getHashAsString());
@ -460,11 +456,6 @@ public class WalletTool {
// network. Once propagation is complete and we heard the transaction back from all our peers, it will // network. Once propagation is complete and we heard the transaction back from all our peers, it will
// be committed to the wallet. // be committed to the wallet.
peers.broadcastTransaction(t).get(); peers.broadcastTransaction(t).get();
if (peers.getMinBroadcastConnections() == 1) {
// Crap hack to work around some issue with Netty where the write future
// completes before the remote peer actually hears the message.
Thread.sleep(5000);
}
} catch (BlockStoreException e) { } catch (BlockStoreException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} catch (KeyCrypterException e) { } catch (KeyCrypterException e) {
@ -479,56 +470,32 @@ public class WalletTool {
} }
private static void sendPaymentRequest(String location, boolean verifyPki) { private static void sendPaymentRequest(String location, boolean verifyPki) {
if (location.startsWith("http")) { if (location.startsWith("http") || location.startsWith("bitcoin")) {
try { try {
ListenableFuture<PaymentSession> future = PaymentSession.createFromUrl(location, verifyPki); ListenableFuture<PaymentSession> future;
Futures.addCallback(future, new FutureCallback<PaymentSession>() { if (location.startsWith("http")) {
@Override future = PaymentSession.createFromUrl(location, verifyPki);
public void onSuccess(PaymentSession session) { } else {
if (session != null) BitcoinURI paymentRequestURI = new BitcoinURI(location);
send(session); future = PaymentSession.createFromBitcoinUri(paymentRequestURI, verifyPki);
else { }
System.err.println("Server returned null session"); PaymentSession session = future.get();
System.exit(1); if (session != null) {
} send(session);
} } else {
public void onFailure(Throwable thrown) { System.err.println("Server returned null session");
System.err.println("Failed to fetch payment request " + thrown.getMessage()); System.exit(1);
System.exit(1); }
}
});
} catch (PaymentRequestException e) { } catch (PaymentRequestException e) {
System.err.println("Error creating payment session " + e.getMessage()); System.err.println("Error creating payment session " + e.getMessage());
System.exit(1); System.exit(1);
}
} else if (location.startsWith("bitcoin")) {
BitcoinURI paymentRequestURI = null;
try {
paymentRequestURI = new BitcoinURI(location);
} catch (BitcoinURIParseException e) { } catch (BitcoinURIParseException e) {
System.err.println("Invalid bitcoin uri: " + e.getMessage()); System.err.println("Invalid bitcoin uri: " + e.getMessage());
System.exit(1); System.exit(1);
} } catch (InterruptedException e) {
try { // Ignore.
ListenableFuture<PaymentSession> future = PaymentSession.createFromBitcoinUri(paymentRequestURI, verifyPki); } catch (ExecutionException e) {
Futures.addCallback(future, new FutureCallback<PaymentSession>() { throw new RuntimeException(e);
@Override
public void onSuccess(PaymentSession session) {
if (session != null)
send(session);
else {
System.err.println("Server returned null session");
System.exit(1);
}
}
public void onFailure(Throwable thrown) {
System.err.println("Failed to fetch payment request " + thrown.getMessage());
System.exit(1);
}
});
} catch (PaymentRequestException e) {
System.err.println("Error creating payment session " + e.getMessage());
System.exit(1);
} }
} else { } else {
// Try to open the payment request as a file. // Try to open the payment request as a file.
@ -537,7 +504,7 @@ public class WalletTool {
File paymentRequestFile = new File(location); File paymentRequestFile = new File(location);
stream = new FileInputStream(paymentRequestFile); stream = new FileInputStream(paymentRequestFile);
} catch (Exception e) { } catch (Exception e) {
System.err.println("Failed to open file " + e.getMessage()); System.err.println("Failed to open file: " + e.getMessage());
System.exit(1); System.exit(1);
} }
try { try {
@ -557,6 +524,63 @@ public class WalletTool {
} }
} }
private static void send(PaymentSession session) {
try {
System.out.println("Payment Request");
System.out.println("Amount: " + session.getValue().doubleValue() / 100000 + "mBTC");
System.out.println("Date: " + session.getDate());
System.out.println("Memo: " + session.getMemo());
if (session.pkiVerificationData != null) {
System.out.println("Pki-Verified Name: " + session.pkiVerificationData.name);
if (session.pkiVerificationData.orgName != null)
System.out.println("Pki-Verified Org: " + session.pkiVerificationData.orgName);
}
final Wallet.SendRequest req = session.getSendRequest();
if (password != null) {
if (!wallet.checkPassword(password)) {
System.err.println("Password is incorrect.");
return;
}
req.aesKey = wallet.getKeyCrypter().deriveKey(password);
}
wallet.completeTx(req); // may throw InsufficientMoneyException.
if (options.has("offline")) {
wallet.commitTx(req.tx);
return;
}
setup();
// No refund address specified, no user-specified memo field.
ListenableFuture<PaymentSession.Ack> future = session.sendPayment(ImmutableList.of(req.tx), null, null);
if (future == null) {
// No payment_url for submission so, broadcast and wait.
peers.startAndWait();
peers.broadcastTransaction(req.tx).get();
} else {
PaymentSession.Ack ack = future.get();
wallet.commitTx(req.tx);
System.out.println("Memo from server: " + ack.getMemo());
}
} catch (PaymentRequestException e) {
System.err.println("Failed to send payment " + e.getMessage());
System.exit(1);
} catch (VerificationException e) {
System.err.println("Failed to send payment " + e.getMessage());
System.exit(1);
} catch (ExecutionException e) {
System.err.println("Failed to send payment " + e.getMessage());
System.exit(1);
} catch (IOException e) {
System.err.println("Invalid payment " + e.getMessage());
System.exit(1);
} catch (InterruptedException e1) {
// Ignore.
} catch (InsufficientMoneyException e) {
System.err.println("Insufficient funds: have " + Utils.bitcoinValueToFriendlyString(wallet.getBalance()));
} catch (BlockStoreException e) {
throw new RuntimeException(e);
}
}
private static void wait(WaitForEnum waitFor) throws BlockStoreException { private static void wait(WaitForEnum waitFor) throws BlockStoreException {
final CountDownLatch latch = new CountDownLatch(1); final CountDownLatch latch = new CountDownLatch(1);
setup(); setup();
@ -833,49 +857,4 @@ public class WalletTool {
setup(); setup();
System.out.println(wallet.toString(true, true, true, chain)); System.out.println(wallet.toString(true, true, true, chain));
} }
private static void send(PaymentSession session) {
try {
System.out.println("Payment Request");
System.out.println("Amount: " + session.getValue().doubleValue() / 100000 + "mBTC");
System.out.println("Date: " + session.getDate());
System.out.println("Memo: " + session.getMemo());
if (session.pkiVerificationData != null) {
System.out.println("Pki-Verified Name: " + session.pkiVerificationData.name);
if (session.pkiVerificationData.orgName != null)
System.out.println("Pki-Verified Org: " + session.pkiVerificationData.orgName);
}
final Wallet.SendRequest req = session.getSendRequest();
wallet.completeTx(req); // may throw InsufficientMoneyException.
// No refund address specified, no user-specified memo field.
ListenableFuture<PaymentSession.Ack> future = session.sendPayment(ImmutableList.of(req.tx), null, null);
Futures.addCallback(future, new FutureCallback<PaymentSession.Ack>() {
@Override
public void onSuccess(PaymentSession.Ack ack) {
try {
wallet.commitTx(req.tx);
System.out.println(ack.getMemo());
} catch (VerificationException e) {
System.err.println("Failed to send tx " + e.getMessage());
System.exit(1);
}
}
public void onFailure(Throwable thrown) {
System.err.println("Failed to send payment " + thrown.getMessage());
System.exit(1);
}
});
}catch (PaymentRequestException e) {
System.err.println("Failed to send payment " + e.getMessage());
System.exit(1);
} catch (VerificationException e) {
System.err.println("Failed to send payment " + e.getMessage());
System.exit(1);
} catch (IOException e) {
System.err.println("Invalid payment " + e.getMessage());
System.exit(1);
} catch (InsufficientMoneyException e) {
System.err.println("Insufficient funds: have " + Utils.bitcoinValueToFriendlyString(wallet.getBalance()));
}
}
} }