ExecuteProduceConsume and networking improvements

Added "volatile" to more fields, for thread-safety on reads.
Changes to field values are done inside synchronized blocks
so no need for AtomicInteger/AtomicBoolean. (Could be changed
in the future to show intention/readability though).

Added more statistics (tasks produced/consumed).

Limited Network's EPC executor to 10 threads max.
This commit is contained in:
catbref 2019-08-16 08:28:26 +01:00
parent c597f11c37
commit f83dc26ae0
3 changed files with 124 additions and 65 deletions

View File

@ -162,8 +162,8 @@ public class Network extends Thread {
mergePeersLock = new ReentrantLock(); mergePeersLock = new ReentrantLock();
// We'll use a cached thread pool, but with more aggressive 10 second timeout. // We'll use a cached thread pool, max 10 threads, but with more aggressive 10 second timeout.
ExecutorService networkExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, ExecutorService networkExecutor = new ThreadPoolExecutor(1, 10,
10L, TimeUnit.SECONDS, 10L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>()); new SynchronousQueue<Runnable>());
networkEPC = new NetworkProcessor(networkExecutor); networkEPC = new NetworkProcessor(networkExecutor);

View File

@ -14,15 +14,22 @@ public abstract class ExecuteProduceConsume implements Runnable {
private final Logger logger; private final Logger logger;
private ExecutorService executor; private ExecutorService executor;
private int activeThreadCount = 0;
private int greatestActiveThreadCount = 0;
private int consumerCount = 0;
private boolean hasThreadPending = false; // These are volatile to prevent thread-local caching of values
// but all are updated inside synchronized blocks
// so we don't need AtomicInteger/AtomicBoolean
private volatile int activeThreadCount = 0;
private volatile int greatestActiveThreadCount = 0;
private volatile int consumerCount = 0;
private volatile int tasksProduced = 0;
private volatile int tasksConsumed = 0;
private volatile boolean hasThreadPending = false;
public ExecuteProduceConsume(ExecutorService executor) { public ExecuteProduceConsume(ExecutorService executor) {
className = this.getClass().getSimpleName(); this.className = this.getClass().getSimpleName();
logger = LogManager.getLogger(this.getClass()); this.logger = LogManager.getLogger(this.getClass());
this.executor = executor; this.executor = executor;
} }
@ -32,27 +39,39 @@ public abstract class ExecuteProduceConsume implements Runnable {
} }
public void start() { public void start() {
executor.execute(this); this.executor.execute(this);
} }
public void shutdown() { public void shutdown() {
executor.shutdownNow(); this.executor.shutdownNow();
} }
public boolean shutdown(long timeout) throws InterruptedException { public boolean shutdown(long timeout) throws InterruptedException {
executor.shutdownNow(); this.executor.shutdownNow();
return executor.awaitTermination(timeout, TimeUnit.MILLISECONDS); return this.executor.awaitTermination(timeout, TimeUnit.MILLISECONDS);
} }
public int getActiveThreadCount() { public int getActiveThreadCount() {
synchronized (this) { synchronized (this) {
return activeThreadCount; return this.activeThreadCount;
} }
} }
public int getGreatestActiveThreadCount() { public int getGreatestActiveThreadCount() {
synchronized (this) { synchronized (this) {
return greatestActiveThreadCount; return this.greatestActiveThreadCount;
}
}
public int getTasksProduced() {
synchronized (this) {
return this.tasksProduced;
}
}
public int getTasksConsumed() {
synchronized (this) {
return this.tasksConsumed;
} }
} }
@ -72,19 +91,19 @@ public abstract class ExecuteProduceConsume implements Runnable {
@Override @Override
public void run() { public void run() {
Thread.currentThread().setName(className + "-" + Thread.currentThread().getId()); Thread.currentThread().setName(this.className + "-" + Thread.currentThread().getId());
boolean wasThreadPending; boolean wasThreadPending;
synchronized (this) { synchronized (this) {
++activeThreadCount; ++this.activeThreadCount;
if (activeThreadCount > greatestActiveThreadCount) if (this.activeThreadCount > this.greatestActiveThreadCount)
greatestActiveThreadCount = activeThreadCount; this.greatestActiveThreadCount = this.activeThreadCount;
logger.trace(() -> String.format("[%d] started, hasThreadPending was: %b, activeThreadCount now: %d", this.logger.trace(() -> String.format("[%d] started, hasThreadPending was: %b, activeThreadCount now: %d",
Thread.currentThread().getId(), hasThreadPending, activeThreadCount)); Thread.currentThread().getId(), this.hasThreadPending, this.activeThreadCount));
// Defer clearing hasThreadPending to prevent unnecessary threads waiting to produce... // Defer clearing hasThreadPending to prevent unnecessary threads waiting to produce...
wasThreadPending = hasThreadPending; wasThreadPending = this.hasThreadPending;
} }
try { try {
@ -93,33 +112,34 @@ public abstract class ExecuteProduceConsume implements Runnable {
while (true) { while (true) {
final Task task; final Task task;
logger.trace(() -> String.format("[%d] waiting to produce...", Thread.currentThread().getId())); this.logger.trace(() -> String.format("[%d] waiting to produce...", Thread.currentThread().getId()));
synchronized (this) { synchronized (this) {
if (wasThreadPending) { if (wasThreadPending) {
// Clear thread-pending flag now that we about to produce. // Clear thread-pending flag now that we about to produce.
hasThreadPending = false; this.hasThreadPending = false;
wasThreadPending = false; wasThreadPending = false;
} }
final boolean lambdaCanIdle = canBlock; final boolean lambdaCanIdle = canBlock;
logger.trace(() -> String.format("[%d] producing, activeThreadCount: %d, consumerCount: %d, canBlock is %b...", this.logger.trace(() -> String.format("[%d] producing, activeThreadCount: %d, consumerCount: %d, canBlock is %b...",
Thread.currentThread().getId(), activeThreadCount, consumerCount, lambdaCanIdle)); Thread.currentThread().getId(), this.activeThreadCount, this.consumerCount, lambdaCanIdle));
final long now = System.currentTimeMillis(); final long now = System.currentTimeMillis();
task = produceTask(canBlock); task = produceTask(canBlock);
final long delay = System.currentTimeMillis() - now; final long delay = System.currentTimeMillis() - now;
logger.trace(() -> String.format("[%d] producing took %dms", Thread.currentThread().getId(), delay)); this.logger.trace(() -> String.format("[%d] producing took %dms", Thread.currentThread().getId(), delay));
} }
if (task == null) if (task == null)
synchronized (this) { synchronized (this) {
logger.trace(() -> String.format("[%d] no task, activeThreadCount: %d, consumerCount: %d", this.logger.trace(() -> String.format("[%d] no task, activeThreadCount: %d, consumerCount: %d",
Thread.currentThread().getId(), activeThreadCount, consumerCount)); Thread.currentThread().getId(), this.activeThreadCount, this.consumerCount));
if (activeThreadCount > consumerCount + 1) { if (this.activeThreadCount > this.consumerCount + 1) {
--activeThreadCount; --this.activeThreadCount;
logger.trace(() -> String.format("[%d] ending, activeThreadCount now: %d", Thread.currentThread().getId(), activeThreadCount)); this.logger.trace(() -> String.format("[%d] ending, activeThreadCount now: %d",
Thread.currentThread().getId(), this.activeThreadCount));
break; break;
} }
@ -132,30 +152,38 @@ public abstract class ExecuteProduceConsume implements Runnable {
// We have a task // We have a task
synchronized (this) { synchronized (this) {
++consumerCount; ++this.tasksProduced;
++this.consumerCount;
if (!hasThreadPending) { if (!this.hasThreadPending) {
logger.trace(() -> String.format("[%d] spawning another thread", Thread.currentThread().getId())); this.logger.trace(() -> String.format("[%d] spawning another thread", Thread.currentThread().getId()));
hasThreadPending = true; this.hasThreadPending = true;
executor.execute(this); // Same object, different thread
try {
this.executor.execute(this); // Same object, different thread
} catch (RejectedExecutionException e) {
this.hasThreadPending = false;
this.logger.trace(() -> String.format("[%d] failed to spawn another thread", Thread.currentThread().getId()));
}
} }
} }
logger.trace(() -> String.format("[%d] performing task...", Thread.currentThread().getId())); this.logger.trace(() -> String.format("[%d] performing task...", Thread.currentThread().getId()));
task.perform(); // This can block for a while task.perform(); // This can block for a while
logger.trace(() -> String.format("[%d] finished task", Thread.currentThread().getId())); this.logger.trace(() -> String.format("[%d] finished task", Thread.currentThread().getId()));
synchronized (this) { synchronized (this) {
--consumerCount; ++this.tasksConsumed;
--this.consumerCount;
// Quicker, non-blocking produce next round // Quicker, non-blocking produce next round
canBlock = false; canBlock = false;
} }
} }
} catch (InterruptedException | RejectedExecutionException e) { } catch (InterruptedException e) {
// We're in shutdown situation so exit // We're in shutdown situation so exit
} finally { } finally {
Thread.currentThread().setName(className + "-dormant"); Thread.currentThread().setName(this.className + "-dormant");
} }
} }

View File

@ -4,48 +4,38 @@ import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Random; import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.junit.Test; import org.junit.Test;
import org.qora.utils.ExecuteProduceConsume; import org.qora.utils.ExecuteProduceConsume;
public class ThreadTests { public class EPCTests {
private void testEPC(ExecuteProduceConsume testEPC) throws InterruptedException {
testEPC.start();
// Let it run for a minute
for (int s = 1; s <= 60; ++s) {
Thread.sleep(1000);
System.out.println(String.format("After %d second%s, active threads: %d, greatest thread count: %d", s, (s != 1 ? "s" : "") , testEPC.getActiveThreadCount(), testEPC.getGreatestActiveThreadCount()));
}
final long before = System.currentTimeMillis();
testEPC.shutdown(30 * 1000);
final long after = System.currentTimeMillis();
System.out.println(String.format("Shutdown took %d milliseconds", after - before));
System.out.println(String.format("Greatest thread count: %d", testEPC.getGreatestActiveThreadCount()));
}
@Test
public void testRandomEPC() throws InterruptedException {
final int TASK_PERCENT = 25; // Produce a task this % of the time
final int PAUSE_PERCENT = 80; // Pause for new work this % of the time
class RandomEPC extends ExecuteProduceConsume { class RandomEPC extends ExecuteProduceConsume {
private final int TASK_PERCENT;
private final int PAUSE_PERCENT;
public RandomEPC(ExecutorService executor, int taskPercent, int pausePercent) {
super(executor);
this.TASK_PERCENT = taskPercent;
this.PAUSE_PERCENT = pausePercent;
}
@Override @Override
protected Task produceTask(boolean canIdle) throws InterruptedException { protected Task produceTask(boolean canIdle) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Random random = new Random(); Random random = new Random();
final int percent = random.nextInt(100); final int percent = random.nextInt(100);
// Sometimes produce a task // Sometimes produce a task
if (percent < TASK_PERCENT) { if (percent < TASK_PERCENT) {
return new Task() { return () -> {
@Override
public void perform() throws InterruptedException {
Thread.sleep(random.nextInt(500) + 100); Thread.sleep(random.nextInt(500) + 100);
}
}; };
} else { } else {
// If we don't produce a task, then maybe simulate a pause until work arrives // If we don't produce a task, then maybe simulate a pause until work arrives
@ -57,7 +47,48 @@ public class ThreadTests {
} }
} }
testEPC(new RandomEPC()); private void testEPC(ExecuteProduceConsume testEPC) throws InterruptedException {
testEPC.start();
// Let it run for a minute
for (int s = 1; s <= 60; ++s) {
Thread.sleep(1000);
System.out.println(String.format("After %d second%s, active threads: %d, greatest thread count: %d, tasks produced: %d, tasks consumed: %d",
s, (s != 1 ? "s" : ""),
testEPC.getActiveThreadCount(), testEPC.getGreatestActiveThreadCount(),
testEPC.getTasksProduced(), testEPC.getTasksConsumed()));
}
final long before = System.currentTimeMillis();
testEPC.shutdown(30 * 1000);
final long after = System.currentTimeMillis();
System.out.println(String.format("Shutdown took %d milliseconds", after - before));
System.out.println(String.format("Greatest thread count: %d", testEPC.getGreatestActiveThreadCount()));
System.out.println(String.format("Tasks produced: %d", testEPC.getTasksProduced()));
System.out.println(String.format("Tasks consumed: %d", testEPC.getTasksConsumed()));
}
@Test
public void testRandomEPC() throws InterruptedException {
final int TASK_PERCENT = 25; // Produce a task this % of the time
final int PAUSE_PERCENT = 80; // Pause for new work this % of the time
final ExecutorService executor = Executors.newCachedThreadPool();
testEPC(new RandomEPC(executor, TASK_PERCENT, PAUSE_PERCENT));
}
@Test
public void testRandomFixedPoolEPC() throws InterruptedException {
final int TASK_PERCENT = 25; // Produce a task this % of the time
final int PAUSE_PERCENT = 80; // Pause for new work this % of the time
final int MAX_THREADS = 3;
final ExecutorService executor = Executors.newFixedThreadPool(MAX_THREADS);
testEPC(new RandomEPC(executor, TASK_PERCENT, PAUSE_PERCENT));
} }
/** /**