Networking performance improvements and message sending bugfix

This commit is contained in:
catbref 2020-11-02 11:15:53 +00:00
parent de2fc78ad1
commit 8f06765caf
2 changed files with 67 additions and 33 deletions

View File

@ -33,6 +33,7 @@ import org.qortal.settings.Settings;
import org.qortal.utils.ExecuteProduceConsume;
import org.qortal.utils.NTP;
import com.google.common.hash.HashCode;
import com.google.common.net.HostAndPort;
import com.google.common.net.InetAddresses;
@ -348,21 +349,37 @@ public class Peer {
if (this.byteBuffer == null)
this.byteBuffer = ByteBuffer.allocate(Network.getInstance().getMaxMessageSize());
final int priorPosition = this.byteBuffer.position();
final int bytesRead = this.socketChannel.read(this.byteBuffer);
if (bytesRead == -1) {
this.disconnect("EOF");
return;
}
LOGGER.trace(() -> String.format("Received %d bytes from peer %s", bytesRead, this));
LOGGER.trace(() -> {
if (bytesRead > 0) {
byte[] leadingBytes = new byte[Math.min(bytesRead, 8)];
this.byteBuffer.asReadOnlyBuffer().position(priorPosition).get(leadingBytes);
String leadingHex = HashCode.fromBytes(leadingBytes).toString();
return String.format("Received %d bytes, starting %s, into byteBuffer[%d] from peer %s",
bytesRead,
leadingHex,
priorPosition,
this);
} else {
return String.format("Received %d bytes into byteBuffer[%d] from peer %s", bytesRead, priorPosition, this);
}
});
final boolean wasByteBufferFull = !this.byteBuffer.hasRemaining();
while (true) {
final Message message;
// Can we build a message from buffer now?
ByteBuffer readOnlyBuffer = this.byteBuffer.asReadOnlyBuffer().flip();
try {
message = Message.fromByteBuffer(this.byteBuffer);
message = Message.fromByteBuffer(readOnlyBuffer);
} catch (MessageException e) {
LOGGER.debug(String.format("%s, from peer %s", e.getMessage(), this));
this.disconnect(e.getMessage());
@ -387,6 +404,13 @@ public class Peer {
LOGGER.trace(() -> String.format("Received %s message with ID %d from peer %s", message.getType().name(), message.getId(), this));
// Tidy up buffers:
this.byteBuffer.flip();
// Read-only, flipped buffer's position will be after end of message, so copy that
this.byteBuffer.position(readOnlyBuffer.position());
// Copy bytes after read message to front of buffer, adjusting position accordingly, reset limit to capacity
this.byteBuffer.compact();
BlockingQueue<Message> queue = this.replyQueues.get(message.getId());
if (queue != null) {
// Adding message to queue will unblock thread waiting for response
@ -399,7 +423,7 @@ public class Peer {
// Add message to pending queue
if (!this.pendingMessages.offer(message)) {
LOGGER.info(String.format("No room to queue message from peer %s - discarding", this));
LOGGER.info(() -> String.format("No room to queue message from peer %s - discarding", this));
return;
}
@ -454,10 +478,24 @@ public class Peer {
while (outputBuffer.hasRemaining()) {
int bytesWritten = this.socketChannel.write(outputBuffer);
LOGGER.trace(() -> String.format("Sent %d bytes of %s message with ID %d to peer %s",
bytesWritten,
message.getType().name(),
message.getId(),
this));
if (bytesWritten == 0)
// Underlying socket's internal buffer probably full,
// so wait a short while for bytes to actually be transmitted over the wire
this.socketChannel.wait(1L);
/*
* NOSONAR squid:S2276 - we don't want to use this.socketChannel.wait()
* as this releases the lock held by synchronized() above
* and would allow another thread to send another message,
* potentially interleaving them on-the-wire, causing checksum failures
* and connection loss.
*/
Thread.sleep(1L); //NOSONAR squid:S2276
}
}
} catch (MessageException e) {

View File

@ -160,80 +160,72 @@ public abstract class Message {
/**
* Attempt to read a message from byte buffer.
*
* @param byteBuffer
* @param readOnlyBuffer
* @return null if no complete message can be read
* @throws MessageException
*/
public static Message fromByteBuffer(ByteBuffer byteBuffer) throws MessageException {
public static Message fromByteBuffer(ByteBuffer readOnlyBuffer) throws MessageException {
try {
byteBuffer.flip();
ByteBuffer readBuffer = byteBuffer.asReadOnlyBuffer();
// Read only enough bytes to cover Message "magic" preamble
byte[] messageMagic = new byte[MAGIC_LENGTH];
readBuffer.get(messageMagic);
readOnlyBuffer.get(messageMagic);
if (!Arrays.equals(messageMagic, Network.getInstance().getMessageMagic()))
// Didn't receive correct Message "magic"
throw new MessageException("Received incorrect message 'magic'");
// Find supporting object
int typeValue = readBuffer.getInt();
int typeValue = readOnlyBuffer.getInt();
MessageType messageType = MessageType.valueOf(typeValue);
if (messageType == null)
// Unrecognised message type
throw new MessageException(String.format("Received unknown message type [%d]", typeValue));
// Optional message ID
byte hasId = readBuffer.get();
byte hasId = readOnlyBuffer.get();
int id = -1;
if (hasId != 0) {
id = readBuffer.getInt();
id = readOnlyBuffer.getInt();
if (id <= 0)
// Invalid ID
throw new MessageException("Invalid negative ID");
}
int dataSize = readBuffer.getInt();
int dataSize = readOnlyBuffer.getInt();
if (dataSize > MAX_DATA_SIZE)
// Too large
throw new MessageException(String.format("Declared data length %d larger than max allowed %d", dataSize, MAX_DATA_SIZE));
// Don't have all the data yet?
if (dataSize > 0 && dataSize + CHECKSUM_LENGTH > readOnlyBuffer.remaining())
return null;
ByteBuffer dataSlice = null;
if (dataSize > 0) {
byte[] expectedChecksum = new byte[CHECKSUM_LENGTH];
readBuffer.get(expectedChecksum);
readOnlyBuffer.get(expectedChecksum);
// Remember this position in readBuffer so we can pass to Message subclass
dataSlice = readBuffer.slice();
// Consume data from buffer
byte[] data = new byte[dataSize];
readBuffer.get(data);
// We successfully read all the data bytes, so we can set limit on dataSlice
// Slice data in readBuffer so we can pass to Message subclass
dataSlice = readOnlyBuffer.slice();
dataSlice.limit(dataSize);
// Test checksum
byte[] actualChecksum = generateChecksum(data);
byte[] actualChecksum = generateChecksum(dataSlice);
if (!Arrays.equals(expectedChecksum, actualChecksum))
throw new MessageException("Message checksum incorrect");
// Reset position after being consumed by generateChecksum
dataSlice.position(0);
// Update position in readOnlyBuffer
readOnlyBuffer.position(readOnlyBuffer.position() + dataSize);
}
Message message = messageType.fromByteBuffer(id, dataSlice);
// We successfully read a message, so bump byteBuffer's position to reflect this
byteBuffer.position(readBuffer.position());
return message;
return messageType.fromByteBuffer(id, dataSlice);
} catch (BufferUnderflowException e) {
// Not enough bytes to fully decode message...
return null;
} finally {
byteBuffer.compact();
}
}
@ -241,6 +233,10 @@ public abstract class Message {
return Arrays.copyOfRange(Crypto.digest(data), 0, CHECKSUM_LENGTH);
}
protected static byte[] generateChecksum(ByteBuffer dataBuffer) {
return Arrays.copyOfRange(Crypto.digest(dataBuffer), 0, CHECKSUM_LENGTH);
}
public byte[] toBytes() throws MessageException {
try {
ByteArrayOutputStream bytes = new ByteArrayOutputStream(256);