diff --git a/src/main/java/org/qortal/arbitrary/ArbitraryDataRenderer.java b/src/main/java/org/qortal/arbitrary/ArbitraryDataRenderer.java index e1794edf..2810b6df 100644 --- a/src/main/java/org/qortal/arbitrary/ArbitraryDataRenderer.java +++ b/src/main/java/org/qortal/arbitrary/ArbitraryDataRenderer.java @@ -1,6 +1,7 @@ package org.qortal.arbitrary; import com.google.common.io.Resources; + import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.ArrayUtils; import org.apache.logging.log4j.LogManager; @@ -15,11 +16,13 @@ import org.qortal.settings.Settings; import javax.servlet.ServletContext; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; + import java.io.File; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; import java.net.URL; + import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.NoSuchFileException; @@ -167,7 +170,14 @@ public class ArbitraryDataRenderer { if (HTMLParser.isHtmlFile(filename)) { // HTML file - needs to be parsed byte[] data = Files.readAllBytes(filePath); // TODO: limit file size that can be read into memory - HTMLParser htmlParser = new HTMLParser(resourceId, inPath, prefix, includeResourceIdInPrefix, data, qdnContext, service, identifier, theme, usingCustomRouting, lang); + String encodedResourceId; + + if (resourceIdType == ResourceIdType.NAME) { + encodedResourceId = resourceId.replace(" ", "%20"); + } else { + encodedResourceId = resourceId; + } + HTMLParser htmlParser = new HTMLParser(encodedResourceId, inPath, prefix, includeResourceIdInPrefix, data, qdnContext, service, identifier, theme, usingCustomRouting, lang); htmlParser.addAdditionalHeaderTags(); response.addHeader("Content-Security-Policy", "default-src 'self' 'unsafe-inline' 'unsafe-eval'; font-src 'self' data:; media-src 'self' data: blob:; img-src 'self' data: blob:; connect-src 'self' wss: blob:;"); response.setContentType(context.getMimeType(filename)); diff --git a/src/main/java/org/qortal/controller/Controller.java b/src/main/java/org/qortal/controller/Controller.java index 31337b42..4ec03ee3 100644 --- a/src/main/java/org/qortal/controller/Controller.java +++ b/src/main/java/org/qortal/controller/Controller.java @@ -567,6 +567,9 @@ public class Controller extends Thread { LOGGER.info("Starting foreign fees manager"); ForeignFeesManager.getInstance().start(); + LOGGER.info("Starting follower"); + Follower.getInstance().start(); + LOGGER.info("Starting transaction importer"); TransactionImporter.getInstance().start(); diff --git a/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileListManager.java b/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileListManager.java index ee37dbec..07989589 100644 --- a/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileListManager.java +++ b/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileListManager.java @@ -402,8 +402,8 @@ public class ArbitraryDataFileListManager { return true; } - public void deleteFileListRequestsForSignature(byte[] signature) { - String signature58 = Base58.encode(signature); + public void deleteFileListRequestsForSignature(String signature58) { + for (Iterator>> it = arbitraryDataFileListRequests.entrySet().iterator(); it.hasNext();) { Map.Entry> entry = it.next(); if (entry == null || entry.getKey() == null || entry.getValue() == null) { diff --git a/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileManager.java b/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileManager.java index a4034596..4b50603c 100644 --- a/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileManager.java +++ b/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileManager.java @@ -23,7 +23,6 @@ import org.qortal.utils.NTP; import java.security.SecureRandom; import java.util.*; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -70,6 +69,7 @@ public class ArbitraryDataFileManager extends Thread { private ArbitraryDataFileManager() { this.arbitraryDataFileHashResponseScheduler.scheduleAtFixedRate( this::processResponses, 60, 1, TimeUnit.SECONDS); + this.arbitraryDataFileHashResponseScheduler.scheduleAtFixedRate(this::handleFileListRequestProcess, 60, 1, TimeUnit.SECONDS); } public static ArbitraryDataFileManager getInstance() { @@ -140,7 +140,7 @@ public class ArbitraryDataFileManager extends Thread { if (!arbitraryDataFileRequests.containsKey(Base58.encode(hash))) { LOGGER.debug("Requesting data file {} from peer {}", hash58, peer); Long startTime = NTP.getTime(); - ArbitraryDataFile receivedArbitraryDataFile = fetchArbitraryDataFile(peer, null, arbitraryTransactionData, signature, hash, null); + ArbitraryDataFile receivedArbitraryDataFile = fetchArbitraryDataFile(peer, arbitraryTransactionData, signature, hash); Long endTime = NTP.getTime(); if (receivedArbitraryDataFile != null) { LOGGER.debug("Received data file {} from peer {}. Time taken: {} ms", receivedArbitraryDataFile.getHash58(), peer, (endTime-startTime)); @@ -207,14 +207,71 @@ public class ArbitraryDataFileManager extends Thread { } } - private ArbitraryDataFile fetchArbitraryDataFile(Peer peer, Peer requestingPeer, ArbitraryTransactionData arbitraryTransactionData, byte[] signature, byte[] hash, Message originalMessage) throws DataException { - ArbitraryDataFile existingFile = ArbitraryDataFile.fromHash(hash, signature); - boolean fileAlreadyExists = existingFile.exists(); - String hash58 = Base58.encode(hash); + private ArbitraryDataFile fetchArbitraryDataFile(Peer peer, ArbitraryTransactionData arbitraryTransactionData, byte[] signature, byte[] hash) throws DataException { ArbitraryDataFile arbitraryDataFile; - // Fetch the file if it doesn't exist locally - if (!fileAlreadyExists) { + try { + ArbitraryDataFile existingFile = ArbitraryDataFile.fromHash(hash, signature); + boolean fileAlreadyExists = existingFile.exists(); + String hash58 = Base58.encode(hash); + + // Fetch the file if it doesn't exist locally + if (!fileAlreadyExists) { + LOGGER.debug(String.format("Fetching data file %.8s from peer %s", hash58, peer)); + arbitraryDataFileRequests.put(hash58, NTP.getTime()); + Message getArbitraryDataFileMessage = new GetArbitraryDataFileMessage(signature, hash); + + Message response = null; + try { + response = peer.getResponseWithTimeout(getArbitraryDataFileMessage, (int) ArbitraryDataManager.ARBITRARY_REQUEST_TIMEOUT); + } catch (InterruptedException e) { + // Will return below due to null response + } + arbitraryDataFileRequests.remove(hash58); + LOGGER.trace(String.format("Removed hash %.8s from arbitraryDataFileRequests", hash58)); + + if (response == null) { + LOGGER.debug("Received null response from peer {}", peer); + return null; + } + if (response.getType() != MessageType.ARBITRARY_DATA_FILE) { + LOGGER.debug("Received response with invalid type: {} from peer {}", response.getType(), peer); + return null; + } + + ArbitraryDataFileMessage peersArbitraryDataFileMessage = (ArbitraryDataFileMessage) response; + arbitraryDataFile = peersArbitraryDataFileMessage.getArbitraryDataFile(); + } else { + LOGGER.debug(String.format("File hash %s already exists, so skipping the request", hash58)); + arbitraryDataFile = existingFile; + } + + if (arbitraryDataFile != null) { + + arbitraryDataFile.save(); + + // If this is a metadata file then we need to update the cache + if (arbitraryTransactionData != null && arbitraryTransactionData.getMetadataHash() != null) { + if (Arrays.equals(arbitraryTransactionData.getMetadataHash(), hash)) { + ArbitraryDataCacheManager.getInstance().addToUpdateQueue(arbitraryTransactionData); + } + } + + // We may need to remove the file list request, if we have all the files for this transaction + this.handleFileListRequests(signature); + } + } catch (DataException e) { + LOGGER.error(e.getMessage(), e); + arbitraryDataFile = null; + } + + return arbitraryDataFile; + } + + private void fetchFileForRelay(Peer peer, Peer requestingPeer, byte[] signature, byte[] hash, Message originalMessage) throws DataException { + try { + String hash58 = Base58.encode(hash); + LOGGER.debug(String.format("Fetching data file %.8s from peer %s", hash58, peer)); arbitraryDataFileRequests.put(hash58, NTP.getTime()); Message getArbitraryDataFileMessage = new GetArbitraryDataFileMessage(signature, hash); @@ -228,77 +285,75 @@ public class ArbitraryDataFileManager extends Thread { arbitraryDataFileRequests.remove(hash58); LOGGER.trace(String.format("Removed hash %.8s from arbitraryDataFileRequests", hash58)); - - if (response == null) { LOGGER.debug("Received null response from peer {}", peer); - return null; + return; } if (response.getType() != MessageType.ARBITRARY_DATA_FILE) { LOGGER.debug("Received response with invalid type: {} from peer {}", response.getType(), peer); - return null; - } - - ArbitraryDataFileMessage peersArbitraryDataFileMessage = (ArbitraryDataFileMessage) response; - arbitraryDataFile = peersArbitraryDataFileMessage.getArbitraryDataFile(); - } else { - LOGGER.debug(String.format("File hash %s already exists, so skipping the request", hash58)); - arbitraryDataFile = existingFile; - } - - if (arbitraryDataFile == null) { - // We don't have a file, so give up here - return null; - } - - // We might want to forward the request to the peer that originally requested it - this.handleArbitraryDataFileForwarding(requestingPeer, new ArbitraryDataFileMessage(signature, arbitraryDataFile), originalMessage); - - boolean isRelayRequest = (requestingPeer != null); - if (isRelayRequest) { - if (!fileAlreadyExists) { - // File didn't exist locally before the request, and it's a forwarding request, so delete it if it exists. - // It shouldn't exist on the filesystem yet, but leaving this here just in case. - arbitraryDataFile.delete(10); - } - } - else { - arbitraryDataFile.save(); - } - - // If this is a metadata file then we need to update the cache - if (arbitraryTransactionData != null && arbitraryTransactionData.getMetadataHash() != null) { - if (Arrays.equals(arbitraryTransactionData.getMetadataHash(), hash)) { - ArbitraryDataCacheManager.getInstance().addToUpdateQueue(arbitraryTransactionData); - } - } - - // We may need to remove the file list request, if we have all the files for this transaction - this.handleFileListRequests(signature); - - return arbitraryDataFile; - } - - private void handleFileListRequests(byte[] signature) { - try (final Repository repository = RepositoryManager.getRepository()) { - - // Fetch the transaction data - ArbitraryTransactionData arbitraryTransactionData = ArbitraryTransactionUtils.fetchTransactionData(repository, signature); - if (arbitraryTransactionData == null) { return; } - boolean completeFileExists = ArbitraryTransactionUtils.completeFileExists(arbitraryTransactionData); + ArbitraryDataFileMessage peersArbitraryDataFileMessage = (ArbitraryDataFileMessage) response; + ArbitraryDataFile arbitraryDataFile = peersArbitraryDataFileMessage.getArbitraryDataFile(); - if (completeFileExists) { - String signature58 = Base58.encode(arbitraryTransactionData.getSignature()); - LOGGER.info("All chunks or complete file exist for transaction {}", signature58); - - ArbitraryDataFileListManager.getInstance().deleteFileListRequestsForSignature(signature); + if (arbitraryDataFile != null) { + + // We might want to forward the request to the peer that originally requested it + this.handleArbitraryDataFileForwarding(requestingPeer, new ArbitraryDataFileMessage(signature, arbitraryDataFile), originalMessage); + } + } catch (Exception e) { + LOGGER.error(e.getMessage(), e); + } + } + + Map signatureBySignature58 = new HashMap<>(); + + // Lock to synchronize access to the list + private final Object handleFileListRequestsLock = new Object(); + + // Scheduled executor service to process messages every second + private final ScheduledExecutorService handleFileListRequestsScheduler = Executors.newScheduledThreadPool(1); + + private void handleFileListRequests(byte[] signature) { + + synchronized (handleFileListRequestsLock) { + signatureBySignature58.put(Base58.encode(signature), signature); + } + } + + private void handleFileListRequestProcess() { + + Map signaturesToProcess; + + synchronized (handleFileListRequestsLock) { + signaturesToProcess = new HashMap<>(signatureBySignature58); + signatureBySignature58.clear(); + } + + if( signaturesToProcess.isEmpty() ) return; + + LOGGER.info("signatures to process = " + signaturesToProcess.size()); + + try (final Repository repository = RepositoryManager.getRepository()) { + + // Fetch the transaction data + List arbitraryTransactionDataList + = ArbitraryTransactionUtils.fetchTransactionDataList(repository, new ArrayList<>(signaturesToProcess.values())); + + for( ArbitraryTransactionData arbitraryTransactionData : arbitraryTransactionDataList ) { + boolean completeFileExists = ArbitraryTransactionUtils.completeFileExists(arbitraryTransactionData); + + if (completeFileExists) { + String signature58 = Base58.encode(arbitraryTransactionData.getSignature()); + LOGGER.info("All chunks or complete file exist for transaction {}", signature58); + + ArbitraryDataFileListManager.getInstance().deleteFileListRequestsForSignature(signature58); + } } - } catch (DataException e) { - LOGGER.debug("Unable to handle file list requests: {}", e.getMessage()); + } catch (Exception e) { + LOGGER.error(e.getMessage(), e); } } @@ -315,15 +370,19 @@ public class ArbitraryDataFileManager extends Thread { LOGGER.debug("Received arbitrary data file - forwarding is needed"); - // The ID needs to match that of the original request - message.setId(originalMessage.getId()); + try { + // The ID needs to match that of the original request + message.setId(originalMessage.getId()); - if (!requestingPeer.sendMessageWithTimeout(message, (int) ArbitraryDataManager.ARBITRARY_REQUEST_TIMEOUT)) { - LOGGER.debug("Failed to forward arbitrary data file to peer {}", requestingPeer); - requestingPeer.disconnect("failed to forward arbitrary data file"); - } - else { - LOGGER.debug("Forwarded arbitrary data file to peer {}", requestingPeer); + if (!requestingPeer.sendMessageWithTimeout(message, (int) ArbitraryDataManager.ARBITRARY_REQUEST_TIMEOUT)) { + LOGGER.info("Failed to forward arbitrary data file to peer {}", requestingPeer); + requestingPeer.disconnect("failed to forward arbitrary data file"); + } + else { + LOGGER.info("Forwarded arbitrary data file to peer {}", requestingPeer); + } + } catch (Exception e) { + LOGGER.error(e.getMessage(), e); } } @@ -615,7 +674,7 @@ public class ArbitraryDataFileManager extends Thread { LOGGER.debug("Asking peer {} for hash {}", peerToAsk, hash58); // No need to pass arbitraryTransactionData below because this is only used for metadata caching, // and metadata isn't retained when relaying. - this.fetchArbitraryDataFile(peerToAsk, peer, null, signature, hash, message); + this.fetchFileForRelay(peerToAsk, peer, signature, hash, message); } else { LOGGER.debug("Peer {} not found in relay info", peer); diff --git a/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileRequestThread.java b/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileRequestThread.java index b27eef26..5096f3dd 100644 --- a/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileRequestThread.java +++ b/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileRequestThread.java @@ -15,6 +15,7 @@ import org.qortal.settings.Settings; import org.qortal.utils.ArbitraryTransactionUtils; import org.qortal.utils.Base58; import org.qortal.utils.NTP; +import org.qortal.utils.NamedThreadFactory; import java.net.http.HttpResponse; import java.util.ArrayList; @@ -38,6 +39,9 @@ public class ArbitraryDataFileRequestThread { private static final Logger LOGGER = LogManager.getLogger(ArbitraryDataFileRequestThread.class); + private static final Integer FETCHER_LIMIT_PER_PEER = Settings.getInstance().getMaxThreadsForMessageType(MessageType.GET_ARBITRARY_DATA_FILE); + private static final String FETCHER_THREAD_PREFIX = "Arbitrary Data Fetcher "; + private ConcurrentHashMap executorByPeer = new ConcurrentHashMap<>(); private ArbitraryDataFileRequestThread() { @@ -64,8 +68,9 @@ public class ArbitraryDataFileRequestThread { if (value instanceof ThreadPoolExecutor) { ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) value; if (threadPoolExecutor.getActiveCount() == 0) { + threadPoolExecutor.shutdown(); if (this.executorByPeer.computeIfPresent(key, (k, v) -> null) == null) { - LOGGER.info("removed executor: peer = " + key); + LOGGER.trace("removed executor: peer = " + key); } } } else { @@ -147,7 +152,9 @@ public class ArbitraryDataFileRequestThread { .computeIfAbsent( responseInfo.getPeer().toString(), peer -> Executors.newFixedThreadPool( - Settings.getInstance().getMaxThreadsForMessageType(MessageType.GET_ARBITRARY_DATA_FILE)) + FETCHER_LIMIT_PER_PEER, + new NamedThreadFactory(FETCHER_THREAD_PREFIX + responseInfo.getPeer().toString(), NORM_PRIORITY) + ) ) .execute(fetcher); } diff --git a/src/main/java/org/qortal/controller/arbitrary/Follower.java b/src/main/java/org/qortal/controller/arbitrary/Follower.java new file mode 100644 index 00000000..228640f4 --- /dev/null +++ b/src/main/java/org/qortal/controller/arbitrary/Follower.java @@ -0,0 +1,130 @@ +package org.qortal.controller.arbitrary; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.qortal.data.transaction.ArbitraryTransactionData; +import org.qortal.repository.Repository; +import org.qortal.repository.RepositoryManager; +import org.qortal.settings.Settings; +import org.qortal.utils.ListUtils; +import org.qortal.utils.NamedThreadFactory; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.OptionalInt; +import java.util.Set; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +public class Follower { + + private static final Logger LOGGER = LogManager.getLogger(Follower.class); + + private ScheduledExecutorService service + = Executors.newScheduledThreadPool(2, new NamedThreadFactory("Follower", Thread.NORM_PRIORITY)); + + private Follower() { + + } + + private static Follower instance; + + public static Follower getInstance() { + + if( instance == null ) { + instance = new Follower(); + } + + return instance; + } + + public void start() { + + // fetch arbitrary transactions from followed names from the last 100 blocks every 2 minutes + service.scheduleWithFixedDelay(() -> fetch(OptionalInt.of(100)), 10, 2, TimeUnit.MINUTES); + + // fetch arbitrary transaction from followed names from any block every 24 hours + service.scheduleWithFixedDelay(() -> fetch(OptionalInt.empty()), 4, 24, TimeUnit.HOURS); + } + + private void fetch(OptionalInt limit) { + + try { + // for each followed name, get arbitraty transactions, then examine those transactions before fetching + for (String name : ListUtils.followedNames()) { + + List transactionsInReverseOrder; + + // open database to get the transactions in reverse order for the followed name + try (final Repository repository = RepositoryManager.getRepository()) { + + List latestArbitraryTransactionsByName + = repository.getArbitraryRepository().getLatestArbitraryTransactionsByName(name); + + if (limit.isPresent()) { + final int blockHeightThreshold = repository.getBlockRepository().getBlockchainHeight() - limit.getAsInt(); + + transactionsInReverseOrder + = latestArbitraryTransactionsByName.stream().filter(tx -> tx.getBlockHeight() > blockHeightThreshold) + .collect(Collectors.toList()); + } else { + transactionsInReverseOrder = latestArbitraryTransactionsByName; + } + + } catch (Exception e) { + LOGGER.error(e.getMessage(), e); + transactionsInReverseOrder = new ArrayList<>(0); + } + + // collect process transaction hashes, so we don't fetch outdated transactions + Set processedTransactions = new HashSet<>(); + + ArbitraryDataStorageManager storageManager = ArbitraryDataStorageManager.getInstance(); + + // for each arbitrary transaction for the followed name process, evaluate, fetch + for (ArbitraryTransactionData arbitraryTransaction : transactionsInReverseOrder) { + + boolean examined = false; + + try (final Repository repository = RepositoryManager.getRepository()) { + + // if not processed + if (!processedTransactions.contains(new ArbitraryTransactionDataHashWrapper(arbitraryTransaction))) { + boolean isLocal = repository.getArbitraryRepository().isDataLocal(arbitraryTransaction.getSignature()); + + // if not local, then continue to evaluate + if (!isLocal) { + + // evaluate fetching status for this transaction on this node + ArbitraryDataExamination examination = storageManager.shouldPreFetchData(repository, arbitraryTransaction); + + // if the evaluation passed, then fetch + examined = examination.isPass(); + } + // if locally stored, then nothing needs to be done + + // add to processed transactions + processedTransactions.add(new ArbitraryTransactionDataHashWrapper(arbitraryTransaction)); + } + } + + // if passed examination for fetching, then fetch + if (examined) { + LOGGER.info("for {} on {}, fetching {}", name, arbitraryTransaction.getService(), arbitraryTransaction.getIdentifier()); + boolean fetched = ArbitraryDataFileListManager.getInstance().fetchArbitraryDataFileList(arbitraryTransaction); + + LOGGER.info("fetched = " + fetched); + } + + // pause a second before moving on to another transaction + Thread.sleep(1000); + } + } + } catch (Exception e) { + LOGGER.error(e.getMessage(), e); + } + } +} diff --git a/src/main/java/org/qortal/network/task/ChannelWriteTask.java b/src/main/java/org/qortal/network/task/ChannelWriteTask.java index 59bc557e..6f28a942 100644 --- a/src/main/java/org/qortal/network/task/ChannelWriteTask.java +++ b/src/main/java/org/qortal/network/task/ChannelWriteTask.java @@ -31,7 +31,20 @@ public class ChannelWriteTask implements Task { @Override public void perform() throws InterruptedException { try { - boolean isSocketClogged = peer.writeChannel(); + + boolean isSocketClogged; + do { + isSocketClogged = peer.writeChannel(); + + if (isSocketClogged) { + LOGGER.info( + "socket is clogged: peer = {} {}, retrying", + peer.getPeerData().getAddress().toString(), + Thread.currentThread().getName() + ); + Thread.sleep(1000); + } + } while( isSocketClogged ); // Tell Network that we've finished Network.getInstance().notifyChannelNotWriting(socketChannel);