From 35a7f38d8630f30d95665e41b8a7abea39309976 Mon Sep 17 00:00:00 2001 From: Mike Hearn Date: Mon, 1 Jul 2013 11:42:42 +0200 Subject: [PATCH] Make block chain listeners run in given executors and default to the user thread. --- .../bitcoin/core/AbstractBlockChain.java | 162 +++++++++++------- .../bitcoin/tools/BuildCheckpoints.java | 3 +- 2 files changed, 102 insertions(+), 63 deletions(-) diff --git a/core/src/main/java/com/google/bitcoin/core/AbstractBlockChain.java b/core/src/main/java/com/google/bitcoin/core/AbstractBlockChain.java index 4e27bfcb..b6efe44f 100644 --- a/core/src/main/java/com/google/bitcoin/core/AbstractBlockChain.java +++ b/core/src/main/java/com/google/bitcoin/core/AbstractBlockChain.java @@ -18,6 +18,7 @@ package com.google.bitcoin.core; import com.google.bitcoin.store.BlockStore; import com.google.bitcoin.store.BlockStoreException; +import com.google.bitcoin.utils.ListenerRegistration; import com.google.bitcoin.utils.Threading; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.ListenableFuture; @@ -28,6 +29,7 @@ import org.slf4j.LoggerFactory; import java.math.BigInteger; import java.util.*; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.Executor; import java.util.concurrent.locks.ReentrantLock; import static com.google.common.base.Preconditions.*; @@ -100,7 +102,7 @@ public abstract class AbstractBlockChain { private final Object chainHeadLock = new Object(); protected final NetworkParameters params; - private final CopyOnWriteArrayList listeners; + private final CopyOnWriteArrayList> listeners; // Holds a block header and, optionally, a list of tx hashes or block's transactions protected static class OrphanBlock { @@ -128,7 +130,8 @@ public abstract class AbstractBlockChain { chainHead = blockStore.getChainHead(); log.info("chain head is at height {}:\n{}", chainHead.getHeight(), chainHead.getHeader()); this.params = params; - this.listeners = new CopyOnWriteArrayList(listeners); + this.listeners = new CopyOnWriteArrayList>(); + for (BlockChainListener l : listeners) addListener(l, Threading.SAME_THREAD); } /** @@ -138,14 +141,21 @@ public abstract class AbstractBlockChain { * wallets is not well tested! */ public void addWallet(Wallet wallet) { - listeners.add(wallet); + addListener(wallet, Threading.SAME_THREAD); } /** * Adds a generic {@link BlockChainListener} listener to the chain. */ public void addListener(BlockChainListener listener) { - listeners.add(listener); + addListener(listener, Threading.SAME_THREAD); + } + + /** + * Adds a generic {@link BlockChainListener} listener to the chain. + */ + public void addListener(BlockChainListener listener, Executor executor) { + listeners.add(new ListenerRegistration(listener, executor)); } /** @@ -381,8 +391,8 @@ public abstract class AbstractBlockChain { // expensiveChecks enables checks that require looking at blocks further back in the chain // than the previous one when connecting (eg median timestamp check) // It could be exposed, but for now we just set it to shouldVerifyTransactions() - private void connectBlock(Block block, StoredBlock storedPrev, boolean expensiveChecks, - Set filteredTxHashList, List filteredTxn) + private void connectBlock(final Block block, StoredBlock storedPrev, boolean expensiveChecks, + Set filteredTxHashList, final List filteredTxn) throws BlockStoreException, VerificationException, PrunedException { checkState(lock.isLocked()); // Check that we aren't connecting a block that fails a checkpoint check @@ -406,28 +416,7 @@ public abstract class AbstractBlockChain { block.transactions == null ? block : block.cloneAsHeader(), txOutChanges); setChainHead(newStoredBlock); log.debug("Chain is now {} blocks high", newStoredBlock.getHeight()); - // Notify the listeners of the new block, so the depth and workDone of stored transactions can be updated - // (in the case of the listener being a wallet). Wallets need to know how deep each transaction is so - // coinbases aren't used before maturity. - boolean first = true; - for (BlockChainListener listener : listeners) { - if (block.transactions != null || filteredTxn != null) { - // If this is not the first wallet, ask for the transactions to be duplicated before being given - // to the wallet when relevant. This ensures that if we have two connected wallets and a tx that - // is relevant to both of them, they don't end up accidentally sharing the same object (which can - // result in temporary in-memory corruption during re-orgs). See bug 257. We only duplicate in - // the case of multiple wallets to avoid an unnecessary efficiency hit in the common case. - sendTransactionsToListener(newStoredBlock, NewBlockType.BEST_CHAIN, listener, - block.transactions != null ? block.transactions : filteredTxn, !first); - } - if (filteredTxHashList != null) { - for (Sha256Hash hash : filteredTxHashList) { - listener.notifyTransactionIsInBlock(hash, newStoredBlock, NewBlockType.BEST_CHAIN); - } - } - listener.notifyNewBestBlock(newStoredBlock); - first = false; - } + informListenersForNewBlock(block, NewBlockType.BEST_CHAIN, filteredTxHashList, filteredTxn, newStoredBlock); } else { // This block connects to somewhere other than the top of the best known chain. We treat these differently. // @@ -466,33 +455,72 @@ public abstract class AbstractBlockChain { // If we do, send them to the wallet but state that they are on a side chain so it knows not to try and // spend them until they become activated. if (block.transactions != null || filteredTxn != null) { - boolean first = true; - for (BlockChainListener listener : listeners) { - List txnToNotify; - if (block.transactions != null) - txnToNotify = block.transactions; - else - txnToNotify = filteredTxn; - // If this is not the first wallet, ask for the transactions to be duplicated before being given - // to the wallet when relevant. This ensures that if we have two connected wallets and a tx that - // is relevant to both of them, they don't end up accidentally sharing the same object (which can - // result in temporary in-memory corruption during re-orgs). See bug 257. We only duplicate in - // the case of multiple wallets to avoid an unnecessary efficiency hit in the common case. - sendTransactionsToListener(newBlock, NewBlockType.SIDE_CHAIN, listener, txnToNotify, !first); - if (filteredTxHashList != null) { - for (Sha256Hash hash : filteredTxHashList) { - listener.notifyTransactionIsInBlock(hash, newBlock, NewBlockType.SIDE_CHAIN); - } - } - first = false; - } + informListenersForNewBlock(block, NewBlockType.SIDE_CHAIN, filteredTxHashList, filteredTxn, newBlock); } if (haveNewBestChain) handleNewBestChain(storedPrev, newBlock, block, expensiveChecks); } } - + + private void informListenersForNewBlock(final Block block, final NewBlockType newBlockType, + final Set filteredTxHashList, + final List filteredTxn, final StoredBlock newStoredBlock) throws VerificationException { + // Notify the listeners of the new block, so the depth and workDone of stored transactions can be updated + // (in the case of the listener being a wallet). Wallets need to know how deep each transaction is so + // coinbases aren't used before maturity. + boolean first = true; + for (final ListenerRegistration registration : listeners) { + if (registration.executor == Threading.SAME_THREAD) { + informListenerForNewTransactions(block, newBlockType, filteredTxHashList, filteredTxn, + newStoredBlock, first, registration.listener); + if (newBlockType == NewBlockType.BEST_CHAIN) + registration.listener.notifyNewBestBlock(newStoredBlock); + } else { + // Listener wants to be run on some other thread, so marshal it across here. + final boolean notFirst = !first; + registration.executor.execute(new Runnable() { + @Override + public void run() { + try { + informListenerForNewTransactions(block, newBlockType, filteredTxHashList, filteredTxn, + newStoredBlock, notFirst, registration.listener); + if (newBlockType == NewBlockType.BEST_CHAIN) + registration.listener.notifyNewBestBlock(newStoredBlock); + } catch (VerificationException e) { + log.error("Block chain listener threw exception: ", e); + // Don't attempt to relay this back to the original peer thread if this was an async + // listener invocation. + // TODO: Make exception reporting a global feature and use it here. + } + } + }); + } + first = false; + } + } + + private static void informListenerForNewTransactions(Block block, NewBlockType newBlockType, + Set filteredTxHashList, + List filteredTxn, + StoredBlock newStoredBlock, boolean first, + BlockChainListener listener) throws VerificationException { + if (block.transactions != null || filteredTxn != null) { + // If this is not the first wallet, ask for the transactions to be duplicated before being given + // to the wallet when relevant. This ensures that if we have two connected wallets and a tx that + // is relevant to both of them, they don't end up accidentally sharing the same object (which can + // result in temporary in-memory corruption during re-orgs). See bug 257. We only duplicate in + // the case of multiple wallets to avoid an unnecessary efficiency hit in the common case. + sendTransactionsToListener(newStoredBlock, newBlockType, listener, + block.transactions != null ? block.transactions : filteredTxn, !first); + } + if (filteredTxHashList != null) { + for (Sha256Hash hash : filteredTxHashList) { + listener.notifyTransactionIsInBlock(hash, newStoredBlock, newBlockType); + } + } + } + /** * Gets the median timestamp of the last 11 blocks */ @@ -530,14 +558,14 @@ public abstract class AbstractBlockChain { // Firstly, calculate the block at which the chain diverged. We only need to examine the // chain from beyond this block to find differences. StoredBlock head = getChainHead(); - StoredBlock splitPoint = findSplit(newChainHead, head, blockStore); + final StoredBlock splitPoint = findSplit(newChainHead, head, blockStore); log.info("Re-organize after split at height {}", splitPoint.getHeight()); log.info("Old chain head: {}", head.getHeader().getHashAsString()); log.info("New chain head: {}", newChainHead.getHeader().getHashAsString()); log.info("Split at block: {}", splitPoint.getHeader().getHashAsString()); // Then build a list of all blocks in the old part of the chain and the new part. - LinkedList oldBlocks = getPartialChain(head, splitPoint, blockStore); - LinkedList newBlocks = getPartialChain(newChainHead, splitPoint, blockStore); + final LinkedList oldBlocks = getPartialChain(head, splitPoint, blockStore); + final LinkedList newBlocks = getPartialChain(newChainHead, splitPoint, blockStore); // Disconnect each transaction in the previous main chain that is no longer in the new main chain StoredBlock storedNewHead = splitPoint; if (shouldVerifyTransactions()) { @@ -572,13 +600,22 @@ public abstract class AbstractBlockChain { // Now inform the listeners. This is necessary so the set of currently active transactions (that we can spend) // can be updated to take into account the re-organize. We might also have received new coins we didn't have // before and our previous spends might have been undone. - for (int i = 0; i < listeners.size(); i++) { - BlockChainListener listener = listeners.get(i); - listener.reorganize(splitPoint, oldBlocks, newBlocks); - if (i == listeners.size()) { - break; // Listener removed itself and it was the last one. - } else if (listeners.get(i) != listener) { - i--; // Listener removed itself and it was not the last one. + for (final ListenerRegistration registration : listeners) { + if (registration.executor == Threading.SAME_THREAD) { + // Short circuit the executor so we can propagate any exceptions. + // TODO: Do we really need to do this or should it be irrelevant? + registration.listener.reorganize(splitPoint, oldBlocks, newBlocks); + } else { + registration.executor.execute(new Runnable() { + @Override + public void run() { + try { + registration.listener.reorganize(splitPoint, oldBlocks, newBlocks); + } catch (VerificationException e) { + log.error("Block chain listener threw exception during reorg", e); + } + } + }); } } // Update the pointer to the best known block. @@ -812,8 +849,9 @@ public abstract class AbstractBlockChain { // Does not need to be locked. for (Transaction tx : block.transactions) { try { - for (BlockChainListener listener : listeners) { - if (listener.isTransactionRelevant(tx)) return true; + for (final ListenerRegistration registration : listeners) { + if (registration.executor != Threading.SAME_THREAD) continue; + if (registration.listener.isTransactionRelevant(tx)) return true; } } catch (ScriptException e) { // We don't want scripts we don't understand to break the block chain so just note that this tx was @@ -897,7 +935,7 @@ public abstract class AbstractBlockChain { result.set(block); } } - }); + }, Threading.SAME_THREAD); return result; } } diff --git a/tools/src/main/java/com/google/bitcoin/tools/BuildCheckpoints.java b/tools/src/main/java/com/google/bitcoin/tools/BuildCheckpoints.java index cb418e55..c6cb7247 100644 --- a/tools/src/main/java/com/google/bitcoin/tools/BuildCheckpoints.java +++ b/tools/src/main/java/com/google/bitcoin/tools/BuildCheckpoints.java @@ -5,6 +5,7 @@ import com.google.bitcoin.params.MainNetParams; import com.google.bitcoin.store.BlockStore; import com.google.bitcoin.store.MemoryBlockStore; import com.google.bitcoin.utils.BriefLogFormatter; +import com.google.bitcoin.utils.Threading; import java.io.DataOutputStream; import java.io.FileInputStream; @@ -51,7 +52,7 @@ public class BuildCheckpoints { checkpoints.put(height, block); } } - }); + }, Threading.SAME_THREAD); peerGroup.startAndWait(); peerGroup.downloadBlockChain();