Added DHT-style lookup table to track file locations

This maps ARBITRARY transactions to peer addresses, but also includes additional metadata/stats to track the success rate and reachability.

Once a node receives files for a transaction, it broadcasts this info to its peers so they can update their records.

TLDR: this allows us to locate peers that are hosting a copy of the file we need.
This commit is contained in:
CalDescent 2021-10-29 13:35:17 +01:00
parent 54c8aac20d
commit d82da160f3
9 changed files with 422 additions and 6 deletions

View File

@ -1408,6 +1408,10 @@ public class Controller extends Thread {
ArbitraryDataManager.getInstance().onNetworkGetArbitraryDataFileListMessage(peer, message);
break;
case ARBITRARY_SIGNATURES:
ArbitraryDataManager.getInstance().onNetworkArbitrarySignaturesMessage(peer, message);
break;
default:
LOGGER.debug(() -> String.format("Unhandled %s message [ID %d] from peer %s", message.getType().name(), message.getId(), peer));
break;

View File

@ -9,6 +9,7 @@ import org.apache.logging.log4j.Logger;
import org.qortal.api.resource.TransactionsResource.ConfirmationStatus;
import org.qortal.arbitrary.ArbitraryDataBuildQueueItem;
import org.qortal.controller.Controller;
import org.qortal.data.network.ArbitraryPeerData;
import org.qortal.data.transaction.ArbitraryTransactionData;
import org.qortal.data.transaction.TransactionData;
import org.qortal.network.Network;
@ -544,15 +545,21 @@ public class ArbitraryDataManager extends Thread {
}
}
// If we have all the chunks for this transaction's name, we should invalidate the data cache
// so that it is rebuilt the next time we serve it
// Check if we have all the chunks for this transaction
if (arbitraryDataFile.exists() || arbitraryDataFile.allChunksExist(arbitraryTransactionData.getChunkHashes())) {
// We have all the chunks for this transaction, so we should invalidate the transaction's name's
// data cache so that it is rebuilt the next time we serve it
if (arbitraryTransactionData.getName() != null) {
String resourceId = arbitraryTransactionData.getName().toLowerCase();
if (this.arbitraryDataCachedResources.containsKey(resourceId)) {
this.arbitraryDataCachedResources.remove(resourceId);
}
}
// We also need to broadcast to the network that we are now hosting files for this transaction
Message newArbitrarySignatureMessage = new ArbitrarySignaturesMessage(Arrays.asList(signature));
Network.getInstance().broadcast(broadcastPeer -> newArbitrarySignatureMessage);
}
} catch (DataException | InterruptedException e) {
@ -657,4 +664,28 @@ public class ArbitraryDataManager extends Thread {
LOGGER.info("Sent list of hashes (count: {})", hashes.size());
}
public void onNetworkArbitrarySignaturesMessage(Peer peer, Message message) {
LOGGER.info("Received arbitrary signature list from peer {}", peer);
ArbitrarySignaturesMessage arbitrarySignaturesMessage = (ArbitrarySignaturesMessage) message;
List<byte[]> signatures = arbitrarySignaturesMessage.getSignatures();
try (final Repository repository = RepositoryManager.getRepository()) {
for (byte[] signature : signatures) {
// Check if a record already exists for this hash/peer combination
ArbitraryPeerData existingEntry = repository.getArbitraryRepository()
.getArbitraryPeerDataForSignatureAndPeer(signature, peer.getPeerData().getAddress().toString());
if (existingEntry == null) {
// We haven't got a record of this mapping yet, so add it
LOGGER.info("Adding arbitrary peer: {} for signature {}", peer.getPeerData().getAddress().toString(), Base58.encode(signature));
ArbitraryPeerData arbitraryPeerData = new ArbitraryPeerData(signature, peer);
repository.getArbitraryRepository().save(arbitraryPeerData);
}
}
} catch (DataException e) {
LOGGER.error(String.format("Repository issue while processing arbitrary transaction signature list from peer %s", peer), e);
}
}
}

