Unified the code to build the LatestATStates table, as it's now used by more than one class.

Note - the rebuildLatestAtStates() must never be used by two different classes at the same time, or AT states could be incorrectly deleted. It is okay at the moment as we don't run the AT states trimmer and pruner in the same app session. However we should probably synchronize this method so that we don't accidentally call it from two places in the future.
This commit is contained in:
CalDescent 2021-08-25 18:50:41 +01:00
parent 5127f94423
commit ca1379d9f8
6 changed files with 94 additions and 79 deletions

View File

@ -21,9 +21,8 @@ public class AtStatesTrimmer implements Runnable {
try (final Repository repository = RepositoryManager.getRepository()) {
int trimStartHeight = repository.getATRepository().getAtTrimHeight();
repository.getATRepository().prepareForAtStateTrimming();
repository.getATRepository().rebuildLatestAtStates();
repository.saveChanges();
PruneManager.getInstance().setBuiltLatestATStates(true);
while (!Controller.isStopping()) {
repository.discardChanges();
@ -64,7 +63,7 @@ public class AtStatesTrimmer implements Runnable {
if (upperTrimmableHeight > upperBatchHeight) {
trimStartHeight = upperBatchHeight;
repository.getATRepository().setAtTrimHeight(trimStartHeight);
repository.getATRepository().prepareForAtStateTrimming();
repository.getATRepository().rebuildLatestAtStates();
repository.saveChanges();
final int finalTrimStartHeight = trimStartHeight;

View File

@ -512,9 +512,8 @@ public class Controller extends Thread {
final long repositoryBackupInterval = Settings.getInstance().getRepositoryBackupInterval();
final long repositoryCheckpointInterval = Settings.getInstance().getRepositoryCheckpointInterval();
ExecutorService trimExecutor = Executors.newCachedThreadPool(new DaemonThreadFactory());
trimExecutor.execute(new AtStatesTrimmer());
trimExecutor.execute(new OnlineAccountsSignaturesTrimmer());
// Start executor service for trimming or pruning
PruneManager.getInstance().start();
try {
while (!isStopping) {
@ -599,13 +598,7 @@ public class Controller extends Thread {
Thread.interrupted();
// Fall-through to exit
} finally {
trimExecutor.shutdownNow();
try {
trimExecutor.awaitTermination(2L, TimeUnit.SECONDS);
} catch (InterruptedException e) {
// We tried...
}
PruneManager.getInstance().stop();
}
}

View File

@ -25,22 +25,14 @@ public class AtStatesPruner implements Runnable {
try (final Repository repository = RepositoryManager.getRepository()) {
int pruneStartHeight = repository.getATRepository().getAtPruneHeight();
// repository.getATRepository().prepareForAtStatePruning();
// repository.saveChanges();
repository.getATRepository().rebuildLatestAtStates();
repository.saveChanges();
while (!Controller.isStopping()) {
repository.discardChanges();
Thread.sleep(Settings.getInstance().getAtStatesPruneInterval());
if (PruneManager.getInstance().getBuiltLatestATStates() == false) {
// Wait for latest AT states table to be built first
// This has a dependency on the AtStatesTrimmer running,
// which should be okay, given that it isn't something
// is disabled in normal operation.
continue;
}
BlockData chainTip = Controller.getInstance().getChainTip();
if (chainTip == null || NTP.getTime() == null)
continue;
@ -63,8 +55,11 @@ public class AtStatesPruner implements Runnable {
int numAtStatesPruned = repository.getATRepository().pruneAtStates(pruneStartHeight, upperPruneHeight);
repository.saveChanges();
int numAtStateDataRowsTrimmed = repository.getATRepository().trimAtStates(
pruneStartHeight, upperPruneHeight, Settings.getInstance().getAtStatesTrimLimit());
repository.saveChanges();
if (numAtStatesPruned > 0) {
if (numAtStatesPruned > 0 || numAtStateDataRowsTrimmed > 0) {
final int finalPruneStartHeight = pruneStartHeight;
LOGGER.debug(() -> String.format("Pruned %d AT state%s between blocks %d and %d",
numAtStatesPruned, (numAtStatesPruned != 1 ? "s" : ""),
@ -74,12 +69,17 @@ public class AtStatesPruner implements Runnable {
if (upperPrunableHeight > upperBatchHeight) {
pruneStartHeight = upperBatchHeight;
repository.getATRepository().setAtPruneHeight(pruneStartHeight);
repository.getATRepository().prepareForAtStatePruning();
repository.getATRepository().rebuildLatestAtStates();
repository.saveChanges();
final int finalPruneStartHeight = pruneStartHeight;
LOGGER.debug(() -> String.format("Bumping AT state base prune height to %d", finalPruneStartHeight));
}
else {
// We've pruned up to the upper prunable height
// Back off for a while to save CPU for syncing
Thread.sleep(5*60*1000L);
}
}
}
} catch (DataException e) {

View File

@ -1,7 +1,9 @@
package org.qortal.controller.pruning;
import org.qortal.controller.AtStatesTrimmer;
import org.qortal.controller.Controller;
import org.qortal.controller.OnlineAccountsSignaturesTrimmer;
import org.qortal.data.block.BlockData;
import org.qortal.repository.DataException;
import org.qortal.repository.Repository;
@ -10,6 +12,7 @@ import org.qortal.utils.DaemonThreadFactory;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class PruneManager {
@ -17,13 +20,11 @@ public class PruneManager {
private boolean pruningEnabled = Settings.getInstance().isPruningEnabled();
private int pruneBlockLimit = Settings.getInstance().getPruneBlockLimit();
private boolean builtLatestATStates = false;
private ExecutorService executorService;
private PruneManager() {
// Start individual pruning processes
ExecutorService pruneExecutor = Executors.newCachedThreadPool(new DaemonThreadFactory());
pruneExecutor.execute(new AtStatesPruner());
pruneExecutor.execute(new BlockPruner());
}
public static synchronized PruneManager getInstance() {
@ -33,6 +34,42 @@ public class PruneManager {
return instance;
}
public void start() {
this.executorService = Executors.newCachedThreadPool(new DaemonThreadFactory());
// Don't allow both the pruner and the trimmer to run at the same time.
// In pruning mode, we are already deleting far more than we would when trimming.
// In non-pruning mode, we still need to trim to keep the non-essential data
// out of the database. There isn't a case where both are needed at once.
// If we ever do need to enable both at once, be very careful with the AT state
// trimming, since both currently rely on having exclusive access to the
// prepareForAtStateTrimming() method. For both trimming and pruning to take place
// at once, we would need to synchronize this method in a way that both can't
// call it at the same time, as otherwise active ATs would be pruned/trimmed when
// they should have been kept.
if (Settings.getInstance().isPruningEnabled()) {
// Pruning enabled - start the pruning processes
this.executorService.execute(new AtStatesPruner());
this.executorService.execute(new BlockPruner());
}
else {
// Pruning disabled - use trimming instead
this.executorService.execute(new AtStatesTrimmer());
this.executorService.execute(new OnlineAccountsSignaturesTrimmer());
}
}
public void stop() {
this.executorService.shutdownNow();
try {
this.executorService.awaitTermination(2L, TimeUnit.SECONDS);
} catch (InterruptedException e) {
// We tried...
}
}
public boolean isBlockPruned(int height, Repository repository) throws DataException {
if (!this.pruningEnabled) {
return false;
@ -49,13 +86,4 @@ public class PruneManager {
return (height < latestUnprunedHeight);
}
public void setBuiltLatestATStates(boolean builtLatestATStates) {
this.builtLatestATStates = builtLatestATStates;
}
public boolean getBuiltLatestATStates() {
return this.builtLatestATStates;
}
}

View File

@ -112,6 +112,11 @@ public interface ATRepository {
*/
public List<ATStateData> getBlockATStatesAtHeight(int height) throws DataException;
/** Rebuild the latest AT states cache, necessary for AT state trimming/pruning. */
public void rebuildLatestAtStates() throws DataException;
/** Returns height of first trimmable AT state. */
public int getAtTrimHeight() throws DataException;
@ -121,9 +126,6 @@ public interface ATRepository {
*/
public void setAtTrimHeight(int trimHeight) throws DataException;
/** Hook to allow repository to prepare/cache info for AT state trimming. */
public void prepareForAtStateTrimming() throws DataException;
/** Trims full AT state data between passed heights. Returns number of trimmed rows. */
public int trimAtStates(int minHeight, int maxHeight, int limit) throws DataException;
@ -137,9 +139,6 @@ public interface ATRepository {
*/
public void setAtPruneHeight(int pruneHeight) throws DataException;
/** Hook to allow repository to prepare/cache info for AT state pruning. */
public void prepareForAtStatePruning() throws DataException;
/** Prunes full AT state data between passed heights. Returns number of pruned rows. */
public int pruneAtStates(int minHeight, int maxHeight) throws DataException;

View File

@ -8,7 +8,7 @@ import java.util.Set;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.qortal.data.account.AccountData;
import org.qortal.controller.Controller;
import org.qortal.data.at.ATData;
import org.qortal.data.at.ATStateData;
import org.qortal.repository.ATRepository;
@ -601,6 +601,35 @@ public class HSQLDBATRepository implements ATRepository {
return atStates;
}
@Override
public void rebuildLatestAtStates() throws DataException {
// Rebuild cache of latest AT states that we can't trim
String deleteSql = "DELETE FROM LatestATStates";
try {
this.repository.executeCheckedUpdate(deleteSql);
} catch (SQLException e) {
repository.examineException(e);
throw new DataException("Unable to delete temporary latest AT states cache from repository", e);
}
String insertSql = "INSERT INTO LatestATStates ("
+ "SELECT AT_address, height FROM ATs "
+ "CROSS JOIN LATERAL("
+ "SELECT height FROM ATStates "
+ "WHERE ATStates.AT_address = ATs.AT_address "
+ "ORDER BY AT_address DESC, height DESC LIMIT 1"
+ ") "
+ ")";
try {
this.repository.executeCheckedUpdate(insertSql);
} catch (SQLException e) {
repository.examineException(e);
throw new DataException("Unable to populate temporary latest AT states cache in repository", e);
}
}
@Override
public int getAtTrimHeight() throws DataException {
String sql = "SELECT AT_trim_height FROM DatabaseInfo";
@ -632,33 +661,6 @@ public class HSQLDBATRepository implements ATRepository {
}
}
@Override
public void prepareForAtStateTrimming() throws DataException {
// Rebuild cache of latest AT states that we can't trim
String deleteSql = "DELETE FROM LatestATStates";
try {
this.repository.executeCheckedUpdate(deleteSql);
} catch (SQLException e) {
repository.examineException(e);
throw new DataException("Unable to delete temporary latest AT states cache from repository", e);
}
String insertSql = "INSERT INTO LatestATStates ("
+ "SELECT AT_address, height FROM ATs "
+ "CROSS JOIN LATERAL("
+ "SELECT height FROM ATStates "
+ "WHERE ATStates.AT_address = ATs.AT_address "
+ "ORDER BY AT_address DESC, height DESC LIMIT 1"
+ ") "
+ ")";
try {
this.repository.executeCheckedUpdate(insertSql);
} catch (SQLException e) {
repository.examineException(e);
throw new DataException("Unable to populate temporary latest AT states cache in repository", e);
}
}
@Override
public int trimAtStates(int minHeight, int maxHeight, int limit) throws DataException {
if (minHeight >= maxHeight)
@ -715,12 +717,6 @@ public class HSQLDBATRepository implements ATRepository {
}
}
@Override
public void prepareForAtStatePruning() throws DataException {
// Use LatestATStates table that was already built by AtStatesTrimmer
// The AtStatesPruner class checks that this process has completed first
}
@Override
public int pruneAtStates(int minHeight, int maxHeight) throws DataException {
int deletedCount = 0;