Refactored to move all build-related code to a new ArbitraryDataBuildManager class

This is will be used to coordinate all build processes and threads. This way it keeps it separate from the ArbitraryDataManager class, which was getting a bit cluttered.
This commit is contained in:
CalDescent 2021-11-03 19:56:52 +00:00
parent 3a51be3430
commit abfeafc823
6 changed files with 207 additions and 175 deletions

View File

@ -4,6 +4,7 @@ import org.apache.commons.io.FileUtils;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.qortal.controller.arbitrary.ArbitraryDataBuildManager;
import org.qortal.controller.arbitrary.ArbitraryDataManager; import org.qortal.controller.arbitrary.ArbitraryDataManager;
import org.qortal.crypto.AES; import org.qortal.crypto.AES;
import org.qortal.data.transaction.ArbitraryTransactionData; import org.qortal.data.transaction.ArbitraryTransactionData;
@ -70,7 +71,7 @@ public class ArbitraryDataReader {
// cached data, as it may not be fully built // cached data, as it may not be fully built
ArbitraryDataBuildQueueItem queueItem = ArbitraryDataBuildQueueItem queueItem =
new ArbitraryDataBuildQueueItem(this.resourceId, this.resourceIdType, this.service); new ArbitraryDataBuildQueueItem(this.resourceId, this.resourceIdType, this.service);
if (ArbitraryDataManager.getInstance().isInBuildQueue(queueItem)) { if (ArbitraryDataBuildManager.getInstance().isInBuildQueue(queueItem)) {
return false; return false;
} }
@ -97,7 +98,7 @@ public class ArbitraryDataReader {
public boolean loadAsynchronously() { public boolean loadAsynchronously() {
ArbitraryDataBuildQueueItem queueItem = ArbitraryDataBuildQueueItem queueItem =
new ArbitraryDataBuildQueueItem(this.resourceId, this.resourceIdType, this.service); new ArbitraryDataBuildQueueItem(this.resourceId, this.resourceIdType, this.service);
return ArbitraryDataManager.getInstance().addToBuildQueue(queueItem); return ArbitraryDataBuildManager.getInstance().addToBuildQueue(queueItem);
} }
/** /**
@ -134,13 +135,13 @@ public class ArbitraryDataReader {
} }
private void preExecute() { private void preExecute() {
ArbitraryDataManager.getInstance().setBuildInProgress(true); ArbitraryDataBuildManager.getInstance().setBuildInProgress(true);
this.createWorkingDirectory(); this.createWorkingDirectory();
this.createUncompressedDirectory(); this.createUncompressedDirectory();
} }
private void postExecute() { private void postExecute() {
ArbitraryDataManager.getInstance().setBuildInProgress(false); ArbitraryDataBuildManager.getInstance().setBuildInProgress(false);
} }
private void createWorkingDirectory() { private void createWorkingDirectory() {

View File

@ -17,7 +17,6 @@ import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.Deque; import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
import java.util.LinkedList; import java.util.LinkedList;
@ -50,6 +49,7 @@ import org.qortal.api.DomainMapService;
import org.qortal.block.Block; import org.qortal.block.Block;
import org.qortal.block.BlockChain; import org.qortal.block.BlockChain;
import org.qortal.block.BlockChain.BlockTimingByHeight; import org.qortal.block.BlockChain.BlockTimingByHeight;
import org.qortal.controller.arbitrary.ArbitraryDataBuildManager;
import org.qortal.controller.arbitrary.ArbitraryDataCleanupManager; import org.qortal.controller.arbitrary.ArbitraryDataCleanupManager;
import org.qortal.controller.arbitrary.ArbitraryDataManager; import org.qortal.controller.arbitrary.ArbitraryDataManager;
import org.qortal.controller.Synchronizer.SynchronizationResult; import org.qortal.controller.Synchronizer.SynchronizationResult;
@ -81,27 +81,6 @@ import org.qortal.transaction.Transaction.TransactionType;
import org.qortal.transaction.Transaction.ValidationResult; import org.qortal.transaction.Transaction.ValidationResult;
import org.qortal.utils.*; import org.qortal.utils.*;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import java.awt.TrayIcon.MessageType;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.security.SecureRandom;
import java.security.Security;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import static org.qortal.network.Peer.FETCH_BLOCKS_TIMEOUT; import static org.qortal.network.Peer.FETCH_BLOCKS_TIMEOUT;
public class Controller extends Thread { public class Controller extends Thread {
@ -508,6 +487,7 @@ public class Controller extends Thread {
// Arbitrary data controllers // Arbitrary data controllers
LOGGER.info("Starting arbitrary-transaction controllers"); LOGGER.info("Starting arbitrary-transaction controllers");
ArbitraryDataManager.getInstance().start(); ArbitraryDataManager.getInstance().start();
ArbitraryDataBuildManager.getInstance().start();
ArbitraryDataCleanupManager.getInstance().start(); ArbitraryDataCleanupManager.getInstance().start();
// Auto-update service? // Auto-update service?
@ -602,7 +582,7 @@ public class Controller extends Thread {
// Clean up arbitrary data request cache // Clean up arbitrary data request cache
ArbitraryDataManager.getInstance().cleanupRequestCache(now); ArbitraryDataManager.getInstance().cleanupRequestCache(now);
// Clean up arbitrary data queues and lists // Clean up arbitrary data queues and lists
ArbitraryDataManager.getInstance().cleanupQueues(now); ArbitraryDataBuildManager.getInstance().cleanupQueues(now);
// Time to 'checkpoint' uncommitted repository writes? // Time to 'checkpoint' uncommitted repository writes?
if (now >= repositoryCheckpointTimestamp + repositoryCheckpointInterval) { if (now >= repositoryCheckpointTimestamp + repositoryCheckpointInterval) {
@ -1084,6 +1064,7 @@ public class Controller extends Thread {
// Arbitrary data controllers // Arbitrary data controllers
LOGGER.info("Shutting down arbitrary-transaction controllers"); LOGGER.info("Shutting down arbitrary-transaction controllers");
ArbitraryDataManager.getInstance().shutdown(); ArbitraryDataManager.getInstance().shutdown();
ArbitraryDataBuildManager.getInstance().shutdown();
ArbitraryDataCleanupManager.getInstance().shutdown(); ArbitraryDataCleanupManager.getInstance().shutdown();
if (blockMinter != null) { if (blockMinter != null) {

View File

@ -0,0 +1,187 @@
package org.qortal.controller.arbitrary;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.qortal.arbitrary.ArbitraryDataBuildQueueItem;
import org.qortal.utils.NTP;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ArbitraryDataBuildManager extends Thread {
private static final Logger LOGGER = LogManager.getLogger(ArbitraryDataBuildManager.class);
private static ArbitraryDataBuildManager instance;
private volatile boolean isStopping = false;
private boolean buildInProgress = false;
/**
* Map to keep track of arbitrary transaction resources currently being built (or queued).
*/
public Map<String, ArbitraryDataBuildQueueItem> arbitraryDataBuildQueue = Collections.synchronizedMap(new HashMap<>());
/**
* Map to keep track of failed arbitrary transaction builds.
*/
public Map<String, ArbitraryDataBuildQueueItem> arbitraryDataFailedBuilds = Collections.synchronizedMap(new HashMap<>());
public ArbitraryDataBuildManager() {
}
@Override
public void run() {
try {
// Use a fixed thread pool to execute the arbitrary data build actions (currently just a single thread)
// This can be expanded to have multiple threads processing the build queue when needed
ExecutorService arbitraryDataBuildExecutor = Executors.newFixedThreadPool(1);
arbitraryDataBuildExecutor.execute(new ArbitraryDataBuilderThread());
while (!isStopping) {
// Nothing to do yet
Thread.sleep(5000);
}
} catch (InterruptedException e) {
// Fall-through to exit thread...
}
}
public static ArbitraryDataBuildManager getInstance() {
if (instance == null)
instance = new ArbitraryDataBuildManager();
return instance;
}
public void shutdown() {
isStopping = true;
this.interrupt();
}
public void cleanupQueues(Long now) {
if (now == null) {
return;
}
arbitraryDataBuildQueue.entrySet().removeIf(entry -> entry.getValue().hasReachedBuildTimeout(now));
arbitraryDataFailedBuilds.entrySet().removeIf(entry -> entry.getValue().hasReachedFailureTimeout(now));
}
// Build queue
public boolean addToBuildQueue(ArbitraryDataBuildQueueItem queueItem) {
String resourceId = queueItem.getResourceId();
if (resourceId == null) {
return false;
}
resourceId = resourceId.toLowerCase();
if (this.arbitraryDataBuildQueue == null) {
return false;
}
if (NTP.getTime() == null) {
// Can't use queues until we have synced the time
return false;
}
// Don't add builds that have failed recently
if (this.isInFailedBuildsList(queueItem)) {
return false;
}
if (this.arbitraryDataBuildQueue.put(resourceId, queueItem) != null) {
// Already in queue
return true;
}
LOGGER.info("Added {} to build queue", resourceId);
// Added to queue
return true;
}
public boolean isInBuildQueue(ArbitraryDataBuildQueueItem queueItem) {
String resourceId = queueItem.getResourceId();
if (resourceId == null) {
return false;
}
if (this.arbitraryDataBuildQueue == null) {
return false;
}
if (this.arbitraryDataBuildQueue.containsKey(resourceId)) {
// Already in queue
return true;
}
// Not in queue
return false;
}
// Failed builds
public boolean addToFailedBuildsList(ArbitraryDataBuildQueueItem queueItem) {
String resourceId = queueItem.getResourceId();
if (resourceId == null) {
return false;
}
if (this.arbitraryDataFailedBuilds == null) {
return false;
}
if (NTP.getTime() == null) {
// Can't use queues until we have synced the time
return false;
}
if (this.arbitraryDataFailedBuilds.put(resourceId, queueItem) != null) {
// Already in list
return true;
}
LOGGER.info("Added {} to failed builds list", resourceId);
// Added to queue
return true;
}
public boolean isInFailedBuildsList(ArbitraryDataBuildQueueItem queueItem) {
String resourceId = queueItem.getResourceId();
if (resourceId == null) {
return false;
}
resourceId = resourceId.toLowerCase();
if (this.arbitraryDataFailedBuilds == null) {
return false;
}
if (this.arbitraryDataFailedBuilds.containsKey(resourceId)) {
// Already in list
return true;
}
// Not in list
return false;
}
public void setBuildInProgress(boolean buildInProgress) {
this.buildInProgress = buildInProgress;
}
public boolean getBuildInProgress() {
return this.buildInProgress;
}
}

View File

@ -21,21 +21,21 @@ public class ArbitraryDataBuilderThread implements Runnable {
public void run() { public void run() {
Thread.currentThread().setName("Arbitrary Data Build Manager"); Thread.currentThread().setName("Arbitrary Data Build Manager");
ArbitraryDataManager arbitraryDataManager = ArbitraryDataManager.getInstance(); ArbitraryDataBuildManager buildManager = ArbitraryDataBuildManager.getInstance();
while (!Controller.isStopping()) { while (!Controller.isStopping()) {
try { try {
Thread.sleep(1000); Thread.sleep(1000);
if (arbitraryDataManager.arbitraryDataBuildQueue == null) { if (buildManager.arbitraryDataBuildQueue == null) {
continue; continue;
} }
if (arbitraryDataManager.arbitraryDataBuildQueue.isEmpty()) { if (buildManager.arbitraryDataBuildQueue.isEmpty()) {
continue; continue;
} }
// Find resources that are queued for building // Find resources that are queued for building
Map.Entry<String, ArbitraryDataBuildQueueItem> next = arbitraryDataManager.arbitraryDataBuildQueue Map.Entry<String, ArbitraryDataBuildQueueItem> next = buildManager.arbitraryDataBuildQueue
.entrySet().stream() .entrySet().stream()
.filter(e -> e.getValue().isQueued()) .filter(e -> e.getValue().isQueued())
.findFirst().get(); .findFirst().get();
@ -57,7 +57,7 @@ public class ArbitraryDataBuilderThread implements Runnable {
} }
// Ignore builds that have failed recently // Ignore builds that have failed recently
if (ArbitraryDataManager.getInstance().isInFailedBuildsList(queueItem)) { if (buildManager.isInFailedBuildsList(queueItem)) {
continue; continue;
} }
@ -73,7 +73,7 @@ public class ArbitraryDataBuilderThread implements Runnable {
LOGGER.info("Error building {}: {}", queueItem, e.getMessage()); LOGGER.info("Error building {}: {}", queueItem, e.getMessage());
// Something went wrong - so remove it from the queue, and add to failed builds list // Something went wrong - so remove it from the queue, and add to failed builds list
queueItem.setFailed(true); queueItem.setFailed(true);
ArbitraryDataManager.getInstance().addToFailedBuildsList(queueItem); buildManager.addToFailedBuildsList(queueItem);
this.removeFromQueue(resourceId); this.removeFromQueue(resourceId);
} }
@ -87,6 +87,6 @@ public class ArbitraryDataBuilderThread implements Runnable {
if (resourceId == null) { if (resourceId == null) {
return; return;
} }
ArbitraryDataManager.getInstance().arbitraryDataBuildQueue.remove(resourceId.toLowerCase()); ArbitraryDataBuildManager.getInstance().arbitraryDataBuildQueue.remove(resourceId.toLowerCase());
} }
} }

View File

@ -110,7 +110,7 @@ public class ArbitraryDataCleanupManager extends Thread {
} }
// Don't interfere with the filesystem whilst a build is in progress // Don't interfere with the filesystem whilst a build is in progress
if (ArbitraryDataManager.getInstance().getBuildInProgress()) { if (ArbitraryDataBuildManager.getInstance().getBuildInProgress()) {
Thread.sleep(5000); Thread.sleep(5000);
} }

View File

@ -1,13 +1,10 @@
package org.qortal.controller.arbitrary; package org.qortal.controller.arbitrary;
import java.util.*; import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.qortal.api.resource.TransactionsResource.ConfirmationStatus; import org.qortal.api.resource.TransactionsResource.ConfirmationStatus;
import org.qortal.arbitrary.ArbitraryDataBuildQueueItem;
import org.qortal.controller.Controller; import org.qortal.controller.Controller;
import org.qortal.data.network.ArbitraryPeerData; import org.qortal.data.network.ArbitraryPeerData;
import org.qortal.data.transaction.ArbitraryTransactionData; import org.qortal.data.transaction.ArbitraryTransactionData;
@ -38,8 +35,6 @@ public class ArbitraryDataManager extends Thread {
private static ArbitraryDataManager instance; private static ArbitraryDataManager instance;
private final Object peerDataLock = new Object(); private final Object peerDataLock = new Object();
private boolean buildInProgress = false;
private volatile boolean isStopping = false; private volatile boolean isStopping = false;
/** /**
@ -78,15 +73,6 @@ public class ArbitraryDataManager extends Thread {
*/ */
private static long ARBITRARY_DATA_CACHE_TIMEOUT = 60 * 60 * 1000L; // 60 minutes private static long ARBITRARY_DATA_CACHE_TIMEOUT = 60 * 60 * 1000L; // 60 minutes
/**
* Map to keep track of arbitrary transaction resources currently being built (or queued).
*/
public Map<String, ArbitraryDataBuildQueueItem> arbitraryDataBuildQueue = Collections.synchronizedMap(new HashMap<>());
/**
* Map to keep track of failed arbitrary transaction builds.
*/
public Map<String, ArbitraryDataBuildQueueItem> arbitraryDataFailedBuilds = Collections.synchronizedMap(new HashMap<>());
private ArbitraryDataManager() { private ArbitraryDataManager() {
@ -103,11 +89,6 @@ public class ArbitraryDataManager extends Thread {
public void run() { public void run() {
Thread.currentThread().setName("Arbitrary Data Manager"); Thread.currentThread().setName("Arbitrary Data Manager");
// Use a fixed thread pool to execute the arbitrary data build actions (currently just a single thread)
// This can be expanded to have multiple threads processing the build queue when needed
ExecutorService arbitraryDataBuildExecutor = Executors.newFixedThreadPool(1);
arbitraryDataBuildExecutor.execute(new ArbitraryDataBuilderThread());
// Keep a reference to the storage manager as we will need this a lot // Keep a reference to the storage manager as we will need this a lot
ArbitraryDataStorageManager storageManager = ArbitraryDataStorageManager.getInstance(); ArbitraryDataStorageManager storageManager = ArbitraryDataStorageManager.getInstance();
@ -313,14 +294,6 @@ public class ArbitraryDataManager extends Thread {
arbitraryDataFileRequests.entrySet().removeIf(entry -> entry.getValue() < requestMinimumTimestamp); arbitraryDataFileRequests.entrySet().removeIf(entry -> entry.getValue() < requestMinimumTimestamp);
} }
public void cleanupQueues(Long now) {
if (now == null) {
return;
}
arbitraryDataBuildQueue.entrySet().removeIf(entry -> entry.getValue().hasReachedBuildTimeout(now));
arbitraryDataFailedBuilds.entrySet().removeIf(entry -> entry.getValue().hasReachedFailureTimeout(now));
}
// Arbitrary data resource cache // Arbitrary data resource cache
public boolean isResourceCached(String resourceId) { public boolean isResourceCached(String resourceId) {
@ -373,117 +346,6 @@ public class ArbitraryDataManager extends Thread {
this.arbitraryDataCachedResources.put(resourceId, timestamp); this.arbitraryDataCachedResources.put(resourceId, timestamp);
} }
// Build queue
public boolean addToBuildQueue(ArbitraryDataBuildQueueItem queueItem) {
String resourceId = queueItem.getResourceId();
if (resourceId == null) {
return false;
}
resourceId = resourceId.toLowerCase();
if (this.arbitraryDataBuildQueue == null) {
return false;
}
if (NTP.getTime() == null) {
// Can't use queues until we have synced the time
return false;
}
// Don't add builds that have failed recently
if (this.isInFailedBuildsList(queueItem)) {
return false;
}
if (this.arbitraryDataBuildQueue.put(resourceId, queueItem) != null) {
// Already in queue
return true;
}
LOGGER.info("Added {} to build queue", resourceId);
// Added to queue
return true;
}
public boolean isInBuildQueue(ArbitraryDataBuildQueueItem queueItem) {
String resourceId = queueItem.getResourceId();
if (resourceId == null) {
return false;
}
if (this.arbitraryDataBuildQueue == null) {
return false;
}
if (this.arbitraryDataBuildQueue.containsKey(resourceId)) {
// Already in queue
return true;
}
// Not in queue
return false;
}
// Failed builds
public boolean addToFailedBuildsList(ArbitraryDataBuildQueueItem queueItem) {
String resourceId = queueItem.getResourceId();
if (resourceId == null) {
return false;
}
if (this.arbitraryDataFailedBuilds == null) {
return false;
}
if (NTP.getTime() == null) {
// Can't use queues until we have synced the time
return false;
}
if (this.arbitraryDataFailedBuilds.put(resourceId, queueItem) != null) {
// Already in list
return true;
}
LOGGER.info("Added {} to failed builds list", resourceId);
// Added to queue
return true;
}
public boolean isInFailedBuildsList(ArbitraryDataBuildQueueItem queueItem) {
String resourceId = queueItem.getResourceId();
if (resourceId == null) {
return false;
}
resourceId = resourceId.toLowerCase();
if (this.arbitraryDataFailedBuilds == null) {
return false;
}
if (this.arbitraryDataFailedBuilds.containsKey(resourceId)) {
// Already in list
return true;
}
// Not in list
return false;
}
public void setBuildInProgress(boolean buildInProgress) {
this.buildInProgress = buildInProgress;
}
public boolean getBuildInProgress() {
return this.buildInProgress;
}
// Network handlers // Network handlers
@ -611,8 +473,9 @@ public class ArbitraryDataManager extends Thread {
} }
// Also remove from the failed builds queue in case it previously failed due to missing chunks // Also remove from the failed builds queue in case it previously failed due to missing chunks
if (this.arbitraryDataFailedBuilds.containsKey(resourceId)) { ArbitraryDataBuildManager buildManager = ArbitraryDataBuildManager.getInstance();
this.arbitraryDataFailedBuilds.remove(resourceId); if (buildManager.arbitraryDataFailedBuilds.containsKey(resourceId)) {
buildManager.arbitraryDataFailedBuilds.remove(resourceId);
} }
} }