Reworked existing unused ArbitraryDataManager.

It's now capable of syncing chunks as well as complete files. This isn't production ready as it currently requests/receives the same file from multiple peers at once, which slows down the sync and wastes lots of bandwidth. Ideally we would find an appropriate peer first and then sync the file from them.
This commit is contained in:
CalDescent 2021-07-05 09:05:45 +01:00
parent 7531fe14fe
commit 5319c5f832
2 changed files with 66 additions and 21 deletions

View File

@ -12,6 +12,8 @@ import org.qortal.data.transaction.TransactionData;
import org.qortal.repository.DataException; import org.qortal.repository.DataException;
import org.qortal.repository.Repository; import org.qortal.repository.Repository;
import org.qortal.repository.RepositoryManager; import org.qortal.repository.RepositoryManager;
import org.qortal.storage.DataFile;
import org.qortal.storage.DataFileChunk;
import org.qortal.transaction.ArbitraryTransaction; import org.qortal.transaction.ArbitraryTransaction;
import org.qortal.transaction.Transaction.TransactionType; import org.qortal.transaction.Transaction.TransactionType;
@ -45,20 +47,69 @@ public class ArbitraryDataManager extends Thread {
// Any arbitrary transactions we want to fetch data for? // Any arbitrary transactions we want to fetch data for?
try (final Repository repository = RepositoryManager.getRepository()) { try (final Repository repository = RepositoryManager.getRepository()) {
List<byte[]> signatures = repository.getTransactionRepository().getSignaturesMatchingCriteria(null, null, null, ARBITRARY_TX_TYPE, null, null, ConfirmationStatus.BOTH, null, null, true); List<byte[]> signatures = repository.getTransactionRepository().getSignaturesMatchingCriteria(null, null, null, ARBITRARY_TX_TYPE, null, null, ConfirmationStatus.BOTH, null, null, true);
if (signatures == null || signatures.isEmpty()) if (signatures == null || signatures.isEmpty()) {
continue; continue;
}
// Filter out those that already have local data // Filter out those that already have local data
signatures.removeIf(signature -> hasLocalData(repository, signature)); signatures.removeIf(signature -> hasLocalData(repository, signature));
if (signatures.isEmpty()) if (signatures.isEmpty()) {
continue; continue;
}
// Pick one at random // Pick one at random
final int index = new Random().nextInt(signatures.size()); final int index = new Random().nextInt(signatures.size());
byte[] signature = signatures.get(index); byte[] signature = signatures.get(index);
Controller.getInstance().fetchArbitraryData(signature); // Load the full transaction data so we can access the file hashes
ArbitraryTransactionData transactionData = (ArbitraryTransactionData)repository.getTransactionRepository().fromSignature(signature);
if (!(transactionData instanceof ArbitraryTransactionData)) {
signatures.remove(signature);
continue;
}
// Load hashes
byte[] digest = transactionData.getData();
byte[] chunkHashes = transactionData.getChunkHashes();
// Load data file(s)
DataFile dataFile = DataFile.fromDigest(digest);
if (chunkHashes.length > 0) {
dataFile.addChunkHashes(chunkHashes);
// Now try and fetch each chunk in turn if we don't have them already
for (DataFileChunk dataFileChunk : dataFile.getChunks()) {
if (!dataFileChunk.exists()) {
LOGGER.info("Requesting chunk {}...", dataFileChunk);
boolean success = Controller.getInstance().fetchArbitraryDataFile(dataFileChunk.getHash());
if (success) {
LOGGER.info("Chunk {} received", dataFileChunk);
}
else {
LOGGER.info("Couldn't retrieve chunk {}", dataFileChunk);
}
}
}
}
else if (transactionData.getSize() < DataFileChunk.CHUNK_SIZE) {
// Fetch the complete file, as it is less than the chunk size
LOGGER.info("Requesting file {}...", dataFile.getHash58());
boolean success = Controller.getInstance().fetchArbitraryDataFile(dataFile.getHash());
if (success) {
LOGGER.info("File {} received", dataFile);
}
else {
LOGGER.info("Couldn't retrieve file {}", dataFile);
}
}
else {
// Invalid transaction (should have already failed validation)
LOGGER.info(String.format("Invalid arbitrary transaction: %.8s", signature));
}
signatures.remove(signature);
} catch (DataException e) { } catch (DataException e) {
LOGGER.error("Repository issue when fetching arbitrary transaction data", e); LOGGER.error("Repository issue when fetching arbitrary transaction data", e);
} }

View File

@ -457,8 +457,8 @@ public class Controller extends Thread {
blockMinter.start(); blockMinter.start();
// Arbitrary transaction data manager // Arbitrary transaction data manager
// LOGGER.info("Starting arbitrary-transaction data manager"); LOGGER.info("Starting arbitrary-transaction data manager");
// ArbitraryDataManager.getInstance().start(); ArbitraryDataManager.getInstance().start();
// Auto-update service? // Auto-update service?
if (Settings.getInstance().isAutoUpdateEnabled()) { if (Settings.getInstance().isAutoUpdateEnabled()) {
@ -920,8 +920,8 @@ public class Controller extends Thread {
} }
// Arbitrary transaction data manager // Arbitrary transaction data manager
// LOGGER.info("Shutting down arbitrary-transaction data manager"); LOGGER.info("Shutting down arbitrary-transaction data manager");
// ArbitraryDataManager.getInstance().shutdown(); ArbitraryDataManager.getInstance().shutdown();
if (blockMinter != null) { if (blockMinter != null) {
LOGGER.info("Shutting down block minter"); LOGGER.info("Shutting down block minter");
@ -2022,13 +2022,13 @@ public class Controller extends Thread {
} }
} }
public byte[] fetchArbitraryData(byte[] signature) throws InterruptedException { public boolean fetchArbitraryDataFile(byte[] hash) throws InterruptedException {
// Build request // Build request
Message getArbitraryDataMessage = new GetArbitraryDataMessage(signature); Message getDataFileMessage = new GetDataFileMessage(hash);
// Save our request into requests map // Save our request into requests map
String signature58 = Base58.encode(signature); String hash58 = Base58.encode(hash);
Triple<String, Peer, Long> requestEntry = new Triple<>(signature58, null, NTP.getTime()); Triple<String, Peer, Long> requestEntry = new Triple<>(hash58, null, NTP.getTime());
// Assign random ID to this message // Assign random ID to this message
int id; int id;
@ -2038,10 +2038,10 @@ public class Controller extends Thread {
// Put queue into map (keyed by message ID) so we can poll for a response // Put queue into map (keyed by message ID) so we can poll for a response
// If putIfAbsent() doesn't return null, then this ID is already taken // If putIfAbsent() doesn't return null, then this ID is already taken
} while (arbitraryDataRequests.put(id, requestEntry) != null); } while (arbitraryDataRequests.put(id, requestEntry) != null);
getArbitraryDataMessage.setId(id); getDataFileMessage.setId(id);
// Broadcast request // Broadcast request
Network.getInstance().broadcast(peer -> getArbitraryDataMessage); Network.getInstance().broadcast(peer -> getDataFileMessage);
// Poll to see if data has arrived // Poll to see if data has arrived
final long singleWait = 100; final long singleWait = 100;
@ -2051,20 +2051,14 @@ public class Controller extends Thread {
requestEntry = arbitraryDataRequests.get(id); requestEntry = arbitraryDataRequests.get(id);
if (requestEntry == null) if (requestEntry == null)
return null; return false;
if (requestEntry.getA() == null) if (requestEntry.getA() == null)
break; break;
totalWait += singleWait; totalWait += singleWait;
} }
return true;
try (final Repository repository = RepositoryManager.getRepository()) {
return repository.getArbitraryRepository().fetchData(signature);
} catch (DataException e) {
LOGGER.error(String.format("Repository issue while fetching arbitrary transaction data"), e);
return null;
}
} }
/** Returns a list of peers that are not misbehaving, and have a recent block. */ /** Returns a list of peers that are not misbehaving, and have a recent block. */