diff --git a/src/main/java/org/qortal/controller/Controller.java b/src/main/java/org/qortal/controller/Controller.java index 2748b82d..cbb91e66 100644 --- a/src/main/java/org/qortal/controller/Controller.java +++ b/src/main/java/org/qortal/controller/Controller.java @@ -50,11 +50,8 @@ import org.qortal.api.GatewayService; import org.qortal.block.Block; import org.qortal.block.BlockChain; import org.qortal.block.BlockChain.BlockTimingByHeight; -import org.qortal.controller.arbitrary.ArbitraryDataBuildManager; -import org.qortal.controller.arbitrary.ArbitraryDataCleanupManager; -import org.qortal.controller.arbitrary.ArbitraryDataManager; +import org.qortal.controller.arbitrary.*; import org.qortal.controller.Synchronizer.SynchronizationResult; -import org.qortal.controller.arbitrary.ArbitraryDataStorageManager; import org.qortal.controller.repository.PruneManager; import org.qortal.controller.repository.NamesDatabaseIntegrityCheck; import org.qortal.controller.tradebot.TradeBot; @@ -83,8 +80,6 @@ import org.qortal.transaction.Transaction.TransactionType; import org.qortal.transaction.Transaction.ValidationResult; import org.qortal.utils.*; -import static org.qortal.network.Peer.FETCH_BLOCKS_TIMEOUT; - public class Controller extends Thread { static { @@ -1382,15 +1377,15 @@ public class Controller extends Thread { break; case ARBITRARY_DATA_FILE_LIST: - ArbitraryDataManager.getInstance().onNetworkArbitraryDataFileListMessage(peer, message); + ArbitraryDataFileListManager.getInstance().onNetworkArbitraryDataFileListMessage(peer, message); break; case GET_ARBITRARY_DATA_FILE: - ArbitraryDataManager.getInstance().onNetworkGetArbitraryDataFileMessage(peer, message); + ArbitraryDataFileManager.getInstance().onNetworkGetArbitraryDataFileMessage(peer, message); break; case GET_ARBITRARY_DATA_FILE_LIST: - ArbitraryDataManager.getInstance().onNetworkGetArbitraryDataFileListMessage(peer, message); + ArbitraryDataFileListManager.getInstance().onNetworkGetArbitraryDataFileListMessage(peer, message); break; case ARBITRARY_SIGNATURES: diff --git a/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileListManager.java b/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileListManager.java new file mode 100644 index 00000000..cd126fff --- /dev/null +++ b/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileListManager.java @@ -0,0 +1,495 @@ +package org.qortal.controller.arbitrary; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.qortal.arbitrary.ArbitraryDataFile; +import org.qortal.arbitrary.ArbitraryDataFileChunk; +import org.qortal.controller.Controller; +import org.qortal.data.transaction.ArbitraryTransactionData; +import org.qortal.data.transaction.TransactionData; +import org.qortal.network.Network; +import org.qortal.network.Peer; +import org.qortal.network.message.ArbitraryDataFileListMessage; +import org.qortal.network.message.GetArbitraryDataFileListMessage; +import org.qortal.network.message.Message; +import org.qortal.repository.DataException; +import org.qortal.repository.Repository; +import org.qortal.repository.RepositoryManager; +import org.qortal.settings.Settings; +import org.qortal.utils.Base58; +import org.qortal.utils.NTP; +import org.qortal.utils.Triple; + +import java.util.*; + +public class ArbitraryDataFileListManager extends Thread { + + private static final Logger LOGGER = LogManager.getLogger(ArbitraryDataFileListManager.class); + + private static ArbitraryDataFileListManager instance; + private volatile boolean isStopping = false; + + + /** + * Map of recent incoming requests for ARBITRARY transaction data file lists. + *

+ * Key is original request's message ID
+ * Value is Triple<transaction signature in base58, first requesting peer, first request's timestamp> + *

+ * If peer is null then either:
+ *

+ * If signature is null then we have already received the file list and either:
+ * + */ + public Map> arbitraryDataFileListRequests = Collections.synchronizedMap(new HashMap<>()); + + /** + * Map to keep track of in progress arbitrary data signature requests + * Key: string - the signature encoded in base58 + * Value: Triple + */ + private Map> arbitraryDataSignatureRequests = Collections.synchronizedMap(new HashMap<>()); + + + private ArbitraryDataFileListManager() { + } + + public static ArbitraryDataFileListManager getInstance() { + if (instance == null) + instance = new ArbitraryDataFileListManager(); + + return instance; + } + + @Override + public void run() { + Thread.currentThread().setName("Arbitrary Data File List Manager"); + + try { + while (!isStopping) { + Thread.sleep(2000); + + // TODO + } + } catch (InterruptedException e) { + // Fall-through to exit thread... + } + } + + public void shutdown() { + isStopping = true; + this.interrupt(); + } + + + public void cleanupRequestCache(Long now) { + if (now == null) { + return; + } + final long requestMinimumTimestamp = now - ArbitraryDataManager.ARBITRARY_REQUEST_TIMEOUT; + arbitraryDataFileListRequests.entrySet().removeIf(entry -> entry.getValue().getC() == null || entry.getValue().getC() < requestMinimumTimestamp); + } + + + // Track file list lookups by signature + + private boolean shouldMakeFileListRequestForSignature(String signature58) { + Triple request = arbitraryDataSignatureRequests.get(signature58); + + if (request == null) { + // Not attempted yet + return true; + } + + // Extract the components + Integer networkBroadcastCount = request.getA(); + // Integer directPeerRequestCount = request.getB(); + Long lastAttemptTimestamp = request.getC(); + + if (lastAttemptTimestamp == null) { + // Not attempted yet + return true; + } + + long timeSinceLastAttempt = NTP.getTime() - lastAttemptTimestamp; + if (timeSinceLastAttempt > 5 * 60 * 1000L) { + // We haven't tried for at least 5 minutes + + if (networkBroadcastCount < 5) { + // We've made less than 5 total attempts + return true; + } + } + + if (timeSinceLastAttempt > 24 * 60 * 60 * 1000L) { + // We haven't tried for at least 24 hours + return true; + } + + return false; + } + + private boolean shouldMakeDirectFileRequestsForSignature(String signature58) { + if (!Settings.getInstance().isDirectDataRetrievalEnabled()) { + // Direct connections are disabled in the settings + return false; + } + + Triple request = arbitraryDataSignatureRequests.get(signature58); + + if (request == null) { + // Not attempted yet + return true; + } + + // Extract the components + //Integer networkBroadcastCount = request.getA(); + Integer directPeerRequestCount = request.getB(); + Long lastAttemptTimestamp = request.getC(); + + if (lastAttemptTimestamp == null) { + // Not attempted yet + return true; + } + + if (directPeerRequestCount == 0) { + // We haven't tried asking peers directly yet, so we should + return true; + } + + long timeSinceLastAttempt = NTP.getTime() - lastAttemptTimestamp; + if (timeSinceLastAttempt > 10 * 1000L) { + // We haven't tried for at least 10 seconds + if (directPeerRequestCount < 5) { + // We've made less than 5 total attempts + return true; + } + } + + if (timeSinceLastAttempt > 5 * 60 * 1000L) { + // We haven't tried for at least 5 minutes + if (directPeerRequestCount < 10) { + // We've made less than 10 total attempts + return true; + } + } + + if (timeSinceLastAttempt > 24 * 60 * 60 * 1000L) { + // We haven't tried for at least 24 hours + return true; + } + + return false; + } + + public boolean isSignatureRateLimited(byte[] signature) { + String signature58 = Base58.encode(signature); + return !this.shouldMakeFileListRequestForSignature(signature58) + && !this.shouldMakeDirectFileRequestsForSignature(signature58); + } + + public long lastRequestForSignature(byte[] signature) { + String signature58 = Base58.encode(signature); + Triple request = arbitraryDataSignatureRequests.get(signature58); + + if (request == null) { + // Not attempted yet + return 0; + } + + // Extract the components + Long lastAttemptTimestamp = request.getC(); + if (lastAttemptTimestamp != null) { + return lastAttemptTimestamp; + } + return 0; + } + + public void addToSignatureRequests(String signature58, boolean incrementNetworkRequests, boolean incrementPeerRequests) { + Triple request = arbitraryDataSignatureRequests.get(signature58); + Long now = NTP.getTime(); + + if (request == null) { + // No entry yet + Triple newRequest = new Triple<>(0, 0, now); + arbitraryDataSignatureRequests.put(signature58, newRequest); + } + else { + // There is an existing entry + if (incrementNetworkRequests) { + request.setA(request.getA() + 1); + } + if (incrementPeerRequests) { + request.setB(request.getB() + 1); + } + request.setC(now); + arbitraryDataSignatureRequests.put(signature58, request); + } + } + + public void removeFromSignatureRequests(String signature58) { + arbitraryDataSignatureRequests.remove(signature58); + } + + + // Lookup file lists by signature + + public boolean fetchArbitraryDataFileList(ArbitraryTransactionData arbitraryTransactionData) { + byte[] signature = arbitraryTransactionData.getSignature(); + String signature58 = Base58.encode(signature); + + // If we've already tried too many times in a short space of time, make sure to give up + if (!this.shouldMakeFileListRequestForSignature(signature58)) { + // Check if we should make direct connections to peers + if (this.shouldMakeDirectFileRequestsForSignature(signature58)) { + return ArbitraryDataFileManager.getInstance().fetchDataFilesFromPeersForSignature(signature); + } + + LOGGER.debug("Skipping file list request for signature {} due to rate limit", signature58); + return false; + } + this.addToSignatureRequests(signature58, true, false); + + LOGGER.info(String.format("Sending data file list request for signature %s...", Base58.encode(signature))); + + // Build request + Message getArbitraryDataFileListMessage = new GetArbitraryDataFileListMessage(signature); + + // Save our request into requests map + Triple requestEntry = new Triple<>(signature58, null, NTP.getTime()); + + // Assign random ID to this message + int id; + do { + id = new Random().nextInt(Integer.MAX_VALUE - 1) + 1; + + // Put queue into map (keyed by message ID) so we can poll for a response + // If putIfAbsent() doesn't return null, then this ID is already taken + } while (arbitraryDataFileListRequests.put(id, requestEntry) != null); + getArbitraryDataFileListMessage.setId(id); + + // Broadcast request + Network.getInstance().broadcast(peer -> getArbitraryDataFileListMessage); + + // Poll to see if data has arrived + final long singleWait = 100; + long totalWait = 0; + while (totalWait < ArbitraryDataManager.ARBITRARY_REQUEST_TIMEOUT) { + try { + Thread.sleep(singleWait); + } catch (InterruptedException e) { + break; + } + + requestEntry = arbitraryDataFileListRequests.get(id); + if (requestEntry == null) + return false; + + if (requestEntry.getA() == null) + break; + + totalWait += singleWait; + } + return true; + } + + + + // Network handlers + + public void onNetworkArbitraryDataFileListMessage(Peer peer, Message message) { + // Don't process if QDN is disabled + if (!Settings.getInstance().isQdnEnabled()) { + return; + } + + ArbitraryDataFileListMessage arbitraryDataFileListMessage = (ArbitraryDataFileListMessage) message; + LOGGER.info("Received hash list from peer {} with {} hashes", peer, arbitraryDataFileListMessage.getHashes().size()); + + // Do we have a pending request for this data? // TODO: might we want to relay all of them anyway? + Triple request = arbitraryDataFileListRequests.get(message.getId()); + if (request == null || request.getA() == null) { + return; + } + boolean isRelayRequest = (request.getB() != null); + + // Does this message's signature match what we're expecting? + byte[] signature = arbitraryDataFileListMessage.getSignature(); + String signature58 = Base58.encode(signature); + if (!request.getA().equals(signature58)) { + return; + } + + List hashes = arbitraryDataFileListMessage.getHashes(); + if (hashes == null || hashes.isEmpty()) { + return; + } + + ArbitraryTransactionData arbitraryTransactionData = null; + ArbitraryDataFileManager arbitraryDataFileManager = ArbitraryDataFileManager.getInstance(); + + // Check transaction exists and hashes are correct + try (final Repository repository = RepositoryManager.getRepository()) { + TransactionData transactionData = repository.getTransactionRepository().fromSignature(signature); + if (!(transactionData instanceof ArbitraryTransactionData)) + return; + + arbitraryTransactionData = (ArbitraryTransactionData) transactionData; + + // Load data file(s) + ArbitraryDataFile arbitraryDataFile = ArbitraryDataFile.fromHash(arbitraryTransactionData.getData(), signature); + arbitraryDataFile.setMetadataHash(arbitraryTransactionData.getMetadataHash()); + +// // Check all hashes exist +// for (byte[] hash : hashes) { +// //LOGGER.info("Received hash {}", Base58.encode(hash)); +// if (!arbitraryDataFile.containsChunk(hash)) { +// // Check the hash against the complete file +// if (!Arrays.equals(arbitraryDataFile.getHash(), hash)) { +// LOGGER.info("Received non-matching chunk hash {} for signature {}. This could happen if we haven't obtained the metadata file yet.", Base58.encode(hash), signature58); +// return; +// } +// } +// } + + // Update requests map to reflect that we've received it + Triple newEntry = new Triple<>(null, null, request.getC()); + arbitraryDataFileListRequests.put(message.getId(), newEntry); + + if (!isRelayRequest || !Settings.getInstance().isRelayModeEnabled()) { + // Go and fetch the actual data, since this isn't a relay request + arbitraryDataFileManager.fetchArbitraryDataFiles(repository, peer, signature, arbitraryTransactionData, hashes); + } + + } catch (DataException e) { + LOGGER.error(String.format("Repository issue while finding arbitrary transaction data list for peer %s", peer), e); + } + + // Forwarding + if (isRelayRequest && Settings.getInstance().isRelayModeEnabled()) { + boolean isBlocked = (arbitraryTransactionData == null || ArbitraryDataStorageManager.getInstance().isNameBlocked(arbitraryTransactionData.getName())); + if (!isBlocked) { + Peer requestingPeer = request.getB(); + if (requestingPeer != null) { + // Add each hash to our local mapping so we know who to ask later + Long now = NTP.getTime(); + for (byte[] hash : hashes) { + String hash58 = Base58.encode(hash); + Triple value = new Triple<>(signature58, peer, now); + arbitraryDataFileManager.arbitraryRelayMap.put(hash58, value); + LOGGER.debug("Added {} to relay map: {}, {}, {}", hash58, signature58, peer, now); + } + + // Forward to requesting peer + LOGGER.info("Forwarding file list with {} hashes to requesting peer: {}", hashes.size(), requestingPeer); + if (!requestingPeer.sendMessage(arbitraryDataFileListMessage)) { + requestingPeer.disconnect("failed to forward arbitrary data file list"); + } + } + } + } + } + + public void onNetworkGetArbitraryDataFileListMessage(Peer peer, Message message) { + // Don't respond if QDN is disabled + if (!Settings.getInstance().isQdnEnabled()) { + return; + } + + Controller.getInstance().stats.getArbitraryDataFileListMessageStats.requests.incrementAndGet(); + + GetArbitraryDataFileListMessage getArbitraryDataFileListMessage = (GetArbitraryDataFileListMessage) message; + byte[] signature = getArbitraryDataFileListMessage.getSignature(); + String signature58 = Base58.encode(signature); + Long timestamp = NTP.getTime(); + Triple newEntry = new Triple<>(signature58, peer, timestamp); + + // If we've seen this request recently, then ignore + if (arbitraryDataFileListRequests.putIfAbsent(message.getId(), newEntry) != null) { + return; + } + + LOGGER.info("Received hash list request from peer {} for signature {}", peer, Base58.encode(signature)); + + List hashes = new ArrayList<>(); + ArbitraryTransactionData transactionData = null; + + try (final Repository repository = RepositoryManager.getRepository()) { + + // Firstly we need to lookup this file on chain to get a list of its hashes + transactionData = (ArbitraryTransactionData)repository.getTransactionRepository().fromSignature(signature); + if (transactionData instanceof ArbitraryTransactionData) { + + // Check if we're even allowed to serve data for this transaction + if (ArbitraryDataStorageManager.getInstance().canStoreData(transactionData)) { + + byte[] hash = transactionData.getData(); + byte[] metadataHash = transactionData.getMetadataHash(); + + // Load file(s) and add any that exist to the list of hashes + ArbitraryDataFile arbitraryDataFile = ArbitraryDataFile.fromHash(hash, signature); + if (metadataHash != null) { + arbitraryDataFile.setMetadataHash(metadataHash); + + // If we have the metadata file, add its hash + if (arbitraryDataFile.getMetadataFile().exists()) { + hashes.add(arbitraryDataFile.getMetadataHash()); + } + + for (ArbitraryDataFileChunk chunk : arbitraryDataFile.getChunks()) { + if (chunk.exists()) { + hashes.add(chunk.getHash()); + //LOGGER.info("Added hash {}", chunk.getHash58()); + } else { + LOGGER.info("Couldn't add hash {} because it doesn't exist", chunk.getHash58()); + } + } + } else { + // This transaction has no chunks, so include the complete file if we have it + if (arbitraryDataFile.exists()) { + hashes.add(arbitraryDataFile.getHash()); + } + } + } + } + + } catch (DataException e) { + LOGGER.error(String.format("Repository issue while fetching arbitrary file list for peer %s", peer), e); + } + + // We should only respond if we have at least one hash + if (hashes.size() > 0) { + + // Update requests map to reflect that we've sent it + newEntry = new Triple<>(signature58, null, timestamp); + arbitraryDataFileListRequests.put(message.getId(), newEntry); + + ArbitraryDataFileListMessage arbitraryDataFileListMessage = new ArbitraryDataFileListMessage(signature, hashes); + arbitraryDataFileListMessage.setId(message.getId()); + if (!peer.sendMessage(arbitraryDataFileListMessage)) { + LOGGER.info("Couldn't send list of hashes"); + peer.disconnect("failed to send list of hashes"); + } + LOGGER.info("Sent list of hashes (count: {})", hashes.size()); + + } + else { + boolean isBlocked = (transactionData == null || ArbitraryDataStorageManager.getInstance().isNameBlocked(transactionData.getName())); + if (Settings.getInstance().isRelayModeEnabled() && !isBlocked) { + // In relay mode - so ask our other peers if they have it + LOGGER.info("Rebroadcasted hash list request from peer {} for signature {} to our other peers", peer, Base58.encode(signature)); + Network.getInstance().broadcast( + broadcastPeer -> broadcastPeer == peer || + Objects.equals(broadcastPeer.getPeerData().getAddress().getHost(), peer.getPeerData().getAddress().getHost()) + ? null : message); + } + } + } + +} diff --git a/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileManager.java b/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileManager.java new file mode 100644 index 00000000..d07ec444 --- /dev/null +++ b/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileManager.java @@ -0,0 +1,363 @@ +package org.qortal.controller.arbitrary; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.qortal.arbitrary.ArbitraryDataFile; +import org.qortal.controller.Controller; +import org.qortal.data.network.ArbitraryPeerData; +import org.qortal.data.transaction.ArbitraryTransactionData; +import org.qortal.data.transaction.TransactionData; +import org.qortal.network.Network; +import org.qortal.network.Peer; +import org.qortal.network.message.*; +import org.qortal.repository.DataException; +import org.qortal.repository.Repository; +import org.qortal.repository.RepositoryManager; +import org.qortal.settings.Settings; +import org.qortal.utils.Base58; +import org.qortal.utils.NTP; +import org.qortal.utils.Triple; + +import java.security.SecureRandom; +import java.util.*; + +public class ArbitraryDataFileManager extends Thread { + + private static final Logger LOGGER = LogManager.getLogger(ArbitraryDataFileManager.class); + + private static ArbitraryDataFileManager instance; + private volatile boolean isStopping = false; + + + /** + * Map to keep track of our in progress (outgoing) arbitrary data file requests + */ + private Map arbitraryDataFileRequests = Collections.synchronizedMap(new HashMap<>()); + + /** + * Map to keep track of hashes that we might need to relay, keyed by the hash of the file (base58 encoded). + * Value is comprised of the base58-encoded signature, the peer that is hosting it, and the timestamp that it was added + */ + public Map> arbitraryRelayMap = Collections.synchronizedMap(new HashMap<>()); + + + private ArbitraryDataFileManager() { + } + + public static ArbitraryDataFileManager getInstance() { + if (instance == null) + instance = new ArbitraryDataFileManager(); + + return instance; + } + + @Override + public void run() { + Thread.currentThread().setName("Arbitrary Data File List Manager"); + + try { + while (!isStopping) { + Thread.sleep(2000); + + // TODO + } + } catch (InterruptedException e) { + // Fall-through to exit thread... + } + } + + public void shutdown() { + isStopping = true; + this.interrupt(); + } + + + public void cleanupRequestCache(Long now) { + if (now == null) { + return; + } + final long requestMinimumTimestamp = now - ArbitraryDataManager.getInstance().ARBITRARY_REQUEST_TIMEOUT; + arbitraryDataFileRequests.entrySet().removeIf(entry -> entry.getValue() == null || entry.getValue() < requestMinimumTimestamp); + + final long relayMinimumTimestamp = now - ArbitraryDataManager.getInstance().ARBITRARY_RELAY_TIMEOUT; + arbitraryRelayMap.entrySet().removeIf(entry -> entry.getValue().getC() == null || entry.getValue().getC() < relayMinimumTimestamp); + } + + + + // Fetch data files by hash + + public boolean fetchAllArbitraryDataFiles(Repository repository, Peer peer, byte[] signature) { + try { + TransactionData transactionData = repository.getTransactionRepository().fromSignature(signature); + if (!(transactionData instanceof ArbitraryTransactionData)) + return false; + + ArbitraryTransactionData arbitraryTransactionData = (ArbitraryTransactionData) transactionData; + + // We use null to represent all hashes associated with this transaction + return this.fetchArbitraryDataFiles(repository, peer, signature, arbitraryTransactionData, null); + + } catch (DataException e) {} + + return false; + } + + public boolean fetchArbitraryDataFiles(Repository repository, + Peer peer, + byte[] signature, + ArbitraryTransactionData arbitraryTransactionData, + List hashes) throws DataException { + + // Load data file(s) + ArbitraryDataFile arbitraryDataFile = ArbitraryDataFile.fromHash(arbitraryTransactionData.getData(), signature); + byte[] metadataHash = arbitraryTransactionData.getMetadataHash(); + arbitraryDataFile.setMetadataHash(metadataHash); + + // If hashes are null, we will treat this to mean all data hashes associated with this file + if (hashes == null) { + if (metadataHash == null) { + // This transaction has no metadata/chunks, so use the main file hash + hashes = Arrays.asList(arbitraryDataFile.getHash()); + } + else if (!arbitraryDataFile.getMetadataFile().exists()) { + // We don't have the metadata file yet, so request it + hashes = Arrays.asList(arbitraryDataFile.getMetadataFile().getHash()); + } + else { + // Add the chunk hashes + hashes = arbitraryDataFile.getChunkHashes(); + } + } + + boolean receivedAtLeastOneFile = false; + + // Now fetch actual data from this peer + for (byte[] hash : hashes) { + if (!arbitraryDataFile.chunkExists(hash)) { + // Only request the file if we aren't already requesting it from someone else + if (!arbitraryDataFileRequests.containsKey(Base58.encode(hash))) { + ArbitraryDataFileMessage receivedArbitraryDataFileMessage = fetchArbitraryDataFile(peer, null, signature, hash, null); + if (receivedArbitraryDataFileMessage != null) { + LOGGER.info("Received data file {} from peer {}", receivedArbitraryDataFileMessage.getArbitraryDataFile().getHash58(), peer); + receivedAtLeastOneFile = true; + } + else { + LOGGER.info("Peer {} didn't respond with data file {} for signature {}", peer, Base58.encode(hash), Base58.encode(signature)); + } + } + else { + LOGGER.info("Already requesting data file {} for signature {}", arbitraryDataFile, Base58.encode(signature)); + } + } + } + + if (receivedAtLeastOneFile) { + // Update our lookup table to indicate that this peer holds data for this signature + String peerAddress = peer.getPeerData().getAddress().toString(); + LOGGER.info("Adding arbitrary peer: {} for signature {}", peerAddress, Base58.encode(signature)); + ArbitraryPeerData arbitraryPeerData = new ArbitraryPeerData(signature, peer); + repository.discardChanges(); + repository.getArbitraryRepository().save(arbitraryPeerData); + repository.saveChanges(); + + // Invalidate the hosted transactions cache as we are now hosting something new + ArbitraryDataStorageManager.getInstance().invalidateHostedTransactionsCache(); + } + + // Check if we have all the files we need for this transaction + if (arbitraryDataFile.allFilesExist()) { + + // We have all the chunks for this transaction, so we should invalidate the transaction's name's + // data cache so that it is rebuilt the next time we serve it + ArbitraryDataManager.getInstance().invalidateCache(arbitraryTransactionData); + + // We may also need to broadcast to the network that we are now hosting files for this transaction, + // but only if these files are in accordance with our storage policy + if (ArbitraryDataStorageManager.getInstance().canStoreData(arbitraryTransactionData)) { + // Use a null peer address to indicate our own + Message newArbitrarySignatureMessage = new ArbitrarySignaturesMessage(null, Arrays.asList(signature)); + Network.getInstance().broadcast(broadcastPeer -> newArbitrarySignatureMessage); + } + } + + return receivedAtLeastOneFile; + } + + private ArbitraryDataFileMessage fetchArbitraryDataFile(Peer peer, Peer requestingPeer, byte[] signature, byte[] hash, Message originalMessage) throws DataException { + ArbitraryDataFile existingFile = ArbitraryDataFile.fromHash(hash, signature); + boolean fileAlreadyExists = existingFile.exists(); + Message message = null; + + // Fetch the file if it doesn't exist locally + if (!fileAlreadyExists) { + String hash58 = Base58.encode(hash); + LOGGER.info(String.format("Fetching data file %.8s from peer %s", hash58, peer)); + arbitraryDataFileRequests.put(hash58, NTP.getTime()); + Message getArbitraryDataFileMessage = new GetArbitraryDataFileMessage(signature, hash); + + try { + message = peer.getResponseWithTimeout(getArbitraryDataFileMessage, (int) ArbitraryDataManager.ARBITRARY_REQUEST_TIMEOUT); + } catch (InterruptedException e) { + // Will return below due to null message + } + arbitraryDataFileRequests.remove(hash58); + LOGGER.trace(String.format("Removed hash %.8s from arbitraryDataFileRequests", hash58)); + + if (message == null || message.getType() != Message.MessageType.ARBITRARY_DATA_FILE) { + return null; + } + } + ArbitraryDataFileMessage arbitraryDataFileMessage = (ArbitraryDataFileMessage) message; + + // We might want to forward the request to the peer that originally requested it + this.handleArbitraryDataFileForwarding(requestingPeer, message, originalMessage); + + boolean isRelayRequest = (requestingPeer != null); + if (isRelayRequest) { + if (!fileAlreadyExists) { + // File didn't exist locally before the request, and it's a forwarding request, so delete it + LOGGER.info("Deleting file {} because it was needed for forwarding only", Base58.encode(hash)); + ArbitraryDataFile dataFile = arbitraryDataFileMessage.getArbitraryDataFile(); + dataFile.delete(); + } + } + + return arbitraryDataFileMessage; + } + + + public void handleArbitraryDataFileForwarding(Peer requestingPeer, Message message, Message originalMessage) { + // Return if there is no originally requesting peer to forward to + if (requestingPeer == null) { + return; + } + + // Return if we're not in relay mode or if this request doesn't need forwarding + if (!Settings.getInstance().isRelayModeEnabled()) { + return; + } + + LOGGER.info("Received arbitrary data file - forwarding is needed"); + + // The ID needs to match that of the original request + message.setId(originalMessage.getId()); + + if (!requestingPeer.sendMessage(message)) { + LOGGER.info("Failed to forward arbitrary data file to peer {}", requestingPeer); + requestingPeer.disconnect("failed to forward arbitrary data file"); + } + else { + LOGGER.info("Forwarded arbitrary data file to peer {}", requestingPeer); + } + } + + + // Fetch data directly from peers + + public boolean fetchDataFilesFromPeersForSignature(byte[] signature) { + String signature58 = Base58.encode(signature); + ArbitraryDataFileListManager.getInstance().addToSignatureRequests(signature58, false, true); + + // Firstly fetch peers that claim to be hosting files for this signature + try (final Repository repository = RepositoryManager.getRepository()) { + + List peers = repository.getArbitraryRepository().getArbitraryPeerDataForSignature(signature); + if (peers == null || peers.isEmpty()) { + LOGGER.info("No peers found for signature {}", signature58); + return false; + } + + LOGGER.info("Attempting a direct peer connection for signature {}...", signature58); + + // Peers found, so pick a random one and request data from it + int index = new SecureRandom().nextInt(peers.size()); + ArbitraryPeerData arbitraryPeerData = peers.get(index); + String peerAddressString = arbitraryPeerData.getPeerAddress(); + return Network.getInstance().requestDataFromPeer(peerAddressString, signature); + + } catch (DataException e) { + LOGGER.info("Unable to fetch peer list from repository"); + } + + return false; + } + + + // Network handlers + + public void onNetworkGetArbitraryDataFileMessage(Peer peer, Message message) { + // Don't respond if QDN is disabled + if (!Settings.getInstance().isQdnEnabled()) { + return; + } + + GetArbitraryDataFileMessage getArbitraryDataFileMessage = (GetArbitraryDataFileMessage) message; + byte[] hash = getArbitraryDataFileMessage.getHash(); + String hash58 = Base58.encode(hash); + byte[] signature = getArbitraryDataFileMessage.getSignature(); + Controller.getInstance().stats.getArbitraryDataFileMessageStats.requests.incrementAndGet(); + + LOGGER.info("Received GetArbitraryDataFileMessage from peer {} for hash {}", peer, Base58.encode(hash)); + + try { + ArbitraryDataFile arbitraryDataFile = ArbitraryDataFile.fromHash(hash, signature); + Triple relayInfo = this.arbitraryRelayMap.get(hash58); + + if (arbitraryDataFile.exists()) { + LOGGER.info("Hash {} exists", hash58); + + // We can serve the file directly as we already have it + ArbitraryDataFileMessage arbitraryDataFileMessage = new ArbitraryDataFileMessage(signature, arbitraryDataFile); + arbitraryDataFileMessage.setId(message.getId()); + if (!peer.sendMessage(arbitraryDataFileMessage)) { + LOGGER.info("Couldn't sent file"); + peer.disconnect("failed to send file"); + } + LOGGER.info("Sent file {}", arbitraryDataFile); + } + else if (relayInfo != null) { + LOGGER.info("We have relay info for hash {}", Base58.encode(hash)); + // We need to ask this peer for the file + Peer peerToAsk = relayInfo.getB(); + if (peerToAsk != null) { + + // Forward the message to this peer + LOGGER.info("Asking peer {} for hash {}", peerToAsk, hash58); + this.fetchArbitraryDataFile(peerToAsk, peer, signature, hash, message); + + // Remove from the map regardless of outcome, as the relay attempt is now considered complete + arbitraryRelayMap.remove(hash58); + } + else { + LOGGER.info("Peer {} not found in relay info", peer); + } + } + else { + LOGGER.info("Hash {} doesn't exist and we don't have relay info", hash58); + + // We don't have this file + Controller.getInstance().stats.getArbitraryDataFileMessageStats.unknownFiles.getAndIncrement(); + + // Send valid, yet unexpected message type in response, so peer's synchronizer doesn't have to wait for timeout + LOGGER.debug(String.format("Sending 'file unknown' response to peer %s for GET_FILE request for unknown file %s", peer, arbitraryDataFile)); + + // We'll send empty block summaries message as it's very short + // TODO: use a different message type here + Message fileUnknownMessage = new BlockSummariesMessage(Collections.emptyList()); + fileUnknownMessage.setId(message.getId()); + if (!peer.sendMessage(fileUnknownMessage)) { + LOGGER.info("Couldn't sent file-unknown response"); + peer.disconnect("failed to send file-unknown response"); + } + else { + LOGGER.info("Sent file-unknown response for file {}", arbitraryDataFile); + } + } + } + catch (DataException e) { + LOGGER.info("Unable to handle request for arbitrary data file: {}", hash58); + } + } + +} diff --git a/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataManager.java b/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataManager.java index c878c36a..a6dc5dfb 100644 --- a/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataManager.java +++ b/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataManager.java @@ -1,12 +1,10 @@ package org.qortal.controller.arbitrary; -import java.security.SecureRandom; import java.util.*; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.qortal.api.resource.TransactionsResource.ConfirmationStatus; -import org.qortal.arbitrary.ArbitraryDataRenderer; import org.qortal.controller.Controller; import org.qortal.data.network.ArbitraryPeerData; import org.qortal.data.transaction.ArbitraryTransactionData; @@ -18,15 +16,12 @@ import org.qortal.network.message.*; import org.qortal.repository.DataException; import org.qortal.repository.Repository; import org.qortal.repository.RepositoryManager; -import org.qortal.arbitrary.ArbitraryDataFile; -import org.qortal.arbitrary.ArbitraryDataFileChunk; import org.qortal.settings.Settings; import org.qortal.transaction.ArbitraryTransaction; import org.qortal.transaction.Transaction.TransactionType; import org.qortal.utils.ArbitraryTransactionUtils; import org.qortal.utils.Base58; import org.qortal.utils.NTP; -import org.qortal.utils.Triple; public class ArbitraryDataManager extends Thread { @@ -34,53 +29,16 @@ public class ArbitraryDataManager extends Thread { private static final List ARBITRARY_TX_TYPE = Arrays.asList(TransactionType.ARBITRARY); /** Request timeout when transferring arbitrary data */ - private static final long ARBITRARY_REQUEST_TIMEOUT = 6 * 1000L; // ms + public static final long ARBITRARY_REQUEST_TIMEOUT = 6 * 1000L; // ms /** Maximum time to hold information about an in-progress relay */ - private static final long ARBITRARY_RELAY_TIMEOUT = 30 * 1000L; // ms + public static final long ARBITRARY_RELAY_TIMEOUT = 30 * 1000L; // ms private static ArbitraryDataManager instance; private final Object peerDataLock = new Object(); private volatile boolean isStopping = false; - /** - * Map of recent incoming requests for ARBITRARY transaction data file lists. - *

- * Key is original request's message ID
- * Value is Triple<transaction signature in base58, first requesting peer, first request's timestamp> - *

- * If peer is null then either:
- *

    - *
  • we are the original requesting peer
  • - *
  • we have already sent data payload to original requesting peer.
  • - *
- * If signature is null then we have already received the file list and either:
- *
    - *
  • we are the original requesting peer and have processed it
  • - *
  • we have forwarded the file list
  • - *
- */ - public Map> arbitraryDataFileListRequests = Collections.synchronizedMap(new HashMap<>()); - - /** - * Map to keep track of our in progress (outgoing) arbitrary data file requests - */ - private Map arbitraryDataFileRequests = Collections.synchronizedMap(new HashMap<>()); - - /** - * Map to keep track of hashes that we might need to relay, keyed by the hash of the file (base58 encoded). - * Value is comprised of the base58-encoded signature, the peer that is hosting it, and the timestamp that it was added - */ - private Map> arbitraryRelayMap = Collections.synchronizedMap(new HashMap<>()); - - /** - * Map to keep track of in progress arbitrary data signature requests - * Key: string - the signature encoded in base58 - * Value: Triple - */ - private Map> arbitraryDataSignatureRequests = Collections.synchronizedMap(new HashMap<>()); - /** * Map to keep track of cached arbitrary transaction resources. * When an item is present in this list with a timestamp in the future, we won't invalidate @@ -109,6 +67,9 @@ public class ArbitraryDataManager extends Thread { public void run() { Thread.currentThread().setName("Arbitrary Data Manager"); + ArbitraryDataFileListManager.getInstance().start(); + ArbitraryDataFileManager.getInstance().start(); + try { while (!isStopping) { Thread.sleep(2000); @@ -149,6 +110,9 @@ public class ArbitraryDataManager extends Thread { public void shutdown() { isStopping = true; this.interrupt(); + + ArbitraryDataFileListManager.getInstance().shutdown(); + ArbitraryDataFileManager.getInstance().shutdown(); } private void processNames() { @@ -269,315 +233,21 @@ public class ArbitraryDataManager extends Thread { } } - private boolean hasLocalMetadata(ArbitraryTransactionData transactionData) { - if (transactionData == null) { - return false; - } - // Load hashes - byte[] hash = transactionData.getData(); - byte[] metadataHash = transactionData.getMetadataHash(); - - if (metadataHash == null) { - // This transaction has no metadata, so we can treat it as local - return true; - } - - // Load data file(s) - byte[] signature = transactionData.getSignature(); - try { - ArbitraryDataFile arbitraryDataFile = ArbitraryDataFile.fromHash(hash, signature); - arbitraryDataFile.setMetadataHash(metadataHash); - - return arbitraryDataFile.getMetadataFile().exists(); - } - catch (DataException e) { - // Assume not local - return false; - } + // Entrypoint to request new data from peers + public boolean fetchData(ArbitraryTransactionData arbitraryTransactionData) { + return ArbitraryDataFileListManager.getInstance().fetchArbitraryDataFileList(arbitraryTransactionData); } - // Track file list lookups by signature - - private boolean shouldMakeFileListRequestForSignature(String signature58) { - Triple request = arbitraryDataSignatureRequests.get(signature58); - - if (request == null) { - // Not attempted yet - return true; - } - - // Extract the components - Integer networkBroadcastCount = request.getA(); - // Integer directPeerRequestCount = request.getB(); - Long lastAttemptTimestamp = request.getC(); - - if (lastAttemptTimestamp == null) { - // Not attempted yet - return true; - } - - long timeSinceLastAttempt = NTP.getTime() - lastAttemptTimestamp; - if (timeSinceLastAttempt > 5 * 60 * 1000L) { - // We haven't tried for at least 5 minutes - - if (networkBroadcastCount < 5) { - // We've made less than 5 total attempts - return true; - } - } - - if (timeSinceLastAttempt > 24 * 60 * 60 * 1000L) { - // We haven't tried for at least 24 hours - return true; - } - - return false; - } - - private boolean shouldMakeDirectFileRequestsForSignature(String signature58) { - if (!Settings.getInstance().isDirectDataRetrievalEnabled()) { - // Direct connections are disabled in the settings - return false; - } - - Triple request = arbitraryDataSignatureRequests.get(signature58); - - if (request == null) { - // Not attempted yet - return true; - } - - // Extract the components - //Integer networkBroadcastCount = request.getA(); - Integer directPeerRequestCount = request.getB(); - Long lastAttemptTimestamp = request.getC(); - - if (lastAttemptTimestamp == null) { - // Not attempted yet - return true; - } - - if (directPeerRequestCount == 0) { - // We haven't tried asking peers directly yet, so we should - return true; - } - - long timeSinceLastAttempt = NTP.getTime() - lastAttemptTimestamp; - if (timeSinceLastAttempt > 10 * 1000L) { - // We haven't tried for at least 10 seconds - if (directPeerRequestCount < 5) { - // We've made less than 5 total attempts - return true; - } - } - - if (timeSinceLastAttempt > 5 * 60 * 1000L) { - // We haven't tried for at least 5 minutes - if (directPeerRequestCount < 10) { - // We've made less than 10 total attempts - return true; - } - } - - if (timeSinceLastAttempt > 24 * 60 * 60 * 1000L) { - // We haven't tried for at least 24 hours - return true; - } - - return false; - } + // Useful methods used by other parts of the app public boolean isSignatureRateLimited(byte[] signature) { - String signature58 = Base58.encode(signature); - return !this.shouldMakeFileListRequestForSignature(signature58) - && !this.shouldMakeDirectFileRequestsForSignature(signature58); + return ArbitraryDataFileListManager.getInstance().isSignatureRateLimited(signature); } public long lastRequestForSignature(byte[] signature) { - String signature58 = Base58.encode(signature); - Triple request = arbitraryDataSignatureRequests.get(signature58); - - if (request == null) { - // Not attempted yet - return 0; - } - - // Extract the components - Long lastAttemptTimestamp = request.getC(); - if (lastAttemptTimestamp != null) { - return lastAttemptTimestamp; - } - return 0; - } - - private void addToSignatureRequests(String signature58, boolean incrementNetworkRequests, boolean incrementPeerRequests) { - Triple request = arbitraryDataSignatureRequests.get(signature58); - Long now = NTP.getTime(); - - if (request == null) { - // No entry yet - Triple newRequest = new Triple<>(0, 0, now); - arbitraryDataSignatureRequests.put(signature58, newRequest); - } - else { - // There is an existing entry - if (incrementNetworkRequests) { - request.setA(request.getA() + 1); - } - if (incrementPeerRequests) { - request.setB(request.getB() + 1); - } - request.setC(now); - arbitraryDataSignatureRequests.put(signature58, request); - } - } - - private void removeFromSignatureRequests(String signature58) { - arbitraryDataSignatureRequests.remove(signature58); - } - - - // Lookup file lists by signature - - public boolean fetchData(ArbitraryTransactionData arbitraryTransactionData) { - return this.fetchArbitraryDataFileList(arbitraryTransactionData); - } - - private boolean fetchArbitraryDataFileList(ArbitraryTransactionData arbitraryTransactionData) { - byte[] signature = arbitraryTransactionData.getSignature(); - String signature58 = Base58.encode(signature); - - // If we've already tried too many times in a short space of time, make sure to give up - if (!this.shouldMakeFileListRequestForSignature(signature58)) { - // Check if we should make direct connections to peers - if (this.shouldMakeDirectFileRequestsForSignature(signature58)) { - return this.fetchDataFilesFromPeersForSignature(signature); - } - - LOGGER.debug("Skipping file list request for signature {} due to rate limit", signature58); - return false; - } - this.addToSignatureRequests(signature58, true, false); - - LOGGER.info(String.format("Sending data file list request for signature %s...", Base58.encode(signature))); - - // Build request - Message getArbitraryDataFileListMessage = new GetArbitraryDataFileListMessage(signature); - - // Save our request into requests map - Triple requestEntry = new Triple<>(signature58, null, NTP.getTime()); - - // Assign random ID to this message - int id; - do { - id = new Random().nextInt(Integer.MAX_VALUE - 1) + 1; - - // Put queue into map (keyed by message ID) so we can poll for a response - // If putIfAbsent() doesn't return null, then this ID is already taken - } while (arbitraryDataFileListRequests.put(id, requestEntry) != null); - getArbitraryDataFileListMessage.setId(id); - - // Broadcast request - Network.getInstance().broadcast(peer -> getArbitraryDataFileListMessage); - - // Poll to see if data has arrived - final long singleWait = 100; - long totalWait = 0; - while (totalWait < ARBITRARY_REQUEST_TIMEOUT) { - try { - Thread.sleep(singleWait); - } catch (InterruptedException e) { - break; - } - - requestEntry = arbitraryDataFileListRequests.get(id); - if (requestEntry == null) - return false; - - if (requestEntry.getA() == null) - break; - - totalWait += singleWait; - } - return true; - } - - - // Fetch data directly from peers - - private boolean fetchDataFilesFromPeersForSignature(byte[] signature) { - String signature58 = Base58.encode(signature); - this.addToSignatureRequests(signature58, false, true); - - // Firstly fetch peers that claim to be hosting files for this signature - try (final Repository repository = RepositoryManager.getRepository()) { - - List peers = repository.getArbitraryRepository().getArbitraryPeerDataForSignature(signature); - if (peers == null || peers.isEmpty()) { - LOGGER.info("No peers found for signature {}", signature58); - return false; - } - - LOGGER.info("Attempting a direct peer connection for signature {}...", signature58); - - // Peers found, so pick a random one and request data from it - int index = new SecureRandom().nextInt(peers.size()); - ArbitraryPeerData arbitraryPeerData = peers.get(index); - String peerAddressString = arbitraryPeerData.getPeerAddress(); - return Network.getInstance().requestDataFromPeer(peerAddressString, signature); - - } catch (DataException e) { - LOGGER.info("Unable to fetch peer list from repository"); - } - - return false; - } - - - // Fetch data files by hash - - private ArbitraryDataFileMessage fetchArbitraryDataFile(Peer peer, Peer requestingPeer, byte[] signature, byte[] hash, Message originalMessage) throws DataException { - ArbitraryDataFile existingFile = ArbitraryDataFile.fromHash(hash, signature); - boolean fileAlreadyExists = existingFile.exists(); - Message message = null; - - // Fetch the file if it doesn't exist locally - if (!fileAlreadyExists) { - String hash58 = Base58.encode(hash); - LOGGER.info(String.format("Fetching data file %.8s from peer %s", hash58, peer)); - arbitraryDataFileRequests.put(hash58, NTP.getTime()); - Message getArbitraryDataFileMessage = new GetArbitraryDataFileMessage(signature, hash); - - try { - message = peer.getResponseWithTimeout(getArbitraryDataFileMessage, (int) ARBITRARY_REQUEST_TIMEOUT); - } catch (InterruptedException e) { - // Will return below due to null message - } - arbitraryDataFileRequests.remove(hash58); - LOGGER.trace(String.format("Removed hash %.8s from arbitraryDataFileRequests", hash58)); - - if (message == null || message.getType() != Message.MessageType.ARBITRARY_DATA_FILE) { - return null; - } - } - ArbitraryDataFileMessage arbitraryDataFileMessage = (ArbitraryDataFileMessage) message; - - // We might want to forward the request to the peer that originally requested it - this.handleArbitraryDataFileForwarding(requestingPeer, message, originalMessage); - - boolean isRelayRequest = (requestingPeer != null); - if (isRelayRequest) { - if (!fileAlreadyExists) { - // File didn't exist locally before the request, and it's a forwarding request, so delete it - LOGGER.info("Deleting file {} because it was needed for forwarding only", Base58.encode(hash)); - ArbitraryDataFile dataFile = arbitraryDataFileMessage.getArbitraryDataFile(); - dataFile.delete(); - } - } - - return arbitraryDataFileMessage; + return ArbitraryDataFileListManager.getInstance().lastRequestForSignature(signature); } @@ -587,12 +257,12 @@ public class ArbitraryDataManager extends Thread { if (now == null) { return; } - final long requestMinimumTimestamp = now - ARBITRARY_REQUEST_TIMEOUT; - arbitraryDataFileListRequests.entrySet().removeIf(entry -> entry.getValue().getC() == null || entry.getValue().getC() < requestMinimumTimestamp); - arbitraryDataFileRequests.entrySet().removeIf(entry -> entry.getValue() == null || entry.getValue() < requestMinimumTimestamp); - final long relayMinimumTimestamp = now - ARBITRARY_RELAY_TIMEOUT; - arbitraryRelayMap.entrySet().removeIf(entry -> entry.getValue().getC() == null || entry.getValue().getC() < relayMinimumTimestamp); + // Cleanup file list request caches + ArbitraryDataFileListManager.getInstance().cleanupRequestCache(now); + + // Cleanup file request caches + ArbitraryDataFileManager.getInstance().cleanupRequestCache(now); } public boolean isResourceCached(String resourceId) { @@ -664,396 +334,12 @@ public class ArbitraryDataManager extends Thread { } // Remove from the signature requests list now that we have all files for this signature - this.removeFromSignatureRequests(signature58); - } - } - - public boolean fetchAllArbitraryDataFiles(Repository repository, Peer peer, byte[] signature) { - try { - TransactionData transactionData = repository.getTransactionRepository().fromSignature(signature); - if (!(transactionData instanceof ArbitraryTransactionData)) - return false; - - ArbitraryTransactionData arbitraryTransactionData = (ArbitraryTransactionData) transactionData; - - // We use null to represent all hashes associated with this transaction - return this.fetchArbitraryDataFiles(repository, peer, signature, arbitraryTransactionData, null); - - } catch (DataException e) {} - - return false; - } - - public boolean fetchArbitraryDataFiles(Repository repository, - Peer peer, - byte[] signature, - ArbitraryTransactionData arbitraryTransactionData, - List hashes) throws DataException { - - // Load data file(s) - ArbitraryDataFile arbitraryDataFile = ArbitraryDataFile.fromHash(arbitraryTransactionData.getData(), signature); - byte[] metadataHash = arbitraryTransactionData.getMetadataHash(); - arbitraryDataFile.setMetadataHash(metadataHash); - - // If hashes are null, we will treat this to mean all data hashes associated with this file - if (hashes == null) { - if (metadataHash == null) { - // This transaction has no metadata/chunks, so use the main file hash - hashes = Arrays.asList(arbitraryDataFile.getHash()); - } - else if (!arbitraryDataFile.getMetadataFile().exists()) { - // We don't have the metadata file yet, so request it - hashes = Arrays.asList(arbitraryDataFile.getMetadataFile().getHash()); - } - else { - // Add the chunk hashes - hashes = arbitraryDataFile.getChunkHashes(); - } - } - - boolean receivedAtLeastOneFile = false; - - // Now fetch actual data from this peer - for (byte[] hash : hashes) { - if (!arbitraryDataFile.chunkExists(hash)) { - // Only request the file if we aren't already requesting it from someone else - if (!arbitraryDataFileRequests.containsKey(Base58.encode(hash))) { - ArbitraryDataFileMessage receivedArbitraryDataFileMessage = fetchArbitraryDataFile(peer, null, signature, hash, null); - if (receivedArbitraryDataFileMessage != null) { - LOGGER.info("Received data file {} from peer {}", receivedArbitraryDataFileMessage.getArbitraryDataFile().getHash58(), peer); - receivedAtLeastOneFile = true; - } - else { - LOGGER.info("Peer {} didn't respond with data file {} for signature {}", peer, Base58.encode(hash), Base58.encode(signature)); - } - } - else { - LOGGER.info("Already requesting data file {} for signature {}", arbitraryDataFile, Base58.encode(signature)); - } - } - } - - if (receivedAtLeastOneFile) { - // Update our lookup table to indicate that this peer holds data for this signature - String peerAddress = peer.getPeerData().getAddress().toString(); - LOGGER.info("Adding arbitrary peer: {} for signature {}", peerAddress, Base58.encode(signature)); - ArbitraryPeerData arbitraryPeerData = new ArbitraryPeerData(signature, peer); - repository.discardChanges(); - repository.getArbitraryRepository().save(arbitraryPeerData); - repository.saveChanges(); - - // Invalidate the hosted transactions cache as we are now hosting something new - ArbitraryDataStorageManager.getInstance().invalidateHostedTransactionsCache(); - } - - // Check if we have all the files we need for this transaction - if (arbitraryDataFile.allFilesExist()) { - - // We have all the chunks for this transaction, so we should invalidate the transaction's name's - // data cache so that it is rebuilt the next time we serve it - invalidateCache(arbitraryTransactionData); - - // We may also need to broadcast to the network that we are now hosting files for this transaction, - // but only if these files are in accordance with our storage policy - if (ArbitraryDataStorageManager.getInstance().canStoreData(arbitraryTransactionData)) { - // Use a null peer address to indicate our own - Message newArbitrarySignatureMessage = new ArbitrarySignaturesMessage(null, Arrays.asList(signature)); - Network.getInstance().broadcast(broadcastPeer -> newArbitrarySignatureMessage); - } - } - - return receivedAtLeastOneFile; - } - - public void handleArbitraryDataFileForwarding(Peer requestingPeer, Message message, Message originalMessage) { - // Return if there is no originally requesting peer to forward to - if (requestingPeer == null) { - return; - } - - // Return if we're not in relay mode or if this request doesn't need forwarding - if (!Settings.getInstance().isRelayModeEnabled()) { - return; - } - - LOGGER.info("Received arbitrary data file - forwarding is needed"); - - // The ID needs to match that of the original request - message.setId(originalMessage.getId()); - - if (!requestingPeer.sendMessage(message)) { - LOGGER.info("Failed to forward arbitrary data file to peer {}", requestingPeer); - requestingPeer.disconnect("failed to forward arbitrary data file"); - } - else { - LOGGER.info("Forwarded arbitrary data file to peer {}", requestingPeer); + ArbitraryDataFileListManager.getInstance().removeFromSignatureRequests(signature58); } } - // Network handlers - - public void onNetworkArbitraryDataFileListMessage(Peer peer, Message message) { - // Don't process if QDN is disabled - if (!Settings.getInstance().isQdnEnabled()) { - return; - } - - ArbitraryDataFileListMessage arbitraryDataFileListMessage = (ArbitraryDataFileListMessage) message; - LOGGER.info("Received hash list from peer {} with {} hashes", peer, arbitraryDataFileListMessage.getHashes().size()); - - // Do we have a pending request for this data? // TODO: might we want to relay all of them anyway? - Triple request = arbitraryDataFileListRequests.get(message.getId()); - if (request == null || request.getA() == null) { - return; - } - boolean isRelayRequest = (request.getB() != null); - - // Does this message's signature match what we're expecting? - byte[] signature = arbitraryDataFileListMessage.getSignature(); - String signature58 = Base58.encode(signature); - if (!request.getA().equals(signature58)) { - return; - } - - List hashes = arbitraryDataFileListMessage.getHashes(); - if (hashes == null || hashes.isEmpty()) { - return; - } - - ArbitraryTransactionData arbitraryTransactionData = null; - - // Check transaction exists and hashes are correct - try (final Repository repository = RepositoryManager.getRepository()) { - TransactionData transactionData = repository.getTransactionRepository().fromSignature(signature); - if (!(transactionData instanceof ArbitraryTransactionData)) - return; - - arbitraryTransactionData = (ArbitraryTransactionData) transactionData; - - // Load data file(s) - ArbitraryDataFile arbitraryDataFile = ArbitraryDataFile.fromHash(arbitraryTransactionData.getData(), signature); - arbitraryDataFile.setMetadataHash(arbitraryTransactionData.getMetadataHash()); - -// // Check all hashes exist -// for (byte[] hash : hashes) { -// //LOGGER.info("Received hash {}", Base58.encode(hash)); -// if (!arbitraryDataFile.containsChunk(hash)) { -// // Check the hash against the complete file -// if (!Arrays.equals(arbitraryDataFile.getHash(), hash)) { -// LOGGER.info("Received non-matching chunk hash {} for signature {}. This could happen if we haven't obtained the metadata file yet.", Base58.encode(hash), signature58); -// return; -// } -// } -// } - - // Update requests map to reflect that we've received it - Triple newEntry = new Triple<>(null, null, request.getC()); - arbitraryDataFileListRequests.put(message.getId(), newEntry); - - if (!isRelayRequest || !Settings.getInstance().isRelayModeEnabled()) { - // Go and fetch the actual data, since this isn't a relay request - this.fetchArbitraryDataFiles(repository, peer, signature, arbitraryTransactionData, hashes); - } - - } catch (DataException e) { - LOGGER.error(String.format("Repository issue while finding arbitrary transaction data list for peer %s", peer), e); - } - - // Forwarding - if (isRelayRequest && Settings.getInstance().isRelayModeEnabled()) { - boolean isBlocked = (arbitraryTransactionData == null || ArbitraryDataStorageManager.getInstance().isNameBlocked(arbitraryTransactionData.getName())); - if (!isBlocked) { - Peer requestingPeer = request.getB(); - if (requestingPeer != null) { - // Add each hash to our local mapping so we know who to ask later - Long now = NTP.getTime(); - for (byte[] hash : hashes) { - String hash58 = Base58.encode(hash); - Triple value = new Triple<>(signature58, peer, now); - this.arbitraryRelayMap.put(hash58, value); - LOGGER.debug("Added {} to relay map: {}, {}, {}", hash58, signature58, peer, now); - } - - // Forward to requesting peer - LOGGER.info("Forwarding file list with {} hashes to requesting peer: {}", hashes.size(), requestingPeer); - if (!requestingPeer.sendMessage(arbitraryDataFileListMessage)) { - requestingPeer.disconnect("failed to forward arbitrary data file list"); - } - } - } - } - } - - public void onNetworkGetArbitraryDataFileMessage(Peer peer, Message message) { - // Don't respond if QDN is disabled - if (!Settings.getInstance().isQdnEnabled()) { - return; - } - - GetArbitraryDataFileMessage getArbitraryDataFileMessage = (GetArbitraryDataFileMessage) message; - byte[] hash = getArbitraryDataFileMessage.getHash(); - String hash58 = Base58.encode(hash); - byte[] signature = getArbitraryDataFileMessage.getSignature(); - Controller.getInstance().stats.getArbitraryDataFileMessageStats.requests.incrementAndGet(); - - LOGGER.info("Received GetArbitraryDataFileMessage from peer {} for hash {}", peer, Base58.encode(hash)); - - try { - ArbitraryDataFile arbitraryDataFile = ArbitraryDataFile.fromHash(hash, signature); - Triple relayInfo = this.arbitraryRelayMap.get(hash58); - - if (arbitraryDataFile.exists()) { - LOGGER.info("Hash {} exists", hash58); - - // We can serve the file directly as we already have it - ArbitraryDataFileMessage arbitraryDataFileMessage = new ArbitraryDataFileMessage(signature, arbitraryDataFile); - arbitraryDataFileMessage.setId(message.getId()); - if (!peer.sendMessage(arbitraryDataFileMessage)) { - LOGGER.info("Couldn't sent file"); - peer.disconnect("failed to send file"); - } - LOGGER.info("Sent file {}", arbitraryDataFile); - } - else if (relayInfo != null) { - LOGGER.info("We have relay info for hash {}", Base58.encode(hash)); - // We need to ask this peer for the file - Peer peerToAsk = relayInfo.getB(); - if (peerToAsk != null) { - - // Forward the message to this peer - LOGGER.info("Asking peer {} for hash {}", peerToAsk, hash58); - this.fetchArbitraryDataFile(peerToAsk, peer, signature, hash, message); - - // Remove from the map regardless of outcome, as the relay attempt is now considered complete - arbitraryRelayMap.remove(hash58); - } - else { - LOGGER.info("Peer {} not found in relay info", peer); - } - } - else { - LOGGER.info("Hash {} doesn't exist and we don't have relay info", hash58); - - // We don't have this file - Controller.getInstance().stats.getArbitraryDataFileMessageStats.unknownFiles.getAndIncrement(); - - // Send valid, yet unexpected message type in response, so peer's synchronizer doesn't have to wait for timeout - LOGGER.debug(String.format("Sending 'file unknown' response to peer %s for GET_FILE request for unknown file %s", peer, arbitraryDataFile)); - - // We'll send empty block summaries message as it's very short - // TODO: use a different message type here - Message fileUnknownMessage = new BlockSummariesMessage(Collections.emptyList()); - fileUnknownMessage.setId(message.getId()); - if (!peer.sendMessage(fileUnknownMessage)) { - LOGGER.info("Couldn't sent file-unknown response"); - peer.disconnect("failed to send file-unknown response"); - } - else { - LOGGER.info("Sent file-unknown response for file {}", arbitraryDataFile); - } - } - } - catch (DataException e) { - LOGGER.info("Unable to handle request for arbitrary data file: {}", hash58); - } - } - - public void onNetworkGetArbitraryDataFileListMessage(Peer peer, Message message) { - // Don't respond if QDN is disabled - if (!Settings.getInstance().isQdnEnabled()) { - return; - } - - Controller.getInstance().stats.getArbitraryDataFileListMessageStats.requests.incrementAndGet(); - - GetArbitraryDataFileListMessage getArbitraryDataFileListMessage = (GetArbitraryDataFileListMessage) message; - byte[] signature = getArbitraryDataFileListMessage.getSignature(); - String signature58 = Base58.encode(signature); - Long timestamp = NTP.getTime(); - Triple newEntry = new Triple<>(signature58, peer, timestamp); - - // If we've seen this request recently, then ignore - if (arbitraryDataFileListRequests.putIfAbsent(message.getId(), newEntry) != null) { - return; - } - - LOGGER.info("Received hash list request from peer {} for signature {}", peer, Base58.encode(signature)); - - List hashes = new ArrayList<>(); - ArbitraryTransactionData transactionData = null; - - try (final Repository repository = RepositoryManager.getRepository()) { - - // Firstly we need to lookup this file on chain to get a list of its hashes - transactionData = (ArbitraryTransactionData)repository.getTransactionRepository().fromSignature(signature); - if (transactionData instanceof ArbitraryTransactionData) { - - // Check if we're even allowed to serve data for this transaction - if (ArbitraryDataStorageManager.getInstance().canStoreData(transactionData)) { - - byte[] hash = transactionData.getData(); - byte[] metadataHash = transactionData.getMetadataHash(); - - // Load file(s) and add any that exist to the list of hashes - ArbitraryDataFile arbitraryDataFile = ArbitraryDataFile.fromHash(hash, signature); - if (metadataHash != null) { - arbitraryDataFile.setMetadataHash(metadataHash); - - // If we have the metadata file, add its hash - if (arbitraryDataFile.getMetadataFile().exists()) { - hashes.add(arbitraryDataFile.getMetadataHash()); - } - - for (ArbitraryDataFileChunk chunk : arbitraryDataFile.getChunks()) { - if (chunk.exists()) { - hashes.add(chunk.getHash()); - //LOGGER.info("Added hash {}", chunk.getHash58()); - } else { - LOGGER.info("Couldn't add hash {} because it doesn't exist", chunk.getHash58()); - } - } - } else { - // This transaction has no chunks, so include the complete file if we have it - if (arbitraryDataFile.exists()) { - hashes.add(arbitraryDataFile.getHash()); - } - } - } - } - - } catch (DataException e) { - LOGGER.error(String.format("Repository issue while fetching arbitrary file list for peer %s", peer), e); - } - - // We should only respond if we have at least one hash - if (hashes.size() > 0) { - - // Update requests map to reflect that we've sent it - newEntry = new Triple<>(signature58, null, timestamp); - arbitraryDataFileListRequests.put(message.getId(), newEntry); - - ArbitraryDataFileListMessage arbitraryDataFileListMessage = new ArbitraryDataFileListMessage(signature, hashes); - arbitraryDataFileListMessage.setId(message.getId()); - if (!peer.sendMessage(arbitraryDataFileListMessage)) { - LOGGER.info("Couldn't send list of hashes"); - peer.disconnect("failed to send list of hashes"); - } - LOGGER.info("Sent list of hashes (count: {})", hashes.size()); - - } - else { - boolean isBlocked = (transactionData == null || ArbitraryDataStorageManager.getInstance().isNameBlocked(transactionData.getName())); - if (Settings.getInstance().isRelayModeEnabled() && !isBlocked) { - // In relay mode - so ask our other peers if they have it - LOGGER.info("Rebroadcasted hash list request from peer {} for signature {} to our other peers", peer, Base58.encode(signature)); - Network.getInstance().broadcast( - broadcastPeer -> broadcastPeer == peer || - Objects.equals(broadcastPeer.getPeerData().getAddress().getHost(), peer.getPeerData().getAddress().getHost()) - ? null : message); - } - } - } + // Handle incoming arbitrary signatures messages public void onNetworkArbitrarySignaturesMessage(Peer peer, Message message) { // Don't process if QDN is disabled diff --git a/src/main/java/org/qortal/network/Network.java b/src/main/java/org/qortal/network/Network.java index 5b2f2d8f..c9843ce4 100644 --- a/src/main/java/org/qortal/network/Network.java +++ b/src/main/java/org/qortal/network/Network.java @@ -6,6 +6,7 @@ import org.bouncycastle.crypto.params.Ed25519PrivateKeyParameters; import org.bouncycastle.crypto.params.Ed25519PublicKeyParameters; import org.qortal.block.BlockChain; import org.qortal.controller.Controller; +import org.qortal.controller.arbitrary.ArbitraryDataFileManager; import org.qortal.controller.arbitrary.ArbitraryDataManager; import org.qortal.crypto.Crypto; import org.qortal.data.block.BlockData; @@ -306,7 +307,7 @@ public class Network { } try (final Repository repository = RepositoryManager.getRepository()) { - return ArbitraryDataManager.getInstance().fetchAllArbitraryDataFiles(repository, connectedPeer, signature); + return ArbitraryDataFileManager.getInstance().fetchAllArbitraryDataFiles(repository, connectedPeer, signature); } catch (DataException e) { LOGGER.info("Unable to fetch arbitrary data files"); }