From 0fe2f226bca844ea757867c56d50ba38f0dfd60b Mon Sep 17 00:00:00 2001 From: CalDescent Date: Sun, 6 Feb 2022 11:23:28 +0000 Subject: [PATCH 01/14] Added invalidUnconfirmedTransactions map An incoming invalid unconfirmed transaction will be added to this map if its timestamp is more than 30 minutes old. This should allow enough time and opportunities for it to be imported and included in a block (allowing for re-orgs which could switch its status from invalid to valid). Once added, it will be removed after an hour to allow for another chance to be requested from any peers that still have it. If invalid again, it's added back to the map for another hour. This fixes a 24 hour long loop, where invalid transactions are requested over and over from peers that have already imported them. It could be improved further by periodically removing invalid unconfirmed transactions from the database, but this will be a higher risk. The results of this feature should be less network traffic, and less blockchain locks (which should ultimately increase the responsiveness of the synchronizer). --- .../org/qortal/controller/Controller.java | 44 ++++++++++++++----- 1 file changed, 32 insertions(+), 12 deletions(-) diff --git a/src/main/java/org/qortal/controller/Controller.java b/src/main/java/org/qortal/controller/Controller.java index 51f91970..fdb513ae 100644 --- a/src/main/java/org/qortal/controller/Controller.java +++ b/src/main/java/org/qortal/controller/Controller.java @@ -11,18 +11,7 @@ import java.security.Security; import java.time.LocalDateTime; import java.time.ZoneOffset; import java.time.format.DateTimeFormatter; -import java.util.ArrayDeque; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.Deque; -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.Random; +import java.util.*; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -101,6 +90,11 @@ public class Controller extends Thread { private static final long DELETE_EXPIRED_INTERVAL = 5 * 60 * 1000L; // ms private static final int MAX_INCOMING_TRANSACTIONS = 5000; + /** Minimum time before considering an invalid unconfirmed transaction as "stale" */ + public static final long INVALID_TRANSACTION_STALE_TIMEOUT = 30 * 60 * 1000L; // ms + /** Minimum frequency to re-request stale unconfirmed transactions from peers, to recheck validity */ + public static final long INVALID_TRANSACTION_RECHECK_INTERVAL = 60 * 60 * 1000L; // ms + // To do with online accounts list private static final long ONLINE_ACCOUNTS_TASKS_INTERVAL = 10 * 1000L; // ms private static final long ONLINE_ACCOUNTS_BROADCAST_INTERVAL = 1 * 60 * 1000L; // ms @@ -147,6 +141,9 @@ public class Controller extends Thread { /** List of incoming transaction that are in the import queue */ private List incomingTransactions = Collections.synchronizedList(new ArrayList<>()); + /** List of recent invalid unconfirmed transactions */ + private Map invalidUnconfirmedTransactions = Collections.synchronizedMap(new HashMap<>()); + /** Lock for only allowing one blockchain-modifying codepath at a time. e.g. synchronization or newly minted block. */ private final ReentrantLock blockchainLock = new ReentrantLock(); @@ -557,6 +554,8 @@ public class Controller extends Thread { // Process incoming transactions queue processIncomingTransactionsQueue(); + // Clean up invalid incoming transactions list + cleanupInvalidTransactionsList(now); // Clean up arbitrary data request cache ArbitraryDataManager.getInstance().cleanupRequestCache(now); @@ -1351,6 +1350,12 @@ public class Controller extends Thread { if (validationResult != ValidationResult.OK) { LOGGER.trace(() -> String.format("Ignoring invalid (%s) %s transaction %s", validationResult.name(), transactionData.getType().name(), Base58.encode(transactionData.getSignature()))); + Long now = NTP.getTime(); + if (now != null && now - transactionData.getTimestamp() > INVALID_TRANSACTION_STALE_TIMEOUT) { + LOGGER.debug("Adding stale invalid transaction {} to invalidUnconfirmedTransactions...", Base58.encode(transactionData.getSignature())); + // Invalid, unconfirmed transaction has become stale - add to invalidUnconfirmedTransactions so that we don't keep requesting it + invalidUnconfirmedTransactions.put(transactionData.getSignature(), NTP.getTime()); + } iterator.remove(); continue; } @@ -1366,6 +1371,15 @@ public class Controller extends Thread { } } + private void cleanupInvalidTransactionsList(Long now) { + if (now == null) { + return; + } + // Periodically remove invalid unconfirmed transactions from the list, so that they can be fetched again + final long minimumTimestamp = now - INVALID_TRANSACTION_RECHECK_INTERVAL; + invalidUnconfirmedTransactions.entrySet().removeIf(entry -> entry.getValue() == null || entry.getValue() < minimumTimestamp); + } + private void onNetworkGetBlockSummariesMessage(Peer peer, Message message) { GetBlockSummariesMessage getBlockSummariesMessage = (GetBlockSummariesMessage) message; final byte[] parentSignature = getBlockSummariesMessage.getParentSignature(); @@ -1561,6 +1575,12 @@ public class Controller extends Thread { try (final Repository repository = RepositoryManager.getRepository()) { for (byte[] signature : signatures) { + if (invalidUnconfirmedTransactions.get(signature) != null) { + // Previously invalid transaction - don't keep requesting it + // It will be periodically removed from invalidUnconfirmedTransactions to allow for rechecks + continue; + } + // Do we have it already? (Before requesting transaction data itself) if (repository.getTransactionRepository().exists(signature)) { LOGGER.trace(() -> String.format("Ignoring existing transaction %s from peer %s", Base58.encode(signature), peer)); From 8c03164ea5fed5165a70a1c1ee42e85c723ce68d Mon Sep 17 00:00:00 2001 From: CalDescent Date: Sun, 6 Feb 2022 11:55:07 +0000 Subject: [PATCH 02/14] Don't add expired transactions to invalidUnconfirmedTransactions, as there is no need to keep track of these. --- src/main/java/org/qortal/controller/Controller.java | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/src/main/java/org/qortal/controller/Controller.java b/src/main/java/org/qortal/controller/Controller.java index fdb513ae..816458a5 100644 --- a/src/main/java/org/qortal/controller/Controller.java +++ b/src/main/java/org/qortal/controller/Controller.java @@ -1350,11 +1350,13 @@ public class Controller extends Thread { if (validationResult != ValidationResult.OK) { LOGGER.trace(() -> String.format("Ignoring invalid (%s) %s transaction %s", validationResult.name(), transactionData.getType().name(), Base58.encode(transactionData.getSignature()))); - Long now = NTP.getTime(); - if (now != null && now - transactionData.getTimestamp() > INVALID_TRANSACTION_STALE_TIMEOUT) { - LOGGER.debug("Adding stale invalid transaction {} to invalidUnconfirmedTransactions...", Base58.encode(transactionData.getSignature())); - // Invalid, unconfirmed transaction has become stale - add to invalidUnconfirmedTransactions so that we don't keep requesting it - invalidUnconfirmedTransactions.put(transactionData.getSignature(), NTP.getTime()); + if (validationResult != ValidationResult.TIMESTAMP_TOO_OLD) { + Long now = NTP.getTime(); + if (now != null && now - transactionData.getTimestamp() > INVALID_TRANSACTION_STALE_TIMEOUT) { + LOGGER.debug("Adding stale invalid transaction {} to invalidUnconfirmedTransactions...", Base58.encode(transactionData.getSignature())); + // Invalid, unconfirmed transaction has become stale - add to invalidUnconfirmedTransactions so that we don't keep requesting it + invalidUnconfirmedTransactions.put(transactionData.getSignature(), NTP.getTime()); + } } iterator.remove(); continue; From 08e06ba11ae89c75d74b3a4e2a15fee9d6746067 Mon Sep 17 00:00:00 2001 From: CalDescent Date: Sun, 6 Feb 2022 12:09:44 +0000 Subject: [PATCH 03/14] Fixed bugs preventing invalidUnconfirmedTransactions from working as intended. --- src/main/java/org/qortal/controller/Controller.java | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/src/main/java/org/qortal/controller/Controller.java b/src/main/java/org/qortal/controller/Controller.java index 816458a5..c01b1e48 100644 --- a/src/main/java/org/qortal/controller/Controller.java +++ b/src/main/java/org/qortal/controller/Controller.java @@ -142,7 +142,7 @@ public class Controller extends Thread { private List incomingTransactions = Collections.synchronizedList(new ArrayList<>()); /** List of recent invalid unconfirmed transactions */ - private Map invalidUnconfirmedTransactions = Collections.synchronizedMap(new HashMap<>()); + private Map invalidUnconfirmedTransactions = Collections.synchronizedMap(new HashMap<>()); /** Lock for only allowing one blockchain-modifying codepath at a time. e.g. synchronization or newly minted block. */ private final ReentrantLock blockchainLock = new ReentrantLock(); @@ -1349,13 +1349,14 @@ public class Controller extends Thread { } if (validationResult != ValidationResult.OK) { - LOGGER.trace(() -> String.format("Ignoring invalid (%s) %s transaction %s", validationResult.name(), transactionData.getType().name(), Base58.encode(transactionData.getSignature()))); + final String signature58 = Base58.encode(transactionData.getSignature()); + LOGGER.trace(() -> String.format("Ignoring invalid (%s) %s transaction %s", validationResult.name(), transactionData.getType().name(), signature58)); if (validationResult != ValidationResult.TIMESTAMP_TOO_OLD) { Long now = NTP.getTime(); if (now != null && now - transactionData.getTimestamp() > INVALID_TRANSACTION_STALE_TIMEOUT) { - LOGGER.debug("Adding stale invalid transaction {} to invalidUnconfirmedTransactions...", Base58.encode(transactionData.getSignature())); + LOGGER.debug("Adding stale invalid transaction {} to invalidUnconfirmedTransactions...", signature58); // Invalid, unconfirmed transaction has become stale - add to invalidUnconfirmedTransactions so that we don't keep requesting it - invalidUnconfirmedTransactions.put(transactionData.getSignature(), NTP.getTime()); + invalidUnconfirmedTransactions.put(signature58, NTP.getTime()); } } iterator.remove(); @@ -1577,7 +1578,8 @@ public class Controller extends Thread { try (final Repository repository = RepositoryManager.getRepository()) { for (byte[] signature : signatures) { - if (invalidUnconfirmedTransactions.get(signature) != null) { + String signature58 = Base58.encode(signature); + if (invalidUnconfirmedTransactions.containsKey(signature58)) { // Previously invalid transaction - don't keep requesting it // It will be periodically removed from invalidUnconfirmedTransactions to allow for rechecks continue; From cfe0414d96d08259d27fee618becbf5189ea1576 Mon Sep 17 00:00:00 2001 From: CalDescent Date: Sun, 6 Feb 2022 12:35:41 +0000 Subject: [PATCH 04/14] Small rework of invalidUnconfirmedTransactions to specify the expiry time instead of the time added. This allows TIMESTAMP_TOO_OLD transactions to be tracked for a shorter time (10 minutes) than the other invalid transactions (60 minutes). Should reduce network traffic and db load around the time that transactions are expiring, as there is a lag before they are noticed and removed from each node. Due to the variance, it could cause other peers to request them again after deleting. They are now ignored for 10 minutes to avoid request spam. --- .../org/qortal/controller/Controller.java | 24 ++++++++++++------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/src/main/java/org/qortal/controller/Controller.java b/src/main/java/org/qortal/controller/Controller.java index c01b1e48..e69dc558 100644 --- a/src/main/java/org/qortal/controller/Controller.java +++ b/src/main/java/org/qortal/controller/Controller.java @@ -93,7 +93,10 @@ public class Controller extends Thread { /** Minimum time before considering an invalid unconfirmed transaction as "stale" */ public static final long INVALID_TRANSACTION_STALE_TIMEOUT = 30 * 60 * 1000L; // ms /** Minimum frequency to re-request stale unconfirmed transactions from peers, to recheck validity */ - public static final long INVALID_TRANSACTION_RECHECK_INTERVAL = 60 * 60 * 1000L; // ms + public static final long INVALID_TRANSACTION_RECHECK_INTERVAL = 60 * 60 * 1000L; // ms\ + /** Minimum frequency to re-request expired unconfirmed transactions from peers, to recheck validity + * This mainly exists to stop expired transactions from bloating the list */ + public static final long EXPIRED_TRANSACTION_RECHECK_INTERVAL = 10 * 60 * 1000L; // ms // To do with online accounts list private static final long ONLINE_ACCOUNTS_TASKS_INTERVAL = 10 * 1000L; // ms @@ -1351,13 +1354,17 @@ public class Controller extends Thread { if (validationResult != ValidationResult.OK) { final String signature58 = Base58.encode(transactionData.getSignature()); LOGGER.trace(() -> String.format("Ignoring invalid (%s) %s transaction %s", validationResult.name(), transactionData.getType().name(), signature58)); - if (validationResult != ValidationResult.TIMESTAMP_TOO_OLD) { - Long now = NTP.getTime(); - if (now != null && now - transactionData.getTimestamp() > INVALID_TRANSACTION_STALE_TIMEOUT) { - LOGGER.debug("Adding stale invalid transaction {} to invalidUnconfirmedTransactions...", signature58); - // Invalid, unconfirmed transaction has become stale - add to invalidUnconfirmedTransactions so that we don't keep requesting it - invalidUnconfirmedTransactions.put(signature58, NTP.getTime()); + Long now = NTP.getTime(); + if (now != null && now - transactionData.getTimestamp() > INVALID_TRANSACTION_STALE_TIMEOUT) { + Long expiryLength = INVALID_TRANSACTION_RECHECK_INTERVAL; + if (validationResult == ValidationResult.TIMESTAMP_TOO_OLD) { + // Use shorter recheck interval for expired transactions + expiryLength = EXPIRED_TRANSACTION_RECHECK_INTERVAL; } + Long expiry = now + expiryLength; + LOGGER.debug("Adding stale invalid transaction {} to invalidUnconfirmedTransactions...", signature58); + // Invalid, unconfirmed transaction has become stale - add to invalidUnconfirmedTransactions so that we don't keep requesting it + invalidUnconfirmedTransactions.put(signature58, expiry); } iterator.remove(); continue; @@ -1379,8 +1386,7 @@ public class Controller extends Thread { return; } // Periodically remove invalid unconfirmed transactions from the list, so that they can be fetched again - final long minimumTimestamp = now - INVALID_TRANSACTION_RECHECK_INTERVAL; - invalidUnconfirmedTransactions.entrySet().removeIf(entry -> entry.getValue() == null || entry.getValue() < minimumTimestamp); + invalidUnconfirmedTransactions.entrySet().removeIf(entry -> entry.getValue() == null || entry.getValue() < now); } private void onNetworkGetBlockSummariesMessage(Peer peer, Message message) { From 3c526db52e1b4503d8066564429509a7fc716190 Mon Sep 17 00:00:00 2001 From: CalDescent Date: Sun, 6 Feb 2022 13:03:01 +0000 Subject: [PATCH 05/14] Fixed bug in build manager which would prevent future builds until the core was restarted. --- .../qortal/controller/arbitrary/ArbitraryDataBuilderThread.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataBuilderThread.java b/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataBuilderThread.java index da7c7293..781c0a84 100644 --- a/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataBuilderThread.java +++ b/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataBuilderThread.java @@ -39,7 +39,7 @@ public class ArbitraryDataBuilderThread implements Runnable { Map.Entry next = buildManager.arbitraryDataBuildQueue .entrySet().stream() .filter(e -> e.getValue().isQueued()) - .findFirst().get(); + .findFirst().orElse(null); if (next == null) { continue; From 2740543abfec2973a6618f91ec4b5ace377f2ff8 Mon Sep 17 00:00:00 2001 From: CalDescent Date: Sun, 6 Feb 2022 13:04:58 +0000 Subject: [PATCH 06/14] Added "async" and "attempts" parameters to GET /arbitrary/{service}/{name}* endpoints. async = fail immediately with 404 if missing, and request in the background attempts = the number of times to request the data (synchronous mode only for now) --- .../api/resource/ArbitraryResource.java | 51 +++++++++++++------ .../qortal/arbitrary/ArbitraryDataReader.java | 12 ++++- .../arbitrary/ArbitraryDataRenderer.java | 2 +- 3 files changed, 47 insertions(+), 18 deletions(-) diff --git a/src/main/java/org/qortal/api/resource/ArbitraryResource.java b/src/main/java/org/qortal/api/resource/ArbitraryResource.java index f588e9c9..84e53200 100644 --- a/src/main/java/org/qortal/api/resource/ArbitraryResource.java +++ b/src/main/java/org/qortal/api/resource/ArbitraryResource.java @@ -576,14 +576,16 @@ public class ArbitraryResource { @PathParam("service") Service service, @PathParam("name") String name, @QueryParam("filepath") String filepath, - @QueryParam("rebuild") boolean rebuild) { + @QueryParam("rebuild") boolean rebuild, + @QueryParam("async") boolean async, + @QueryParam("attempts") Integer attempts) { // Authentication can be bypassed in the settings, for those running public QDN nodes if (!Settings.getInstance().isQDNAuthBypassEnabled()) { Security.checkApiCallAllowed(request); } - return this.download(service, name, null, filepath, rebuild); + return this.download(service, name, null, filepath, rebuild, async, attempts); } @GET @@ -609,14 +611,16 @@ public class ArbitraryResource { @PathParam("name") String name, @PathParam("identifier") String identifier, @QueryParam("filepath") String filepath, - @QueryParam("rebuild") boolean rebuild) { + @QueryParam("rebuild") boolean rebuild, + @QueryParam("async") boolean async, + @QueryParam("attempts") Integer attempts) { // Authentication can be bypassed in the settings, for those running public QDN nodes if (!Settings.getInstance().isQDNAuthBypassEnabled()) { Security.checkApiCallAllowed(request); } - return this.download(service, name, identifier, filepath, rebuild); + return this.download(service, name, identifier, filepath, rebuild, async, attempts); } @@ -1027,30 +1031,45 @@ public class ArbitraryResource { } } - private HttpServletResponse download(Service service, String name, String identifier, String filepath, boolean rebuild) { + private HttpServletResponse download(Service service, String name, String identifier, String filepath, boolean rebuild, boolean async, Integer maxAttempts) { ArbitraryDataReader arbitraryDataReader = new ArbitraryDataReader(name, ArbitraryDataFile.ResourceIdType.NAME, service, identifier); try { int attempts = 0; + if (maxAttempts == null) { + maxAttempts = 5; + } // Loop until we have data - while (!Controller.isStopping()) { - attempts++; - if (!arbitraryDataReader.isBuilding()) { - try { - arbitraryDataReader.loadSynchronously(rebuild); - break; - } catch (MissingDataException e) { - if (attempts > 5) { - // Give up after 5 attempts - throw ApiExceptionFactory.INSTANCE.createCustomException(request, ApiError.INVALID_CRITERIA, "Data unavailable. Please try again later."); + if (async) { + // Asynchronous + arbitraryDataReader.loadAsynchronously(false); + } + else { + // Synchronous + while (!Controller.isStopping()) { + attempts++; + if (!arbitraryDataReader.isBuilding()) { + try { + arbitraryDataReader.loadSynchronously(rebuild); + break; + } catch (MissingDataException e) { + if (attempts > maxAttempts) { + // Give up after 5 attempts + throw ApiExceptionFactory.INSTANCE.createCustomException(request, ApiError.INVALID_CRITERIA, "Data unavailable. Please try again later."); + } } } + Thread.sleep(3000L); } - Thread.sleep(3000L); } + java.nio.file.Path outputPath = arbitraryDataReader.getFilePath(); + if (outputPath == null) { + // Assume the resource doesn't exist + throw ApiExceptionFactory.INSTANCE.createCustomException(request, ApiError.FILE_NOT_FOUND, "File not found"); + } if (filepath == null || filepath.isEmpty()) { // No file path supplied - so check if this is a single file resource diff --git a/src/main/java/org/qortal/arbitrary/ArbitraryDataReader.java b/src/main/java/org/qortal/arbitrary/ArbitraryDataReader.java index 619e5330..bb5641c2 100644 --- a/src/main/java/org/qortal/arbitrary/ArbitraryDataReader.java +++ b/src/main/java/org/qortal/arbitrary/ArbitraryDataReader.java @@ -122,9 +122,19 @@ public class ArbitraryDataReader { * This adds the build task to a queue, and the result will be cached when complete * To check the status of the build, periodically call isCachedDataAvailable() * Once it returns true, you can then use getFilePath() to access the data itself. + * + * @param overwrite - set to true to force rebuild an existing cache * @return true if added or already present in queue; false if not */ - public boolean loadAsynchronously() { + public boolean loadAsynchronously(boolean overwrite) { + ArbitraryDataCache cache = new ArbitraryDataCache(this.uncompressedPath, overwrite, + this.resourceId, this.resourceIdType, this.service, this.identifier); + if (cache.isCachedDataAvailable()) { + // Use cached data + this.filePath = this.uncompressedPath; + return true; + } + return ArbitraryDataBuildManager.getInstance().addToBuildQueue(this.createQueueItem()); } diff --git a/src/main/java/org/qortal/arbitrary/ArbitraryDataRenderer.java b/src/main/java/org/qortal/arbitrary/ArbitraryDataRenderer.java index 445ff2f6..e4d90b79 100644 --- a/src/main/java/org/qortal/arbitrary/ArbitraryDataRenderer.java +++ b/src/main/java/org/qortal/arbitrary/ArbitraryDataRenderer.java @@ -76,7 +76,7 @@ public class ArbitraryDataRenderer { if (!arbitraryDataReader.isCachedDataAvailable()) { // If async is requested, show a loading screen whilst build is in progress if (async) { - arbitraryDataReader.loadAsynchronously(); + arbitraryDataReader.loadAsynchronously(false); return this.getLoadingResponse(service, resourceId); } From b8aaf14cdc9187f9702bc035a34d96ba99e67a8a Mon Sep 17 00:00:00 2001 From: CalDescent Date: Sun, 6 Feb 2022 15:34:06 +0000 Subject: [PATCH 07/14] Introduced ArbitraryDataFileRequestThread to allow for multiple concurrent file requests. This is likely a short term solution (to allow existing code to be repurposed) until replaced with a task-based approach, as this will allow for a much greater number of threads. --- .../arbitrary/ArbitraryDataFileManager.java | 75 ++--------- .../ArbitraryDataFileRequestThread.java | 117 ++++++++++++++++++ 2 files changed, 128 insertions(+), 64 deletions(-) create mode 100644 src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileRequestThread.java diff --git a/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileManager.java b/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileManager.java index 1b544434..27433180 100644 --- a/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileManager.java +++ b/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileManager.java @@ -21,6 +21,8 @@ import org.qortal.utils.Triple; import java.security.SecureRandom; import java.util.*; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.stream.Collectors; public class ArbitraryDataFileManager extends Thread { @@ -65,11 +67,16 @@ public class ArbitraryDataFileManager extends Thread { Thread.currentThread().setName("Arbitrary Data File Manager"); try { - while (!isStopping) { - Thread.sleep(1000); + // Use a fixed thread pool to execute the arbitrary data file requests + int threadCount = 10; + ExecutorService arbitraryDataFileRequestExecutor = Executors.newFixedThreadPool(threadCount); + for (int i = 0; i < threadCount; i++) { + arbitraryDataFileRequestExecutor.execute(new ArbitraryDataFileRequestThread()); + } - Long now = NTP.getTime(); - this.processFileHashes(now); + while (!isStopping) { + // Nothing to do yet + Thread.sleep(1000); } } catch (InterruptedException e) { // Fall-through to exit thread... @@ -81,66 +88,6 @@ public class ArbitraryDataFileManager extends Thread { this.interrupt(); } - private void processFileHashes(Long now) { - try (final Repository repository = RepositoryManager.getRepository()) { - - ArbitraryTransactionData arbitraryTransactionData = null; - byte[] signature = null; - byte[] hash = null; - Peer peer = null; - boolean shouldProcess = false; - - synchronized (arbitraryDataFileHashResponses) { - for (String hash58 : arbitraryDataFileHashResponses.keySet()) { - if (isStopping) { - return; - } - - Triple value = arbitraryDataFileHashResponses.get(hash58); - if (value != null) { - peer = value.getA(); - String signature58 = value.getB(); - Long timestamp = value.getC(); - - if (now - timestamp >= ArbitraryDataManager.ARBITRARY_RELAY_TIMEOUT || signature58 == null || peer == null) { - // Ignore - to be deleted - continue; - } - - hash = Base58.decode(hash58); - signature = Base58.decode(signature58); - - // Fetch the transaction data - arbitraryTransactionData = ArbitraryTransactionUtils.fetchTransactionData(repository, signature); - if (arbitraryTransactionData == null) { - continue; - } - - // We want to process this file - shouldProcess = true; - break; - } - } - } - - if (!shouldProcess) { - // Nothing to do - return; - } - - if (signature == null || hash == null || peer == null || arbitraryTransactionData == null) { - return; - } - - String hash58 = Base58.encode(hash); - LOGGER.debug("Fetching file {} from peer {} via response queue...", hash58, peer); - this.fetchArbitraryDataFiles(repository, peer, signature, arbitraryTransactionData, Arrays.asList(hash)); - - } catch (DataException e) { - LOGGER.info("Unable to process file hashes: {}", e.getMessage()); - } - } - public void cleanupRequestCache(Long now) { if (now == null) { diff --git a/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileRequestThread.java b/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileRequestThread.java new file mode 100644 index 00000000..97704ae5 --- /dev/null +++ b/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileRequestThread.java @@ -0,0 +1,117 @@ +package org.qortal.controller.arbitrary; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.qortal.controller.Controller; +import org.qortal.data.transaction.ArbitraryTransactionData; +import org.qortal.network.Peer; +import org.qortal.repository.DataException; +import org.qortal.repository.Repository; +import org.qortal.repository.RepositoryManager; +import org.qortal.utils.ArbitraryTransactionUtils; +import org.qortal.utils.Base58; +import org.qortal.utils.NTP; +import org.qortal.utils.Triple; + +import java.util.Arrays; +import java.util.Iterator; +import java.util.Map; + +public class ArbitraryDataFileRequestThread implements Runnable { + + private static final Logger LOGGER = LogManager.getLogger(ArbitraryDataFileRequestThread.class); + + public ArbitraryDataFileRequestThread() { + + } + + @Override + public void run() { + Thread.currentThread().setName("Arbitrary Data File Request Thread"); + + try { + while (!Controller.isStopping()) { + Thread.sleep(1000); + + Long now = NTP.getTime(); + this.processFileHashes(now); + } + } catch (InterruptedException e) { + // Fall-through to exit thread... + } + } + + private void processFileHashes(Long now) { + try (final Repository repository = RepositoryManager.getRepository()) { + ArbitraryDataFileManager arbitraryDataFileManager = ArbitraryDataFileManager.getInstance(); + + ArbitraryTransactionData arbitraryTransactionData = null; + byte[] signature = null; + byte[] hash = null; + Peer peer = null; + boolean shouldProcess = false; + + synchronized (arbitraryDataFileManager.arbitraryDataFileHashResponses) { + Iterator iterator = arbitraryDataFileManager.arbitraryDataFileHashResponses.entrySet().iterator(); + while (iterator.hasNext()) { + if (Controller.isStopping()) { + return; + } + + Map.Entry entry = (Map.Entry) iterator.next(); + if (entry == null || entry.getKey() == null || entry.getValue() == null) { + iterator.remove(); + continue; + } + + String hash58 = (String) entry.getKey(); + Triple value = (Triple) entry.getValue(); + if (value == null) { + iterator.remove(); + continue; + } + + peer = value.getA(); + String signature58 = value.getB(); + Long timestamp = value.getC(); + + if (now - timestamp >= ArbitraryDataManager.ARBITRARY_RELAY_TIMEOUT || signature58 == null || peer == null) { + // Ignore - to be deleted + iterator.remove(); + continue; + } + + hash = Base58.decode(hash58); + signature = Base58.decode(signature58); + + // We want to process this file + shouldProcess = true; + iterator.remove(); + break; + } + } + + if (!shouldProcess) { + // Nothing to do + return; + } + + // Fetch the transaction data + arbitraryTransactionData = ArbitraryTransactionUtils.fetchTransactionData(repository, signature); + if (arbitraryTransactionData == null) { + return; + } + + if (signature == null || hash == null || peer == null || arbitraryTransactionData == null) { + return; + } + + String hash58 = Base58.encode(hash); + LOGGER.debug("Fetching file {} from peer {} via request thread...", hash58, peer); + arbitraryDataFileManager.fetchArbitraryDataFiles(repository, peer, signature, arbitraryTransactionData, Arrays.asList(hash)); + + } catch (DataException e) { + LOGGER.debug("Unable to process file hashes: {}", e.getMessage()); + } + } +} From ef838627c408e3b5138bf3bb147dc351053df17c Mon Sep 17 00:00:00 2001 From: CalDescent Date: Sun, 6 Feb 2022 15:37:08 +0000 Subject: [PATCH 08/14] Stop asking for hashes from a peer if one fails. This fixes the request looping that occurs on when a peer is unable to serve files. --- .../arbitrary/ArbitraryDataFileManager.java | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileManager.java b/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileManager.java index 27433180..44411e92 100644 --- a/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileManager.java +++ b/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileManager.java @@ -133,13 +133,19 @@ public class ArbitraryDataFileManager extends Thread { if (receivedArbitraryDataFileMessage != null) { LOGGER.debug("Received data file {} from peer {}. Time taken: {} ms", receivedArbitraryDataFileMessage.getArbitraryDataFile().getHash58(), peer, (endTime-startTime)); receivedAtLeastOneFile = true; + + // Remove this hash from arbitraryDataFileHashResponses now that we have received it + arbitraryDataFileHashResponses.remove(hash58); } else { LOGGER.debug("Peer {} didn't respond with data file {} for signature {}. Time taken: {} ms", peer, Base58.encode(hash), Base58.encode(signature), (endTime-startTime)); - } - // Remove this hash from arbitraryDataFileHashResponses now that we have tried to request it - arbitraryDataFileHashResponses.remove(hash58); + // Remove this hash from arbitraryDataFileHashResponses now that we have failed to receive it + arbitraryDataFileHashResponses.remove(hash58); + + // Stop asking for files from this peer + break; + } } else { LOGGER.trace("Already requesting data file {} for signature {}", arbitraryDataFile, Base58.encode(signature)); From fa447ccded27f32ab067ec7c6932068fec6c200f Mon Sep 17 00:00:00 2001 From: CalDescent Date: Sun, 6 Feb 2022 15:37:21 +0000 Subject: [PATCH 09/14] Builder thread updates. --- .../controller/arbitrary/ArbitraryDataBuilderThread.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataBuilderThread.java b/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataBuilderThread.java index 781c0a84..8da18a2b 100644 --- a/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataBuilderThread.java +++ b/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataBuilderThread.java @@ -20,8 +20,9 @@ public class ArbitraryDataBuilderThread implements Runnable { } + @Override public void run() { - Thread.currentThread().setName("Arbitrary Data Build Manager"); + Thread.currentThread().setName("Arbitrary Data Builder Thread"); ArbitraryDataBuildManager buildManager = ArbitraryDataBuildManager.getInstance(); while (!Controller.isStopping()) { From 9ec4e24ef68150b72ce89d12dacec90b5bf1066c Mon Sep 17 00:00:00 2001 From: CalDescent Date: Sun, 6 Feb 2022 15:45:40 +0000 Subject: [PATCH 10/14] Slightly optimized logic in fetchArbitraryDataFiles() --- .../arbitrary/ArbitraryDataFileManager.java | 25 ++++++++++--------- 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileManager.java b/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileManager.java index 44411e92..c7326c96 100644 --- a/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileManager.java +++ b/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileManager.java @@ -170,22 +170,23 @@ public class ArbitraryDataFileManager extends Thread { // Invalidate the hosted transactions cache as we are now hosting something new ArbitraryDataStorageManager.getInstance().invalidateHostedTransactionsCache(); - } - // Check if we have all the files we need for this transaction - if (arbitraryDataFile.allFilesExist()) { + // Check if we have all the files we need for this transaction + if (arbitraryDataFile.allFilesExist()) { - // We have all the chunks for this transaction, so we should invalidate the transaction's name's - // data cache so that it is rebuilt the next time we serve it - ArbitraryDataManager.getInstance().invalidateCache(arbitraryTransactionData); + // We have all the chunks for this transaction, so we should invalidate the transaction's name's + // data cache so that it is rebuilt the next time we serve it + ArbitraryDataManager.getInstance().invalidateCache(arbitraryTransactionData); - // We may also need to broadcast to the network that we are now hosting files for this transaction, - // but only if these files are in accordance with our storage policy - if (ArbitraryDataStorageManager.getInstance().canStoreData(arbitraryTransactionData)) { - // Use a null peer address to indicate our own - Message newArbitrarySignatureMessage = new ArbitrarySignaturesMessage(null, 0, Arrays.asList(signature)); - Network.getInstance().broadcast(broadcastPeer -> newArbitrarySignatureMessage); + // We may also need to broadcast to the network that we are now hosting files for this transaction, + // but only if these files are in accordance with our storage policy + if (ArbitraryDataStorageManager.getInstance().canStoreData(arbitraryTransactionData)) { + // Use a null peer address to indicate our own + Message newArbitrarySignatureMessage = new ArbitrarySignaturesMessage(null, 0, Arrays.asList(signature)); + Network.getInstance().broadcast(broadcastPeer -> newArbitrarySignatureMessage); + } } + } return receivedAtLeastOneFile; From cd5ce6dd5e00cad775812528015eb8517c967e62 Mon Sep 17 00:00:00 2001 From: CalDescent Date: Sun, 6 Feb 2022 16:04:54 +0000 Subject: [PATCH 11/14] Don't remove from the relay map after a file is requested, as it may be needed by other peers. It will be cleaned up automatically after 60 seconds, so it is best to keep the data intact until then. --- .../qortal/controller/arbitrary/ArbitraryDataFileManager.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileManager.java b/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileManager.java index c7326c96..cab4a93e 100644 --- a/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileManager.java +++ b/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileManager.java @@ -432,9 +432,6 @@ public class ArbitraryDataFileManager extends Thread { // Forward the message to this peer LOGGER.debug("Asking peer {} for hash {}", peerToAsk, hash58); this.fetchArbitraryDataFile(peerToAsk, peer, signature, hash, message); - - // Remove from the map regardless of outcome, as the relay attempt is now considered complete - arbitraryRelayMap.remove(hash58); } else { LOGGER.debug("Peer {} not found in relay info", peer); From 84e4f9a1c1bd9cda93bee555554fba3c2e11c845 Mon Sep 17 00:00:00 2001 From: CalDescent Date: Sun, 6 Feb 2022 17:20:01 +0000 Subject: [PATCH 12/14] Rework of arbitraryRelayMap to keep track of multiple responses. Previously, only one peer's response for a hash would be remembered, even if multiple others reported back too. This would cause useful mapping to be lost. --- .../ArbitraryDataFileListManager.java | 7 +-- .../arbitrary/ArbitraryDataFileManager.java | 54 +++++++++++++++-- .../data/arbitrary/ArbitraryRelayInfo.java | 60 +++++++++++++++++++ 3 files changed, 111 insertions(+), 10 deletions(-) create mode 100644 src/main/java/org/qortal/data/arbitrary/ArbitraryRelayInfo.java diff --git a/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileListManager.java b/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileListManager.java index 6337fc7c..46c2ff15 100644 --- a/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileListManager.java +++ b/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileListManager.java @@ -5,6 +5,7 @@ import org.apache.logging.log4j.Logger; import org.qortal.arbitrary.ArbitraryDataFile; import org.qortal.arbitrary.ArbitraryDataFileChunk; import org.qortal.controller.Controller; +import org.qortal.data.arbitrary.ArbitraryRelayInfo; import org.qortal.data.transaction.ArbitraryTransactionData; import org.qortal.data.transaction.TransactionData; import org.qortal.network.Network; @@ -477,10 +478,8 @@ public class ArbitraryDataFileListManager { Long now = NTP.getTime(); for (byte[] hash : hashes) { String hash58 = Base58.encode(hash); - Triple value = new Triple<>(signature58, peer, now); - if (arbitraryDataFileManager.arbitraryRelayMap.putIfAbsent(hash58, value) == null) { - LOGGER.debug("Added {} to relay map: {}, {}, {}", hash58, signature58, peer, now); - } + ArbitraryRelayInfo relayMap = new ArbitraryRelayInfo(hash58, signature58, peer, now); + ArbitraryDataFileManager.getInstance().addToRelayMap(relayMap); } // Forward to requesting peer diff --git a/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileManager.java b/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileManager.java index cab4a93e..8461448e 100644 --- a/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileManager.java +++ b/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileManager.java @@ -4,6 +4,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.qortal.arbitrary.ArbitraryDataFile; import org.qortal.controller.Controller; +import org.qortal.data.arbitrary.ArbitraryRelayInfo; import org.qortal.data.network.ArbitraryPeerData; import org.qortal.data.network.PeerData; import org.qortal.data.transaction.ArbitraryTransactionData; @@ -39,10 +40,9 @@ public class ArbitraryDataFileManager extends Thread { private Map arbitraryDataFileRequests = Collections.synchronizedMap(new HashMap<>()); /** - * Map to keep track of hashes that we might need to relay, keyed by the hash of the file (base58 encoded). - * Value is comprised of the base58-encoded signature, the peer that is hosting it, and the timestamp that it was added + * Map to keep track of hashes that we might need to relay */ - public Map> arbitraryRelayMap = Collections.synchronizedMap(new HashMap<>()); + public List arbitraryRelayMap = Collections.synchronizedList(new ArrayList<>()); /** * Map to keep track of any arbitrary data file hash responses @@ -97,7 +97,7 @@ public class ArbitraryDataFileManager extends Thread { arbitraryDataFileRequests.entrySet().removeIf(entry -> entry.getValue() == null || entry.getValue() < requestMinimumTimestamp); final long relayMinimumTimestamp = now - ArbitraryDataManager.getInstance().ARBITRARY_RELAY_TIMEOUT; - arbitraryRelayMap.entrySet().removeIf(entry -> entry.getValue().getC() == null || entry.getValue().getC() < relayMinimumTimestamp); + arbitraryRelayMap.removeIf(entry -> entry == null || entry.getTimestamp() == null || entry.getTimestamp() < relayMinimumTimestamp); arbitraryDataFileHashResponses.entrySet().removeIf(entry -> entry.getValue().getC() == null || entry.getValue().getC() < relayMinimumTimestamp); } @@ -391,6 +391,48 @@ public class ArbitraryDataFileManager extends Thread { } + // Relays + + private List getRelayInfoListForHash(String hash58) { + synchronized (arbitraryRelayMap) { + return arbitraryRelayMap.stream() + .filter(relayInfo -> Objects.equals(relayInfo.getHash58(), hash58)) + .collect(Collectors.toList()); + } + } + + private ArbitraryRelayInfo getRandomRelayInfoEntryForHash(String hash58) { + LOGGER.info("Fetching random relay info for hash: {}", hash58); + List relayInfoList = this.getRelayInfoListForHash(hash58); + if (relayInfoList != null && !relayInfoList.isEmpty()) { + + // Pick random item + int index = new SecureRandom().nextInt(relayInfoList.size()); + LOGGER.info("Returning random relay info for hash: {} (index {})", hash58, index); + return relayInfoList.get(index); + } + LOGGER.info("No relay info exists for hash: {}", hash58); + return null; + } + + public void addToRelayMap(ArbitraryRelayInfo newEntry) { + if (newEntry == null || !newEntry.isValid()) { + return; + } + + // Remove existing entry for this peer if it exists, to renew the timestamp + this.removeFromRelayMap(newEntry); + + // Re-add + arbitraryRelayMap.add(newEntry); + LOGGER.debug("Added entry to relay map: {}", newEntry); + } + + private void removeFromRelayMap(ArbitraryRelayInfo entry) { + arbitraryRelayMap.removeIf(relayInfo -> relayInfo.equals(entry)); + } + + // Network handlers public void onNetworkGetArbitraryDataFileMessage(Peer peer, Message message) { @@ -409,7 +451,7 @@ public class ArbitraryDataFileManager extends Thread { try { ArbitraryDataFile arbitraryDataFile = ArbitraryDataFile.fromHash(hash, signature); - Triple relayInfo = this.arbitraryRelayMap.get(hash58); + ArbitraryRelayInfo relayInfo = this.getRandomRelayInfoEntryForHash(hash58); if (arbitraryDataFile.exists()) { LOGGER.trace("Hash {} exists", hash58); @@ -426,7 +468,7 @@ public class ArbitraryDataFileManager extends Thread { else if (relayInfo != null) { LOGGER.debug("We have relay info for hash {}", Base58.encode(hash)); // We need to ask this peer for the file - Peer peerToAsk = relayInfo.getB(); + Peer peerToAsk = relayInfo.getPeer(); if (peerToAsk != null) { // Forward the message to this peer diff --git a/src/main/java/org/qortal/data/arbitrary/ArbitraryRelayInfo.java b/src/main/java/org/qortal/data/arbitrary/ArbitraryRelayInfo.java new file mode 100644 index 00000000..94f41d18 --- /dev/null +++ b/src/main/java/org/qortal/data/arbitrary/ArbitraryRelayInfo.java @@ -0,0 +1,60 @@ +package org.qortal.data.arbitrary; + +import org.qortal.network.Peer; +import java.util.Objects; + +public class ArbitraryRelayInfo { + + private final String hash58; + private final String signature58; + private final Peer peer; + private final Long timestamp; + + public ArbitraryRelayInfo(String hash58, String signature58, Peer peer, Long timestamp) { + this.hash58 = hash58; + this.signature58 = signature58; + this.peer = peer; + this.timestamp = timestamp; + } + + public boolean isValid() { + return this.getHash58() != null && this.getSignature58() != null + && this.getPeer() != null && this.getTimestamp() != null; + } + + public String getHash58() { + return this.hash58; + } + + public String getSignature58() { + return signature58; + } + + public Peer getPeer() { + return peer; + } + + public Long getTimestamp() { + return timestamp; + } + + @Override + public String toString() { + return String.format("%s = %s, %s, %d", this.hash58, this.signature58, this.peer, this.timestamp); + } + + @Override + public boolean equals(Object other) { + if (other == this) + return true; + + if (!(other instanceof ArbitraryRelayInfo)) + return false; + + ArbitraryRelayInfo otherRelayInfo = (ArbitraryRelayInfo) other; + + return this.peer == otherRelayInfo.getPeer() + && Objects.equals(this.hash58, otherRelayInfo.getHash58()) + && Objects.equals(this.signature58, otherRelayInfo.getSignature58()); + } +} From 3e0306f6467d51fd4ede6dca8c056c843b273ab4 Mon Sep 17 00:00:00 2001 From: CalDescent Date: Sun, 6 Feb 2022 17:29:00 +0000 Subject: [PATCH 13/14] Increased minPeerConnectionTime and maxPeerConnectionTime to reduce the chances of forced connections during relays. An alternate option would be to avoid force disconnecting while relays are in progress, but some nodes could have active relays 100% of the time and therefore would never recycle their peers. So it is simpler to just increase the average peer connection time for everyone. --- src/main/java/org/qortal/settings/Settings.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/org/qortal/settings/Settings.java b/src/main/java/org/qortal/settings/Settings.java index 41b69114..dd62189f 100644 --- a/src/main/java/org/qortal/settings/Settings.java +++ b/src/main/java/org/qortal/settings/Settings.java @@ -202,9 +202,9 @@ public class Settings { private boolean allowConnectionsWithOlderPeerVersions = true; /** Minimum time (in seconds) that we should attempt to remain connected to a peer for */ - private int minPeerConnectionTime = 2 * 60; // seconds + private int minPeerConnectionTime = 5 * 60; // seconds /** Maximum time (in seconds) that we should attempt to remain connected to a peer for */ - private int maxPeerConnectionTime = 20 * 60; // seconds + private int maxPeerConnectionTime = 60 * 60; // seconds /** Whether to sync multiple blocks at once in normal operation */ private boolean fastSyncEnabled = true; From 99f6bb5ac6b17f8e15782d5a2c70883e0f6606cc Mon Sep 17 00:00:00 2001 From: CalDescent Date: Sun, 6 Feb 2022 18:32:43 +0000 Subject: [PATCH 14/14] Reorganized some controller methods. --- .../org/qortal/controller/Controller.java | 339 +++++++++--------- 1 file changed, 174 insertions(+), 165 deletions(-) diff --git a/src/main/java/org/qortal/controller/Controller.java b/src/main/java/org/qortal/controller/Controller.java index e69dc558..7c3caad5 100644 --- a/src/main/java/org/qortal/controller/Controller.java +++ b/src/main/java/org/qortal/controller/Controller.java @@ -822,6 +822,103 @@ public class Controller extends Thread { } } + // Incoming transactions queue + + private void processIncomingTransactionsQueue() { + if (this.incomingTransactions.size() == 0) { + // Don't bother locking if there are no new transactions to process + return; + } + + if (Synchronizer.getInstance().isSyncRequested() || Synchronizer.getInstance().isSynchronizing()) { + // Prioritize syncing, and don't attempt to lock + return; + } + + try { + ReentrantLock blockchainLock = Controller.getInstance().getBlockchainLock(); + if (!blockchainLock.tryLock(2, TimeUnit.SECONDS)) { + LOGGER.trace(() -> String.format("Too busy to process incoming transactions queue")); + return; + } + } catch (InterruptedException e) { + LOGGER.info("Interrupted when trying to acquire blockchain lock"); + return; + } + + try (final Repository repository = RepositoryManager.getRepository()) { + + // Iterate through incoming transactions list + synchronized (this.incomingTransactions) { // Required in order to safely iterate a synchronizedList() + Iterator iterator = this.incomingTransactions.iterator(); + while (iterator.hasNext()) { + if (isStopping) { + return; + } + + TransactionData transactionData = (TransactionData) iterator.next(); + Transaction transaction = Transaction.fromData(repository, transactionData); + + // Check signature + if (!transaction.isSignatureValid()) { + LOGGER.trace(() -> String.format("Ignoring %s transaction %s with invalid signature", transactionData.getType().name(), Base58.encode(transactionData.getSignature()))); + iterator.remove(); + continue; + } + + ValidationResult validationResult = transaction.importAsUnconfirmed(); + + if (validationResult == ValidationResult.TRANSACTION_ALREADY_EXISTS) { + LOGGER.trace(() -> String.format("Ignoring existing transaction %s", Base58.encode(transactionData.getSignature()))); + iterator.remove(); + continue; + } + + if (validationResult == ValidationResult.NO_BLOCKCHAIN_LOCK) { + LOGGER.trace(() -> String.format("Couldn't lock blockchain to import unconfirmed transaction", Base58.encode(transactionData.getSignature()))); + iterator.remove(); + continue; + } + + if (validationResult != ValidationResult.OK) { + final String signature58 = Base58.encode(transactionData.getSignature()); + LOGGER.trace(() -> String.format("Ignoring invalid (%s) %s transaction %s", validationResult.name(), transactionData.getType().name(), signature58)); + Long now = NTP.getTime(); + if (now != null && now - transactionData.getTimestamp() > INVALID_TRANSACTION_STALE_TIMEOUT) { + Long expiryLength = INVALID_TRANSACTION_RECHECK_INTERVAL; + if (validationResult == ValidationResult.TIMESTAMP_TOO_OLD) { + // Use shorter recheck interval for expired transactions + expiryLength = EXPIRED_TRANSACTION_RECHECK_INTERVAL; + } + Long expiry = now + expiryLength; + LOGGER.debug("Adding stale invalid transaction {} to invalidUnconfirmedTransactions...", signature58); + // Invalid, unconfirmed transaction has become stale - add to invalidUnconfirmedTransactions so that we don't keep requesting it + invalidUnconfirmedTransactions.put(signature58, expiry); + } + iterator.remove(); + continue; + } + + LOGGER.debug(() -> String.format("Imported %s transaction %s", transactionData.getType().name(), Base58.encode(transactionData.getSignature()))); + iterator.remove(); + } + } + } catch (DataException e) { + LOGGER.error(String.format("Repository issue while processing incoming transactions", e)); + } finally { + blockchainLock.unlock(); + } + } + + private void cleanupInvalidTransactionsList(Long now) { + if (now == null) { + return; + } + // Periodically remove invalid unconfirmed transactions from the list, so that they can be fetched again + invalidUnconfirmedTransactions.entrySet().removeIf(entry -> entry.getValue() == null || entry.getValue() < now); + } + + // Shutdown public void shutdown() { @@ -1295,100 +1392,6 @@ public class Controller extends Thread { } } - private void processIncomingTransactionsQueue() { - if (this.incomingTransactions.size() == 0) { - // Don't bother locking if there are no new transactions to process - return; - } - - if (Synchronizer.getInstance().isSyncRequested() || Synchronizer.getInstance().isSynchronizing()) { - // Prioritize syncing, and don't attempt to lock - return; - } - - try { - ReentrantLock blockchainLock = Controller.getInstance().getBlockchainLock(); - if (!blockchainLock.tryLock(2, TimeUnit.SECONDS)) { - LOGGER.trace(() -> String.format("Too busy to process incoming transactions queue")); - return; - } - } catch (InterruptedException e) { - LOGGER.info("Interrupted when trying to acquire blockchain lock"); - return; - } - - try (final Repository repository = RepositoryManager.getRepository()) { - - // Iterate through incoming transactions list - synchronized (this.incomingTransactions) { // Required in order to safely iterate a synchronizedList() - Iterator iterator = this.incomingTransactions.iterator(); - while (iterator.hasNext()) { - if (isStopping) { - return; - } - - TransactionData transactionData = (TransactionData) iterator.next(); - Transaction transaction = Transaction.fromData(repository, transactionData); - - // Check signature - if (!transaction.isSignatureValid()) { - LOGGER.trace(() -> String.format("Ignoring %s transaction %s with invalid signature", transactionData.getType().name(), Base58.encode(transactionData.getSignature()))); - iterator.remove(); - continue; - } - - ValidationResult validationResult = transaction.importAsUnconfirmed(); - - if (validationResult == ValidationResult.TRANSACTION_ALREADY_EXISTS) { - LOGGER.trace(() -> String.format("Ignoring existing transaction %s", Base58.encode(transactionData.getSignature()))); - iterator.remove(); - continue; - } - - if (validationResult == ValidationResult.NO_BLOCKCHAIN_LOCK) { - LOGGER.trace(() -> String.format("Couldn't lock blockchain to import unconfirmed transaction", Base58.encode(transactionData.getSignature()))); - iterator.remove(); - continue; - } - - if (validationResult != ValidationResult.OK) { - final String signature58 = Base58.encode(transactionData.getSignature()); - LOGGER.trace(() -> String.format("Ignoring invalid (%s) %s transaction %s", validationResult.name(), transactionData.getType().name(), signature58)); - Long now = NTP.getTime(); - if (now != null && now - transactionData.getTimestamp() > INVALID_TRANSACTION_STALE_TIMEOUT) { - Long expiryLength = INVALID_TRANSACTION_RECHECK_INTERVAL; - if (validationResult == ValidationResult.TIMESTAMP_TOO_OLD) { - // Use shorter recheck interval for expired transactions - expiryLength = EXPIRED_TRANSACTION_RECHECK_INTERVAL; - } - Long expiry = now + expiryLength; - LOGGER.debug("Adding stale invalid transaction {} to invalidUnconfirmedTransactions...", signature58); - // Invalid, unconfirmed transaction has become stale - add to invalidUnconfirmedTransactions so that we don't keep requesting it - invalidUnconfirmedTransactions.put(signature58, expiry); - } - iterator.remove(); - continue; - } - - LOGGER.debug(() -> String.format("Imported %s transaction %s", transactionData.getType().name(), Base58.encode(transactionData.getSignature()))); - iterator.remove(); - } - } - } catch (DataException e) { - LOGGER.error(String.format("Repository issue while processing incoming transactions", e)); - } finally { - blockchainLock.unlock(); - } - } - - private void cleanupInvalidTransactionsList(Long now) { - if (now == null) { - return; - } - // Periodically remove invalid unconfirmed transactions from the list, so that they can be fetched again - invalidUnconfirmedTransactions.entrySet().removeIf(entry -> entry.getValue() == null || entry.getValue() < now); - } - private void onNetworkGetBlockSummariesMessage(Peer peer, Message message) { GetBlockSummariesMessage getBlockSummariesMessage = (GetBlockSummariesMessage) message; final byte[] parentSignature = getBlockSummariesMessage.getParentSignature(); @@ -1784,88 +1787,94 @@ public class Controller extends Thread { private void sendOurOnlineAccountsInfo() { final Long now = NTP.getTime(); - if (now == null) - return; + if (now != null) { - List mintingAccounts; - try (final Repository repository = RepositoryManager.getRepository()) { - mintingAccounts = repository.getAccountRepository().getMintingAccounts(); + List mintingAccounts; + try (final Repository repository = RepositoryManager.getRepository()) { + mintingAccounts = repository.getAccountRepository().getMintingAccounts(); - // We have no accounts, but don't reset timestamp - if (mintingAccounts.isEmpty()) - return; + // We have no accounts, but don't reset timestamp + if (mintingAccounts.isEmpty()) + return; - // Only reward-share accounts allowed - Iterator iterator = mintingAccounts.iterator(); - while (iterator.hasNext()) { - MintingAccountData mintingAccountData = iterator.next(); - - RewardShareData rewardShareData = repository.getAccountRepository().getRewardShare(mintingAccountData.getPublicKey()); - if (rewardShareData == null) { - // Reward-share doesn't even exist - probably not a good sign - iterator.remove(); - continue; - } - - Account mintingAccount = new Account(repository, rewardShareData.getMinter()); - if (!mintingAccount.canMint()) { - // Minting-account component of reward-share can no longer mint - disregard - iterator.remove(); - continue; - } - } - } catch (DataException e) { - LOGGER.warn(String.format("Repository issue trying to fetch minting accounts: %s", e.getMessage())); - return; - } - - // 'current' timestamp - final long onlineAccountsTimestamp = Controller.toOnlineAccountTimestamp(now); - boolean hasInfoChanged = false; - - byte[] timestampBytes = Longs.toByteArray(onlineAccountsTimestamp); - List ourOnlineAccounts = new ArrayList<>(); - - MINTING_ACCOUNTS: - for (MintingAccountData mintingAccountData : mintingAccounts) { - PrivateKeyAccount mintingAccount = new PrivateKeyAccount(null, mintingAccountData.getPrivateKey()); - - byte[] signature = mintingAccount.sign(timestampBytes); - byte[] publicKey = mintingAccount.getPublicKey(); - - // Our account is online - OnlineAccountData ourOnlineAccountData = new OnlineAccountData(onlineAccountsTimestamp, signature, publicKey); - synchronized (this.onlineAccounts) { - Iterator iterator = this.onlineAccounts.iterator(); + // Only reward-share accounts allowed + Iterator iterator = mintingAccounts.iterator(); + int i = 0; while (iterator.hasNext()) { - OnlineAccountData existingOnlineAccountData = iterator.next(); + MintingAccountData mintingAccountData = iterator.next(); - if (Arrays.equals(existingOnlineAccountData.getPublicKey(), ourOnlineAccountData.getPublicKey())) { - // If our online account is already present, with same timestamp, then move on to next mintingAccount - if (existingOnlineAccountData.getTimestamp() == onlineAccountsTimestamp) - continue MINTING_ACCOUNTS; - - // If our online account is already present, but with older timestamp, then remove it + RewardShareData rewardShareData = repository.getAccountRepository().getRewardShare(mintingAccountData.getPublicKey()); + if (rewardShareData == null) { + // Reward-share doesn't even exist - probably not a good sign iterator.remove(); - break; + continue; + } + + Account mintingAccount = new Account(repository, rewardShareData.getMinter()); + if (!mintingAccount.canMint()) { + // Minting-account component of reward-share can no longer mint - disregard + iterator.remove(); + continue; + } + + if (++i > 2) { + iterator.remove(); + continue; } } - - this.onlineAccounts.add(ourOnlineAccountData); + } catch (DataException e) { + LOGGER.warn(String.format("Repository issue trying to fetch minting accounts: %s", e.getMessage())); + return; } - LOGGER.trace(() -> String.format("Added our online account %s with timestamp %d", mintingAccount.getAddress(), onlineAccountsTimestamp)); - ourOnlineAccounts.add(ourOnlineAccountData); - hasInfoChanged = true; + // 'current' timestamp + final long onlineAccountsTimestamp = Controller.toOnlineAccountTimestamp(now); + boolean hasInfoChanged = false; + + byte[] timestampBytes = Longs.toByteArray(onlineAccountsTimestamp); + List ourOnlineAccounts = new ArrayList<>(); + + MINTING_ACCOUNTS: + for (MintingAccountData mintingAccountData : mintingAccounts) { + PrivateKeyAccount mintingAccount = new PrivateKeyAccount(null, mintingAccountData.getPrivateKey()); + + byte[] signature = mintingAccount.sign(timestampBytes); + byte[] publicKey = mintingAccount.getPublicKey(); + + // Our account is online + OnlineAccountData ourOnlineAccountData = new OnlineAccountData(onlineAccountsTimestamp, signature, publicKey); + synchronized (this.onlineAccounts) { + Iterator iterator = this.onlineAccounts.iterator(); + while (iterator.hasNext()) { + OnlineAccountData existingOnlineAccountData = iterator.next(); + + if (Arrays.equals(existingOnlineAccountData.getPublicKey(), ourOnlineAccountData.getPublicKey())) { + // If our online account is already present, with same timestamp, then move on to next mintingAccount + if (existingOnlineAccountData.getTimestamp() == onlineAccountsTimestamp) + continue MINTING_ACCOUNTS; + + // If our online account is already present, but with older timestamp, then remove it + iterator.remove(); + break; + } + } + + this.onlineAccounts.add(ourOnlineAccountData); + } + + LOGGER.trace(() -> String.format("Added our online account %s with timestamp %d", mintingAccount.getAddress(), onlineAccountsTimestamp)); + ourOnlineAccounts.add(ourOnlineAccountData); + hasInfoChanged = true; + } + + if (!hasInfoChanged) + return; + + Message message = new OnlineAccountsMessage(ourOnlineAccounts); + Network.getInstance().broadcast(peer -> message); + + LOGGER.trace(() -> String.format("Broadcasted %d online account%s with timestamp %d", ourOnlineAccounts.size(), (ourOnlineAccounts.size() != 1 ? "s" : ""), onlineAccountsTimestamp)); } - - if (!hasInfoChanged) - return; - - Message message = new OnlineAccountsMessage(ourOnlineAccounts); - Network.getInstance().broadcast(peer -> message); - - LOGGER.trace(()-> String.format("Broadcasted %d online account%s with timestamp %d", ourOnlineAccounts.size(), (ourOnlineAccounts.size() != 1 ? "s" : ""), onlineAccountsTimestamp)); } public static long toOnlineAccountTimestamp(long timestamp) {