ExecuteProduceConsume:

Slight reworking of EPC to simplify when producer can block
and generally make some of the conditional code more readable.

Improved logging with task class names and logging level editable during runtime!
Use /peer/enginestats?newLoggingLevel=DEBUG (or TRACE or back to INFO) to change.
This commit is contained in:
catbref 2022-03-14 21:26:19 +00:00
parent 3d99f86630
commit e835f6d998
3 changed files with 79 additions and 65 deletions

View File

@ -20,6 +20,11 @@ import javax.ws.rs.*;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.core.config.Configuration;
import org.apache.logging.log4j.core.config.LoggerConfig;
import org.apache.logging.log4j.core.LoggerContext;
import org.qortal.api.*;
import org.qortal.api.model.ConnectedPeer;
import org.qortal.api.model.PeersSummary;
@ -127,9 +132,29 @@ public class PeersResource {
}
)
@SecurityRequirement(name = "apiKey")
public ExecuteProduceConsume.StatsSnapshot getEngineStats(@HeaderParam(Security.API_KEY_HEADER) String apiKey) {
public ExecuteProduceConsume.StatsSnapshot getEngineStats(@HeaderParam(Security.API_KEY_HEADER) String apiKey, @QueryParam("newLoggingLevel") Level newLoggingLevel) {
Security.checkApiCallAllowed(request);
if (newLoggingLevel != null) {
final LoggerContext ctx = (LoggerContext) LogManager.getContext(false);
final Configuration config = ctx.getConfiguration();
String epcClassName = "org.qortal.network.Network.NetworkProcessor";
LoggerConfig loggerConfig = config.getLoggerConfig(epcClassName);
LoggerConfig specificConfig = loggerConfig;
// We need a specific configuration for this logger,
// otherwise we would change the level of all other loggers
// having the original configuration as parent as well
if (!loggerConfig.getName().equals(epcClassName)) {
specificConfig = new LoggerConfig(epcClassName, newLoggingLevel, true);
specificConfig.setParent(loggerConfig);
config.addLogger(epcClassName, specificConfig);
}
specificConfig.setLevel(newLoggingLevel);
ctx.updateLoggers();
}
return Network.getInstance().getStatsSnapshot();
}

View File

@ -180,7 +180,8 @@ public class Network {
}
// Load all known peers from repository
synchronized (this.allKnownPeers) { List<String> fixedNetwork = Settings.getInstance().getFixedNetwork();
synchronized (this.allKnownPeers) {
List<String> fixedNetwork = Settings.getInstance().getFixedNetwork();
if (fixedNetwork != null && !fixedNetwork.isEmpty()) {
Long addedWhen = NTP.getTime();
String addedBy = "fixedNetwork";

View File

@ -28,7 +28,6 @@ public abstract class ExecuteProduceConsume implements Runnable {
private final String className;
private final Logger logger;
private final boolean isLoggerTraceEnabled;
protected ExecutorService executor;
@ -43,12 +42,12 @@ public abstract class ExecuteProduceConsume implements Runnable {
private volatile int tasksConsumed = 0;
private volatile int spawnFailures = 0;
/** Whether a new thread has already been spawned and is waiting to start. Used to prevent spawning multiple new threads. */
private volatile boolean hasThreadPending = false;
public ExecuteProduceConsume(ExecutorService executor) {
this.className = this.getClass().getSimpleName();
this.logger = LogManager.getLogger(this.getClass());
this.isLoggerTraceEnabled = this.logger.isTraceEnabled();
this.executor = executor;
}
@ -105,8 +104,7 @@ public abstract class ExecuteProduceConsume implements Runnable {
@Override
public void run() {
if (this.isLoggerTraceEnabled)
Thread.currentThread().setName(this.className + "-" + Thread.currentThread().getId());
Thread.currentThread().setName(this.className + "-" + Thread.currentThread().getId());
boolean wasThreadPending;
synchronized (this) {
@ -114,25 +112,19 @@ public abstract class ExecuteProduceConsume implements Runnable {
if (this.activeThreadCount > this.greatestActiveThreadCount)
this.greatestActiveThreadCount = this.activeThreadCount;
if (this.isLoggerTraceEnabled) {
this.logger.trace(() -> String.format("[%d] started, hasThreadPending was: %b, activeThreadCount now: %d",
Thread.currentThread().getId(), this.hasThreadPending, this.activeThreadCount));
}
this.logger.trace(() -> String.format("[%d] started, hasThreadPending was: %b, activeThreadCount now: %d",
Thread.currentThread().getId(), this.hasThreadPending, this.activeThreadCount));
// Defer clearing hasThreadPending to prevent unnecessary threads waiting to produce...
wasThreadPending = this.hasThreadPending;
}
try {
// It's possible this might need to become a class instance private volatile
boolean canBlock = false;
while (!Thread.currentThread().isInterrupted()) {
Task task = null;
String taskType;
if (this.isLoggerTraceEnabled) {
this.logger.trace(() -> String.format("[%d] waiting to produce...", Thread.currentThread().getId()));
}
this.logger.trace(() -> String.format("[%d] waiting to produce...", Thread.currentThread().getId()));
synchronized (this) {
if (wasThreadPending) {
@ -141,13 +133,13 @@ public abstract class ExecuteProduceConsume implements Runnable {
wasThreadPending = false;
}
final boolean lambdaCanIdle = canBlock;
if (this.isLoggerTraceEnabled) {
this.logger.trace(() -> String.format("[%d] producing, activeThreadCount: %d, consumerCount: %d, canBlock is %b...",
Thread.currentThread().getId(), this.activeThreadCount, this.consumerCount, lambdaCanIdle));
}
// If we're the only non-consuming thread - producer can afford to block this round
boolean canBlock = this.activeThreadCount - this.consumerCount <= 1;
final long beforeProduce = isLoggerTraceEnabled ? System.currentTimeMillis() : 0;
this.logger.trace(() -> String.format("[%d] producing... [activeThreadCount: %d, consumerCount: %d, canBlock: %b]",
Thread.currentThread().getId(), this.activeThreadCount, this.consumerCount, canBlock));
final long beforeProduce = this.logger.isDebugEnabled() ? System.currentTimeMillis() : 0;
try {
task = produceTask(canBlock);
@ -158,31 +150,36 @@ public abstract class ExecuteProduceConsume implements Runnable {
this.logger.warn(() -> String.format("[%d] exception while trying to produce task", Thread.currentThread().getId()), e);
}
if (this.isLoggerTraceEnabled) {
this.logger.trace(() -> String.format("[%d] producing took %dms", Thread.currentThread().getId(), System.currentTimeMillis() - beforeProduce));
if (this.logger.isDebugEnabled()) {
final long productionPeriod = System.currentTimeMillis() - beforeProduce;
taskType = task == null ? "no task" : task.getClass().getCanonicalName();
this.logger.debug(() -> String.format("[%d] produced [%s] in %dms [canBlock: %b]",
Thread.currentThread().getId(),
taskType,
productionPeriod,
canBlock
));
} else {
taskType = null;
}
}
if (task == null)
synchronized (this) {
if (this.isLoggerTraceEnabled) {
this.logger.trace(() -> String.format("[%d] no task, activeThreadCount: %d, consumerCount: %d",
Thread.currentThread().getId(), this.activeThreadCount, this.consumerCount));
}
this.logger.trace(() -> String.format("[%d] no task, activeThreadCount: %d, consumerCount: %d",
Thread.currentThread().getId(), this.activeThreadCount, this.consumerCount));
if (this.activeThreadCount > this.consumerCount + 1) {
// If we have an excess of non-consuming threads then we can exit
if (this.activeThreadCount - this.consumerCount > 1) {
--this.activeThreadCount;
if (this.isLoggerTraceEnabled) {
this.logger.trace(() -> String.format("[%d] ending, activeThreadCount now: %d",
Thread.currentThread().getId(), this.activeThreadCount));
}
this.logger.trace(() -> String.format("[%d] ending, activeThreadCount now: %d",
Thread.currentThread().getId(), this.activeThreadCount));
return;
}
// We're the last surviving thread - producer can afford to block next round
canBlock = true;
continue;
}
@ -192,16 +189,13 @@ public abstract class ExecuteProduceConsume implements Runnable {
++this.tasksProduced;
++this.consumerCount;
if (this.isLoggerTraceEnabled) {
this.logger.trace(() -> String.format("[%d] hasThreadPending: %b, activeThreadCount: %d, consumerCount now: %d",
Thread.currentThread().getId(), this.hasThreadPending, this.activeThreadCount, this.consumerCount));
}
this.logger.trace(() -> String.format("[%d] hasThreadPending: %b, activeThreadCount: %d, consumerCount now: %d",
Thread.currentThread().getId(), this.hasThreadPending, this.activeThreadCount, this.consumerCount));
// If we have no thread pending and no excess of threads then we should spawn a fresh thread
if (!this.hasThreadPending && this.activeThreadCount <= this.consumerCount + 1) {
if (this.isLoggerTraceEnabled) {
this.logger.trace(() -> String.format("[%d] spawning another thread", Thread.currentThread().getId()));
}
if (!this.hasThreadPending && this.activeThreadCount == this.consumerCount) {
this.logger.trace(() -> String.format("[%d] spawning another thread", Thread.currentThread().getId()));
this.hasThreadPending = true;
try {
@ -209,21 +203,19 @@ public abstract class ExecuteProduceConsume implements Runnable {
} catch (RejectedExecutionException e) {
++this.spawnFailures;
this.hasThreadPending = false;
if (this.isLoggerTraceEnabled) {
this.logger.trace(() -> String.format("[%d] failed to spawn another thread", Thread.currentThread().getId()));
}
this.logger.trace(() -> String.format("[%d] failed to spawn another thread", Thread.currentThread().getId()));
this.onSpawnFailure();
}
} else {
if (this.isLoggerTraceEnabled) {
this.logger.trace(() -> String.format("[%d] NOT spawning another thread", Thread.currentThread().getId()));
}
this.logger.trace(() -> String.format("[%d] NOT spawning another thread", Thread.currentThread().getId()));
}
}
if (this.isLoggerTraceEnabled) {
this.logger.trace(() -> String.format("[%d] performing task...", Thread.currentThread().getId()));
}
this.logger.trace(() -> String.format("[%d] consuming [%s] task...", Thread.currentThread().getId(), taskType));
final long beforePerform = this.logger.isDebugEnabled() ? System.currentTimeMillis() : 0;
try {
task.perform(); // This can block for a while
@ -231,29 +223,25 @@ public abstract class ExecuteProduceConsume implements Runnable {
// We're in shutdown situation so exit
Thread.currentThread().interrupt();
} catch (Exception e) {
this.logger.warn(() -> String.format("[%d] exception while performing task", Thread.currentThread().getId()), e);
this.logger.warn(() -> String.format("[%d] exception while consuming task", Thread.currentThread().getId()), e);
}
if (this.isLoggerTraceEnabled) {
this.logger.trace(() -> String.format("[%d] finished task", Thread.currentThread().getId()));
if (this.logger.isDebugEnabled()) {
final long productionPeriod = System.currentTimeMillis() - beforePerform;
this.logger.debug(() -> String.format("[%d] consumed [%s] task in %dms", Thread.currentThread().getId(), taskType, productionPeriod));
}
synchronized (this) {
++this.tasksConsumed;
--this.consumerCount;
if (this.isLoggerTraceEnabled) {
this.logger.trace(() -> String.format("[%d] consumerCount now: %d",
Thread.currentThread().getId(), this.consumerCount));
}
// Quicker, non-blocking produce next round
canBlock = false;
this.logger.trace(() -> String.format("[%d] consumerCount now: %d",
Thread.currentThread().getId(), this.consumerCount));
}
}
} finally {
if (this.isLoggerTraceEnabled)
Thread.currentThread().setName(this.className);
Thread.currentThread().setName(this.className);
}
}