forked from Qortal/qortal
Direct peer connections now use the on-demand data retrieved from file list requests, rather than the stale and incomplete ArbitraryPeerData.
This commit is contained in:
parent
ea785f79b8
commit
6697b3376b
@ -5,6 +5,7 @@ import org.apache.logging.log4j.Logger;
|
|||||||
import org.qortal.arbitrary.ArbitraryDataFile;
|
import org.qortal.arbitrary.ArbitraryDataFile;
|
||||||
import org.qortal.arbitrary.ArbitraryDataFileChunk;
|
import org.qortal.arbitrary.ArbitraryDataFileChunk;
|
||||||
import org.qortal.controller.Controller;
|
import org.qortal.controller.Controller;
|
||||||
|
import org.qortal.data.arbitrary.ArbitraryDirectConnectionInfo;
|
||||||
import org.qortal.data.arbitrary.ArbitraryFileListResponseInfo;
|
import org.qortal.data.arbitrary.ArbitraryFileListResponseInfo;
|
||||||
import org.qortal.data.arbitrary.ArbitraryRelayInfo;
|
import org.qortal.data.arbitrary.ArbitraryRelayInfo;
|
||||||
import org.qortal.data.transaction.ArbitraryTransactionData;
|
import org.qortal.data.transaction.ArbitraryTransactionData;
|
||||||
@ -434,7 +435,6 @@ public class ArbitraryDataFileListManager {
|
|||||||
}
|
}
|
||||||
|
|
||||||
ArbitraryTransactionData arbitraryTransactionData = null;
|
ArbitraryTransactionData arbitraryTransactionData = null;
|
||||||
ArbitraryDataFileManager arbitraryDataFileManager = ArbitraryDataFileManager.getInstance();
|
|
||||||
|
|
||||||
// Check transaction exists and hashes are correct
|
// Check transaction exists and hashes are correct
|
||||||
try (final Repository repository = RepositoryManager.getRepository()) {
|
try (final Repository repository = RepositoryManager.getRepository()) {
|
||||||
@ -461,9 +461,10 @@ public class ArbitraryDataFileListManager {
|
|||||||
// }
|
// }
|
||||||
|
|
||||||
if (!isRelayRequest || !Settings.getInstance().isRelayModeEnabled()) {
|
if (!isRelayRequest || !Settings.getInstance().isRelayModeEnabled()) {
|
||||||
|
Long now = NTP.getTime();
|
||||||
|
|
||||||
if (ArbitraryDataFileManager.getInstance().arbitraryDataFileHashResponses.size() < MAX_FILE_HASH_RESPONSES) {
|
if (ArbitraryDataFileManager.getInstance().arbitraryDataFileHashResponses.size() < MAX_FILE_HASH_RESPONSES) {
|
||||||
// Keep track of the hashes this peer reports to have access to
|
// Keep track of the hashes this peer reports to have access to
|
||||||
Long now = NTP.getTime();
|
|
||||||
for (byte[] hash : hashes) {
|
for (byte[] hash : hashes) {
|
||||||
String hash58 = Base58.encode(hash);
|
String hash58 = Base58.encode(hash);
|
||||||
|
|
||||||
@ -476,6 +477,12 @@ public class ArbitraryDataFileListManager {
|
|||||||
ArbitraryDataFileManager.getInstance().arbitraryDataFileHashResponses.add(responseInfo);
|
ArbitraryDataFileManager.getInstance().arbitraryDataFileHashResponses.add(responseInfo);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Keep track of the source peer, for direct connections
|
||||||
|
if (arbitraryDataFileListMessage.getPeerAddress() != null) {
|
||||||
|
ArbitraryDataFileManager.getInstance().directConnectionInfo.add(
|
||||||
|
new ArbitraryDirectConnectionInfo(signature, arbitraryDataFileListMessage.getPeerAddress(), hashes, now));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
} catch (DataException e) {
|
} catch (DataException e) {
|
||||||
|
@ -4,6 +4,7 @@ import org.apache.logging.log4j.LogManager;
|
|||||||
import org.apache.logging.log4j.Logger;
|
import org.apache.logging.log4j.Logger;
|
||||||
import org.qortal.arbitrary.ArbitraryDataFile;
|
import org.qortal.arbitrary.ArbitraryDataFile;
|
||||||
import org.qortal.controller.Controller;
|
import org.qortal.controller.Controller;
|
||||||
|
import org.qortal.data.arbitrary.ArbitraryDirectConnectionInfo;
|
||||||
import org.qortal.data.arbitrary.ArbitraryFileListResponseInfo;
|
import org.qortal.data.arbitrary.ArbitraryFileListResponseInfo;
|
||||||
import org.qortal.data.arbitrary.ArbitraryRelayInfo;
|
import org.qortal.data.arbitrary.ArbitraryRelayInfo;
|
||||||
import org.qortal.data.network.ArbitraryPeerData;
|
import org.qortal.data.network.ArbitraryPeerData;
|
||||||
@ -49,6 +50,12 @@ public class ArbitraryDataFileManager extends Thread {
|
|||||||
*/
|
*/
|
||||||
public List<ArbitraryFileListResponseInfo> arbitraryDataFileHashResponses = Collections.synchronizedList(new ArrayList<>());
|
public List<ArbitraryFileListResponseInfo> arbitraryDataFileHashResponses = Collections.synchronizedList(new ArrayList<>());
|
||||||
|
|
||||||
|
/**
|
||||||
|
* List to keep track of peers potentially available for direct connections, based on recent requests
|
||||||
|
*/
|
||||||
|
public List<ArbitraryDirectConnectionInfo> directConnectionInfo = Collections.synchronizedList(new ArrayList<>());
|
||||||
|
|
||||||
|
|
||||||
public static int MAX_FILE_HASH_RESPONSES = 1000;
|
public static int MAX_FILE_HASH_RESPONSES = 1000;
|
||||||
|
|
||||||
|
|
||||||
@ -99,6 +106,9 @@ public class ArbitraryDataFileManager extends Thread {
|
|||||||
final long relayMinimumTimestamp = now - ArbitraryDataManager.getInstance().ARBITRARY_RELAY_TIMEOUT;
|
final long relayMinimumTimestamp = now - ArbitraryDataManager.getInstance().ARBITRARY_RELAY_TIMEOUT;
|
||||||
arbitraryRelayMap.removeIf(entry -> entry == null || entry.getTimestamp() == null || entry.getTimestamp() < relayMinimumTimestamp);
|
arbitraryRelayMap.removeIf(entry -> entry == null || entry.getTimestamp() == null || entry.getTimestamp() < relayMinimumTimestamp);
|
||||||
arbitraryDataFileHashResponses.removeIf(entry -> entry.getTimestamp() < relayMinimumTimestamp);
|
arbitraryDataFileHashResponses.removeIf(entry -> entry.getTimestamp() < relayMinimumTimestamp);
|
||||||
|
|
||||||
|
final long directConnectionInfoMinimumTimestamp = now - ArbitraryDataManager.getInstance().ARBITRARY_DIRECT_CONNECTION_INFO_TIMEOUT;
|
||||||
|
directConnectionInfo.removeIf(entry -> entry.getTimestamp() < directConnectionInfoMinimumTimestamp);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -296,89 +306,80 @@ public class ArbitraryDataFileManager extends Thread {
|
|||||||
|
|
||||||
// Fetch data directly from peers
|
// Fetch data directly from peers
|
||||||
|
|
||||||
|
private List<ArbitraryDirectConnectionInfo> getDirectConnectionInfoForSignature(byte[] signature) {
|
||||||
|
synchronized (directConnectionInfo) {
|
||||||
|
return directConnectionInfo.stream().filter(i -> Arrays.equals(i.getSignature(), signature)).collect(Collectors.toList());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public boolean fetchDataFilesFromPeersForSignature(byte[] signature) {
|
public boolean fetchDataFilesFromPeersForSignature(byte[] signature) {
|
||||||
String signature58 = Base58.encode(signature);
|
String signature58 = Base58.encode(signature);
|
||||||
ArbitraryDataFileListManager.getInstance().addToSignatureRequests(signature58, false, true);
|
|
||||||
|
|
||||||
// Firstly fetch peers that claim to be hosting files for this signature
|
// Firstly fetch peers that claim to be hosting files for this signature
|
||||||
try (final Repository repository = RepositoryManager.getRepository()) {
|
List<ArbitraryDirectConnectionInfo> connectionInfoList = getDirectConnectionInfoForSignature(signature);
|
||||||
|
if (connectionInfoList == null || connectionInfoList.isEmpty()) {
|
||||||
List<ArbitraryPeerData> peers = repository.getArbitraryRepository().getArbitraryPeerDataForSignature(signature);
|
LOGGER.debug("No direct connection peers found for signature {}", signature58);
|
||||||
if (peers == null || peers.isEmpty()) {
|
return false;
|
||||||
LOGGER.debug("No peers found for signature {}", signature58);
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
LOGGER.debug("Attempting a direct peer connection for signature {}...", signature58);
|
|
||||||
|
|
||||||
// Peers found, so pick a random one and request data from it
|
|
||||||
int index = new SecureRandom().nextInt(peers.size());
|
|
||||||
ArbitraryPeerData arbitraryPeerData = peers.get(index);
|
|
||||||
String peerAddressString = arbitraryPeerData.getPeerAddress();
|
|
||||||
boolean success = Network.getInstance().requestDataFromPeer(peerAddressString, signature);
|
|
||||||
|
|
||||||
// Parse the peer address to find the host and port
|
|
||||||
String host = null;
|
|
||||||
int port = -1;
|
|
||||||
String[] parts = peerAddressString.split(":");
|
|
||||||
if (parts.length > 1) {
|
|
||||||
host = parts[0];
|
|
||||||
port = Integer.parseInt(parts[1]);
|
|
||||||
}
|
|
||||||
|
|
||||||
// If unsuccessful, and using a non-standard port, try a second connection with the default listen port,
|
|
||||||
// since almost all nodes use that. This is a workaround to account for any ephemeral ports that may
|
|
||||||
// have made it into the dataset.
|
|
||||||
if (!success) {
|
|
||||||
if (host != null && port > 0) {
|
|
||||||
int defaultPort = Settings.getInstance().getDefaultListenPort();
|
|
||||||
if (port != defaultPort) {
|
|
||||||
String newPeerAddressString = String.format("%s:%d", host, defaultPort);
|
|
||||||
success = Network.getInstance().requestDataFromPeer(newPeerAddressString, signature);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// If _still_ unsuccessful, try matching the peer's IP address with some known peers, and then connect
|
|
||||||
// to each of those in turn until one succeeds.
|
|
||||||
if (!success) {
|
|
||||||
if (host != null) {
|
|
||||||
final String finalHost = host;
|
|
||||||
List<PeerData> knownPeers = Network.getInstance().getAllKnownPeers().stream()
|
|
||||||
.filter(knownPeerData -> knownPeerData.getAddress().getHost().equals(finalHost))
|
|
||||||
.collect(Collectors.toList());
|
|
||||||
// Loop through each match and attempt a connection
|
|
||||||
for (PeerData matchingPeer : knownPeers) {
|
|
||||||
String matchingPeerAddress = matchingPeer.getAddress().toString();
|
|
||||||
success = Network.getInstance().requestDataFromPeer(matchingPeerAddress, signature);
|
|
||||||
if (success) {
|
|
||||||
// Successfully connected, so stop making connections
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Keep track of the success or failure
|
|
||||||
arbitraryPeerData.markAsAttempted();
|
|
||||||
if (success) {
|
|
||||||
arbitraryPeerData.markAsRetrieved();
|
|
||||||
arbitraryPeerData.incrementSuccesses();
|
|
||||||
}
|
|
||||||
else {
|
|
||||||
arbitraryPeerData.incrementFailures();
|
|
||||||
}
|
|
||||||
repository.discardChanges();
|
|
||||||
repository.getArbitraryRepository().save(arbitraryPeerData);
|
|
||||||
repository.saveChanges();
|
|
||||||
|
|
||||||
return success;
|
|
||||||
|
|
||||||
} catch (DataException e) {
|
|
||||||
LOGGER.debug("Unable to fetch peer list from repository");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return false;
|
// We have at least one peer, so mark this as a request in progress
|
||||||
|
ArbitraryDataFileListManager.getInstance().addToSignatureRequests(signature58, false, true);
|
||||||
|
|
||||||
|
LOGGER.debug("Attempting a direct peer connection for signature {}...", signature58);
|
||||||
|
|
||||||
|
// Peers found, so pick a random one and request data from it
|
||||||
|
int index = new SecureRandom().nextInt(connectionInfoList.size());
|
||||||
|
ArbitraryDirectConnectionInfo directConnectionInfo = connectionInfoList.get(index);
|
||||||
|
String peerAddressString = directConnectionInfo.getPeerAddress();
|
||||||
|
boolean success = Network.getInstance().requestDataFromPeer(peerAddressString, signature);
|
||||||
|
|
||||||
|
// Parse the peer address to find the host and port
|
||||||
|
String host = null;
|
||||||
|
int port = -1;
|
||||||
|
String[] parts = peerAddressString.split(":");
|
||||||
|
if (parts.length > 1) {
|
||||||
|
host = parts[0];
|
||||||
|
port = Integer.parseInt(parts[1]);
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
// Use default listen port
|
||||||
|
port = Settings.getInstance().getDefaultListenPort();
|
||||||
|
}
|
||||||
|
|
||||||
|
// If unsuccessful, and using a non-standard port, try a second connection with the default listen port,
|
||||||
|
// since almost all nodes use that. This is a workaround to account for any ephemeral ports that may
|
||||||
|
// have made it into the dataset.
|
||||||
|
if (!success) {
|
||||||
|
if (host != null && port > 0) {
|
||||||
|
int defaultPort = Settings.getInstance().getDefaultListenPort();
|
||||||
|
if (port != defaultPort) {
|
||||||
|
String newPeerAddressString = String.format("%s:%d", host, defaultPort);
|
||||||
|
success = Network.getInstance().requestDataFromPeer(newPeerAddressString, signature);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// If _still_ unsuccessful, try matching the peer's IP address with some known peers, and then connect
|
||||||
|
// to each of those in turn until one succeeds.
|
||||||
|
if (!success) {
|
||||||
|
if (host != null) {
|
||||||
|
final String finalHost = host;
|
||||||
|
List<PeerData> knownPeers = Network.getInstance().getAllKnownPeers().stream()
|
||||||
|
.filter(knownPeerData -> knownPeerData.getAddress().getHost().equals(finalHost))
|
||||||
|
.collect(Collectors.toList());
|
||||||
|
// Loop through each match and attempt a connection
|
||||||
|
for (PeerData matchingPeer : knownPeers) {
|
||||||
|
String matchingPeerAddress = matchingPeer.getAddress().toString();
|
||||||
|
success = Network.getInstance().requestDataFromPeer(matchingPeerAddress, signature);
|
||||||
|
if (success) {
|
||||||
|
// Successfully connected, so stop making connections
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return success;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -44,6 +44,9 @@ public class ArbitraryDataManager extends Thread {
|
|||||||
/** Maximum time to hold information about an in-progress relay */
|
/** Maximum time to hold information about an in-progress relay */
|
||||||
public static final long ARBITRARY_RELAY_TIMEOUT = 60 * 1000L; // ms
|
public static final long ARBITRARY_RELAY_TIMEOUT = 60 * 1000L; // ms
|
||||||
|
|
||||||
|
/** Maximum time to hold direct peer connection information */
|
||||||
|
public static final long ARBITRARY_DIRECT_CONNECTION_INFO_TIMEOUT = 2 * 60 * 1000L; // ms
|
||||||
|
|
||||||
/** Maximum number of hops that an arbitrary signatures request is allowed to make */
|
/** Maximum number of hops that an arbitrary signatures request is allowed to make */
|
||||||
private static int ARBITRARY_SIGNATURES_REQUEST_MAX_HOPS = 3;
|
private static int ARBITRARY_SIGNATURES_REQUEST_MAX_HOPS = 3;
|
||||||
|
|
||||||
|
@ -0,0 +1,34 @@
|
|||||||
|
package org.qortal.data.arbitrary;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
public class ArbitraryDirectConnectionInfo {
|
||||||
|
|
||||||
|
private final byte[] signature;
|
||||||
|
private final String peerAddress;
|
||||||
|
private final List<byte[]> hashes;
|
||||||
|
private final long timestamp;
|
||||||
|
|
||||||
|
public ArbitraryDirectConnectionInfo(byte[] signature, String peerAddress, List<byte[]> hashes, long timestamp) {
|
||||||
|
this.signature = signature;
|
||||||
|
this.peerAddress = peerAddress;
|
||||||
|
this.hashes = hashes;
|
||||||
|
this.timestamp = timestamp;
|
||||||
|
}
|
||||||
|
|
||||||
|
public byte[] getSignature() {
|
||||||
|
return this.signature;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getPeerAddress() {
|
||||||
|
return this.peerAddress;
|
||||||
|
}
|
||||||
|
|
||||||
|
public List<byte[]> getHashes() {
|
||||||
|
return this.hashes;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getTimestamp() {
|
||||||
|
return this.timestamp;
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user