diff --git a/src/main/java/org/qortal/arbitrary/ArbitraryDataRenderer.java b/src/main/java/org/qortal/arbitrary/ArbitraryDataRenderer.java index e1794edf..2810b6df 100644 --- a/src/main/java/org/qortal/arbitrary/ArbitraryDataRenderer.java +++ b/src/main/java/org/qortal/arbitrary/ArbitraryDataRenderer.java @@ -1,6 +1,7 @@ package org.qortal.arbitrary; import com.google.common.io.Resources; + import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.ArrayUtils; import org.apache.logging.log4j.LogManager; @@ -15,11 +16,13 @@ import org.qortal.settings.Settings; import javax.servlet.ServletContext; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; + import java.io.File; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; import java.net.URL; + import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.NoSuchFileException; @@ -167,7 +170,14 @@ public class ArbitraryDataRenderer { if (HTMLParser.isHtmlFile(filename)) { // HTML file - needs to be parsed byte[] data = Files.readAllBytes(filePath); // TODO: limit file size that can be read into memory - HTMLParser htmlParser = new HTMLParser(resourceId, inPath, prefix, includeResourceIdInPrefix, data, qdnContext, service, identifier, theme, usingCustomRouting, lang); + String encodedResourceId; + + if (resourceIdType == ResourceIdType.NAME) { + encodedResourceId = resourceId.replace(" ", "%20"); + } else { + encodedResourceId = resourceId; + } + HTMLParser htmlParser = new HTMLParser(encodedResourceId, inPath, prefix, includeResourceIdInPrefix, data, qdnContext, service, identifier, theme, usingCustomRouting, lang); htmlParser.addAdditionalHeaderTags(); response.addHeader("Content-Security-Policy", "default-src 'self' 'unsafe-inline' 'unsafe-eval'; font-src 'self' data:; media-src 'self' data: blob:; img-src 'self' data: blob:; connect-src 'self' wss: blob:;"); response.setContentType(context.getMimeType(filename)); diff --git a/src/main/java/org/qortal/controller/Controller.java b/src/main/java/org/qortal/controller/Controller.java index 31337b42..4ec03ee3 100644 --- a/src/main/java/org/qortal/controller/Controller.java +++ b/src/main/java/org/qortal/controller/Controller.java @@ -567,6 +567,9 @@ public class Controller extends Thread { LOGGER.info("Starting foreign fees manager"); ForeignFeesManager.getInstance().start(); + LOGGER.info("Starting follower"); + Follower.getInstance().start(); + LOGGER.info("Starting transaction importer"); TransactionImporter.getInstance().start(); diff --git a/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileListManager.java b/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileListManager.java index ee37dbec..aa55424b 100644 --- a/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileListManager.java +++ b/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileListManager.java @@ -124,8 +124,8 @@ public class ArbitraryDataFileListManager { if (timeSinceLastAttempt > 15 * 1000L) { // We haven't tried for at least 15 seconds - if (networkBroadcastCount < 3) { - // We've made less than 3 total attempts + if (networkBroadcastCount < 12) { + // We've made less than 12 total attempts return true; } } @@ -134,8 +134,8 @@ public class ArbitraryDataFileListManager { if (timeSinceLastAttempt > 60 * 1000L) { // We haven't tried for at least 1 minute - if (networkBroadcastCount < 8) { - // We've made less than 8 total attempts + if (networkBroadcastCount < 40) { + // We've made less than 40 total attempts return true; } } @@ -402,8 +402,8 @@ public class ArbitraryDataFileListManager { return true; } - public void deleteFileListRequestsForSignature(byte[] signature) { - String signature58 = Base58.encode(signature); + public void deleteFileListRequestsForSignature(String signature58) { + for (Iterator>> it = arbitraryDataFileListRequests.entrySet().iterator(); it.hasNext();) { Map.Entry> entry = it.next(); if (entry == null || entry.getKey() == null || entry.getValue() == null) { @@ -587,9 +587,7 @@ public class ArbitraryDataFileListManager { // Forward to requesting peer LOGGER.debug("Forwarding file list with {} hashes to requesting peer: {}", hashes.size(), requestingPeer); - if (!requestingPeer.sendMessage(forwardArbitraryDataFileListMessage)) { - requestingPeer.disconnect("failed to forward arbitrary data file list"); - } + requestingPeer.sendMessage(forwardArbitraryDataFileListMessage); } } } @@ -787,7 +785,6 @@ public class ArbitraryDataFileListManager { if (!peer.sendMessage(arbitraryDataFileListMessage)) { LOGGER.debug("Couldn't send list of hashes"); - peer.disconnect("failed to send list of hashes"); continue; } diff --git a/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileManager.java b/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileManager.java index a4034596..5407803c 100644 --- a/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileManager.java +++ b/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileManager.java @@ -1,6 +1,7 @@ package org.qortal.controller.arbitrary; import com.google.common.net.InetAddresses; + import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.qortal.arbitrary.ArbitraryDataFile; @@ -23,12 +24,15 @@ import org.qortal.utils.NTP; import java.security.SecureRandom; import java.util.*; -import java.util.concurrent.ExecutorService; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import org.qortal.network.PeerSendManager; + public class ArbitraryDataFileManager extends Thread { private static final Logger LOGGER = LogManager.getLogger(ArbitraryDataFileManager.class); @@ -66,10 +70,38 @@ public class ArbitraryDataFileManager extends Thread { public static int MAX_FILE_HASH_RESPONSES = 1000; +private final Map peerSendManagers = new ConcurrentHashMap<>(); + +private PeerSendManager getOrCreateSendManager(Peer peer) { + return peerSendManagers.computeIfAbsent(peer, p -> new PeerSendManager(p)); +} + + + private ArbitraryDataFileManager() { this.arbitraryDataFileHashResponseScheduler.scheduleAtFixedRate( this::processResponses, 60, 1, TimeUnit.SECONDS); + this.arbitraryDataFileHashResponseScheduler.scheduleAtFixedRate(this::handleFileListRequestProcess, 60, 1, TimeUnit.SECONDS); + ScheduledExecutorService cleaner = Executors.newSingleThreadScheduledExecutor(); + + cleaner.scheduleAtFixedRate(() -> { + long idleCutoff = TimeUnit.MINUTES.toMillis(2); + Iterator> iterator = peerSendManagers.entrySet().iterator(); + + while (iterator.hasNext()) { + Map.Entry entry = iterator.next(); + Peer peer = entry.getKey(); + PeerSendManager manager = entry.getValue(); + + if (manager.isIdle(idleCutoff)) { + iterator.remove(); // SAFE removal during iteration + manager.shutdown(); + LOGGER.debug("Cleaned up PeerSendManager for peer {}", peer); + } + } + }, 0, 5, TimeUnit.MINUTES); + } public static ArbitraryDataFileManager getInstance() { @@ -79,6 +111,8 @@ public class ArbitraryDataFileManager extends Thread { return instance; } + + @Override public void run() { Thread.currentThread().setName("Arbitrary Data File Manager"); @@ -140,7 +174,7 @@ public class ArbitraryDataFileManager extends Thread { if (!arbitraryDataFileRequests.containsKey(Base58.encode(hash))) { LOGGER.debug("Requesting data file {} from peer {}", hash58, peer); Long startTime = NTP.getTime(); - ArbitraryDataFile receivedArbitraryDataFile = fetchArbitraryDataFile(peer, null, arbitraryTransactionData, signature, hash, null); + ArbitraryDataFile receivedArbitraryDataFile = fetchArbitraryDataFile(peer, arbitraryTransactionData, signature, hash); Long endTime = NTP.getTime(); if (receivedArbitraryDataFile != null) { LOGGER.debug("Received data file {} from peer {}. Time taken: {} ms", receivedArbitraryDataFile.getHash58(), peer, (endTime-startTime)); @@ -207,14 +241,71 @@ public class ArbitraryDataFileManager extends Thread { } } - private ArbitraryDataFile fetchArbitraryDataFile(Peer peer, Peer requestingPeer, ArbitraryTransactionData arbitraryTransactionData, byte[] signature, byte[] hash, Message originalMessage) throws DataException { - ArbitraryDataFile existingFile = ArbitraryDataFile.fromHash(hash, signature); - boolean fileAlreadyExists = existingFile.exists(); - String hash58 = Base58.encode(hash); + private ArbitraryDataFile fetchArbitraryDataFile(Peer peer, ArbitraryTransactionData arbitraryTransactionData, byte[] signature, byte[] hash) throws DataException { ArbitraryDataFile arbitraryDataFile; - // Fetch the file if it doesn't exist locally - if (!fileAlreadyExists) { + try { + ArbitraryDataFile existingFile = ArbitraryDataFile.fromHash(hash, signature); + boolean fileAlreadyExists = existingFile.exists(); + String hash58 = Base58.encode(hash); + + // Fetch the file if it doesn't exist locally + if (!fileAlreadyExists) { + LOGGER.debug(String.format("Fetching data file %.8s from peer %s", hash58, peer)); + arbitraryDataFileRequests.put(hash58, NTP.getTime()); + Message getArbitraryDataFileMessage = new GetArbitraryDataFileMessage(signature, hash); + + Message response = null; + try { + response = peer.getResponseWithTimeout(getArbitraryDataFileMessage, (int) ArbitraryDataManager.ARBITRARY_REQUEST_TIMEOUT); + } catch (InterruptedException e) { + // Will return below due to null response + } + arbitraryDataFileRequests.remove(hash58); + LOGGER.trace(String.format("Removed hash %.8s from arbitraryDataFileRequests", hash58)); + + if (response == null) { + LOGGER.debug("Received null response from peer {}", peer); + return null; + } + if (response.getType() != MessageType.ARBITRARY_DATA_FILE) { + LOGGER.debug("Received response with invalid type: {} from peer {}", response.getType(), peer); + return null; + } + + ArbitraryDataFileMessage peersArbitraryDataFileMessage = (ArbitraryDataFileMessage) response; + arbitraryDataFile = peersArbitraryDataFileMessage.getArbitraryDataFile(); + } else { + LOGGER.debug(String.format("File hash %s already exists, so skipping the request", hash58)); + arbitraryDataFile = existingFile; + } + + if (arbitraryDataFile != null) { + + arbitraryDataFile.save(); + + // If this is a metadata file then we need to update the cache + if (arbitraryTransactionData != null && arbitraryTransactionData.getMetadataHash() != null) { + if (Arrays.equals(arbitraryTransactionData.getMetadataHash(), hash)) { + ArbitraryDataCacheManager.getInstance().addToUpdateQueue(arbitraryTransactionData); + } + } + + // We may need to remove the file list request, if we have all the files for this transaction + this.handleFileListRequests(signature); + } + } catch (DataException e) { + LOGGER.error(e.getMessage(), e); + arbitraryDataFile = null; + } + + return arbitraryDataFile; + } + + private void fetchFileForRelay(Peer peer, Peer requestingPeer, byte[] signature, byte[] hash, Message originalMessage) throws DataException { + try { + String hash58 = Base58.encode(hash); + LOGGER.debug(String.format("Fetching data file %.8s from peer %s", hash58, peer)); arbitraryDataFileRequests.put(hash58, NTP.getTime()); Message getArbitraryDataFileMessage = new GetArbitraryDataFileMessage(signature, hash); @@ -228,77 +319,73 @@ public class ArbitraryDataFileManager extends Thread { arbitraryDataFileRequests.remove(hash58); LOGGER.trace(String.format("Removed hash %.8s from arbitraryDataFileRequests", hash58)); - - if (response == null) { LOGGER.debug("Received null response from peer {}", peer); - return null; + return; } if (response.getType() != MessageType.ARBITRARY_DATA_FILE) { LOGGER.debug("Received response with invalid type: {} from peer {}", response.getType(), peer); - return null; - } - - ArbitraryDataFileMessage peersArbitraryDataFileMessage = (ArbitraryDataFileMessage) response; - arbitraryDataFile = peersArbitraryDataFileMessage.getArbitraryDataFile(); - } else { - LOGGER.debug(String.format("File hash %s already exists, so skipping the request", hash58)); - arbitraryDataFile = existingFile; - } - - if (arbitraryDataFile == null) { - // We don't have a file, so give up here - return null; - } - - // We might want to forward the request to the peer that originally requested it - this.handleArbitraryDataFileForwarding(requestingPeer, new ArbitraryDataFileMessage(signature, arbitraryDataFile), originalMessage); - - boolean isRelayRequest = (requestingPeer != null); - if (isRelayRequest) { - if (!fileAlreadyExists) { - // File didn't exist locally before the request, and it's a forwarding request, so delete it if it exists. - // It shouldn't exist on the filesystem yet, but leaving this here just in case. - arbitraryDataFile.delete(10); - } - } - else { - arbitraryDataFile.save(); - } - - // If this is a metadata file then we need to update the cache - if (arbitraryTransactionData != null && arbitraryTransactionData.getMetadataHash() != null) { - if (Arrays.equals(arbitraryTransactionData.getMetadataHash(), hash)) { - ArbitraryDataCacheManager.getInstance().addToUpdateQueue(arbitraryTransactionData); - } - } - - // We may need to remove the file list request, if we have all the files for this transaction - this.handleFileListRequests(signature); - - return arbitraryDataFile; - } - - private void handleFileListRequests(byte[] signature) { - try (final Repository repository = RepositoryManager.getRepository()) { - - // Fetch the transaction data - ArbitraryTransactionData arbitraryTransactionData = ArbitraryTransactionUtils.fetchTransactionData(repository, signature); - if (arbitraryTransactionData == null) { return; } - boolean completeFileExists = ArbitraryTransactionUtils.completeFileExists(arbitraryTransactionData); + ArbitraryDataFileMessage peersArbitraryDataFileMessage = (ArbitraryDataFileMessage) response; + ArbitraryDataFile arbitraryDataFile = peersArbitraryDataFileMessage.getArbitraryDataFile(); - if (completeFileExists) { - String signature58 = Base58.encode(arbitraryTransactionData.getSignature()); - LOGGER.info("All chunks or complete file exist for transaction {}", signature58); - - ArbitraryDataFileListManager.getInstance().deleteFileListRequestsForSignature(signature); + if (arbitraryDataFile != null) { + + // We might want to forward the request to the peer that originally requested it + this.handleArbitraryDataFileForwarding(requestingPeer, new ArbitraryDataFileMessage(signature, arbitraryDataFile), originalMessage); + } + } catch (Exception e) { + LOGGER.error(e.getMessage(), e); + } + } + + Map signatureBySignature58 = new HashMap<>(); + + // Lock to synchronize access to the list + private final Object handleFileListRequestsLock = new Object(); + + // Scheduled executor service to process messages every second + private final ScheduledExecutorService handleFileListRequestsScheduler = Executors.newScheduledThreadPool(1); + + private void handleFileListRequests(byte[] signature) { + + synchronized (handleFileListRequestsLock) { + signatureBySignature58.put(Base58.encode(signature), signature); + } + } + + private void handleFileListRequestProcess() { + + Map signaturesToProcess; + + synchronized (handleFileListRequestsLock) { + signaturesToProcess = new HashMap<>(signatureBySignature58); + signatureBySignature58.clear(); + } + + if( signaturesToProcess.isEmpty() ) return; + + try (final Repository repository = RepositoryManager.getRepository()) { + + // Fetch the transaction data + List arbitraryTransactionDataList + = ArbitraryTransactionUtils.fetchTransactionDataList(repository, new ArrayList<>(signaturesToProcess.values())); + + for( ArbitraryTransactionData arbitraryTransactionData : arbitraryTransactionDataList ) { + boolean completeFileExists = ArbitraryTransactionUtils.completeFileExists(arbitraryTransactionData); + + if (completeFileExists) { + String signature58 = Base58.encode(arbitraryTransactionData.getSignature()); + LOGGER.debug("All chunks or complete file exist for transaction {}", signature58); + + ArbitraryDataFileListManager.getInstance().deleteFileListRequestsForSignature(signature58); + } } - } catch (DataException e) { - LOGGER.debug("Unable to handle file list requests: {}", e.getMessage()); + } catch (Exception e) { + LOGGER.error(e.getMessage(), e); } } @@ -315,15 +402,14 @@ public class ArbitraryDataFileManager extends Thread { LOGGER.debug("Received arbitrary data file - forwarding is needed"); - // The ID needs to match that of the original request - message.setId(originalMessage.getId()); + try { + // The ID needs to match that of the original request + message.setId(originalMessage.getId()); - if (!requestingPeer.sendMessageWithTimeout(message, (int) ArbitraryDataManager.ARBITRARY_REQUEST_TIMEOUT)) { - LOGGER.debug("Failed to forward arbitrary data file to peer {}", requestingPeer); - requestingPeer.disconnect("failed to forward arbitrary data file"); - } - else { - LOGGER.debug("Forwarded arbitrary data file to peer {}", requestingPeer); + getOrCreateSendManager(requestingPeer).queueMessage(message); + + } catch (Exception e) { + LOGGER.error(e.getMessage(), e); } } @@ -597,13 +683,9 @@ public class ArbitraryDataFileManager extends Thread { LOGGER.debug("Sending file {}...", arbitraryDataFile); ArbitraryDataFileMessage arbitraryDataFileMessage = new ArbitraryDataFileMessage(signature, arbitraryDataFile); arbitraryDataFileMessage.setId(message.getId()); - if (!peer.sendMessageWithTimeout(arbitraryDataFileMessage, (int) ArbitraryDataManager.ARBITRARY_REQUEST_TIMEOUT)) { - LOGGER.debug("Couldn't send file {}", arbitraryDataFile); - peer.disconnect("failed to send file"); - } - else { - LOGGER.debug("Sent file {}", arbitraryDataFile); - } + + getOrCreateSendManager(peer).queueMessage(arbitraryDataFileMessage); + } else if (relayInfo != null) { LOGGER.debug("We have relay info for hash {}", Base58.encode(hash)); @@ -615,7 +697,7 @@ public class ArbitraryDataFileManager extends Thread { LOGGER.debug("Asking peer {} for hash {}", peerToAsk, hash58); // No need to pass arbitraryTransactionData below because this is only used for metadata caching, // and metadata isn't retained when relaying. - this.fetchArbitraryDataFile(peerToAsk, peer, null, signature, hash, message); + this.fetchFileForRelay(peerToAsk, peer, signature, hash, message); } else { LOGGER.debug("Peer {} not found in relay info", peer); @@ -637,7 +719,6 @@ public class ArbitraryDataFileManager extends Thread { fileUnknownMessage.setId(message.getId()); if (!peer.sendMessage(fileUnknownMessage)) { LOGGER.debug("Couldn't sent file-unknown response"); - peer.disconnect("failed to send file-unknown response"); } else { LOGGER.debug("Sent file-unknown response for file {}", arbitraryDataFile); diff --git a/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileRequestThread.java b/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileRequestThread.java index b27eef26..5096f3dd 100644 --- a/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileRequestThread.java +++ b/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileRequestThread.java @@ -15,6 +15,7 @@ import org.qortal.settings.Settings; import org.qortal.utils.ArbitraryTransactionUtils; import org.qortal.utils.Base58; import org.qortal.utils.NTP; +import org.qortal.utils.NamedThreadFactory; import java.net.http.HttpResponse; import java.util.ArrayList; @@ -38,6 +39,9 @@ public class ArbitraryDataFileRequestThread { private static final Logger LOGGER = LogManager.getLogger(ArbitraryDataFileRequestThread.class); + private static final Integer FETCHER_LIMIT_PER_PEER = Settings.getInstance().getMaxThreadsForMessageType(MessageType.GET_ARBITRARY_DATA_FILE); + private static final String FETCHER_THREAD_PREFIX = "Arbitrary Data Fetcher "; + private ConcurrentHashMap executorByPeer = new ConcurrentHashMap<>(); private ArbitraryDataFileRequestThread() { @@ -64,8 +68,9 @@ public class ArbitraryDataFileRequestThread { if (value instanceof ThreadPoolExecutor) { ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) value; if (threadPoolExecutor.getActiveCount() == 0) { + threadPoolExecutor.shutdown(); if (this.executorByPeer.computeIfPresent(key, (k, v) -> null) == null) { - LOGGER.info("removed executor: peer = " + key); + LOGGER.trace("removed executor: peer = " + key); } } } else { @@ -147,7 +152,9 @@ public class ArbitraryDataFileRequestThread { .computeIfAbsent( responseInfo.getPeer().toString(), peer -> Executors.newFixedThreadPool( - Settings.getInstance().getMaxThreadsForMessageType(MessageType.GET_ARBITRARY_DATA_FILE)) + FETCHER_LIMIT_PER_PEER, + new NamedThreadFactory(FETCHER_THREAD_PREFIX + responseInfo.getPeer().toString(), NORM_PRIORITY) + ) ) .execute(fetcher); } diff --git a/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataManager.java b/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataManager.java index a0a70144..8f0bf708 100644 --- a/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataManager.java +++ b/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataManager.java @@ -42,10 +42,10 @@ public class ArbitraryDataManager extends Thread { private int powDifficulty = 14; // Must not be final, as unit tests need to reduce this value /** Request timeout when transferring arbitrary data */ - public static final long ARBITRARY_REQUEST_TIMEOUT = 12 * 1000L; // ms + public static final long ARBITRARY_REQUEST_TIMEOUT = 24 * 1000L; // ms /** Maximum time to hold information about an in-progress relay */ - public static final long ARBITRARY_RELAY_TIMEOUT = 90 * 1000L; // ms + public static final long ARBITRARY_RELAY_TIMEOUT = 120 * 1000L; // ms /** Maximum time to hold direct peer connection information */ public static final long ARBITRARY_DIRECT_CONNECTION_INFO_TIMEOUT = 2 * 60 * 1000L; // ms diff --git a/src/main/java/org/qortal/controller/arbitrary/ArbitraryMetadataManager.java b/src/main/java/org/qortal/controller/arbitrary/ArbitraryMetadataManager.java index d38d329f..76e44895 100644 --- a/src/main/java/org/qortal/controller/arbitrary/ArbitraryMetadataManager.java +++ b/src/main/java/org/qortal/controller/arbitrary/ArbitraryMetadataManager.java @@ -360,9 +360,8 @@ public class ArbitraryMetadataManager { // Forward to requesting peer LOGGER.debug("Forwarding metadata to requesting peer: {}", requestingPeer); - if (!requestingPeer.sendMessage(forwardArbitraryMetadataMessage)) { - requestingPeer.disconnect("failed to forward arbitrary metadata"); - } + requestingPeer.sendMessage(forwardArbitraryMetadataMessage); + } } } @@ -479,7 +478,6 @@ public class ArbitraryMetadataManager { arbitraryMetadataMessage.setId(message.getId()); if (!peer.sendMessage(arbitraryMetadataMessage)) { LOGGER.debug("Couldn't send metadata"); - peer.disconnect("failed to send metadata"); continue; } LOGGER.debug("Sent metadata"); diff --git a/src/main/java/org/qortal/controller/arbitrary/Follower.java b/src/main/java/org/qortal/controller/arbitrary/Follower.java new file mode 100644 index 00000000..228640f4 --- /dev/null +++ b/src/main/java/org/qortal/controller/arbitrary/Follower.java @@ -0,0 +1,130 @@ +package org.qortal.controller.arbitrary; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.qortal.data.transaction.ArbitraryTransactionData; +import org.qortal.repository.Repository; +import org.qortal.repository.RepositoryManager; +import org.qortal.settings.Settings; +import org.qortal.utils.ListUtils; +import org.qortal.utils.NamedThreadFactory; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.OptionalInt; +import java.util.Set; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +public class Follower { + + private static final Logger LOGGER = LogManager.getLogger(Follower.class); + + private ScheduledExecutorService service + = Executors.newScheduledThreadPool(2, new NamedThreadFactory("Follower", Thread.NORM_PRIORITY)); + + private Follower() { + + } + + private static Follower instance; + + public static Follower getInstance() { + + if( instance == null ) { + instance = new Follower(); + } + + return instance; + } + + public void start() { + + // fetch arbitrary transactions from followed names from the last 100 blocks every 2 minutes + service.scheduleWithFixedDelay(() -> fetch(OptionalInt.of(100)), 10, 2, TimeUnit.MINUTES); + + // fetch arbitrary transaction from followed names from any block every 24 hours + service.scheduleWithFixedDelay(() -> fetch(OptionalInt.empty()), 4, 24, TimeUnit.HOURS); + } + + private void fetch(OptionalInt limit) { + + try { + // for each followed name, get arbitraty transactions, then examine those transactions before fetching + for (String name : ListUtils.followedNames()) { + + List transactionsInReverseOrder; + + // open database to get the transactions in reverse order for the followed name + try (final Repository repository = RepositoryManager.getRepository()) { + + List latestArbitraryTransactionsByName + = repository.getArbitraryRepository().getLatestArbitraryTransactionsByName(name); + + if (limit.isPresent()) { + final int blockHeightThreshold = repository.getBlockRepository().getBlockchainHeight() - limit.getAsInt(); + + transactionsInReverseOrder + = latestArbitraryTransactionsByName.stream().filter(tx -> tx.getBlockHeight() > blockHeightThreshold) + .collect(Collectors.toList()); + } else { + transactionsInReverseOrder = latestArbitraryTransactionsByName; + } + + } catch (Exception e) { + LOGGER.error(e.getMessage(), e); + transactionsInReverseOrder = new ArrayList<>(0); + } + + // collect process transaction hashes, so we don't fetch outdated transactions + Set processedTransactions = new HashSet<>(); + + ArbitraryDataStorageManager storageManager = ArbitraryDataStorageManager.getInstance(); + + // for each arbitrary transaction for the followed name process, evaluate, fetch + for (ArbitraryTransactionData arbitraryTransaction : transactionsInReverseOrder) { + + boolean examined = false; + + try (final Repository repository = RepositoryManager.getRepository()) { + + // if not processed + if (!processedTransactions.contains(new ArbitraryTransactionDataHashWrapper(arbitraryTransaction))) { + boolean isLocal = repository.getArbitraryRepository().isDataLocal(arbitraryTransaction.getSignature()); + + // if not local, then continue to evaluate + if (!isLocal) { + + // evaluate fetching status for this transaction on this node + ArbitraryDataExamination examination = storageManager.shouldPreFetchData(repository, arbitraryTransaction); + + // if the evaluation passed, then fetch + examined = examination.isPass(); + } + // if locally stored, then nothing needs to be done + + // add to processed transactions + processedTransactions.add(new ArbitraryTransactionDataHashWrapper(arbitraryTransaction)); + } + } + + // if passed examination for fetching, then fetch + if (examined) { + LOGGER.info("for {} on {}, fetching {}", name, arbitraryTransaction.getService(), arbitraryTransaction.getIdentifier()); + boolean fetched = ArbitraryDataFileListManager.getInstance().fetchArbitraryDataFileList(arbitraryTransaction); + + LOGGER.info("fetched = " + fetched); + } + + // pause a second before moving on to another transaction + Thread.sleep(1000); + } + } + } catch (Exception e) { + LOGGER.error(e.getMessage(), e); + } + } +} diff --git a/src/main/java/org/qortal/network/Peer.java b/src/main/java/org/qortal/network/Peer.java index fc28ea87..6d22be3e 100644 --- a/src/main/java/org/qortal/network/Peer.java +++ b/src/main/java/org/qortal/network/Peer.java @@ -640,10 +640,13 @@ public class Peer { return false; try { - this.outputBuffer = ByteBuffer.wrap(message.toBytes()); + byte[] messageBytes = message.toBytes(); + + this.outputBuffer = ByteBuffer.wrap(messageBytes); this.outputMessageType = message.getType().name(); this.outputMessageId = message.getId(); + LOGGER.trace("[{}] Sending {} message with ID {} to peer {}", this.peerConnectionId, this.outputMessageType, this.outputMessageId, this); @@ -662,12 +665,22 @@ public class Peer { // If output byte buffer is not null, send from that int bytesWritten = this.socketChannel.write(outputBuffer); - LOGGER.trace("[{}] Sent {} bytes of {} message with ID {} to peer {} ({} total)", this.peerConnectionId, - bytesWritten, this.outputMessageType, this.outputMessageId, this, outputBuffer.limit()); + int zeroSendCount = 0; - // If we've sent 0 bytes then socket buffer is full so we need to wait until it's empty again - if (bytesWritten == 0) { - return true; + while (bytesWritten == 0) { + if (zeroSendCount > 9) { + LOGGER.debug("Socket write stuck for too long, returning"); + return true; + } + try { + Thread.sleep(10); // 10MS CPU Sleep to try and give it time to flush the socket + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return false; // optional, if you want to signal shutdown + } + zeroSendCount++; + bytesWritten = this.socketChannel.write(outputBuffer); } // If we then exhaust the byte buffer, set it to null (otherwise loop and try to send more) @@ -729,7 +742,7 @@ public class Peer { try { // Queue message, to be picked up by ChannelWriteTask and then peer.writeChannel() - LOGGER.trace("[{}] Queuing {} message with ID {} to peer {}", this.peerConnectionId, + LOGGER.debug("[{}] Queuing {} message with ID {} to peer {}", this.peerConnectionId, message.getType().name(), message.getId(), this); // Check message properly constructed diff --git a/src/main/java/org/qortal/network/PeerSendManager.java b/src/main/java/org/qortal/network/PeerSendManager.java new file mode 100644 index 00000000..7f9a6fc9 --- /dev/null +++ b/src/main/java/org/qortal/network/PeerSendManager.java @@ -0,0 +1,131 @@ +package org.qortal.network; + +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.qortal.network.message.Message; + +public class PeerSendManager { + private static final Logger LOGGER = LogManager.getLogger(PeerSendManager.class); + + private static final int MAX_FAILURES = 15; + private static final int MAX_MESSAGE_ATTEMPTS = 2; + private static final int SEND_TIMEOUT_MS = 500; + private static final int RETRY_DELAY_MS = 100; + private static final long MAX_QUEUE_DURATION_MS = 20_000; + private static final long COOLDOWN_DURATION_MS = 20_000; + + private final Peer peer; + private final BlockingQueue queue = new LinkedBlockingQueue<>(); + private final ExecutorService executor; + private final AtomicInteger failureCount = new AtomicInteger(0); + private static final AtomicInteger threadCount = new AtomicInteger(1); + + private volatile boolean coolingDown = false; + private volatile long lastUsed = System.currentTimeMillis(); + + public PeerSendManager(Peer peer) { + this.peer = peer; + this.executor = Executors.newSingleThreadExecutor(r -> { + Thread t = new Thread(r); + t.setName("PeerSendManager-" + peer.getResolvedAddress().getHostString() + "-" + threadCount.getAndIncrement()); + return t; + }); + start(); + } + + private void start() { + executor.submit(() -> { + while (!Thread.currentThread().isInterrupted()) { + try { + TimedMessage timedMessage = queue.take(); + long age = System.currentTimeMillis() - timedMessage.timestamp; + + if (age > MAX_QUEUE_DURATION_MS) { + LOGGER.debug("Dropping stale message {} ({}ms old)", timedMessage.message.getId(), age); + continue; + } + + Message message = timedMessage.message; + boolean success = false; + + for (int attempt = 1; attempt <= MAX_MESSAGE_ATTEMPTS; attempt++) { + try { + if (peer.sendMessageWithTimeout(message, SEND_TIMEOUT_MS)) { + success = true; + failureCount.set(0); // reset on success + break; + } + } catch (Exception e) { + LOGGER.debug("Attempt {} failed for message {} to peer {}: {}", attempt, message.getId(), peer, e.getMessage()); + } + + Thread.sleep(RETRY_DELAY_MS); + } + + if (!success) { + int totalFailures = failureCount.incrementAndGet(); + LOGGER.debug("Failed to send message {} to peer {}. Total failures: {}", message.getId(), peer, totalFailures); + + if (totalFailures >= MAX_FAILURES) { + LOGGER.debug("Peer {} exceeded failure limit ({}). Disconnecting...", peer, totalFailures); + peer.disconnect("Too many message send failures"); + coolingDown = true; + queue.clear(); + + try { + Thread.sleep(COOLDOWN_DURATION_MS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return; + } finally { + coolingDown = false; + failureCount.set(0); + } + } + } + + Thread.sleep(50); // small throttle + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + break; + } catch (Exception e) { + LOGGER.error("Unexpected error in PeerSendManager for peer {}: {}", peer, e.getMessage(), e); + } + } + }); + } + + public void queueMessage(Message message) { + if (coolingDown) { + LOGGER.debug("In cooldown, ignoring message {}", message.getId()); + return; + } + + lastUsed = System.currentTimeMillis(); + if (!queue.offer(new TimedMessage(message))) { + LOGGER.debug("Send queue full, dropping message {}", message.getId()); + } + } + + public boolean isIdle(long cutoffMillis) { + return System.currentTimeMillis() - lastUsed > cutoffMillis; + } + + public void shutdown() { + queue.clear(); + executor.shutdownNow(); + } + + private static class TimedMessage { + final Message message; + final long timestamp; + + TimedMessage(Message message) { + this.message = message; + this.timestamp = System.currentTimeMillis(); + } + } +} diff --git a/src/main/java/org/qortal/network/task/ChannelWriteTask.java b/src/main/java/org/qortal/network/task/ChannelWriteTask.java index 59bc557e..455a7384 100644 --- a/src/main/java/org/qortal/network/task/ChannelWriteTask.java +++ b/src/main/java/org/qortal/network/task/ChannelWriteTask.java @@ -31,8 +31,28 @@ public class ChannelWriteTask implements Task { @Override public void perform() throws InterruptedException { try { - boolean isSocketClogged = peer.writeChannel(); + + boolean isSocketClogged; + int clogCounter = 0; + do { + isSocketClogged = peer.writeChannel(); + if (clogCounter > 9) { + LOGGER.warn("10 Socket Clogs - GIVING UP"); + break; + } + if (isSocketClogged) { + LOGGER.debug( + "socket is clogged: peer = {} {}, retrying", + peer.getPeerData().getAddress().toString(), + Thread.currentThread().getName() + ); + Thread.sleep(1000); + clogCounter++; + } + + } while( isSocketClogged ); + // Tell Network that we've finished Network.getInstance().notifyChannelNotWriting(socketChannel); @@ -49,4 +69,4 @@ public class ChannelWriteTask implements Task { peer.disconnect("I/O error"); } } -} +} \ No newline at end of file diff --git a/src/main/java/org/qortal/repository/hsqldb/HSQLDBChatRepository.java b/src/main/java/org/qortal/repository/hsqldb/HSQLDBChatRepository.java index 535c3ed6..e817763e 100644 --- a/src/main/java/org/qortal/repository/hsqldb/HSQLDBChatRepository.java +++ b/src/main/java/org/qortal/repository/hsqldb/HSQLDBChatRepository.java @@ -44,11 +44,9 @@ public class HSQLDBChatRepository implements ChatRepository { // if the PrimaryTable is available, then use it if( this.repository.getBlockRepository().getBlockchainHeight() > BlockChain.getInstance().getMultipleNamesPerAccountHeight()) { - LOGGER.debug("using PrimaryNames for chat transactions"); tableName = "PrimaryNames"; } else { - LOGGER.debug("using Names for chat transactions"); tableName = "Names"; } @@ -164,11 +162,9 @@ public class HSQLDBChatRepository implements ChatRepository { // if the PrimaryTable is available, then use it if( this.repository.getBlockRepository().getBlockchainHeight() > BlockChain.getInstance().getMultipleNamesPerAccountHeight()) { - LOGGER.debug("using PrimaryNames for chat transactions"); tableName = "PrimaryNames"; } else { - LOGGER.debug("using Names for chat transactions"); tableName = "Names"; } @@ -322,11 +318,9 @@ public class HSQLDBChatRepository implements ChatRepository { // if the PrimaryTable is available, then use it if( this.repository.getBlockRepository().getBlockchainHeight() > BlockChain.getInstance().getMultipleNamesPerAccountHeight()) { - LOGGER.debug("using PrimaryNames for chat transactions"); tableName = "PrimaryNames"; } else { - LOGGER.debug("using Names for chat transactions"); tableName = "Names"; }