Merge branch 'master' into block-minter-updates

This commit is contained in:
CalDescent 2022-02-06 18:40:32 +00:00
commit 41c4e0c83e
10 changed files with 508 additions and 270 deletions

View File

@ -576,14 +576,16 @@ public class ArbitraryResource {
@PathParam("service") Service service,
@PathParam("name") String name,
@QueryParam("filepath") String filepath,
@QueryParam("rebuild") boolean rebuild) {
@QueryParam("rebuild") boolean rebuild,
@QueryParam("async") boolean async,
@QueryParam("attempts") Integer attempts) {
// Authentication can be bypassed in the settings, for those running public QDN nodes
if (!Settings.getInstance().isQDNAuthBypassEnabled()) {
Security.checkApiCallAllowed(request);
}
return this.download(service, name, null, filepath, rebuild);
return this.download(service, name, null, filepath, rebuild, async, attempts);
}
@GET
@ -609,14 +611,16 @@ public class ArbitraryResource {
@PathParam("name") String name,
@PathParam("identifier") String identifier,
@QueryParam("filepath") String filepath,
@QueryParam("rebuild") boolean rebuild) {
@QueryParam("rebuild") boolean rebuild,
@QueryParam("async") boolean async,
@QueryParam("attempts") Integer attempts) {
// Authentication can be bypassed in the settings, for those running public QDN nodes
if (!Settings.getInstance().isQDNAuthBypassEnabled()) {
Security.checkApiCallAllowed(request);
}
return this.download(service, name, identifier, filepath, rebuild);
return this.download(service, name, identifier, filepath, rebuild, async, attempts);
}
@ -1027,30 +1031,45 @@ public class ArbitraryResource {
}
}
private HttpServletResponse download(Service service, String name, String identifier, String filepath, boolean rebuild) {
private HttpServletResponse download(Service service, String name, String identifier, String filepath, boolean rebuild, boolean async, Integer maxAttempts) {
ArbitraryDataReader arbitraryDataReader = new ArbitraryDataReader(name, ArbitraryDataFile.ResourceIdType.NAME, service, identifier);
try {
int attempts = 0;
if (maxAttempts == null) {
maxAttempts = 5;
}
// Loop until we have data
while (!Controller.isStopping()) {
attempts++;
if (!arbitraryDataReader.isBuilding()) {
try {
arbitraryDataReader.loadSynchronously(rebuild);
break;
} catch (MissingDataException e) {
if (attempts > 5) {
// Give up after 5 attempts
throw ApiExceptionFactory.INSTANCE.createCustomException(request, ApiError.INVALID_CRITERIA, "Data unavailable. Please try again later.");
if (async) {
// Asynchronous
arbitraryDataReader.loadAsynchronously(false);
}
else {
// Synchronous
while (!Controller.isStopping()) {
attempts++;
if (!arbitraryDataReader.isBuilding()) {
try {
arbitraryDataReader.loadSynchronously(rebuild);
break;
} catch (MissingDataException e) {
if (attempts > maxAttempts) {
// Give up after 5 attempts
throw ApiExceptionFactory.INSTANCE.createCustomException(request, ApiError.INVALID_CRITERIA, "Data unavailable. Please try again later.");
}
}
}
Thread.sleep(3000L);
}
Thread.sleep(3000L);
}
java.nio.file.Path outputPath = arbitraryDataReader.getFilePath();
if (outputPath == null) {
// Assume the resource doesn't exist
throw ApiExceptionFactory.INSTANCE.createCustomException(request, ApiError.FILE_NOT_FOUND, "File not found");
}
if (filepath == null || filepath.isEmpty()) {
// No file path supplied - so check if this is a single file resource

View File

@ -122,9 +122,19 @@ public class ArbitraryDataReader {
* This adds the build task to a queue, and the result will be cached when complete
* To check the status of the build, periodically call isCachedDataAvailable()
* Once it returns true, you can then use getFilePath() to access the data itself.
*
* @param overwrite - set to true to force rebuild an existing cache
* @return true if added or already present in queue; false if not
*/
public boolean loadAsynchronously() {
public boolean loadAsynchronously(boolean overwrite) {
ArbitraryDataCache cache = new ArbitraryDataCache(this.uncompressedPath, overwrite,
this.resourceId, this.resourceIdType, this.service, this.identifier);
if (cache.isCachedDataAvailable()) {
// Use cached data
this.filePath = this.uncompressedPath;
return true;
}
return ArbitraryDataBuildManager.getInstance().addToBuildQueue(this.createQueueItem());
}

View File

@ -76,7 +76,7 @@ public class ArbitraryDataRenderer {
if (!arbitraryDataReader.isCachedDataAvailable()) {
// If async is requested, show a loading screen whilst build is in progress
if (async) {
arbitraryDataReader.loadAsynchronously();
arbitraryDataReader.loadAsynchronously(false);
return this.getLoadingResponse(service, resourceId);
}

View File

@ -11,18 +11,7 @@ import java.security.Security;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@ -101,6 +90,14 @@ public class Controller extends Thread {
private static final long DELETE_EXPIRED_INTERVAL = 5 * 60 * 1000L; // ms
private static final int MAX_INCOMING_TRANSACTIONS = 5000;
/** Minimum time before considering an invalid unconfirmed transaction as "stale" */
public static final long INVALID_TRANSACTION_STALE_TIMEOUT = 30 * 60 * 1000L; // ms
/** Minimum frequency to re-request stale unconfirmed transactions from peers, to recheck validity */
public static final long INVALID_TRANSACTION_RECHECK_INTERVAL = 60 * 60 * 1000L; // ms\
/** Minimum frequency to re-request expired unconfirmed transactions from peers, to recheck validity
* This mainly exists to stop expired transactions from bloating the list */
public static final long EXPIRED_TRANSACTION_RECHECK_INTERVAL = 10 * 60 * 1000L; // ms
// To do with online accounts list
private static final long ONLINE_ACCOUNTS_TASKS_INTERVAL = 10 * 1000L; // ms
private static final long ONLINE_ACCOUNTS_BROADCAST_INTERVAL = 1 * 60 * 1000L; // ms
@ -147,6 +144,9 @@ public class Controller extends Thread {
/** List of incoming transaction that are in the import queue */
private List<TransactionData> incomingTransactions = Collections.synchronizedList(new ArrayList<>());
/** List of recent invalid unconfirmed transactions */
private Map<String, Long> invalidUnconfirmedTransactions = Collections.synchronizedMap(new HashMap<>());
/** Lock for only allowing one blockchain-modifying codepath at a time. e.g. synchronization or newly minted block. */
private final ReentrantLock blockchainLock = new ReentrantLock();
@ -557,6 +557,8 @@ public class Controller extends Thread {
// Process incoming transactions queue
processIncomingTransactionsQueue();
// Clean up invalid incoming transactions list
cleanupInvalidTransactionsList(now);
// Clean up arbitrary data request cache
ArbitraryDataManager.getInstance().cleanupRequestCache(now);
@ -820,6 +822,103 @@ public class Controller extends Thread {
}
}
// Incoming transactions queue
private void processIncomingTransactionsQueue() {
if (this.incomingTransactions.size() == 0) {
// Don't bother locking if there are no new transactions to process
return;
}
if (Synchronizer.getInstance().isSyncRequested() || Synchronizer.getInstance().isSynchronizing()) {
// Prioritize syncing, and don't attempt to lock
return;
}
try {
ReentrantLock blockchainLock = Controller.getInstance().getBlockchainLock();
if (!blockchainLock.tryLock(2, TimeUnit.SECONDS)) {
LOGGER.trace(() -> String.format("Too busy to process incoming transactions queue"));
return;
}
} catch (InterruptedException e) {
LOGGER.info("Interrupted when trying to acquire blockchain lock");
return;
}
try (final Repository repository = RepositoryManager.getRepository()) {
// Iterate through incoming transactions list
synchronized (this.incomingTransactions) { // Required in order to safely iterate a synchronizedList()
Iterator iterator = this.incomingTransactions.iterator();
while (iterator.hasNext()) {
if (isStopping) {
return;
}
TransactionData transactionData = (TransactionData) iterator.next();
Transaction transaction = Transaction.fromData(repository, transactionData);
// Check signature
if (!transaction.isSignatureValid()) {
LOGGER.trace(() -> String.format("Ignoring %s transaction %s with invalid signature", transactionData.getType().name(), Base58.encode(transactionData.getSignature())));
iterator.remove();
continue;
}
ValidationResult validationResult = transaction.importAsUnconfirmed();
if (validationResult == ValidationResult.TRANSACTION_ALREADY_EXISTS) {
LOGGER.trace(() -> String.format("Ignoring existing transaction %s", Base58.encode(transactionData.getSignature())));
iterator.remove();
continue;
}
if (validationResult == ValidationResult.NO_BLOCKCHAIN_LOCK) {
LOGGER.trace(() -> String.format("Couldn't lock blockchain to import unconfirmed transaction", Base58.encode(transactionData.getSignature())));
iterator.remove();
continue;
}
if (validationResult != ValidationResult.OK) {
final String signature58 = Base58.encode(transactionData.getSignature());
LOGGER.trace(() -> String.format("Ignoring invalid (%s) %s transaction %s", validationResult.name(), transactionData.getType().name(), signature58));
Long now = NTP.getTime();
if (now != null && now - transactionData.getTimestamp() > INVALID_TRANSACTION_STALE_TIMEOUT) {
Long expiryLength = INVALID_TRANSACTION_RECHECK_INTERVAL;
if (validationResult == ValidationResult.TIMESTAMP_TOO_OLD) {
// Use shorter recheck interval for expired transactions
expiryLength = EXPIRED_TRANSACTION_RECHECK_INTERVAL;
}
Long expiry = now + expiryLength;
LOGGER.debug("Adding stale invalid transaction {} to invalidUnconfirmedTransactions...", signature58);
// Invalid, unconfirmed transaction has become stale - add to invalidUnconfirmedTransactions so that we don't keep requesting it
invalidUnconfirmedTransactions.put(signature58, expiry);
}
iterator.remove();
continue;
}
LOGGER.debug(() -> String.format("Imported %s transaction %s", transactionData.getType().name(), Base58.encode(transactionData.getSignature())));
iterator.remove();
}
}
} catch (DataException e) {
LOGGER.error(String.format("Repository issue while processing incoming transactions", e));
} finally {
blockchainLock.unlock();
}
}
private void cleanupInvalidTransactionsList(Long now) {
if (now == null) {
return;
}
// Periodically remove invalid unconfirmed transactions from the list, so that they can be fetched again
invalidUnconfirmedTransactions.entrySet().removeIf(entry -> entry.getValue() == null || entry.getValue() < now);
}
// Shutdown
public void shutdown() {
@ -1293,79 +1392,6 @@ public class Controller extends Thread {
}
}
private void processIncomingTransactionsQueue() {
if (this.incomingTransactions.size() == 0) {
// Don't bother locking if there are no new transactions to process
return;
}
if (Synchronizer.getInstance().isSyncRequested() || Synchronizer.getInstance().isSynchronizing()) {
// Prioritize syncing, and don't attempt to lock
return;
}
try {
ReentrantLock blockchainLock = Controller.getInstance().getBlockchainLock();
if (!blockchainLock.tryLock(2, TimeUnit.SECONDS)) {
LOGGER.trace(() -> String.format("Too busy to process incoming transactions queue"));
return;
}
} catch (InterruptedException e) {
LOGGER.info("Interrupted when trying to acquire blockchain lock");
return;
}
try (final Repository repository = RepositoryManager.getRepository()) {
// Iterate through incoming transactions list
synchronized (this.incomingTransactions) { // Required in order to safely iterate a synchronizedList()
Iterator iterator = this.incomingTransactions.iterator();
while (iterator.hasNext()) {
if (isStopping) {
return;
}
TransactionData transactionData = (TransactionData) iterator.next();
Transaction transaction = Transaction.fromData(repository, transactionData);
// Check signature
if (!transaction.isSignatureValid()) {
LOGGER.trace(() -> String.format("Ignoring %s transaction %s with invalid signature", transactionData.getType().name(), Base58.encode(transactionData.getSignature())));
iterator.remove();
continue;
}
ValidationResult validationResult = transaction.importAsUnconfirmed();
if (validationResult == ValidationResult.TRANSACTION_ALREADY_EXISTS) {
LOGGER.trace(() -> String.format("Ignoring existing transaction %s", Base58.encode(transactionData.getSignature())));
iterator.remove();
continue;
}
if (validationResult == ValidationResult.NO_BLOCKCHAIN_LOCK) {
LOGGER.trace(() -> String.format("Couldn't lock blockchain to import unconfirmed transaction", Base58.encode(transactionData.getSignature())));
iterator.remove();
continue;
}
if (validationResult != ValidationResult.OK) {
LOGGER.trace(() -> String.format("Ignoring invalid (%s) %s transaction %s", validationResult.name(), transactionData.getType().name(), Base58.encode(transactionData.getSignature())));
iterator.remove();
continue;
}
LOGGER.debug(() -> String.format("Imported %s transaction %s", transactionData.getType().name(), Base58.encode(transactionData.getSignature())));
iterator.remove();
}
}
} catch (DataException e) {
LOGGER.error(String.format("Repository issue while processing incoming transactions", e));
} finally {
blockchainLock.unlock();
}
}
private void onNetworkGetBlockSummariesMessage(Peer peer, Message message) {
GetBlockSummariesMessage getBlockSummariesMessage = (GetBlockSummariesMessage) message;
final byte[] parentSignature = getBlockSummariesMessage.getParentSignature();
@ -1561,6 +1587,13 @@ public class Controller extends Thread {
try (final Repository repository = RepositoryManager.getRepository()) {
for (byte[] signature : signatures) {
String signature58 = Base58.encode(signature);
if (invalidUnconfirmedTransactions.containsKey(signature58)) {
// Previously invalid transaction - don't keep requesting it
// It will be periodically removed from invalidUnconfirmedTransactions to allow for rechecks
continue;
}
// Do we have it already? (Before requesting transaction data itself)
if (repository.getTransactionRepository().exists(signature)) {
LOGGER.trace(() -> String.format("Ignoring existing transaction %s from peer %s", Base58.encode(signature), peer));
@ -1754,88 +1787,94 @@ public class Controller extends Thread {
private void sendOurOnlineAccountsInfo() {
final Long now = NTP.getTime();
if (now == null)
return;
if (now != null) {
List<MintingAccountData> mintingAccounts;
try (final Repository repository = RepositoryManager.getRepository()) {
mintingAccounts = repository.getAccountRepository().getMintingAccounts();
List<MintingAccountData> mintingAccounts;
try (final Repository repository = RepositoryManager.getRepository()) {
mintingAccounts = repository.getAccountRepository().getMintingAccounts();
// We have no accounts, but don't reset timestamp
if (mintingAccounts.isEmpty())
return;
// We have no accounts, but don't reset timestamp
if (mintingAccounts.isEmpty())
return;
// Only reward-share accounts allowed
Iterator<MintingAccountData> iterator = mintingAccounts.iterator();
while (iterator.hasNext()) {
MintingAccountData mintingAccountData = iterator.next();
RewardShareData rewardShareData = repository.getAccountRepository().getRewardShare(mintingAccountData.getPublicKey());
if (rewardShareData == null) {
// Reward-share doesn't even exist - probably not a good sign
iterator.remove();
continue;
}
Account mintingAccount = new Account(repository, rewardShareData.getMinter());
if (!mintingAccount.canMint()) {
// Minting-account component of reward-share can no longer mint - disregard
iterator.remove();
continue;
}
}
} catch (DataException e) {
LOGGER.warn(String.format("Repository issue trying to fetch minting accounts: %s", e.getMessage()));
return;
}
// 'current' timestamp
final long onlineAccountsTimestamp = Controller.toOnlineAccountTimestamp(now);
boolean hasInfoChanged = false;
byte[] timestampBytes = Longs.toByteArray(onlineAccountsTimestamp);
List<OnlineAccountData> ourOnlineAccounts = new ArrayList<>();
MINTING_ACCOUNTS:
for (MintingAccountData mintingAccountData : mintingAccounts) {
PrivateKeyAccount mintingAccount = new PrivateKeyAccount(null, mintingAccountData.getPrivateKey());
byte[] signature = mintingAccount.sign(timestampBytes);
byte[] publicKey = mintingAccount.getPublicKey();
// Our account is online
OnlineAccountData ourOnlineAccountData = new OnlineAccountData(onlineAccountsTimestamp, signature, publicKey);
synchronized (this.onlineAccounts) {
Iterator<OnlineAccountData> iterator = this.onlineAccounts.iterator();
// Only reward-share accounts allowed
Iterator<MintingAccountData> iterator = mintingAccounts.iterator();
int i = 0;
while (iterator.hasNext()) {
OnlineAccountData existingOnlineAccountData = iterator.next();
MintingAccountData mintingAccountData = iterator.next();
if (Arrays.equals(existingOnlineAccountData.getPublicKey(), ourOnlineAccountData.getPublicKey())) {
// If our online account is already present, with same timestamp, then move on to next mintingAccount
if (existingOnlineAccountData.getTimestamp() == onlineAccountsTimestamp)
continue MINTING_ACCOUNTS;
// If our online account is already present, but with older timestamp, then remove it
RewardShareData rewardShareData = repository.getAccountRepository().getRewardShare(mintingAccountData.getPublicKey());
if (rewardShareData == null) {
// Reward-share doesn't even exist - probably not a good sign
iterator.remove();
break;
continue;
}
Account mintingAccount = new Account(repository, rewardShareData.getMinter());
if (!mintingAccount.canMint()) {
// Minting-account component of reward-share can no longer mint - disregard
iterator.remove();
continue;
}
if (++i > 2) {
iterator.remove();
continue;
}
}
this.onlineAccounts.add(ourOnlineAccountData);
} catch (DataException e) {
LOGGER.warn(String.format("Repository issue trying to fetch minting accounts: %s", e.getMessage()));
return;
}
LOGGER.trace(() -> String.format("Added our online account %s with timestamp %d", mintingAccount.getAddress(), onlineAccountsTimestamp));
ourOnlineAccounts.add(ourOnlineAccountData);
hasInfoChanged = true;
// 'current' timestamp
final long onlineAccountsTimestamp = Controller.toOnlineAccountTimestamp(now);
boolean hasInfoChanged = false;
byte[] timestampBytes = Longs.toByteArray(onlineAccountsTimestamp);
List<OnlineAccountData> ourOnlineAccounts = new ArrayList<>();
MINTING_ACCOUNTS:
for (MintingAccountData mintingAccountData : mintingAccounts) {
PrivateKeyAccount mintingAccount = new PrivateKeyAccount(null, mintingAccountData.getPrivateKey());
byte[] signature = mintingAccount.sign(timestampBytes);
byte[] publicKey = mintingAccount.getPublicKey();
// Our account is online
OnlineAccountData ourOnlineAccountData = new OnlineAccountData(onlineAccountsTimestamp, signature, publicKey);
synchronized (this.onlineAccounts) {
Iterator<OnlineAccountData> iterator = this.onlineAccounts.iterator();
while (iterator.hasNext()) {
OnlineAccountData existingOnlineAccountData = iterator.next();
if (Arrays.equals(existingOnlineAccountData.getPublicKey(), ourOnlineAccountData.getPublicKey())) {
// If our online account is already present, with same timestamp, then move on to next mintingAccount
if (existingOnlineAccountData.getTimestamp() == onlineAccountsTimestamp)
continue MINTING_ACCOUNTS;
// If our online account is already present, but with older timestamp, then remove it
iterator.remove();
break;
}
}
this.onlineAccounts.add(ourOnlineAccountData);
}
LOGGER.trace(() -> String.format("Added our online account %s with timestamp %d", mintingAccount.getAddress(), onlineAccountsTimestamp));
ourOnlineAccounts.add(ourOnlineAccountData);
hasInfoChanged = true;
}
if (!hasInfoChanged)
return;
Message message = new OnlineAccountsMessage(ourOnlineAccounts);
Network.getInstance().broadcast(peer -> message);
LOGGER.trace(() -> String.format("Broadcasted %d online account%s with timestamp %d", ourOnlineAccounts.size(), (ourOnlineAccounts.size() != 1 ? "s" : ""), onlineAccountsTimestamp));
}
if (!hasInfoChanged)
return;
Message message = new OnlineAccountsMessage(ourOnlineAccounts);
Network.getInstance().broadcast(peer -> message);
LOGGER.trace(()-> String.format("Broadcasted %d online account%s with timestamp %d", ourOnlineAccounts.size(), (ourOnlineAccounts.size() != 1 ? "s" : ""), onlineAccountsTimestamp));
}
public static long toOnlineAccountTimestamp(long timestamp) {

View File

@ -20,8 +20,9 @@ public class ArbitraryDataBuilderThread implements Runnable {
}
@Override
public void run() {
Thread.currentThread().setName("Arbitrary Data Build Manager");
Thread.currentThread().setName("Arbitrary Data Builder Thread");
ArbitraryDataBuildManager buildManager = ArbitraryDataBuildManager.getInstance();
while (!Controller.isStopping()) {
@ -39,7 +40,7 @@ public class ArbitraryDataBuilderThread implements Runnable {
Map.Entry<String, ArbitraryDataBuildQueueItem> next = buildManager.arbitraryDataBuildQueue
.entrySet().stream()
.filter(e -> e.getValue().isQueued())
.findFirst().get();
.findFirst().orElse(null);
if (next == null) {
continue;

View File

@ -5,6 +5,7 @@ import org.apache.logging.log4j.Logger;
import org.qortal.arbitrary.ArbitraryDataFile;
import org.qortal.arbitrary.ArbitraryDataFileChunk;
import org.qortal.controller.Controller;
import org.qortal.data.arbitrary.ArbitraryRelayInfo;
import org.qortal.data.transaction.ArbitraryTransactionData;
import org.qortal.data.transaction.TransactionData;
import org.qortal.network.Network;
@ -477,10 +478,8 @@ public class ArbitraryDataFileListManager {
Long now = NTP.getTime();
for (byte[] hash : hashes) {
String hash58 = Base58.encode(hash);
Triple<String, Peer, Long> value = new Triple<>(signature58, peer, now);
if (arbitraryDataFileManager.arbitraryRelayMap.putIfAbsent(hash58, value) == null) {
LOGGER.debug("Added {} to relay map: {}, {}, {}", hash58, signature58, peer, now);
}
ArbitraryRelayInfo relayMap = new ArbitraryRelayInfo(hash58, signature58, peer, now);
ArbitraryDataFileManager.getInstance().addToRelayMap(relayMap);
}
// Forward to requesting peer

View File

@ -4,6 +4,7 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.qortal.arbitrary.ArbitraryDataFile;
import org.qortal.controller.Controller;
import org.qortal.data.arbitrary.ArbitraryRelayInfo;
import org.qortal.data.network.ArbitraryPeerData;
import org.qortal.data.network.PeerData;
import org.qortal.data.transaction.ArbitraryTransactionData;
@ -21,6 +22,8 @@ import org.qortal.utils.Triple;
import java.security.SecureRandom;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
public class ArbitraryDataFileManager extends Thread {
@ -37,10 +40,9 @@ public class ArbitraryDataFileManager extends Thread {
private Map<String, Long> arbitraryDataFileRequests = Collections.synchronizedMap(new HashMap<>());
/**
* Map to keep track of hashes that we might need to relay, keyed by the hash of the file (base58 encoded).
* Value is comprised of the base58-encoded signature, the peer that is hosting it, and the timestamp that it was added
* Map to keep track of hashes that we might need to relay
*/
public Map<String, Triple<String, Peer, Long>> arbitraryRelayMap = Collections.synchronizedMap(new HashMap<>());
public List<ArbitraryRelayInfo> arbitraryRelayMap = Collections.synchronizedList(new ArrayList<>());
/**
* Map to keep track of any arbitrary data file hash responses
@ -65,11 +67,16 @@ public class ArbitraryDataFileManager extends Thread {
Thread.currentThread().setName("Arbitrary Data File Manager");
try {
while (!isStopping) {
Thread.sleep(1000);
// Use a fixed thread pool to execute the arbitrary data file requests
int threadCount = 10;
ExecutorService arbitraryDataFileRequestExecutor = Executors.newFixedThreadPool(threadCount);
for (int i = 0; i < threadCount; i++) {
arbitraryDataFileRequestExecutor.execute(new ArbitraryDataFileRequestThread());
}
Long now = NTP.getTime();
this.processFileHashes(now);
while (!isStopping) {
// Nothing to do yet
Thread.sleep(1000);
}
} catch (InterruptedException e) {
// Fall-through to exit thread...
@ -81,66 +88,6 @@ public class ArbitraryDataFileManager extends Thread {
this.interrupt();
}
private void processFileHashes(Long now) {
try (final Repository repository = RepositoryManager.getRepository()) {
ArbitraryTransactionData arbitraryTransactionData = null;
byte[] signature = null;
byte[] hash = null;
Peer peer = null;
boolean shouldProcess = false;
synchronized (arbitraryDataFileHashResponses) {
for (String hash58 : arbitraryDataFileHashResponses.keySet()) {
if (isStopping) {
return;
}
Triple<Peer, String, Long> value = arbitraryDataFileHashResponses.get(hash58);
if (value != null) {
peer = value.getA();
String signature58 = value.getB();
Long timestamp = value.getC();
if (now - timestamp >= ArbitraryDataManager.ARBITRARY_RELAY_TIMEOUT || signature58 == null || peer == null) {
// Ignore - to be deleted
continue;
}
hash = Base58.decode(hash58);
signature = Base58.decode(signature58);
// Fetch the transaction data
arbitraryTransactionData = ArbitraryTransactionUtils.fetchTransactionData(repository, signature);
if (arbitraryTransactionData == null) {
continue;
}
// We want to process this file
shouldProcess = true;
break;
}
}
}
if (!shouldProcess) {
// Nothing to do
return;
}
if (signature == null || hash == null || peer == null || arbitraryTransactionData == null) {
return;
}
String hash58 = Base58.encode(hash);
LOGGER.debug("Fetching file {} from peer {} via response queue...", hash58, peer);
this.fetchArbitraryDataFiles(repository, peer, signature, arbitraryTransactionData, Arrays.asList(hash));
} catch (DataException e) {
LOGGER.info("Unable to process file hashes: {}", e.getMessage());
}
}
public void cleanupRequestCache(Long now) {
if (now == null) {
@ -150,7 +97,7 @@ public class ArbitraryDataFileManager extends Thread {
arbitraryDataFileRequests.entrySet().removeIf(entry -> entry.getValue() == null || entry.getValue() < requestMinimumTimestamp);
final long relayMinimumTimestamp = now - ArbitraryDataManager.getInstance().ARBITRARY_RELAY_TIMEOUT;
arbitraryRelayMap.entrySet().removeIf(entry -> entry.getValue().getC() == null || entry.getValue().getC() < relayMinimumTimestamp);
arbitraryRelayMap.removeIf(entry -> entry == null || entry.getTimestamp() == null || entry.getTimestamp() < relayMinimumTimestamp);
arbitraryDataFileHashResponses.entrySet().removeIf(entry -> entry.getValue().getC() == null || entry.getValue().getC() < relayMinimumTimestamp);
}
@ -186,13 +133,19 @@ public class ArbitraryDataFileManager extends Thread {
if (receivedArbitraryDataFileMessage != null) {
LOGGER.debug("Received data file {} from peer {}. Time taken: {} ms", receivedArbitraryDataFileMessage.getArbitraryDataFile().getHash58(), peer, (endTime-startTime));
receivedAtLeastOneFile = true;
// Remove this hash from arbitraryDataFileHashResponses now that we have received it
arbitraryDataFileHashResponses.remove(hash58);
}
else {
LOGGER.debug("Peer {} didn't respond with data file {} for signature {}. Time taken: {} ms", peer, Base58.encode(hash), Base58.encode(signature), (endTime-startTime));
}
// Remove this hash from arbitraryDataFileHashResponses now that we have tried to request it
arbitraryDataFileHashResponses.remove(hash58);
// Remove this hash from arbitraryDataFileHashResponses now that we have failed to receive it
arbitraryDataFileHashResponses.remove(hash58);
// Stop asking for files from this peer
break;
}
}
else {
LOGGER.trace("Already requesting data file {} for signature {}", arbitraryDataFile, Base58.encode(signature));
@ -217,22 +170,23 @@ public class ArbitraryDataFileManager extends Thread {
// Invalidate the hosted transactions cache as we are now hosting something new
ArbitraryDataStorageManager.getInstance().invalidateHostedTransactionsCache();
}
// Check if we have all the files we need for this transaction
if (arbitraryDataFile.allFilesExist()) {
// Check if we have all the files we need for this transaction
if (arbitraryDataFile.allFilesExist()) {
// We have all the chunks for this transaction, so we should invalidate the transaction's name's
// data cache so that it is rebuilt the next time we serve it
ArbitraryDataManager.getInstance().invalidateCache(arbitraryTransactionData);
// We have all the chunks for this transaction, so we should invalidate the transaction's name's
// data cache so that it is rebuilt the next time we serve it
ArbitraryDataManager.getInstance().invalidateCache(arbitraryTransactionData);
// We may also need to broadcast to the network that we are now hosting files for this transaction,
// but only if these files are in accordance with our storage policy
if (ArbitraryDataStorageManager.getInstance().canStoreData(arbitraryTransactionData)) {
// Use a null peer address to indicate our own
Message newArbitrarySignatureMessage = new ArbitrarySignaturesMessage(null, 0, Arrays.asList(signature));
Network.getInstance().broadcast(broadcastPeer -> newArbitrarySignatureMessage);
// We may also need to broadcast to the network that we are now hosting files for this transaction,
// but only if these files are in accordance with our storage policy
if (ArbitraryDataStorageManager.getInstance().canStoreData(arbitraryTransactionData)) {
// Use a null peer address to indicate our own
Message newArbitrarySignatureMessage = new ArbitrarySignaturesMessage(null, 0, Arrays.asList(signature));
Network.getInstance().broadcast(broadcastPeer -> newArbitrarySignatureMessage);
}
}
}
return receivedAtLeastOneFile;
@ -437,6 +391,48 @@ public class ArbitraryDataFileManager extends Thread {
}
// Relays
private List<ArbitraryRelayInfo> getRelayInfoListForHash(String hash58) {
synchronized (arbitraryRelayMap) {
return arbitraryRelayMap.stream()
.filter(relayInfo -> Objects.equals(relayInfo.getHash58(), hash58))
.collect(Collectors.toList());
}
}
private ArbitraryRelayInfo getRandomRelayInfoEntryForHash(String hash58) {
LOGGER.info("Fetching random relay info for hash: {}", hash58);
List<ArbitraryRelayInfo> relayInfoList = this.getRelayInfoListForHash(hash58);
if (relayInfoList != null && !relayInfoList.isEmpty()) {
// Pick random item
int index = new SecureRandom().nextInt(relayInfoList.size());
LOGGER.info("Returning random relay info for hash: {} (index {})", hash58, index);
return relayInfoList.get(index);
}
LOGGER.info("No relay info exists for hash: {}", hash58);
return null;
}
public void addToRelayMap(ArbitraryRelayInfo newEntry) {
if (newEntry == null || !newEntry.isValid()) {
return;
}
// Remove existing entry for this peer if it exists, to renew the timestamp
this.removeFromRelayMap(newEntry);
// Re-add
arbitraryRelayMap.add(newEntry);
LOGGER.debug("Added entry to relay map: {}", newEntry);
}
private void removeFromRelayMap(ArbitraryRelayInfo entry) {
arbitraryRelayMap.removeIf(relayInfo -> relayInfo.equals(entry));
}
// Network handlers
public void onNetworkGetArbitraryDataFileMessage(Peer peer, Message message) {
@ -455,7 +451,7 @@ public class ArbitraryDataFileManager extends Thread {
try {
ArbitraryDataFile arbitraryDataFile = ArbitraryDataFile.fromHash(hash, signature);
Triple<String, Peer, Long> relayInfo = this.arbitraryRelayMap.get(hash58);
ArbitraryRelayInfo relayInfo = this.getRandomRelayInfoEntryForHash(hash58);
if (arbitraryDataFile.exists()) {
LOGGER.trace("Hash {} exists", hash58);
@ -472,15 +468,12 @@ public class ArbitraryDataFileManager extends Thread {
else if (relayInfo != null) {
LOGGER.debug("We have relay info for hash {}", Base58.encode(hash));
// We need to ask this peer for the file
Peer peerToAsk = relayInfo.getB();
Peer peerToAsk = relayInfo.getPeer();
if (peerToAsk != null) {
// Forward the message to this peer
LOGGER.debug("Asking peer {} for hash {}", peerToAsk, hash58);
this.fetchArbitraryDataFile(peerToAsk, peer, signature, hash, message);
// Remove from the map regardless of outcome, as the relay attempt is now considered complete
arbitraryRelayMap.remove(hash58);
}
else {
LOGGER.debug("Peer {} not found in relay info", peer);

View File

@ -0,0 +1,117 @@
package org.qortal.controller.arbitrary;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.qortal.controller.Controller;
import org.qortal.data.transaction.ArbitraryTransactionData;
import org.qortal.network.Peer;
import org.qortal.repository.DataException;
import org.qortal.repository.Repository;
import org.qortal.repository.RepositoryManager;
import org.qortal.utils.ArbitraryTransactionUtils;
import org.qortal.utils.Base58;
import org.qortal.utils.NTP;
import org.qortal.utils.Triple;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Map;
public class ArbitraryDataFileRequestThread implements Runnable {
private static final Logger LOGGER = LogManager.getLogger(ArbitraryDataFileRequestThread.class);
public ArbitraryDataFileRequestThread() {
}
@Override
public void run() {
Thread.currentThread().setName("Arbitrary Data File Request Thread");
try {
while (!Controller.isStopping()) {
Thread.sleep(1000);
Long now = NTP.getTime();
this.processFileHashes(now);
}
} catch (InterruptedException e) {
// Fall-through to exit thread...
}
}
private void processFileHashes(Long now) {
try (final Repository repository = RepositoryManager.getRepository()) {
ArbitraryDataFileManager arbitraryDataFileManager = ArbitraryDataFileManager.getInstance();
ArbitraryTransactionData arbitraryTransactionData = null;
byte[] signature = null;
byte[] hash = null;
Peer peer = null;
boolean shouldProcess = false;
synchronized (arbitraryDataFileManager.arbitraryDataFileHashResponses) {
Iterator iterator = arbitraryDataFileManager.arbitraryDataFileHashResponses.entrySet().iterator();
while (iterator.hasNext()) {
if (Controller.isStopping()) {
return;
}
Map.Entry entry = (Map.Entry) iterator.next();
if (entry == null || entry.getKey() == null || entry.getValue() == null) {
iterator.remove();
continue;
}
String hash58 = (String) entry.getKey();
Triple<Peer, String, Long> value = (Triple<Peer, String, Long>) entry.getValue();
if (value == null) {
iterator.remove();
continue;
}
peer = value.getA();
String signature58 = value.getB();
Long timestamp = value.getC();
if (now - timestamp >= ArbitraryDataManager.ARBITRARY_RELAY_TIMEOUT || signature58 == null || peer == null) {
// Ignore - to be deleted
iterator.remove();
continue;
}
hash = Base58.decode(hash58);
signature = Base58.decode(signature58);
// We want to process this file
shouldProcess = true;
iterator.remove();
break;
}
}
if (!shouldProcess) {
// Nothing to do
return;
}
// Fetch the transaction data
arbitraryTransactionData = ArbitraryTransactionUtils.fetchTransactionData(repository, signature);
if (arbitraryTransactionData == null) {
return;
}
if (signature == null || hash == null || peer == null || arbitraryTransactionData == null) {
return;
}
String hash58 = Base58.encode(hash);
LOGGER.debug("Fetching file {} from peer {} via request thread...", hash58, peer);
arbitraryDataFileManager.fetchArbitraryDataFiles(repository, peer, signature, arbitraryTransactionData, Arrays.asList(hash));
} catch (DataException e) {
LOGGER.debug("Unable to process file hashes: {}", e.getMessage());
}
}
}

View File

@ -0,0 +1,60 @@
package org.qortal.data.arbitrary;
import org.qortal.network.Peer;
import java.util.Objects;
public class ArbitraryRelayInfo {
private final String hash58;
private final String signature58;
private final Peer peer;
private final Long timestamp;
public ArbitraryRelayInfo(String hash58, String signature58, Peer peer, Long timestamp) {
this.hash58 = hash58;
this.signature58 = signature58;
this.peer = peer;
this.timestamp = timestamp;
}
public boolean isValid() {
return this.getHash58() != null && this.getSignature58() != null
&& this.getPeer() != null && this.getTimestamp() != null;
}
public String getHash58() {
return this.hash58;
}
public String getSignature58() {
return signature58;
}
public Peer getPeer() {
return peer;
}
public Long getTimestamp() {
return timestamp;
}
@Override
public String toString() {
return String.format("%s = %s, %s, %d", this.hash58, this.signature58, this.peer, this.timestamp);
}
@Override
public boolean equals(Object other) {
if (other == this)
return true;
if (!(other instanceof ArbitraryRelayInfo))
return false;
ArbitraryRelayInfo otherRelayInfo = (ArbitraryRelayInfo) other;
return this.peer == otherRelayInfo.getPeer()
&& Objects.equals(this.hash58, otherRelayInfo.getHash58())
&& Objects.equals(this.signature58, otherRelayInfo.getSignature58());
}
}

View File

@ -202,9 +202,9 @@ public class Settings {
private boolean allowConnectionsWithOlderPeerVersions = true;
/** Minimum time (in seconds) that we should attempt to remain connected to a peer for */
private int minPeerConnectionTime = 2 * 60; // seconds
private int minPeerConnectionTime = 5 * 60; // seconds
/** Maximum time (in seconds) that we should attempt to remain connected to a peer for */
private int maxPeerConnectionTime = 20 * 60; // seconds
private int maxPeerConnectionTime = 60 * 60; // seconds
/** Whether to sync multiple blocks at once in normal operation */
private boolean fastSyncEnabled = true;