initial mesh and peer management implementation

This commit is contained in:
Jürg Schulthess 2024-07-09 13:57:32 +02:00
parent 4df05364f5
commit 32460a1b45
6 changed files with 606 additions and 247 deletions

View File

@ -7,12 +7,12 @@
<snapshot> <snapshot>
<localCopy>true</localCopy> <localCopy>true</localCopy>
</snapshot> </snapshot>
<lastUpdated>20240324170649</lastUpdated> <lastUpdated>20240707083116</lastUpdated>
<snapshotVersions> <snapshotVersions>
<snapshotVersion> <snapshotVersion>
<extension>jar</extension> <extension>jar</extension>
<value>1.0-SNAPSHOT</value> <value>1.0-SNAPSHOT</value>
<updated>20240324170649</updated> <updated>20240707083116</updated>
</snapshotVersion> </snapshotVersion>
<snapshotVersion> <snapshotVersion>
<extension>pom</extension> <extension>pom</extension>

View File

@ -6,6 +6,6 @@
<versions> <versions>
<version>1.0-SNAPSHOT</version> <version>1.0-SNAPSHOT</version>
</versions> </versions>
<lastUpdated>20240324170649</lastUpdated> <lastUpdated>20240707083116</lastUpdated>
</versioning> </versioning>
</metadata> </metadata>

View File

@ -0,0 +1,23 @@
package org.qortal.network;
public class RNSCommon {
/**
* Destination application name
*/
public static String APP_NAME = "qortal";
/**
* Configuration path relative to the Qortal launch directory
*/
public static String defaultRNSConfigPath = new String(".reticulum");
///**
// * Qortal RNS Destinations
// */
//public enum RNSDestinations {
// BASE,
// QDN;
//}
}

View File

