Compare commits

...

12 Commits

Author SHA1 Message Date
CalDescent
2479f2d65d Moved trimming and pruning classes into a single package (org.qortal.controller.repository) 2021-08-27 09:45:56 +01:00
CalDescent
9056cb7026 Increased atStatesPruneBatchSize from 10 to 25. 2021-08-27 09:45:56 +01:00
CalDescent
cd9d9b31ef Prune ATStatesData as well as the ATStates when switching to pruning mode. 2021-08-27 09:45:56 +01:00
CalDescent
ff841c28e3 Updated tests to use the renamed method. 2021-08-27 09:45:56 +01:00
CalDescent
ca1379d9f8 Unified the code to build the LatestATStates table, as it's now used by more than one class.
Note - the rebuildLatestAtStates() must never be used by two different classes at the same time, or AT states could be incorrectly deleted. It is okay at the moment as we don't run the AT states trimmer and pruner in the same app session. However we should probably synchronize this method so that we don't accidentally call it from two places in the future.
2021-08-27 09:45:56 +01:00
CalDescent
5127f94423 Added bulk pruning phase on node startup the first time that pruning mode is enabled.
When switching from a full node to a pruning node, we need to delete most of the database contents. If we do this entirely as a background process, it is very slow and can interfere with syncing. However, if we take the approach of transferring only the necessary rows to a new table and then deleting the original table, this makes the process much faster. It was taking several days to delete the AT states in the background, but only a couple of minutes to copy them to a new table.

The trade off is that we have to go through a form of "reshape" when starting the app for the first time after enabling pruning mode. But given that this is an opt-in mode, I don't think it will be a problem.

Once the pruning is complete, it automatically performs a CHECKPOINT DEFRAG in order to shrink the database file size down to a fraction of what it was before.

From this point, the original background process will run, but can be dialled right down so not to interfere with syncing.
2021-08-27 09:45:56 +01:00
CalDescent
f5910ab950 Break out of the AT pruning inner loops if we're stopping the app. 2021-08-27 09:45:56 +01:00
CalDescent
22efaccd4a Fixed NPE introduced in earlier commit. 2021-08-27 09:45:56 +01:00
CalDescent
c8466a2e7a Updated AT states pruner as it previously relied on blocks being present in the db to make decisions. As a side effect, this now prunes ATs up the the pruneBlockLimit too, rather than keeping the last 35 days or so. Will review this later but I don't think we will need the missing ones. 2021-08-27 09:45:56 +01:00
CalDescent
209a9fa8c3 Rework of Blockchain.validate() to account for pruning mode. 2021-08-27 09:45:56 +01:00
CalDescent
bc1af12655 Prune all blocks up until the blockPruneLimit
By default, this leaves only the last 1450 blocks in the database. Only applies when pruning mode is enabled.
2021-08-27 09:45:55 +01:00
CalDescent
e7e4cb7579 Started work on pruning mode (top-only-sync)
Initially just deleting old and unused AT states, to get this table under control. I have had to delete them individually as the table can't handle complex queries due to its size.

Nodes in pruning mode will be unable to serve older blocks to peers.
2021-08-27 09:45:55 +01:00
16 changed files with 885 additions and 63 deletions

View File