View File

@ -0,0 +1,71 @@
package org.qortal.data.network;
import org.qortal.crypto.Crypto;
import org.qortal.network.Peer;
import org.qortal.utils.NTP;
public class ArbitraryPeerData {
private final byte[] hash;
private final String peerAddress;
private Integer successes;
private Integer failures;
private Long lastAttempted;
private Long lastRetrieved;
public ArbitraryPeerData(byte[] hash, String peerAddress, Integer successes,
Integer failures, Long lastAttempted, Long lastRetrieved) {
this.hash = hash;
this.peerAddress = peerAddress;
this.successes = successes;
this.failures = failures;
this.lastAttempted = lastAttempted;
this.lastRetrieved = lastRetrieved;
}
public ArbitraryPeerData(byte[] signature, Peer peer) {
this(Crypto.digest(signature), peer.getPeerData().getAddress().toString(),
0, 0, 0L, 0L);
}
public void incrementSuccesses() {
this.successes++;
}
public void incrementFailures() {
this.failures++;
}
public void markAsAttempted() {
this.lastAttempted = NTP.getTime();
}
public void markAsRetrieved() {
this.lastRetrieved = NTP.getTime();
}
public byte[] getHash() {
return this.hash;
}
public String getPeerAddress() {
return this.peerAddress;
}
public Integer getSuccesses() {
return this.successes;
}
public Integer getFailures() {
return this.failures;
}
public Long getLastAttempted() {
return this.lastAttempted;
}
public Long getLastRetrieved() {
return this.lastRetrieved;
}
}

View File

@ -0,0 +1,65 @@
package org.qortal.network.message;
import com.google.common.primitives.Ints;
import org.qortal.transform.Transformer;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
public class ArbitrarySignaturesMessage extends Message {
private static final int SIGNATURE_LENGTH = Transformer.SIGNATURE_LENGTH;
private List<byte[]> signatures;
public ArbitrarySignaturesMessage(List<byte[]> signatures) {
this(-1, signatures);
}
private ArbitrarySignaturesMessage(int id, List<byte[]> signatures) {
super(id, MessageType.ARBITRARY_SIGNATURES);
this.signatures = signatures;
}
public List<byte[]> getSignatures() {
return this.signatures;
}
public static Message fromByteBuffer(int id, ByteBuffer bytes) throws UnsupportedEncodingException {
int count = bytes.getInt();
if (bytes.remaining() != count * SIGNATURE_LENGTH)
return null;
List<byte[]> signatures = new ArrayList<>();
for (int i = 0; i < count; ++i) {
byte[] signature = new byte[SIGNATURE_LENGTH];
bytes.get(signature);
signatures.add(signature);
}
return new ArbitrarySignaturesMessage(id, signatures);
}
@Override
protected byte[] toData() {
try {
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
bytes.write(Ints.toByteArray(this.signatures.size()));
for (byte[] signature : this.signatures)
bytes.write(signature);
return bytes.toByteArray();
} catch (IOException e) {
return null;
}
}
}

View File

