diff --git a/src/main/java/org/qortal/network/RNSNetwork.java b/src/main/java/org/qortal/network/RNSNetwork.java index 5885fb4d..08cdffd2 100644 --- a/src/main/java/org/qortal/network/RNSNetwork.java +++ b/src/main/java/org/qortal/network/RNSNetwork.java @@ -219,7 +219,7 @@ public class RNSNetwork { baseDestination.announce(); log.debug("Sent initial announce from {} ({})", Hex.encodeHexString(baseDestination.getHash()), baseDestination.getName()); - // Start up first networking thread (the "server loop", JS: the "Tasks engine") + // Start up first networking thread (the "server loop", the "Tasks engine") rnsNetworkEPC.start(); } @@ -299,6 +299,10 @@ public class RNSNetwork { } catch (InterruptedException e) { log.error("exception: ", e); } + //var pl = p.getPeerLink(); + //if (nonNull(pl) & (pl.getStatus() == ACTIVE)) { + // pl.teardown(); + //} } // Stop processing threads (the "server loop") try { @@ -467,33 +471,33 @@ public class RNSNetwork { return task; } - task = maybeProduceBroadcastTask(now); - if (task != null) { - return task; - } + //task = maybeProduceBroadcastTask(now); + //if (task != null) { + // return task; + //} return null; } - //private Task maybeProducePeerMessageTask() { - // return getImmutableConnectedPeers().stream() - // .map(Peer::getMessageTask) - // .filter(Objects::nonNull) - // .findFirst() - // .orElse(null); - //} + ////private Task maybeProducePeerMessageTask() { + //// return getImmutableConnectedPeers().stream() + //// .map(Peer::getMessageTask) + //// .filter(Objects::nonNull) + //// .findFirst() + //// .orElse(null); + ////} + ////private Task maybeProducePeerMessageTask() { + //// return getImmutableIncomingPeers().stream() + //// .map(RNSPeer::getMessageTask) + //// .filter(RNSPeer::isAvailable) + //// .findFirst() + //// .orElse(null); + ////} //private Task maybeProducePeerMessageTask() { // return getImmutableIncomingPeers().stream() // .map(RNSPeer::getMessageTask) - // .filter(RNSPeer::isAvailable) // .findFirst() // .orElse(null); //} - private Task maybeProducePeerMessageTask() { - return getImmutableIncomingPeers().stream() - .map(RNSPeer::getMessageTask) - .findFirst() - .orElse(null); - } //private Task maybeProducePeerPingTask(Long now) { // return getImmutableHandshakedPeers().stream() @@ -604,8 +608,8 @@ public class RNSNetwork { //@Synchronized public void prunePeers() throws DataException { // run periodically (by the Controller) - //var peerList = getLinkedPeers(); - var peerList = getImmutableLinkedPeers(); + var peerList = getLinkedPeers(); + //var peerList = getImmutableLinkedPeers(); log.info("number of links (linkedPeers) before pruning: {}", peerList.size()); Link pLink; LinkStatus lStatus; diff --git a/src/main/java/org/qortal/network/RNSPeer.java b/src/main/java/org/qortal/network/RNSPeer.java index 40129260..c52baef7 100644 --- a/src/main/java/org/qortal/network/RNSPeer.java +++ b/src/main/java/org/qortal/network/RNSPeer.java @@ -67,6 +67,7 @@ import lombok.extern.slf4j.Slf4j; import lombok.Setter; import lombok.Data; import lombok.AccessLevel; +//import lombok.Synchronized; // //import org.qortal.network.message.Message; //import org.qortal.network.message.MessageException; @@ -98,7 +99,7 @@ public class RNSPeer { private Boolean deleteMe = false; private Boolean isVacant = true; private Long lastPacketRtt = null; - private byte[] emptyBuffer = {0,0,0}; + private byte[] emptyBuffer = {0,0,0,0,0,0,0,0}; private Double requestResponseProgress; @Setter(AccessLevel.PACKAGE) private Boolean peerTimedOut = false; @@ -182,6 +183,13 @@ public class RNSPeer { this.peerLink.setPacketCallback(this::linkPacketReceived); } + @Override + public String toString() { + // for messages we want an address-like string representation + //return encodeHexString(this.getDestinationHash()); + return this.getPeerLink().toString(); + } + public BufferedRWPair getOrInitPeerBuffer() { var channel = this.peerLink.getChannel(); if (nonNull(this.peerBuffer)) { @@ -225,8 +233,8 @@ public class RNSPeer { if (isFalse(this.isInitiator)) { sendCloseToRemote(this.peerLink); } - peerLink.teardown(); - }else { + this.peerLink.teardown(); + } else { log.info("shutdown - status (non-ACTIVE): {}", peerLink.getStatus()); } this.peerLink = null; @@ -308,17 +316,19 @@ public class RNSPeer { */ public void peerBufferReady(Integer readyBytes) { // get the message data - var data = this.peerBuffer.read(readyBytes); - log.info("data length, data: {}, {}", data.length, data); + byte[] data = this.peerBuffer.read(readyBytes); + ByteBuffer bb = ByteBuffer.wrap(data); + log.info("data length: {}, data: {}, ByteBuffer: {}", data.length, data, bb); //var pureData = Arrays.copyOfRange(data, this.messageMagic.length - 1, data.length); log.trace("peerBufferReady - data bytes: {}", data.length); if (ByteBuffer.wrap(data, 0, emptyBuffer.length).equals(ByteBuffer.wrap(emptyBuffer, 0, emptyBuffer.length))) { log.info("peerBufferReady - empty buffer detected (length: {})", data.length); + //this.peerBuffer.flush(); } else { try { - //this.peerBuffer.flush(); - Message message = Message.fromByteBuffer(ByteBuffer.wrap(data)); + log.info("***> creating message from {} bytes", data.length); + Message message = Message.fromByteBuffer(bb); log.info("type {} message received ({} bytes): {}", message.getType(), data.length, message); // Handle message based on type switch (message.getType()) { @@ -330,13 +340,15 @@ public class RNSPeer { // break; case PING: - onPingMessage(this, message); - // Note: buffer flush done in onPingMessage method + if (isFalse(this.isInitiator)) { + onPingMessage(this, message); + // Note: buffer flush done in onPingMessage method + } break; case PONG: - //log.info("PONG received"); - //break; + log.info("PONG received"); + break; // Do we need this ? (We don't have RNSPeer versions) //case PEERS_V2: @@ -345,16 +357,19 @@ public class RNSPeer { // break; default: - // Bump up to controller for possible action - //Controller.getInstance().onNetworkMessage(peer, message); - Controller.getInstance().onRNSNetworkMessage(this, message); - this.peerBuffer.flush(); + //if (isFalse(this.isInitiator)) { + // Bump up to controller for possible action + //Controller.getInstance().onNetworkMessage(peer, message); + Controller.getInstance().onRNSNetworkMessage(this, message); + this.peerBuffer.flush(); + //} break; } } catch (MessageException e) { //log.error("{} from peer {}", e.getMessage(), this); log.error("{} from peer {}", e, this); } + //this.peerBuffer.flush(); // clear buffer } } @@ -466,17 +481,18 @@ public class RNSPeer { /** Utility methods */ public void pingRemote() { var link = this.peerLink; - if (nonNull(link)) { + //if (nonNull(link) & (isFalse(link.isInitiator()))) { + if (nonNull(link) & link.isInitiator()) { if (peerLink.getStatus() == ACTIVE) { log.info("pinging remote: {}", link); var data = "ping".getBytes(UTF_8); link.setPacketCallback(this::linkPacketReceived); Packet pingPacket = new Packet(link, data); PacketReceipt packetReceipt = pingPacket.send(); + packetReceipt.setDeliveryCallback(this::packetDelivered); // Note: don't setTimeout, we want it to timeout with FAIL if not deliverable //packetReceipt.setTimeout(5000L); packetReceipt.setTimeoutCallback(this::packetTimedOut); - packetReceipt.setDeliveryCallback(this::packetDelivered); } else { log.info("can't send ping to a peer {} with (link) status: {}", encodeHexString(peerLink.getDestination().getHash()), peerLink.getStatus()); diff --git a/src/main/java/org/qortal/network/message/Message.java b/src/main/java/org/qortal/network/message/Message.java index 65262321..df797be8 100644 --- a/src/main/java/org/qortal/network/message/Message.java +++ b/src/main/java/org/qortal/network/message/Message.java @@ -9,6 +9,8 @@ import java.io.IOException; import java.nio.BufferUnderflowException; import java.nio.ByteBuffer; import java.util.Arrays; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; /** * Network message for sending over network, or unpacked data received from network. @@ -33,6 +35,7 @@ import java.util.Arrays; *

*/ public abstract class Message { + private static final Logger LOGGER = LogManager.getLogger(Message.class); // MAGIC(4) + TYPE(4) + HAS-ID(1) + ID?(4) + DATA-SIZE(4) + CHECKSUM?(4) + DATA?(*) private static final int MAGIC_LENGTH = 4; @@ -95,9 +98,11 @@ public abstract class Message { byte[] messageMagic = new byte[MAGIC_LENGTH]; readOnlyBuffer.get(messageMagic); - if (!Arrays.equals(messageMagic, Network.getInstance().getMessageMagic())) + if (!Arrays.equals(messageMagic, Network.getInstance().getMessageMagic())) { + LOGGER.info("xyz - mM: {}, Network getMessageMagic: {}", messageMagic, Network.getInstance().getMessageMagic()); // Didn't receive correct Message "magic" throw new MessageException("Received incorrect message 'magic'"); + } // Find supporting object int typeValue = readOnlyBuffer.getInt(); diff --git a/src/main/java/org/qortal/network/task/RNSMessageTask.java b/src/main/java/org/qortal/network/task/RNSMessageTask.java index c364dc90..9b4ea56f 100644 --- a/src/main/java/org/qortal/network/task/RNSMessageTask.java +++ b/src/main/java/org/qortal/network/task/RNSMessageTask.java @@ -24,6 +24,7 @@ public class RNSMessageTask implements Task { @Override public void perform() throws InterruptedException { //RNSNetwork.getInstance().onMessage(peer, nextMessage); - // TODO: what do we do in the Reticulum case? => implement + // TODO: what do we do in the Reticulum case? + // Note: this is automatically handled (asynchronously) by the RNSPeer peerBufferReady callback } } diff --git a/src/main/java/org/qortal/network/task/RNSPingTask.java b/src/main/java/org/qortal/network/task/RNSPingTask.java index 94c29f8d..705cdd1d 100644 --- a/src/main/java/org/qortal/network/task/RNSPingTask.java +++ b/src/main/java/org/qortal/network/task/RNSPingTask.java @@ -45,9 +45,10 @@ public class RNSPingTask implements Task { //} catch (MessageException e) { // LOGGER.error(e.getMessage(), e); //} + // Note: We might use peer.sendMessage(pingMessage) instead peer.getResponse(pingMessage); - //// task is not over here. + //// task is not over here (Reticulum is asynchronous) //peer.setLastPing(NTP.getTime() - now); } }