forked from Qortal/qortal
Reworked AT-states and online signatures trimming
Instead of searching from block 0, we now keep a record of base trim height in the DB itself. Also, we no longer trim the latest AT state for non-finished ATs in case they are in deep sleeping and we need their state for when they awaken.
This commit is contained in:
parent
a6a1f65d3e
commit
60621e8b81
@ -13,18 +13,22 @@ public class AtStatesTrimmer implements Runnable {
|
||||
|
||||
private static final Logger LOGGER = LogManager.getLogger(AtStatesTrimmer.class);
|
||||
|
||||
private enum TrimMode { SEARCHING, TRIMMING }
|
||||
private static final long TRIM_INTERVAL = 2 * 1000L; // ms
|
||||
private static final int TRIM_SEARCH_SIZE = 2000; // blocks
|
||||
private static final int TRIM_BATCH_SIZE = 200; // blocks
|
||||
private static final int TRIM_LIMIT = 4000; // rows
|
||||
|
||||
private TrimMode trimMode = TrimMode.SEARCHING;
|
||||
private int trimStartHeight = 0;
|
||||
// This has a significant effect on execution time
|
||||
private static final int TRIM_BATCH_SIZE = 200; // blocks
|
||||
|
||||
// Not so significant effect on execution time
|
||||
private static final int TRIM_LIMIT = 4000; // rows
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
Thread.currentThread().setName("AT States trimmer");
|
||||
|
||||
try (final Repository repository = RepositoryManager.getRepository()) {
|
||||
repository.getATRepository().prepareForAtStateTrimming();
|
||||
repository.saveChanges();
|
||||
|
||||
while (!Controller.isStopping()) {
|
||||
repository.discardChanges();
|
||||
|
||||
@ -41,39 +45,30 @@ public class AtStatesTrimmer implements Runnable {
|
||||
long upperTrimmableTimestamp = Math.min(currentTrimmableTimestamp, chainTrimmableTimestamp);
|
||||
int upperTrimmableHeight = repository.getBlockRepository().getHeightFromTimestamp(upperTrimmableTimestamp);
|
||||
|
||||
if (trimMode == TrimMode.SEARCHING) {
|
||||
int trimEndHeight = Math.min(trimStartHeight + TRIM_SEARCH_SIZE, upperTrimmableHeight);
|
||||
int trimStartHeight = repository.getATRepository().getAtTrimHeight();
|
||||
|
||||
LOGGER.debug(() -> String.format("Searching for trimmable AT states between blocks %d and %d", trimStartHeight, trimEndHeight));
|
||||
int foundStartHeight = repository.getATRepository().findFirstTrimmableStateHeight(trimStartHeight, trimEndHeight);
|
||||
int upperBatchHeight = trimStartHeight + TRIM_BATCH_SIZE;
|
||||
int upperTrimHeight = Math.min(upperBatchHeight, upperTrimmableHeight);
|
||||
|
||||
if (foundStartHeight == 0) {
|
||||
// No trimmable AT states found
|
||||
trimStartHeight = trimEndHeight;
|
||||
} else {
|
||||
trimStartHeight = foundStartHeight;
|
||||
trimMode = TrimMode.TRIMMING;
|
||||
LOGGER.debug(() -> String.format("Found first trimmable AT state at block height %d", trimStartHeight));
|
||||
}
|
||||
|
||||
// The above search will probably take enough time by itself so wait until next round
|
||||
continue;
|
||||
}
|
||||
|
||||
int upperBatchHeight = Math.min(trimStartHeight + TRIM_BATCH_SIZE, upperTrimmableHeight);
|
||||
|
||||
if (trimStartHeight >= upperBatchHeight)
|
||||
if (trimStartHeight >= upperTrimHeight)
|
||||
continue;
|
||||
|
||||
int numAtStatesTrimmed = repository.getATRepository().trimAtStates(trimStartHeight, upperBatchHeight, TRIM_LIMIT);
|
||||
int numAtStatesTrimmed = repository.getATRepository().trimAtStates(trimStartHeight, upperTrimHeight, TRIM_LIMIT);
|
||||
repository.saveChanges();
|
||||
|
||||
if (numAtStatesTrimmed > 0) {
|
||||
LOGGER.debug(() -> String.format("Trimmed %d AT state%s between blocks %d and %d",
|
||||
numAtStatesTrimmed, (numAtStatesTrimmed != 1 ? "s" : ""),
|
||||
trimStartHeight, upperBatchHeight));
|
||||
trimStartHeight, upperTrimHeight));
|
||||
} else {
|
||||
trimStartHeight = upperBatchHeight;
|
||||
// Can we move onto next batch?
|
||||
if (upperTrimmableHeight > upperBatchHeight) {
|
||||
repository.getATRepository().setAtTrimHeight(upperBatchHeight);
|
||||
repository.getATRepository().prepareForAtStateTrimming();
|
||||
repository.saveChanges();
|
||||
|
||||
LOGGER.debug(() -> String.format("Bumping AT state trim height to %d", upperBatchHeight));
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (DataException e) {
|
||||
|
@ -22,6 +22,7 @@ import java.util.Properties;
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
import java.util.function.Predicate;
|
||||
import java.util.stream.Collectors;
|
||||
@ -415,8 +416,9 @@ public class Controller extends Thread {
|
||||
|
||||
final long repositoryBackupInterval = Settings.getInstance().getRepositoryBackupInterval();
|
||||
|
||||
Executors.newSingleThreadExecutor(new DaemonThreadFactory("AT states trimmer")).execute(new AtStatesTrimmer());
|
||||
Executors.newSingleThreadExecutor(new DaemonThreadFactory("Online sigs trimmer")).execute(new OnlineAccountsSignaturesTrimmer());
|
||||
ExecutorService trimExecutor = Executors.newCachedThreadPool(new DaemonThreadFactory());
|
||||
trimExecutor.execute(new AtStatesTrimmer());
|
||||
trimExecutor.execute(new OnlineAccountsSignaturesTrimmer());
|
||||
|
||||
try {
|
||||
while (!isStopping) {
|
||||
@ -490,7 +492,17 @@ public class Controller extends Thread {
|
||||
}
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
// Clear interrupted flag so we can shutdown trim threads
|
||||
Thread.interrupted();
|
||||
// Fall-through to exit
|
||||
} finally {
|
||||
trimExecutor.shutdownNow();
|
||||
|
||||
try {
|
||||
trimExecutor.awaitTermination(2L, TimeUnit.SECONDS);
|
||||
} catch (InterruptedException e) {
|
||||
// We tried...
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -13,17 +13,16 @@ public class OnlineAccountsSignaturesTrimmer implements Runnable {
|
||||
|
||||
private static final Logger LOGGER = LogManager.getLogger(OnlineAccountsSignaturesTrimmer.class);
|
||||
|
||||
private static final long INITIAL_SLEEP_PERIOD = 5 * 60 * 1000L; // ms
|
||||
private static final long INITIAL_SLEEP_PERIOD = 5 * 60 * 1000L + 1234L; // ms
|
||||
|
||||
private enum TrimMode { SEARCHING, TRIMMING }
|
||||
private static final long TRIM_INTERVAL = 2 * 1000L; // ms
|
||||
private static final int TRIM_SEARCH_SIZE = 5000; // blocks
|
||||
private static final int TRIM_BATCH_SIZE = 500; // blocks
|
||||
|
||||
private TrimMode trimMode = TrimMode.SEARCHING;
|
||||
private int trimStartHeight = 0;
|
||||
// This has a significant effect on execution time
|
||||
private static final int TRIM_BATCH_SIZE = 200; // blocks
|
||||
|
||||
public void run() {
|
||||
Thread.currentThread().setName("Online Accounts trimmer");
|
||||
|
||||
try (final Repository repository = RepositoryManager.getRepository()) {
|
||||
// Don't even start trimming until initial rush has ended
|
||||
Thread.sleep(INITIAL_SLEEP_PERIOD);
|
||||
@ -41,39 +40,29 @@ public class OnlineAccountsSignaturesTrimmer implements Runnable {
|
||||
long upperTrimmableTimestamp = NTP.getTime() - BlockChain.getInstance().getOnlineAccountSignaturesMaxLifetime();
|
||||
int upperTrimmableHeight = repository.getBlockRepository().getHeightFromTimestamp(upperTrimmableTimestamp);
|
||||
|
||||
if (trimMode == TrimMode.SEARCHING) {
|
||||
int trimEndHeight = Math.min(trimStartHeight + TRIM_SEARCH_SIZE, upperTrimmableHeight);
|
||||
int trimStartHeight = repository.getBlockRepository().getOnlineAccountsSignaturesTrimHeight();
|
||||
|
||||
LOGGER.debug(() -> String.format("Searching for trimmable online accounts signatures between blocks %d and %d", trimStartHeight, trimEndHeight));
|
||||
int foundStartHeight = repository.getBlockRepository().findFirstTrimmableOnlineAccountsSignatureHeight(trimStartHeight, trimEndHeight);
|
||||
int upperBatchHeight = trimStartHeight + TRIM_BATCH_SIZE;
|
||||
int upperTrimHeight = Math.min(upperBatchHeight, upperTrimmableHeight);
|
||||
|
||||
if (foundStartHeight == 0) {
|
||||
// No trimmable online accounts signatures found
|
||||
trimStartHeight = trimEndHeight;
|
||||
} else {
|
||||
trimStartHeight = foundStartHeight;
|
||||
trimMode = TrimMode.TRIMMING;
|
||||
LOGGER.debug(() -> String.format("Found first trimmable online accounts signatures at block height %d", trimStartHeight));
|
||||
}
|
||||
|
||||
// The above search will probably take enough time by itself so wait until next round
|
||||
continue;
|
||||
}
|
||||
|
||||
int upperBatchHeight = Math.min(trimStartHeight + TRIM_BATCH_SIZE, upperTrimmableHeight);
|
||||
|
||||
if (trimStartHeight >= upperBatchHeight)
|
||||
if (trimStartHeight >= upperTrimHeight)
|
||||
continue;
|
||||
|
||||
int numSigsTrimmed = repository.getBlockRepository().trimOldOnlineAccountsSignatures(trimStartHeight, upperBatchHeight);
|
||||
int numSigsTrimmed = repository.getBlockRepository().trimOldOnlineAccountsSignatures(trimStartHeight, upperTrimHeight);
|
||||
repository.saveChanges();
|
||||
|
||||
if (numSigsTrimmed > 0) {
|
||||
LOGGER.debug(() -> String.format("Trimmed %d online accounts signature%s between blocks %d and %d",
|
||||
numSigsTrimmed, (numSigsTrimmed != 1 ? "s" : ""),
|
||||
trimStartHeight, upperBatchHeight));
|
||||
trimStartHeight, upperTrimHeight));
|
||||
} else {
|
||||
trimStartHeight = upperBatchHeight;
|
||||
// Can we move onto next batch?
|
||||
if (upperTrimmableHeight > upperBatchHeight) {
|
||||
repository.getBlockRepository().setOnlineAccountsSignaturesTrimHeight(upperBatchHeight);
|
||||
repository.saveChanges();
|
||||
|
||||
LOGGER.debug(() -> String.format("Bumping online accounts signatures trim height to %d", upperBatchHeight));
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (DataException e) {
|
||||
|
@ -87,8 +87,14 @@ public interface ATRepository {
|
||||
*/
|
||||
public List<ATStateData> getBlockATStatesAtHeight(int height) throws DataException;
|
||||
|
||||
/** Returns height of first trimmable AT state, or 0 if not found. */
|
||||
public int findFirstTrimmableStateHeight(int minHeight, int maxHeight) throws DataException;
|
||||
/** Returns height of first trimmable AT state. */
|
||||
public int getAtTrimHeight() throws DataException;
|
||||
|
||||
/** Sets new base height for AT state trimming. */
|
||||
public void setAtTrimHeight(int trimHeight) throws DataException;
|
||||
|
||||
/** Hook to allow repository to prepare/cache info for AT state trimming. */
|
||||
public void prepareForAtStateTrimming() throws DataException;
|
||||
|
||||
/** Trims full AT state data between passed heights. Returns number of trimmed rows. */
|
||||
public int trimAtStates(int minHeight, int maxHeight, int limit) throws DataException;
|
||||
|
@ -143,8 +143,11 @@ public interface BlockRepository {
|
||||
*/
|
||||
public List<BlockInfo> getBlockInfos(Integer startHeight, Integer endHeight, Integer count) throws DataException;
|
||||
|
||||
/** Returns height of first trimmable online accounts signatures, or 0 if not found. */
|
||||
public int findFirstTrimmableOnlineAccountsSignatureHeight(int minHeight, int maxHeight) throws DataException;
|
||||
/** Returns height of first trimmable online accounts signatures. */
|
||||
public int getOnlineAccountsSignaturesTrimHeight() throws DataException;
|
||||
|
||||
/** Sets new base height for trimming online accounts signatures. */
|
||||
public void setOnlineAccountsSignaturesTrimHeight(int trimHeight) throws DataException;
|
||||
|
||||
/**
|
||||
* Trim online accounts signatures from blocks between passed heights.
|
||||
|
@ -400,18 +400,61 @@ public class HSQLDBATRepository implements ATRepository {
|
||||
}
|
||||
|
||||
@Override
|
||||
public int findFirstTrimmableStateHeight(int minHeight, int maxHeight) throws DataException {
|
||||
String sql = "SELECT MIN(height) FROM ATStates "
|
||||
+ "WHERE state_data IS NOT NULL "
|
||||
+ "AND height BETWEEN ? AND ?";
|
||||
public int getAtTrimHeight() throws DataException {
|
||||
String sql = "SELECT AT_trim_height FROM DatabaseInfo";
|
||||
|
||||
try (ResultSet resultSet = this.repository.checkedExecute(sql, minHeight, maxHeight)) {
|
||||
try (ResultSet resultSet = this.repository.checkedExecute(sql)) {
|
||||
if (resultSet == null)
|
||||
return 0;
|
||||
|
||||
return resultSet.getInt(1);
|
||||
} catch (SQLException e) {
|
||||
throw new DataException("Unable to find first trimmable AT state in repository", e);
|
||||
throw new DataException("Unable to fetch AT state trim height from repository", e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setAtTrimHeight(int trimHeight) throws DataException {
|
||||
String updateSql = "UPDATE DatabaseInfo SET AT_trim_height = ?";
|
||||
|
||||
try {
|
||||
this.repository.executeCheckedUpdate(updateSql, trimHeight);
|
||||
} catch (SQLException e) {
|
||||
repository.examineException(e);
|
||||
throw new DataException("Unable to set AT state trim height in repository", e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void prepareForAtStateTrimming() throws DataException {
|
||||
// Rebuild cache of latest, non-finished AT states that we can't trim
|
||||
String dropSql = "DROP TABLE IF EXISTS LatestATStates";
|
||||
|
||||
try {
|
||||
this.repository.executeCheckedUpdate(dropSql);
|
||||
} catch (SQLException e) {
|
||||
repository.examineException(e);
|
||||
throw new DataException("Unable to drop temporary latest AT states cache from repository", e);
|
||||
}
|
||||
|
||||
String createSql = "CREATE TEMPORARY TABLE LatestATStates "
|
||||
+ "AS ("
|
||||
+ "SELECT AT_address, height FROM ATs "
|
||||
+ "CROSS JOIN LATERAL("
|
||||
+ "SELECT height FROM ATStates "
|
||||
+ "WHERE ATStates.AT_address = ATs.AT_address "
|
||||
+ "ORDER BY AT_address DESC, height DESC LIMIT 1"
|
||||
+ ") "
|
||||
+ "WHERE is_finished IS false"
|
||||
+ ") "
|
||||
+ "WITH DATA "
|
||||
+ "ON COMMIT PRESERVE ROWS";
|
||||
|
||||
try {
|
||||
this.repository.executeCheckedUpdate(createSql);
|
||||
} catch (SQLException e) {
|
||||
repository.examineException(e);
|
||||
throw new DataException("Unable to recreate temporary latest AT states cache in repository", e);
|
||||
}
|
||||
}
|
||||
|
||||
@ -425,6 +468,11 @@ public class HSQLDBATRepository implements ATRepository {
|
||||
String sql = "UPDATE ATStates SET state_data = NULL "
|
||||
+ "WHERE state_data IS NOT NULL "
|
||||
+ "AND height BETWEEN ? AND ? "
|
||||
+ "AND NOT EXISTS("
|
||||
+ "SELECT TRUE FROM LatestATStates "
|
||||
+ "WHERE LatestATStates.AT_address = ATStates.AT_address "
|
||||
+ "AND LatestATStates.height = ATStates.height"
|
||||
+ ") "
|
||||
+ "LIMIT ?";
|
||||
|
||||
try {
|
||||
|
@ -462,18 +462,28 @@ public class HSQLDBBlockRepository implements BlockRepository {
|
||||
}
|
||||
|
||||
@Override
|
||||
public int findFirstTrimmableOnlineAccountsSignatureHeight(int minHeight, int maxHeight) throws DataException {
|
||||
String sql = "SELECT MIN(height) FROM Blocks "
|
||||
+ "WHERE online_accounts_signatures IS NOT NULL "
|
||||
+ "AND height BETWEEN ? AND ?";
|
||||
public int getOnlineAccountsSignaturesTrimHeight() throws DataException {
|
||||
String sql = "SELECT online_signatures_trim_height FROM DatabaseInfo";
|
||||
|
||||
try (ResultSet resultSet = this.repository.checkedExecute(sql, minHeight, maxHeight)) {
|
||||
try (ResultSet resultSet = this.repository.checkedExecute(sql)) {
|
||||
if (resultSet == null)
|
||||
return 0;
|
||||
|
||||
return resultSet.getInt(1);
|
||||
} catch (SQLException e) {
|
||||
throw new DataException("Unable to find first trimmable online accounts signatures in repository", e);
|
||||
throw new DataException("Unable to fetch online accounts signatures trim height from repository", e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setOnlineAccountsSignaturesTrimHeight(int trimHeight) throws DataException {
|
||||
String updateSql = "UPDATE DatabaseInfo SET online_signatures_trim_height = ?";
|
||||
|
||||
try {
|
||||
this.repository.executeCheckedUpdate(updateSql, trimHeight);
|
||||
} catch (SQLException e) {
|
||||
repository.examineException(e);
|
||||
throw new DataException("Unable to set online accounts signatures trim height in repository", e);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -665,6 +665,12 @@ public class HSQLDBDatabaseUpdates {
|
||||
stmt.execute("CREATE INDEX ATStateHeightIndex on ATStates (height)");
|
||||
break;
|
||||
|
||||
case 26:
|
||||
// Support for trimming
|
||||
stmt.execute("ALTER TABLE DatabaseInfo ADD AT_trim_height INT NOT NULL DEFAULT 0");
|
||||
stmt.execute("ALTER TABLE DatabaseInfo ADD online_signatures_trim_height INT NOT NULL DEFAULT 0");
|
||||
break;
|
||||
|
||||
default:
|
||||
// nothing to do
|
||||
return false;
|
||||
|
Loading…
Reference in New Issue
Block a user