Synchronized LatestATStates, to make rebuildLatestAtStates() thread safe.

This commit is contained in:
CalDescent 2021-08-28 11:00:49 +01:00
parent dc030a42bb
commit 87595fd704
2 changed files with 90 additions and 71 deletions

View File

@ -604,28 +604,34 @@ public class HSQLDBATRepository implements ATRepository {
@Override @Override
public void rebuildLatestAtStates() throws DataException { public void rebuildLatestAtStates() throws DataException {
// Rebuild cache of latest AT states that we can't trim // latestATStatesLock is to prevent concurrent updates on LatestATStates
String deleteSql = "DELETE FROM LatestATStates"; // that could result in one process using a partial or empty dataset
try { // because it was in the process of being rebuilt by another thread
this.repository.executeCheckedUpdate(deleteSql); synchronized (this.repository.latestATStatesLock) {
} catch (SQLException e) {
repository.examineException(e);
throw new DataException("Unable to delete temporary latest AT states cache from repository", e);
}
String insertSql = "INSERT INTO LatestATStates (" // Rebuild cache of latest AT states that we can't trim
+ "SELECT AT_address, height FROM ATs " String deleteSql = "DELETE FROM LatestATStates";
+ "CROSS JOIN LATERAL(" try {
+ "SELECT height FROM ATStates " this.repository.executeCheckedUpdate(deleteSql);
+ "WHERE ATStates.AT_address = ATs.AT_address " } catch (SQLException e) {
+ "ORDER BY AT_address DESC, height DESC LIMIT 1" repository.examineException(e);
+ ") " throw new DataException("Unable to delete temporary latest AT states cache from repository", e);
+ ")"; }
try {
this.repository.executeCheckedUpdate(insertSql); String insertSql = "INSERT INTO LatestATStates ("
} catch (SQLException e) { + "SELECT AT_address, height FROM ATs "
repository.examineException(e); + "CROSS JOIN LATERAL("
throw new DataException("Unable to populate temporary latest AT states cache in repository", e); + "SELECT height FROM ATStates "
+ "WHERE ATStates.AT_address = ATs.AT_address "
+ "ORDER BY AT_address DESC, height DESC LIMIT 1"
+ ") "
+ ")";
try {
this.repository.executeCheckedUpdate(insertSql);
} catch (SQLException e) {
repository.examineException(e);
throw new DataException("Unable to populate temporary latest AT states cache in repository", e);
}
} }
} }
@ -666,22 +672,28 @@ public class HSQLDBATRepository implements ATRepository {
if (minHeight >= maxHeight) if (minHeight >= maxHeight)
return 0; return 0;
// We're often called so no need to trim all states in one go. // latestATStatesLock is to prevent concurrent updates on LatestATStates
// Limit updates to reduce CPU and memory load. // that could result in one process using a partial or empty dataset
String sql = "DELETE FROM ATStatesData " // because it was in the process of being rebuilt by another thread
+ "WHERE height BETWEEN ? AND ? " synchronized (this.repository.latestATStatesLock) {
+ "AND NOT EXISTS("
// We're often called so no need to trim all states in one go.
// Limit updates to reduce CPU and memory load.
String sql = "DELETE FROM ATStatesData "
+ "WHERE height BETWEEN ? AND ? "
+ "AND NOT EXISTS("
+ "SELECT TRUE FROM LatestATStates " + "SELECT TRUE FROM LatestATStates "
+ "WHERE LatestATStates.AT_address = ATStatesData.AT_address " + "WHERE LatestATStates.AT_address = ATStatesData.AT_address "
+ "AND LatestATStates.height = ATStatesData.height" + "AND LatestATStates.height = ATStatesData.height"
+ ") " + ") "
+ "LIMIT ?"; + "LIMIT ?";
try { try {
return this.repository.executeCheckedUpdate(sql, minHeight, maxHeight, limit); return this.repository.executeCheckedUpdate(sql, minHeight, maxHeight, limit);
} catch (SQLException e) { } catch (SQLException e) {
repository.examineException(e); repository.examineException(e);
throw new DataException("Unable to trim AT states in repository", e); throw new DataException("Unable to trim AT states in repository", e);
}
} }
} }
@ -719,57 +731,63 @@ public class HSQLDBATRepository implements ATRepository {
@Override @Override
public int pruneAtStates(int minHeight, int maxHeight) throws DataException { public int pruneAtStates(int minHeight, int maxHeight) throws DataException {
int deletedCount = 0; // latestATStatesLock is to prevent concurrent updates on LatestATStates
// that could result in one process using a partial or empty dataset
// because it was in the process of being rebuilt by another thread
synchronized (this.repository.latestATStatesLock) {
for (int height=minHeight; height<maxHeight; height++) { int deletedCount = 0;
// Give up if we're stopping for (int height = minHeight; height < maxHeight; height++) {
if (Controller.isStopping()) {
return deletedCount;
}
// Get latest AT states for this height
List<String> atAddresses = new ArrayList<>();
String updateSql = "SELECT AT_address FROM LatestATStates WHERE height = ?";
try (ResultSet resultSet = this.repository.checkedExecute(updateSql, height)) {
if (resultSet != null) {
do {
String atAddress = resultSet.getString(1);
atAddresses.add(atAddress);
} while (resultSet.next());
}
} catch (SQLException e) {
throw new DataException("Unable to fetch flagged accounts from repository", e);
}
List<ATStateData> atStates = this.getBlockATStatesAtHeight(height);
for (ATStateData atState : atStates) {
//LOGGER.info("Found atState {} at height {}", atState.getATAddress(), atState.getHeight());
// Give up if we're stopping // Give up if we're stopping
if (Controller.isStopping()) { if (Controller.isStopping()) {
return deletedCount; return deletedCount;
} }
if (atAddresses.contains(atState.getATAddress())) { // Get latest AT states for this height
// We don't want to delete this AT state because it is still active List<String> atAddresses = new ArrayList<>();
LOGGER.info("Skipping atState {} at height {}", atState.getATAddress(), atState.getHeight()); String updateSql = "SELECT AT_address FROM LatestATStates WHERE height = ?";
continue; try (ResultSet resultSet = this.repository.checkedExecute(updateSql, height)) {
if (resultSet != null) {
do {
String atAddress = resultSet.getString(1);
atAddresses.add(atAddress);
} while (resultSet.next());
}
} catch (SQLException e) {
throw new DataException("Unable to fetch flagged accounts from repository", e);
} }
// Safe to delete everything else for this height List<ATStateData> atStates = this.getBlockATStatesAtHeight(height);
try { for (ATStateData atState : atStates) {
this.repository.delete("ATStates", "AT_address = ? AND height = ?", //LOGGER.info("Found atState {} at height {}", atState.getATAddress(), atState.getHeight());
atState.getATAddress(), atState.getHeight());
deletedCount++; // Give up if we're stopping
} catch (SQLException e) { if (Controller.isStopping()) {
throw new DataException("Unable to delete AT state data from repository", e); return deletedCount;
}
if (atAddresses.contains(atState.getATAddress())) {
// We don't want to delete this AT state because it is still active
LOGGER.info("Skipping atState {} at height {}", atState.getATAddress(), atState.getHeight());
continue;
}
// Safe to delete everything else for this height
try {
this.repository.delete("ATStates", "AT_address = ? AND height = ?",
atState.getATAddress(), atState.getHeight());
deletedCount++;
} catch (SQLException e) {
throw new DataException("Unable to delete AT state data from repository", e);
}
} }
} }
}
return deletedCount; return deletedCount;
}
} }

View File

@ -69,6 +69,7 @@ public class HSQLDBRepository implements Repository {
protected final Map<String, PreparedStatement> preparedStatementCache = new HashMap<>(); protected final Map<String, PreparedStatement> preparedStatementCache = new HashMap<>();
// We want the same object corresponding to the actual DB // We want the same object corresponding to the actual DB
protected final Object trimHeightsLock = RepositoryManager.getRepositoryFactory(); protected final Object trimHeightsLock = RepositoryManager.getRepositoryFactory();
protected final Object latestATStatesLock = RepositoryManager.getRepositoryFactory();
private final ATRepository atRepository = new HSQLDBATRepository(this); private final ATRepository atRepository = new HSQLDBATRepository(this);
private final AccountRepository accountRepository = new HSQLDBAccountRepository(this); private final AccountRepository accountRepository = new HSQLDBAccountRepository(this);