Fixing peer disconnections due to slow processing & Transaction expiry

Transaction expiry wasn't happening. Use NTP.getTime to check whether
transactions have expired. Also reject expired transactions when trying
to add them to unconfirmed pool.

Sometimes producing a task took way too long, causing massive
spikes in the number of threads and peer disconnections.

This is down to a repository pool exhaustion, so
RepositoryManager.getRepository() would block (for up to 5 minutes).

The key method at fault was Network.getConnectablePeer().

Various fixes:

NetworkProcessor's executor now reaps old threads after only 10 seconds
instead of the usual 60 seconds.

Change logging in Network to help diagnose disconnection and repository
issues.

RepositoryManager now has a tryRepository() call that is non-blocking
and returns null if repository pool is exhausted.

Repository pool size increased from default (10) to 100.

Pruning peers is now opportunistic, using tryRepository(), and returns
early if repository pool is exhausted.

getConnectablePeer() is now opportunistic, using tryRepository(), and
returns null (no peer candidate for connection) if repository pool
is exhausted.

Merging peers is not opportunistic, using tryRepository().

Peer ping interval increased from 8s to 20s.

HSQLDBRepositoryFactory now logs when getConnection() takes over 1000ms.

Added more trace-level logging to ExecuteProduceConsume to
highlight slow produceTask() calls.
This commit is contained in:
catbref 2019-08-09 13:30:26 +01:00
parent 1094db288e
commit fa0b7615a6
11 changed files with 266 additions and 99 deletions

View File

@ -0,0 +1,46 @@
package org.hsqldb.jdbc;
import java.sql.Connection;
import java.sql.SQLException;
import org.hsqldb.jdbc.JDBCPool;
import org.hsqldb.jdbc.pool.JDBCPooledConnection;
public class HSQLDBPool extends JDBCPool {
public HSQLDBPool(int poolSize) {
super(poolSize);
}
/**
* Tries to retrieve a new connection using the properties that have already been
* set.
*
* @return a connection to the data source, or null if no spare connections in pool
* @exception SQLException if a database access error occurs
*/
public Connection tryConnection() throws SQLException {
for (int i = 0; i < states.length(); i++) {
if (states.compareAndSet(i, RefState.available, RefState.allocated)) {
return connections[i].getConnection();
}
if (states.compareAndSet(i, RefState.empty, RefState.allocated)) {
try {
JDBCPooledConnection connection = (JDBCPooledConnection) source.getPooledConnection();
connection.addConnectionEventListener(this);
connection.addStatementEventListener(this);
connections[i] = connection;
return connections[i].getConnection();
} catch (SQLException e) {
states.set(i, RefState.empty);
}
}
}
return null;
}
}

View File

