Cache updating moved to a dedicated thread.

Hopeful fix for serialization failures which occurred when updating from various different network threads.
This commit is contained in:
CalDescent 2023-05-12 19:39:31 +01:00
parent 7725c5e21f
commit b661d39844
7 changed files with 181 additions and 83 deletions

View File

@ -45,6 +45,7 @@ import org.qortal.arbitrary.metadata.ArbitraryDataTransactionMetadata;
import org.qortal.arbitrary.misc.Category; import org.qortal.arbitrary.misc.Category;
import org.qortal.arbitrary.misc.Service; import org.qortal.arbitrary.misc.Service;
import org.qortal.controller.Controller; import org.qortal.controller.Controller;
import org.qortal.controller.arbitrary.ArbitraryDataCacheManager;
import org.qortal.controller.arbitrary.ArbitraryDataRenderManager; import org.qortal.controller.arbitrary.ArbitraryDataRenderManager;
import org.qortal.controller.arbitrary.ArbitraryDataStorageManager; import org.qortal.controller.arbitrary.ArbitraryDataStorageManager;
import org.qortal.controller.arbitrary.ArbitraryMetadataManager; import org.qortal.controller.arbitrary.ArbitraryMetadataManager;
@ -1133,7 +1134,7 @@ public class ArbitraryResource {
Security.checkApiCallAllowed(request); Security.checkApiCallAllowed(request);
try (final Repository repository = RepositoryManager.getRepository()) { try (final Repository repository = RepositoryManager.getRepository()) {
RepositoryManager.buildArbitraryResourcesCache(repository, true); ArbitraryDataCacheManager.getInstance().buildArbitraryResourcesCache(repository, true);
return "true"; return "true";
} catch (DataException e) { } catch (DataException e) {

View File

@ -403,7 +403,7 @@ public class Controller extends Thread {
RepositoryManager.setRequestedCheckpoint(Boolean.TRUE); RepositoryManager.setRequestedCheckpoint(Boolean.TRUE);
try (final Repository repository = RepositoryManager.getRepository()) { try (final Repository repository = RepositoryManager.getRepository()) {
RepositoryManager.buildArbitraryResourcesCache(repository, false); ArbitraryDataCacheManager.getInstance().buildArbitraryResourcesCache(repository, false);
} }
} }
catch (DataException e) { catch (DataException e) {
@ -485,6 +485,7 @@ public class Controller extends Thread {
LOGGER.info("Starting arbitrary-transaction controllers"); LOGGER.info("Starting arbitrary-transaction controllers");
ArbitraryDataManager.getInstance().start(); ArbitraryDataManager.getInstance().start();
ArbitraryDataFileManager.getInstance().start(); ArbitraryDataFileManager.getInstance().start();
ArbitraryDataCacheManager.getInstance().start();
ArbitraryDataBuildManager.getInstance().start(); ArbitraryDataBuildManager.getInstance().start();
ArbitraryDataCleanupManager.getInstance().start(); ArbitraryDataCleanupManager.getInstance().start();
ArbitraryDataStorageManager.getInstance().start(); ArbitraryDataStorageManager.getInstance().start();
@ -939,6 +940,7 @@ public class Controller extends Thread {
LOGGER.info("Shutting down arbitrary-transaction controllers"); LOGGER.info("Shutting down arbitrary-transaction controllers");
ArbitraryDataManager.getInstance().shutdown(); ArbitraryDataManager.getInstance().shutdown();
ArbitraryDataFileManager.getInstance().shutdown(); ArbitraryDataFileManager.getInstance().shutdown();
ArbitraryDataCacheManager.getInstance().shutdown();
ArbitraryDataBuildManager.getInstance().shutdown(); ArbitraryDataBuildManager.getInstance().shutdown();
ArbitraryDataCleanupManager.getInstance().shutdown(); ArbitraryDataCleanupManager.getInstance().shutdown();
ArbitraryDataStorageManager.getInstance().shutdown(); ArbitraryDataStorageManager.getInstance().shutdown();

View File

@ -0,0 +1,167 @@
package org.qortal.controller.arbitrary;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.qortal.api.resource.TransactionsResource;
import org.qortal.controller.Controller;
import org.qortal.data.arbitrary.ArbitraryResourceData;
import org.qortal.data.transaction.ArbitraryTransactionData;
import org.qortal.gui.SplashFrame;
import org.qortal.repository.DataException;
import org.qortal.repository.Repository;
import org.qortal.repository.RepositoryManager;
import org.qortal.settings.Settings;
import org.qortal.transaction.ArbitraryTransaction;
import org.qortal.transaction.Transaction;
import org.qortal.utils.Base58;
import java.util.*;
public class ArbitraryDataCacheManager extends Thread {
private static final Logger LOGGER = LogManager.getLogger(ArbitraryDataCacheManager.class);
private static ArbitraryDataCacheManager instance;
private volatile boolean isStopping = false;
/** Queue of arbitrary transactions that require cache updates */
private final List<ArbitraryTransactionData> updateQueue = Collections.synchronizedList(new ArrayList<>());
public static synchronized ArbitraryDataCacheManager getInstance() {
if (instance == null) {
instance = new ArbitraryDataCacheManager();
}
return instance;
}
@Override
public void run() {
Thread.currentThread().setName("Arbitrary Data Cache Manager");
try {
while (!Controller.isStopping()) {
Thread.sleep(500L);
// Process queue
processResourceQueue();
}
} catch (InterruptedException e) {
// Fall through to exit thread
}
}
public void shutdown() {
isStopping = true;
this.interrupt();
}
private void processResourceQueue() {
if (this.updateQueue.isEmpty()) {
// Nothing to do
return;
}
try (final Repository repository = RepositoryManager.getRepository()) {
// Take a snapshot of resourceQueue, so we don't need to lock it while processing
List<ArbitraryTransactionData> resourceQueueCopy = List.copyOf(this.updateQueue);
for (ArbitraryTransactionData transactionData : resourceQueueCopy) {
// Best not to return when controller is stopping, as ideally we need to finish processing
LOGGER.debug(() -> String.format("Processing transaction %.8s in arbitrary resource queue...", Base58.encode(transactionData.getSignature())));
// Remove from the queue regardless of outcome
this.updateQueue.remove(transactionData);
// Update arbitrary resource caches
try {
ArbitraryTransaction arbitraryTransaction = new ArbitraryTransaction(repository, transactionData);
arbitraryTransaction.updateArbitraryResourceCache();
arbitraryTransaction.updateArbitraryMetadataCache();
repository.saveChanges();
LOGGER.debug(() -> String.format("Finished processing transaction %.8s in arbitrary resource queue...", Base58.encode(transactionData.getSignature())));
} catch (DataException e) {
repository.discardChanges();
LOGGER.error("Repository issue while updating arbitrary resource caches", e);
}
}
} catch (DataException e) {
LOGGER.error("Repository issue while processing arbitrary resource cache updates", e);
}
}
public void addToUpdateQueue(ArbitraryTransactionData transactionData) {
this.updateQueue.add(transactionData);
LOGGER.debug(() -> String.format("Transaction %.8s added to queue", Base58.encode(transactionData.getSignature())));
}
public boolean buildArbitraryResourcesCache(Repository repository, boolean forceRebuild) throws DataException {
if (Settings.getInstance().isLite()) {
// Lite nodes have no blockchain
return false;
}
try {
// Check if QDNResources table is empty
List<ArbitraryResourceData> resources = repository.getArbitraryRepository().getArbitraryResources(10, 0, false);
if (!resources.isEmpty() && !forceRebuild) {
// Resources exist in the cache, so assume complete.
// We avoid checkpointing and prevent the node from starting up in the case of a rebuild failure, so
// we shouldn't ever be left in a partially rebuilt state.
LOGGER.debug("Arbitrary resources cache already built");
return false;
}
LOGGER.info("Building arbitrary resources cache...");
SplashFrame.getInstance().updateStatus("Building QDN cache - please wait...");
final int batchSize = 100;
int offset = 0;
// Loop through all ARBITRARY transactions, and determine latest state
while (!Controller.isStopping()) {
LOGGER.info("Fetching arbitrary transactions {} - {}", offset, offset+batchSize-1);
List<byte[]> signatures = repository.getTransactionRepository().getSignaturesMatchingCriteria(null, null, null, List.of(Transaction.TransactionType.ARBITRARY), null, null, null, TransactionsResource.ConfirmationStatus.BOTH, batchSize, offset, false);
if (signatures.isEmpty()) {
// Complete
break;
}
// Expand signatures to transactions
for (byte[] signature : signatures) {
ArbitraryTransactionData transactionData = (ArbitraryTransactionData) repository
.getTransactionRepository().fromSignature(signature);
if (transactionData.getService() == null) {
// Unsupported service - ignore this resource
continue;
}
// Update arbitrary resource caches
ArbitraryTransaction arbitraryTransaction = new ArbitraryTransaction(repository, transactionData);
arbitraryTransaction.updateArbitraryResourceCache();
arbitraryTransaction.updateArbitraryMetadataCache();
}
offset += batchSize;
}
repository.saveChanges();
LOGGER.info("Completed build of arbitrary resources cache.");
return true;
}
catch (DataException e) {
LOGGER.info("Unable to build arbitrary resources cache: {}. The database may have been left in an inconsistent state.", e.getMessage());
// Throw an exception so that the node startup is halted, allowing for a retry next time.
repository.discardChanges();
throw new DataException("Build of arbitrary resources cache failed.");
}
}
}

View File

@ -564,10 +564,8 @@ public class ArbitraryDataManager extends Thread {
repository.getArbitraryRepository().delete(arbitraryResourceData); repository.getArbitraryRepository().delete(arbitraryResourceData);
} }
else { else {
// We found the next oldest transaction, so we can update the cache // We found the next oldest transaction, so add to queue for processing
ArbitraryTransaction arbitraryTransaction = new ArbitraryTransaction(repository, latestTransactionData); ArbitraryDataCacheManager.getInstance().addToUpdateQueue(arbitraryTransactionData);
arbitraryTransaction.updateArbitraryResourceCache();
arbitraryTransaction.updateArbitraryMetadataCache();
} }
; ;
repository.saveChanges(); repository.saveChanges();

View File

@ -15,7 +15,6 @@ import org.qortal.repository.DataException;
import org.qortal.repository.Repository; import org.qortal.repository.Repository;
import org.qortal.repository.RepositoryManager; import org.qortal.repository.RepositoryManager;
import org.qortal.settings.Settings; import org.qortal.settings.Settings;
import org.qortal.transaction.ArbitraryTransaction;
import org.qortal.utils.Base58; import org.qortal.utils.Base58;
import org.qortal.utils.ListUtils; import org.qortal.utils.ListUtils;
import org.qortal.utils.NTP; import org.qortal.utils.NTP;
@ -354,13 +353,9 @@ public class ArbitraryMetadataManager {
} }
} }
// Update arbitrary resource caches // Add to resource queue to update arbitrary resource caches
if (arbitraryTransactionData != null) { if (arbitraryTransactionData != null) {
repository.discardChanges(); ArbitraryDataCacheManager.getInstance().addToUpdateQueue(arbitraryTransactionData);
ArbitraryTransaction arbitraryTransaction = new ArbitraryTransaction(repository, arbitraryTransactionData);
arbitraryTransaction.updateArbitraryResourceCache();
arbitraryTransaction.updateArbitraryMetadataCache();
repository.saveChanges();
} }
} catch (DataException e) { } catch (DataException e) {

View File

@ -65,70 +65,6 @@ public abstract class RepositoryManager {
} }
} }
public static boolean buildArbitraryResourcesCache(Repository repository, boolean forceRebuild) throws DataException {
if (Settings.getInstance().isLite()) {
// Lite nodes have no blockchain
return false;
}
try {
// Check if QDNResources table is empty
List<ArbitraryResourceData> resources = repository.getArbitraryRepository().getArbitraryResources(10, 0, false);
if (!resources.isEmpty() && !forceRebuild) {
// Resources exist in the cache, so assume complete.
// We avoid checkpointing and prevent the node from starting up in the case of a rebuild failure, so
// we shouldn't ever be left in a partially rebuilt state.
LOGGER.debug("Arbitrary resources cache already built");
return false;
}
LOGGER.info("Building arbitrary resources cache...");
SplashFrame.getInstance().updateStatus("Building QDN cache - please wait...");
final int batchSize = 100;
int offset = 0;
// Loop through all ARBITRARY transactions, and determine latest state
while (!Controller.isStopping()) {
LOGGER.info("Fetching arbitrary transactions {} - {}", offset, offset+batchSize-1);
List<byte[]> signatures = repository.getTransactionRepository().getSignaturesMatchingCriteria(null, null, null, List.of(Transaction.TransactionType.ARBITRARY), null, null, null, TransactionsResource.ConfirmationStatus.BOTH, batchSize, offset, false);
if (signatures.isEmpty()) {
// Complete
break;
}
// Expand signatures to transactions
for (byte[] signature : signatures) {
ArbitraryTransactionData transactionData = (ArbitraryTransactionData) repository
.getTransactionRepository().fromSignature(signature);
if (transactionData.getService() == null) {
// Unsupported service - ignore this resource
continue;
}
// Update arbitrary resource caches
ArbitraryTransaction arbitraryTransaction = new ArbitraryTransaction(repository, transactionData);
arbitraryTransaction.updateArbitraryResourceCache();
arbitraryTransaction.updateArbitraryMetadataCache();
}
offset += batchSize;
}
repository.saveChanges();
LOGGER.info("Completed build of arbitrary resources cache.");
return true;
}
catch (DataException e) {
LOGGER.info("Unable to build arbitrary resources cache: {}. The database may have been left in an inconsistent state.", e.getMessage());
// Throw an exception so that the node startup is halted, allowing for a retry next time.
repository.discardChanges();
throw new DataException("Build of arbitrary resources cache failed.");
}
}
public static void setRequestedCheckpoint(Boolean quick) { public static void setRequestedCheckpoint(Boolean quick) {
quickCheckpointRequested = quick; quickCheckpointRequested = quick;
} }