@ -1,97 +1,89 @@
package org.qortal.network; package org.qortal.network;
import java.io.IOException;
//import java.nio.channels.SelectionKey;
//import java.io.Paths;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.io.File;
import java.util.*;
//import java.util.function.BiConsumer;
//import java.util.function.Consumer;
//import java.util.function.Function;
//import java.util.concurrent.*;
//import java.util.concurrent.atomic.AtomicLong;
//import org.qortal.data.network.PeerData;
import org.qortal.repository.DataException;
//import org.qortal.settings.Settings;
import org.qortal.settings.Settings;
//import org.qortal.utils.NTP;
//import com.fasterxml.jackson.annotation.JsonGetter;
import org.apache.commons.codec.binary.Hex;
//import org.slf4j.Logger;
//import org.slf4j.LoggerFactory;
import io.reticulum.Reticulum; import io.reticulum.Reticulum;
import io.reticulum.Transport; import io.reticulum.Transport;
import io.reticulum.interfaces.ConnectionInterface;
import io.reticulum.destination.Destination; import io.reticulum.destination.Destination;
import io.reticulum.destination.DestinationType; import io.reticulum.destination.DestinationType;
import io.reticulum.destination.Direction; import io.reticulum.destination.Direction;
import io.reticulum.identity.Identity;
import io.reticulum.interfaces.ConnectionInterface;
import io.reticulum.destination.ProofStrategy; import io.reticulum.destination.ProofStrategy;
import io.reticulum.transport.AnnounceHandler; import io.reticulum.identity.Identity;
import static io.reticulum.constant.ReticulumConstant.CONFIG_FILE_NAME;
//import static io.reticulum.identity.IdentityKnownDestination.recall;
//import static io.reticulum.identity.IdentityKnownDestination.recallAppData;
//import static io.reticulum.destination.Direction.OUT;
import lombok.extern.slf4j.Slf4j;
import lombok.Synchronized;
import io.reticulum.link.Link; import io.reticulum.link.Link;
import io.reticulum.link.LinkStatus; import io.reticulum.link.LinkStatus;
//import io.reticulum.packet.PacketReceipt; //import io.reticulum.constant.LinkConstant;
import io.reticulum.packet.Packet; import io.reticulum.packet.Packet;
import io.reticulum.packet.PacketReceipt;
//import static io.reticulum.link.LinkStatus.ACTIVE; import io.reticulum.packet.PacketReceiptStatus;
import static io.reticulum.link.LinkStatus.CLOSED; import io.reticulum.transport.AnnounceHandler;
import static io.reticulum.link.LinkStatus.PENDING; import static io.reticulum.link.TeardownSession.DESTINATION_CLOSED;
import static io.reticulum.link.TeardownSession.INITIATOR_CLOSED;
import static io.reticulum.link.TeardownSession.TIMEOUT;
import static io.reticulum.link.LinkStatus.ACTIVE;
import static io.reticulum.link.LinkStatus.STALE; import static io.reticulum.link.LinkStatus.STALE;
import static io.reticulum.link.LinkStatus.PENDING;
import static io.reticulum.link.LinkStatus.HANDSHAKE;
//import static io.reticulum.packet.PacketContextType.LINKCLOSE;
import static io.reticulum.identity.IdentityKnownDestination.recall;
import static io.reticulum.utils.IdentityUtils.concatArrays;
//import static io.reticulum.constant.ReticulumConstant.TRUNCATED_HASHLENGTH;
import static io.reticulum.constant.ReticulumConstant.CONFIG_FILE_NAME;
import lombok.extern.slf4j.Slf4j;
import lombok.Data;
//import lombok.Setter;
//import lombok.Getter;
import lombok.Synchronized;
import org.qortal.repository.DataException;
import org.qortal.settings.Settings;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.StandardCopyOption;
import java.nio.file.Files;
import java.nio.file.Path;
import static java.nio.charset.StandardCharsets.UTF_8; import static java.nio.charset.StandardCharsets.UTF_8;
//import static java.util.Objects.isNull; //import static java.util.Objects.isNull;
//import static java.util.Objects.isNull;
import static java.util.Objects.nonNull; import static java.util.Objects.nonNull;
//import static org.apache.commons.lang3.BooleanUtils.isFalse;
//import org.qortal.network.Network.NetworkProcessor; import java.io.File;
//import org.qortal.utils.ExecuteProduceConsume; import java.util.Arrays;
//import org.qortal.utils.NamedThreadFactory; import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
//import java.util.Random;
import java.util.Scanner;
import java.util.concurrent.TimeUnit;
//import java.time.Instant; import org.apache.commons.codec.binary.Hex;
//import org.qortal.network.RNSPeer;
@Data
@Slf4j @Slf4j
public class RNSNetwork { public class RNSNetwork {
static final String APP_NAME = "qortal"; Reticulum reticulum;
private Reticulum reticulum; //private static final String APP_NAME = "qortal";
private Identity server_identity; static final String APP_NAME = RNSCommon.APP_NAME;
private Destination baseDestination; // service base (initially: anything node2node) //static final String defaultConfigPath = new String(".reticulum"); // if empty will look in Reticulums default paths
//private Destination dataDestination; // qdn services (eg. files like music, videos etc) static final String defaultConfigPath = RNSCommon.defaultRNSConfigPath;
//private Destination liveDestination; // live/dynamic peer list (eg. video conferencing) //private final String defaultConfigPath = Settings.getInstance().getDefaultRNSConfigPathForReticulum();
// the following should be retrieved from settings private static Integer MAX_PEERS = 12;
private static Integer MAX_PEERS = 3;
private static Integer MIN_DESIRED_PEERS = 3;
//private final Integer MAX_PEERS = Settings.getInstance().getMaxReticulumPeers(); //private final Integer MAX_PEERS = Settings.getInstance().getMaxReticulumPeers();
//private final Integer MIN_DESIRED_PEERS = Settings.getInstance().getMinDesiredReticulumPeers(); private static Integer MIN_DESIRED_PEERS = 3;
static final String defaultConfigPath = new String(".reticulum"); // if empty will look in Reticulums default paths //private final Integer MIN_DESIRED_PEERS = Settings.getInstance().getMinDesiredPeers();
//private final String defaultConfigPath = Settings.getInstance().getDefaultConfigPathForReticulum(); Identity serverIdentity;
public Destination baseDestination;
//private static final Logger logger = LoggerFactory.getLogger(RNSNetwork.class);
//private final List<Link> linkedPeers = Collections.synchronizedList(new ArrayList<>());
//private List<Link> immutableLinkedPeers = Collections.emptyList();
private final List<RNSPeer> linkedPeers = Collections.synchronizedList(new ArrayList<>());
//private final ExecuteProduceConsume rnsNetworkEPC;
private static final long NETWORK_EPC_KEEPALIVE = 1000L; // 1 second
private volatile boolean isShuttingDown = false; private volatile boolean isShuttingDown = false;
private int totalThreadCount = 0; private final List<RNSPeer> linkedPeers = Collections.synchronizedList(new ArrayList<>());
private final List<Link> incomingLinks = Collections.synchronizedList(new ArrayList<>());
// TODO: settings - MaxReticulumPeers, MaxRNSNetworkThreadPoolSize (if needed) ////private final ExecuteProduceConsume rnsNetworkEPC;
//private static final long NETWORK_EPC_KEEPALIVE = 1000L; // 1 second
//private volatile boolean isShuttingDown = false;
//private int totalThreadCount = 0;
//// TODO: settings - MaxReticulumPeers, MaxRNSNetworkThreadPoolSize (if needed)
// Constructor // Constructor
private RNSNetwork () { private RNSNetwork () {
@ -112,19 +104,20 @@ public class RNSNetwork {
//rnsNetworkEPC = new RNSNetworkProcessor(RNSNetworkExecutor); //rnsNetworkEPC = new RNSNetworkProcessor(RNSNetworkExecutor);
} }
// Note: potentially create persistent server_identity (utility rnid) and load it from file // Note: potentially create persistent serverIdentity (utility rnid) and load it from file
public void start() throws IOException, DataException { public void start() throws IOException, DataException {
// create identity either from file or new (creating new keys) // create identity either from file or new (creating new keys)
var serverIdentityPath = reticulum.getStoragePath().resolve(APP_NAME); var serverIdentityPath = reticulum.getStoragePath().resolve("identities/"+APP_NAME);
if (Files.isReadable(serverIdentityPath)) { if (Files.isReadable(serverIdentityPath)) {
server_identity = Identity.fromFile(serverIdentityPath); serverIdentity = Identity.fromFile(serverIdentityPath);
log.info("server identity loaded from file {}", serverIdentityPath.toString()); log.info("server identity loaded from file {}", serverIdentityPath.toString());
} else { } else {
server_identity = new Identity(); serverIdentity = new Identity();
log.info("APP_NAME: {}, storage path: {}", APP_NAME, serverIdentityPath);
log.info("new server identity created dynamically."); log.info("new server identity created dynamically.");
} }
log.debug("Server Identity: {}", server_identity.toString()); log.debug("Server Identity: {}", serverIdentity.toString());
// show the ifac_size of the configured interfaces (debug code) // show the ifac_size of the configured interfaces (debug code)
for (ConnectionInterface i: Transport.getInstance().getInterfaces() ) { for (ConnectionInterface i: Transport.getInstance().getInterfaces() ) {
@ -132,74 +125,74 @@ public class RNSNetwork {
} }
baseDestination = new Destination( baseDestination = new Destination(
server_identity, serverIdentity,
Direction.IN, Direction.IN,
DestinationType.SINGLE, DestinationType.SINGLE,
APP_NAME, APP_NAME,
"core" "core"
); );
//// ideas for other entry points //// idea for other entry point
//dataDestination = new Destination( //dataDestination = new Destination(
// server_identity, // serverIdentity,
// Direction.IN, // Direction.IN,
// DestinationType.SINGLE, // DestinationType.SINGLE,
// APP_NAME, // APP_NAME,
// "core", // "core",
// "qdn" // "qdn"
//); //);
//liveDestination = new Destination(
// server_identity,
// Direction.IN,
// DestinationType.SINGLE,
// APP_NAME,
// "core",
// "live"
//);
log.info("Destination "+Hex.encodeHexString(baseDestination.getHash())+" "+baseDestination.getName()+" running."); log.info("Destination "+Hex.encodeHexString(baseDestination.getHash())+" "+baseDestination.getName()+" running.");
//log.info("Destination "+Hex.encodeHexString(dataDestination.getHash())+" "+dataDestination.getName()+" running.");
baseDestination.setProofStrategy(ProofStrategy.PROVE_ALL); baseDestination.setProofStrategy(ProofStrategy.PROVE_ALL);
//dataDestination.setProofStrategy(ProofStrategy.PROVE_ALL);
baseDestination.setAcceptLinkRequests(true); baseDestination.setAcceptLinkRequests(true);
//dataDestination.setAcceptLinkRequests(true);
//baseDestination.setLinkEstablishedCallback(this::linkExtabishedCallback); baseDestination.setLinkEstablishedCallback(this::clientConnected);
baseDestination.setPacketCallback(this::packetCallback);
//baseDestination.setPacketCallback((message, packet) -> {
// log.info("xyz - Message raw {}", message);
// log.info("xyz - Packet {}", packet.toString());
//});
Transport.getInstance().registerAnnounceHandler(new QAnnounceHandler()); Transport.getInstance().registerAnnounceHandler(new QAnnounceHandler());
log.info("announceHandlers: {}", Transport.getInstance().getAnnounceHandlers()); log.debug("announceHandlers: {}", Transport.getInstance().getAnnounceHandlers());
// do a first announce
baseDestination.announce(); baseDestination.announce();
//dataDestination.announce(); log.debug("Sent initial announce from {} ({})", Hex.encodeHexString(baseDestination.getHash()), baseDestination.getName());
log.info("Sent initial announce from {} ({})", Hex.encodeHexString(baseDestination.getHash()), baseDestination.getName());
// Start up first networking thread (the "server loop") // Start up first networking thread (the "server loop")
//rnsNetworkEPC.start(); //rnsNetworkEPC.start();
} }
//@Synchronized
public void shutdown() { public void shutdown() {
isShuttingDown = true; isShuttingDown = true;
log.info("shutting down Reticulum"); log.info("shutting down Reticulum");
// Stop processing threads (the "server loop") // Stop processing threads (the "server loop")
//try { //try {
// if (!this.rnsNetworkEPC.shutdown(5000)) { // if (!this.rnsNetworkEPC.shutdown(5000)) {
// logger.warn("Network threads failed to terminate"); // logger.warn("RNSNetwork threads failed to terminate");
// } // }
//} catch (InterruptedException e) { //} catch (InterruptedException e) {
// logger.warn("Interrupted while waiting for networking threads to terminate"); // logger.warn("Interrupted while waiting for RNS networking threads to terminate");
//} //}
// Disconnect peers and terminate Reticulum // Disconnect peers gracefully and terminate Reticulum
for (RNSPeer p : linkedPeers) { for (RNSPeer p: linkedPeers) {
if (nonNull(p.getLink())) { log.info("shutting down peer: {}", Hex.encodeHexString(p.getDestinationHash()));
p.getLink().teardown(); log.debug("peer: {}", p);
p.shutdown();
try {
TimeUnit.SECONDS.sleep(1); // allow for peers to disconnect gracefully
} catch (InterruptedException e) {
log.error("exception: {}", e);
} }
} }
// gracefully close links of peers that point to us
for (Link l: incomingLinks) {
var data = concatArrays("close::".getBytes(UTF_8),l.getDestination().getHash());
Packet closePacket = new Packet(l, data);
var packetReceipt = closePacket.send();
packetReceipt.setTimeout(3L);
packetReceipt.setDeliveryCallback(this::closePacketDelivered);
packetReceipt.setTimeoutCallback(this::packetTimedOut);
}
// Note: we still need to get the packet timeout callback to work...
reticulum.exitHandler(); reticulum.exitHandler();
} }
@ -217,29 +210,93 @@ public class RNSNetwork {
} }
} }
private void packetCallback(byte[] message, Packet packet) { public void closePacketDelivered(PacketReceipt receipt) {
log.info("xyz - Message raw {}", message); var rttString = new String("");
log.info("xyz - Packet {}", packet.toString()); if (receipt.getStatus() == PacketReceiptStatus.DELIVERED) {
var rtt = receipt.getRtt(); // rtt (Java) is in miliseconds
//log.info("qqp - packetDelivered - rtt: {}", rtt);
if (rtt >= 1000) {
rtt = Math.round(rtt / 1000);
rttString = String.format("%d seconds", rtt);
} else {
rttString = String.format("%d miliseconds", rtt);
}
log.info("Shutdown packet confirmation received from {}, round-trip time is {}",
Hex.encodeHexString(receipt.getDestination().getHash()), rttString);
}
}
public void packetTimedOut(PacketReceipt receipt) {
log.info("packet timed out");
if (receipt.getStatus() == PacketReceiptStatus.FAILED) {
log.info("packet timed out, receipt status: {}", PacketReceiptStatus.FAILED);
}
}
public void clientConnected(Link link) {
link.setLinkClosedCallback(this::clientDisconnected);
link.setPacketCallback(this::serverPacketReceived);
var peer = findPeerByLink(link);
if (nonNull(peer)) {
log.info("initiator peer {} opened link (link lookup: {}), link destination hash: {}",
Hex.encodeHexString(peer.getDestinationHash()), link, Hex.encodeHexString(link.getDestination().getHash()));
// make sure the peerLink is actvive.
peer.getOrInitPeerLink();
} else {
log.info("non-initiator closed link (link lookup: {}), link destination hash (initiator): {}",
peer, link, Hex.encodeHexString(link.getDestination().getHash()));
}
incomingLinks.add(link);
log.info("***> Client connected, link: {}", link);
}
public void clientDisconnected(Link link) {
var peer = findPeerByLink(link);
if (nonNull(peer)) {
log.info("initiator peer {} closed link (link lookup: {}), link destination hash: {}",
Hex.encodeHexString(peer.getDestinationHash()), link, Hex.encodeHexString(link.getDestination().getHash()));
} else {
log.info("non-initiator closed link (link lookup: {}), link destination hash (initiator): {}",
peer, link, Hex.encodeHexString(link.getDestination().getHash()));
}
// if we have a peer pointing to that destination, we can close and remove it
peer = findPeerByDestinationHash(link.getDestination().getHash());
if (nonNull(peer)) {
// Note: no shutdown as the remobe peer could be only rebooting.
// keep it to reopen link later if possible.
peer.getPeerLink().teardown();
}
incomingLinks.remove(link);
log.info("***> Client disconnected");
}
public void serverPacketReceived(byte[] message, Packet packet) {
var msgText = new String(message, StandardCharsets.UTF_8);
log.info("Received data on link - message: {}, destinationHash: {}", msgText, Hex.encodeHexString(packet.getDestinationHash()));
//var peer = findPeerByDestinationHash(packet.getDestinationHash());
//if (msgText.equals("ping")) {
// log.info("received ping");
// //if (nonNull(peer)) {
// // String replyText = "pong";
// // byte[] replyData = replyText.getBytes(StandardCharsets.UTF_8);
// // Packet reply = new Packet(peer.getPeerLink(), replyData);
// //}
//}
//if (msgText.equals("shutdown")) {
// log.info("shutdown packet received");
// var link = recall(packet.getDestinationHash());
// log.info("recalled destinationHash: {}", link);
// //...
//}
// TODO: process packet....
} }
//public void announceBaseDestination () { //public void announceBaseDestination () {
// getBaseDestination().announce(); // getBaseDestination().announce();
//} //}
//public Consumer<Link> clientConnected(Link link) {
// log.info("Client connected");
// link.setLinkClosedCallback(clientDisconnected(link));
// link.setPacketCallback(null);
//}
//public void clientDisconnected(Link link) {
// log.info("Client disconnected");
// linkedPeers.remove(link);
//}
// client part
//@Slf4j //@Slf4j
private static class QAnnounceHandler implements AnnounceHandler { private class QAnnounceHandler implements AnnounceHandler {
@Override @Override
public String getAspectFilter() { public String getAspectFilter() {
// handle all announces // handle all announces
@ -250,10 +307,9 @@ public class RNSNetwork {
@Synchronized @Synchronized
public void receivedAnnounce(byte[] destinationHash, Identity announcedIdentity, byte[] appData) { public void receivedAnnounce(byte[] destinationHash, Identity announcedIdentity, byte[] appData) {
var peerExists = false; var peerExists = false;
var activePeerCount = 0;
log.info("Received an announce from {}", Hex.encodeHexString(destinationHash)); log.info("Received an announce from {}", Hex.encodeHexString(destinationHash));
//log.info("aspect: {}", getAspectFilter());
//log.info("destinationhash: {}, announcedIdentity: {}, appData: {}", destinationHash, announcedIdentity, appData);
if (nonNull(appData)) { if (nonNull(appData)) {
log.debug("The announce contained the following app data: {}", new String(appData, UTF_8)); log.debug("The announce contained the following app data: {}", new String(appData, UTF_8));
@ -261,29 +317,48 @@ public class RNSNetwork {
// add to peer list if we can use more peers // add to peer list if we can use more peers
//synchronized (this) { //synchronized (this) {
List<RNSPeer> lps = RNSNetwork.getInstance().getLinkedPeers(); var lps = RNSNetwork.getInstance().getLinkedPeers();
if (lps.size() < MAX_PEERS) { for (RNSPeer p: lps) {
for (RNSPeer p : lps) { var pl = p.getPeerLink();
//log.info("peer exists: hash: {}, destinationHash: {}", p.getDestinationLink().getDestination().getHash(), destinationHash); if ((nonNull(pl) && (pl.getStatus() == ACTIVE))) {
if (Arrays.equals(p.getDestinationLink().getDestination().getHash(), destinationHash)) { activePeerCount = activePeerCount + 1;
peerExists = true; }
log.debug("peer exists: hash: {}, destinationHash: {}", p.getDestinationLink().getDestination().getHash(), destinationHash); }
break; if (activePeerCount < MAX_PEERS) {
//if (!peerExists) {
//var peer = findPeerByDestinationHash(destinationHash);
for (RNSPeer p: lps) {
if (Arrays.equals(p.getDestinationHash(), destinationHash)) {
log.info("QAnnounceHandler - peer exists - found peer matching destinationHash");
if (nonNull(p.getPeerLink())) {
log.info("peer link: {}, status: {}", p.getPeerLink(), p.getPeerLink().getStatus());
}
peerExists = true;
if (p.getPeerLink().getStatus() != ACTIVE) {
p.getOrInitPeerLink();
}
break;
} else {
if (nonNull(p.getPeerLink())) {
log.info("QAnnounceHandler - other peer - link: {}, status: {}", p.getPeerLink(), p.getPeerLink().getStatus());
} else {
log.info("QAnnounceHandler - null link peer - link: {}", p.getPeerLink());
} }
} }
if (!peerExists) {
//log.info("announce handler - cerate new peer: **announcedIdentity**: {}, **recall**: {}", announcedIdentity, recall(destinationHash));
RNSPeer newPeer = new RNSPeer(destinationHash);
lps.add(newPeer);
log.info("added new RNSPeer, Destination - {}, Link: {}", newPeer.getDestinationHash(), newPeer.getDestinationLink());
}
} }
if (!peerExists) {
RNSPeer newPeer = new RNSPeer(destinationHash);
newPeer.setServerIdentity(announcedIdentity);
newPeer.setIsInitiator(true);
lps.add(newPeer);
log.info("added new RNSPeer, destinationHash: {}", Hex.encodeHexString(destinationHash));
}
}
//} //}
} }
} }
// Main thread // Main thread
//class RNSNetworkProcessor extends ExecuteProduceConsume { //class RNSNetworkProcessor extends ExecuteProduceConsume {
// //
// //private final Logger logger = LoggerFactory.getLogger(RNSNetworkProcessor.class); // //private final Logger logger = LoggerFactory.getLogger(RNSNetworkProcessor.class);
@ -348,6 +423,14 @@ public class RNSNetwork {
return SingletonContainer.INSTANCE; return SingletonContainer.INSTANCE;
} }
public Identity getServerIdentity() {
return this.serverIdentity;
}
public Reticulum getReticulum() {
return this.reticulum;
}
public List<RNSPeer> getLinkedPeers() { public List<RNSPeer> getLinkedPeers() {
synchronized(this.linkedPeers) { synchronized(this.linkedPeers) {
//return new ArrayList<>(this.linkedPeers); //return new ArrayList<>(this.linkedPeers);
@ -366,60 +449,145 @@ public class RNSNetwork {
} }
// maintenance // maintenance
//public void removePeer(RNSPeer peer) {
//private static class AnnounceTimer { // synchronized(this) {
// //public void main(String[] args) throws InterruptedException // List<RNSPeer> peerList = this.linkedPeers;
// public void main(String[] args) throws InterruptedException // log.info("removing peer {} on peer shutdown", peer);
// { // peerList.remove(peer);
// Timer timer = new Timer(); // }
// // run timer every 10s (10000ms) //}
// timer.schedule(new TimerTask() {
// @Override //public void pingPeer(RNSPeer peer) {
// public void run() { // if (nonNull(peer)) {
// System.out.println("AnnounceTimer: " + new java.util.Date()); // peer.pingRemote();
// } // } else {
// }, 0, 10000); // log.error("peer argument is null");
// } // }
//} //}
@Synchronized @Synchronized
public void prunePeers() throws DataException { public void prunePeers() throws DataException {
// run periodically (by the Controller) // run periodically (by the Controller)
//log.info("Peer list (linkedPeers): {}",this.linkedPeers.toString()); //List<Link> linkList = getLinkedPeers();
//synchronized(this) { var peerList = getLinkedPeers();
//List<Link> linkList = getLinkedPeers(); log.info("number of links (linkedPeers) before prunig: {}", peerList.size());
List<RNSPeer> peerList = this.linkedPeers; Link pLink;
log.info("List of RNSPeers: {}", this.linkedPeers); LinkStatus lStatus;
//log.info("number of links (linkedPeers) before prunig: {}", this.linkedPeers.size()); for (RNSPeer p: peerList) {
Link pLink; pLink = p.getPeerLink();
LinkStatus lStatus; log.info("prunePeers - pLink: {}, destinationHash: {}",
for (RNSPeer p: peerList) { pLink, Hex.encodeHexString(p.getDestinationHash()));
pLink = p.getLink(); log.debug("peer: {}", p);
lStatus = pLink.getStatus(); if (nonNull(pLink)) {
//log.debug("link status: "+lStatus.toString()); if (p.getPeerTimedOut()) {
// lStatus in: PENDING, HANDSHAKE, ACTIVE, STALE, CLOSED // close peer link for now
if (lStatus == CLOSED) {
p.resetPeer();
peerList.remove(p);
} else if (lStatus == STALE) {
pLink.teardown(); pLink.teardown();
p.resetPeer(); }
lStatus = pLink.getStatus();
log.info("Link {} status: {}", pLink, lStatus);
// lStatus in: PENDING, HANDSHAKE, ACTIVE, STALE, CLOSED
if ((lStatus == STALE) || (pLink.getTeardownReason() == TIMEOUT) || (p.getDeleteMe())) {
p.shutdown();
peerList.remove(p);
} else if (lStatus == HANDSHAKE) {
// stuck in handshake state (do we need to shutdown/remove it?)
log.info("peer status HANDSHAKE");
p.shutdown();
peerList.remove(p); peerList.remove(p);
} else if (lStatus == PENDING) {
log.info("prunePeers - link state still {}", lStatus);
// TODO: can we help the Link along somehow?
} }
} }
log.info("number of links (linkedPeers) after prunig: {}", this.linkedPeers.size()); }
//} //removeExpiredPeers(this.linkedPeers);
log.info("number of links (linkedPeers) after prunig: {}", peerList.size());
log.info("we have {} non-initiator links, list: {}", incomingLinks.size(), incomingLinks);
maybeAnnounce(getBaseDestination()); maybeAnnounce(getBaseDestination());
} }
//public void removeExpiredPeers(List<RNSPeer> peerList) {
// //List<RNSPeer> peerList = this.linkedPeers;
// for (RNSPeer p: peerList) {
// if (p.getPeerLink() == null) {
// peerList.remove(p);
// } else if (p.getPeerLink().getStatus() == STALE) {
// peerList.remove(p);
// }
// }
//}
public void maybeAnnounce(Destination d) { public void maybeAnnounce(Destination d) {
if (getLinkedPeers().size() < MIN_DESIRED_PEERS) { if (getLinkedPeers().size() < MIN_DESIRED_PEERS) {
d.announce(); d.announce();
} }
} }
/**
* Helper methods
*/
//@Synchronized
//public RNSPeer getPeerIfExists(RNSPeer peer) {
// List<RNSPeer> lps = RNSNetwork.getInstance().getLinkedPeers();
// RNSPeer result = null;
// for (RNSPeer p: lps) {
// if (nonNull(p.getDestinationHash()) && Arrays.equals(p.getDestinationHash(), peer.getDestinationHash())) {
// log.info("found match by destinationHash");
// result = p;
// //break;
// }
// if (nonNull(p.getPeerDestinationHash()) && Arrays.equals(p.getPeerDestinationHash(), peer.getPeerDestinationHash())) {
// log.info("found match by peerDestinationHash");
// result = p;
// //break;
// }
// if (nonNull(p.getPeerBaseDestinationHash()) && Arrays.equals(p.getPeerBaseDestinationHash(), peer.getPeerBaseDestinationHash())) {
// log.info("found match by peerBaseDestinationHash");
// result = p;
// //break;
// }
// if (nonNull(p.getRemoteTestHash()) && Arrays.equals(p.getRemoteTestHash(), peer.getRemoteTestHash())) {
// log.info("found match by remoteTestHash");
// result = p;
// //break;
// }
// }
// return result;
//}
public RNSPeer findPeerByLink(Link link) {
List<RNSPeer> lps = RNSNetwork.getInstance().getLinkedPeers();
RNSPeer peer = null;
for (RNSPeer p : lps) {
var pLink = p.getPeerLink();
if (nonNull(pLink)) {
if (Arrays.equals(pLink.getDestination().getHash(),link.getDestination().getHash())) {
log.info("found peer matching destinationHash: {}", Hex.encodeHexString(link.getDestination().getHash()));
peer = p;
break;
}
}
}
return peer;
}
public RNSPeer findPeerByDestinationHash(byte[] dhash) {
List<RNSPeer> lps = RNSNetwork.getInstance().getLinkedPeers();
RNSPeer peer = null;
for (RNSPeer p : lps) {
if (Arrays.equals(p.getDestinationHash(), dhash)) {
log.info("found peer matching destinationHash: {}", Hex.encodeHexString(dhash));
peer = p;
break;
}
}
return peer;
}
public void removePeer(RNSPeer peer) {
List<RNSPeer> peerList = this.linkedPeers;
if (nonNull(peer)) {
peerList.remove(peer);
}
}
} }

View File

@ -4,18 +4,38 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import static java.util.Objects.isNull; import static java.util.Objects.isNull;
import static java.util.Objects.nonNull;
import java.io.IOException;
import java.time.Instant;
import java.util.Arrays;
import io.reticulum.Reticulum;
import org.qortal.network.RNSNetwork; import org.qortal.network.RNSNetwork;
import io.reticulum.link.Link; import io.reticulum.link.Link;
import io.reticulum.link.RequestReceipt;
import io.reticulum.packet.PacketReceiptStatus;
import io.reticulum.packet.Packet; import io.reticulum.packet.Packet;
import io.reticulum.packet.PacketReceipt;
import io.reticulum.identity.Identity; import io.reticulum.identity.Identity;
import io.reticulum.channel.Channel; import io.reticulum.channel.Channel;
import io.reticulum.destination.Destination; import io.reticulum.destination.Destination;
import io.reticulum.destination.DestinationType; import io.reticulum.destination.DestinationType;
import io.reticulum.destination.Direction; import io.reticulum.destination.Direction;
import io.reticulum.destination.ProofStrategy;
import io.reticulum.resource.Resource;
import static io.reticulum.link.TeardownSession.INITIATOR_CLOSED;
import static io.reticulum.link.TeardownSession.DESTINATION_CLOSED;
import static io.reticulum.link.TeardownSession.TIMEOUT;
import static io.reticulum.link.LinkStatus.ACTIVE;
import static io.reticulum.link.LinkStatus.CLOSED;
import static io.reticulum.identity.IdentityKnownDestination.recall; import static io.reticulum.identity.IdentityKnownDestination.recall;
//import static io.reticulum.identity.IdentityKnownDestination.recallAppData; //import static io.reticulum.identity.IdentityKnownDestination.recallAppData;
import java.nio.charset.StandardCharsets;
import static java.nio.charset.StandardCharsets.UTF_8;
import org.apache.commons.codec.binary.Hex;
import static org.apache.commons.lang3.ArrayUtils.subarray;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import lombok.Setter; import lombok.Setter;
import lombok.Data; import lombok.Data;
@ -25,86 +45,234 @@ import lombok.AccessLevel;
@Slf4j @Slf4j
public class RNSPeer { public class RNSPeer {
private byte[] destinationHash; //static final String APP_NAME = "qortal";
private Link destinationLink; static final String APP_NAME = RNSCommon.APP_NAME;
private Identity destinationIdentity; //static final String defaultConfigPath = new String(".reticulum");
@Setter(AccessLevel.PACKAGE) private long creationTimestamp; static final String defaultConfigPath = RNSCommon.defaultRNSConfigPath;
private Long lastAccessTimestamp;
// constructors private byte[] destinationHash; // remote destination hash
public RNSPeer (byte[] dhash) { Destination peerDestination; // OUT destination created for this
this.destinationHash = dhash; private Identity serverIdentity;
this.destinationIdentity = recall(dhash); @Setter(AccessLevel.PACKAGE) private Instant creationTimestamp;
Link newLink = new Link( private Instant lastAccessTimestamp;
new Destination( Link peerLink;
this.destinationIdentity, private Boolean isInitiator;
Direction.OUT, private Boolean deleteMe = false;
DestinationType.SINGLE,
RNSNetwork.APP_NAME, private Double requestResponseProgress;
"core" @Setter(AccessLevel.PACKAGE) private Boolean peerTimedOut = false;
)
public RNSPeer(byte[] dhash) {
destinationHash = dhash;
serverIdentity = recall(dhash);
initPeerLink();
//setCreationTimestamp(System.currentTimeMillis());
creationTimestamp = Instant.now();
}
public void initPeerLink() {
peerDestination = new Destination(
this.serverIdentity,
Direction.OUT,
DestinationType.SINGLE,
RNSNetwork.APP_NAME,
"core"
); );
this.destinationLink = newLink; peerDestination.setProofStrategy(ProofStrategy.PROVE_ALL);
destinationLink.setPacketCallback(this::packetCallback);
lastAccessTimestamp = Instant.now();
isInitiator = true;
peerLink = new Link(peerDestination);
this.peerLink.setLinkEstablishedCallback(this::linkEstablished);
this.peerLink.setLinkClosedCallback(this::linkClosed);
this.peerLink.setPacketCallback(this::linkPacketReceived);
} }
public RNSPeer (Link newLink) { public Link getOrInitPeerLink() {
this.destinationHash = newLink.getDestination().getHash(); if (this.peerLink.getStatus() == ACTIVE) {
this.destinationLink = newLink; lastAccessTimestamp = Instant.now();
this.destinationIdentity = newLink.getRemoteIdentity(); return this.peerLink;
setCreationTimestamp(System.currentTimeMillis()); } else {
this.lastAccessTimestamp = null; initPeerLink();
destinationLink.setPacketCallback(this::packetCallback);
}
public RNSPeer () {
this.destinationHash = null;
this.destinationLink = null;
this.destinationIdentity = null;
setCreationTimestamp(System.currentTimeMillis());
this.lastAccessTimestamp = null;
}
// utilities (change Link type, call tasks, ...)
//...
private void packetCallback(byte[] message, Packet packet) {
log.debug("Message raw {}", message);
log.debug("Packet {}", packet.toString());
// ...
}
public Link getLink() {
if (isNull(getDestinationLink())) {
Link newLink = new Link(
new Destination(
this.destinationIdentity,
Direction.OUT,
DestinationType.SINGLE,
RNSNetwork.APP_NAME,
"core"
)
);
this.destinationLink = newLink;
return newLink;
} }
return getDestinationLink(); return this.peerLink;
}
public void shutdown() {
if (nonNull(peerLink)) {
log.info("shutdown - peerLink: {}, status: {}", peerLink, peerLink.getStatus());
if (peerLink.getStatus() == ACTIVE) {
peerLink.teardown();
}
this.peerLink = null;
}
this.deleteMe = true;
} }
public Channel getChannel() { public Channel getChannel() {
if (isNull(getDestinationLink())) { if (isNull(getPeerLink())) {
log.warn("link is null."); log.warn("link is null.");
return null; return null;
} }
setLastAccessTimestamp(System.currentTimeMillis()); setLastAccessTimestamp(Instant.now());
return getDestinationLink().getChannel(); return getPeerLink().getChannel();
} }
public void resetPeer () { /** Link callbacks */
this.destinationHash = null; public void linkEstablished(Link link) {
this.destinationLink = null; link.setLinkClosedCallback(this::linkClosed);
this.destinationIdentity = null; log.info("peerLink {} established (link: {}) with peer: hash - {}, link destination hash: {}",
this.lastAccessTimestamp = null; peerLink, link, Hex.encodeHexString(destinationHash),
Hex.encodeHexString(link.getDestination().getHash()));
} }
} public void linkClosed(Link link) {
if (link.getTeardownReason() == TIMEOUT) {
log.info("The link timed out");
this.peerTimedOut = true;
} else if (link.getTeardownReason() == INITIATOR_CLOSED) {
log.info("Link closed callback: The initiator closed the link");
log.info("peerLink {} closed (link: {}), link destination hash: {}",
peerLink, link, Hex.encodeHexString(link.getDestination().getHash()));
} else if (link.getTeardownReason() == DESTINATION_CLOSED) {
log.info("Link closed callback: The link was closed by the peer, removing peer");
log.info("peerLink {} closed (link: {}), link destination hash: {}",
peerLink, link, Hex.encodeHexString(link.getDestination().getHash()));
} else {
log.info("Link closed callback");
}
}
public void linkPacketReceived(byte[] message, Packet packet) {
var msgText = new String(message, StandardCharsets.UTF_8);
if (msgText.equals("ping")) {
log.info("received ping on link");
} else if (msgText.startsWith("close::")) {
var targetPeerHash = subarray(message, 7, message.length);
log.info("peer dest hash: {}, target hash: {}",
Hex.encodeHexString(destinationHash),
Hex.encodeHexString(targetPeerHash));
if (Arrays.equals(destinationHash, targetPeerHash)) {
log.info("closing link: {}", peerLink.getDestination().getHexHash());
peerLink.teardown();
}
} else if (msgText.startsWith("open::")) {
var targetPeerHash = subarray(message, 7, message.length);
log.info("peer dest hash: {}, target hash: {}",
Hex.encodeHexString(destinationHash),
Hex.encodeHexString(targetPeerHash));
if (Arrays.equals(destinationHash, targetPeerHash)) {
log.info("closing link: {}", peerLink.getDestination().getHexHash());
getOrInitPeerLink();
}
}
// TODO: process incoming packet....
}
/** PacketReceipt callbacks */
public void packetDelivered(PacketReceipt receipt) {
var rttString = new String("");
//log.info("packet delivered callback, receipt: {}", receipt);
if (receipt.getStatus() == PacketReceiptStatus.DELIVERED) {
var rtt = receipt.getRtt(); // rtt (Java) is in miliseconds
//log.info("qqp - packetDelivered - rtt: {}", rtt);
if (rtt >= 1000) {
rtt = Math.round(rtt / 1000);
rttString = String.format("%d seconds", rtt);
} else {
rttString = String.format("%d miliseconds", rtt);
}
log.info("Valid reply received from {}, round-trip time is {}",
Hex.encodeHexString(receipt.getDestination().getHash()), rttString);
}
}
public void packetTimedOut(PacketReceipt receipt) {
log.info("packet timed out");
if (receipt.getStatus() == PacketReceiptStatus.FAILED) {
log.info("packet timed out, receipt status: {}", PacketReceiptStatus.FAILED);
this.peerTimedOut = true;
peerLink.teardown();
//this.deleteMe = true;
}
}
/** Link Request callbacks */
public void linkRequestResponseReceived(RequestReceipt rr) {
log.info("Response received");
}
public void linkRequestResponseProgress(RequestReceipt rr) {
this.requestResponseProgress = rr.getProgress();
log.debug("Response progress set");
}
public void linkRequestFailed(RequestReceipt rr) {
log.error("Request failed");
}
/** Link Resource callbacks */
// Resource: allow arbitrary amounts of data to be passed over a link with
// sequencing, compression, coordination and checksumming handled automatically
//public Boolean linkResourceAdvertised(Resource resource) {
// log.debug("Resource advertised");
//}
public void linkResourceTransferStarted(Resource resource) {
log.debug("Resource transfer started");
}
public void linkResourceTransferComcluded(Resource resource) {
log.debug("Resource transfer complete");
}
/** Utility methods */
public void pingRemote() {
var link = this.peerLink;
log.info("pinging remote: {}", link);
var data = "ping".getBytes(UTF_8);
link.setPacketCallback(this::linkPacketReceived);
Packet pingPacket = new Packet(link, data);
PacketReceipt packetReceipt = pingPacket.send();
//packetReceipt.setTimeout(3L);
packetReceipt.setTimeoutCallback(this::packetTimedOut);
packetReceipt.setDeliveryCallback(this::packetDelivered);
}
//public void shutdownLink(Link link) {
// var data = "shutdown".getBytes(UTF_8);
// Packet shutdownPacket = new Packet(link, data);
// PacketReceipt packetReceipt = shutdownPacket.send();
// packetReceipt.setTimeout(2000L);
// packetReceipt.setTimeoutCallback(this::packetTimedOut);
// packetReceipt.setDeliveryCallback(this::shutdownPacketDelivered);
//}
///** check if a link is available (ACTIVE)
// * link: a certain peer link, or null (default link == link to Qortal node RNS baseDestination)
// */
//public Boolean peerLinkIsAlive(Link link) {
// var result = false;
// if (isNull(link)) {
// // default link
// var defaultLink = getLink();
// if (nonNull(defaultLink) && defaultLink.getStatus() == ACTIVE) {
// result = true;
// log.info("Default link is available");
// } else {
// log.info("Default link {} is not available, status: {}", defaultLink, defaultLink.getStatus());
// }
// } else {
// // other link (future where we have multiple destinations...)
// if (link.getStatus() == ACTIVE) {
// result = true;
// log.info("Link {} is available (status: {})", link, link.getStatus());
// } else {
// log.info("Link {} is not available, status: {}", link, link.getStatus());
// }
// }
// return result;
//}
}