forked from Qortal/qortal
Arbitrary data manager now only prefetches data according to the storage policy.
- If storage policy includes "FOLLOWING", only process transactions relating to the followed names. - If storage policy is "ALL", process all transactions. - If storage policy is "NONE" or "VIEWED", don't process or prefetch any data.
This commit is contained in:
parent
abfeafc823
commit
a61b0685f0
@ -9,6 +9,7 @@ import org.qortal.controller.Controller;
|
||||
import org.qortal.data.network.ArbitraryPeerData;
|
||||
import org.qortal.data.transaction.ArbitraryTransactionData;
|
||||
import org.qortal.data.transaction.TransactionData;
|
||||
import org.qortal.list.ResourceListManager;
|
||||
import org.qortal.network.Network;
|
||||
import org.qortal.network.Peer;
|
||||
import org.qortal.network.message.*;
|
||||
@ -89,13 +90,6 @@ public class ArbitraryDataManager extends Thread {
|
||||
public void run() {
|
||||
Thread.currentThread().setName("Arbitrary Data Manager");
|
||||
|
||||
// Keep a reference to the storage manager as we will need this a lot
|
||||
ArbitraryDataStorageManager storageManager = ArbitraryDataStorageManager.getInstance();
|
||||
|
||||
// Paginate queries when fetching arbitrary transactions
|
||||
final int limit = 100;
|
||||
int offset = 0;
|
||||
|
||||
try {
|
||||
while (!isStopping) {
|
||||
Thread.sleep(2000);
|
||||
@ -110,80 +104,22 @@ public class ArbitraryDataManager extends Thread {
|
||||
continue;
|
||||
}
|
||||
|
||||
// Any arbitrary transactions we want to fetch data for?
|
||||
try (final Repository repository = RepositoryManager.getRepository()) {
|
||||
List<byte[]> signatures = repository.getTransactionRepository().getSignaturesMatchingCriteria(null, null, null, ARBITRARY_TX_TYPE, null, null, null, ConfirmationStatus.BOTH, limit, offset, true);
|
||||
// LOGGER.info("Found {} arbitrary transactions at offset: {}, limit: {}", signatures.size(), offset, limit);
|
||||
if (signatures == null || signatures.isEmpty()) {
|
||||
offset = 0;
|
||||
continue;
|
||||
}
|
||||
offset += limit;
|
||||
// Fetch data according to storage policy
|
||||
switch (Settings.getInstance().getStoragePolicy()) {
|
||||
case FOLLOWED:
|
||||
case FOLLOWED_AND_VIEWED:
|
||||
this.processNames();
|
||||
break;
|
||||
|
||||
// Loop through signatures and remove ones we don't need to process
|
||||
Iterator iterator = signatures.iterator();
|
||||
while (iterator.hasNext()) {
|
||||
byte[] signature = (byte[]) iterator.next();
|
||||
case ALL:
|
||||
this.processAll();
|
||||
|
||||
ArbitraryTransaction arbitraryTransaction = fetchTransaction(repository, signature);
|
||||
if (arbitraryTransaction == null) {
|
||||
// Best not to process this one
|
||||
iterator.remove();
|
||||
continue;
|
||||
}
|
||||
ArbitraryTransactionData arbitraryTransactionData = (ArbitraryTransactionData) arbitraryTransaction.getTransactionData();
|
||||
|
||||
// Skip transactions that we don't need to store data for
|
||||
if (arbitraryTransactionData.getName() != null) {
|
||||
if (!storageManager.shouldStoreDataForName(arbitraryTransactionData.getName())) {
|
||||
iterator.remove();
|
||||
continue;
|
||||
}
|
||||
}
|
||||
else {
|
||||
// Transaction has no name associated with it
|
||||
if (!storageManager.shouldStoreDataWithoutName()) {
|
||||
iterator.remove();
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
// Remove transactions that we already have local data for
|
||||
if (hasLocalData(arbitraryTransaction)) {
|
||||
iterator.remove();
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
if (signatures.isEmpty()) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// Pick one at random
|
||||
final int index = new Random().nextInt(signatures.size());
|
||||
byte[] signature = signatures.get(index);
|
||||
|
||||
if (signature == null) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// Check to see if we have had a more recent PUT
|
||||
ArbitraryTransactionData arbitraryTransactionData = ArbitraryTransactionUtils.fetchTransactionData(repository, signature);
|
||||
boolean hasMoreRecentPutTransaction = ArbitraryTransactionUtils.hasMoreRecentPutTransaction(repository, arbitraryTransactionData);
|
||||
if (hasMoreRecentPutTransaction) {
|
||||
// There is a more recent PUT transaction than the one we are currently processing.
|
||||
// When a PUT is issued, it replaces any layers that would have been there before.
|
||||
// Therefore any data relating to this older transaction is no longer needed and we
|
||||
// shouldn't fetch it from the network.
|
||||
continue;
|
||||
}
|
||||
|
||||
// Ask our connected peers if they have files for this signature
|
||||
// This process automatically then fetches the files themselves if a peer is found
|
||||
fetchDataForSignature(signature);
|
||||
|
||||
} catch (DataException e) {
|
||||
LOGGER.error("Repository issue when fetching arbitrary transaction data", e);
|
||||
case NONE:
|
||||
case VIEWED:
|
||||
default:
|
||||
// Nothing to fetch in advance
|
||||
Thread.sleep(60000);
|
||||
break;
|
||||
}
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
@ -196,6 +132,109 @@ public class ArbitraryDataManager extends Thread {
|
||||
this.interrupt();
|
||||
}
|
||||
|
||||
private void processNames() {
|
||||
// Fetch latest list of followed names
|
||||
List<String> followedNames = ResourceListManager.getInstance().getStringsInList("followed", "names");
|
||||
if (followedNames == null || followedNames.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Loop through the names in the list and fetch transactions for each
|
||||
for (String name : followedNames) {
|
||||
this.fetchAndProcessTransactions(name);
|
||||
}
|
||||
}
|
||||
|
||||
private void processAll() {
|
||||
this.fetchAndProcessTransactions(null);
|
||||
}
|
||||
|
||||
private void fetchAndProcessTransactions(String name) {
|
||||
ArbitraryDataStorageManager storageManager = ArbitraryDataStorageManager.getInstance();
|
||||
|
||||
// Paginate queries when fetching arbitrary transactions
|
||||
final int limit = 100;
|
||||
int offset = 0;
|
||||
|
||||
while (!isStopping) {
|
||||
|
||||
// Any arbitrary transactions we want to fetch data for?
|
||||
try (final Repository repository = RepositoryManager.getRepository()) {
|
||||
List<byte[]> signatures = repository.getTransactionRepository().getSignaturesMatchingCriteria(null, null, null, ARBITRARY_TX_TYPE, null, name, null, ConfirmationStatus.BOTH, limit, offset, true);
|
||||
// LOGGER.info("Found {} arbitrary transactions at offset: {}, limit: {}", signatures.size(), offset, limit);
|
||||
if (signatures == null || signatures.isEmpty()) {
|
||||
offset = 0;
|
||||
break;
|
||||
}
|
||||
offset += limit;
|
||||
|
||||
// Loop through signatures and remove ones we don't need to process
|
||||
Iterator iterator = signatures.iterator();
|
||||
while (iterator.hasNext()) {
|
||||
byte[] signature = (byte[]) iterator.next();
|
||||
|
||||
ArbitraryTransaction arbitraryTransaction = fetchTransaction(repository, signature);
|
||||
if (arbitraryTransaction == null) {
|
||||
// Best not to process this one
|
||||
iterator.remove();
|
||||
continue;
|
||||
}
|
||||
ArbitraryTransactionData arbitraryTransactionData = (ArbitraryTransactionData) arbitraryTransaction.getTransactionData();
|
||||
|
||||
// Skip transactions that we don't need to store data for
|
||||
if (arbitraryTransactionData.getName() != null) {
|
||||
if (!storageManager.shouldStoreDataForName(arbitraryTransactionData.getName())) {
|
||||
iterator.remove();
|
||||
continue;
|
||||
}
|
||||
} else {
|
||||
// Transaction has no name associated with it
|
||||
if (!storageManager.shouldStoreDataWithoutName()) {
|
||||
iterator.remove();
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
// Remove transactions that we already have local data for
|
||||
if (hasLocalData(arbitraryTransaction)) {
|
||||
iterator.remove();
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
if (signatures.isEmpty()) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// Pick one at random
|
||||
final int index = new Random().nextInt(signatures.size());
|
||||
byte[] signature = signatures.get(index);
|
||||
|
||||
if (signature == null) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// Check to see if we have had a more recent PUT
|
||||
ArbitraryTransactionData arbitraryTransactionData = ArbitraryTransactionUtils.fetchTransactionData(repository, signature);
|
||||
boolean hasMoreRecentPutTransaction = ArbitraryTransactionUtils.hasMoreRecentPutTransaction(repository, arbitraryTransactionData);
|
||||
if (hasMoreRecentPutTransaction) {
|
||||
// There is a more recent PUT transaction than the one we are currently processing.
|
||||
// When a PUT is issued, it replaces any layers that would have been there before.
|
||||
// Therefore any data relating to this older transaction is no longer needed and we
|
||||
// shouldn't fetch it from the network.
|
||||
continue;
|
||||
}
|
||||
|
||||
// Ask our connected peers if they have files for this signature
|
||||
// This process automatically then fetches the files themselves if a peer is found
|
||||
fetchDataForSignature(signature);
|
||||
|
||||
} catch (DataException e) {
|
||||
LOGGER.error("Repository issue when fetching arbitrary transaction data", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private ArbitraryTransaction fetchTransaction(final Repository repository, byte[] signature) {
|
||||
try {
|
||||
TransactionData transactionData = repository.getTransactionRepository().fromSignature(signature);
|
||||
|
@ -161,6 +161,10 @@ public class ResourceList {
|
||||
return this.resourceName;
|
||||
}
|
||||
|
||||
public List<String> getList() {
|
||||
return this.list;
|
||||
}
|
||||
|
||||
public String toString() {
|
||||
return String.format("%s %s", this.category, this.resourceName);
|
||||
}
|
||||
|
@ -125,4 +125,12 @@ public class ResourceListManager {
|
||||
return list.getJSONString();
|
||||
}
|
||||
|
||||
public List<String> getStringsInList(String category, String resourceName) {
|
||||
ResourceList list = this.getList(category, resourceName);
|
||||
if (list == null) {
|
||||
return null;
|
||||
}
|
||||
return list.getList();
|
||||
}
|
||||
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user