diff --git a/src/main/java/org/qortal/repository/hsqldb/HSQLDBATRepository.java b/src/main/java/org/qortal/repository/hsqldb/HSQLDBATRepository.java index 1921661c..522fafb7 100644 --- a/src/main/java/org/qortal/repository/hsqldb/HSQLDBATRepository.java +++ b/src/main/java/org/qortal/repository/hsqldb/HSQLDBATRepository.java @@ -604,28 +604,34 @@ public class HSQLDBATRepository implements ATRepository { @Override public void rebuildLatestAtStates() throws DataException { - // Rebuild cache of latest AT states that we can't trim - String deleteSql = "DELETE FROM LatestATStates"; - try { - this.repository.executeCheckedUpdate(deleteSql); - } catch (SQLException e) { - repository.examineException(e); - throw new DataException("Unable to delete temporary latest AT states cache from repository", e); - } + // latestATStatesLock is to prevent concurrent updates on LatestATStates + // that could result in one process using a partial or empty dataset + // because it was in the process of being rebuilt by another thread + synchronized (this.repository.latestATStatesLock) { - String insertSql = "INSERT INTO LatestATStates (" - + "SELECT AT_address, height FROM ATs " - + "CROSS JOIN LATERAL(" - + "SELECT height FROM ATStates " - + "WHERE ATStates.AT_address = ATs.AT_address " - + "ORDER BY AT_address DESC, height DESC LIMIT 1" - + ") " - + ")"; - try { - this.repository.executeCheckedUpdate(insertSql); - } catch (SQLException e) { - repository.examineException(e); - throw new DataException("Unable to populate temporary latest AT states cache in repository", e); + // Rebuild cache of latest AT states that we can't trim + String deleteSql = "DELETE FROM LatestATStates"; + try { + this.repository.executeCheckedUpdate(deleteSql); + } catch (SQLException e) { + repository.examineException(e); + throw new DataException("Unable to delete temporary latest AT states cache from repository", e); + } + + String insertSql = "INSERT INTO LatestATStates (" + + "SELECT AT_address, height FROM ATs " + + "CROSS JOIN LATERAL(" + + "SELECT height FROM ATStates " + + "WHERE ATStates.AT_address = ATs.AT_address " + + "ORDER BY AT_address DESC, height DESC LIMIT 1" + + ") " + + ")"; + try { + this.repository.executeCheckedUpdate(insertSql); + } catch (SQLException e) { + repository.examineException(e); + throw new DataException("Unable to populate temporary latest AT states cache in repository", e); + } } } @@ -666,22 +672,28 @@ public class HSQLDBATRepository implements ATRepository { if (minHeight >= maxHeight) return 0; - // We're often called so no need to trim all states in one go. - // Limit updates to reduce CPU and memory load. - String sql = "DELETE FROM ATStatesData " - + "WHERE height BETWEEN ? AND ? " - + "AND NOT EXISTS(" + // latestATStatesLock is to prevent concurrent updates on LatestATStates + // that could result in one process using a partial or empty dataset + // because it was in the process of being rebuilt by another thread + synchronized (this.repository.latestATStatesLock) { + + // We're often called so no need to trim all states in one go. + // Limit updates to reduce CPU and memory load. + String sql = "DELETE FROM ATStatesData " + + "WHERE height BETWEEN ? AND ? " + + "AND NOT EXISTS(" + "SELECT TRUE FROM LatestATStates " + "WHERE LatestATStates.AT_address = ATStatesData.AT_address " + "AND LatestATStates.height = ATStatesData.height" - + ") " - + "LIMIT ?"; + + ") " + + "LIMIT ?"; - try { - return this.repository.executeCheckedUpdate(sql, minHeight, maxHeight, limit); - } catch (SQLException e) { - repository.examineException(e); - throw new DataException("Unable to trim AT states in repository", e); + try { + return this.repository.executeCheckedUpdate(sql, minHeight, maxHeight, limit); + } catch (SQLException e) { + repository.examineException(e); + throw new DataException("Unable to trim AT states in repository", e); + } } } @@ -719,57 +731,63 @@ public class HSQLDBATRepository implements ATRepository { @Override public int pruneAtStates(int minHeight, int maxHeight) throws DataException { - int deletedCount = 0; + // latestATStatesLock is to prevent concurrent updates on LatestATStates + // that could result in one process using a partial or empty dataset + // because it was in the process of being rebuilt by another thread + synchronized (this.repository.latestATStatesLock) { - for (int height=minHeight; height atAddresses = new ArrayList<>(); - String updateSql = "SELECT AT_address FROM LatestATStates WHERE height = ?"; - try (ResultSet resultSet = this.repository.checkedExecute(updateSql, height)) { - if (resultSet != null) { - do { - String atAddress = resultSet.getString(1); - atAddresses.add(atAddress); - - } while (resultSet.next()); - } - } catch (SQLException e) { - throw new DataException("Unable to fetch flagged accounts from repository", e); - } - - List atStates = this.getBlockATStatesAtHeight(height); - for (ATStateData atState : atStates) { - //LOGGER.info("Found atState {} at height {}", atState.getATAddress(), atState.getHeight()); + for (int height = minHeight; height < maxHeight; height++) { // Give up if we're stopping if (Controller.isStopping()) { return deletedCount; } - if (atAddresses.contains(atState.getATAddress())) { - // We don't want to delete this AT state because it is still active - LOGGER.info("Skipping atState {} at height {}", atState.getATAddress(), atState.getHeight()); - continue; + // Get latest AT states for this height + List atAddresses = new ArrayList<>(); + String updateSql = "SELECT AT_address FROM LatestATStates WHERE height = ?"; + try (ResultSet resultSet = this.repository.checkedExecute(updateSql, height)) { + if (resultSet != null) { + do { + String atAddress = resultSet.getString(1); + atAddresses.add(atAddress); + + } while (resultSet.next()); + } + } catch (SQLException e) { + throw new DataException("Unable to fetch flagged accounts from repository", e); } - // Safe to delete everything else for this height - try { - this.repository.delete("ATStates", "AT_address = ? AND height = ?", - atState.getATAddress(), atState.getHeight()); - deletedCount++; - } catch (SQLException e) { - throw new DataException("Unable to delete AT state data from repository", e); + List atStates = this.getBlockATStatesAtHeight(height); + for (ATStateData atState : atStates) { + //LOGGER.info("Found atState {} at height {}", atState.getATAddress(), atState.getHeight()); + + // Give up if we're stopping + if (Controller.isStopping()) { + return deletedCount; + } + + if (atAddresses.contains(atState.getATAddress())) { + // We don't want to delete this AT state because it is still active + LOGGER.info("Skipping atState {} at height {}", atState.getATAddress(), atState.getHeight()); + continue; + } + + // Safe to delete everything else for this height + try { + this.repository.delete("ATStates", "AT_address = ? AND height = ?", + atState.getATAddress(), atState.getHeight()); + deletedCount++; + } catch (SQLException e) { + throw new DataException("Unable to delete AT state data from repository", e); + } } } - } - return deletedCount; + return deletedCount; + } } diff --git a/src/main/java/org/qortal/repository/hsqldb/HSQLDBRepository.java b/src/main/java/org/qortal/repository/hsqldb/HSQLDBRepository.java index 4d8e5043..3a947cd6 100644 --- a/src/main/java/org/qortal/repository/hsqldb/HSQLDBRepository.java +++ b/src/main/java/org/qortal/repository/hsqldb/HSQLDBRepository.java @@ -69,6 +69,7 @@ public class HSQLDBRepository implements Repository { protected final Map preparedStatementCache = new HashMap<>(); // We want the same object corresponding to the actual DB protected final Object trimHeightsLock = RepositoryManager.getRepositoryFactory(); + protected final Object latestATStatesLock = RepositoryManager.getRepositoryFactory(); private final ATRepository atRepository = new HSQLDBATRepository(this); private final AccountRepository accountRepository = new HSQLDBAccountRepository(this);