@ -89,7 +89,9 @@ public abstract class Message {
GET_ARBITRARY_DATA_FILE(111),
ARBITRARY_DATA_FILE_LIST(120),
GET_ARBITRARY_DATA_FILE_LIST(121);
GET_ARBITRARY_DATA_FILE_LIST(121),
ARBITRARY_SIGNATURES(130);
public final int value;
public final Method fromByteBufferMethod;

View File

@ -1,5 +1,6 @@
package org.qortal.repository;
import org.qortal.data.network.ArbitraryPeerData;
import org.qortal.data.transaction.ArbitraryTransactionData;
import org.qortal.data.transaction.ArbitraryTransactionData.*;
@ -19,4 +20,13 @@ public interface ArbitraryRepository {
public ArbitraryTransactionData getLatestTransaction(String name, Service service, Method method) throws DataException;
public List<ArbitraryPeerData> getArbitraryPeerDataForSignature(byte[] signature) throws DataException;
public ArbitraryPeerData getArbitraryPeerDataForSignatureAndPeer(byte[] signature, String peerAddress) throws DataException;
public void save(ArbitraryPeerData arbitraryPeerData) throws DataException;
public void delete(ArbitraryPeerData arbitraryPeerData) throws DataException;
}

View File

@ -1,9 +1,8 @@
package org.qortal.repository.hsqldb;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.qortal.crypto.Crypto;
import org.qortal.data.PaymentData;
import org.qortal.data.network.ArbitraryPeerData;
import org.qortal.data.transaction.ArbitraryTransactionData;
import org.qortal.data.transaction.ArbitraryTransactionData.*;
import org.qortal.data.transaction.BaseTransactionData;
@ -16,7 +15,6 @@ import org.qortal.transaction.Transaction.ApprovalStatus;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
public class HSQLDBArbitraryRepository implements ArbitraryRepository {
@ -290,4 +288,106 @@ public class HSQLDBArbitraryRepository implements ArbitraryRepository {
}
}
// Peer file tracking
/**
* Fetch a list of peers that have reported to be holding chunks related to
* supplied transaction signature.
* @param signature
* @return a list of ArbitraryPeerData objects, or null if none found
* @throws DataException
*/
@Override
public List<ArbitraryPeerData> getArbitraryPeerDataForSignature(byte[] signature) throws DataException {
// Hash the signature so it fits within 32 bytes
byte[] hashedSignature = Crypto.digest(signature);
String sql = "SELECT hash, peer_address, successes, failures, last_attempted, last_retrieved " +
"FROM ArbitraryPeers " +
"WHERE hash = ?";
List<ArbitraryPeerData> arbitraryPeerData = new ArrayList<>();
try (ResultSet resultSet = this.repository.checkedExecute(sql, hashedSignature)) {
if (resultSet == null)
return null;
do {
byte[] hash = resultSet.getBytes(1);
String peerAddr = resultSet.getString(2);
Integer successes = resultSet.getInt(3);
Integer failures = resultSet.getInt(4);
Long lastAttempted = resultSet.getLong(5);
Long lastRetrieved = resultSet.getLong(6);
ArbitraryPeerData peerData = new ArbitraryPeerData(hash, peerAddr, successes, failures,
lastAttempted, lastRetrieved);
arbitraryPeerData.add(peerData);
} while (resultSet.next());
return arbitraryPeerData;
} catch (SQLException e) {
throw new DataException("Unable to fetch arbitrary peer data from repository", e);
}
}
public ArbitraryPeerData getArbitraryPeerDataForSignatureAndPeer(byte[] signature, String peerAddress) throws DataException {
// Hash the signature so it fits within 32 bytes
byte[] hashedSignature = Crypto.digest(signature);
String sql = "SELECT hash, peer_address, successes, failures, last_attempted, last_retrieved " +
"FROM ArbitraryPeers " +
"WHERE hash = ? AND peer_address = ?";
try (ResultSet resultSet = this.repository.checkedExecute(sql, hashedSignature, peerAddress)) {
if (resultSet == null)
return null;
byte[] hash = resultSet.getBytes(1);
String peerAddr = resultSet.getString(2);
Integer successes = resultSet.getInt(3);
Integer failures = resultSet.getInt(4);
Long lastAttempted = resultSet.getLong(5);
Long lastRetrieved = resultSet.getLong(6);
ArbitraryPeerData arbitraryPeerData = new ArbitraryPeerData(hash, peerAddr, successes, failures,
lastAttempted, lastRetrieved);
return arbitraryPeerData;
} catch (SQLException e) {
throw new DataException("Unable to fetch arbitrary peer data from repository", e);
}
}
@Override
public void save(ArbitraryPeerData arbitraryPeerData) throws DataException {
HSQLDBSaver saveHelper = new HSQLDBSaver("ArbitraryPeers");
saveHelper.bind("hash", arbitraryPeerData.getHash())
.bind("peer_address", arbitraryPeerData.getPeerAddress())
.bind("successes", arbitraryPeerData.getSuccesses())
.bind("failures", arbitraryPeerData.getFailures())
.bind("last_attempted", arbitraryPeerData.getLastAttempted())
.bind("last_retrieved", arbitraryPeerData.getLastRetrieved());
try {
saveHelper.execute(this.repository);
} catch (SQLException e) {
throw new DataException("Unable to save ArbitraryPeerData into repository", e);
}
}
@Override
public void delete(ArbitraryPeerData arbitraryPeerData) throws DataException {
try {
// Remove peer/hash combination
this.repository.delete("ArbitraryPeers", "hash = ? AND peer_address = ?",
arbitraryPeerData.getHash(), arbitraryPeerData.getPeerAddress());
} catch (SQLException e) {
throw new DataException("Unable to delete arbitrary peer data from repository", e);
}
}
}

