diff --git a/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileListManager.java b/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileListManager.java index ee37dbec..07989589 100644 --- a/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileListManager.java +++ b/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileListManager.java @@ -402,8 +402,8 @@ public class ArbitraryDataFileListManager { return true; } - public void deleteFileListRequestsForSignature(byte[] signature) { - String signature58 = Base58.encode(signature); + public void deleteFileListRequestsForSignature(String signature58) { + for (Iterator>> it = arbitraryDataFileListRequests.entrySet().iterator(); it.hasNext();) { Map.Entry> entry = it.next(); if (entry == null || entry.getKey() == null || entry.getValue() == null) { diff --git a/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileManager.java b/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileManager.java index a4034596..4b50603c 100644 --- a/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileManager.java +++ b/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileManager.java @@ -23,7 +23,6 @@ import org.qortal.utils.NTP; import java.security.SecureRandom; import java.util.*; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -70,6 +69,7 @@ public class ArbitraryDataFileManager extends Thread { private ArbitraryDataFileManager() { this.arbitraryDataFileHashResponseScheduler.scheduleAtFixedRate( this::processResponses, 60, 1, TimeUnit.SECONDS); + this.arbitraryDataFileHashResponseScheduler.scheduleAtFixedRate(this::handleFileListRequestProcess, 60, 1, TimeUnit.SECONDS); } public static ArbitraryDataFileManager getInstance() { @@ -140,7 +140,7 @@ public class ArbitraryDataFileManager extends Thread { if (!arbitraryDataFileRequests.containsKey(Base58.encode(hash))) { LOGGER.debug("Requesting data file {} from peer {}", hash58, peer); Long startTime = NTP.getTime(); - ArbitraryDataFile receivedArbitraryDataFile = fetchArbitraryDataFile(peer, null, arbitraryTransactionData, signature, hash, null); + ArbitraryDataFile receivedArbitraryDataFile = fetchArbitraryDataFile(peer, arbitraryTransactionData, signature, hash); Long endTime = NTP.getTime(); if (receivedArbitraryDataFile != null) { LOGGER.debug("Received data file {} from peer {}. Time taken: {} ms", receivedArbitraryDataFile.getHash58(), peer, (endTime-startTime)); @@ -207,14 +207,71 @@ public class ArbitraryDataFileManager extends Thread { } } - private ArbitraryDataFile fetchArbitraryDataFile(Peer peer, Peer requestingPeer, ArbitraryTransactionData arbitraryTransactionData, byte[] signature, byte[] hash, Message originalMessage) throws DataException { - ArbitraryDataFile existingFile = ArbitraryDataFile.fromHash(hash, signature); - boolean fileAlreadyExists = existingFile.exists(); - String hash58 = Base58.encode(hash); + private ArbitraryDataFile fetchArbitraryDataFile(Peer peer, ArbitraryTransactionData arbitraryTransactionData, byte[] signature, byte[] hash) throws DataException { ArbitraryDataFile arbitraryDataFile; - // Fetch the file if it doesn't exist locally - if (!fileAlreadyExists) { + try { + ArbitraryDataFile existingFile = ArbitraryDataFile.fromHash(hash, signature); + boolean fileAlreadyExists = existingFile.exists(); + String hash58 = Base58.encode(hash); + + // Fetch the file if it doesn't exist locally + if (!fileAlreadyExists) { + LOGGER.debug(String.format("Fetching data file %.8s from peer %s", hash58, peer)); + arbitraryDataFileRequests.put(hash58, NTP.getTime()); + Message getArbitraryDataFileMessage = new GetArbitraryDataFileMessage(signature, hash); + + Message response = null; + try { + response = peer.getResponseWithTimeout(getArbitraryDataFileMessage, (int) ArbitraryDataManager.ARBITRARY_REQUEST_TIMEOUT); + } catch (InterruptedException e) { + // Will return below due to null response + } + arbitraryDataFileRequests.remove(hash58); + LOGGER.trace(String.format("Removed hash %.8s from arbitraryDataFileRequests", hash58)); + + if (response == null) { + LOGGER.debug("Received null response from peer {}", peer); + return null; + } + if (response.getType() != MessageType.ARBITRARY_DATA_FILE) { + LOGGER.debug("Received response with invalid type: {} from peer {}", response.getType(), peer); + return null; + } + + ArbitraryDataFileMessage peersArbitraryDataFileMessage = (ArbitraryDataFileMessage) response; + arbitraryDataFile = peersArbitraryDataFileMessage.getArbitraryDataFile(); + } else { + LOGGER.debug(String.format("File hash %s already exists, so skipping the request", hash58)); + arbitraryDataFile = existingFile; + } + + if (arbitraryDataFile != null) { + + arbitraryDataFile.save(); + + // If this is a metadata file then we need to update the cache + if (arbitraryTransactionData != null && arbitraryTransactionData.getMetadataHash() != null) { + if (Arrays.equals(arbitraryTransactionData.getMetadataHash(), hash)) { + ArbitraryDataCacheManager.getInstance().addToUpdateQueue(arbitraryTransactionData); + } + } + + // We may need to remove the file list request, if we have all the files for this transaction + this.handleFileListRequests(signature); + } + } catch (DataException e) { + LOGGER.error(e.getMessage(), e); + arbitraryDataFile = null; + } + + return arbitraryDataFile; + } + + private void fetchFileForRelay(Peer peer, Peer requestingPeer, byte[] signature, byte[] hash, Message originalMessage) throws DataException { + try { + String hash58 = Base58.encode(hash); + LOGGER.debug(String.format("Fetching data file %.8s from peer %s", hash58, peer)); arbitraryDataFileRequests.put(hash58, NTP.getTime()); Message getArbitraryDataFileMessage = new GetArbitraryDataFileMessage(signature, hash); @@ -228,77 +285,75 @@ public class ArbitraryDataFileManager extends Thread { arbitraryDataFileRequests.remove(hash58); LOGGER.trace(String.format("Removed hash %.8s from arbitraryDataFileRequests", hash58)); - - if (response == null) { LOGGER.debug("Received null response from peer {}", peer); - return null; + return; } if (response.getType() != MessageType.ARBITRARY_DATA_FILE) { LOGGER.debug("Received response with invalid type: {} from peer {}", response.getType(), peer); - return null; - } - - ArbitraryDataFileMessage peersArbitraryDataFileMessage = (ArbitraryDataFileMessage) response; - arbitraryDataFile = peersArbitraryDataFileMessage.getArbitraryDataFile(); - } else { - LOGGER.debug(String.format("File hash %s already exists, so skipping the request", hash58)); - arbitraryDataFile = existingFile; - } - - if (arbitraryDataFile == null) { - // We don't have a file, so give up here - return null; - } - - // We might want to forward the request to the peer that originally requested it - this.handleArbitraryDataFileForwarding(requestingPeer, new ArbitraryDataFileMessage(signature, arbitraryDataFile), originalMessage); - - boolean isRelayRequest = (requestingPeer != null); - if (isRelayRequest) { - if (!fileAlreadyExists) { - // File didn't exist locally before the request, and it's a forwarding request, so delete it if it exists. - // It shouldn't exist on the filesystem yet, but leaving this here just in case. - arbitraryDataFile.delete(10); - } - } - else { - arbitraryDataFile.save(); - } - - // If this is a metadata file then we need to update the cache - if (arbitraryTransactionData != null && arbitraryTransactionData.getMetadataHash() != null) { - if (Arrays.equals(arbitraryTransactionData.getMetadataHash(), hash)) { - ArbitraryDataCacheManager.getInstance().addToUpdateQueue(arbitraryTransactionData); - } - } - - // We may need to remove the file list request, if we have all the files for this transaction - this.handleFileListRequests(signature); - - return arbitraryDataFile; - } - - private void handleFileListRequests(byte[] signature) { - try (final Repository repository = RepositoryManager.getRepository()) { - - // Fetch the transaction data - ArbitraryTransactionData arbitraryTransactionData = ArbitraryTransactionUtils.fetchTransactionData(repository, signature); - if (arbitraryTransactionData == null) { return; } - boolean completeFileExists = ArbitraryTransactionUtils.completeFileExists(arbitraryTransactionData); + ArbitraryDataFileMessage peersArbitraryDataFileMessage = (ArbitraryDataFileMessage) response; + ArbitraryDataFile arbitraryDataFile = peersArbitraryDataFileMessage.getArbitraryDataFile(); - if (completeFileExists) { - String signature58 = Base58.encode(arbitraryTransactionData.getSignature()); - LOGGER.info("All chunks or complete file exist for transaction {}", signature58); - - ArbitraryDataFileListManager.getInstance().deleteFileListRequestsForSignature(signature); + if (arbitraryDataFile != null) { + + // We might want to forward the request to the peer that originally requested it + this.handleArbitraryDataFileForwarding(requestingPeer, new ArbitraryDataFileMessage(signature, arbitraryDataFile), originalMessage); + } + } catch (Exception e) { + LOGGER.error(e.getMessage(), e); + } + } + + Map signatureBySignature58 = new HashMap<>(); + + // Lock to synchronize access to the list + private final Object handleFileListRequestsLock = new Object(); + + // Scheduled executor service to process messages every second + private final ScheduledExecutorService handleFileListRequestsScheduler = Executors.newScheduledThreadPool(1); + + private void handleFileListRequests(byte[] signature) { + + synchronized (handleFileListRequestsLock) { + signatureBySignature58.put(Base58.encode(signature), signature); + } + } + + private void handleFileListRequestProcess() { + + Map signaturesToProcess; + + synchronized (handleFileListRequestsLock) { + signaturesToProcess = new HashMap<>(signatureBySignature58); + signatureBySignature58.clear(); + } + + if( signaturesToProcess.isEmpty() ) return; + + LOGGER.info("signatures to process = " + signaturesToProcess.size()); + + try (final Repository repository = RepositoryManager.getRepository()) { + + // Fetch the transaction data + List arbitraryTransactionDataList + = ArbitraryTransactionUtils.fetchTransactionDataList(repository, new ArrayList<>(signaturesToProcess.values())); + + for( ArbitraryTransactionData arbitraryTransactionData : arbitraryTransactionDataList ) { + boolean completeFileExists = ArbitraryTransactionUtils.completeFileExists(arbitraryTransactionData); + + if (completeFileExists) { + String signature58 = Base58.encode(arbitraryTransactionData.getSignature()); + LOGGER.info("All chunks or complete file exist for transaction {}", signature58); + + ArbitraryDataFileListManager.getInstance().deleteFileListRequestsForSignature(signature58); + } } - } catch (DataException e) { - LOGGER.debug("Unable to handle file list requests: {}", e.getMessage()); + } catch (Exception e) { + LOGGER.error(e.getMessage(), e); } } @@ -315,15 +370,19 @@ public class ArbitraryDataFileManager extends Thread { LOGGER.debug("Received arbitrary data file - forwarding is needed"); - // The ID needs to match that of the original request - message.setId(originalMessage.getId()); + try { + // The ID needs to match that of the original request + message.setId(originalMessage.getId()); - if (!requestingPeer.sendMessageWithTimeout(message, (int) ArbitraryDataManager.ARBITRARY_REQUEST_TIMEOUT)) { - LOGGER.debug("Failed to forward arbitrary data file to peer {}", requestingPeer); - requestingPeer.disconnect("failed to forward arbitrary data file"); - } - else { - LOGGER.debug("Forwarded arbitrary data file to peer {}", requestingPeer); + if (!requestingPeer.sendMessageWithTimeout(message, (int) ArbitraryDataManager.ARBITRARY_REQUEST_TIMEOUT)) { + LOGGER.info("Failed to forward arbitrary data file to peer {}", requestingPeer); + requestingPeer.disconnect("failed to forward arbitrary data file"); + } + else { + LOGGER.info("Forwarded arbitrary data file to peer {}", requestingPeer); + } + } catch (Exception e) { + LOGGER.error(e.getMessage(), e); } } @@ -615,7 +674,7 @@ public class ArbitraryDataFileManager extends Thread { LOGGER.debug("Asking peer {} for hash {}", peerToAsk, hash58); // No need to pass arbitraryTransactionData below because this is only used for metadata caching, // and metadata isn't retained when relaying. - this.fetchArbitraryDataFile(peerToAsk, peer, null, signature, hash, message); + this.fetchFileForRelay(peerToAsk, peer, signature, hash, message); } else { LOGGER.debug("Peer {} not found in relay info", peer);