View File

@ -12,6 +12,7 @@ import org.qortal.arbitrary.ArbitraryDataResource;
import org.qortal.arbitrary.metadata.ArbitraryDataTransactionMetadata; import org.qortal.arbitrary.metadata.ArbitraryDataTransactionMetadata;
import org.qortal.arbitrary.misc.Service; import org.qortal.arbitrary.misc.Service;
import org.qortal.block.BlockChain; import org.qortal.block.BlockChain;
import org.qortal.controller.arbitrary.ArbitraryDataCacheManager;
import org.qortal.controller.arbitrary.ArbitraryDataManager; import org.qortal.controller.arbitrary.ArbitraryDataManager;
import org.qortal.controller.repository.NamesDatabaseIntegrityCheck; import org.qortal.controller.repository.NamesDatabaseIntegrityCheck;
import org.qortal.crypto.Crypto; import org.qortal.crypto.Crypto;
@ -258,9 +259,8 @@ public class ArbitraryTransaction extends Transaction {
} }
} }
// Add to arbitrary resource caches // Add to queue for cache updates
this.updateArbitraryResourceCache(); ArbitraryDataCacheManager.getInstance().addToUpdateQueue(arbitraryTransactionData);
this.updateArbitraryMetadataCache();
} }
@Override @Override
@ -296,9 +296,8 @@ public class ArbitraryTransaction extends Transaction {
} }
} }
// Add/update arbitrary resource caches // Add to queue for cache updates
this.updateArbitraryResourceCache(); ArbitraryDataCacheManager.getInstance().addToUpdateQueue(arbitraryTransactionData);
this.updateArbitraryMetadataCache();
} catch (Exception e) { } catch (Exception e) {
// Log and ignore all exceptions. The cache is updated from other places too, and can be rebuilt if needed. // Log and ignore all exceptions. The cache is updated from other places too, and can be rebuilt if needed.