Incoming transactions are now added to a queue, and then processed soon after.

This solves a problem where incoming transactions could rarely obtain a blockchain lock (due to multiple transactions arriving at once) and therefore most messages were thrown away. It was also causing constant blockchain locks to be acquired, which would often prevent the synchronizer from running.
This commit is contained in:
CalDescent 2022-01-30 19:03:31 +00:00
parent 90f3d2568a
commit bd60c793be

View File

@ -26,6 +26,7 @@ import java.util.Properties;
import java.util.Random; import java.util.Random;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
@ -101,6 +102,7 @@ public class Controller extends Thread {
private static final long NTP_POST_SYNC_CHECK_PERIOD = 5 * 60 * 1000L; // ms private static final long NTP_POST_SYNC_CHECK_PERIOD = 5 * 60 * 1000L; // ms
private static final long DELETE_EXPIRED_INTERVAL = 5 * 60 * 1000L; // ms private static final long DELETE_EXPIRED_INTERVAL = 5 * 60 * 1000L; // ms
private static final long RECOVERY_MODE_TIMEOUT = 10 * 60 * 1000L; // ms private static final long RECOVERY_MODE_TIMEOUT = 10 * 60 * 1000L; // ms
private static final int MAX_INCOMING_TRANSACTIONS = 5000;
// To do with online accounts list // To do with online accounts list
private static final long ONLINE_ACCOUNTS_TASKS_INTERVAL = 10 * 1000L; // ms private static final long ONLINE_ACCOUNTS_TASKS_INTERVAL = 10 * 1000L; // ms
@ -153,6 +155,9 @@ public class Controller extends Thread {
/** Temporary estimate of synchronization progress for SysTray use. */ /** Temporary estimate of synchronization progress for SysTray use. */
private volatile int syncPercent = 0; private volatile int syncPercent = 0;
/** List of incoming transaction that are in the import queue */
private List<TransactionData> incomingTransactions = Collections.synchronizedList(new ArrayList<>());
/** Latest block signatures from other peers that we know are on inferior chains. */ /** Latest block signatures from other peers that we know are on inferior chains. */
List<ByteArray> inferiorChainSignatures = new ArrayList<>(); List<ByteArray> inferiorChainSignatures = new ArrayList<>();
@ -584,6 +589,9 @@ public class Controller extends Thread {
potentiallySynchronize(); potentiallySynchronize();
} }
// Process incoming transactions queue
processIncomingTransactionsQueue();
// Clean up arbitrary data request cache // Clean up arbitrary data request cache
ArbitraryDataManager.getInstance().cleanupRequestCache(now); ArbitraryDataManager.getInstance().cleanupRequestCache(now);
// Clean up arbitrary data queues and lists // Clean up arbitrary data queues and lists
@ -1497,50 +1505,72 @@ public class Controller extends Thread {
private void onNetworkTransactionMessage(Peer peer, Message message) { private void onNetworkTransactionMessage(Peer peer, Message message) {
TransactionMessage transactionMessage = (TransactionMessage) message; TransactionMessage transactionMessage = (TransactionMessage) message;
TransactionData transactionData = transactionMessage.getTransactionData(); TransactionData transactionData = transactionMessage.getTransactionData();
if (this.incomingTransactions.size() < MAX_INCOMING_TRANSACTIONS) {
this.incomingTransactions.add(transactionData);
}
}
/* private void processIncomingTransactionsQueue() {
* If we can't obtain blockchain lock immediately, try {
* e.g. Synchronizer is active, or another transaction is taking a while to validate, ReentrantLock blockchainLock = Controller.getInstance().getBlockchainLock();
* then we're using up a network thread for ages and clogging things up if (!blockchainLock.tryLock(2, TimeUnit.SECONDS)) {
* so bail out early LOGGER.info(() -> String.format("Too busy to process incoming transactions queue"));
*/ return;
ReentrantLock blockchainLock = Controller.getInstance().getBlockchainLock(); }
if (!blockchainLock.tryLock()) { } catch (InterruptedException e) {
LOGGER.trace(() -> String.format("Too busy to import %s transaction %s from peer %s", transactionData.getType().name(), Base58.encode(transactionData.getSignature()), peer)); LOGGER.info("Interrupted when trying to acquire blockchain lock");
return; return;
} }
try (final Repository repository = RepositoryManager.getRepository()) { try (final Repository repository = RepositoryManager.getRepository()) {
Transaction transaction = Transaction.fromData(repository, transactionData);
// Check signature // Iterate through incoming transactions list
if (!transaction.isSignatureValid()) { synchronized (this.incomingTransactions) { // Required in order to safely iterate a synchronizedList()
LOGGER.trace(() -> String.format("Ignoring %s transaction %s with invalid signature from peer %s", transactionData.getType().name(), Base58.encode(transactionData.getSignature()), peer)); Iterator iterator = this.incomingTransactions.iterator();
return; while (iterator.hasNext()) {
if (isStopping) {
return;
}
TransactionData transactionData = (TransactionData) iterator.next();
Transaction transaction = Transaction.fromData(repository, transactionData);
// Check signature
if (!transaction.isSignatureValid()) {
LOGGER.trace(() -> String.format("Ignoring %s transaction %s with invalid signature", transactionData.getType().name(), Base58.encode(transactionData.getSignature())));
iterator.remove();
continue;
}
ValidationResult validationResult = transaction.importAsUnconfirmed();
if (validationResult == ValidationResult.TRANSACTION_ALREADY_EXISTS) {
LOGGER.trace(() -> String.format("Ignoring existing transaction %s", Base58.encode(transactionData.getSignature())));
iterator.remove();
continue;
}
if (validationResult == ValidationResult.NO_BLOCKCHAIN_LOCK) {
LOGGER.trace(() -> String.format("Couldn't lock blockchain to import unconfirmed transaction", Base58.encode(transactionData.getSignature())));
iterator.remove();
continue;
}
if (validationResult != ValidationResult.OK) {
LOGGER.trace(() -> String.format("Ignoring invalid (%s) %s transaction %s", validationResult.name(), transactionData.getType().name(), Base58.encode(transactionData.getSignature())));
iterator.remove();
continue;
}
LOGGER.debug(() -> String.format("Imported %s transaction %s", transactionData.getType().name(), Base58.encode(transactionData.getSignature())));
iterator.remove();
}
} }
ValidationResult validationResult = transaction.importAsUnconfirmed();
if (validationResult == ValidationResult.TRANSACTION_ALREADY_EXISTS) {
LOGGER.trace(() -> String.format("Ignoring existing transaction %s from peer %s", Base58.encode(transactionData.getSignature()), peer));
return;
}
if (validationResult == ValidationResult.NO_BLOCKCHAIN_LOCK) {
LOGGER.trace(() -> String.format("Couldn't lock blockchain to import unconfirmed transaction %s from peer %s", Base58.encode(transactionData.getSignature()), peer));
return;
}
if (validationResult != ValidationResult.OK) {
LOGGER.trace(() -> String.format("Ignoring invalid (%s) %s transaction %s from peer %s", validationResult.name(), transactionData.getType().name(), Base58.encode(transactionData.getSignature()), peer));
return;
}
LOGGER.debug(() -> String.format("Imported %s transaction %s from peer %s", transactionData.getType().name(), Base58.encode(transactionData.getSignature()), peer));
} catch (DataException e) { } catch (DataException e) {
LOGGER.error(String.format("Repository issue while processing transaction %s from peer %s", Base58.encode(transactionData.getSignature()), peer), e); LOGGER.error(String.format("Repository issue while processing incoming transactions", e));
} finally { } finally {
blockchainLock.unlock(); blockchainLock.unlock();
LOGGER.info("[processIncomingTransactionsQueue] Released blockchain lock");
} }
} }