diff --git a/src/main/java/org/qortal/network/RNSNetwork.java b/src/main/java/org/qortal/network/RNSNetwork.java index 6453ad7f..7fe825e5 100644 --- a/src/main/java/org/qortal/network/RNSNetwork.java +++ b/src/main/java/org/qortal/network/RNSNetwork.java @@ -68,11 +68,13 @@ import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicLong; //import java.util.concurrent.locks.Lock; //import java.util.concurrent.locks.ReentrantLock; +import java.util.Objects; import org.apache.commons.codec.binary.Hex; import org.qortal.utils.ExecuteProduceConsume; -import org.qortal.utils.NamedThreadFactory; import org.qortal.utils.ExecuteProduceConsume.StatsSnapshot; +import org.qortal.utils.NTP; +import org.qortal.utils.NamedThreadFactory; // logging import lombok.extern.slf4j.Slf4j; @@ -97,13 +99,20 @@ public class RNSNetwork { Identity serverIdentity; public Destination baseDestination; private volatile boolean isShuttingDown = false; + + /** + * Maintain two lists for each subset of peers + * => a synchronizedList, modified when peers are added/removed + * => an immutable List, automatically rebuild to mirror synchronizedList, served to consumers + * linkedPeers are "initiators" (containing initiator reticulum Link), actively doing work. + * incomimgPeers are "non-initiators", the passive end of bidirectional Reticulum Buffers. + */ private final List linkedPeers = Collections.synchronizedList(new ArrayList<>()); private List immutableLinkedPeers = Collections.emptyList(); - //private final List incomingLinks = Collections.synchronizedList(new ArrayList<>()); private final List incomingPeers = Collections.synchronizedList(new ArrayList<>()); private List immutableIncomingPeers = Collections.emptyList(); - //private final ExecuteProduceConsume rnsNetworkEPC; + private final ExecuteProduceConsume rnsNetworkEPC; private static final long NETWORK_EPC_KEEPALIVE = 1000L; // 1 second //private volatile boolean isShuttingDown = false; private int totalThreadCount = 0; @@ -135,13 +144,13 @@ public class RNSNetwork { log.info("reticulum instance created"); log.info("reticulum instance created: {}", reticulum); - //// Settings.getInstance().getMaxRNSNetworkThreadPoolSize(), // statically set to 5 below - //ExecutorService RNSNetworkExecutor = new ThreadPoolExecutor(1, - // 5, - // NETWORK_EPC_KEEPALIVE, TimeUnit.SECONDS, - // new SynchronousQueue(), - // new NamedThreadFactory("RNSNetwork-EPC", Settings.getInstance().getNetworkThreadPriority())); - //rnsNetworkEPC = new RNSNetworkProcessor(RNSNetworkExecutor); + // Settings.getInstance().getMaxRNSNetworkThreadPoolSize(), // statically set to 5 below + ExecutorService RNSNetworkExecutor = new ThreadPoolExecutor(1, + 5, + NETWORK_EPC_KEEPALIVE, TimeUnit.SECONDS, + new SynchronousQueue(), + new NamedThreadFactory("RNSNetwork-EPC", Settings.getInstance().getNetworkThreadPriority())); + rnsNetworkEPC = new RNSNetworkProcessor(RNSNetworkExecutor); } // Note: potentially create persistent serverIdentity (utility rnid) and load it from file @@ -198,10 +207,8 @@ public class RNSNetwork { baseDestination.announce(); log.debug("Sent initial announce from {} ({})", Hex.encodeHexString(baseDestination.getHash()), baseDestination.getName()); - log.info("check point 1"); - // Start up first networking thread (the "server loop") - //rnsNetworkEPC.start(); - log.info("check point 2"); + // Start up first networking thread (the "server loop", JS: the "Tasks engine") + rnsNetworkEPC.start(); } private void initConfig(String configDir) throws IOException { @@ -232,14 +239,6 @@ public class RNSNetwork { p.sendCloseToRemote(pl); } } - //// Stop processing threads (the "server loop") - //try { - // if (!this.rnsNetworkEPC.shutdown(5000)) { - // log.warn("RNSNetwork threads failed to terminate"); - // } - //} catch (InterruptedException e) { - // log.warn("Interrupted while waiting for RNS networking threads to terminate"); - //} // Disconnect peers gracefully and terminate Reticulum for (RNSPeer p: linkedPeers) { log.info("shutting down peer: {}", Hex.encodeHexString(p.getDestinationHash())); @@ -251,6 +250,14 @@ public class RNSNetwork { log.error("exception: ", e); } } + // Stop processing threads (the "server loop") + try { + if (!this.rnsNetworkEPC.shutdown(5000)) { + log.warn("RNSNetwork threads failed to terminate"); + } + } catch (InterruptedException e) { + log.warn("Interrupted while waiting for RNS networking threads to terminate"); + } // Note: we still need to get the packet timeout callback to work... reticulum.exitHandler(); } @@ -340,8 +347,6 @@ public class RNSNetwork { } } if (activePeerCount < MAX_PEERS) { - //if (!peerExists) { - //var peer = findPeerByDestinationHash(destinationHash); for (RNSPeer p: lps) { if (Arrays.equals(p.getDestinationHash(), destinationHash)) { log.info("QAnnounceHandler - peer exists - found peer matching destinationHash"); @@ -365,12 +370,10 @@ public class RNSNetwork { RNSPeer newPeer = new RNSPeer(destinationHash); newPeer.setServerIdentity(announcedIdentity); newPeer.setIsInitiator(true); - //lps.add(newPeer); addLinkedPeer(newPeer); log.info("added new RNSPeer, destinationHash: {}", Hex.encodeHexString(destinationHash)); } } - //} } } @@ -402,31 +405,68 @@ public class RNSNetwork { //if (task != null) { // return task; //} - // - //final Long now = NTP.getTime(); - // - //task = maybeProducePeerPingTask(now); - //if (task != null) { - // return task; - //} - // - //task = maybeProduceConnectPeerTask(now); - //if (task != null) { - // return task; - //} - // + + final Long now = NTP.getTime(); + + task = maybeProducePeerPingTask(now); + if (task != null) { + return task; + } + //task = maybeProduceBroadcastTask(now); //if (task != null) { // return task; //} - // - // Only this method can block to reduce CPU spin - //return maybeProduceChannelTask(canBlock); - - // TODO: flesh out the tasks handled by Reticulum return null; } - //...TODO: implement abstract methods... + + //private Task maybeProducePeerMessageTask() { + // return getImmutableConnectedPeers().stream() + // .map(Peer::getMessageTask) + // .filter(Objects::nonNull) + // .findFirst() + // .orElse(null); + //} + //private Task maybeProducePeerMessageTask() { + // return getImmutableIncommingPeers().stream() + // .map(RNSPeer::getMessageTask) + // .filter(RNSPeer::isAvailable) + // .findFirst*() + // .orElse(null); + //} + + //private Task maybeProducePeerPingTask(Long now) { + // return getImmutableHandshakedPeers().stream() + // .map(peer -> peer.getPingTask(now)) + // .filter(Objects::nonNull) + // .findFirst() + // .orElse(null); + //} + private Task maybeProducePeerPingTask(Long now) { + var ilp = getImmutableLinkedPeers().stream() + .map(peer -> peer.getPingTask(now)) + .filter(Objects::nonNull) + .findFirst() + .orElse(null); + if (nonNull(ilp)) { + log.info("ilp - {}", ilp); + } + return ilp; + //return getImmutableLinkedPeers().stream() + // .map(peer -> peer.getPingTask(now)) + // .filter(Objects::nonNull) + // .findFirst() + // .orElse(null); + } + + //private Task maybeProduceBroadcastTask(Long now) { + // if (now == null || now < nextBroadcastTimestamp.get()) { + // return null; + // } + // + // nextBroadcastTimestamp.set(now + BROADCAST_INTERVAL); + // return new BroadcastTask(); + //} } private static class SingletonContainer { @@ -437,9 +477,9 @@ public class RNSNetwork { return SingletonContainer.INSTANCE; } - //public List getImmutableLinkedPeers() { - // return this.immutableLinkedPeers; - //} + public List getImmutableLinkedPeers() { + return this.immutableLinkedPeers; + } public void addLinkedPeer(RNSPeer peer) { this.linkedPeers.add(peer); @@ -461,7 +501,12 @@ public class RNSNetwork { //} } - public void removeIncommingPeer(RNSPeer peer) { + public void addIncomingPeer(RNSPeer peer) { + this.incomingPeers.add(peer); + this.immutableIncomingPeers = List.copyOf(this.incomingPeers); + } + + public void removeIncomingPeer(RNSPeer peer) { if (nonNull(peer.getPeerLink())) { peer.getPeerLink().teardown(); } @@ -530,7 +575,8 @@ public class RNSNetwork { } } //removeExpiredPeers(this.linkedPeers); - log.info("number of links (linkedPeers) after prunig: {}", peerList.size()); + log.info("number of links (linkedPeers / incomingPeers) after prunig: {}, {}", peerList.size(), + getIncomingPeers().size()); //log.info("we have {} non-initiator links, list: {}", incomingLinks.size(), incomingLinks); var activePeerCount = 0; var lps = RNSNetwork.getInstance().getLinkedPeers(); diff --git a/src/main/java/org/qortal/network/RNSPeer.java b/src/main/java/org/qortal/network/RNSPeer.java index ad0962bc..ce6c5877 100644 --- a/src/main/java/org/qortal/network/RNSPeer.java +++ b/src/main/java/org/qortal/network/RNSPeer.java @@ -34,9 +34,25 @@ import io.reticulum.buffer.Buffer; import io.reticulum.buffer.BufferedRWPair; import static io.reticulum.utils.IdentityUtils.concatArrays; +import org.qortal.controller.Controller; +import org.qortal.data.block.BlockSummaryData; +import org.qortal.data.block.CommonBlockData; +import org.qortal.network.message.Message; +import org.qortal.network.message.PingMessage; +import org.qortal.network.message.*; +import org.qortal.network.message.MessageException; +import org.qortal.network.task.RNSMessageTask; +import org.qortal.network.task.RNSPingTask; import org.qortal.settings.Settings; +import org.qortal.utils.ExecuteProduceConsume.Task; +import org.qortal.utils.NTP; +import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.*; + import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.commons.codec.binary.Hex.encodeHexString; import static org.apache.commons.lang3.ArrayUtils.subarray; @@ -47,6 +63,14 @@ import lombok.extern.slf4j.Slf4j; import lombok.Setter; import lombok.Data; import lombok.AccessLevel; +// +//import org.qortal.network.message.Message; +//import org.qortal.network.message.MessageException; + +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import java.lang.IllegalStateException; @Data @Slf4j @@ -69,10 +93,22 @@ public class RNSPeer { private Boolean isInitiator; private Boolean deleteMe = false; private Boolean isVacant = true; + private Long lastPacketRtt = null; private Double requestResponseProgress; @Setter(AccessLevel.PACKAGE) private Boolean peerTimedOut = false; + // for qortal networking + private static final int RESPONSE_TIMEOUT = 3000; // [ms] + private static final int PING_INTERVAL = 34_000; // [ms] + private Long lastPing = null; // last ping roundtrip time [ms] + private Long lastPingSent = null; // time last ping was sent, or null if not started. + private Map> replyQueues; + //private LinkedBlockingQueue pendingMessages; // we might not need this + // Versioning + public static final Pattern VERSION_PATTERN = Pattern.compile(Controller.VERSION_PREFIX + + "(\\d{1,3})\\.(\\d{1,5})\\.(\\d{1,5})"); + /** * Constructor for initiator peers */ @@ -83,6 +119,7 @@ public class RNSPeer { //setCreationTimestamp(System.currentTimeMillis()); this.creationTimestamp = Instant.now(); this.isVacant = true; + this.replyQueues = new ConcurrentHashMap<>(); } /** @@ -129,9 +166,17 @@ public class RNSPeer { var channel = this.peerLink.getChannel(); if (nonNull(this.peerBuffer)) { log.info("peerBuffer exists: {}, link status: {}", this.peerBuffer, this.peerLink.getStatus()); - this.peerBuffer.close(); - this.peerBuffer = Buffer.createBidirectionalBuffer(receiveStreamId, sendStreamId, channel, this::peerBufferReady); - //return this.peerBuffer; + try { + this.peerBuffer.close(); + this.peerBuffer = Buffer.createBidirectionalBuffer(receiveStreamId, sendStreamId, channel, this::peerBufferReady); + } catch (IllegalStateException e) { + // Exception thrown by Reticulum BufferedRWPair.close() + // This is a chance to correct links status when doing a RNSPingTask + log.warn("can't establish Channel/Buffer (remote peer down?), closing link: {}"); + this.peerLink.teardown(); + this.peerLink = null; + //log.error("(handled) IllegalStateException - can't establish Channel/Buffer: {}", e); + } } else { log.info("creating buffer - peerLink status: {}, channel: {}", this.peerLink.getStatus(), channel); @@ -181,6 +226,9 @@ public class RNSPeer { log.info("peerLink {} established (link: {}) with peer: hash - {}, link destination hash: {}", peerLink, link, encodeHexString(destinationHash), encodeHexString(link.getDestination().getHash())); + if (isInitiator) { + startPings(); + } } public void linkClosed(Link link) { @@ -232,10 +280,43 @@ public class RNSPeer { * :param readyBytes: The number of bytes ready to read */ public void peerBufferReady(Integer readyBytes) { + // get the message data var data = this.peerBuffer.read(readyBytes); - var decodedData = new String(data); - log.info("Received data over the buffer: {}", decodedData); + try { + Message message = Message.fromByteBuffer(ByteBuffer.wrap(data)); + log.info("type {} message received: {}", message.getType(), message); + // TODO: Now what with message? + switch (message.getType()) { + //case GET_PEERS: + // onGetPeersMessage(peer, message); + // break; + + case PING: + //onPingMessage(this, message); + PongMessage pongMessage = new PongMessage(); + pongMessage.setId(message.getId()); + //var peerBuffer = getOrInitPeerBuffer(); + this.peerBuffer.write(message.toBytes()); + this.peerBuffer.flush(); + break; + + case PONG: + + //case PEERS_V2: + // onPeersV2Message(peer, message); + // break; + // + //default: + // // Bump up to controller for possible action + // Controller.getInstance().onNetworkMessage(peer, message); + // break; + } + } catch (MessageException e) { + log.error("{} from peer {}", e.getMessage(), this); + } + //var decodedData = new String(data); + //log.info("Received data over the buffer: {}", decodedData); //if (isFalse(this.isInitiator)) { // // TODO: process data and reply @@ -271,6 +352,7 @@ public class RNSPeer { var rttString = new String(""); if (receipt.getStatus() == PacketReceiptStatus.DELIVERED) { var rtt = receipt.getRtt(); // rtt (Java) is in milliseconds + this.lastPacketRtt = rtt; if (rtt >= 1000) { rtt = Math.round(rtt / 1000); rttString = String.format("%d seconds", rtt); @@ -287,6 +369,7 @@ public class RNSPeer { //log.info("packet delivered callback, receipt: {}", receipt); if (receipt.getStatus() == PacketReceiptStatus.DELIVERED) { var rtt = receipt.getRtt(); // rtt (Java) is in milliseconds + this.lastPacketRtt = rtt; //log.info("qqp - packetDelivered - rtt: {}", rtt); if (rtt >= 1000) { rtt = Math.round((float) rtt / 1000); @@ -334,6 +417,19 @@ public class RNSPeer { log.debug("Resource transfer complete"); } + ///** + // * Send a message using the peer buffer + // */ + //public Message getResponse(Message message) throws InterruptedException { + // var peerBuffer = getOrInitPeerBuffer(); + // + // //// send message + // //peerBuffer.write(...); + // //peerBuffer.flush(); + // + // // receive - peerBufferReady callback result + //} + /** Utility methods */ public void pingRemote() { var link = this.peerLink; @@ -364,30 +460,121 @@ public class RNSPeer { // packetReceipt.setDeliveryCallback(this::shutdownPacketDelivered); //} - ///** check if a link is available (ACTIVE) - // * link: a certain peer link, or null (default link == link to Qortal node RNS baseDestination) - // */ - //public Boolean peerLinkIsAlive(Link link) { - // var result = false; - // if (isNull(link)) { - // // default link - // var defaultLink = getLink(); - // if (nonNull(defaultLink) && defaultLink.getStatus() == ACTIVE) { - // result = true; - // log.info("Default link is available"); - // } else { - // log.info("Default link {} is not available, status: {}", defaultLink, defaultLink.getStatus()); - // } - // } else { - // // other link (future where we have multiple destinations...) - // if (link.getStatus() == ACTIVE) { - // result = true; - // log.info("Link {} is available (status: {})", link, link.getStatus()); - // } else { - // log.info("Link {} is not available, status: {}", link, link.getStatus()); - // } - // } - // return result; + /** qortal networking specific (Tasks) */ + + //private void onPingMessage(RNSPeer peer, Message message) { + // PingMessage pingMessage = (PingMessage) message; + // + // // Generate 'pong' using same ID + // PingMessage pongMessage = new PingMessage(); + // pongMessage.setId(pingMessage.getId()); + // + // sendMessageWithTimeout(pongMessage, RESPONSE_TIMEOUT); //} - + + /** + * Send message to peer and await response, using default RESPONSE_TIMEOUT. + *