@@ -506,28 +506,51 @@ public class BlockChain {
* @throws SQLException
*/
public static void validate() throws DataException {
// Check first block is Genesis Block
if (!isGenesisBlockValid())
rebuildBlockchain();
try (final Repository repository = RepositoryManager.getRepository()) {
boolean pruningEnabled = Settings.getInstance().isPruningEnabled();
BlockData chainTip = repository.getBlockRepository().getLastBlock();
boolean hasBlocks = (chainTip != null && chainTip.getHeight() > 1);
if (pruningEnabled && hasBlocks) {
// Pruning is enabled and we have blocks, so it's possible that the genesis block has been pruned
// It's best not to validate it, and there's no real need to
}
else {
// Check first block is Genesis Block
if (!isGenesisBlockValid()) {
rebuildBlockchain();
}
}
repository.checkConsistency();
int startHeight = Math.max(repository.getBlockRepository().getBlockchainHeight() - 1440, 1);
// Set the number of blocks to validate based on the pruned state of the chain
// If pruned, subtract an extra 10 to allow room for error
int blocksToValidate = pruningEnabled ? Settings.getInstance().getPruneBlockLimit() - 10 : 1440;
int startHeight = Math.max(repository.getBlockRepository().getBlockchainHeight() - blocksToValidate, 1);
BlockData detachedBlockData = repository.getBlockRepository().getDetachedBlockSignature(startHeight);
if (detachedBlockData != null) {
LOGGER.error(String.format("Block %d's reference does not match any block's signature", detachedBlockData.getHeight()));
// Wait for blockchain lock (whereas orphan() only tries to get lock)
ReentrantLock blockchainLock = Controller.getInstance().getBlockchainLock();
blockchainLock.lock();
try {
LOGGER.info(String.format("Orphaning back to block %d", detachedBlockData.getHeight() - 1));
orphan(detachedBlockData.getHeight() - 1);
} finally {
blockchainLock.unlock();
// Orphan if we aren't a pruning node
if (!Settings.getInstance().isPruningEnabled()) {
// Wait for blockchain lock (whereas orphan() only tries to get lock)
ReentrantLock blockchainLock = Controller.getInstance().getBlockchainLock();
blockchainLock.lock();
try {
LOGGER.info(String.format("Orphaning back to block %d", detachedBlockData.getHeight() - 1));
orphan(detachedBlockData.getHeight() - 1);
} finally {
blockchainLock.unlock();
}
}
else {
LOGGER.error(String.format("Not orphaning because we are in pruning mode. You may be on an " +
"invalid chain and should consider bootstrapping or re-syncing from genesis."));
}
}
}

View File

@@ -24,7 +24,6 @@ import java.util.Properties;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
@@ -46,6 +45,7 @@ import org.qortal.block.Block;
import org.qortal.block.BlockChain;
import org.qortal.block.BlockChain.BlockTimingByHeight;
import org.qortal.controller.Synchronizer.SynchronizationResult;
import org.qortal.controller.repository.PruneManager;
import org.qortal.controller.tradebot.TradeBot;
import org.qortal.crypto.Crypto;
import org.qortal.data.account.MintingAccountData;
@@ -95,7 +95,6 @@ import org.qortal.transaction.Transaction.TransactionType;
import org.qortal.transaction.Transaction.ValidationResult;
import org.qortal.utils.Base58;
import org.qortal.utils.ByteArray;
import org.qortal.utils.DaemonThreadFactory;
import org.qortal.utils.NTP;
import org.qortal.utils.Triple;
@@ -357,7 +356,7 @@ public class Controller extends Thread {
return this.savedArgs;
}
/* package */ static boolean isStopping() {
public static boolean isStopping() {
return isStopping;
}
@@ -415,6 +414,7 @@ public class Controller extends Thread {
try {
RepositoryFactory repositoryFactory = new HSQLDBRepositoryFactory(getRepositoryUrl());
RepositoryManager.setRepositoryFactory(repositoryFactory);
RepositoryManager.prune();
} catch (DataException e) {
// If exception has no cause then repository is in use by some other process.
if (e.getCause() == null) {
@@ -510,9 +510,8 @@ public class Controller extends Thread {
final long repositoryBackupInterval = Settings.getInstance().getRepositoryBackupInterval();
final long repositoryCheckpointInterval = Settings.getInstance().getRepositoryCheckpointInterval();
ExecutorService trimExecutor = Executors.newCachedThreadPool(new DaemonThreadFactory());
trimExecutor.execute(new AtStatesTrimmer());
trimExecutor.execute(new OnlineAccountsSignaturesTrimmer());
// Start executor service for trimming or pruning
PruneManager.getInstance().start();
try {
while (!isStopping) {
@@ -597,13 +596,7 @@ public class Controller extends Thread {
Thread.interrupted();
// Fall-through to exit
} finally {
trimExecutor.shutdownNow();
try {
trimExecutor.awaitTermination(2L, TimeUnit.SECONDS);
} catch (InterruptedException e) {
// We tried...
}
PruneManager.getInstance().stop();
}
}
@@ -1286,6 +1279,13 @@ public class Controller extends Thread {
try (final Repository repository = RepositoryManager.getRepository()) {
BlockData blockData = repository.getBlockRepository().fromSignature(signature);
if (blockData != null) {
if (PruneManager.getInstance().isBlockPruned(blockData.getHeight(), repository)) {
// If this is a pruned block, we likely only have partial data, so best not to sent it
blockData = null;
}
}
if (blockData == null) {
// We don't have this block
this.stats.getBlockMessageStats.unknownBlocks.getAndIncrement();
@@ -1407,6 +1407,14 @@ public class Controller extends Thread {
BlockData blockData = repository.getBlockRepository().fromReference(parentSignature);
if (blockData != null) {
if (PruneManager.getInstance().isBlockPruned(blockData.getHeight(), repository)) {
// If this request contains a pruned block, we likely only have partial data, so best not to sent anything
// We always prune from the oldest first, so it's fine to just check the first block requested
blockData = null;
}
}
while (blockData != null && blockSummaries.size() < numberRequested) {
BlockSummaryData blockSummary = new BlockSummaryData(blockData);
blockSummaries.add(blockSummary);

View File

@@ -0,0 +1,92 @@
package org.qortal.controller.repository;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.qortal.controller.Controller;
import org.qortal.data.block.BlockData;
import org.qortal.repository.DataException;
import org.qortal.repository.Repository;
import org.qortal.repository.RepositoryManager;
import org.qortal.settings.Settings;
import org.qortal.utils.NTP;
public class AtStatesPruner implements Runnable {
private static final Logger LOGGER = LogManager.getLogger(AtStatesPruner.class);
@Override
public void run() {
Thread.currentThread().setName("AT States pruner");
if (!Settings.getInstance().isPruningEnabled()) {
return;
}
try (final Repository repository = RepositoryManager.getRepository()) {
int pruneStartHeight = repository.getATRepository().getAtPruneHeight();
repository.getATRepository().rebuildLatestAtStates();
repository.saveChanges();
while (!Controller.isStopping()) {
repository.discardChanges();
Thread.sleep(Settings.getInstance().getAtStatesPruneInterval());
BlockData chainTip = Controller.getInstance().getChainTip();
if (chainTip == null || NTP.getTime() == null)
continue;
// Don't even attempt if we're mid-sync as our repository requests will be delayed for ages
if (Controller.getInstance().isSynchronizing())
continue;
// Prune AT states for all blocks up until our latest minus pruneBlockLimit
final int ourLatestHeight = chainTip.getHeight();
final int upperPrunableHeight = ourLatestHeight - Settings.getInstance().getPruneBlockLimit();
int upperBatchHeight = pruneStartHeight + Settings.getInstance().getAtStatesPruneBatchSize();
int upperPruneHeight = Math.min(upperBatchHeight, upperPrunableHeight);
if (pruneStartHeight >= upperPruneHeight)
continue;
LOGGER.debug(String.format("Pruning AT states between blocks %d and %d...", pruneStartHeight, upperPruneHeight));
int numAtStatesPruned = repository.getATRepository().pruneAtStates(pruneStartHeight, upperPruneHeight);
repository.saveChanges();
int numAtStateDataRowsTrimmed = repository.getATRepository().trimAtStates(
pruneStartHeight, upperPruneHeight, Settings.getInstance().getAtStatesTrimLimit());
repository.saveChanges();
if (numAtStatesPruned > 0 || numAtStateDataRowsTrimmed > 0) {
final int finalPruneStartHeight = pruneStartHeight;
LOGGER.debug(() -> String.format("Pruned %d AT state%s between blocks %d and %d",
numAtStatesPruned, (numAtStatesPruned != 1 ? "s" : ""),
finalPruneStartHeight, upperPruneHeight));
} else {
// Can we move onto next batch?
if (upperPrunableHeight > upperBatchHeight) {
pruneStartHeight = upperBatchHeight;
repository.getATRepository().setAtPruneHeight(pruneStartHeight);
repository.getATRepository().rebuildLatestAtStates();
repository.saveChanges();
final int finalPruneStartHeight = pruneStartHeight;
LOGGER.debug(() -> String.format("Bumping AT state base prune height to %d", finalPruneStartHeight));
}
else {
// We've pruned up to the upper prunable height
// Back off for a while to save CPU for syncing
Thread.sleep(5*60*1000L);
}
}
}
} catch (DataException e) {
LOGGER.warn(String.format("Repository issue trying to prune AT states: %s", e.getMessage()));
} catch (InterruptedException e) {
// Time to exit
}
}
}

View File

@@ -1,7 +1,8 @@
package org.qortal.controller;
package org.qortal.controller.repository;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.qortal.controller.Controller;
import org.qortal.data.block.BlockData;
import org.qortal.repository.DataException;
import org.qortal.repository.Repository;
@@ -20,7 +21,7 @@ public class AtStatesTrimmer implements Runnable {
try (final Repository repository = RepositoryManager.getRepository()) {
int trimStartHeight = repository.getATRepository().getAtTrimHeight();
repository.getATRepository().prepareForAtStateTrimming();
repository.getATRepository().rebuildLatestAtStates();
repository.saveChanges();
while (!Controller.isStopping()) {
@@ -62,7 +63,7 @@ public class AtStatesTrimmer implements Runnable {
if (upperTrimmableHeight > upperBatchHeight) {
trimStartHeight = upperBatchHeight;
repository.getATRepository().setAtTrimHeight(trimStartHeight);
repository.getATRepository().prepareForAtStateTrimming();
repository.getATRepository().rebuildLatestAtStates();
repository.saveChanges();
final int finalTrimStartHeight = trimStartHeight;

View File

@@ -0,0 +1,86 @@
package org.qortal.controller.repository;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.qortal.controller.Controller;
import org.qortal.data.block.BlockData;
import org.qortal.repository.DataException;
import org.qortal.repository.Repository;
import org.qortal.repository.RepositoryManager;
import org.qortal.settings.Settings;
import org.qortal.utils.NTP;
public class BlockPruner implements Runnable {
private static final Logger LOGGER = LogManager.getLogger(BlockPruner.class);
@Override
public void run() {
Thread.currentThread().setName("Block pruner");
if (!Settings.getInstance().isPruningEnabled()) {
return;
}
try (final Repository repository = RepositoryManager.getRepository()) {
int pruneStartHeight = repository.getBlockRepository().getBlockPruneHeight();
while (!Controller.isStopping()) {
repository.discardChanges();
Thread.sleep(Settings.getInstance().getBlockPruneInterval());
BlockData chainTip = Controller.getInstance().getChainTip();
if (chainTip == null || NTP.getTime() == null)
continue;
// Don't even attempt if we're mid-sync as our repository requests will be delayed for ages
if (Controller.getInstance().isSynchronizing())
continue;
// Prune all blocks up until our latest minus pruneBlockLimit
final int ourLatestHeight = chainTip.getHeight();
final int upperPrunableHeight = ourLatestHeight - Settings.getInstance().getPruneBlockLimit();
int upperBatchHeight = pruneStartHeight + Settings.getInstance().getBlockPruneBatchSize();
int upperPruneHeight = Math.min(upperBatchHeight, upperPrunableHeight);
if (pruneStartHeight >= upperPruneHeight) {
continue;
}
LOGGER.debug(String.format("Pruning blocks between %d and %d...", pruneStartHeight, upperPruneHeight));
int numBlocksPruned = repository.getBlockRepository().pruneBlocks(pruneStartHeight, upperPruneHeight);
repository.saveChanges();
if (numBlocksPruned > 0) {
final int finalPruneStartHeight = pruneStartHeight;
LOGGER.debug(() -> String.format("Pruned %d block%s between %d and %d",
numBlocksPruned, (numBlocksPruned != 1 ? "s" : ""),
finalPruneStartHeight, upperPruneHeight));
} else {
// Can we move onto next batch?
if (upperPrunableHeight > upperBatchHeight) {
pruneStartHeight = upperBatchHeight;
repository.getBlockRepository().setBlockPruneHeight(pruneStartHeight);
repository.saveChanges();
final int finalPruneStartHeight = pruneStartHeight;
LOGGER.debug(() -> String.format("Bumping block base prune height to %d", finalPruneStartHeight));
}
else {
// We've pruned up to the upper prunable height
// Back off for a while to save CPU for syncing
Thread.sleep(10*60*1000L);
}
}
}
} catch (DataException e) {
LOGGER.warn(String.format("Repository issue trying to prune blocks: %s", e.getMessage()));
} catch (InterruptedException e) {
// Time to exit
}
}
}

View File

@@ -1,8 +1,9 @@
package org.qortal.controller;
package org.qortal.controller.repository;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.qortal.block.BlockChain;
import org.qortal.controller.Controller;
import org.qortal.data.block.BlockData;
import org.qortal.repository.DataException;
import org.qortal.repository.Repository;

View File

@@ -0,0 +1,87 @@
package org.qortal.controller.repository;
import org.qortal.controller.Controller;
import org.qortal.data.block.BlockData;
import org.qortal.repository.DataException;
import org.qortal.repository.Repository;
import org.qortal.settings.Settings;
import org.qortal.utils.DaemonThreadFactory;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class PruneManager {
private static PruneManager instance;
private boolean pruningEnabled = Settings.getInstance().isPruningEnabled();
private int pruneBlockLimit = Settings.getInstance().getPruneBlockLimit();
private ExecutorService executorService;
private PruneManager() {
}
public static synchronized PruneManager getInstance() {
if (instance == null)
instance = new PruneManager();
return instance;
}
public void start() {
this.executorService = Executors.newCachedThreadPool(new DaemonThreadFactory());
// Don't allow both the pruner and the trimmer to run at the same time.
// In pruning mode, we are already deleting far more than we would when trimming.
// In non-pruning mode, we still need to trim to keep the non-essential data
// out of the database. There isn't a case where both are needed at once.
// If we ever do need to enable both at once, be very careful with the AT state
// trimming, since both currently rely on having exclusive access to the
// prepareForAtStateTrimming() method. For both trimming and pruning to take place
// at once, we would need to synchronize this method in a way that both can't
// call it at the same time, as otherwise active ATs would be pruned/trimmed when
// they should have been kept.
if (Settings.getInstance().isPruningEnabled()) {
// Pruning enabled - start the pruning processes
this.executorService.execute(new AtStatesPruner());
this.executorService.execute(new BlockPruner());
}
else {
// Pruning disabled - use trimming instead
this.executorService.execute(new AtStatesTrimmer());
this.executorService.execute(new OnlineAccountsSignaturesTrimmer());
}
}
public void stop() {
this.executorService.shutdownNow();
try {
this.executorService.awaitTermination(2L, TimeUnit.SECONDS);
} catch (InterruptedException e) {
// We tried...
}
}
public boolean isBlockPruned(int height, Repository repository) throws DataException {
if (!this.pruningEnabled) {
return false;
}
BlockData chainTip = Controller.getInstance().getChainTip();
if (chainTip == null) {
throw new DataException("Unable to determine chain tip when checking if a block is pruned");
}
final int ourLatestHeight = chainTip.getHeight();
final int latestUnprunedHeight = ourLatestHeight - this.pruneBlockLimit;
return (height < latestUnprunedHeight);
}
}

View File

@@ -112,6 +112,11 @@ public interface ATRepository {
*/
public List<ATStateData> getBlockATStatesAtHeight(int height) throws DataException;
/** Rebuild the latest AT states cache, necessary for AT state trimming/pruning. */
public void rebuildLatestAtStates() throws DataException;
/** Returns height of first trimmable AT state. */
public int getAtTrimHeight() throws DataException;
@@ -121,12 +126,23 @@ public interface ATRepository {
*/
public void setAtTrimHeight(int trimHeight) throws DataException;
/** Hook to allow repository to prepare/cache info for AT state trimming. */
public void prepareForAtStateTrimming() throws DataException;
/** Trims full AT state data between passed heights. Returns number of trimmed rows. */
public int trimAtStates(int minHeight, int maxHeight, int limit) throws DataException;
/** Returns height of first prunable AT state. */
public int getAtPruneHeight() throws DataException;
/** Sets new base height for AT state pruning.
* <p>
* NOTE: performs implicit <tt>repository.saveChanges()</tt>.
*/
public void setAtPruneHeight(int pruneHeight) throws DataException;
/** Prunes full AT state data between passed heights. Returns number of pruned rows. */
public int pruneAtStates(int minHeight, int maxHeight) throws DataException;
/**
* Save ATStateData into repository.
* <p>

View File

@@ -166,6 +166,20 @@ public interface BlockRepository {
*/
public BlockData getDetachedBlockSignature(int startHeight) throws DataException;
/** Returns height of first prunable block. */
public int getBlockPruneHeight() throws DataException;
/** Sets new base height for block pruning.
* <p>
* NOTE: performs implicit <tt>repository.saveChanges()</tt>.
*/
public void setBlockPruneHeight(int pruneHeight) throws DataException;
/** Prunes full block data between passed heights. Returns number of pruned rows. */
public int pruneBlocks(int minHeight, int maxHeight) throws DataException;
/**
* Saves block into repository.
*

View File

@@ -1,8 +1,14 @@
package org.qortal.repository;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.qortal.repository.hsqldb.HSQLDBDatabasePruning;
import org.qortal.settings.Settings;
import java.sql.SQLException;
public abstract class RepositoryManager {
private static final Logger LOGGER = LogManager.getLogger(RepositoryManager.class);
private static RepositoryFactory repositoryFactory = null;
@@ -51,6 +57,24 @@ public abstract class RepositoryManager {
}
}
public static void prune() {
// Bulk prune the database the first time we use pruning mode
if (Settings.getInstance().isPruningEnabled()) {
try {
boolean prunedATStates = HSQLDBDatabasePruning.pruneATStates();
boolean prunedBlocks = HSQLDBDatabasePruning.pruneBlocks();
// Perform repository maintenance to shrink the db size down
if (prunedATStates && prunedBlocks) {
HSQLDBDatabasePruning.performMaintenance();
}
} catch (SQLException | DataException e) {
LOGGER.info("Unable to bulk prune AT states. The database may have been left in an inconsistent state.");
}
}
}
public static void setRequestedCheckpoint(Boolean quick) {
quickCheckpointRequested = quick;
}

View File

@@ -8,6 +8,7 @@ import java.util.Set;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.qortal.controller.Controller;
import org.qortal.data.at.ATData;
import org.qortal.data.at.ATStateData;
import org.qortal.repository.ATRepository;
@@ -600,6 +601,35 @@ public class HSQLDBATRepository implements ATRepository {
return atStates;
}
@Override
public void rebuildLatestAtStates() throws DataException {
// Rebuild cache of latest AT states that we can't trim
String deleteSql = "DELETE FROM LatestATStates";
try {
this.repository.executeCheckedUpdate(deleteSql);
} catch (SQLException e) {
repository.examineException(e);
throw new DataException("Unable to delete temporary latest AT states cache from repository", e);
}
String insertSql = "INSERT INTO LatestATStates ("
+ "SELECT AT_address, height FROM ATs "
+ "CROSS JOIN LATERAL("
+ "SELECT height FROM ATStates "
+ "WHERE ATStates.AT_address = ATs.AT_address "
+ "ORDER BY AT_address DESC, height DESC LIMIT 1"
+ ") "
+ ")";
try {
this.repository.executeCheckedUpdate(insertSql);
} catch (SQLException e) {
repository.examineException(e);
throw new DataException("Unable to populate temporary latest AT states cache in repository", e);
}
}
@Override
public int getAtTrimHeight() throws DataException {
String sql = "SELECT AT_trim_height FROM DatabaseInfo";
@@ -631,33 +661,6 @@ public class HSQLDBATRepository implements ATRepository {
}
}
@Override
public void prepareForAtStateTrimming() throws DataException {
// Rebuild cache of latest AT states that we can't trim
String deleteSql = "DELETE FROM LatestATStates";
try {
this.repository.executeCheckedUpdate(deleteSql);
} catch (SQLException e) {
repository.examineException(e);
throw new DataException("Unable to delete temporary latest AT states cache from repository", e);
}
String insertSql = "INSERT INTO LatestATStates ("
+ "SELECT AT_address, height FROM ATs "
+ "CROSS JOIN LATERAL("
+ "SELECT height FROM ATStates "
+ "WHERE ATStates.AT_address = ATs.AT_address "
+ "ORDER BY AT_address DESC, height DESC LIMIT 1"
+ ") "
+ ")";
try {
this.repository.executeCheckedUpdate(insertSql);
} catch (SQLException e) {
repository.examineException(e);
throw new DataException("Unable to populate temporary latest AT states cache in repository", e);
}
}
@Override
public int trimAtStates(int minHeight, int maxHeight, int limit) throws DataException {
if (minHeight >= maxHeight)
@@ -682,6 +685,94 @@ public class HSQLDBATRepository implements ATRepository {
}
}
@Override
public int getAtPruneHeight() throws DataException {
String sql = "SELECT AT_prune_height FROM DatabaseInfo";
try (ResultSet resultSet = this.repository.checkedExecute(sql)) {
if (resultSet == null)
return 0;
return resultSet.getInt(1);
} catch (SQLException e) {
throw new DataException("Unable to fetch AT state prune height from repository", e);
}
}
@Override
public void setAtPruneHeight(int pruneHeight) throws DataException {
// trimHeightsLock is to prevent concurrent update on DatabaseInfo
// that could result in "transaction rollback: serialization failure"
synchronized (this.repository.trimHeightsLock) {
String updateSql = "UPDATE DatabaseInfo SET AT_prune_height = ?";
try {
this.repository.executeCheckedUpdate(updateSql, pruneHeight);
this.repository.saveChanges();
} catch (SQLException e) {
repository.examineException(e);
throw new DataException("Unable to set AT state prune height in repository", e);
}
}
}
@Override
public int pruneAtStates(int minHeight, int maxHeight) throws DataException {
int deletedCount = 0;
for (int height=minHeight; height<maxHeight; height++) {
// Give up if we're stopping
if (Controller.isStopping()) {
return deletedCount;
}
// Get latest AT states for this height
List<String> atAddresses = new ArrayList<>();
String updateSql = "SELECT AT_address FROM LatestATStates WHERE height = ?";
try (ResultSet resultSet = this.repository.checkedExecute(updateSql, height)) {
if (resultSet != null) {
do {
String atAddress = resultSet.getString(1);
atAddresses.add(atAddress);
} while (resultSet.next());
}
} catch (SQLException e) {
throw new DataException("Unable to fetch flagged accounts from repository", e);
}
List<ATStateData> atStates = this.getBlockATStatesAtHeight(height);
for (ATStateData atState : atStates) {
//LOGGER.info("Found atState {} at height {}", atState.getATAddress(), atState.getHeight());
// Give up if we're stopping
if (Controller.isStopping()) {
return deletedCount;
}
if (atAddresses.contains(atState.getATAddress())) {
// We don't want to delete this AT state because it is still active
LOGGER.info("Skipping atState {} at height {}", atState.getATAddress(), atState.getHeight());
continue;
}
// Safe to delete everything else for this height
try {
this.repository.delete("ATStates", "AT_address = ? AND height = ?",
atState.getATAddress(), atState.getHeight());
deletedCount++;
} catch (SQLException e) {
throw new DataException("Unable to delete AT state data from repository", e);
}
}
}
return deletedCount;
}
@Override
public void save(ATStateData atStateData) throws DataException {
// We shouldn't ever save partial ATStateData

View File

@@ -509,6 +509,53 @@ public class HSQLDBBlockRepository implements BlockRepository {
}
}
@Override
public int getBlockPruneHeight() throws DataException {
String sql = "SELECT block_prune_height FROM DatabaseInfo";
try (ResultSet resultSet = this.repository.checkedExecute(sql)) {
if (resultSet == null)
return 0;
return resultSet.getInt(1);
} catch (SQLException e) {
throw new DataException("Unable to fetch block prune height from repository", e);
}
}
@Override
public void setBlockPruneHeight(int pruneHeight) throws DataException {
// trimHeightsLock is to prevent concurrent update on DatabaseInfo
// that could result in "transaction rollback: serialization failure"
synchronized (this.repository.trimHeightsLock) {
String updateSql = "UPDATE DatabaseInfo SET block_prune_height = ?";
try {
this.repository.executeCheckedUpdate(updateSql, pruneHeight);
this.repository.saveChanges();
} catch (SQLException e) {
repository.examineException(e);
throw new DataException("Unable to set block prune height in repository", e);
}
}
}
@Override
public int pruneBlocks(int minHeight, int maxHeight) throws DataException {
// Don't prune the genesis block
if (minHeight <= 1) {
minHeight = 2;
}
try {
return this.repository.delete("Blocks", "height BETWEEN ? AND ?", minHeight, maxHeight);
} catch (SQLException e) {
throw new DataException("Unable to prune blocks from repository", e);
}
}
@Override
public BlockData getDetachedBlockSignature(int startHeight) throws DataException {
String sql = "SELECT " + BLOCK_DB_COLUMNS + " FROM Blocks "

View File

@@ -0,0 +1,282 @@
package org.qortal.repository.hsqldb;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.qortal.controller.Controller;
import org.qortal.data.block.BlockData;
import org.qortal.repository.DataException;
import org.qortal.repository.RepositoryManager;
import org.qortal.settings.Settings;
import java.sql.ResultSet;
import java.sql.SQLException;
/**
*
* When switching from a full node to a pruning node, we need to delete most of the database contents.
* If we do this entirely as a background process, it is very slow and can interfere with syncing.
* However, if we take the approach of transferring only the necessary rows to a new table and then
* deleting the original table, this makes the process much faster. It was taking several days to
* delete the AT states in the background, but only a couple of minutes to copy them to a new table.
*
* The trade off is that we have to go through a form of "reshape" when starting the app for the first
* time after enabling pruning mode. But given that this is an opt-in mode, I don't think it will be
* a problem.
*
* Once the pruning is complete, it automatically performs a CHECKPOINT DEFRAG in order to
* shrink the database file size down to a fraction of what it was before.
*
* From this point, the original background process will run, but can be dialled right down so not
* to interfere with syncing.
*
*/
public class HSQLDBDatabasePruning {
private static final Logger LOGGER = LogManager.getLogger(HSQLDBDatabasePruning.class);
public static boolean pruneATStates() throws SQLException, DataException {
try (final HSQLDBRepository repository = (HSQLDBRepository)RepositoryManager.getRepository()) {
// Only bulk prune AT states if we have never done so before
int pruneHeight = repository.getATRepository().getAtPruneHeight();
if (pruneHeight > 0) {
// Already pruned AT states
return false;
}
LOGGER.info("Starting bulk prune of AT states - this process could take a while... (approx. 2 mins on high spec)");
// Create new AT-states table to hold smaller dataset
repository.executeCheckedUpdate("DROP TABLE IF EXISTS ATStatesNew");
repository.executeCheckedUpdate("CREATE TABLE ATStatesNew ("
+ "AT_address QortalAddress, height INTEGER NOT NULL, state_hash ATStateHash NOT NULL, "
+ "fees QortalAmount NOT NULL, is_initial BOOLEAN NOT NULL, sleep_until_message_timestamp BIGINT, "
+ "PRIMARY KEY (AT_address, height), "
+ "FOREIGN KEY (AT_address) REFERENCES ATs (AT_address) ON DELETE CASCADE)");
repository.executeCheckedUpdate("SET TABLE ATStatesNew NEW SPACE");
repository.executeCheckedUpdate("CHECKPOINT");
// Find our latest block
BlockData latestBlock = repository.getBlockRepository().getLastBlock();
if (latestBlock == null) {
LOGGER.info("Unable to determine blockchain height, necessary for bulk block pruning");
return false;
}
// Calculate some constants for later use
final int blockchainHeight = latestBlock.getHeight();
final int maximumBlockToTrim = blockchainHeight - Settings.getInstance().getPruneBlockLimit();
final int startHeight = maximumBlockToTrim;
final int endHeight = blockchainHeight;
final int blockStep = 10000;
// Loop through all the LatestATStates and copy them to the new table
LOGGER.info("Copying AT states...");
for (int height = 0; height < endHeight; height += blockStep) {
//LOGGER.info(String.format("Copying AT states between %d and %d...", height, height + blockStep - 1));
String sql = "SELECT height, AT_address FROM LatestATStates WHERE height BETWEEN ? AND ?";
try (ResultSet latestAtStatesResultSet = repository.checkedExecute(sql, height, height + blockStep - 1)) {
if (latestAtStatesResultSet != null) {
do {
int latestAtHeight = latestAtStatesResultSet.getInt(1);
String latestAtAddress = latestAtStatesResultSet.getString(2);
// Copy this latest ATState to the new table
//LOGGER.info(String.format("Copying AT %s at height %d...", latestAtAddress, latestAtHeight));
try {
String updateSql = "INSERT INTO ATStatesNew ("
+ "SELECT AT_address, height, state_hash, fees, is_initial, sleep_until_message_timestamp "
+ "FROM ATStates "
+ "WHERE height = ? AND AT_address = ?)";
repository.executeCheckedUpdate(updateSql, latestAtHeight, latestAtAddress);
} catch (SQLException e) {
repository.examineException(e);
throw new DataException("Unable to copy ATStates", e);
}
if (height >= startHeight) {
// Now copy this AT states for each recent block it is present in
for (int i = startHeight; i < endHeight; i++) {
if (latestAtHeight < i) {
// This AT finished before this block so there is nothing to copy
continue;
}
//LOGGER.info(String.format("Copying recent AT %s at height %d...", latestAtAddress, i));
try {
// Copy each LatestATState to the new table
String updateSql = "INSERT IGNORE INTO ATStatesNew ("
+ "SELECT AT_address, height, state_hash, fees, is_initial, sleep_until_message_timestamp "
+ "FROM ATStates "
+ "WHERE height = ? AND AT_address = ?)";
repository.executeCheckedUpdate(updateSql, i, latestAtAddress);
} catch (SQLException e) {
repository.examineException(e);
throw new DataException("Unable to copy ATStates", e);
}
}
}
} while (latestAtStatesResultSet.next());
}
} catch (SQLException e) {
throw new DataException("Unable to copy AT states", e);
}
}
repository.saveChanges();
// Add a height index
LOGGER.info("Rebuilding AT states height index in repository");
repository.executeCheckedUpdate("CREATE INDEX IF NOT EXISTS ATStatesHeightIndex ON ATStatesNew (height)");
repository.executeCheckedUpdate("CHECKPOINT");
// Finally, drop the original table and rename
LOGGER.info("Deleting old AT states...");
repository.executeCheckedUpdate("DROP TABLE ATStates");
repository.executeCheckedUpdate("ALTER TABLE ATStatesNew RENAME TO ATStates");
repository.executeCheckedUpdate("CHECKPOINT");
// Update the prune height
repository.getATRepository().setAtPruneHeight(maximumBlockToTrim);
repository.saveChanges();
repository.executeCheckedUpdate("CHECKPOINT");
// Now prune/trim the ATStatesData, as this currently goes back over a month
return HSQLDBDatabasePruning.pruneATStateData();
}
}
/*
* Bulk prune ATStatesData to catch up with the now pruned ATStates table
* This uses the existing AT States trimming code but with a much higher end block
*/
private static boolean pruneATStateData() throws SQLException, DataException {
try (final HSQLDBRepository repository = (HSQLDBRepository) RepositoryManager.getRepository()) {
BlockData latestBlock = repository.getBlockRepository().getLastBlock();
if (latestBlock == null) {
LOGGER.info("Unable to determine blockchain height, necessary for bulk ATStatesData pruning");
return false;
}
final int blockchainHeight = latestBlock.getHeight();
final int upperPrunableHeight = blockchainHeight - Settings.getInstance().getPruneBlockLimit();
// ATStateData is already trimmed - so carry on from where we left off in the past
int pruneStartHeight = repository.getATRepository().getAtTrimHeight();
LOGGER.info("Starting bulk prune of AT states data - this process could take a while... (approx. 3 mins on high spec)");
while (pruneStartHeight < upperPrunableHeight) {
// Prune all AT state data up until our latest minus pruneBlockLimit
if (Controller.isStopping()) {
return false;
}
// Override batch size in the settings because this is a one-off process
final int batchSize = 1000;
final int rowLimitPerBatch = 50000;
int upperBatchHeight = pruneStartHeight + batchSize;
int upperPruneHeight = Math.min(upperBatchHeight, upperPrunableHeight);
LOGGER.trace(String.format("Pruning AT states data between %d and %d...", pruneStartHeight, upperPruneHeight));
int numATStatesPruned = repository.getATRepository().trimAtStates(pruneStartHeight, upperPruneHeight, rowLimitPerBatch);
repository.saveChanges();
if (numATStatesPruned > 0) {
final int finalPruneStartHeight = pruneStartHeight;
LOGGER.trace(() -> String.format("Pruned %d AT states data rows between blocks %d and %d",
numATStatesPruned, finalPruneStartHeight, upperPruneHeight));
} else {
// Can we move onto next batch?
if (upperPrunableHeight > upperBatchHeight) {
pruneStartHeight = upperBatchHeight;
repository.getATRepository().setAtTrimHeight(pruneStartHeight);
// No need to rebuild the latest AT states as we aren't currently synchronizing
repository.saveChanges();
final int finalPruneStartHeight = pruneStartHeight;
LOGGER.debug(() -> String.format("Bumping AT states trim height to %d", finalPruneStartHeight));
}
else {
// We've finished pruning
break;
}
}
}
return true;
}
}
public static boolean pruneBlocks() throws SQLException, DataException {
try (final HSQLDBRepository repository = (HSQLDBRepository) RepositoryManager.getRepository()) {
// Only bulk prune AT states if we have never done so before
int pruneHeight = repository.getBlockRepository().getBlockPruneHeight();
if (pruneHeight > 0) {
// Already pruned blocks
return false;
}
BlockData latestBlock = repository.getBlockRepository().getLastBlock();
if (latestBlock == null) {
LOGGER.info("Unable to determine blockchain height, necessary for bulk block pruning");
return false;
}
final int blockchainHeight = latestBlock.getHeight();
final int upperPrunableHeight = blockchainHeight - Settings.getInstance().getPruneBlockLimit();
int pruneStartHeight = 0;
LOGGER.info("Starting bulk prune of blocks - this process could take a while... (approx. 5 mins on high spec)");
while (pruneStartHeight < upperPrunableHeight) {
// Prune all blocks up until our latest minus pruneBlockLimit
int upperBatchHeight = pruneStartHeight + Settings.getInstance().getBlockPruneBatchSize();
int upperPruneHeight = Math.min(upperBatchHeight, upperPrunableHeight);
LOGGER.info(String.format("Pruning blocks between %d and %d...", pruneStartHeight, upperPruneHeight));
int numBlocksPruned = repository.getBlockRepository().pruneBlocks(pruneStartHeight, upperPruneHeight);
repository.saveChanges();
if (numBlocksPruned > 0) {
final int finalPruneStartHeight = pruneStartHeight;
LOGGER.info(() -> String.format("Pruned %d block%s between %d and %d",
numBlocksPruned, (numBlocksPruned != 1 ? "s" : ""),
finalPruneStartHeight, upperPruneHeight));
} else {
// Can we move onto next batch?
if (upperPrunableHeight > upperBatchHeight) {
pruneStartHeight = upperBatchHeight;
repository.getBlockRepository().setBlockPruneHeight(pruneStartHeight);
repository.saveChanges();
final int finalPruneStartHeight = pruneStartHeight;
LOGGER.debug(() -> String.format("Bumping block base prune height to %d", finalPruneStartHeight));
}
else {
// We've finished pruning
break;
}
}
}
return true;
}
}
public static void performMaintenance() throws SQLException, DataException {
try (final HSQLDBRepository repository = (HSQLDBRepository) RepositoryManager.getRepository()) {
repository.performPeriodicMaintenance();
}
}
}

View File

@@ -867,6 +867,11 @@ public class HSQLDBDatabaseUpdates {
stmt.execute("CHECKPOINT");
break;
}
case 35:
// Support for pruning
stmt.execute("ALTER TABLE DatabaseInfo ADD AT_prune_height INT NOT NULL DEFAULT 0");
stmt.execute("ALTER TABLE DatabaseInfo ADD block_prune_height INT NOT NULL DEFAULT 0");
break;
default:
// nothing to do

View File

@@ -109,6 +109,26 @@ public class Settings {
* This has a significant effect on execution time. */
private int onlineSignaturesTrimBatchSize = 100; // blocks
/** Whether we should prune old data to reduce database size
* This prevents the node from being able to serve older blocks */
private boolean pruningEnabled = false;
/** The amount of recent blocks we should keep when pruning */
private int pruneBlockLimit = 1450;
/** How often to attempt AT state pruning (ms). */
private long atStatesPruneInterval = 3219L; // milliseconds
/** Block height range to scan for prunable AT states.<br>
* This has a significant effect on execution time. */
private int atStatesPruneBatchSize = 25; // blocks
/** How often to attempt block pruning (ms). */
private long blockPruneInterval = 3219L; // milliseconds
/** Block height range to scan for prunable blocks.<br>
* This has a significant effect on execution time. */
private int blockPruneBatchSize = 10000; // blocks
// Peer-to-peer related
private boolean isTestNet = false;
/** Port number for inbound peer-to-peer connections. */
@@ -529,4 +549,29 @@ public class Settings {
return this.onlineSignaturesTrimBatchSize;
}
public boolean isPruningEnabled() {
return this.pruningEnabled;
}
public int getPruneBlockLimit() {
return this.pruneBlockLimit;
}
public long getAtStatesPruneInterval() {
return this.atStatesPruneInterval;
}
public int getAtStatesPruneBatchSize() {
return this.atStatesPruneBatchSize;
}
public long getBlockPruneInterval() {
return this.blockPruneInterval;
}
public int getBlockPruneBatchSize() {
return this.blockPruneBatchSize;
}
}

View File

@@ -75,7 +75,7 @@ public class AtRepositoryTests extends Common {
Integer testHeight = maxHeight - 2;
// Trim AT state data
repository.getATRepository().prepareForAtStateTrimming();
repository.getATRepository().rebuildLatestAtStates();
repository.getATRepository().trimAtStates(2, maxHeight, 1000);
ATStateData atStateData = repository.getATRepository().getATStateAtHeight(atAddress, testHeight);
@@ -129,7 +129,7 @@ public class AtRepositoryTests extends Common {
Integer testHeight = blockchainHeight;
// Trim AT state data
repository.getATRepository().prepareForAtStateTrimming();
repository.getATRepository().rebuildLatestAtStates();
// COMMIT to check latest AT states persist / TEMPORARY table interaction
repository.saveChanges();
@@ -280,7 +280,7 @@ public class AtRepositoryTests extends Common {
Integer testHeight = maxHeight - 2;
// Trim AT state data
repository.getATRepository().prepareForAtStateTrimming();
repository.getATRepository().rebuildLatestAtStates();
repository.getATRepository().trimAtStates(2, maxHeight, 1000);
List<ATStateData> atStates = repository.getATRepository().getBlockATStatesAtHeight(testHeight);