Added invalidUnconfirmedTransactions map

An incoming invalid unconfirmed transaction will be added to this map if its timestamp is more than 30 minutes old. This should allow enough time and opportunities for it to be imported and included in a block (allowing for re-orgs which could switch its status from invalid to valid).

Once added, it will be removed after an hour to allow for another chance to be requested from any peers that still have it. If invalid again, it's added back to the map for another hour.

This fixes a 24 hour long loop, where invalid transactions are requested over and over from peers that have already imported them. It could be improved further by periodically removing invalid unconfirmed transactions from the database, but this will be a higher risk.

The results of this feature should be less network traffic, and less blockchain locks (which should ultimately increase the responsiveness of the synchronizer).
This commit is contained in:
CalDescent 2022-02-06 11:23:28 +00:00
parent 98831a9449
commit 0fe2f226bc

View File

@ -11,18 +11,7 @@ import java.security.Security;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.time.ZoneOffset; import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter; import java.time.format.DateTimeFormatter;
import java.util.ArrayDeque; import java.util.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -101,6 +90,11 @@ public class Controller extends Thread {
private static final long DELETE_EXPIRED_INTERVAL = 5 * 60 * 1000L; // ms private static final long DELETE_EXPIRED_INTERVAL = 5 * 60 * 1000L; // ms
private static final int MAX_INCOMING_TRANSACTIONS = 5000; private static final int MAX_INCOMING_TRANSACTIONS = 5000;
/** Minimum time before considering an invalid unconfirmed transaction as "stale" */
public static final long INVALID_TRANSACTION_STALE_TIMEOUT = 30 * 60 * 1000L; // ms
/** Minimum frequency to re-request stale unconfirmed transactions from peers, to recheck validity */
public static final long INVALID_TRANSACTION_RECHECK_INTERVAL = 60 * 60 * 1000L; // ms
// To do with online accounts list // To do with online accounts list
private static final long ONLINE_ACCOUNTS_TASKS_INTERVAL = 10 * 1000L; // ms private static final long ONLINE_ACCOUNTS_TASKS_INTERVAL = 10 * 1000L; // ms
private static final long ONLINE_ACCOUNTS_BROADCAST_INTERVAL = 1 * 60 * 1000L; // ms private static final long ONLINE_ACCOUNTS_BROADCAST_INTERVAL = 1 * 60 * 1000L; // ms
@ -147,6 +141,9 @@ public class Controller extends Thread {
/** List of incoming transaction that are in the import queue */ /** List of incoming transaction that are in the import queue */
private List<TransactionData> incomingTransactions = Collections.synchronizedList(new ArrayList<>()); private List<TransactionData> incomingTransactions = Collections.synchronizedList(new ArrayList<>());
/** List of recent invalid unconfirmed transactions */
private Map<byte[], Long> invalidUnconfirmedTransactions = Collections.synchronizedMap(new HashMap<>());
/** Lock for only allowing one blockchain-modifying codepath at a time. e.g. synchronization or newly minted block. */ /** Lock for only allowing one blockchain-modifying codepath at a time. e.g. synchronization or newly minted block. */
private final ReentrantLock blockchainLock = new ReentrantLock(); private final ReentrantLock blockchainLock = new ReentrantLock();
@ -557,6 +554,8 @@ public class Controller extends Thread {
// Process incoming transactions queue // Process incoming transactions queue
processIncomingTransactionsQueue(); processIncomingTransactionsQueue();
// Clean up invalid incoming transactions list
cleanupInvalidTransactionsList(now);
// Clean up arbitrary data request cache // Clean up arbitrary data request cache
ArbitraryDataManager.getInstance().cleanupRequestCache(now); ArbitraryDataManager.getInstance().cleanupRequestCache(now);
@ -1351,6 +1350,12 @@ public class Controller extends Thread {
if (validationResult != ValidationResult.OK) { if (validationResult != ValidationResult.OK) {
LOGGER.trace(() -> String.format("Ignoring invalid (%s) %s transaction %s", validationResult.name(), transactionData.getType().name(), Base58.encode(transactionData.getSignature()))); LOGGER.trace(() -> String.format("Ignoring invalid (%s) %s transaction %s", validationResult.name(), transactionData.getType().name(), Base58.encode(transactionData.getSignature())));
Long now = NTP.getTime();
if (now != null && now - transactionData.getTimestamp() > INVALID_TRANSACTION_STALE_TIMEOUT) {
LOGGER.debug("Adding stale invalid transaction {} to invalidUnconfirmedTransactions...", Base58.encode(transactionData.getSignature()));
// Invalid, unconfirmed transaction has become stale - add to invalidUnconfirmedTransactions so that we don't keep requesting it
invalidUnconfirmedTransactions.put(transactionData.getSignature(), NTP.getTime());
}
iterator.remove(); iterator.remove();
continue; continue;
} }
@ -1366,6 +1371,15 @@ public class Controller extends Thread {
} }
} }
private void cleanupInvalidTransactionsList(Long now) {
if (now == null) {
return;
}
// Periodically remove invalid unconfirmed transactions from the list, so that they can be fetched again
final long minimumTimestamp = now - INVALID_TRANSACTION_RECHECK_INTERVAL;
invalidUnconfirmedTransactions.entrySet().removeIf(entry -> entry.getValue() == null || entry.getValue() < minimumTimestamp);
}
private void onNetworkGetBlockSummariesMessage(Peer peer, Message message) { private void onNetworkGetBlockSummariesMessage(Peer peer, Message message) {
GetBlockSummariesMessage getBlockSummariesMessage = (GetBlockSummariesMessage) message; GetBlockSummariesMessage getBlockSummariesMessage = (GetBlockSummariesMessage) message;
final byte[] parentSignature = getBlockSummariesMessage.getParentSignature(); final byte[] parentSignature = getBlockSummariesMessage.getParentSignature();
@ -1561,6 +1575,12 @@ public class Controller extends Thread {
try (final Repository repository = RepositoryManager.getRepository()) { try (final Repository repository = RepositoryManager.getRepository()) {
for (byte[] signature : signatures) { for (byte[] signature : signatures) {
if (invalidUnconfirmedTransactions.get(signature) != null) {
// Previously invalid transaction - don't keep requesting it
// It will be periodically removed from invalidUnconfirmedTransactions to allow for rechecks
continue;
}
// Do we have it already? (Before requesting transaction data itself) // Do we have it already? (Before requesting transaction data itself)
if (repository.getTransactionRepository().exists(signature)) { if (repository.getTransactionRepository().exists(signature)) {
LOGGER.trace(() -> String.format("Ignoring existing transaction %s from peer %s", Base58.encode(signature), peer)); LOGGER.trace(() -> String.format("Ignoring existing transaction %s from peer %s", Base58.encode(signature), peer));