From d82da160f3e62223723b490085fba922bc8baa9f Mon Sep 17 00:00:00 2001 From: CalDescent Date: Fri, 29 Oct 2021 13:35:17 +0100 Subject: [PATCH] Added DHT-style lookup table to track file locations This maps ARBITRARY transactions to peer addresses, but also includes additional metadata/stats to track the success rate and reachability. Once a node receives files for a transaction, it broadcasts this info to its peers so they can update their records. TLDR: this allows us to locate peers that are hosting a copy of the file we need. --- .../org/qortal/controller/Controller.java | 4 + .../arbitrary/ArbitraryDataManager.java | 35 +++++- .../data/network/ArbitraryPeerData.java | 71 +++++++++++ .../message/ArbitrarySignaturesMessage.java | 65 ++++++++++ .../org/qortal/network/message/Message.java | 4 +- .../repository/ArbitraryRepository.java | 10 ++ .../hsqldb/HSQLDBArbitraryRepository.java | 106 +++++++++++++++- .../hsqldb/HSQLDBDatabaseUpdates.java | 18 +++ .../test/arbitrary/ArbitraryPeerTests.java | 115 ++++++++++++++++++ 9 files changed, 422 insertions(+), 6 deletions(-) create mode 100644 src/main/java/org/qortal/data/network/ArbitraryPeerData.java create mode 100644 src/main/java/org/qortal/network/message/ArbitrarySignaturesMessage.java create mode 100644 src/test/java/org/qortal/test/arbitrary/ArbitraryPeerTests.java diff --git a/src/main/java/org/qortal/controller/Controller.java b/src/main/java/org/qortal/controller/Controller.java index 5b04aae5..0805b4a3 100644 --- a/src/main/java/org/qortal/controller/Controller.java +++ b/src/main/java/org/qortal/controller/Controller.java @@ -1408,6 +1408,10 @@ public class Controller extends Thread { ArbitraryDataManager.getInstance().onNetworkGetArbitraryDataFileListMessage(peer, message); break; + case ARBITRARY_SIGNATURES: + ArbitraryDataManager.getInstance().onNetworkArbitrarySignaturesMessage(peer, message); + break; + default: LOGGER.debug(() -> String.format("Unhandled %s message [ID %d] from peer %s", message.getType().name(), message.getId(), peer)); break; diff --git a/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataManager.java b/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataManager.java index d329ce6d..85e2b2d4 100644 --- a/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataManager.java +++ b/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataManager.java @@ -9,6 +9,7 @@ import org.apache.logging.log4j.Logger; import org.qortal.api.resource.TransactionsResource.ConfirmationStatus; import org.qortal.arbitrary.ArbitraryDataBuildQueueItem; import org.qortal.controller.Controller; +import org.qortal.data.network.ArbitraryPeerData; import org.qortal.data.transaction.ArbitraryTransactionData; import org.qortal.data.transaction.TransactionData; import org.qortal.network.Network; @@ -544,15 +545,21 @@ public class ArbitraryDataManager extends Thread { } } - // If we have all the chunks for this transaction's name, we should invalidate the data cache - // so that it is rebuilt the next time we serve it + // Check if we have all the chunks for this transaction if (arbitraryDataFile.exists() || arbitraryDataFile.allChunksExist(arbitraryTransactionData.getChunkHashes())) { + + // We have all the chunks for this transaction, so we should invalidate the transaction's name's + // data cache so that it is rebuilt the next time we serve it if (arbitraryTransactionData.getName() != null) { String resourceId = arbitraryTransactionData.getName().toLowerCase(); if (this.arbitraryDataCachedResources.containsKey(resourceId)) { this.arbitraryDataCachedResources.remove(resourceId); } } + + // We also need to broadcast to the network that we are now hosting files for this transaction + Message newArbitrarySignatureMessage = new ArbitrarySignaturesMessage(Arrays.asList(signature)); + Network.getInstance().broadcast(broadcastPeer -> newArbitrarySignatureMessage); } } catch (DataException | InterruptedException e) { @@ -657,4 +664,28 @@ public class ArbitraryDataManager extends Thread { LOGGER.info("Sent list of hashes (count: {})", hashes.size()); } + public void onNetworkArbitrarySignaturesMessage(Peer peer, Message message) { + LOGGER.info("Received arbitrary signature list from peer {}", peer); + ArbitrarySignaturesMessage arbitrarySignaturesMessage = (ArbitrarySignaturesMessage) message; + List signatures = arbitrarySignaturesMessage.getSignatures(); + + try (final Repository repository = RepositoryManager.getRepository()) { + for (byte[] signature : signatures) { + + // Check if a record already exists for this hash/peer combination + ArbitraryPeerData existingEntry = repository.getArbitraryRepository() + .getArbitraryPeerDataForSignatureAndPeer(signature, peer.getPeerData().getAddress().toString()); + + if (existingEntry == null) { + // We haven't got a record of this mapping yet, so add it + LOGGER.info("Adding arbitrary peer: {} for signature {}", peer.getPeerData().getAddress().toString(), Base58.encode(signature)); + ArbitraryPeerData arbitraryPeerData = new ArbitraryPeerData(signature, peer); + repository.getArbitraryRepository().save(arbitraryPeerData); + } + } + } catch (DataException e) { + LOGGER.error(String.format("Repository issue while processing arbitrary transaction signature list from peer %s", peer), e); + } + } + } diff --git a/src/main/java/org/qortal/data/network/ArbitraryPeerData.java b/src/main/java/org/qortal/data/network/ArbitraryPeerData.java new file mode 100644 index 00000000..3bd70939 --- /dev/null +++ b/src/main/java/org/qortal/data/network/ArbitraryPeerData.java @@ -0,0 +1,71 @@ +package org.qortal.data.network; + +import org.qortal.crypto.Crypto; +import org.qortal.network.Peer; +import org.qortal.utils.NTP; + +public class ArbitraryPeerData { + + private final byte[] hash; + private final String peerAddress; + private Integer successes; + private Integer failures; + private Long lastAttempted; + private Long lastRetrieved; + + public ArbitraryPeerData(byte[] hash, String peerAddress, Integer successes, + Integer failures, Long lastAttempted, Long lastRetrieved) { + this.hash = hash; + this.peerAddress = peerAddress; + this.successes = successes; + this.failures = failures; + this.lastAttempted = lastAttempted; + this.lastRetrieved = lastRetrieved; + } + + public ArbitraryPeerData(byte[] signature, Peer peer) { + this(Crypto.digest(signature), peer.getPeerData().getAddress().toString(), + 0, 0, 0L, 0L); + } + + public void incrementSuccesses() { + this.successes++; + } + + public void incrementFailures() { + this.failures++; + } + + public void markAsAttempted() { + this.lastAttempted = NTP.getTime(); + } + + public void markAsRetrieved() { + this.lastRetrieved = NTP.getTime(); + } + + public byte[] getHash() { + return this.hash; + } + + public String getPeerAddress() { + return this.peerAddress; + } + + public Integer getSuccesses() { + return this.successes; + } + + public Integer getFailures() { + return this.failures; + } + + public Long getLastAttempted() { + return this.lastAttempted; + } + + public Long getLastRetrieved() { + return this.lastRetrieved; + } + +} diff --git a/src/main/java/org/qortal/network/message/ArbitrarySignaturesMessage.java b/src/main/java/org/qortal/network/message/ArbitrarySignaturesMessage.java new file mode 100644 index 00000000..70a26775 --- /dev/null +++ b/src/main/java/org/qortal/network/message/ArbitrarySignaturesMessage.java @@ -0,0 +1,65 @@ +package org.qortal.network.message; + +import com.google.common.primitives.Ints; +import org.qortal.transform.Transformer; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + +public class ArbitrarySignaturesMessage extends Message { + + private static final int SIGNATURE_LENGTH = Transformer.SIGNATURE_LENGTH; + + private List signatures; + + public ArbitrarySignaturesMessage(List signatures) { + this(-1, signatures); + } + + private ArbitrarySignaturesMessage(int id, List signatures) { + super(id, MessageType.ARBITRARY_SIGNATURES); + + this.signatures = signatures; + } + + public List getSignatures() { + return this.signatures; + } + + public static Message fromByteBuffer(int id, ByteBuffer bytes) throws UnsupportedEncodingException { + int count = bytes.getInt(); + + if (bytes.remaining() != count * SIGNATURE_LENGTH) + return null; + + List signatures = new ArrayList<>(); + for (int i = 0; i < count; ++i) { + byte[] signature = new byte[SIGNATURE_LENGTH]; + bytes.get(signature); + signatures.add(signature); + } + + return new ArbitrarySignaturesMessage(id, signatures); + } + + @Override + protected byte[] toData() { + try { + ByteArrayOutputStream bytes = new ByteArrayOutputStream(); + + bytes.write(Ints.toByteArray(this.signatures.size())); + + for (byte[] signature : this.signatures) + bytes.write(signature); + + return bytes.toByteArray(); + } catch (IOException e) { + return null; + } + } + +} diff --git a/src/main/java/org/qortal/network/message/Message.java b/src/main/java/org/qortal/network/message/Message.java index 662eaeaf..c7657493 100644 --- a/src/main/java/org/qortal/network/message/Message.java +++ b/src/main/java/org/qortal/network/message/Message.java @@ -89,7 +89,9 @@ public abstract class Message { GET_ARBITRARY_DATA_FILE(111), ARBITRARY_DATA_FILE_LIST(120), - GET_ARBITRARY_DATA_FILE_LIST(121); + GET_ARBITRARY_DATA_FILE_LIST(121), + + ARBITRARY_SIGNATURES(130); public final int value; public final Method fromByteBufferMethod; diff --git a/src/main/java/org/qortal/repository/ArbitraryRepository.java b/src/main/java/org/qortal/repository/ArbitraryRepository.java index 5e3e657a..549f2c5d 100644 --- a/src/main/java/org/qortal/repository/ArbitraryRepository.java +++ b/src/main/java/org/qortal/repository/ArbitraryRepository.java @@ -1,5 +1,6 @@ package org.qortal.repository; +import org.qortal.data.network.ArbitraryPeerData; import org.qortal.data.transaction.ArbitraryTransactionData; import org.qortal.data.transaction.ArbitraryTransactionData.*; @@ -19,4 +20,13 @@ public interface ArbitraryRepository { public ArbitraryTransactionData getLatestTransaction(String name, Service service, Method method) throws DataException; + + public List getArbitraryPeerDataForSignature(byte[] signature) throws DataException; + + public ArbitraryPeerData getArbitraryPeerDataForSignatureAndPeer(byte[] signature, String peerAddress) throws DataException; + + public void save(ArbitraryPeerData arbitraryPeerData) throws DataException; + + public void delete(ArbitraryPeerData arbitraryPeerData) throws DataException; + } diff --git a/src/main/java/org/qortal/repository/hsqldb/HSQLDBArbitraryRepository.java b/src/main/java/org/qortal/repository/hsqldb/HSQLDBArbitraryRepository.java index 0e94314c..e23373ee 100644 --- a/src/main/java/org/qortal/repository/hsqldb/HSQLDBArbitraryRepository.java +++ b/src/main/java/org/qortal/repository/hsqldb/HSQLDBArbitraryRepository.java @@ -1,9 +1,8 @@ package org.qortal.repository.hsqldb; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; import org.qortal.crypto.Crypto; import org.qortal.data.PaymentData; +import org.qortal.data.network.ArbitraryPeerData; import org.qortal.data.transaction.ArbitraryTransactionData; import org.qortal.data.transaction.ArbitraryTransactionData.*; import org.qortal.data.transaction.BaseTransactionData; @@ -16,7 +15,6 @@ import org.qortal.transaction.Transaction.ApprovalStatus; import java.sql.ResultSet; import java.sql.SQLException; import java.util.ArrayList; -import java.util.Arrays; import java.util.List; public class HSQLDBArbitraryRepository implements ArbitraryRepository { @@ -290,4 +288,106 @@ public class HSQLDBArbitraryRepository implements ArbitraryRepository { } } + + // Peer file tracking + + /** + * Fetch a list of peers that have reported to be holding chunks related to + * supplied transaction signature. + * @param signature + * @return a list of ArbitraryPeerData objects, or null if none found + * @throws DataException + */ + @Override + public List getArbitraryPeerDataForSignature(byte[] signature) throws DataException { + // Hash the signature so it fits within 32 bytes + byte[] hashedSignature = Crypto.digest(signature); + + String sql = "SELECT hash, peer_address, successes, failures, last_attempted, last_retrieved " + + "FROM ArbitraryPeers " + + "WHERE hash = ?"; + + List arbitraryPeerData = new ArrayList<>(); + + try (ResultSet resultSet = this.repository.checkedExecute(sql, hashedSignature)) { + if (resultSet == null) + return null; + + do { + byte[] hash = resultSet.getBytes(1); + String peerAddr = resultSet.getString(2); + Integer successes = resultSet.getInt(3); + Integer failures = resultSet.getInt(4); + Long lastAttempted = resultSet.getLong(5); + Long lastRetrieved = resultSet.getLong(6); + + ArbitraryPeerData peerData = new ArbitraryPeerData(hash, peerAddr, successes, failures, + lastAttempted, lastRetrieved); + + arbitraryPeerData.add(peerData); + } while (resultSet.next()); + + return arbitraryPeerData; + } catch (SQLException e) { + throw new DataException("Unable to fetch arbitrary peer data from repository", e); + } + } + + public ArbitraryPeerData getArbitraryPeerDataForSignatureAndPeer(byte[] signature, String peerAddress) throws DataException { + // Hash the signature so it fits within 32 bytes + byte[] hashedSignature = Crypto.digest(signature); + + String sql = "SELECT hash, peer_address, successes, failures, last_attempted, last_retrieved " + + "FROM ArbitraryPeers " + + "WHERE hash = ? AND peer_address = ?"; + + try (ResultSet resultSet = this.repository.checkedExecute(sql, hashedSignature, peerAddress)) { + if (resultSet == null) + return null; + + byte[] hash = resultSet.getBytes(1); + String peerAddr = resultSet.getString(2); + Integer successes = resultSet.getInt(3); + Integer failures = resultSet.getInt(4); + Long lastAttempted = resultSet.getLong(5); + Long lastRetrieved = resultSet.getLong(6); + + ArbitraryPeerData arbitraryPeerData = new ArbitraryPeerData(hash, peerAddr, successes, failures, + lastAttempted, lastRetrieved); + + return arbitraryPeerData; + } catch (SQLException e) { + throw new DataException("Unable to fetch arbitrary peer data from repository", e); + } + } + + @Override + public void save(ArbitraryPeerData arbitraryPeerData) throws DataException { + HSQLDBSaver saveHelper = new HSQLDBSaver("ArbitraryPeers"); + + saveHelper.bind("hash", arbitraryPeerData.getHash()) + .bind("peer_address", arbitraryPeerData.getPeerAddress()) + .bind("successes", arbitraryPeerData.getSuccesses()) + .bind("failures", arbitraryPeerData.getFailures()) + .bind("last_attempted", arbitraryPeerData.getLastAttempted()) + .bind("last_retrieved", arbitraryPeerData.getLastRetrieved()); + + try { + saveHelper.execute(this.repository); + } catch (SQLException e) { + throw new DataException("Unable to save ArbitraryPeerData into repository", e); + } + } + + @Override + public void delete(ArbitraryPeerData arbitraryPeerData) throws DataException { + try { + // Remove peer/hash combination + this.repository.delete("ArbitraryPeers", "hash = ? AND peer_address = ?", + arbitraryPeerData.getHash(), arbitraryPeerData.getPeerAddress()); + + } catch (SQLException e) { + throw new DataException("Unable to delete arbitrary peer data from repository", e); + } + } } diff --git a/src/main/java/org/qortal/repository/hsqldb/HSQLDBDatabaseUpdates.java b/src/main/java/org/qortal/repository/hsqldb/HSQLDBDatabaseUpdates.java index 29a1fbc1..1cc3e7f9 100644 --- a/src/main/java/org/qortal/repository/hsqldb/HSQLDBDatabaseUpdates.java +++ b/src/main/java/org/qortal/repository/hsqldb/HSQLDBDatabaseUpdates.java @@ -922,6 +922,24 @@ public class HSQLDBDatabaseUpdates { stmt.execute("ALTER TABLE ArbitraryTransactions ADD compression INTEGER NOT NULL DEFAULT 0"); break; + case 39: + // Add DHT-style lookup table to track file locations + // This maps ARBITRARY transactions to peer addresses, but also includes additional metadata to + // track the local success rate and reachability. It is keyed by a "hash" column, to keep it + // generic, as this way we aren't limited to transaction signatures only. + // Multiple rows with the same hash are allowed, to allow for metadata. Longer term it could be + // reshaped to one row per hash if this is too verbose. + // Transaction signatures are hashed to 32 bytes using SHA256. In doing this we lose the ability + // to join against transaction tables, but on balance the space savings seem more important. + stmt.execute("CREATE TABLE ArbitraryPeers (hash VARBINARY(32) NOT NULL, " + + "peer_address VARCHAR(255), successes INTEGER NOT NULL, failures INTEGER NOT NULL, " + + "last_attempted EpochMillis NOT NULL, last_retrieved EpochMillis NOT NULL, " + + "PRIMARY KEY (hash, peer_address))"); + + // For finding peers by data hash + stmt.execute("CREATE INDEX ArbitraryPeersHashIndex ON ArbitraryPeers (hash)"); + break; + default: // nothing to do return false; diff --git a/src/test/java/org/qortal/test/arbitrary/ArbitraryPeerTests.java b/src/test/java/org/qortal/test/arbitrary/ArbitraryPeerTests.java new file mode 100644 index 00000000..16eb17bb --- /dev/null +++ b/src/test/java/org/qortal/test/arbitrary/ArbitraryPeerTests.java @@ -0,0 +1,115 @@ +package org.qortal.test.arbitrary; + +import org.junit.Before; +import org.junit.Test; +import org.qortal.crypto.Crypto; +import org.qortal.data.network.ArbitraryPeerData; +import org.qortal.data.network.PeerData; +import org.qortal.network.Peer; +import org.qortal.network.PeerAddress; +import org.qortal.repository.DataException; +import org.qortal.repository.Repository; +import org.qortal.repository.RepositoryManager; +import org.qortal.test.common.Common; +import org.qortal.utils.NTP; + +import java.util.Random; + +import static org.junit.Assert.*; + +public class ArbitraryPeerTests extends Common { + + @Before + public void beforeTest() throws DataException { + Common.useDefaultSettings(); + } + + @Test + public void testSaveArbitraryPeerData() throws DataException { + try (final Repository repository = RepositoryManager.getRepository()) { + + String peerAddress = "127.0.0.1:12392"; + + // Create random bytes to represent a signature + byte[] signature = new byte[64]; + new Random().nextBytes(signature); + + // Make sure we don't have an entry for this hash/peer combination + assertNull(repository.getArbitraryRepository().getArbitraryPeerDataForSignatureAndPeer(signature, peerAddress)); + + // Now add this mapping to the db + Peer peer = new Peer(new PeerData(PeerAddress.fromString(peerAddress))); + ArbitraryPeerData arbitraryPeerData = new ArbitraryPeerData(signature, peer); + repository.getArbitraryRepository().save(arbitraryPeerData); + + // We should now have an entry for this hash/peer combination + ArbitraryPeerData retrievedArbitraryPeerData = repository.getArbitraryRepository() + .getArbitraryPeerDataForSignatureAndPeer(signature, peerAddress); + assertNotNull(arbitraryPeerData); + + // .. and its data should match what was saved + assertArrayEquals(Crypto.digest(signature), retrievedArbitraryPeerData.getHash()); + assertEquals(peerAddress, retrievedArbitraryPeerData.getPeerAddress()); + + } + } + + @Test + public void testUpdateArbitraryPeerData() throws DataException, InterruptedException { + try (final Repository repository = RepositoryManager.getRepository()) { + + String peerAddress = "127.0.0.1:12392"; + + // Create random bytes to represent a signature + byte[] signature = new byte[64]; + new Random().nextBytes(signature); + + // Make sure we don't have an entry for this hash/peer combination + assertNull(repository.getArbitraryRepository().getArbitraryPeerDataForSignatureAndPeer(signature, peerAddress)); + + // Now add this mapping to the db + Peer peer = new Peer(new PeerData(PeerAddress.fromString(peerAddress))); + ArbitraryPeerData arbitraryPeerData = new ArbitraryPeerData(signature, peer); + repository.getArbitraryRepository().save(arbitraryPeerData); + + // We should now have an entry for this hash/peer combination + ArbitraryPeerData retrievedArbitraryPeerData = repository.getArbitraryRepository() + .getArbitraryPeerDataForSignatureAndPeer(signature, peerAddress); + assertNotNull(arbitraryPeerData); + + // .. and its data should match what was saved + assertArrayEquals(Crypto.digest(signature), retrievedArbitraryPeerData.getHash()); + assertEquals(peerAddress, retrievedArbitraryPeerData.getPeerAddress()); + + // All stats should be zero + assertEquals(Integer.valueOf(0), retrievedArbitraryPeerData.getSuccesses()); + assertEquals(Integer.valueOf(0), retrievedArbitraryPeerData.getFailures()); + assertEquals(Long.valueOf(0), retrievedArbitraryPeerData.getLastAttempted()); + assertEquals(Long.valueOf(0), retrievedArbitraryPeerData.getLastRetrieved()); + + // Now modify some values and re-save + retrievedArbitraryPeerData.incrementSuccesses(); retrievedArbitraryPeerData.incrementSuccesses(); // Twice + retrievedArbitraryPeerData.incrementFailures(); // Once + retrievedArbitraryPeerData.markAsAttempted(); + Thread.sleep(100); + retrievedArbitraryPeerData.markAsRetrieved(); + repository.getArbitraryRepository().save(retrievedArbitraryPeerData); + + // Retrieve data once again + ArbitraryPeerData updatedArbitraryPeerData = repository.getArbitraryRepository() + .getArbitraryPeerDataForSignatureAndPeer(signature, peerAddress); + assertNotNull(updatedArbitraryPeerData); + + // Check the values + assertArrayEquals(Crypto.digest(signature), updatedArbitraryPeerData.getHash()); + assertEquals(peerAddress, updatedArbitraryPeerData.getPeerAddress()); + assertEquals(Integer.valueOf(2), updatedArbitraryPeerData.getSuccesses()); + assertEquals(Integer.valueOf(1), updatedArbitraryPeerData.getFailures()); + assertTrue(updatedArbitraryPeerData.getLastRetrieved().longValue() > 0L); + assertTrue(updatedArbitraryPeerData.getLastAttempted().longValue() > 0L); + assertTrue(updatedArbitraryPeerData.getLastRetrieved() > updatedArbitraryPeerData.getLastAttempted()); + assertTrue(NTP.getTime() - updatedArbitraryPeerData.getLastRetrieved() < 1000); + assertTrue(NTP.getTime() - updatedArbitraryPeerData.getLastAttempted() < 1000); + } + } +}