3
0
mirror of https://github.com/Qortal/altcoinj.git synced 2025-01-31 15:22:16 +00:00

Extend executor-specific event handlers to PeerGroup and Peer. Rename Threading.sameThread to Threading.SAME_THREAD

This commit is contained in:
Mike Hearn 2013-06-28 17:11:14 +02:00
parent 50b71979bb
commit 2537ff47b5
8 changed files with 109 additions and 44 deletions

View File

@ -18,6 +18,7 @@ package com.google.bitcoin.core;
import com.google.bitcoin.store.BlockStore;
import com.google.bitcoin.store.BlockStoreException;
import com.google.bitcoin.utils.ListenerRegistration;
import com.google.bitcoin.utils.Threading;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
@ -34,6 +35,7 @@ import java.net.InetSocketAddress;
import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
@ -60,7 +62,7 @@ public class Peer {
private final NetworkParameters params;
private final AbstractBlockChain blockChain;
private volatile PeerAddress vAddress;
private final CopyOnWriteArrayList<PeerEventListener> eventListeners;
private final CopyOnWriteArrayList<ListenerRegistration<PeerEventListener>> eventListeners;
private final CopyOnWriteArrayList<PeerLifecycleListener> lifecycleListeners;
// Whether to try and download blocks and transactions from this peer. Set to false by PeerGroup if not the
// primary peer. This is to avoid redundant work and concurrency problems with downloading the same chain
@ -147,7 +149,7 @@ public class Peer {
this.blockChain = chain; // Allowed to be null.
this.vDownloadData = chain != null;
this.getDataFutures = new CopyOnWriteArrayList<GetDataRequest>();
this.eventListeners = new CopyOnWriteArrayList<PeerEventListener>();
this.eventListeners = new CopyOnWriteArrayList<ListenerRegistration<PeerEventListener>>();
this.lifecycleListeners = new CopyOnWriteArrayList<PeerLifecycleListener>();
this.fastCatchupTimeSecs = params.getGenesisBlock().getTimeSeconds();
this.isAcked = false;
@ -167,12 +169,29 @@ public class Peer {
this.versionMessage.appendToSubVer(thisSoftwareName, thisSoftwareVersion, null);
}
/**
* Registers the given object as an event listener that will be invoked on the user thread. Note that listeners
* added this way will <b>not</b> receive {@link PeerEventListener#getData(Peer, GetDataMessage)} or
* {@link PeerEventListener#onPreMessageReceived(Peer, Message)} calls because those require that the listener
* be added using {@link Threading#SAME_THREAD}, which requires the other addListener form.
*/
public void addEventListener(PeerEventListener listener) {
eventListeners.add(listener);
addEventListener(listener, Threading.USER_THREAD);
}
/**
* Registers the given object as an event listener that will be invoked by the given executor. Note that listeners
* added using any other executor than {@link Threading#SAME_THREAD} will <b>not</b> receive
* {@link PeerEventListener#getData(Peer, GetDataMessage)} or
* {@link PeerEventListener#onPreMessageReceived(Peer, Message)} calls because this class is not willing to cross
* threads in order to get the results of those hook methods.
*/
public void addEventListener(PeerEventListener listener, Executor executor) {
eventListeners.add(new ListenerRegistration<PeerEventListener>(listener, executor));
}
public boolean removeEventListener(PeerEventListener listener) {
return eventListeners.remove(listener);
return ListenerRegistration.removeFromList(listener, eventListeners);
}
void addLifecycleListener(PeerLifecycleListener listener) {
@ -246,9 +265,13 @@ public class Peer {
try {
// Allow event listeners to filter the message stream. Listeners are allowed to drop messages by
// returning null.
for (PeerEventListener listener : eventListeners) {
m = listener.onPreMessageReceived(this, m);
if (m == null) break;
for (ListenerRegistration<PeerEventListener> registration : eventListeners) {
// Skip any listeners that are supposed to run in another thread as we don't want to block waiting
// for it, which might cause circular deadlock.
if (registration.executor == Threading.SAME_THREAD) {
m = registration.listener.onPreMessageReceived(this, m);
if (m == null) break;
}
}
if (m == null) return;
@ -309,12 +332,17 @@ public class Peer {
} else {
log.warn("Received unhandled message: {}", m);
}
} catch (Throwable throwable) {
} catch (final Throwable throwable) {
log.warn("Caught exception in peer thread: {}", throwable.getMessage());
throwable.printStackTrace();
for (PeerEventListener listener : eventListeners) {
for (final ListenerRegistration<PeerEventListener> registration : eventListeners) {
try {
listener.onException(throwable);
registration.executor.execute(new Runnable() {
@Override
public void run() {
registration.listener.onException(throwable);
}
});
} catch (Exception e1) {
e1.printStackTrace();
}
@ -435,8 +463,9 @@ public class Peer {
private void processGetData(GetDataMessage getdata) throws IOException {
log.info("{}: Received getdata message: {}", vAddress, getdata.toString());
ArrayList<Message> items = new ArrayList<Message>();
for (PeerEventListener listener : eventListeners) {
List<Message> listenerItems = listener.getData(this, getdata);
for (ListenerRegistration<PeerEventListener> registration : eventListeners) {
if (registration.executor != Threading.SAME_THREAD) continue;
List<Message> listenerItems = registration.listener.getData(this, getdata);
if (listenerItems == null) continue;
items.addAll(listenerItems);
}
@ -452,6 +481,7 @@ public class Peer {
private void processTransaction(Transaction tx) throws VerificationException, IOException {
// Check a few basic syntax issues to ensure the received TX isn't nonsense.
tx.verify();
final Transaction fTx;
lock.lock();
try {
log.debug("{}: Received tx {}", vAddress, tx.getHashAsString());
@ -459,7 +489,7 @@ public class Peer {
// We may get back a different transaction object.
tx = memoryPool.seen(tx, getAddress());
}
final Transaction fTx = tx;
fTx = tx;
// Label the transaction as coming in from the P2P network (as opposed to being created by us, direct import,
// etc). This helps the wallet decide how to risk analyze it later.
fTx.getConfidence().setSource(TransactionConfidence.Source.NETWORK);
@ -520,8 +550,14 @@ public class Peer {
}
// Tell all listeners about this tx so they can decide whether to keep it or not. If no listener keeps a
// reference around then the memory pool will forget about it after a while too because it uses weak references.
for (PeerEventListener listener : eventListeners)
listener.onTransaction(this, tx);
for (final ListenerRegistration<PeerEventListener> registration : eventListeners) {
registration.executor.execute(new Runnable() {
@Override
public void run() {
registration.listener.onTransaction(Peer.this, fTx);
}
});
}
}
/**
@ -663,7 +699,7 @@ public class Peer {
}
}
}
}, MoreExecutors.sameThreadExecutor());
}, Threading.SAME_THREAD);
}
} catch (Exception e) {
log.error("{}: Couldn't send getdata in downloadDependencies({})", this, tx.getHash());
@ -805,8 +841,14 @@ public class Peer {
// since the time we first connected to the peer. However, it's weird and unexpected to receive a callback
// with negative "blocks left" in this case, so we clamp to zero so the API user doesn't have to think about it.
final int blocksLeft = Math.max(0, (int) vPeerVersionMessage.bestHeight - blockChain.getBestChainHeight());
for (PeerEventListener listener : eventListeners)
listener.onBlocksDownloaded(Peer.this, m, blocksLeft);
for (final ListenerRegistration<PeerEventListener> registration : eventListeners) {
registration.executor.execute(new Runnable() {
@Override
public void run() {
registration.listener.onBlocksDownloaded(Peer.this, m, blocksLeft);
}
});
}
}
private void processInv(InventoryMessage inv) throws IOException {
@ -1127,8 +1169,14 @@ public class Peer {
// chain even if the chain block count is lower.
final int blocksLeft = getPeerBlockHeightDifference();
if (blocksLeft >= 0) {
for (PeerEventListener listener : eventListeners)
listener.onChainDownloadStarted(this, blocksLeft);
for (final ListenerRegistration<PeerEventListener> registration : eventListeners) {
registration.executor.execute(new Runnable() {
@Override
public void run() {
registration.listener.onChainDownloadStarted(Peer.this, blocksLeft);
}
});
}
// When we just want as many blocks as possible, we can set the target hash to zero.
blockChainDownload(Sha256Hash.ZERO_HASH);
}

View File

@ -439,7 +439,7 @@ public class PeerGroup extends AbstractIdleService implements TransactionBroadca
* to running on the user thread.
*/
public void addEventListener(PeerEventListener listener) {
addEventListener(listener, Threading.userCode);
addEventListener(listener, Threading.USER_THREAD);
}
/** The given event listener will no longer be called with events. */
@ -611,7 +611,7 @@ public class PeerGroup extends AbstractIdleService implements TransactionBroadca
// if a key is added. Of course, by then we may have downloaded the chain already. Ideally adding keys would
// automatically rewind the block chain and redownload the blocks to find transactions relevant to those keys,
// all transparently and in the background. But we are a long way from that yet.
wallet.addEventListener(walletEventListener);
wallet.addEventListener(walletEventListener); // TODO: Run this in the current peer thread.
recalculateFastCatchupAndFilter();
updateVersionMessageRelayTxesBeforeFilter(getVersionMessage());
} finally {
@ -811,7 +811,7 @@ public class PeerGroup extends AbstractIdleService implements TransactionBroadca
}
}
// Make sure the peer knows how to upload transactions that are requested from us.
peer.addEventListener(getDataListener);
peer.addEventListener(getDataListener, Threading.SAME_THREAD);
// Now tell the peers about any transactions we have which didn't appear in the chain yet. These are not
// necessarily spends we created. They may also be transactions broadcast across the network that we saw,
// which are relevant to us, and which we therefore wish to help propagate (ie they send us coins).
@ -824,7 +824,7 @@ public class PeerGroup extends AbstractIdleService implements TransactionBroadca
announcePendingWalletTransactions(wallets, Collections.singletonList(peer));
// And set up event listeners for clients. This will allow them to find out about new transactions and blocks.
for (ListenerRegistration<PeerEventListener> registration : peerEventListeners) {
peer.addEventListener(registration.listener);
peer.addEventListener(registration.listener, registration.executor);
}
setupPingingForNewPeer(peer);
} finally {
@ -858,7 +858,7 @@ public class PeerGroup extends AbstractIdleService implements TransactionBroadca
if (firstRun) {
firstRun = false;
try {
peer.ping().addListener(this, MoreExecutors.sameThreadExecutor());
peer.ping().addListener(this, Threading.SAME_THREAD);
} catch (Exception e) {
log.warn("{}: Exception whilst trying to ping peer: {}", peer, e.toString());
return;
@ -875,7 +875,7 @@ public class PeerGroup extends AbstractIdleService implements TransactionBroadca
try {
if (!peers.contains(peer) || !PeerGroup.this.isRunning())
return; // Peer was removed/shut down.
peer.ping().addListener(pingRunnable[0], MoreExecutors.sameThreadExecutor());
peer.ping().addListener(pingRunnable[0], Threading.SAME_THREAD);
} catch (Exception e) {
log.warn("{}: Exception whilst trying to ping peer: {}", peer, e.toString());
}
@ -1041,7 +1041,7 @@ public class PeerGroup extends AbstractIdleService implements TransactionBroadca
private void startBlockChainDownloadFromPeer(Peer peer) {
lock.lock();
try {
peer.addEventListener(downloadListener);
peer.addEventListener(downloadListener, Threading.SAME_THREAD);
setDownloadPeer(peer);
// startBlockChainDownload will setDownloadData(true) on itself automatically.
peer.startBlockChainDownload();
@ -1245,7 +1245,7 @@ public class PeerGroup extends AbstractIdleService implements TransactionBroadca
});
}
}
}, MoreExecutors.sameThreadExecutor());
}, Threading.SAME_THREAD);
return future;
}

View File

@ -412,7 +412,7 @@ public class TransactionConfidence implements Serializable {
*/
public void queueListeners(final Listener.ChangeReason reason) {
for (final Listener listener : listeners) {
Threading.userCode.execute(new Runnable() {
Threading.USER_THREAD.execute(new Runnable() {
@Override public void run() {
listener.onConfidenceChanged(transaction, reason);
}

View File

@ -1359,7 +1359,7 @@ public class Wallet implements Serializable, BlockChainListener {
* like receiving money. Runs the listener methods in the user thread.
*/
public void addEventListener(WalletEventListener listener) {
addEventListener(listener, Threading.userCode);
addEventListener(listener, Threading.USER_THREAD);
}
/**
@ -3032,7 +3032,7 @@ public class Wallet implements Serializable, BlockChainListener {
it.remove();
final BigInteger v = checkNotNull(val);
// Don't run any user-provided future listeners with our lock held.
Threading.userCode.execute(new Runnable() {
Threading.USER_THREAD.execute(new Runnable() {
@Override public void run() {
req.future.set(v);
}

View File

@ -5,7 +5,6 @@ import java.util.concurrent.locks.ReentrantLock;
import com.google.bitcoin.core.*;
import com.google.bitcoin.utils.Threading;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.ByteString;
import net.jcip.annotations.GuardedBy;
import org.bitcoin.paymentchannel.Protos;
@ -242,7 +241,7 @@ public class PaymentChannelServer {
public void run() {
multisigContractPropogated(multisigContract.getHash());
}
}, MoreExecutors.sameThreadExecutor());
}, Threading.SAME_THREAD);
}
@GuardedBy("lock")

View File

@ -20,7 +20,9 @@ import com.google.common.util.concurrent.Callables;
import com.google.common.util.concurrent.CycleDetectingLockFactory;
import com.google.common.util.concurrent.Futures;
import javax.annotation.Nonnull;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.ReentrantLock;
@ -34,10 +36,20 @@ import static com.google.common.base.Preconditions.checkState;
*/
public class Threading {
/**
* A single-threaded executor that is intended for running event listeners on. This ensures all event listener code
* runs without any locks being held.
* An executor with one thread that is intended for running event listeners on. This ensures all event listener code
* runs without any locks being held. It's intended for the API user to run things on. Callbacks registered by
* bitcoinj internally shouldn't normally run here, although currently there are a few exceptions.
*/
public static final ExecutorService userCode;
public static final ExecutorService USER_THREAD;
/**
* A dummy executor that just invokes the runnable immediately. Use this over
* {@link com.google.common.util.concurrent.MoreExecutors#sameThreadExecutor()} because the latter creates a new
* object each time in order to implement the more complex {@link ExecutorService} interface, which is overkill
* for our needs.
*/
public static final Executor SAME_THREAD;
// For safety reasons keep track of the thread we use to run user-provided event listeners to avoid deadlock.
private static final Thread executorThread;
@ -53,7 +65,7 @@ public class Threading {
// event handlers because it would never return. If you aren't calling this method explicitly, then that
// means there's a bug in bitcoinj.
checkState(executorThread != Thread.currentThread(), "waitForUserCode() run on user code thread would deadlock.");
Futures.getUnchecked(userCode.submit(Callables.returning(null)));
Futures.getUnchecked(USER_THREAD.submit(Callables.returning(null)));
}
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@ -64,15 +76,21 @@ public class Threading {
// from that point onwards.
throwOnLockCycles();
userCode = Executors.newSingleThreadExecutor();
USER_THREAD = Executors.newSingleThreadExecutor();
// We can't directly get the thread that was just created, but we can fetch it indirectly. We'll use this
// for deadlock detection by checking for waits on the user code thread.
executorThread = Futures.getUnchecked(userCode.submit(new Callable<Thread>() {
executorThread = Futures.getUnchecked(USER_THREAD.submit(new Callable<Thread>() {
@Override public Thread call() throws Exception {
Thread.currentThread().setName("bitcoinj user code thread");
return Thread.currentThread();
}
}));
SAME_THREAD = new Executor() {
@Override
public void execute(@Nonnull Runnable runnable) {
runnable.run();
}
};
}
private static CycleDetectingLockFactory.Policy policy;

View File

@ -206,7 +206,7 @@ public class PeerGroupTest extends TestWithPeerGroup {
public void onTransaction(Peer peer, Transaction t) {
event[0] = t;
}
});
}, Threading.SAME_THREAD);
FakeChannel p1 = connectPeer(1);
FakeChannel p2 = connectPeer(2);

View File

@ -326,7 +326,7 @@ public class PeerTest extends TestWithNetworkConnections {
control.replay();
connect();
peer.addEventListener(listener);
peer.addEventListener(listener, Threading.SAME_THREAD);
long height = peer.getBestHeight();
inbound(peer, inv);
@ -359,7 +359,7 @@ public class PeerTest extends TestWithNetworkConnections {
control.replay();
connect();
peer.addEventListener(listener);
peer.addEventListener(listener, Threading.SAME_THREAD);
peer.startBlockChainDownload();
control.verify();
@ -500,7 +500,7 @@ public class PeerTest extends TestWithNetworkConnections {
public void onTransaction(Peer peer1, Transaction t) {
onTx[0] = t;
}
});
}, Threading.SAME_THREAD);
// Make the some fake transactions in the following graph:
// t1 -> t2 -> [t5]
@ -787,7 +787,7 @@ public class PeerTest extends TestWithNetworkConnections {
public void onException(Throwable throwable) {
throwables[0] = throwable;
}
});
}, Threading.SAME_THREAD);
control.replay();
connect();
Transaction t1 = new Transaction(unitTestParams);