3
0
mirror of https://github.com/Qortal/altcoinj.git synced 2025-02-07 23:03:04 +00:00

Threading: experiment with limiting the user/event thread to 100 pending closures.

This commit is contained in:
Mike Hearn 2014-01-14 15:25:38 +01:00
parent 941bf330a4
commit e0b698a2e9

View File

@ -16,18 +16,19 @@
package com.google.bitcoin.utils; package com.google.bitcoin.utils;
import com.google.common.util.concurrent.Callables;
import com.google.common.util.concurrent.CycleDetectingLockFactory; import com.google.common.util.concurrent.CycleDetectingLockFactory;
import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.Uninterruptibles;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nonnull; import javax.annotation.Nonnull;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.lang.ref.WeakReference; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.*; import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
import static com.google.common.base.Preconditions.checkState;
/** /**
* Various threading related utilities. Provides a wrapper around explicit lock creation that lets you control whether * Various threading related utilities. Provides a wrapper around explicit lock creation that lets you control whether
* bitcoinj performs cycle detection or not. Cycle detection is useful to detect bugs but comes with a small cost. * bitcoinj performs cycle detection or not. Cycle detection is useful to detect bugs but comes with a small cost.
@ -41,9 +42,6 @@ public class Threading {
*/ */
public static Executor USER_THREAD; public static Executor USER_THREAD;
// Default value for USER_THREAD.
private static final ExecutorService SINGLE_THREADED_EXECUTOR;
/** /**
* A dummy executor that just invokes the runnable immediately. Use this over * A dummy executor that just invokes the runnable immediately. Use this over
* {@link com.google.common.util.concurrent.MoreExecutors#sameThreadExecutor()} because the latter creates a new * {@link com.google.common.util.concurrent.MoreExecutors#sameThreadExecutor()} because the latter creates a new
@ -52,9 +50,6 @@ public class Threading {
*/ */
public static final Executor SAME_THREAD; public static final Executor SAME_THREAD;
// For safety reasons keep track of the thread we use to run user-provided event listeners to avoid deadlock.
private static volatile WeakReference<Thread> vUserThread;
/** /**
* Put a dummy task into the queue and wait for it to be run. Because it's single threaded, this means all * Put a dummy task into the queue and wait for it to be run. Because it's single threaded, this means all
* tasks submitted before this point are now completed. Usually you won't want to use this method - it's a * tasks submitted before this point are now completed. Usually you won't want to use this method - it's a
@ -63,14 +58,13 @@ public class Threading {
* on it. You can then either block on that future, compose it, add listeners to it and so on. * on it. You can then either block on that future, compose it, add listeners to it and so on.
*/ */
public static void waitForUserCode() { public static void waitForUserCode() {
// If this assert fires it means you have a bug in your code - you can't call this method inside your own final CountDownLatch latch = new CountDownLatch(1);
// event handlers because it would never return. If you aren't calling this method explicitly, then that USER_THREAD.execute(new Runnable() {
// means there's a bug in bitcoinj. @Override public void run() {
if (vUserThread != null) { latch.countDown();
checkState(vUserThread.get() != null && vUserThread.get() != Thread.currentThread(), }
"waitForUserCode() run on user code thread would deadlock."); });
} Uninterruptibles.awaitUninterruptibly(latch);
Futures.getUnchecked(SINGLE_THREADED_EXECUTOR.submit(Callables.returning(null)));
} }
/** /**
@ -86,23 +80,49 @@ public class Threading {
///////////////////////////////////////////////////////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////////////////////////////////////////////////////
public static class UserThread extends Thread implements Executor {
private static final Logger log = LoggerFactory.getLogger(UserThread.class);
private LinkedBlockingQueue<Runnable> tasks;
public UserThread(int tasksBound) {
super("bitcoinj user thread");
setDaemon(true);
tasks = new LinkedBlockingQueue<Runnable>(tasksBound);
start();
}
@SuppressWarnings("InfiniteLoopStatement") @Override
public void run() {
while (true) {
if (tasks.remainingCapacity() < 2)
log.warn("User thread saturated, {} tasks queued. Review your event handlers to make sure they are not too slow",
tasks.size());
Runnable task = Uninterruptibles.takeUninterruptibly(tasks);
try {
task.run();
} catch (Throwable throwable) {
log.warn("Exception in user thread", throwable);
Thread.UncaughtExceptionHandler handler = uncaughtExceptionHandler;
if (handler != null)
handler.uncaughtException(this, throwable);
}
}
}
@Override
public void execute(Runnable command) {
// Will block if the event thread is saturated.
Uninterruptibles.putUninterruptibly(tasks, command);
}
}
static { static {
// Default policy goes here. If you want to change this, use one of the static methods before // Default policy goes here. If you want to change this, use one of the static methods before
// instantiating any bitcoinj objects. The policy change will take effect only on new objects // instantiating any bitcoinj objects. The policy change will take effect only on new objects
// from that point onwards. // from that point onwards.
throwOnLockCycles(); throwOnLockCycles();
SINGLE_THREADED_EXECUTOR = Executors.newSingleThreadExecutor(new ThreadFactory() { USER_THREAD = new UserThread(100); // 100 pending event listeners to avoid memory blowup.
@Nonnull @Override public Thread newThread(@Nonnull Runnable runnable) {
Thread t = new Thread(runnable);
t.setName("bitcoinj user thread");
t.setDaemon(true);
t.setUncaughtExceptionHandler(uncaughtExceptionHandler);
vUserThread = new WeakReference<Thread>(t);
return t;
}
});
USER_THREAD = SINGLE_THREADED_EXECUTOR;
SAME_THREAD = new Executor() { SAME_THREAD = new Executor() {
@Override @Override
public void execute(@Nonnull Runnable runnable) { public void execute(@Nonnull Runnable runnable) {