forked from Qortal/qortal
Rate limit any file list broadcasts
We don't want the network being spammed when a file isn't available by any reachable peers. This feature ensures retries are spaced out over longer timeframes. Basic logic: - Wait 5 minutes in between failed attempts - After 5 failed attempts (i.e. 25 mins) only try once per day from then on - A core restart resets the counters The stats gathered here can also be used to inform the core of when it should attempt a direct connection with a peer to obtain the data. That part isn't implemented yet.
This commit is contained in:
parent
054860b38d
commit
f5235938b7
@ -62,6 +62,13 @@ public class ArbitraryDataManager extends Thread {
|
|||||||
*/
|
*/
|
||||||
private Map<String, Long> arbitraryDataFileRequests = Collections.synchronizedMap(new HashMap<>());
|
private Map<String, Long> arbitraryDataFileRequests = Collections.synchronizedMap(new HashMap<>());
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Map to keep track of in progress arbitrary data signature requests
|
||||||
|
* Key: string - the signature encoded in base58
|
||||||
|
* Value: Triple<networkBroadcastCount, directPeerRequestCount, lastAttemptTimestamp>
|
||||||
|
*/
|
||||||
|
private Map<String, Triple<Integer, Integer, Long>> arbitraryDataSignatureRequests = Collections.synchronizedMap(new HashMap<>());
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Map to keep track of cached arbitrary transaction resources.
|
* Map to keep track of cached arbitrary transaction resources.
|
||||||
* When an item is present in this list with a timestamp in the future, we won't invalidate
|
* When an item is present in this list with a timestamp in the future, we won't invalidate
|
||||||
@ -219,7 +226,7 @@ public class ArbitraryDataManager extends Thread {
|
|||||||
|
|
||||||
// Ask our connected peers if they have files for this signature
|
// Ask our connected peers if they have files for this signature
|
||||||
// This process automatically then fetches the files themselves if a peer is found
|
// This process automatically then fetches the files themselves if a peer is found
|
||||||
fetchDataForSignature(signature);
|
fetchDataForSignature(signature); // TODO: keep track
|
||||||
|
|
||||||
} catch (DataException e) {
|
} catch (DataException e) {
|
||||||
LOGGER.error("Repository issue when fetching arbitrary transaction data", e);
|
LOGGER.error("Repository issue when fetching arbitrary transaction data", e);
|
||||||
@ -250,17 +257,94 @@ public class ArbitraryDataManager extends Thread {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
// Track file list lookups by signature
|
||||||
|
|
||||||
|
private boolean shouldMakeFileListRequestForSignature(String signature58) {
|
||||||
|
Triple<Integer, Integer, Long> request = arbitraryDataSignatureRequests.get(signature58);
|
||||||
|
|
||||||
|
if (request == null) {
|
||||||
|
// Not attempted yet
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Extract the components
|
||||||
|
Integer networkBroadcastCount = request.getA();
|
||||||
|
// Integer directPeerRequestCount = request.getB();
|
||||||
|
Long lastAttemptTimestamp = request.getC();
|
||||||
|
|
||||||
|
if (lastAttemptTimestamp == null) {
|
||||||
|
// Not attempted yet
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
long timeSinceLastAttempt = NTP.getTime() - lastAttemptTimestamp;
|
||||||
|
if (timeSinceLastAttempt > 5 * 60 * 1000L) {
|
||||||
|
// We haven't tried for at least 5 minutes
|
||||||
|
|
||||||
|
if (networkBroadcastCount < 5) {
|
||||||
|
// We've made less than 5 total attempts
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (timeSinceLastAttempt > 24 * 60 * 60 * 1000L) {
|
||||||
|
// We haven't tried for at least 24 hours
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void addToSignatureRequests(String signature58, boolean incrementNetworkRequests, boolean incrementPeerRequests) {
|
||||||
|
Triple<Integer, Integer, Long> request = arbitraryDataSignatureRequests.get(signature58);
|
||||||
|
Long now = NTP.getTime();
|
||||||
|
|
||||||
|
if (request == null) {
|
||||||
|
// No entry yet
|
||||||
|
Triple<Integer, Integer, Long> newRequest = new Triple<>(0, 0, now);
|
||||||
|
arbitraryDataSignatureRequests.put(signature58, newRequest);
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
// There is an existing entry
|
||||||
|
if (incrementNetworkRequests) {
|
||||||
|
request.setA(request.getA() + 1);
|
||||||
|
}
|
||||||
|
if (incrementPeerRequests) {
|
||||||
|
request.setB(request.getB() + 1);
|
||||||
|
}
|
||||||
|
request.setC(now);
|
||||||
|
arbitraryDataSignatureRequests.put(signature58, request);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void removeFromSignatureRequests(String signature58) {
|
||||||
|
arbitraryDataSignatureRequests.remove(signature58);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
// Lookup file lists by signature
|
||||||
|
|
||||||
public boolean fetchDataForSignature(byte[] signature) {
|
public boolean fetchDataForSignature(byte[] signature) {
|
||||||
return this.fetchArbitraryDataFileList(signature);
|
return this.fetchArbitraryDataFileList(signature);
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean fetchArbitraryDataFileList(byte[] signature) {
|
private boolean fetchArbitraryDataFileList(byte[] signature) {
|
||||||
|
String signature58 = Base58.encode(signature);
|
||||||
|
|
||||||
|
// If we've already tried too many times in a short space of time, make sure to give up
|
||||||
|
if (!this.shouldMakeFileListRequestForSignature(signature58)) {
|
||||||
|
LOGGER.trace("Skipping file list request for signature {}", signature58);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
this.addToSignatureRequests(signature58, true, false);
|
||||||
|
|
||||||
LOGGER.info(String.format("Sending data file list request for signature %s", Base58.encode(signature)));
|
LOGGER.info(String.format("Sending data file list request for signature %s", Base58.encode(signature)));
|
||||||
|
|
||||||
// Build request
|
// Build request
|
||||||
Message getArbitraryDataFileListMessage = new GetArbitraryDataFileListMessage(signature);
|
Message getArbitraryDataFileListMessage = new GetArbitraryDataFileListMessage(signature);
|
||||||
|
|
||||||
// Save our request into requests map
|
// Save our request into requests map
|
||||||
String signature58 = Base58.encode(signature);
|
|
||||||
Triple<String, Peer, Long> requestEntry = new Triple<>(signature58, null, NTP.getTime());
|
Triple<String, Peer, Long> requestEntry = new Triple<>(signature58, null, NTP.getTime());
|
||||||
|
|
||||||
// Assign random ID to this message
|
// Assign random ID to this message
|
||||||
@ -298,6 +382,9 @@ public class ArbitraryDataManager extends Thread {
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
// Fetch data files by hash
|
||||||
|
|
||||||
private ArbitraryDataFile fetchArbitraryDataFile(Peer peer, byte[] hash) throws InterruptedException {
|
private ArbitraryDataFile fetchArbitraryDataFile(Peer peer, byte[] hash) throws InterruptedException {
|
||||||
String hash58 = Base58.encode(hash);
|
String hash58 = Base58.encode(hash);
|
||||||
LOGGER.info(String.format("Fetching data file %.8s from peer %s", hash58, peer));
|
LOGGER.info(String.format("Fetching data file %.8s from peer %s", hash58, peer));
|
||||||
@ -316,6 +403,9 @@ public class ArbitraryDataManager extends Thread {
|
|||||||
return arbitraryDataFileMessage.getArbitraryDataFile();
|
return arbitraryDataFileMessage.getArbitraryDataFile();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
// Arbitrary data resource cache
|
||||||
|
|
||||||
public void cleanupRequestCache(Long now) {
|
public void cleanupRequestCache(Long now) {
|
||||||
if (now == null) {
|
if (now == null) {
|
||||||
return;
|
return;
|
||||||
@ -325,8 +415,6 @@ public class ArbitraryDataManager extends Thread {
|
|||||||
arbitraryDataFileRequests.entrySet().removeIf(entry -> entry.getValue() < requestMinimumTimestamp);
|
arbitraryDataFileRequests.entrySet().removeIf(entry -> entry.getValue() < requestMinimumTimestamp);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
// Arbitrary data resource cache
|
|
||||||
public boolean isResourceCached(String resourceId) {
|
public boolean isResourceCached(String resourceId) {
|
||||||
if (resourceId == null) {
|
if (resourceId == null) {
|
||||||
return false;
|
return false;
|
||||||
@ -378,9 +466,11 @@ public class ArbitraryDataManager extends Thread {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public void invalidateCache(ArbitraryTransactionData arbitraryTransactionData) {
|
public void invalidateCache(ArbitraryTransactionData arbitraryTransactionData) {
|
||||||
|
String signature58 = Base58.encode(arbitraryTransactionData.getSignature());
|
||||||
|
|
||||||
if (arbitraryTransactionData.getName() != null) {
|
if (arbitraryTransactionData.getName() != null) {
|
||||||
String resourceId = arbitraryTransactionData.getName().toLowerCase();
|
String resourceId = arbitraryTransactionData.getName().toLowerCase();
|
||||||
LOGGER.info("We have all data for transaction {}", Base58.encode(arbitraryTransactionData.getSignature()));
|
LOGGER.info("We have all data for transaction {}", signature58);
|
||||||
LOGGER.info("Clearing cache for name {}...", arbitraryTransactionData.getName());
|
LOGGER.info("Clearing cache for name {}...", arbitraryTransactionData.getName());
|
||||||
|
|
||||||
if (this.arbitraryDataCachedResources.containsKey(resourceId)) {
|
if (this.arbitraryDataCachedResources.containsKey(resourceId)) {
|
||||||
@ -392,6 +482,9 @@ public class ArbitraryDataManager extends Thread {
|
|||||||
if (buildManager.arbitraryDataFailedBuilds.containsKey(resourceId)) {
|
if (buildManager.arbitraryDataFailedBuilds.containsKey(resourceId)) {
|
||||||
buildManager.arbitraryDataFailedBuilds.remove(resourceId);
|
buildManager.arbitraryDataFailedBuilds.remove(resourceId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Remove from the signature requests list now that we have all files for this signature
|
||||||
|
this.removeFromSignatureRequests(signature58);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user