ping task working

This commit is contained in:
Jürg Schulthess 2025-03-28 18:16:20 +01:00
parent da20485870
commit e1e5bceb05
5 changed files with 68 additions and 41 deletions

View File

@ -219,7 +219,7 @@ public class RNSNetwork {
baseDestination.announce();
log.debug("Sent initial announce from {} ({})", Hex.encodeHexString(baseDestination.getHash()), baseDestination.getName());
// Start up first networking thread (the "server loop", JS: the "Tasks engine")
// Start up first networking thread (the "server loop", the "Tasks engine")
rnsNetworkEPC.start();
}
@ -299,6 +299,10 @@ public class RNSNetwork {
} catch (InterruptedException e) {
log.error("exception: ", e);
}
//var pl = p.getPeerLink();
//if (nonNull(pl) & (pl.getStatus() == ACTIVE)) {
// pl.teardown();
//}
}
// Stop processing threads (the "server loop")
try {
@ -467,33 +471,33 @@ public class RNSNetwork {
return task;
}
task = maybeProduceBroadcastTask(now);
if (task != null) {
return task;
}
//task = maybeProduceBroadcastTask(now);
//if (task != null) {
// return task;
//}
return null;
}
//private Task maybeProducePeerMessageTask() {
// return getImmutableConnectedPeers().stream()
// .map(Peer::getMessageTask)
// .filter(Objects::nonNull)
// .findFirst()
// .orElse(null);
//}
////private Task maybeProducePeerMessageTask() {
//// return getImmutableConnectedPeers().stream()
//// .map(Peer::getMessageTask)
//// .filter(Objects::nonNull)
//// .findFirst()
//// .orElse(null);
////}
////private Task maybeProducePeerMessageTask() {
//// return getImmutableIncomingPeers().stream()
//// .map(RNSPeer::getMessageTask)
//// .filter(RNSPeer::isAvailable)
//// .findFirst()
//// .orElse(null);
////}
//private Task maybeProducePeerMessageTask() {
// return getImmutableIncomingPeers().stream()
// .map(RNSPeer::getMessageTask)
// .filter(RNSPeer::isAvailable)
// .findFirst()
// .orElse(null);
//}
private Task maybeProducePeerMessageTask() {
return getImmutableIncomingPeers().stream()
.map(RNSPeer::getMessageTask)
.findFirst()
.orElse(null);
}
//private Task maybeProducePeerPingTask(Long now) {
// return getImmutableHandshakedPeers().stream()
@ -604,8 +608,8 @@ public class RNSNetwork {
//@Synchronized
public void prunePeers() throws DataException {
// run periodically (by the Controller)
//var peerList = getLinkedPeers();
var peerList = getImmutableLinkedPeers();
var peerList = getLinkedPeers();
//var peerList = getImmutableLinkedPeers();
log.info("number of links (linkedPeers) before pruning: {}", peerList.size());
Link pLink;
LinkStatus lStatus;

View File

@ -67,6 +67,7 @@ import lombok.extern.slf4j.Slf4j;
import lombok.Setter;
import lombok.Data;
import lombok.AccessLevel;
//import lombok.Synchronized;
//
//import org.qortal.network.message.Message;
//import org.qortal.network.message.MessageException;
@ -98,7 +99,7 @@ public class RNSPeer {
private Boolean deleteMe = false;
private Boolean isVacant = true;
private Long lastPacketRtt = null;
private byte[] emptyBuffer = {0,0,0};
private byte[] emptyBuffer = {0,0,0,0,0,0,0,0};
private Double requestResponseProgress;
@Setter(AccessLevel.PACKAGE) private Boolean peerTimedOut = false;
@ -182,6 +183,13 @@ public class RNSPeer {
this.peerLink.setPacketCallback(this::linkPacketReceived);
}
@Override
public String toString() {
// for messages we want an address-like string representation
//return encodeHexString(this.getDestinationHash());
return this.getPeerLink().toString();
}
public BufferedRWPair getOrInitPeerBuffer() {
var channel = this.peerLink.getChannel();
if (nonNull(this.peerBuffer)) {
@ -225,8 +233,8 @@ public class RNSPeer {
if (isFalse(this.isInitiator)) {
sendCloseToRemote(this.peerLink);
}
peerLink.teardown();
}else {
this.peerLink.teardown();
} else {
log.info("shutdown - status (non-ACTIVE): {}", peerLink.getStatus());
}
this.peerLink = null;
@ -308,17 +316,19 @@ public class RNSPeer {
*/
public void peerBufferReady(Integer readyBytes) {
// get the message data
var data = this.peerBuffer.read(readyBytes);
log.info("data length, data: {}, {}", data.length, data);
byte[] data = this.peerBuffer.read(readyBytes);
ByteBuffer bb = ByteBuffer.wrap(data);
log.info("data length: {}, data: {}, ByteBuffer: {}", data.length, data, bb);
//var pureData = Arrays.copyOfRange(data, this.messageMagic.length - 1, data.length);
log.trace("peerBufferReady - data bytes: {}", data.length);
if (ByteBuffer.wrap(data, 0, emptyBuffer.length).equals(ByteBuffer.wrap(emptyBuffer, 0, emptyBuffer.length))) {
log.info("peerBufferReady - empty buffer detected (length: {})", data.length);
//this.peerBuffer.flush();
} else {
try {
//this.peerBuffer.flush();
Message message = Message.fromByteBuffer(ByteBuffer.wrap(data));
log.info("***> creating message from {} bytes", data.length);
Message message = Message.fromByteBuffer(bb);
log.info("type {} message received ({} bytes): {}", message.getType(), data.length, message);
// Handle message based on type
switch (message.getType()) {
@ -330,13 +340,15 @@ public class RNSPeer {
// break;
case PING:
onPingMessage(this, message);
// Note: buffer flush done in onPingMessage method
if (isFalse(this.isInitiator)) {
onPingMessage(this, message);
// Note: buffer flush done in onPingMessage method
}
break;
case PONG:
//log.info("PONG received");
//break;
log.info("PONG received");
break;
// Do we need this ? (We don't have RNSPeer versions)
//case PEERS_V2:
@ -345,16 +357,19 @@ public class RNSPeer {
// break;
default:
// Bump up to controller for possible action
//Controller.getInstance().onNetworkMessage(peer, message);
Controller.getInstance().onRNSNetworkMessage(this, message);
this.peerBuffer.flush();
//if (isFalse(this.isInitiator)) {
// Bump up to controller for possible action
//Controller.getInstance().onNetworkMessage(peer, message);
Controller.getInstance().onRNSNetworkMessage(this, message);
this.peerBuffer.flush();
//}
break;
}
} catch (MessageException e) {
//log.error("{} from peer {}", e.getMessage(), this);
log.error("{} from peer {}", e, this);
}
//this.peerBuffer.flush(); // clear buffer
}
}
@ -466,17 +481,18 @@ public class RNSPeer {
/** Utility methods */
public void pingRemote() {
var link = this.peerLink;
if (nonNull(link)) {
//if (nonNull(link) & (isFalse(link.isInitiator()))) {
if (nonNull(link) & link.isInitiator()) {
if (peerLink.getStatus() == ACTIVE) {
log.info("pinging remote: {}", link);
var data = "ping".getBytes(UTF_8);
link.setPacketCallback(this::linkPacketReceived);
Packet pingPacket = new Packet(link, data);
PacketReceipt packetReceipt = pingPacket.send();
packetReceipt.setDeliveryCallback(this::packetDelivered);
// Note: don't setTimeout, we want it to timeout with FAIL if not deliverable
//packetReceipt.setTimeout(5000L);
packetReceipt.setTimeoutCallback(this::packetTimedOut);
packetReceipt.setDeliveryCallback(this::packetDelivered);
} else {
log.info("can't send ping to a peer {} with (link) status: {}",
encodeHexString(peerLink.getDestination().getHash()), peerLink.getStatus());

View File

@ -9,6 +9,8 @@ import java.io.IOException;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
* Network message for sending over network, or unpacked data received from network.
@ -33,6 +35,7 @@ import java.util.Arrays;
* </p>
*/
public abstract class Message {
private static final Logger LOGGER = LogManager.getLogger(Message.class);
// MAGIC(4) + TYPE(4) + HAS-ID(1) + ID?(4) + DATA-SIZE(4) + CHECKSUM?(4) + DATA?(*)
private static final int MAGIC_LENGTH = 4;
@ -95,9 +98,11 @@ public abstract class Message {
byte[] messageMagic = new byte[MAGIC_LENGTH];
readOnlyBuffer.get(messageMagic);
if (!Arrays.equals(messageMagic, Network.getInstance().getMessageMagic()))
if (!Arrays.equals(messageMagic, Network.getInstance().getMessageMagic())) {
LOGGER.info("xyz - mM: {}, Network getMessageMagic: {}", messageMagic, Network.getInstance().getMessageMagic());
// Didn't receive correct Message "magic"
throw new MessageException("Received incorrect message 'magic'");
}
// Find supporting object
int typeValue = readOnlyBuffer.getInt();

View File

@ -24,6 +24,7 @@ public class RNSMessageTask implements Task {
@Override
public void perform() throws InterruptedException {
//RNSNetwork.getInstance().onMessage(peer, nextMessage);
// TODO: what do we do in the Reticulum case? => implement
// TODO: what do we do in the Reticulum case?
// Note: this is automatically handled (asynchronously) by the RNSPeer peerBufferReady callback
}
}

View File

@ -45,9 +45,10 @@ public class RNSPingTask implements Task {
//} catch (MessageException e) {
// LOGGER.error(e.getMessage(), e);
//}
// Note: We might use peer.sendMessage(pingMessage) instead
peer.getResponse(pingMessage);
//// task is not over here.
//// task is not over here (Reticulum is asynchronous)
//peer.setLastPing(NTP.getTime() - now);
}
}