@ -47,7 +47,6 @@ import org.qora.api.model.ActivitySummary;
import org.qora.api.model.NodeInfo;
import org.qora.block.BlockChain;
import org.qora.controller.Controller;
import org.qora.controller.Synchronizer;
import org.qora.controller.Synchronizer.SynchronizationResult;
import org.qora.repository.DataException;
import org.qora.repository.Repository;
@ -455,7 +454,7 @@ public class AdminResource {
SynchronizationResult syncResult;
try {
do {
syncResult = Synchronizer.getInstance().synchronize(targetPeer, true);
syncResult = Controller.getInstance().actuallySynchronize(targetPeer, true);
} while (syncResult == SynchronizationResult.OK);
} finally {
blockchainLock.unlock();

View File

@ -416,6 +416,9 @@ public class BlockChain {
repository.saveChanges();
}
BlockData lastBlockData = repository.getBlockRepository().getLastBlock();
Controller.getInstance().setChainTip(lastBlockData);
return true;
}
} finally {

View File

@ -15,6 +15,7 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Predicate;
@ -102,6 +103,8 @@ public class Controller extends Thread {
private final String buildVersion;
private final long buildTimestamp; // seconds
private AtomicReference<BlockData> chainTip = new AtomicReference<>();
private long repositoryBackupTimestamp = startTime + REPOSITORY_BACKUP_PERIOD; // ms
private long ntpCheckTimestamp = startTime; // ms
private long deleteExpiredTimestamp = startTime + DELETE_EXPIRED_INTERVAL; // ms
@ -183,22 +186,20 @@ public class Controller extends Thread {
/** Returns current blockchain height, or 0 if there's a repository issue */
public int getChainHeight() {
try (final Repository repository = RepositoryManager.getRepository()) {
return repository.getBlockRepository().getBlockchainHeight();
} catch (DataException e) {
LOGGER.error("Repository issue when fetching blockchain height", e);
BlockData blockData = this.chainTip.get();
if (blockData == null)
return 0;
}
return blockData.getHeight();
}
/** Returns highest block, or null if there's a repository issue */
public BlockData getChainTip() {
try (final Repository repository = RepositoryManager.getRepository()) {
return repository.getBlockRepository().getLastBlock();
} catch (DataException e) {
LOGGER.error("Repository issue when fetching blockchain tip", e);
return null;
}
return this.chainTip.get();
}
public void setChainTip(BlockData blockData) {
this.chainTip.set(blockData);
}
public ReentrantLock getBlockchainLock() {
@ -238,7 +239,14 @@ public class Controller extends Thread {
LOGGER.info("Validating blockchain");
try {
BlockChain.validate();
LOGGER.info(String.format("Our chain height at start-up: %d", getInstance().getChainHeight()));
// Set initial chain height/tip
try (final Repository repository = RepositoryManager.getRepository()) {
BlockData blockData = repository.getBlockRepository().getLastBlock();
Controller.getInstance().setChainTip(blockData);
LOGGER.info(String.format("Our chain height at start-up: %d", blockData.getHeight()));
}
} catch (DataException e) {
LOGGER.error("Couldn't validate blockchain", e);
System.exit(2);
@ -403,67 +411,84 @@ public class Controller extends Thread {
int index = new SecureRandom().nextInt(peers.size());
Peer peer = peers.get(index);
noSyncOurBlockSignature = null;
noSyncPeerBlockSignature = null;
SynchronizationResult syncResult = Synchronizer.getInstance().synchronize(peer, false);
switch (syncResult) {
case GENESIS_ONLY:
case NO_COMMON_BLOCK:
case TOO_FAR_BEHIND:
case TOO_DIVERGENT:
case INVALID_DATA:
// These are more serious results that warrant a cool-off
LOGGER.info(String.format("Failed to synchronize with peer %s (%s) - cooling off", peer, syncResult.name()));
// Don't use this peer again for a while
PeerData peerData = peer.getPeerData();
peerData.setLastMisbehaved(NTP.getTime());
// Only save to repository if outbound peer
if (peer.isOutbound())
try (final Repository repository = RepositoryManager.getRepository()) {
repository.getNetworkRepository().save(peerData);
repository.saveChanges();
} catch (DataException e) {
LOGGER.warn("Repository issue while updating peer synchronization info", e);
}
break;
case INFERIOR_CHAIN:
noSyncOurBlockSignature = latestBlockData.getSignature();
noSyncPeerBlockSignature = peer.getLastBlockSignature();
// These are minor failure results so fine to try again
LOGGER.debug(() -> String.format("Refused to synchronize with peer %s (%s)", peer, syncResult.name()));
break;
case NO_REPLY:
case NO_BLOCKCHAIN_LOCK:
case REPOSITORY_ISSUE:
// These are minor failure results so fine to try again
LOGGER.debug(() -> String.format("Failed to synchronize with peer %s (%s)", peer, syncResult.name()));
break;
case OK:
requestSysTrayUpdate = true;
// fall-through...
case NOTHING_TO_DO:
noSyncOurBlockSignature = latestBlockData.getSignature();
noSyncPeerBlockSignature = peer.getLastBlockSignature();
LOGGER.debug(() -> String.format("Synchronized with peer %s (%s)", peer, syncResult.name()));
break;
}
// Broadcast our new chain tip (if changed)
BlockData newLatestBlockData = getChainTip();
if (!Arrays.equals(newLatestBlockData.getSignature(), latestBlockData.getSignature()))
Network.getInstance().broadcast(recipientPeer -> Network.getInstance().buildHeightMessage(recipientPeer, newLatestBlockData));
actuallySynchronize(peer, false);
}
}
public SynchronizationResult actuallySynchronize(Peer peer, boolean force) throws InterruptedException {
BlockData latestBlockData = getChainTip();
noSyncOurBlockSignature = null;
noSyncPeerBlockSignature = null;
SynchronizationResult syncResult = Synchronizer.getInstance().synchronize(peer, force);
switch (syncResult) {
case GENESIS_ONLY:
case NO_COMMON_BLOCK:
case TOO_FAR_BEHIND:
case TOO_DIVERGENT:
case INVALID_DATA:
// These are more serious results that warrant a cool-off
LOGGER.info(String.format("Failed to synchronize with peer %s (%s) - cooling off", peer, syncResult.name()));
// Don't use this peer again for a while
PeerData peerData = peer.getPeerData();
peerData.setLastMisbehaved(NTP.getTime());
// Only save to repository if outbound peer
if (peer.isOutbound())
try (final Repository repository = RepositoryManager.getRepository()) {
repository.getNetworkRepository().save(peerData);
repository.saveChanges();
} catch (DataException e) {
LOGGER.warn("Repository issue while updating peer synchronization info", e);
}
break;
case INFERIOR_CHAIN:
noSyncOurBlockSignature = latestBlockData.getSignature();
noSyncPeerBlockSignature = peer.getLastBlockSignature();
// These are minor failure results so fine to try again
LOGGER.debug(() -> String.format("Refused to synchronize with peer %s (%s)", peer, syncResult.name()));
break;
case NO_REPLY:
case NO_BLOCKCHAIN_LOCK:
case REPOSITORY_ISSUE:
// These are minor failure results so fine to try again
LOGGER.debug(() -> String.format("Failed to synchronize with peer %s (%s)", peer, syncResult.name()));
break;
case OK:
requestSysTrayUpdate = true;
// fall-through...
case NOTHING_TO_DO:
noSyncOurBlockSignature = latestBlockData.getSignature();
noSyncPeerBlockSignature = peer.getLastBlockSignature();
LOGGER.debug(() -> String.format("Synchronized with peer %s (%s)", peer, syncResult.name()));
break;
}
// Broadcast our new chain tip (if changed)
BlockData newLatestBlockData;
try (final Repository repository = RepositoryManager.getRepository()) {
newLatestBlockData = repository.getBlockRepository().getLastBlock();
this.setChainTip(newLatestBlockData);
} catch (DataException e) {
LOGGER.warn(String.format("Repository issue when trying to fetch post-synchronization chain tip: %s", e.getMessage()));
return syncResult;
}
if (!Arrays.equals(newLatestBlockData.getSignature(), latestBlockData.getSignature()))
Network.getInstance().broadcast(recipientPeer -> Network.getInstance().buildHeightMessage(recipientPeer, newLatestBlockData));
return syncResult;
}
private void updateSysTray() {
if (NTP.getTime() == null) {
SysTray.getInstance().setToolTipText(Translator.INSTANCE.translate("SysTray", "SYNCHRONIZING CLOCK"));
SysTray.getInstance().setToolTipText(Translator.INSTANCE.translate("SysTray", "SYNCHRONIZING_CLOCK"));
return;
}
@ -480,11 +505,19 @@ public class Controller extends Thread {
}
public void deleteExpiredTransactions() {
try (final Repository repository = RepositoryManager.getRepository()) {
final Long now = NTP.getTime();
if (now == null)
return;
// This isn't critical so don't block for repository instance.
try (final Repository repository = RepositoryManager.tryRepository()) {
if (repository == null)
return;
List<TransactionData> transactions = repository.getTransactionRepository().getUnconfirmedTransactions();
for (TransactionData transactionData : transactions)
if (transactionData.getTimestamp() >= Transaction.getDeadline(transactionData)) {
if (now >= Transaction.getDeadline(transactionData)) {
LOGGER.info(String.format("Deleting expired, unconfirmed transaction %s", Base58.encode(transactionData.getSignature())));
repository.getTransactionRepository().delete(transactionData);
}
@ -574,7 +607,15 @@ public class Controller extends Thread {
public void onGeneratedBlock() {
// Broadcast our new height info
BlockData latestBlockData = getChainTip();
BlockData latestBlockData;
try (final Repository repository = RepositoryManager.getRepository()) {
latestBlockData = repository.getBlockRepository().getLastBlock();
this.setChainTip(latestBlockData);
} catch (DataException e) {
LOGGER.warn(String.format("Repository issue when trying to fetch post-generation chain tip: %s", e.getMessage()));
return;
}
Network network = Network.getInstance();
network.broadcast(peer -> network.buildHeightMessage(peer, latestBlockData));

View File

@ -22,6 +22,8 @@ import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@ -160,8 +162,13 @@ public class Network extends Thread {
mergePeersLock = new ReentrantLock();
// We'll use a cached thread pool, but with more aggressive 10 second timeout.
ExecutorService networkExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
10L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
networkEPC = new NetworkProcessor(networkExecutor);
// Start up first networking thread
networkEPC = new NetworkProcessor();
networkEPC.start();
}
@ -272,10 +279,16 @@ public class Network extends Thread {
// Main thread
class NetworkProcessor extends ExecuteProduceConsume {
public NetworkProcessor(ExecutorService executor) {
super(executor);
}
@Override
protected Task produceTask(boolean canBlock) throws InterruptedException {
Task task;
// Only this method can block to reduce CPU spin
task = maybeProduceChannelTask(canBlock);
if (task != null)
return task;
@ -465,14 +478,14 @@ public class Network extends Thread {
try {
if (now == null) {
LOGGER.trace(String.format("Connection discarded from peer %s due to lack of NTP sync", socketChannel.getRemoteAddress()));
LOGGER.debug(String.format("Connection discarded from peer %s due to lack of NTP sync", socketChannel.getRemoteAddress()));
return;
}
synchronized (this.connectedPeers) {
if (connectedPeers.size() >= maxPeers) {
// We have enough peers
LOGGER.trace(String.format("Connection discarded from peer %s", socketChannel.getRemoteAddress()));
LOGGER.debug(String.format("Connection discarded from peer %s", socketChannel.getRemoteAddress()));
return;
}
@ -522,7 +535,11 @@ public class Network extends Thread {
peer.disconnect(String.format("handshake timeout at %s", peer.getHandshakeStatus().name()));
// Prune 'old' peers from repository...
try (final Repository repository = RepositoryManager.getRepository()) {
// Pruning peers isn't critical so no need to block for a repository instance.
try (final Repository repository = RepositoryManager.tryRepository()) {
if (repository == null)
return;
// Fetch all known peers
List<PeerData> peers = repository.getNetworkRepository().getAllPeers();
@ -564,7 +581,11 @@ public class Network extends Thread {
private Peer getConnectablePeer() throws InterruptedException {
final long now = NTP.getTime();
try (final Repository repository = RepositoryManager.getRepository()) {
// We can't block here so use tryRepository(). We don't NEED to connect a new peer.
try (final Repository repository = RepositoryManager.tryRepository()) {
if (repository == null)
return null;
// Find an address to connect to
List<PeerData> peers = repository.getNetworkRepository().getAllPeers();
@ -626,7 +647,7 @@ public class Network extends Thread {
return newPeer;
} catch (DataException e) {
LOGGER.warn(String.format("Repository issue while finding a connectable peer: %s", e.getMessage()));
LOGGER.error("Repository issue while finding a connectable peer", e);
return null;
}
}
@ -687,7 +708,7 @@ public class Network extends Thread {
repository.getNetworkRepository().delete(peer.getPeerData().getAddress());
repository.saveChanges();
} catch (DataException e) {
LOGGER.warn(String.format("Repository issue while trying to delete inbound peer %s", peer));
LOGGER.error(String.format("Repository issue while trying to delete inbound peer %s", peer), e);
}
}
@ -855,7 +876,7 @@ public class Network extends Thread {
repository.getNetworkRepository().save(peer.getPeerData());
repository.saveChanges();
} catch (DataException e) {
LOGGER.warn(String.format("Repository issue while trying to update outbound peer %s", peer));
LOGGER.error(String.format("Repository issue while trying to update outbound peer %s", peer), e);
}
// Start regular pings
@ -1053,7 +1074,11 @@ public class Network extends Thread {
return;
try {
try (final Repository repository = RepositoryManager.getRepository()) {
// Merging peers isn't critical so don't block for a repository instance.
try (final Repository repository = RepositoryManager.tryRepository()) {
if (repository == null)
return;
List<PeerData> knownPeers = repository.getNetworkRepository().getAllPeers();
// Filter out duplicates
@ -1139,18 +1164,18 @@ public class Network extends Thread {
// Stop processing threads
try {
if (!this.networkEPC.shutdown(5000))
LOGGER.debug("Network threads failed to terminate");
LOGGER.warn("Network threads failed to terminate");
} catch (InterruptedException e) {
LOGGER.debug("Interrupted while waiting for networking threads to terminate");
LOGGER.warn("Interrupted while waiting for networking threads to terminate");
}
// Stop broadcasts
this.broadcastExecutor.shutdownNow();
try {
if (!this.broadcastExecutor.awaitTermination(1000, TimeUnit.MILLISECONDS))
LOGGER.debug("Broadcast threads failed to terminate");
LOGGER.warn("Broadcast threads failed to terminate");
} catch (InterruptedException e) {
LOGGER.debug("Interrupted while waiting for broadcast threads failed to terminate");
LOGGER.warn("Interrupted while waiting for broadcast threads failed to terminate");
}
// Close all peer connections

View File

@ -51,7 +51,7 @@ public class Peer {
* <p>
* Just under every 30s is usually ideal to keep NAT mappings refreshed.
*/
private static final int PING_INTERVAL = 8000; // ms
private static final int PING_INTERVAL = 20_000; // ms
private volatile boolean isStopping = false;

View File

@ -4,6 +4,8 @@ public interface RepositoryFactory {
public Repository getRepository() throws DataException;
public Repository tryRepository() throws DataException;
public void close() throws DataException;
}

View File

@ -15,6 +15,13 @@ public abstract class RepositoryManager {
return repositoryFactory.getRepository();
}
public static Repository tryRepository() throws DataException {
if (repositoryFactory == null)
throw new DataException("No repository available");
return repositoryFactory.tryRepository();
}
public static void closeRepositoryFactory() throws DataException {
repositoryFactory.close();
repositoryFactory = null;

View File

@ -5,17 +5,25 @@ import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Properties;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.hsqldb.HsqlException;
import org.hsqldb.error.ErrorCode;
import org.hsqldb.jdbc.JDBCPool;
import org.hsqldb.jdbc.HSQLDBPool;
import org.qora.repository.DataException;
import org.qora.repository.Repository;
import org.qora.repository.RepositoryFactory;
public class HSQLDBRepositoryFactory implements RepositoryFactory {
private static final Logger LOGGER = LogManager.getLogger(HSQLDBRepositoryFactory.class);
private static final int POOL_SIZE = 100;
/** Log getConnection() calls that take longer than this. (ms) */
private static final long SLOW_CONNECTION_THRESHOLD = 1000L;
private String connectionUrl;
private JDBCPool connectionPool;
private HSQLDBPool connectionPool;
public HSQLDBRepositoryFactory(String connectionUrl) throws DataException {
// one-time initialization goes in here
@ -36,7 +44,7 @@ public class HSQLDBRepositoryFactory implements RepositoryFactory {
HSQLDBRepository.attemptRecovery(connectionUrl);
}
this.connectionPool = new JDBCPool();
this.connectionPool = new HSQLDBPool(POOL_SIZE);
this.connectionPool.setUrl(this.connectionUrl);
Properties properties = new Properties();
@ -60,14 +68,41 @@ public class HSQLDBRepositoryFactory implements RepositoryFactory {
}
}
private Connection getConnection() throws SQLException {
Connection connection = this.connectionPool.getConnection();
@Override
public Repository tryRepository() throws DataException {
try {
return new HSQLDBRepository(this.tryConnection());
} catch (SQLException e) {
throw new DataException("Repository instantiation error", e);
}
}
private Connection getConnection() throws SQLException {
final long before = System.currentTimeMillis();
Connection connection = this.connectionPool.getConnection();
final long delay = System.currentTimeMillis() - before;
if (delay > SLOW_CONNECTION_THRESHOLD)
// This could be an indication of excessive repository use, or insufficient pool size
LOGGER.warn(String.format("Fetching repository connection from pool took %dms (threshold: %dms)"), delay, SLOW_CONNECTION_THRESHOLD);
setupConnection(connection);
return connection;
}
private Connection tryConnection() throws SQLException {
Connection connection = this.connectionPool.tryConnection();
if (connection == null)
return null;
setupConnection(connection);
return connection;
}
private void setupConnection(Connection connection) throws SQLException {
// Set transaction level
connection.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);
connection.setAutoCommit(false);
return connection;
}
@Override

View File

@ -517,16 +517,20 @@ public abstract class Transaction {
* @throws DataException
*/
public ValidationResult isValidUnconfirmed() throws DataException {
final Long now = NTP.getTime();
if (now == null)
return ValidationResult.CLOCK_NOT_SYNCED;
// Expired already?
if (now >= this.getDeadline())
return ValidationResult.TIMESTAMP_TOO_OLD;
// Transactions with a timestamp prior to latest block's timestamp are too old
BlockData latestBlock = repository.getBlockRepository().getLastBlock();
if (this.getDeadline() <= latestBlock.getTimestamp())
return ValidationResult.TIMESTAMP_TOO_OLD;
// Transactions with a timestamp too far into future are too new
final Long now = NTP.getTime();
if (now == null)
return ValidationResult.CLOCK_NOT_SYNCED;
long maxTimestamp = now + Settings.getInstance().getMaxTransactionTimestampFuture();
if (this.transactionData.getTimestamp() > maxTimestamp)
return ValidationResult.TIMESTAMP_TOO_NEW;

View File

@ -103,8 +103,13 @@ public abstract class ExecuteProduceConsume implements Runnable {
}
final boolean lambdaCanIdle = canBlock;
logger.trace(() -> String.format("[%d] producing, canBlock is %b...", Thread.currentThread().getId(), lambdaCanIdle));
logger.trace(() -> String.format("[%d] producing, activeThreadCount: %d, consumerCount: %d, canBlock is %b...",
Thread.currentThread().getId(), activeThreadCount, consumerCount, lambdaCanIdle));
final long now = System.currentTimeMillis();
task = produceTask(canBlock);
final long delay = System.currentTimeMillis() - now;
logger.trace(() -> String.format("[%d] producing took %dms", Thread.currentThread().getId(), delay));
}
if (task == null)