diff --git a/lib/io/reticulum/reticulum-network-stack/1.0-SNAPSHOT/maven-metadata-local.xml b/lib/io/reticulum/reticulum-network-stack/1.0-SNAPSHOT/maven-metadata-local.xml index 111dbbcc..ddc85eb7 100644 --- a/lib/io/reticulum/reticulum-network-stack/1.0-SNAPSHOT/maven-metadata-local.xml +++ b/lib/io/reticulum/reticulum-network-stack/1.0-SNAPSHOT/maven-metadata-local.xml @@ -7,12 +7,12 @@ true - 20240324170649 + 20240707083116 jar 1.0-SNAPSHOT - 20240324170649 + 20240707083116 pom diff --git a/lib/io/reticulum/reticulum-network-stack/1.0-SNAPSHOT/reticulum-network-stack-1.0-SNAPSHOT.jar b/lib/io/reticulum/reticulum-network-stack/1.0-SNAPSHOT/reticulum-network-stack-1.0-SNAPSHOT.jar index 7612e6ad..a7473ddf 100644 Binary files a/lib/io/reticulum/reticulum-network-stack/1.0-SNAPSHOT/reticulum-network-stack-1.0-SNAPSHOT.jar and b/lib/io/reticulum/reticulum-network-stack/1.0-SNAPSHOT/reticulum-network-stack-1.0-SNAPSHOT.jar differ diff --git a/lib/io/reticulum/reticulum-network-stack/maven-metadata-local.xml b/lib/io/reticulum/reticulum-network-stack/maven-metadata-local.xml index 6ff327fb..96f17d67 100644 --- a/lib/io/reticulum/reticulum-network-stack/maven-metadata-local.xml +++ b/lib/io/reticulum/reticulum-network-stack/maven-metadata-local.xml @@ -6,6 +6,6 @@ 1.0-SNAPSHOT - 20240324170649 + 20240707083116 diff --git a/src/main/java/org/qortal/network/RNSCommon.java b/src/main/java/org/qortal/network/RNSCommon.java new file mode 100644 index 00000000..a43afb2a --- /dev/null +++ b/src/main/java/org/qortal/network/RNSCommon.java @@ -0,0 +1,23 @@ +package org.qortal.network; + +public class RNSCommon { + + /** + * Destination application name + */ + public static String APP_NAME = "qortal"; + + /** + * Configuration path relative to the Qortal launch directory + */ + public static String defaultRNSConfigPath = new String(".reticulum"); + + ///** + // * Qortal RNS Destinations + // */ + //public enum RNSDestinations { + // BASE, + // QDN; + //} + +} diff --git a/src/main/java/org/qortal/network/RNSNetwork.java b/src/main/java/org/qortal/network/RNSNetwork.java index 2472c891..0700ae7a 100644 --- a/src/main/java/org/qortal/network/RNSNetwork.java +++ b/src/main/java/org/qortal/network/RNSNetwork.java @@ -1,97 +1,89 @@ package org.qortal.network; -import java.io.IOException; -//import java.nio.channels.SelectionKey; -//import java.io.Paths; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.StandardCopyOption; -import java.io.File; -import java.util.*; -//import java.util.function.BiConsumer; -//import java.util.function.Consumer; -//import java.util.function.Function; -//import java.util.concurrent.*; -//import java.util.concurrent.atomic.AtomicLong; - -//import org.qortal.data.network.PeerData; -import org.qortal.repository.DataException; -//import org.qortal.settings.Settings; -import org.qortal.settings.Settings; -//import org.qortal.utils.NTP; - -//import com.fasterxml.jackson.annotation.JsonGetter; - -import org.apache.commons.codec.binary.Hex; -//import org.slf4j.Logger; -//import org.slf4j.LoggerFactory; import io.reticulum.Reticulum; import io.reticulum.Transport; +import io.reticulum.interfaces.ConnectionInterface; import io.reticulum.destination.Destination; import io.reticulum.destination.DestinationType; import io.reticulum.destination.Direction; -import io.reticulum.identity.Identity; -import io.reticulum.interfaces.ConnectionInterface; import io.reticulum.destination.ProofStrategy; -import io.reticulum.transport.AnnounceHandler; -import static io.reticulum.constant.ReticulumConstant.CONFIG_FILE_NAME; -//import static io.reticulum.identity.IdentityKnownDestination.recall; -//import static io.reticulum.identity.IdentityKnownDestination.recallAppData; -//import static io.reticulum.destination.Direction.OUT; - -import lombok.extern.slf4j.Slf4j; -import lombok.Synchronized; +import io.reticulum.identity.Identity; import io.reticulum.link.Link; import io.reticulum.link.LinkStatus; -//import io.reticulum.packet.PacketReceipt; +//import io.reticulum.constant.LinkConstant; import io.reticulum.packet.Packet; - -//import static io.reticulum.link.LinkStatus.ACTIVE; -import static io.reticulum.link.LinkStatus.CLOSED; -import static io.reticulum.link.LinkStatus.PENDING; +import io.reticulum.packet.PacketReceipt; +import io.reticulum.packet.PacketReceiptStatus; +import io.reticulum.transport.AnnounceHandler; +import static io.reticulum.link.TeardownSession.DESTINATION_CLOSED; +import static io.reticulum.link.TeardownSession.INITIATOR_CLOSED; +import static io.reticulum.link.TeardownSession.TIMEOUT; +import static io.reticulum.link.LinkStatus.ACTIVE; import static io.reticulum.link.LinkStatus.STALE; +import static io.reticulum.link.LinkStatus.PENDING; +import static io.reticulum.link.LinkStatus.HANDSHAKE; +//import static io.reticulum.packet.PacketContextType.LINKCLOSE; +import static io.reticulum.identity.IdentityKnownDestination.recall; +import static io.reticulum.utils.IdentityUtils.concatArrays; +//import static io.reticulum.constant.ReticulumConstant.TRUNCATED_HASHLENGTH; +import static io.reticulum.constant.ReticulumConstant.CONFIG_FILE_NAME; +import lombok.extern.slf4j.Slf4j; +import lombok.Data; +//import lombok.Setter; +//import lombok.Getter; +import lombok.Synchronized; + +import org.qortal.repository.DataException; +import org.qortal.settings.Settings; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.StandardCopyOption; +import java.nio.file.Files; +import java.nio.file.Path; import static java.nio.charset.StandardCharsets.UTF_8; //import static java.util.Objects.isNull; +//import static java.util.Objects.isNull; import static java.util.Objects.nonNull; +//import static org.apache.commons.lang3.BooleanUtils.isFalse; -//import org.qortal.network.Network.NetworkProcessor; -//import org.qortal.utils.ExecuteProduceConsume; -//import org.qortal.utils.NamedThreadFactory; +import java.io.File; +import java.util.Arrays; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +//import java.util.Random; +import java.util.Scanner; +import java.util.concurrent.TimeUnit; -//import java.time.Instant; - -//import org.qortal.network.RNSPeer; +import org.apache.commons.codec.binary.Hex; +@Data @Slf4j public class RNSNetwork { - static final String APP_NAME = "qortal"; - private Reticulum reticulum; - private Identity server_identity; - private Destination baseDestination; // service base (initially: anything node2node) - //private Destination dataDestination; // qdn services (eg. files like music, videos etc) - //private Destination liveDestination; // live/dynamic peer list (eg. video conferencing) - // the following should be retrieved from settings - private static Integer MAX_PEERS = 3; - private static Integer MIN_DESIRED_PEERS = 3; + Reticulum reticulum; + //private static final String APP_NAME = "qortal"; + static final String APP_NAME = RNSCommon.APP_NAME; + //static final String defaultConfigPath = new String(".reticulum"); // if empty will look in Reticulums default paths + static final String defaultConfigPath = RNSCommon.defaultRNSConfigPath; + //private final String defaultConfigPath = Settings.getInstance().getDefaultRNSConfigPathForReticulum(); + private static Integer MAX_PEERS = 12; //private final Integer MAX_PEERS = Settings.getInstance().getMaxReticulumPeers(); - //private final Integer MIN_DESIRED_PEERS = Settings.getInstance().getMinDesiredReticulumPeers(); - static final String defaultConfigPath = new String(".reticulum"); // if empty will look in Reticulums default paths - //private final String defaultConfigPath = Settings.getInstance().getDefaultConfigPathForReticulum(); - - //private static final Logger logger = LoggerFactory.getLogger(RNSNetwork.class); - - //private final List linkedPeers = Collections.synchronizedList(new ArrayList<>()); - //private List immutableLinkedPeers = Collections.emptyList(); - private final List linkedPeers = Collections.synchronizedList(new ArrayList<>()); - - //private final ExecuteProduceConsume rnsNetworkEPC; - private static final long NETWORK_EPC_KEEPALIVE = 1000L; // 1 second + private static Integer MIN_DESIRED_PEERS = 3; + //private final Integer MIN_DESIRED_PEERS = Settings.getInstance().getMinDesiredPeers(); + Identity serverIdentity; + public Destination baseDestination; private volatile boolean isShuttingDown = false; - private int totalThreadCount = 0; + private final List linkedPeers = Collections.synchronizedList(new ArrayList<>()); + private final List incomingLinks = Collections.synchronizedList(new ArrayList<>()); - // TODO: settings - MaxReticulumPeers, MaxRNSNetworkThreadPoolSize (if needed) + ////private final ExecuteProduceConsume rnsNetworkEPC; + //private static final long NETWORK_EPC_KEEPALIVE = 1000L; // 1 second + //private volatile boolean isShuttingDown = false; + //private int totalThreadCount = 0; + //// TODO: settings - MaxReticulumPeers, MaxRNSNetworkThreadPoolSize (if needed) // Constructor private RNSNetwork () { @@ -112,19 +104,20 @@ public class RNSNetwork { //rnsNetworkEPC = new RNSNetworkProcessor(RNSNetworkExecutor); } - // Note: potentially create persistent server_identity (utility rnid) and load it from file + // Note: potentially create persistent serverIdentity (utility rnid) and load it from file public void start() throws IOException, DataException { // create identity either from file or new (creating new keys) - var serverIdentityPath = reticulum.getStoragePath().resolve(APP_NAME); + var serverIdentityPath = reticulum.getStoragePath().resolve("identities/"+APP_NAME); if (Files.isReadable(serverIdentityPath)) { - server_identity = Identity.fromFile(serverIdentityPath); + serverIdentity = Identity.fromFile(serverIdentityPath); log.info("server identity loaded from file {}", serverIdentityPath.toString()); } else { - server_identity = new Identity(); + serverIdentity = new Identity(); + log.info("APP_NAME: {}, storage path: {}", APP_NAME, serverIdentityPath); log.info("new server identity created dynamically."); } - log.debug("Server Identity: {}", server_identity.toString()); + log.debug("Server Identity: {}", serverIdentity.toString()); // show the ifac_size of the configured interfaces (debug code) for (ConnectionInterface i: Transport.getInstance().getInterfaces() ) { @@ -132,74 +125,74 @@ public class RNSNetwork { } baseDestination = new Destination( - server_identity, + serverIdentity, Direction.IN, DestinationType.SINGLE, APP_NAME, "core" ); - //// ideas for other entry points + //// idea for other entry point //dataDestination = new Destination( - // server_identity, + // serverIdentity, // Direction.IN, // DestinationType.SINGLE, // APP_NAME, // "core", // "qdn" //); - //liveDestination = new Destination( - // server_identity, - // Direction.IN, - // DestinationType.SINGLE, - // APP_NAME, - // "core", - // "live" - //); log.info("Destination "+Hex.encodeHexString(baseDestination.getHash())+" "+baseDestination.getName()+" running."); - //log.info("Destination "+Hex.encodeHexString(dataDestination.getHash())+" "+dataDestination.getName()+" running."); baseDestination.setProofStrategy(ProofStrategy.PROVE_ALL); - //dataDestination.setProofStrategy(ProofStrategy.PROVE_ALL); - baseDestination.setAcceptLinkRequests(true); - //dataDestination.setAcceptLinkRequests(true); - //baseDestination.setLinkEstablishedCallback(this::linkExtabishedCallback); - baseDestination.setPacketCallback(this::packetCallback); - //baseDestination.setPacketCallback((message, packet) -> { - // log.info("xyz - Message raw {}", message); - // log.info("xyz - Packet {}", packet.toString()); - //}); + + baseDestination.setLinkEstablishedCallback(this::clientConnected); Transport.getInstance().registerAnnounceHandler(new QAnnounceHandler()); - log.info("announceHandlers: {}", Transport.getInstance().getAnnounceHandlers()); + log.debug("announceHandlers: {}", Transport.getInstance().getAnnounceHandlers()); + // do a first announce baseDestination.announce(); - //dataDestination.announce(); - log.info("Sent initial announce from {} ({})", Hex.encodeHexString(baseDestination.getHash()), baseDestination.getName()); + log.debug("Sent initial announce from {} ({})", Hex.encodeHexString(baseDestination.getHash()), baseDestination.getName()); // Start up first networking thread (the "server loop") //rnsNetworkEPC.start(); } + //@Synchronized public void shutdown() { isShuttingDown = true; log.info("shutting down Reticulum"); - + // Stop processing threads (the "server loop") //try { // if (!this.rnsNetworkEPC.shutdown(5000)) { - // logger.warn("Network threads failed to terminate"); + // logger.warn("RNSNetwork threads failed to terminate"); // } //} catch (InterruptedException e) { - // logger.warn("Interrupted while waiting for networking threads to terminate"); + // logger.warn("Interrupted while waiting for RNS networking threads to terminate"); //} - - // Disconnect peers and terminate Reticulum - for (RNSPeer p : linkedPeers) { - if (nonNull(p.getLink())) { - p.getLink().teardown(); + + // Disconnect peers gracefully and terminate Reticulum + for (RNSPeer p: linkedPeers) { + log.info("shutting down peer: {}", Hex.encodeHexString(p.getDestinationHash())); + log.debug("peer: {}", p); + p.shutdown(); + try { + TimeUnit.SECONDS.sleep(1); // allow for peers to disconnect gracefully + } catch (InterruptedException e) { + log.error("exception: {}", e); } } + // gracefully close links of peers that point to us + for (Link l: incomingLinks) { + var data = concatArrays("close::".getBytes(UTF_8),l.getDestination().getHash()); + Packet closePacket = new Packet(l, data); + var packetReceipt = closePacket.send(); + packetReceipt.setTimeout(3L); + packetReceipt.setDeliveryCallback(this::closePacketDelivered); + packetReceipt.setTimeoutCallback(this::packetTimedOut); + } + // Note: we still need to get the packet timeout callback to work... reticulum.exitHandler(); } @@ -217,29 +210,93 @@ public class RNSNetwork { } } - private void packetCallback(byte[] message, Packet packet) { - log.info("xyz - Message raw {}", message); - log.info("xyz - Packet {}", packet.toString()); + public void closePacketDelivered(PacketReceipt receipt) { + var rttString = new String(""); + if (receipt.getStatus() == PacketReceiptStatus.DELIVERED) { + var rtt = receipt.getRtt(); // rtt (Java) is in miliseconds + //log.info("qqp - packetDelivered - rtt: {}", rtt); + if (rtt >= 1000) { + rtt = Math.round(rtt / 1000); + rttString = String.format("%d seconds", rtt); + } else { + rttString = String.format("%d miliseconds", rtt); + } + log.info("Shutdown packet confirmation received from {}, round-trip time is {}", + Hex.encodeHexString(receipt.getDestination().getHash()), rttString); + } + } + + public void packetTimedOut(PacketReceipt receipt) { + log.info("packet timed out"); + if (receipt.getStatus() == PacketReceiptStatus.FAILED) { + log.info("packet timed out, receipt status: {}", PacketReceiptStatus.FAILED); + } + } + + public void clientConnected(Link link) { + link.setLinkClosedCallback(this::clientDisconnected); + link.setPacketCallback(this::serverPacketReceived); + var peer = findPeerByLink(link); + if (nonNull(peer)) { + log.info("initiator peer {} opened link (link lookup: {}), link destination hash: {}", + Hex.encodeHexString(peer.getDestinationHash()), link, Hex.encodeHexString(link.getDestination().getHash())); + // make sure the peerLink is actvive. + peer.getOrInitPeerLink(); + } else { + log.info("non-initiator closed link (link lookup: {}), link destination hash (initiator): {}", + peer, link, Hex.encodeHexString(link.getDestination().getHash())); + } + incomingLinks.add(link); + log.info("***> Client connected, link: {}", link); + } + + public void clientDisconnected(Link link) { + var peer = findPeerByLink(link); + if (nonNull(peer)) { + log.info("initiator peer {} closed link (link lookup: {}), link destination hash: {}", + Hex.encodeHexString(peer.getDestinationHash()), link, Hex.encodeHexString(link.getDestination().getHash())); + } else { + log.info("non-initiator closed link (link lookup: {}), link destination hash (initiator): {}", + peer, link, Hex.encodeHexString(link.getDestination().getHash())); + } + // if we have a peer pointing to that destination, we can close and remove it + peer = findPeerByDestinationHash(link.getDestination().getHash()); + if (nonNull(peer)) { + // Note: no shutdown as the remobe peer could be only rebooting. + // keep it to reopen link later if possible. + peer.getPeerLink().teardown(); + } + incomingLinks.remove(link); + log.info("***> Client disconnected"); + } + + public void serverPacketReceived(byte[] message, Packet packet) { + var msgText = new String(message, StandardCharsets.UTF_8); + log.info("Received data on link - message: {}, destinationHash: {}", msgText, Hex.encodeHexString(packet.getDestinationHash())); + //var peer = findPeerByDestinationHash(packet.getDestinationHash()); + //if (msgText.equals("ping")) { + // log.info("received ping"); + // //if (nonNull(peer)) { + // // String replyText = "pong"; + // // byte[] replyData = replyText.getBytes(StandardCharsets.UTF_8); + // // Packet reply = new Packet(peer.getPeerLink(), replyData); + // //} + //} + //if (msgText.equals("shutdown")) { + // log.info("shutdown packet received"); + // var link = recall(packet.getDestinationHash()); + // log.info("recalled destinationHash: {}", link); + // //... + //} + // TODO: process packet.... } //public void announceBaseDestination () { // getBaseDestination().announce(); //} - //public Consumer clientConnected(Link link) { - // log.info("Client connected"); - // link.setLinkClosedCallback(clientDisconnected(link)); - // link.setPacketCallback(null); - //} - - //public void clientDisconnected(Link link) { - // log.info("Client disconnected"); - // linkedPeers.remove(link); - //} - - // client part //@Slf4j - private static class QAnnounceHandler implements AnnounceHandler { + private class QAnnounceHandler implements AnnounceHandler { @Override public String getAspectFilter() { // handle all announces @@ -250,10 +307,9 @@ public class RNSNetwork { @Synchronized public void receivedAnnounce(byte[] destinationHash, Identity announcedIdentity, byte[] appData) { var peerExists = false; + var activePeerCount = 0; log.info("Received an announce from {}", Hex.encodeHexString(destinationHash)); - //log.info("aspect: {}", getAspectFilter()); - //log.info("destinationhash: {}, announcedIdentity: {}, appData: {}", destinationHash, announcedIdentity, appData); if (nonNull(appData)) { log.debug("The announce contained the following app data: {}", new String(appData, UTF_8)); @@ -261,29 +317,48 @@ public class RNSNetwork { // add to peer list if we can use more peers //synchronized (this) { - List lps = RNSNetwork.getInstance().getLinkedPeers(); - if (lps.size() < MAX_PEERS) { - for (RNSPeer p : lps) { - //log.info("peer exists: hash: {}, destinationHash: {}", p.getDestinationLink().getDestination().getHash(), destinationHash); - if (Arrays.equals(p.getDestinationLink().getDestination().getHash(), destinationHash)) { - peerExists = true; - log.debug("peer exists: hash: {}, destinationHash: {}", p.getDestinationLink().getDestination().getHash(), destinationHash); - break; + var lps = RNSNetwork.getInstance().getLinkedPeers(); + for (RNSPeer p: lps) { + var pl = p.getPeerLink(); + if ((nonNull(pl) && (pl.getStatus() == ACTIVE))) { + activePeerCount = activePeerCount + 1; + } + } + if (activePeerCount < MAX_PEERS) { + //if (!peerExists) { + //var peer = findPeerByDestinationHash(destinationHash); + for (RNSPeer p: lps) { + if (Arrays.equals(p.getDestinationHash(), destinationHash)) { + log.info("QAnnounceHandler - peer exists - found peer matching destinationHash"); + if (nonNull(p.getPeerLink())) { + log.info("peer link: {}, status: {}", p.getPeerLink(), p.getPeerLink().getStatus()); + } + peerExists = true; + if (p.getPeerLink().getStatus() != ACTIVE) { + p.getOrInitPeerLink(); + } + break; + } else { + if (nonNull(p.getPeerLink())) { + log.info("QAnnounceHandler - other peer - link: {}, status: {}", p.getPeerLink(), p.getPeerLink().getStatus()); + } else { + log.info("QAnnounceHandler - null link peer - link: {}", p.getPeerLink()); } } - if (!peerExists) { - //log.info("announce handler - cerate new peer: **announcedIdentity**: {}, **recall**: {}", announcedIdentity, recall(destinationHash)); - RNSPeer newPeer = new RNSPeer(destinationHash); - lps.add(newPeer); - log.info("added new RNSPeer, Destination - {}, Link: {}", newPeer.getDestinationHash(), newPeer.getDestinationLink()); - } } + if (!peerExists) { + RNSPeer newPeer = new RNSPeer(destinationHash); + newPeer.setServerIdentity(announcedIdentity); + newPeer.setIsInitiator(true); + lps.add(newPeer); + log.info("added new RNSPeer, destinationHash: {}", Hex.encodeHexString(destinationHash)); + } + } //} } } // Main thread - //class RNSNetworkProcessor extends ExecuteProduceConsume { // // //private final Logger logger = LoggerFactory.getLogger(RNSNetworkProcessor.class); @@ -348,6 +423,14 @@ public class RNSNetwork { return SingletonContainer.INSTANCE; } + public Identity getServerIdentity() { + return this.serverIdentity; + } + + public Reticulum getReticulum() { + return this.reticulum; + } + public List getLinkedPeers() { synchronized(this.linkedPeers) { //return new ArrayList<>(this.linkedPeers); @@ -366,60 +449,145 @@ public class RNSNetwork { } // maintenance - - //private static class AnnounceTimer { - // //public void main(String[] args) throws InterruptedException - // public void main(String[] args) throws InterruptedException - // { - // Timer timer = new Timer(); - // // run timer every 10s (10000ms) - // timer.schedule(new TimerTask() { - // @Override - // public void run() { - // System.out.println("AnnounceTimer: " + new java.util.Date()); - // } - // }, 0, 10000); + //public void removePeer(RNSPeer peer) { + // synchronized(this) { + // List peerList = this.linkedPeers; + // log.info("removing peer {} on peer shutdown", peer); + // peerList.remove(peer); + // } + //} + + //public void pingPeer(RNSPeer peer) { + // if (nonNull(peer)) { + // peer.pingRemote(); + // } else { + // log.error("peer argument is null"); // } //} @Synchronized public void prunePeers() throws DataException { // run periodically (by the Controller) - //log.info("Peer list (linkedPeers): {}",this.linkedPeers.toString()); - //synchronized(this) { - //List linkList = getLinkedPeers(); - List peerList = this.linkedPeers; - log.info("List of RNSPeers: {}", this.linkedPeers); - //log.info("number of links (linkedPeers) before prunig: {}", this.linkedPeers.size()); - Link pLink; - LinkStatus lStatus; - for (RNSPeer p: peerList) { - pLink = p.getLink(); - lStatus = pLink.getStatus(); - //log.debug("link status: "+lStatus.toString()); - // lStatus in: PENDING, HANDSHAKE, ACTIVE, STALE, CLOSED - if (lStatus == CLOSED) { - p.resetPeer(); - peerList.remove(p); - } else if (lStatus == STALE) { + //List linkList = getLinkedPeers(); + var peerList = getLinkedPeers(); + log.info("number of links (linkedPeers) before prunig: {}", peerList.size()); + Link pLink; + LinkStatus lStatus; + for (RNSPeer p: peerList) { + pLink = p.getPeerLink(); + log.info("prunePeers - pLink: {}, destinationHash: {}", + pLink, Hex.encodeHexString(p.getDestinationHash())); + log.debug("peer: {}", p); + if (nonNull(pLink)) { + if (p.getPeerTimedOut()) { + // close peer link for now pLink.teardown(); - p.resetPeer(); + } + lStatus = pLink.getStatus(); + log.info("Link {} status: {}", pLink, lStatus); + // lStatus in: PENDING, HANDSHAKE, ACTIVE, STALE, CLOSED + if ((lStatus == STALE) || (pLink.getTeardownReason() == TIMEOUT) || (p.getDeleteMe())) { + p.shutdown(); + peerList.remove(p); + } else if (lStatus == HANDSHAKE) { + // stuck in handshake state (do we need to shutdown/remove it?) + log.info("peer status HANDSHAKE"); + p.shutdown(); peerList.remove(p); - } else if (lStatus == PENDING) { - log.info("prunePeers - link state still {}", lStatus); - // TODO: can we help the Link along somehow? } } - log.info("number of links (linkedPeers) after prunig: {}", this.linkedPeers.size()); - //} + } + //removeExpiredPeers(this.linkedPeers); + log.info("number of links (linkedPeers) after prunig: {}", peerList.size()); + log.info("we have {} non-initiator links, list: {}", incomingLinks.size(), incomingLinks); maybeAnnounce(getBaseDestination()); } + //public void removeExpiredPeers(List peerList) { + // //List peerList = this.linkedPeers; + // for (RNSPeer p: peerList) { + // if (p.getPeerLink() == null) { + // peerList.remove(p); + // } else if (p.getPeerLink().getStatus() == STALE) { + // peerList.remove(p); + // } + // } + //} + public void maybeAnnounce(Destination d) { if (getLinkedPeers().size() < MIN_DESIRED_PEERS) { d.announce(); } } + /** + * Helper methods + */ + + //@Synchronized + //public RNSPeer getPeerIfExists(RNSPeer peer) { + // List lps = RNSNetwork.getInstance().getLinkedPeers(); + // RNSPeer result = null; + // for (RNSPeer p: lps) { + // if (nonNull(p.getDestinationHash()) && Arrays.equals(p.getDestinationHash(), peer.getDestinationHash())) { + // log.info("found match by destinationHash"); + // result = p; + // //break; + // } + // if (nonNull(p.getPeerDestinationHash()) && Arrays.equals(p.getPeerDestinationHash(), peer.getPeerDestinationHash())) { + // log.info("found match by peerDestinationHash"); + // result = p; + // //break; + // } + // if (nonNull(p.getPeerBaseDestinationHash()) && Arrays.equals(p.getPeerBaseDestinationHash(), peer.getPeerBaseDestinationHash())) { + // log.info("found match by peerBaseDestinationHash"); + // result = p; + // //break; + // } + // if (nonNull(p.getRemoteTestHash()) && Arrays.equals(p.getRemoteTestHash(), peer.getRemoteTestHash())) { + // log.info("found match by remoteTestHash"); + // result = p; + // //break; + // } + // } + // return result; + //} + + public RNSPeer findPeerByLink(Link link) { + List lps = RNSNetwork.getInstance().getLinkedPeers(); + RNSPeer peer = null; + for (RNSPeer p : lps) { + var pLink = p.getPeerLink(); + if (nonNull(pLink)) { + if (Arrays.equals(pLink.getDestination().getHash(),link.getDestination().getHash())) { + log.info("found peer matching destinationHash: {}", Hex.encodeHexString(link.getDestination().getHash())); + peer = p; + break; + } + } + } + return peer; + } + + public RNSPeer findPeerByDestinationHash(byte[] dhash) { + List lps = RNSNetwork.getInstance().getLinkedPeers(); + RNSPeer peer = null; + for (RNSPeer p : lps) { + if (Arrays.equals(p.getDestinationHash(), dhash)) { + log.info("found peer matching destinationHash: {}", Hex.encodeHexString(dhash)); + peer = p; + break; + } + } + return peer; + } + + public void removePeer(RNSPeer peer) { + List peerList = this.linkedPeers; + if (nonNull(peer)) { + peerList.remove(peer); + } + } + } diff --git a/src/main/java/org/qortal/network/RNSPeer.java b/src/main/java/org/qortal/network/RNSPeer.java index 871bb347..399775db 100644 --- a/src/main/java/org/qortal/network/RNSPeer.java +++ b/src/main/java/org/qortal/network/RNSPeer.java @@ -4,18 +4,38 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import static java.util.Objects.isNull; +import static java.util.Objects.nonNull; +import java.io.IOException; +import java.time.Instant; +import java.util.Arrays; +import io.reticulum.Reticulum; import org.qortal.network.RNSNetwork; import io.reticulum.link.Link; +import io.reticulum.link.RequestReceipt; +import io.reticulum.packet.PacketReceiptStatus; import io.reticulum.packet.Packet; +import io.reticulum.packet.PacketReceipt; import io.reticulum.identity.Identity; import io.reticulum.channel.Channel; import io.reticulum.destination.Destination; import io.reticulum.destination.DestinationType; import io.reticulum.destination.Direction; - +import io.reticulum.destination.ProofStrategy; +import io.reticulum.resource.Resource; +import static io.reticulum.link.TeardownSession.INITIATOR_CLOSED; +import static io.reticulum.link.TeardownSession.DESTINATION_CLOSED; +import static io.reticulum.link.TeardownSession.TIMEOUT; +import static io.reticulum.link.LinkStatus.ACTIVE; +import static io.reticulum.link.LinkStatus.CLOSED; import static io.reticulum.identity.IdentityKnownDestination.recall; //import static io.reticulum.identity.IdentityKnownDestination.recallAppData; + +import java.nio.charset.StandardCharsets; +import static java.nio.charset.StandardCharsets.UTF_8; +import org.apache.commons.codec.binary.Hex; +import static org.apache.commons.lang3.ArrayUtils.subarray; + import lombok.extern.slf4j.Slf4j; import lombok.Setter; import lombok.Data; @@ -25,86 +45,234 @@ import lombok.AccessLevel; @Slf4j public class RNSPeer { - private byte[] destinationHash; - private Link destinationLink; - private Identity destinationIdentity; - @Setter(AccessLevel.PACKAGE) private long creationTimestamp; - private Long lastAccessTimestamp; + //static final String APP_NAME = "qortal"; + static final String APP_NAME = RNSCommon.APP_NAME; + //static final String defaultConfigPath = new String(".reticulum"); + static final String defaultConfigPath = RNSCommon.defaultRNSConfigPath; - // constructors - public RNSPeer (byte[] dhash) { - this.destinationHash = dhash; - this.destinationIdentity = recall(dhash); - Link newLink = new Link( - new Destination( - this.destinationIdentity, - Direction.OUT, - DestinationType.SINGLE, - RNSNetwork.APP_NAME, - "core" - ) + private byte[] destinationHash; // remote destination hash + Destination peerDestination; // OUT destination created for this + private Identity serverIdentity; + @Setter(AccessLevel.PACKAGE) private Instant creationTimestamp; + private Instant lastAccessTimestamp; + Link peerLink; + private Boolean isInitiator; + private Boolean deleteMe = false; + + private Double requestResponseProgress; + @Setter(AccessLevel.PACKAGE) private Boolean peerTimedOut = false; + + public RNSPeer(byte[] dhash) { + destinationHash = dhash; + serverIdentity = recall(dhash); + initPeerLink(); + //setCreationTimestamp(System.currentTimeMillis()); + creationTimestamp = Instant.now(); + } + + public void initPeerLink() { + peerDestination = new Destination( + this.serverIdentity, + Direction.OUT, + DestinationType.SINGLE, + RNSNetwork.APP_NAME, + "core" ); - this.destinationLink = newLink; - destinationLink.setPacketCallback(this::packetCallback); + peerDestination.setProofStrategy(ProofStrategy.PROVE_ALL); + + lastAccessTimestamp = Instant.now(); + isInitiator = true; + + peerLink = new Link(peerDestination); + + this.peerLink.setLinkEstablishedCallback(this::linkEstablished); + this.peerLink.setLinkClosedCallback(this::linkClosed); + this.peerLink.setPacketCallback(this::linkPacketReceived); } - public RNSPeer (Link newLink) { - this.destinationHash = newLink.getDestination().getHash(); - this.destinationLink = newLink; - this.destinationIdentity = newLink.getRemoteIdentity(); - setCreationTimestamp(System.currentTimeMillis()); - this.lastAccessTimestamp = null; - destinationLink.setPacketCallback(this::packetCallback); - } - - public RNSPeer () { - this.destinationHash = null; - this.destinationLink = null; - this.destinationIdentity = null; - setCreationTimestamp(System.currentTimeMillis()); - this.lastAccessTimestamp = null; - } - - // utilities (change Link type, call tasks, ...) - //... - - private void packetCallback(byte[] message, Packet packet) { - log.debug("Message raw {}", message); - log.debug("Packet {}", packet.toString()); - // ... - } - - public Link getLink() { - if (isNull(getDestinationLink())) { - Link newLink = new Link( - new Destination( - this.destinationIdentity, - Direction.OUT, - DestinationType.SINGLE, - RNSNetwork.APP_NAME, - "core" - ) - ); - this.destinationLink = newLink; - return newLink; + public Link getOrInitPeerLink() { + if (this.peerLink.getStatus() == ACTIVE) { + lastAccessTimestamp = Instant.now(); + return this.peerLink; + } else { + initPeerLink(); } - return getDestinationLink(); + return this.peerLink; + } + + public void shutdown() { + if (nonNull(peerLink)) { + log.info("shutdown - peerLink: {}, status: {}", peerLink, peerLink.getStatus()); + if (peerLink.getStatus() == ACTIVE) { + peerLink.teardown(); + } + this.peerLink = null; + } + this.deleteMe = true; } public Channel getChannel() { - if (isNull(getDestinationLink())) { + if (isNull(getPeerLink())) { log.warn("link is null."); return null; } - setLastAccessTimestamp(System.currentTimeMillis()); - return getDestinationLink().getChannel(); + setLastAccessTimestamp(Instant.now()); + return getPeerLink().getChannel(); } - public void resetPeer () { - this.destinationHash = null; - this.destinationLink = null; - this.destinationIdentity = null; - this.lastAccessTimestamp = null; + /** Link callbacks */ + public void linkEstablished(Link link) { + link.setLinkClosedCallback(this::linkClosed); + log.info("peerLink {} established (link: {}) with peer: hash - {}, link destination hash: {}", + peerLink, link, Hex.encodeHexString(destinationHash), + Hex.encodeHexString(link.getDestination().getHash())); } -} + public void linkClosed(Link link) { + if (link.getTeardownReason() == TIMEOUT) { + log.info("The link timed out"); + this.peerTimedOut = true; + } else if (link.getTeardownReason() == INITIATOR_CLOSED) { + log.info("Link closed callback: The initiator closed the link"); + log.info("peerLink {} closed (link: {}), link destination hash: {}", + peerLink, link, Hex.encodeHexString(link.getDestination().getHash())); + } else if (link.getTeardownReason() == DESTINATION_CLOSED) { + log.info("Link closed callback: The link was closed by the peer, removing peer"); + log.info("peerLink {} closed (link: {}), link destination hash: {}", + peerLink, link, Hex.encodeHexString(link.getDestination().getHash())); + } else { + log.info("Link closed callback"); + } + } + + public void linkPacketReceived(byte[] message, Packet packet) { + var msgText = new String(message, StandardCharsets.UTF_8); + if (msgText.equals("ping")) { + log.info("received ping on link"); + } else if (msgText.startsWith("close::")) { + var targetPeerHash = subarray(message, 7, message.length); + log.info("peer dest hash: {}, target hash: {}", + Hex.encodeHexString(destinationHash), + Hex.encodeHexString(targetPeerHash)); + if (Arrays.equals(destinationHash, targetPeerHash)) { + log.info("closing link: {}", peerLink.getDestination().getHexHash()); + peerLink.teardown(); + } + } else if (msgText.startsWith("open::")) { + var targetPeerHash = subarray(message, 7, message.length); + log.info("peer dest hash: {}, target hash: {}", + Hex.encodeHexString(destinationHash), + Hex.encodeHexString(targetPeerHash)); + if (Arrays.equals(destinationHash, targetPeerHash)) { + log.info("closing link: {}", peerLink.getDestination().getHexHash()); + getOrInitPeerLink(); + } + } + // TODO: process incoming packet.... + } + + + /** PacketReceipt callbacks */ + public void packetDelivered(PacketReceipt receipt) { + var rttString = new String(""); + //log.info("packet delivered callback, receipt: {}", receipt); + if (receipt.getStatus() == PacketReceiptStatus.DELIVERED) { + var rtt = receipt.getRtt(); // rtt (Java) is in miliseconds + //log.info("qqp - packetDelivered - rtt: {}", rtt); + if (rtt >= 1000) { + rtt = Math.round(rtt / 1000); + rttString = String.format("%d seconds", rtt); + } else { + rttString = String.format("%d miliseconds", rtt); + } + log.info("Valid reply received from {}, round-trip time is {}", + Hex.encodeHexString(receipt.getDestination().getHash()), rttString); + } + } + + public void packetTimedOut(PacketReceipt receipt) { + log.info("packet timed out"); + if (receipt.getStatus() == PacketReceiptStatus.FAILED) { + log.info("packet timed out, receipt status: {}", PacketReceiptStatus.FAILED); + this.peerTimedOut = true; + peerLink.teardown(); + //this.deleteMe = true; + } + } + + /** Link Request callbacks */ + public void linkRequestResponseReceived(RequestReceipt rr) { + log.info("Response received"); + } + + public void linkRequestResponseProgress(RequestReceipt rr) { + this.requestResponseProgress = rr.getProgress(); + log.debug("Response progress set"); + } + + public void linkRequestFailed(RequestReceipt rr) { + log.error("Request failed"); + } + + /** Link Resource callbacks */ + // Resource: allow arbitrary amounts of data to be passed over a link with + // sequencing, compression, coordination and checksumming handled automatically + //public Boolean linkResourceAdvertised(Resource resource) { + // log.debug("Resource advertised"); + //} + public void linkResourceTransferStarted(Resource resource) { + log.debug("Resource transfer started"); + } + public void linkResourceTransferComcluded(Resource resource) { + log.debug("Resource transfer complete"); + } + + /** Utility methods */ + public void pingRemote() { + var link = this.peerLink; + log.info("pinging remote: {}", link); + var data = "ping".getBytes(UTF_8); + link.setPacketCallback(this::linkPacketReceived); + Packet pingPacket = new Packet(link, data); + PacketReceipt packetReceipt = pingPacket.send(); + //packetReceipt.setTimeout(3L); + packetReceipt.setTimeoutCallback(this::packetTimedOut); + packetReceipt.setDeliveryCallback(this::packetDelivered); + } + + //public void shutdownLink(Link link) { + // var data = "shutdown".getBytes(UTF_8); + // Packet shutdownPacket = new Packet(link, data); + // PacketReceipt packetReceipt = shutdownPacket.send(); + // packetReceipt.setTimeout(2000L); + // packetReceipt.setTimeoutCallback(this::packetTimedOut); + // packetReceipt.setDeliveryCallback(this::shutdownPacketDelivered); + //} + + ///** check if a link is available (ACTIVE) + // * link: a certain peer link, or null (default link == link to Qortal node RNS baseDestination) + // */ + //public Boolean peerLinkIsAlive(Link link) { + // var result = false; + // if (isNull(link)) { + // // default link + // var defaultLink = getLink(); + // if (nonNull(defaultLink) && defaultLink.getStatus() == ACTIVE) { + // result = true; + // log.info("Default link is available"); + // } else { + // log.info("Default link {} is not available, status: {}", defaultLink, defaultLink.getStatus()); + // } + // } else { + // // other link (future where we have multiple destinations...) + // if (link.getStatus() == ACTIVE) { + // result = true; + // log.info("Link {} is available (status: {})", link, link.getStatus()); + // } else { + // log.info("Link {} is not available, status: {}", link, link.getStatus()); + // } + // } + // return result; + //} + +} \ No newline at end of file