forked from Qortal/qortal
Added relay mode for file list requests
This reuses most of the code already in place in the core related to forwarding. - A node can opt into relay mode via the "relayModeEnabled": true setting - From this time onwards, they will ask their peers if they ever receive a file list request that they cannot serve by themselves - Whenever a peer responds with a file list, it is forwarded on to the originally requesting peer, complete with the peer address of the node that responded - The original peer can then make a request for the data file(s) themselves using a similar approach, specifying the IP address of the ultimate peer so that the relay node knows who to ask. This part is not implemented yet.
This commit is contained in:
parent
cc297ccfcd
commit
f4b06fb834
@ -732,15 +732,7 @@ public class ArbitraryDataManager extends Thread {
|
|||||||
|
|
||||||
public void onNetworkGetArbitraryDataMessage(Peer peer, Message message) {
|
public void onNetworkGetArbitraryDataMessage(Peer peer, Message message) {
|
||||||
GetArbitraryDataMessage getArbitraryDataMessage = (GetArbitraryDataMessage) message;
|
GetArbitraryDataMessage getArbitraryDataMessage = (GetArbitraryDataMessage) message;
|
||||||
|
|
||||||
byte[] signature = getArbitraryDataMessage.getSignature();
|
byte[] signature = getArbitraryDataMessage.getSignature();
|
||||||
String signature58 = Base58.encode(signature);
|
|
||||||
Long timestamp = NTP.getTime();
|
|
||||||
Triple<String, Peer, Long> newEntry = new Triple<>(signature58, peer, timestamp);
|
|
||||||
|
|
||||||
// If we've seen this request recently, then ignore
|
|
||||||
if (arbitraryDataFileListRequests.putIfAbsent(message.getId(), newEntry) != null)
|
|
||||||
return;
|
|
||||||
|
|
||||||
// Do we even have this transaction?
|
// Do we even have this transaction?
|
||||||
try (final Repository repository = RepositoryManager.getRepository()) {
|
try (final Repository repository = RepositoryManager.getRepository()) {
|
||||||
@ -756,10 +748,6 @@ public class ArbitraryDataManager extends Thread {
|
|||||||
if (data == null)
|
if (data == null)
|
||||||
return;
|
return;
|
||||||
|
|
||||||
// Update requests map to reflect that we've sent it
|
|
||||||
newEntry = new Triple<>(signature58, null, timestamp);
|
|
||||||
arbitraryDataFileListRequests.put(message.getId(), newEntry);
|
|
||||||
|
|
||||||
Message arbitraryDataMessage = new ArbitraryDataMessage(signature, data);
|
Message arbitraryDataMessage = new ArbitraryDataMessage(signature, data);
|
||||||
arbitraryDataMessage.setId(message.getId());
|
arbitraryDataMessage.setId(message.getId());
|
||||||
if (!peer.sendMessage(arbitraryDataMessage))
|
if (!peer.sendMessage(arbitraryDataMessage))
|
||||||
@ -777,10 +765,12 @@ public class ArbitraryDataManager extends Thread {
|
|||||||
|
|
||||||
public void onNetworkArbitraryDataFileListMessage(Peer peer, Message message) {
|
public void onNetworkArbitraryDataFileListMessage(Peer peer, Message message) {
|
||||||
ArbitraryDataFileListMessage arbitraryDataFileListMessage = (ArbitraryDataFileListMessage) message;
|
ArbitraryDataFileListMessage arbitraryDataFileListMessage = (ArbitraryDataFileListMessage) message;
|
||||||
LOGGER.info("Received hash list from peer {} with {} hashes", peer, arbitraryDataFileListMessage.getHashes().size());
|
String sourcePeer = arbitraryDataFileListMessage.getPeerAddress();
|
||||||
|
LOGGER.info("Received hash list from peer {} with {} hashes, source peer: {}", peer, arbitraryDataFileListMessage.getHashes().size(), sourcePeer);
|
||||||
|
|
||||||
// Do we have a pending request for this data?
|
// Do we have a pending request for this data?
|
||||||
Triple<String, Peer, Long> request = arbitraryDataFileListRequests.get(message.getId());
|
Triple<String, Peer, Long> request = arbitraryDataFileListRequests.get(message.getId());
|
||||||
|
boolean isRelayRequest = (request.getB() != null);
|
||||||
if (request == null || request.getA() == null) {
|
if (request == null || request.getA() == null) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@ -825,22 +815,27 @@ public class ArbitraryDataManager extends Thread {
|
|||||||
Triple<String, Peer, Long> newEntry = new Triple<>(null, null, request.getC());
|
Triple<String, Peer, Long> newEntry = new Triple<>(null, null, request.getC());
|
||||||
arbitraryDataFileListRequests.put(message.getId(), newEntry);
|
arbitraryDataFileListRequests.put(message.getId(), newEntry);
|
||||||
|
|
||||||
// Go and fetch the actual data
|
if (!isRelayRequest || !Settings.getInstance().isRelayModeEnabled()) {
|
||||||
|
// Go and fetch the actual data, since this isn't a relay request
|
||||||
this.fetchArbitraryDataFiles(repository, peer, signature, arbitraryTransactionData, hashes);
|
this.fetchArbitraryDataFiles(repository, peer, signature, arbitraryTransactionData, hashes);
|
||||||
// FUTURE: handle response
|
}
|
||||||
|
|
||||||
} catch (DataException e) {
|
} catch (DataException e) {
|
||||||
LOGGER.error(String.format("Repository issue while finding arbitrary transaction data list for peer %s", peer), e);
|
LOGGER.error(String.format("Repository issue while finding arbitrary transaction data list for peer %s", peer), e);
|
||||||
}
|
}
|
||||||
|
|
||||||
// // Forwarding (not yet used)
|
// Forwarding
|
||||||
// Peer requestingPeer = request.getB();
|
if (isRelayRequest && Settings.getInstance().isRelayModeEnabled()) {
|
||||||
// if (requestingPeer != null) {
|
Peer requestingPeer = request.getB();
|
||||||
// // Forward to requesting peer;
|
if (requestingPeer != null) {
|
||||||
// if (!requestingPeer.sendMessage(arbitraryDataFileListMessage)) {
|
// Add the source peer's address
|
||||||
// requestingPeer.disconnect("failed to forward arbitrary data file list");
|
arbitraryDataFileListMessage.setPeerAddress(peer.getPeerData().getAddress().toString());
|
||||||
// }
|
// Forward to requesting peer;
|
||||||
// }
|
if (!requestingPeer.sendMessage(arbitraryDataFileListMessage)) {
|
||||||
|
requestingPeer.disconnect("failed to forward arbitrary data file list");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void onNetworkGetArbitraryDataFileMessage(Peer peer, Message message) {
|
public void onNetworkGetArbitraryDataFileMessage(Peer peer, Message message) {
|
||||||
@ -886,9 +881,18 @@ public class ArbitraryDataManager extends Thread {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public void onNetworkGetArbitraryDataFileListMessage(Peer peer, Message message) {
|
public void onNetworkGetArbitraryDataFileListMessage(Peer peer, Message message) {
|
||||||
|
Controller.getInstance().stats.getArbitraryDataFileListMessageStats.requests.incrementAndGet();
|
||||||
|
|
||||||
GetArbitraryDataFileListMessage getArbitraryDataFileListMessage = (GetArbitraryDataFileListMessage) message;
|
GetArbitraryDataFileListMessage getArbitraryDataFileListMessage = (GetArbitraryDataFileListMessage) message;
|
||||||
byte[] signature = getArbitraryDataFileListMessage.getSignature();
|
byte[] signature = getArbitraryDataFileListMessage.getSignature();
|
||||||
Controller.getInstance().stats.getArbitraryDataFileListMessageStats.requests.incrementAndGet();
|
String signature58 = Base58.encode(signature);
|
||||||
|
Long timestamp = NTP.getTime();
|
||||||
|
Triple<String, Peer, Long> newEntry = new Triple<>(signature58, peer, timestamp);
|
||||||
|
|
||||||
|
// If we've seen this request recently, then ignore
|
||||||
|
if (arbitraryDataFileListRequests.putIfAbsent(message.getId(), newEntry) != null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
LOGGER.info("Received hash list request from peer {} for signature {}", peer, Base58.encode(signature));
|
LOGGER.info("Received hash list request from peer {} for signature {}", peer, Base58.encode(signature));
|
||||||
|
|
||||||
@ -937,13 +941,22 @@ public class ArbitraryDataManager extends Thread {
|
|||||||
LOGGER.error(String.format("Repository issue while fetching arbitrary file list for peer %s", peer), e);
|
LOGGER.error(String.format("Repository issue while fetching arbitrary file list for peer %s", peer), e);
|
||||||
}
|
}
|
||||||
|
|
||||||
ArbitraryDataFileListMessage arbitraryDataFileListMessage = new ArbitraryDataFileListMessage(signature, hashes);
|
// We should only respond if we have at least one hash
|
||||||
|
if (hashes.size() > 0) {
|
||||||
|
|
||||||
|
// Update requests map to reflect that we've sent it
|
||||||
|
newEntry = new Triple<>(signature58, null, timestamp);
|
||||||
|
arbitraryDataFileListRequests.put(message.getId(), newEntry);
|
||||||
|
|
||||||
|
ArbitraryDataFileListMessage arbitraryDataFileListMessage = new ArbitraryDataFileListMessage(signature, null, hashes);
|
||||||
arbitraryDataFileListMessage.setId(message.getId());
|
arbitraryDataFileListMessage.setId(message.getId());
|
||||||
if (!peer.sendMessage(arbitraryDataFileListMessage)) {
|
if (!peer.sendMessage(arbitraryDataFileListMessage)) {
|
||||||
LOGGER.info("Couldn't send list of hashes");
|
LOGGER.info("Couldn't send list of hashes");
|
||||||
peer.disconnect("failed to send list of hashes");
|
peer.disconnect("failed to send list of hashes");
|
||||||
}
|
}
|
||||||
LOGGER.info("Sent list of hashes (count: {})", hashes.size());
|
LOGGER.info("Sent list of hashes (count: {})", hashes.size());
|
||||||
|
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void onNetworkArbitrarySignaturesMessage(Peer peer, Message message) {
|
public void onNetworkArbitrarySignaturesMessage(Peer peer, Message message) {
|
||||||
|
@ -1,9 +1,9 @@
|
|||||||
package org.qortal.network.message;
|
package org.qortal.network.message;
|
||||||
|
|
||||||
import com.google.common.primitives.Ints;
|
import com.google.common.primitives.Ints;
|
||||||
import org.qortal.data.block.BlockSummaryData;
|
import org.qortal.transform.TransformationException;
|
||||||
import org.qortal.transform.Transformer;
|
import org.qortal.transform.Transformer;
|
||||||
import org.qortal.transform.block.BlockTransformer;
|
import org.qortal.utils.Serialization;
|
||||||
|
|
||||||
import java.io.ByteArrayOutputStream;
|
import java.io.ByteArrayOutputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
@ -19,18 +19,21 @@ public class ArbitraryDataFileListMessage extends Message {
|
|||||||
|
|
||||||
private final byte[] signature;
|
private final byte[] signature;
|
||||||
private final List<byte[]> hashes;
|
private final List<byte[]> hashes;
|
||||||
|
private String peerAddress;
|
||||||
|
|
||||||
public ArbitraryDataFileListMessage(byte[] signature, List<byte[]> hashes) {
|
public ArbitraryDataFileListMessage(byte[] signature, String peerAddress, List<byte[]> hashes) {
|
||||||
super(MessageType.ARBITRARY_DATA_FILE_LIST);
|
super(MessageType.ARBITRARY_DATA_FILE_LIST);
|
||||||
|
|
||||||
this.signature = signature;
|
this.signature = signature;
|
||||||
|
this.peerAddress = peerAddress;
|
||||||
this.hashes = hashes;
|
this.hashes = hashes;
|
||||||
}
|
}
|
||||||
|
|
||||||
public ArbitraryDataFileListMessage(int id, byte[] signature, List<byte[]> hashes) {
|
public ArbitraryDataFileListMessage(int id, byte[] signature, String peerAddress, List<byte[]> hashes) {
|
||||||
super(id, MessageType.ARBITRARY_DATA_FILE_LIST);
|
super(id, MessageType.ARBITRARY_DATA_FILE_LIST);
|
||||||
|
|
||||||
this.signature = signature;
|
this.signature = signature;
|
||||||
|
this.peerAddress = peerAddress;
|
||||||
this.hashes = hashes;
|
this.hashes = hashes;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -42,10 +45,21 @@ public class ArbitraryDataFileListMessage extends Message {
|
|||||||
return this.signature;
|
return this.signature;
|
||||||
}
|
}
|
||||||
|
|
||||||
public static Message fromByteBuffer(int id, ByteBuffer bytes) throws UnsupportedEncodingException {
|
public void setPeerAddress(String peerAddress) {
|
||||||
|
this.peerAddress = peerAddress;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getPeerAddress() {
|
||||||
|
return this.peerAddress;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public static Message fromByteBuffer(int id, ByteBuffer bytes) throws UnsupportedEncodingException, TransformationException {
|
||||||
byte[] signature = new byte[SIGNATURE_LENGTH];
|
byte[] signature = new byte[SIGNATURE_LENGTH];
|
||||||
bytes.get(signature);
|
bytes.get(signature);
|
||||||
|
|
||||||
|
String peerAddress = Serialization.deserializeSizedString(bytes, 255);
|
||||||
|
|
||||||
int count = bytes.getInt();
|
int count = bytes.getInt();
|
||||||
|
|
||||||
if (bytes.remaining() != count * HASH_LENGTH)
|
if (bytes.remaining() != count * HASH_LENGTH)
|
||||||
@ -59,7 +73,7 @@ public class ArbitraryDataFileListMessage extends Message {
|
|||||||
hashes.add(hash);
|
hashes.add(hash);
|
||||||
}
|
}
|
||||||
|
|
||||||
return new ArbitraryDataFileListMessage(id, signature, hashes);
|
return new ArbitraryDataFileListMessage(id, signature, peerAddress, hashes);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -69,6 +83,8 @@ public class ArbitraryDataFileListMessage extends Message {
|
|||||||
|
|
||||||
bytes.write(this.signature);
|
bytes.write(this.signature);
|
||||||
|
|
||||||
|
Serialization.serializeSizedString(bytes, this.peerAddress);
|
||||||
|
|
||||||
bytes.write(Ints.toByteArray(this.hashes.size()));
|
bytes.write(Ints.toByteArray(this.hashes.size()));
|
||||||
|
|
||||||
for (byte[] hash : this.hashes) {
|
for (byte[] hash : this.hashes) {
|
||||||
@ -82,7 +98,7 @@ public class ArbitraryDataFileListMessage extends Message {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public ArbitraryDataFileListMessage cloneWithNewId(int newId) {
|
public ArbitraryDataFileListMessage cloneWithNewId(int newId) {
|
||||||
ArbitraryDataFileListMessage clone = new ArbitraryDataFileListMessage(this.signature, this.hashes);
|
ArbitraryDataFileListMessage clone = new ArbitraryDataFileListMessage(this.signature, this.peerAddress, this.hashes);
|
||||||
clone.setId(newId);
|
clone.setId(newId);
|
||||||
return clone;
|
return clone;
|
||||||
}
|
}
|
||||||
|
@ -279,6 +279,9 @@ public class Settings {
|
|||||||
/** Storage policy to indicate which data should be hosted */
|
/** Storage policy to indicate which data should be hosted */
|
||||||
private String storagePolicy = "FOLLOWED_AND_VIEWED";
|
private String storagePolicy = "FOLLOWED_AND_VIEWED";
|
||||||
|
|
||||||
|
/** Whether to allow data outside of the storage policy to be relayed between other peers */
|
||||||
|
private boolean relayModeEnabled = false;
|
||||||
|
|
||||||
/** Expiry time (ms) for (unencrypted) built/cached data */
|
/** Expiry time (ms) for (unencrypted) built/cached data */
|
||||||
private Long builtDataExpiryInterval = 30 * 24 * 60 * 60 * 1000L; // 30 days
|
private Long builtDataExpiryInterval = 30 * 24 * 60 * 60 * 1000L; // 30 days
|
||||||
|
|
||||||
@ -828,6 +831,10 @@ public class Settings {
|
|||||||
return StoragePolicy.valueOf(this.storagePolicy);
|
return StoragePolicy.valueOf(this.storagePolicy);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean isRelayModeEnabled() {
|
||||||
|
return this.relayModeEnabled;
|
||||||
|
}
|
||||||
|
|
||||||
public Long getBuiltDataExpiryInterval() {
|
public Long getBuiltDataExpiryInterval() {
|
||||||
return this.builtDataExpiryInterval;
|
return this.builtDataExpiryInterval;
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user