diff --git a/core/src/main/java/com/google/bitcoin/core/Peer.java b/core/src/main/java/com/google/bitcoin/core/Peer.java index bb4fa828..2f84cea1 100644 --- a/core/src/main/java/com/google/bitcoin/core/Peer.java +++ b/core/src/main/java/com/google/bitcoin/core/Peer.java @@ -18,6 +18,7 @@ package com.google.bitcoin.core; import com.google.bitcoin.store.BlockStore; import com.google.bitcoin.store.BlockStoreException; +import com.google.bitcoin.utils.ListenerRegistration; import com.google.bitcoin.utils.Threading; import com.google.common.base.Objects; import com.google.common.base.Preconditions; @@ -34,6 +35,7 @@ import java.net.InetSocketAddress; import java.util.*; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReentrantLock; @@ -60,7 +62,7 @@ public class Peer { private final NetworkParameters params; private final AbstractBlockChain blockChain; private volatile PeerAddress vAddress; - private final CopyOnWriteArrayList eventListeners; + private final CopyOnWriteArrayList> eventListeners; private final CopyOnWriteArrayList lifecycleListeners; // Whether to try and download blocks and transactions from this peer. Set to false by PeerGroup if not the // primary peer. This is to avoid redundant work and concurrency problems with downloading the same chain @@ -147,7 +149,7 @@ public class Peer { this.blockChain = chain; // Allowed to be null. this.vDownloadData = chain != null; this.getDataFutures = new CopyOnWriteArrayList(); - this.eventListeners = new CopyOnWriteArrayList(); + this.eventListeners = new CopyOnWriteArrayList>(); this.lifecycleListeners = new CopyOnWriteArrayList(); this.fastCatchupTimeSecs = params.getGenesisBlock().getTimeSeconds(); this.isAcked = false; @@ -167,12 +169,29 @@ public class Peer { this.versionMessage.appendToSubVer(thisSoftwareName, thisSoftwareVersion, null); } + /** + * Registers the given object as an event listener that will be invoked on the user thread. Note that listeners + * added this way will not receive {@link PeerEventListener#getData(Peer, GetDataMessage)} or + * {@link PeerEventListener#onPreMessageReceived(Peer, Message)} calls because those require that the listener + * be added using {@link Threading#SAME_THREAD}, which requires the other addListener form. + */ public void addEventListener(PeerEventListener listener) { - eventListeners.add(listener); + addEventListener(listener, Threading.USER_THREAD); + } + + /** + * Registers the given object as an event listener that will be invoked by the given executor. Note that listeners + * added using any other executor than {@link Threading#SAME_THREAD} will not receive + * {@link PeerEventListener#getData(Peer, GetDataMessage)} or + * {@link PeerEventListener#onPreMessageReceived(Peer, Message)} calls because this class is not willing to cross + * threads in order to get the results of those hook methods. + */ + public void addEventListener(PeerEventListener listener, Executor executor) { + eventListeners.add(new ListenerRegistration(listener, executor)); } public boolean removeEventListener(PeerEventListener listener) { - return eventListeners.remove(listener); + return ListenerRegistration.removeFromList(listener, eventListeners); } void addLifecycleListener(PeerLifecycleListener listener) { @@ -246,9 +265,13 @@ public class Peer { try { // Allow event listeners to filter the message stream. Listeners are allowed to drop messages by // returning null. - for (PeerEventListener listener : eventListeners) { - m = listener.onPreMessageReceived(this, m); - if (m == null) break; + for (ListenerRegistration registration : eventListeners) { + // Skip any listeners that are supposed to run in another thread as we don't want to block waiting + // for it, which might cause circular deadlock. + if (registration.executor == Threading.SAME_THREAD) { + m = registration.listener.onPreMessageReceived(this, m); + if (m == null) break; + } } if (m == null) return; @@ -309,12 +332,17 @@ public class Peer { } else { log.warn("Received unhandled message: {}", m); } - } catch (Throwable throwable) { + } catch (final Throwable throwable) { log.warn("Caught exception in peer thread: {}", throwable.getMessage()); throwable.printStackTrace(); - for (PeerEventListener listener : eventListeners) { + for (final ListenerRegistration registration : eventListeners) { try { - listener.onException(throwable); + registration.executor.execute(new Runnable() { + @Override + public void run() { + registration.listener.onException(throwable); + } + }); } catch (Exception e1) { e1.printStackTrace(); } @@ -435,8 +463,9 @@ public class Peer { private void processGetData(GetDataMessage getdata) throws IOException { log.info("{}: Received getdata message: {}", vAddress, getdata.toString()); ArrayList items = new ArrayList(); - for (PeerEventListener listener : eventListeners) { - List listenerItems = listener.getData(this, getdata); + for (ListenerRegistration registration : eventListeners) { + if (registration.executor != Threading.SAME_THREAD) continue; + List listenerItems = registration.listener.getData(this, getdata); if (listenerItems == null) continue; items.addAll(listenerItems); } @@ -452,6 +481,7 @@ public class Peer { private void processTransaction(Transaction tx) throws VerificationException, IOException { // Check a few basic syntax issues to ensure the received TX isn't nonsense. tx.verify(); + final Transaction fTx; lock.lock(); try { log.debug("{}: Received tx {}", vAddress, tx.getHashAsString()); @@ -459,7 +489,7 @@ public class Peer { // We may get back a different transaction object. tx = memoryPool.seen(tx, getAddress()); } - final Transaction fTx = tx; + fTx = tx; // Label the transaction as coming in from the P2P network (as opposed to being created by us, direct import, // etc). This helps the wallet decide how to risk analyze it later. fTx.getConfidence().setSource(TransactionConfidence.Source.NETWORK); @@ -520,8 +550,14 @@ public class Peer { } // Tell all listeners about this tx so they can decide whether to keep it or not. If no listener keeps a // reference around then the memory pool will forget about it after a while too because it uses weak references. - for (PeerEventListener listener : eventListeners) - listener.onTransaction(this, tx); + for (final ListenerRegistration registration : eventListeners) { + registration.executor.execute(new Runnable() { + @Override + public void run() { + registration.listener.onTransaction(Peer.this, fTx); + } + }); + } } /** @@ -663,7 +699,7 @@ public class Peer { } } } - }, MoreExecutors.sameThreadExecutor()); + }, Threading.SAME_THREAD); } } catch (Exception e) { log.error("{}: Couldn't send getdata in downloadDependencies({})", this, tx.getHash()); @@ -805,8 +841,14 @@ public class Peer { // since the time we first connected to the peer. However, it's weird and unexpected to receive a callback // with negative "blocks left" in this case, so we clamp to zero so the API user doesn't have to think about it. final int blocksLeft = Math.max(0, (int) vPeerVersionMessage.bestHeight - blockChain.getBestChainHeight()); - for (PeerEventListener listener : eventListeners) - listener.onBlocksDownloaded(Peer.this, m, blocksLeft); + for (final ListenerRegistration registration : eventListeners) { + registration.executor.execute(new Runnable() { + @Override + public void run() { + registration.listener.onBlocksDownloaded(Peer.this, m, blocksLeft); + } + }); + } } private void processInv(InventoryMessage inv) throws IOException { @@ -1127,8 +1169,14 @@ public class Peer { // chain even if the chain block count is lower. final int blocksLeft = getPeerBlockHeightDifference(); if (blocksLeft >= 0) { - for (PeerEventListener listener : eventListeners) - listener.onChainDownloadStarted(this, blocksLeft); + for (final ListenerRegistration registration : eventListeners) { + registration.executor.execute(new Runnable() { + @Override + public void run() { + registration.listener.onChainDownloadStarted(Peer.this, blocksLeft); + } + }); + } // When we just want as many blocks as possible, we can set the target hash to zero. blockChainDownload(Sha256Hash.ZERO_HASH); } diff --git a/core/src/main/java/com/google/bitcoin/core/PeerGroup.java b/core/src/main/java/com/google/bitcoin/core/PeerGroup.java index 4c2447d9..75782e7b 100644 --- a/core/src/main/java/com/google/bitcoin/core/PeerGroup.java +++ b/core/src/main/java/com/google/bitcoin/core/PeerGroup.java @@ -439,7 +439,7 @@ public class PeerGroup extends AbstractIdleService implements TransactionBroadca * to running on the user thread. */ public void addEventListener(PeerEventListener listener) { - addEventListener(listener, Threading.userCode); + addEventListener(listener, Threading.USER_THREAD); } /** The given event listener will no longer be called with events. */ @@ -611,7 +611,7 @@ public class PeerGroup extends AbstractIdleService implements TransactionBroadca // if a key is added. Of course, by then we may have downloaded the chain already. Ideally adding keys would // automatically rewind the block chain and redownload the blocks to find transactions relevant to those keys, // all transparently and in the background. But we are a long way from that yet. - wallet.addEventListener(walletEventListener); + wallet.addEventListener(walletEventListener); // TODO: Run this in the current peer thread. recalculateFastCatchupAndFilter(); updateVersionMessageRelayTxesBeforeFilter(getVersionMessage()); } finally { @@ -811,7 +811,7 @@ public class PeerGroup extends AbstractIdleService implements TransactionBroadca } } // Make sure the peer knows how to upload transactions that are requested from us. - peer.addEventListener(getDataListener); + peer.addEventListener(getDataListener, Threading.SAME_THREAD); // Now tell the peers about any transactions we have which didn't appear in the chain yet. These are not // necessarily spends we created. They may also be transactions broadcast across the network that we saw, // which are relevant to us, and which we therefore wish to help propagate (ie they send us coins). @@ -824,7 +824,7 @@ public class PeerGroup extends AbstractIdleService implements TransactionBroadca announcePendingWalletTransactions(wallets, Collections.singletonList(peer)); // And set up event listeners for clients. This will allow them to find out about new transactions and blocks. for (ListenerRegistration registration : peerEventListeners) { - peer.addEventListener(registration.listener); + peer.addEventListener(registration.listener, registration.executor); } setupPingingForNewPeer(peer); } finally { @@ -858,7 +858,7 @@ public class PeerGroup extends AbstractIdleService implements TransactionBroadca if (firstRun) { firstRun = false; try { - peer.ping().addListener(this, MoreExecutors.sameThreadExecutor()); + peer.ping().addListener(this, Threading.SAME_THREAD); } catch (Exception e) { log.warn("{}: Exception whilst trying to ping peer: {}", peer, e.toString()); return; @@ -875,7 +875,7 @@ public class PeerGroup extends AbstractIdleService implements TransactionBroadca try { if (!peers.contains(peer) || !PeerGroup.this.isRunning()) return; // Peer was removed/shut down. - peer.ping().addListener(pingRunnable[0], MoreExecutors.sameThreadExecutor()); + peer.ping().addListener(pingRunnable[0], Threading.SAME_THREAD); } catch (Exception e) { log.warn("{}: Exception whilst trying to ping peer: {}", peer, e.toString()); } @@ -1041,7 +1041,7 @@ public class PeerGroup extends AbstractIdleService implements TransactionBroadca private void startBlockChainDownloadFromPeer(Peer peer) { lock.lock(); try { - peer.addEventListener(downloadListener); + peer.addEventListener(downloadListener, Threading.SAME_THREAD); setDownloadPeer(peer); // startBlockChainDownload will setDownloadData(true) on itself automatically. peer.startBlockChainDownload(); @@ -1245,7 +1245,7 @@ public class PeerGroup extends AbstractIdleService implements TransactionBroadca }); } } - }, MoreExecutors.sameThreadExecutor()); + }, Threading.SAME_THREAD); return future; } diff --git a/core/src/main/java/com/google/bitcoin/core/TransactionConfidence.java b/core/src/main/java/com/google/bitcoin/core/TransactionConfidence.java index 68c02cd0..5f95cbe9 100644 --- a/core/src/main/java/com/google/bitcoin/core/TransactionConfidence.java +++ b/core/src/main/java/com/google/bitcoin/core/TransactionConfidence.java @@ -412,7 +412,7 @@ public class TransactionConfidence implements Serializable { */ public void queueListeners(final Listener.ChangeReason reason) { for (final Listener listener : listeners) { - Threading.userCode.execute(new Runnable() { + Threading.USER_THREAD.execute(new Runnable() { @Override public void run() { listener.onConfidenceChanged(transaction, reason); } diff --git a/core/src/main/java/com/google/bitcoin/core/Wallet.java b/core/src/main/java/com/google/bitcoin/core/Wallet.java index 1ca54cd5..f02feadb 100644 --- a/core/src/main/java/com/google/bitcoin/core/Wallet.java +++ b/core/src/main/java/com/google/bitcoin/core/Wallet.java @@ -1359,7 +1359,7 @@ public class Wallet implements Serializable, BlockChainListener { * like receiving money. Runs the listener methods in the user thread. */ public void addEventListener(WalletEventListener listener) { - addEventListener(listener, Threading.userCode); + addEventListener(listener, Threading.USER_THREAD); } /** @@ -3032,7 +3032,7 @@ public class Wallet implements Serializable, BlockChainListener { it.remove(); final BigInteger v = checkNotNull(val); // Don't run any user-provided future listeners with our lock held. - Threading.userCode.execute(new Runnable() { + Threading.USER_THREAD.execute(new Runnable() { @Override public void run() { req.future.set(v); } diff --git a/core/src/main/java/com/google/bitcoin/protocols/channels/PaymentChannelServer.java b/core/src/main/java/com/google/bitcoin/protocols/channels/PaymentChannelServer.java index a5106f54..77395b21 100644 --- a/core/src/main/java/com/google/bitcoin/protocols/channels/PaymentChannelServer.java +++ b/core/src/main/java/com/google/bitcoin/protocols/channels/PaymentChannelServer.java @@ -5,7 +5,6 @@ import java.util.concurrent.locks.ReentrantLock; import com.google.bitcoin.core.*; import com.google.bitcoin.utils.Threading; -import com.google.common.util.concurrent.MoreExecutors; import com.google.protobuf.ByteString; import net.jcip.annotations.GuardedBy; import org.bitcoin.paymentchannel.Protos; @@ -242,7 +241,7 @@ public class PaymentChannelServer { public void run() { multisigContractPropogated(multisigContract.getHash()); } - }, MoreExecutors.sameThreadExecutor()); + }, Threading.SAME_THREAD); } @GuardedBy("lock") diff --git a/core/src/main/java/com/google/bitcoin/utils/Threading.java b/core/src/main/java/com/google/bitcoin/utils/Threading.java index 99911b9b..396f4b5e 100644 --- a/core/src/main/java/com/google/bitcoin/utils/Threading.java +++ b/core/src/main/java/com/google/bitcoin/utils/Threading.java @@ -20,7 +20,9 @@ import com.google.common.util.concurrent.Callables; import com.google.common.util.concurrent.CycleDetectingLockFactory; import com.google.common.util.concurrent.Futures; +import javax.annotation.Nonnull; import java.util.concurrent.Callable; +import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.locks.ReentrantLock; @@ -34,10 +36,20 @@ import static com.google.common.base.Preconditions.checkState; */ public class Threading { /** - * A single-threaded executor that is intended for running event listeners on. This ensures all event listener code - * runs without any locks being held. + * An executor with one thread that is intended for running event listeners on. This ensures all event listener code + * runs without any locks being held. It's intended for the API user to run things on. Callbacks registered by + * bitcoinj internally shouldn't normally run here, although currently there are a few exceptions. */ - public static final ExecutorService userCode; + public static final ExecutorService USER_THREAD; + + /** + * A dummy executor that just invokes the runnable immediately. Use this over + * {@link com.google.common.util.concurrent.MoreExecutors#sameThreadExecutor()} because the latter creates a new + * object each time in order to implement the more complex {@link ExecutorService} interface, which is overkill + * for our needs. + */ + public static final Executor SAME_THREAD; + // For safety reasons keep track of the thread we use to run user-provided event listeners to avoid deadlock. private static final Thread executorThread; @@ -53,7 +65,7 @@ public class Threading { // event handlers because it would never return. If you aren't calling this method explicitly, then that // means there's a bug in bitcoinj. checkState(executorThread != Thread.currentThread(), "waitForUserCode() run on user code thread would deadlock."); - Futures.getUnchecked(userCode.submit(Callables.returning(null))); + Futures.getUnchecked(USER_THREAD.submit(Callables.returning(null))); } ///////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -64,15 +76,21 @@ public class Threading { // from that point onwards. throwOnLockCycles(); - userCode = Executors.newSingleThreadExecutor(); + USER_THREAD = Executors.newSingleThreadExecutor(); // We can't directly get the thread that was just created, but we can fetch it indirectly. We'll use this // for deadlock detection by checking for waits on the user code thread. - executorThread = Futures.getUnchecked(userCode.submit(new Callable() { + executorThread = Futures.getUnchecked(USER_THREAD.submit(new Callable() { @Override public Thread call() throws Exception { Thread.currentThread().setName("bitcoinj user code thread"); return Thread.currentThread(); } })); + SAME_THREAD = new Executor() { + @Override + public void execute(@Nonnull Runnable runnable) { + runnable.run(); + } + }; } private static CycleDetectingLockFactory.Policy policy; diff --git a/core/src/test/java/com/google/bitcoin/core/PeerGroupTest.java b/core/src/test/java/com/google/bitcoin/core/PeerGroupTest.java index d4329155..259a4ce2 100644 --- a/core/src/test/java/com/google/bitcoin/core/PeerGroupTest.java +++ b/core/src/test/java/com/google/bitcoin/core/PeerGroupTest.java @@ -206,7 +206,7 @@ public class PeerGroupTest extends TestWithPeerGroup { public void onTransaction(Peer peer, Transaction t) { event[0] = t; } - }); + }, Threading.SAME_THREAD); FakeChannel p1 = connectPeer(1); FakeChannel p2 = connectPeer(2); diff --git a/core/src/test/java/com/google/bitcoin/core/PeerTest.java b/core/src/test/java/com/google/bitcoin/core/PeerTest.java index 83a858b6..bb28832a 100644 --- a/core/src/test/java/com/google/bitcoin/core/PeerTest.java +++ b/core/src/test/java/com/google/bitcoin/core/PeerTest.java @@ -326,7 +326,7 @@ public class PeerTest extends TestWithNetworkConnections { control.replay(); connect(); - peer.addEventListener(listener); + peer.addEventListener(listener, Threading.SAME_THREAD); long height = peer.getBestHeight(); inbound(peer, inv); @@ -359,7 +359,7 @@ public class PeerTest extends TestWithNetworkConnections { control.replay(); connect(); - peer.addEventListener(listener); + peer.addEventListener(listener, Threading.SAME_THREAD); peer.startBlockChainDownload(); control.verify(); @@ -500,7 +500,7 @@ public class PeerTest extends TestWithNetworkConnections { public void onTransaction(Peer peer1, Transaction t) { onTx[0] = t; } - }); + }, Threading.SAME_THREAD); // Make the some fake transactions in the following graph: // t1 -> t2 -> [t5] @@ -787,7 +787,7 @@ public class PeerTest extends TestWithNetworkConnections { public void onException(Throwable throwable) { throwables[0] = throwable; } - }); + }, Threading.SAME_THREAD); control.replay(); connect(); Transaction t1 = new Transaction(unitTestParams);