Introduced ArbitraryDataFileRequestThread to allow for multiple concurrent file requests.

This is likely a short term solution (to allow existing code to be repurposed) until replaced with a task-based approach, as this will allow for a much greater number of threads.
This commit is contained in:
CalDescent 2022-02-06 15:34:06 +00:00
parent 2740543abf
commit b8aaf14cdc
2 changed files with 128 additions and 64 deletions

View File

@ -21,6 +21,8 @@ import org.qortal.utils.Triple;
import java.security.SecureRandom;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
public class ArbitraryDataFileManager extends Thread {
@ -65,11 +67,16 @@ public class ArbitraryDataFileManager extends Thread {
Thread.currentThread().setName("Arbitrary Data File Manager");
try {
while (!isStopping) {
Thread.sleep(1000);
// Use a fixed thread pool to execute the arbitrary data file requests
int threadCount = 10;
ExecutorService arbitraryDataFileRequestExecutor = Executors.newFixedThreadPool(threadCount);
for (int i = 0; i < threadCount; i++) {
arbitraryDataFileRequestExecutor.execute(new ArbitraryDataFileRequestThread());
}
Long now = NTP.getTime();
this.processFileHashes(now);
while (!isStopping) {
// Nothing to do yet
Thread.sleep(1000);
}
} catch (InterruptedException e) {
// Fall-through to exit thread...
@ -81,66 +88,6 @@ public class ArbitraryDataFileManager extends Thread {
this.interrupt();
}
private void processFileHashes(Long now) {
try (final Repository repository = RepositoryManager.getRepository()) {
ArbitraryTransactionData arbitraryTransactionData = null;
byte[] signature = null;
byte[] hash = null;
Peer peer = null;
boolean shouldProcess = false;
synchronized (arbitraryDataFileHashResponses) {
for (String hash58 : arbitraryDataFileHashResponses.keySet()) {
if (isStopping) {
return;
}
Triple<Peer, String, Long> value = arbitraryDataFileHashResponses.get(hash58);
if (value != null) {
peer = value.getA();
String signature58 = value.getB();
Long timestamp = value.getC();
if (now - timestamp >= ArbitraryDataManager.ARBITRARY_RELAY_TIMEOUT || signature58 == null || peer == null) {
// Ignore - to be deleted
continue;
}
hash = Base58.decode(hash58);
signature = Base58.decode(signature58);
// Fetch the transaction data
arbitraryTransactionData = ArbitraryTransactionUtils.fetchTransactionData(repository, signature);
if (arbitraryTransactionData == null) {
continue;
}
// We want to process this file
shouldProcess = true;
break;
}
}
}
if (!shouldProcess) {
// Nothing to do
return;
}
if (signature == null || hash == null || peer == null || arbitraryTransactionData == null) {
return;
}
String hash58 = Base58.encode(hash);
LOGGER.debug("Fetching file {} from peer {} via response queue...", hash58, peer);
this.fetchArbitraryDataFiles(repository, peer, signature, arbitraryTransactionData, Arrays.asList(hash));
} catch (DataException e) {
LOGGER.info("Unable to process file hashes: {}", e.getMessage());
}
}
public void cleanupRequestCache(Long now) {
if (now == null) {

View File

@ -0,0 +1,117 @@
package org.qortal.controller.arbitrary;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.qortal.controller.Controller;
import org.qortal.data.transaction.ArbitraryTransactionData;
import org.qortal.network.Peer;
import org.qortal.repository.DataException;
import org.qortal.repository.Repository;
import org.qortal.repository.RepositoryManager;
import org.qortal.utils.ArbitraryTransactionUtils;
import org.qortal.utils.Base58;
import org.qortal.utils.NTP;
import org.qortal.utils.Triple;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Map;
public class ArbitraryDataFileRequestThread implements Runnable {
private static final Logger LOGGER = LogManager.getLogger(ArbitraryDataFileRequestThread.class);
public ArbitraryDataFileRequestThread() {
}
@Override
public void run() {
Thread.currentThread().setName("Arbitrary Data File Request Thread");
try {
while (!Controller.isStopping()) {
Thread.sleep(1000);
Long now = NTP.getTime();
this.processFileHashes(now);
}
} catch (InterruptedException e) {
// Fall-through to exit thread...
}
}
private void processFileHashes(Long now) {
try (final Repository repository = RepositoryManager.getRepository()) {
ArbitraryDataFileManager arbitraryDataFileManager = ArbitraryDataFileManager.getInstance();
ArbitraryTransactionData arbitraryTransactionData = null;
byte[] signature = null;
byte[] hash = null;
Peer peer = null;
boolean shouldProcess = false;
synchronized (arbitraryDataFileManager.arbitraryDataFileHashResponses) {
Iterator iterator = arbitraryDataFileManager.arbitraryDataFileHashResponses.entrySet().iterator();
while (iterator.hasNext()) {
if (Controller.isStopping()) {
return;
}
Map.Entry entry = (Map.Entry) iterator.next();
if (entry == null || entry.getKey() == null || entry.getValue() == null) {
iterator.remove();
continue;
}
String hash58 = (String) entry.getKey();
Triple<Peer, String, Long> value = (Triple<Peer, String, Long>) entry.getValue();
if (value == null) {
iterator.remove();
continue;
}
peer = value.getA();
String signature58 = value.getB();
Long timestamp = value.getC();
if (now - timestamp >= ArbitraryDataManager.ARBITRARY_RELAY_TIMEOUT || signature58 == null || peer == null) {
// Ignore - to be deleted
iterator.remove();
continue;
}
hash = Base58.decode(hash58);
signature = Base58.decode(signature58);
// We want to process this file
shouldProcess = true;
iterator.remove();
break;
}
}
if (!shouldProcess) {
// Nothing to do
return;
}
// Fetch the transaction data
arbitraryTransactionData = ArbitraryTransactionUtils.fetchTransactionData(repository, signature);
if (arbitraryTransactionData == null) {
return;
}
if (signature == null || hash == null || peer == null || arbitraryTransactionData == null) {
return;
}
String hash58 = Base58.encode(hash);
LOGGER.debug("Fetching file {} from peer {} via request thread...", hash58, peer);
arbitraryDataFileManager.fetchArbitraryDataFiles(repository, peer, signature, arbitraryTransactionData, Arrays.asList(hash));
} catch (DataException e) {
LOGGER.debug("Unable to process file hashes: {}", e.getMessage());
}
}
}