+ * Message is assigned a random ID and sent. + * Responses are handled by registered callbacks. + *

+ * Note: The method is called "get..." to match the original method name + * + * @param message message to send + * @return Message if valid response received; null if not or error/exception occurs + * @throws InterruptedException if interrupted while waiting + */ + public void getResponse(Message message) throws InterruptedException { + getResponseWithTimeout(message, RESPONSE_TIMEOUT); + } + + /** + * Send message to peer and await response. + *

+ * Message is assigned a random ID and sent. + * If a response with matching ID is received then it is returned to caller. + *

+ * If no response with matching ID within timeout, or some other error/exception occurs, + * then return null.
+ * (Assume peer will be rapidly disconnected after this). + * + * @param message message to send + * @return Message if valid response received; null if not or error/exception occurs + * @throws InterruptedException if interrupted while waiting + */ + public void getResponseWithTimeout(Message message, int timeout) throws InterruptedException { + BlockingQueue blockingQueue = new ArrayBlockingQueue<>(1); + // TODO: implement equivalent of Peer class... + // Assign random ID to this message + Random random = new Random(); + int id; + do { + id = random.nextInt(Integer.MAX_VALUE - 1) + 1; + + // Put queue into map (keyed by message ID) so we can poll for a response + // If putIfAbsent() doesn't return null, then this ID is already taken + } while (this.replyQueues.putIfAbsent(id, blockingQueue) != null); + message.setId(id); + + // Try to send message + if (!this.sendMessageWithTimeout(message, timeout)) { + this.replyQueues.remove(id); + return; + } + + try { + blockingQueue.poll(timeout, TimeUnit.MILLISECONDS); + } finally { + this.replyQueues.remove(id); + } + } + + /** + * Attempt to send Message to peer using the buffer and a custom timeout. + * + * @param message message to be sent + * @return true if message successfully sent; false otherwise + */ + public boolean sendMessageWithTimeout(Message message, int timeout) { + try { + log.trace("Sending {} message with ID {} to peer {}", message.getType().name(), message.getId(), this); + var peerBuffer = getOrInitPeerBuffer(); + peerBuffer.write(message.toBytes()); + peerBuffer.flush(); + return true; + //} catch (InterruptedException e) { + // // Send failure + // return false; + } catch (IllegalStateException e) { + //log.warn("Can't write to buffer (remote buffer down?)"); + log.error("IllegalStateException - can't write to buffer: e", e); + return false; + } catch (MessageException e) { + log.error(e.getMessage(), e); + return false; + } + } + + protected void startPings() { + log.trace("[{}] Enabling pings for peer {}", + peerLink.getDestination().getHexHash(), this); + this.lastPingSent = NTP.getTime(); + } + + protected Task getPingTask(Long now) { + // Pings not enabled yet? + if (now == null || this.lastPingSent == null) { + return null; + } + + // Time to send another ping? + if (now < this.lastPingSent + PING_INTERVAL) { + return null; // Not yet + } + + // Not strictly true, but prevents this peer from being immediately chosen again + this.lastPingSent = now; + + return new RNSPingTask(this, now); + } } diff --git a/src/main/java/org/qortal/network/task/RNSPingTask.java b/src/main/java/org/qortal/network/task/RNSPingTask.java new file mode 100644 index 00000000..3c219ddd --- /dev/null +++ b/src/main/java/org/qortal/network/task/RNSPingTask.java @@ -0,0 +1,53 @@ +package org.qortal.network.task; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.qortal.network.RNSPeer; +import org.qortal.network.message.Message; +import org.qortal.network.message.MessageType; +import org.qortal.network.message.PingMessage; +//import org.qortal.network.message.RNSPingMessage; +import org.qortal.utils.ExecuteProduceConsume.Task; +import org.qortal.utils.NTP; + +public class RNSPingTask implements Task { + private static final Logger LOGGER = LogManager.getLogger(PingTask.class); + + private final RNSPeer peer; + private final Long now; + private final String name; + + public RNSPingTask(RNSPeer peer, Long now) { + this.peer = peer; + this.now = now; + this.name = "PingTask::" + peer; + } + + @Override + public String getName() { + return name; + } + + @Override + public void perform() throws InterruptedException { + //RNSPingMessage pingMessage = new RNSPingMessage(); + PingMessage pingMessage = new PingMessage(); + + //var peerBuffer = peer.getOrInitPeerBuffer(); + //peerBuffer.write(...) + //peerBuffer.flush() + peer.getResponse(pingMessage); + + //Message message = peer.getResponse(pingMessage); + // + //if (message == null || message.getType() != MessageType.PING) { + // LOGGER.debug("[{}] Didn't receive reply from {} for PING ID {}", + // peer.getPeerConnectionId(), peer, pingMessage.getId()); + // peer.disconnect("no ping received"); + // return; + //} + + //// tast is not over here. + //peer.setLastPing(NTP.getTime() - now); + } +}