View File

@ -922,6 +922,24 @@ public class HSQLDBDatabaseUpdates {
stmt.execute("ALTER TABLE ArbitraryTransactions ADD compression INTEGER NOT NULL DEFAULT 0");
break;
case 39:
// Add DHT-style lookup table to track file locations
// This maps ARBITRARY transactions to peer addresses, but also includes additional metadata to
// track the local success rate and reachability. It is keyed by a "hash" column, to keep it
// generic, as this way we aren't limited to transaction signatures only.
// Multiple rows with the same hash are allowed, to allow for metadata. Longer term it could be
// reshaped to one row per hash if this is too verbose.
// Transaction signatures are hashed to 32 bytes using SHA256. In doing this we lose the ability
// to join against transaction tables, but on balance the space savings seem more important.
stmt.execute("CREATE TABLE ArbitraryPeers (hash VARBINARY(32) NOT NULL, "
+ "peer_address VARCHAR(255), successes INTEGER NOT NULL, failures INTEGER NOT NULL, "
+ "last_attempted EpochMillis NOT NULL, last_retrieved EpochMillis NOT NULL, "
+ "PRIMARY KEY (hash, peer_address))");
// For finding peers by data hash
stmt.execute("CREATE INDEX ArbitraryPeersHashIndex ON ArbitraryPeers (hash)");
break;
default:
// nothing to do
return false;

View File

@ -0,0 +1,115 @@
package org.qortal.test.arbitrary;
import org.junit.Before;
import org.junit.Test;
import org.qortal.crypto.Crypto;
import org.qortal.data.network.ArbitraryPeerData;
import org.qortal.data.network.PeerData;
import org.qortal.network.Peer;
import org.qortal.network.PeerAddress;
import org.qortal.repository.DataException;
import org.qortal.repository.Repository;
import org.qortal.repository.RepositoryManager;
import org.qortal.test.common.Common;
import org.qortal.utils.NTP;
import java.util.Random;
import static org.junit.Assert.*;
public class ArbitraryPeerTests extends Common {
@Before
public void beforeTest() throws DataException {
Common.useDefaultSettings();
}
@Test
public void testSaveArbitraryPeerData() throws DataException {
try (final Repository repository = RepositoryManager.getRepository()) {
String peerAddress = "127.0.0.1:12392";
// Create random bytes to represent a signature
byte[] signature = new byte[64];
new Random().nextBytes(signature);
// Make sure we don't have an entry for this hash/peer combination
assertNull(repository.getArbitraryRepository().getArbitraryPeerDataForSignatureAndPeer(signature, peerAddress));
// Now add this mapping to the db
Peer peer = new Peer(new PeerData(PeerAddress.fromString(peerAddress)));
ArbitraryPeerData arbitraryPeerData = new ArbitraryPeerData(signature, peer);
repository.getArbitraryRepository().save(arbitraryPeerData);
// We should now have an entry for this hash/peer combination
ArbitraryPeerData retrievedArbitraryPeerData = repository.getArbitraryRepository()
.getArbitraryPeerDataForSignatureAndPeer(signature, peerAddress);
assertNotNull(arbitraryPeerData);
// .. and its data should match what was saved
assertArrayEquals(Crypto.digest(signature), retrievedArbitraryPeerData.getHash());
assertEquals(peerAddress, retrievedArbitraryPeerData.getPeerAddress());
}
}
@Test
public void testUpdateArbitraryPeerData() throws DataException, InterruptedException {
try (final Repository repository = RepositoryManager.getRepository()) {
String peerAddress = "127.0.0.1:12392";
// Create random bytes to represent a signature
byte[] signature = new byte[64];
new Random().nextBytes(signature);
// Make sure we don't have an entry for this hash/peer combination
assertNull(repository.getArbitraryRepository().getArbitraryPeerDataForSignatureAndPeer(signature, peerAddress));
// Now add this mapping to the db
Peer peer = new Peer(new PeerData(PeerAddress.fromString(peerAddress)));
ArbitraryPeerData arbitraryPeerData = new ArbitraryPeerData(signature, peer);
repository.getArbitraryRepository().save(arbitraryPeerData);
// We should now have an entry for this hash/peer combination
ArbitraryPeerData retrievedArbitraryPeerData = repository.getArbitraryRepository()
.getArbitraryPeerDataForSignatureAndPeer(signature, peerAddress);
assertNotNull(arbitraryPeerData);
// .. and its data should match what was saved
assertArrayEquals(Crypto.digest(signature), retrievedArbitraryPeerData.getHash());
assertEquals(peerAddress, retrievedArbitraryPeerData.getPeerAddress());
// All stats should be zero
assertEquals(Integer.valueOf(0), retrievedArbitraryPeerData.getSuccesses());
assertEquals(Integer.valueOf(0), retrievedArbitraryPeerData.getFailures());
assertEquals(Long.valueOf(0), retrievedArbitraryPeerData.getLastAttempted());
assertEquals(Long.valueOf(0), retrievedArbitraryPeerData.getLastRetrieved());
// Now modify some values and re-save
retrievedArbitraryPeerData.incrementSuccesses(); retrievedArbitraryPeerData.incrementSuccesses(); // Twice
retrievedArbitraryPeerData.incrementFailures(); // Once
retrievedArbitraryPeerData.markAsAttempted();
Thread.sleep(100);
retrievedArbitraryPeerData.markAsRetrieved();
repository.getArbitraryRepository().save(retrievedArbitraryPeerData);
// Retrieve data once again
ArbitraryPeerData updatedArbitraryPeerData = repository.getArbitraryRepository()
.getArbitraryPeerDataForSignatureAndPeer(signature, peerAddress);
assertNotNull(updatedArbitraryPeerData);
// Check the values
assertArrayEquals(Crypto.digest(signature), updatedArbitraryPeerData.getHash());
assertEquals(peerAddress, updatedArbitraryPeerData.getPeerAddress());
assertEquals(Integer.valueOf(2), updatedArbitraryPeerData.getSuccesses());
assertEquals(Integer.valueOf(1), updatedArbitraryPeerData.getFailures());
assertTrue(updatedArbitraryPeerData.getLastRetrieved().longValue() > 0L);
assertTrue(updatedArbitraryPeerData.getLastAttempted().longValue() > 0L);
assertTrue(updatedArbitraryPeerData.getLastRetrieved() > updatedArbitraryPeerData.getLastAttempted());
assertTrue(NTP.getTime() - updatedArbitraryPeerData.getLastRetrieved() < 1000);
assertTrue(NTP.getTime() - updatedArbitraryPeerData.getLastAttempted() < 1000);
}
}
}