add peer flushing

This commit is contained in:
Jürg Schulthess 2025-03-17 04:34:45 +01:00
parent 52b6b79b08
commit da20485870

View File

@ -98,6 +98,7 @@ public class RNSPeer {
private Boolean deleteMe = false; private Boolean deleteMe = false;
private Boolean isVacant = true; private Boolean isVacant = true;
private Long lastPacketRtt = null; private Long lastPacketRtt = null;
private byte[] emptyBuffer = {0,0,0};
private Double requestResponseProgress; private Double requestResponseProgress;
@Setter(AccessLevel.PACKAGE) private Boolean peerTimedOut = false; @Setter(AccessLevel.PACKAGE) private Boolean peerTimedOut = false;
@ -312,20 +313,25 @@ public class RNSPeer {
//var pureData = Arrays.copyOfRange(data, this.messageMagic.length - 1, data.length); //var pureData = Arrays.copyOfRange(data, this.messageMagic.length - 1, data.length);
log.trace("peerBufferReady - data bytes: {}", data.length); log.trace("peerBufferReady - data bytes: {}", data.length);
if (ByteBuffer.wrap(data, 0, emptyBuffer.length).equals(ByteBuffer.wrap(emptyBuffer, 0, emptyBuffer.length))) {
log.info("peerBufferReady - empty buffer detected (length: {})", data.length);
} else {
try { try {
//this.peerBuffer.flush();
Message message = Message.fromByteBuffer(ByteBuffer.wrap(data)); Message message = Message.fromByteBuffer(ByteBuffer.wrap(data));
log.info("received message - {}", message); log.info("type {} message received ({} bytes): {}", message.getType(), data.length, message);
log.info("type {} message received: {}", message.getType(), message); // Handle message based on type
// TODO: Now what with message?
switch (message.getType()) { switch (message.getType()) {
// Do we need this ? (seems like a TCP scenario only thing) // Do we need this ? (seems like a TCP scenario only thing)
// Does any RNSPeer ever require an other RNSPeer's peer list? // Does any RNSPeer ever require an other RNSPeer's peer list?
//case GET_PEERS: //case GET_PEERS:
// onGetPeersMessage(peer, message); // //onGetPeersMessage(peer, message);
// onGetRNSPeersMessage(peer, message);
// break; // break;
case PING: case PING:
onPingMessage(this, message); onPingMessage(this, message);
// Note: buffer flush done in onPingMessage method
break; break;
case PONG: case PONG:
@ -335,26 +341,21 @@ public class RNSPeer {
// Do we need this ? (We don't have RNSPeer versions) // Do we need this ? (We don't have RNSPeer versions)
//case PEERS_V2: //case PEERS_V2:
// onPeersV2Message(peer, message); // onPeersV2Message(peer, message);
// this.peerBuffer.flush();
// break; // break;
default: default:
// Bump up to controller for possible action // Bump up to controller for possible action
//Controller.getInstance().onNetworkMessage(peer, message); //Controller.getInstance().onNetworkMessage(peer, message);
Controller.getInstance().onRNSNetworkMessage(this, message); Controller.getInstance().onRNSNetworkMessage(this, message);
this.peerBuffer.flush();
break; break;
} }
} catch (MessageException e) { } catch (MessageException e) {
//log.error("{} from peer {}", e.getMessage(), this); //log.error("{} from peer {}", e.getMessage(), this);
log.error("{} from peer {}", e, this); log.error("{} from peer {}", e, this);
} }
//var decodedData = new String(data); }
//log.info("Received data over the buffer: {}", decodedData);
//if (isFalse(this.isInitiator)) {
// // TODO: process data and reply
//} else {
// this.peerBuffer.flush(); // clear buffer
//}
} }
/** /**