forked from Qortal/qortal
Merge pull request #239 from kennycud/master
Restructuring database connections for better garbage collection - resolves long-standing memory leak in multiple places that was discovered more specifically after the thread crashes were made to restart if crashed. Thanks so much to @kennycud for this improvement!
This commit is contained in:
commit
2347118e59
@ -97,21 +97,27 @@ public class BlockMinter extends Thread {
|
||||
|
||||
final boolean isSingleNodeTestnet = Settings.getInstance().isSingleNodeTestnet();
|
||||
|
||||
try (final Repository repository = RepositoryManager.getRepository()) {
|
||||
// Going to need this a lot...
|
||||
BlockRepository blockRepository = repository.getBlockRepository();
|
||||
|
||||
// Flags for tracking change in whether minting is possible,
|
||||
// so we can notify Controller, and further update SysTray, etc.
|
||||
boolean isMintingPossible = false;
|
||||
boolean wasMintingPossible = isMintingPossible;
|
||||
try {
|
||||
while (running) {
|
||||
// recreate repository for new loop iteration
|
||||
try (final Repository repository = RepositoryManager.getRepository()) {
|
||||
|
||||
// Going to need this a lot...
|
||||
BlockRepository blockRepository = repository.getBlockRepository();
|
||||
|
||||
if (isMintingPossible != wasMintingPossible)
|
||||
Controller.getInstance().onMintingPossibleChange(isMintingPossible);
|
||||
|
||||
wasMintingPossible = isMintingPossible;
|
||||
|
||||
try {
|
||||
// reset the repository, to the repository recreated for this loop iteration
|
||||
for( Block newBlock : newBlocks ) newBlock.setRepository(repository);
|
||||
|
||||
// Free up any repository locks
|
||||
repository.discardChanges();
|
||||
|
||||
@ -452,9 +458,14 @@ public class BlockMinter extends Thread {
|
||||
// We've been interrupted - time to exit
|
||||
return;
|
||||
}
|
||||
}
|
||||
} catch (DataException e) {
|
||||
LOGGER.warn("Repository issue while running block minter - NO LONGER MINTING", e);
|
||||
} catch (Exception e) {
|
||||
LOGGER.error(e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LOGGER.error(e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -13,6 +13,7 @@ import org.qortal.block.Block;
|
||||
import org.qortal.block.BlockChain;
|
||||
import org.qortal.block.BlockChain.BlockTimingByHeight;
|
||||
import org.qortal.controller.arbitrary.*;
|
||||
import org.qortal.controller.hsqldb.HSQLDBBalanceRecorder;
|
||||
import org.qortal.controller.hsqldb.HSQLDBDataCacheManager;
|
||||
import org.qortal.controller.repository.NamesDatabaseIntegrityCheck;
|
||||
import org.qortal.controller.repository.PruneManager;
|
||||
@ -36,7 +37,6 @@ import org.qortal.network.Peer;
|
||||
import org.qortal.network.PeerAddress;
|
||||
import org.qortal.network.message.*;
|
||||
import org.qortal.repository.*;
|
||||
import org.qortal.repository.hsqldb.HSQLDBRepository;
|
||||
import org.qortal.repository.hsqldb.HSQLDBRepositoryFactory;
|
||||
import org.qortal.settings.Settings;
|
||||
import org.qortal.transaction.Transaction;
|
||||
@ -73,6 +73,8 @@ import java.util.stream.Collectors;
|
||||
|
||||
public class Controller extends Thread {
|
||||
|
||||
public static HSQLDBRepositoryFactory REPOSITORY_FACTORY;
|
||||
|
||||
static {
|
||||
// This must go before any calls to LogManager/Logger
|
||||
System.setProperty("log4j2.formatMsgNoLookups", "true");
|
||||
@ -403,22 +405,37 @@ public class Controller extends Thread {
|
||||
|
||||
LOGGER.info("Starting repository");
|
||||
try {
|
||||
RepositoryFactory repositoryFactory = new HSQLDBRepositoryFactory(getRepositoryUrl());
|
||||
RepositoryManager.setRepositoryFactory(repositoryFactory);
|
||||
REPOSITORY_FACTORY = new HSQLDBRepositoryFactory(getRepositoryUrl());
|
||||
RepositoryManager.setRepositoryFactory(REPOSITORY_FACTORY);
|
||||
RepositoryManager.setRequestedCheckpoint(Boolean.TRUE);
|
||||
|
||||
try (final Repository repository = RepositoryManager.getRepository()) {
|
||||
// RepositoryManager.rebuildTransactionSequences(repository);
|
||||
ArbitraryDataCacheManager.getInstance().buildArbitraryResourcesCache(repository, false);
|
||||
}
|
||||
|
||||
if( Settings.getInstance().isDbCacheEnabled() ) {
|
||||
LOGGER.info("Db Cache Starting ...");
|
||||
HSQLDBDataCacheManager hsqldbDataCacheManager = new HSQLDBDataCacheManager((HSQLDBRepository) repositoryFactory.getRepository());
|
||||
HSQLDBDataCacheManager hsqldbDataCacheManager = new HSQLDBDataCacheManager();
|
||||
hsqldbDataCacheManager.start();
|
||||
}
|
||||
else {
|
||||
LOGGER.info("Db Cache Disabled");
|
||||
}
|
||||
|
||||
if( Settings.getInstance().isBalanceRecorderEnabled() ) {
|
||||
Optional<HSQLDBBalanceRecorder> recorder = HSQLDBBalanceRecorder.getInstance();
|
||||
|
||||
if( recorder.isPresent() ) {
|
||||
LOGGER.info("Balance Recorder Starting ...");
|
||||
recorder.get().start();
|
||||
}
|
||||
else {
|
||||
LOGGER.info("Balance Recorder won't start.");
|
||||
}
|
||||
}
|
||||
else {
|
||||
LOGGER.info("Balance Recorder Disabled");
|
||||
}
|
||||
} catch (DataException e) {
|
||||
// If exception has no cause or message then repository is in use by some other process.
|
||||
@ -639,10 +656,8 @@ public class Controller extends Thread {
|
||||
boolean canBootstrap = Settings.getInstance().getBootstrap();
|
||||
boolean needsArchiveRebuild = false;
|
||||
int checkHeight = 0;
|
||||
Repository repository = null;
|
||||
|
||||
try {
|
||||
repository = RepositoryManager.getRepository();
|
||||
try (final Repository repository = RepositoryManager.getRepository()){
|
||||
needsArchiveRebuild = (repository.getBlockArchiveRepository().fromHeight(2) == null);
|
||||
checkHeight = repository.getBlockRepository().getBlockchainHeight();
|
||||
} catch (DataException e) {
|
||||
|
@ -0,0 +1,117 @@
|
||||
package org.qortal.controller.hsqldb;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.qortal.data.account.AccountBalanceData;
|
||||
import org.qortal.repository.hsqldb.HSQLDBCacheUtils;
|
||||
import org.qortal.settings.Settings;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Comparator;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public class HSQLDBBalanceRecorder extends Thread{
|
||||
|
||||
private static final Logger LOGGER = LogManager.getLogger(HSQLDBBalanceRecorder.class);
|
||||
|
||||
private static HSQLDBBalanceRecorder SINGLETON = null;
|
||||
|
||||
private ConcurrentHashMap<Integer, List<AccountBalanceData>> balancesByHeight = new ConcurrentHashMap<>();
|
||||
|
||||
private ConcurrentHashMap<String, List<AccountBalanceData>> balancesByAddress = new ConcurrentHashMap<>();
|
||||
|
||||
private int priorityRequested;
|
||||
private int frequency;
|
||||
private int capacity;
|
||||
|
||||
private HSQLDBBalanceRecorder( int priorityRequested, int frequency, int capacity) {
|
||||
|
||||
super("Balance Recorder");
|
||||
|
||||
this.priorityRequested = priorityRequested;
|
||||
this.frequency = frequency;
|
||||
this.capacity = capacity;
|
||||
}
|
||||
|
||||
public static Optional<HSQLDBBalanceRecorder> getInstance() {
|
||||
|
||||
if( SINGLETON == null ) {
|
||||
|
||||
SINGLETON
|
||||
= new HSQLDBBalanceRecorder(
|
||||
Settings.getInstance().getBalanceRecorderPriority(),
|
||||
Settings.getInstance().getBalanceRecorderFrequency(),
|
||||
Settings.getInstance().getBalanceRecorderCapacity()
|
||||
);
|
||||
|
||||
}
|
||||
else if( SINGLETON == null ) {
|
||||
|
||||
return Optional.empty();
|
||||
}
|
||||
|
||||
return Optional.of(SINGLETON);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
|
||||
Thread.currentThread().setName("Balance Recorder");
|
||||
|
||||
HSQLDBCacheUtils.startRecordingBalances(this.balancesByHeight, this.balancesByAddress, this.priorityRequested, this.frequency, this.capacity);
|
||||
}
|
||||
|
||||
public List<AccountBalanceData> getLatestRecordings(int limit, long offset) {
|
||||
ArrayList<AccountBalanceData> data;
|
||||
|
||||
Optional<Integer> lastHeight = getLastHeight();
|
||||
|
||||
if(lastHeight.isPresent() ) {
|
||||
List<AccountBalanceData> latest = this.balancesByHeight.get(lastHeight.get());
|
||||
|
||||
if( latest != null ) {
|
||||
data = new ArrayList<>(latest.size());
|
||||
data.addAll(
|
||||
latest.stream()
|
||||
.sorted(Comparator.comparingDouble(AccountBalanceData::getBalance).reversed())
|
||||
.skip(offset)
|
||||
.limit(limit)
|
||||
.collect(Collectors.toList())
|
||||
);
|
||||
}
|
||||
else {
|
||||
data = new ArrayList<>(0);
|
||||
}
|
||||
}
|
||||
else {
|
||||
data = new ArrayList<>(0);
|
||||
}
|
||||
|
||||
return data;
|
||||
}
|
||||
|
||||
private Optional<Integer> getLastHeight() {
|
||||
return this.balancesByHeight.keySet().stream().sorted(Comparator.reverseOrder()).findFirst();
|
||||
}
|
||||
|
||||
public List<Integer> getBlocksRecorded() {
|
||||
|
||||
return this.balancesByHeight.keySet().stream().collect(Collectors.toList());
|
||||
}
|
||||
|
||||
public List<AccountBalanceData> getAccountBalanceRecordings(String address) {
|
||||
return this.balancesByAddress.get(address);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "HSQLDBBalanceRecorder{" +
|
||||
"priorityRequested=" + priorityRequested +
|
||||
", frequency=" + frequency +
|
||||
", capacity=" + capacity +
|
||||
'}';
|
||||
}
|
||||
}
|
@ -8,11 +8,7 @@ import org.qortal.settings.Settings;
|
||||
|
||||
public class HSQLDBDataCacheManager extends Thread{
|
||||
|
||||
private HSQLDBRepository respository;
|
||||
|
||||
public HSQLDBDataCacheManager(HSQLDBRepository respository) {
|
||||
this.respository = respository;
|
||||
}
|
||||
public HSQLDBDataCacheManager() {}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
@ -20,8 +16,7 @@ public class HSQLDBDataCacheManager extends Thread{
|
||||
|
||||
HSQLDBCacheUtils.startCaching(
|
||||
Settings.getInstance().getDbCacheThreadPriority(),
|
||||
Settings.getInstance().getDbCacheFrequency(),
|
||||
this.respository
|
||||
Settings.getInstance().getDbCacheFrequency()
|
||||
);
|
||||
}
|
||||
}
|
||||
|
@ -39,15 +39,24 @@ public class AtStatesPruner implements Runnable {
|
||||
}
|
||||
}
|
||||
|
||||
int pruneStartHeight;
|
||||
int maxLatestAtStatesHeight;
|
||||
|
||||
try (final Repository repository = RepositoryManager.getRepository()) {
|
||||
int pruneStartHeight = repository.getATRepository().getAtPruneHeight();
|
||||
int maxLatestAtStatesHeight = PruneManager.getMaxHeightForLatestAtStates(repository);
|
||||
pruneStartHeight = repository.getATRepository().getAtPruneHeight();
|
||||
maxLatestAtStatesHeight = PruneManager.getMaxHeightForLatestAtStates(repository);
|
||||
|
||||
repository.discardChanges();
|
||||
repository.getATRepository().rebuildLatestAtStates(maxLatestAtStatesHeight);
|
||||
repository.saveChanges();
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("AT States Pruning is not working! Not trying again. Restart ASAP. Report this error immediately to the developers.", e);
|
||||
return;
|
||||
}
|
||||
|
||||
while (!Controller.isStopping()) {
|
||||
try (final Repository repository = RepositoryManager.getRepository()) {
|
||||
|
||||
try {
|
||||
repository.discardChanges();
|
||||
|
||||
@ -102,8 +111,7 @@ public class AtStatesPruner implements Runnable {
|
||||
|
||||
final int finalPruneStartHeight = pruneStartHeight;
|
||||
LOGGER.info(() -> String.format("Bumping AT state base prune height to %d", finalPruneStartHeight));
|
||||
}
|
||||
else {
|
||||
} else {
|
||||
// We've pruned up to the upper prunable height
|
||||
// Back off for a while to save CPU for syncing
|
||||
repository.discardChanges();
|
||||
@ -113,17 +121,15 @@ public class AtStatesPruner implements Runnable {
|
||||
} catch (InterruptedException e) {
|
||||
if (Controller.isStopping()) {
|
||||
LOGGER.info("AT States Pruning Shutting Down");
|
||||
}
|
||||
else {
|
||||
} else {
|
||||
LOGGER.warn("AT States Pruning interrupted. Trying again. Report this error immediately to the developers.", e);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LOGGER.warn("AT States Pruning stopped working. Trying again. Report this error immediately to the developers.", e);
|
||||
}
|
||||
}
|
||||
} catch(Exception e){
|
||||
LOGGER.error("AT States Pruning is not working! Not trying again. Restart ASAP. Report this error immediately to the developers.", e);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
@ -26,15 +26,23 @@ public class AtStatesTrimmer implements Runnable {
|
||||
return;
|
||||
}
|
||||
|
||||
int trimStartHeight;
|
||||
int maxLatestAtStatesHeight;
|
||||
|
||||
try (final Repository repository = RepositoryManager.getRepository()) {
|
||||
int trimStartHeight = repository.getATRepository().getAtTrimHeight();
|
||||
int maxLatestAtStatesHeight = PruneManager.getMaxHeightForLatestAtStates(repository);
|
||||
trimStartHeight = repository.getATRepository().getAtTrimHeight();
|
||||
maxLatestAtStatesHeight = PruneManager.getMaxHeightForLatestAtStates(repository);
|
||||
|
||||
repository.discardChanges();
|
||||
repository.getATRepository().rebuildLatestAtStates(maxLatestAtStatesHeight);
|
||||
repository.saveChanges();
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("AT States Trimming is not working! Not trying again. Restart ASAP. Report this error immediately to the developers.", e);
|
||||
return;
|
||||
}
|
||||
|
||||
while (!Controller.isStopping()) {
|
||||
try (final Repository repository = RepositoryManager.getRepository()) {
|
||||
try {
|
||||
repository.discardChanges();
|
||||
|
||||
@ -92,10 +100,10 @@ public class AtStatesTrimmer implements Runnable {
|
||||
} catch (Exception e) {
|
||||
LOGGER.warn("AT States Trimming stopped working. Trying again. Report this error immediately to the developers.", e);
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("AT States Trimming is not working! Not trying again. Restart ASAP. Report this error immediately to the developers.", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -30,11 +30,13 @@ public class BlockArchiver implements Runnable {
|
||||
return;
|
||||
}
|
||||
|
||||
int startHeight;
|
||||
|
||||
try (final Repository repository = RepositoryManager.getRepository()) {
|
||||
// Don't even start building until initial rush has ended
|
||||
Thread.sleep(INITIAL_SLEEP_PERIOD);
|
||||
|
||||
int startHeight = repository.getBlockArchiveRepository().getBlockArchiveHeight();
|
||||
startHeight = repository.getBlockArchiveRepository().getBlockArchiveHeight();
|
||||
|
||||
// Don't attempt to archive if we have no ATStatesHeightIndex, as it will be too slow
|
||||
boolean hasAtStatesHeightIndex = repository.getATRepository().hasAtStatesHeightIndex();
|
||||
@ -43,10 +45,16 @@ public class BlockArchiver implements Runnable {
|
||||
repository.discardChanges();
|
||||
return;
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("Block Archiving is not working! Not trying again. Restart ASAP. Report this error immediately to the developers.", e);
|
||||
return;
|
||||
}
|
||||
|
||||
LOGGER.info("Starting block archiver from height {}...", startHeight);
|
||||
|
||||
while (!Controller.isStopping()) {
|
||||
try (final Repository repository = RepositoryManager.getRepository()) {
|
||||
|
||||
try {
|
||||
repository.discardChanges();
|
||||
|
||||
@ -109,18 +117,15 @@ public class BlockArchiver implements Runnable {
|
||||
} catch (InterruptedException e) {
|
||||
if (Controller.isStopping()) {
|
||||
LOGGER.info("Block Archiving Shutting Down");
|
||||
}
|
||||
else {
|
||||
} else {
|
||||
LOGGER.warn("Block Archiving interrupted. Trying again. Report this error immediately to the developers.", e);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LOGGER.warn("Block Archiving stopped working. Trying again. Report this error immediately to the developers.", e);
|
||||
}
|
||||
}
|
||||
} catch(Exception e){
|
||||
LOGGER.error("Block Archiving is not working! Not trying again. Restart ASAP. Report this error immediately to the developers.", e);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
@ -39,8 +39,10 @@ public class BlockPruner implements Runnable {
|
||||
}
|
||||
}
|
||||
|
||||
int pruneStartHeight;
|
||||
|
||||
try (final Repository repository = RepositoryManager.getRepository()) {
|
||||
int pruneStartHeight = repository.getBlockRepository().getBlockPruneHeight();
|
||||
pruneStartHeight = repository.getBlockRepository().getBlockPruneHeight();
|
||||
|
||||
// Don't attempt to prune if we have no ATStatesHeightIndex, as it will be too slow
|
||||
boolean hasAtStatesHeightIndex = repository.getATRepository().hasAtStatesHeightIndex();
|
||||
@ -48,8 +50,15 @@ public class BlockPruner implements Runnable {
|
||||
LOGGER.info("Unable to start block pruner due to missing ATStatesHeightIndex. Bootstrapping is recommended.");
|
||||
return;
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("Block Pruning is not working! Not trying again. Restart ASAP. Report this error immediately to the developers.", e);
|
||||
return;
|
||||
}
|
||||
|
||||
while (!Controller.isStopping()) {
|
||||
|
||||
try (final Repository repository = RepositoryManager.getRepository()) {
|
||||
|
||||
try {
|
||||
repository.discardChanges();
|
||||
|
||||
@ -122,10 +131,9 @@ public class BlockPruner implements Runnable {
|
||||
} catch (Exception e) {
|
||||
LOGGER.warn("Block Pruning stopped working. Trying again. Report this error immediately to the developers.", e);
|
||||
}
|
||||
}
|
||||
} catch(Exception e){
|
||||
LOGGER.error("Block Pruning is not working! Not trying again. Restart ASAP. Report this error immediately to the developers.", e);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
@ -28,13 +28,21 @@ public class OnlineAccountsSignaturesTrimmer implements Runnable {
|
||||
return;
|
||||
}
|
||||
|
||||
int trimStartHeight;
|
||||
|
||||
try (final Repository repository = RepositoryManager.getRepository()) {
|
||||
// Don't even start trimming until initial rush has ended
|
||||
Thread.sleep(INITIAL_SLEEP_PERIOD);
|
||||
|
||||
int trimStartHeight = repository.getBlockRepository().getOnlineAccountsSignaturesTrimHeight();
|
||||
trimStartHeight = repository.getBlockRepository().getOnlineAccountsSignaturesTrimHeight();
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("Online Accounts Signatures Trimming is not working! Not trying again. Restart ASAP. Report this error immediately to the developers.", e);
|
||||
return;
|
||||
}
|
||||
|
||||
while (!Controller.isStopping()) {
|
||||
try (final Repository repository = RepositoryManager.getRepository()) {
|
||||
|
||||
try {
|
||||
repository.discardChanges();
|
||||
|
||||
@ -88,10 +96,9 @@ public class OnlineAccountsSignaturesTrimmer implements Runnable {
|
||||
} catch (Exception e) {
|
||||
LOGGER.warn("Online Accounts Signatures Trimming stopped working. Trying again. Report this error immediately to the developers.", e);
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("Online Accounts Signatures Trimming is not working! Not trying again. Restart ASAP. Report this error immediately to the developers.", e);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
@ -5,10 +5,13 @@ import org.apache.logging.log4j.Logger;
|
||||
import org.qortal.api.SearchMode;
|
||||
import org.qortal.arbitrary.misc.Category;
|
||||
import org.qortal.arbitrary.misc.Service;
|
||||
import org.qortal.controller.Controller;
|
||||
import org.qortal.data.account.AccountBalanceData;
|
||||
import org.qortal.data.arbitrary.ArbitraryResourceCache;
|
||||
import org.qortal.data.arbitrary.ArbitraryResourceData;
|
||||
import org.qortal.data.arbitrary.ArbitraryResourceMetadata;
|
||||
import org.qortal.data.arbitrary.ArbitraryResourceStatus;
|
||||
import org.qortal.repository.DataException;
|
||||
|
||||
import java.sql.ResultSet;
|
||||
import java.sql.SQLException;
|
||||
@ -48,6 +51,11 @@ public class HSQLDBCacheUtils {
|
||||
}
|
||||
};
|
||||
private static final String DEFAULT_IDENTIFIER = "default";
|
||||
private static final int ZERO = 0;
|
||||
public static final String DB_CACHE_TIMER = "DB Cache Timer";
|
||||
public static final String DB_CACHE_TIMER_TASK = "DB Cache Timer Task";
|
||||
public static final String BALANCE_RECORDER_TIMER = "Balance Recorder Timer";
|
||||
public static final String BALANCE_RECORDER_TIMER_TASK = "Balance Recorder Timer Task";
|
||||
|
||||
/**
|
||||
*
|
||||
@ -352,12 +360,123 @@ public class HSQLDBCacheUtils {
|
||||
*
|
||||
* @param priorityRequested the thread priority to fill cache in
|
||||
* @param frequency the frequency to fill the cache (in seconds)
|
||||
* @param respository the data source
|
||||
*
|
||||
* @return the data cache
|
||||
*/
|
||||
public static void startCaching(int priorityRequested, int frequency, HSQLDBRepository respository) {
|
||||
public static void startCaching(int priorityRequested, int frequency) {
|
||||
|
||||
Timer timer = buildTimer(DB_CACHE_TIMER, priorityRequested);
|
||||
|
||||
TimerTask task = new TimerTask() {
|
||||
@Override
|
||||
public void run() {
|
||||
|
||||
Thread.currentThread().setName(DB_CACHE_TIMER_TASK);
|
||||
|
||||
try (final HSQLDBRepository respository = (HSQLDBRepository) Controller.REPOSITORY_FACTORY.getRepository()) {
|
||||
fillCache(ArbitraryResourceCache.getInstance(), respository);
|
||||
}
|
||||
catch( DataException e ) {
|
||||
LOGGER.error(e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
// delay 1 second
|
||||
timer.scheduleAtFixedRate(task, 1000, frequency * 1000);
|
||||
}
|
||||
|
||||
/**
|
||||
* Start Recording Balances
|
||||
*
|
||||
* @param queue the queue to add to, remove oldest data if necssary
|
||||
* @param repository the db repsoitory
|
||||
* @param priorityRequested the requested thread priority
|
||||
* @param frequency the recording frequencies, in minutes
|
||||
*/
|
||||
public static void startRecordingBalances(
|
||||
final ConcurrentHashMap<Integer, List<AccountBalanceData>> balancesByHeight,
|
||||
final ConcurrentHashMap<String, List<AccountBalanceData>> balancesByAddress,
|
||||
int priorityRequested,
|
||||
int frequency,
|
||||
int capacity) {
|
||||
|
||||
Timer timer = buildTimer(BALANCE_RECORDER_TIMER, priorityRequested);
|
||||
|
||||
TimerTask task = new TimerTask() {
|
||||
@Override
|
||||
public void run() {
|
||||
|
||||
Thread.currentThread().setName(BALANCE_RECORDER_TIMER_TASK);
|
||||
|
||||
try (final HSQLDBRepository repository = (HSQLDBRepository) Controller.REPOSITORY_FACTORY.getRepository()) {
|
||||
while (balancesByHeight.size() > capacity + 1) {
|
||||
Optional<Integer> firstHeight = balancesByHeight.keySet().stream().sorted().findFirst();
|
||||
|
||||
if (firstHeight.isPresent()) balancesByHeight.remove(firstHeight.get());
|
||||
}
|
||||
|
||||
// get current balances
|
||||
List<AccountBalanceData> accountBalances = getAccountBalances(repository);
|
||||
|
||||
// get anyone of the balances
|
||||
Optional<AccountBalanceData> data = accountBalances.stream().findAny();
|
||||
|
||||
// if there are any balances, then record them
|
||||
if (data.isPresent()) {
|
||||
// map all new balances to the current height
|
||||
balancesByHeight.put(data.get().getHeight(), accountBalances);
|
||||
|
||||
// for each new balance, map to address
|
||||
for (AccountBalanceData accountBalance : accountBalances) {
|
||||
|
||||
// get recorded balances for this address
|
||||
List<AccountBalanceData> establishedBalances
|
||||
= balancesByAddress.getOrDefault(accountBalance.getAddress(), new ArrayList<>(0));
|
||||
|
||||
// start a new list of recordings for this address, add the new balance and add the established
|
||||
// balances
|
||||
List<AccountBalanceData> balances = new ArrayList<>(establishedBalances.size() + 1);
|
||||
balances.add(accountBalance);
|
||||
balances.addAll(establishedBalances);
|
||||
|
||||
// reset tha balances for this address
|
||||
balancesByAddress.put(accountBalance.getAddress(), balances);
|
||||
|
||||
// TODO: reduce account balances to capacity
|
||||
}
|
||||
|
||||
// reduce height balances to capacity
|
||||
while( balancesByHeight.size() > capacity ) {
|
||||
Optional<Integer> lowestHeight
|
||||
= balancesByHeight.entrySet().stream()
|
||||
.min(Comparator.comparingInt(Map.Entry::getKey))
|
||||
.map(Map.Entry::getKey);
|
||||
|
||||
if (lowestHeight.isPresent()) balancesByHeight.entrySet().remove(lowestHeight);
|
||||
}
|
||||
}
|
||||
} catch (DataException e) {
|
||||
LOGGER.error(e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
// wait 5 minutes
|
||||
timer.scheduleAtFixedRate(task, 300_000, frequency * 60_000);
|
||||
}
|
||||
|
||||
/**
|
||||
* Build Timer
|
||||
*
|
||||
* Build a timer for scheduling a timer task.
|
||||
*
|
||||
* @param name the name for the thread running the timer task
|
||||
* @param priorityRequested the priority for the thread running the timer task
|
||||
*
|
||||
* @return a timer for scheduling a timer task
|
||||
*/
|
||||
private static Timer buildTimer( final String name, int priorityRequested) {
|
||||
// ensure priority is in between 1-10
|
||||
final int priority = Math.max(0, Math.min(10, priorityRequested));
|
||||
|
||||
@ -365,7 +484,7 @@ public class HSQLDBCacheUtils {
|
||||
Timer timer = new Timer(true) { // 'true' to make the Timer daemon
|
||||
@Override
|
||||
public void schedule(TimerTask task, long delay) {
|
||||
Thread thread = new Thread(task) {
|
||||
Thread thread = new Thread(task, name) {
|
||||
@Override
|
||||
public void run() {
|
||||
this.setPriority(priority);
|
||||
@ -376,17 +495,7 @@ public class HSQLDBCacheUtils {
|
||||
thread.start();
|
||||
}
|
||||
};
|
||||
|
||||
TimerTask task = new TimerTask() {
|
||||
@Override
|
||||
public void run() {
|
||||
|
||||
fillCache(ArbitraryResourceCache.getInstance(), respository);
|
||||
}
|
||||
};
|
||||
|
||||
// delay 1 second
|
||||
timer.scheduleAtFixedRate(task, 1000, frequency * 1000);
|
||||
return timer;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -541,4 +650,43 @@ public class HSQLDBCacheUtils {
|
||||
|
||||
return resources;
|
||||
}
|
||||
|
||||
public static List<AccountBalanceData> getAccountBalances(HSQLDBRepository repository) {
|
||||
|
||||
StringBuilder sql = new StringBuilder();
|
||||
|
||||
sql.append("SELECT account, balance, height ");
|
||||
sql.append("FROM ACCOUNTBALANCES as balances ");
|
||||
sql.append("JOIN (SELECT height FROM BLOCKS ORDER BY height DESC LIMIT 1) AS max_height ON true ");
|
||||
sql.append("WHERE asset_id=0");
|
||||
|
||||
List<AccountBalanceData> data = new ArrayList<>();
|
||||
|
||||
LOGGER.info( "Getting account balances ...");
|
||||
|
||||
try {
|
||||
Statement statement = repository.connection.createStatement();
|
||||
|
||||
ResultSet resultSet = statement.executeQuery(sql.toString());
|
||||
|
||||
if (resultSet == null || !resultSet.next())
|
||||
return new ArrayList<>(0);
|
||||
|
||||
do {
|
||||
String account = resultSet.getString(1);
|
||||
long balance = resultSet.getLong(2);
|
||||
int height = resultSet.getInt(3);
|
||||
|
||||
data.add(new AccountBalanceData(account, ZERO, balance, height));
|
||||
} while (resultSet.next());
|
||||
} catch (SQLException e) {
|
||||
LOGGER.warn(e.getMessage());
|
||||
} catch (Exception e) {
|
||||
LOGGER.error(e.getMessage(), e);
|
||||
}
|
||||
|
||||
LOGGER.info("Retrieved account balances: count = " + data.size());
|
||||
|
||||
return data;
|
||||
}
|
||||
}
|
@ -441,6 +441,14 @@ public class Settings {
|
||||
*/
|
||||
private long archivingPause = 3000;
|
||||
|
||||
private boolean balanceRecorderEnabled = false;
|
||||
|
||||
private int balanceRecorderPriority = 1;
|
||||
|
||||
private int balanceRecorderFrequency = 2*60*1000;
|
||||
|
||||
private int balanceRecorderCapacity = 1000;
|
||||
|
||||
// Domain mapping
|
||||
public static class ThreadLimit {
|
||||
private String messageType;
|
||||
@ -1230,4 +1238,20 @@ public class Settings {
|
||||
public long getArchivingPause() {
|
||||
return archivingPause;
|
||||
}
|
||||
|
||||
public int getBalanceRecorderPriority() {
|
||||
return balanceRecorderPriority;
|
||||
}
|
||||
|
||||
public int getBalanceRecorderFrequency() {
|
||||
return balanceRecorderFrequency;
|
||||
}
|
||||
|
||||
public int getBalanceRecorderCapacity() {
|
||||
return balanceRecorderCapacity;
|
||||
}
|
||||
|
||||
public boolean isBalanceRecorderEnabled() {
|
||||
return balanceRecorderEnabled;
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user