forked from Qortal/qortal
Merge pull request #39 from szisti/networking
Code formatting, connection age, and logging changes for networking
This commit is contained in:
commit
5a84016a91
24
pom.xml
24
pom.xml
@ -326,30 +326,6 @@
|
||||
<skipTests>${skipTests}</skipTests>
|
||||
</configuration>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<groupId>org.jacoco</groupId>
|
||||
<artifactId>jacoco-maven-plugin</artifactId>
|
||||
<version>0.8.7</version>
|
||||
<executions>
|
||||
<execution>
|
||||
<goals>
|
||||
<goal>prepare-agent</goal>
|
||||
</goals>
|
||||
</execution>
|
||||
<execution>
|
||||
<id>generate-code-coverage-report</id>
|
||||
<phase>test</phase>
|
||||
<goals>
|
||||
<goal>report</goal>
|
||||
</goals>
|
||||
<configuration>
|
||||
<excludes>
|
||||
<exclude>org.bouncycastle.*</exclude>
|
||||
</excludes>
|
||||
</configuration>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
</plugins>
|
||||
<pluginManagement>
|
||||
<plugins>
|
||||
|
@ -1,14 +1,15 @@
|
||||
package org.qortal.api.model;
|
||||
|
||||
import javax.xml.bind.annotation.XmlAccessType;
|
||||
import javax.xml.bind.annotation.XmlAccessorType;
|
||||
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
import org.qortal.data.network.PeerChainTipData;
|
||||
import org.qortal.data.network.PeerData;
|
||||
import org.qortal.network.Handshake;
|
||||
import org.qortal.network.Peer;
|
||||
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
import javax.xml.bind.annotation.XmlAccessType;
|
||||
import javax.xml.bind.annotation.XmlAccessorType;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
@XmlAccessorType(XmlAccessType.FIELD)
|
||||
public class ConnectedPeer {
|
||||
@ -17,6 +18,7 @@ public class ConnectedPeer {
|
||||
INBOUND,
|
||||
OUTBOUND;
|
||||
}
|
||||
|
||||
public Direction direction;
|
||||
public Handshake handshakeStatus;
|
||||
public Long lastPing;
|
||||
@ -32,6 +34,8 @@ public class ConnectedPeer {
|
||||
@Schema(example = "base58")
|
||||
public byte[] lastBlockSignature;
|
||||
public Long lastBlockTimestamp;
|
||||
public UUID connectionId;
|
||||
public String age;
|
||||
|
||||
protected ConnectedPeer() {
|
||||
}
|
||||
@ -49,6 +53,15 @@ public class ConnectedPeer {
|
||||
|
||||
this.version = peer.getPeersVersionString();
|
||||
this.nodeId = peer.getPeersNodeId();
|
||||
this.connectionId = peer.getPeerConnectionId();
|
||||
if (peer.getConnectionEstablishedTime() > 0) {
|
||||
long age = (System.currentTimeMillis() - peer.getConnectionEstablishedTime());
|
||||
long minutes = TimeUnit.MILLISECONDS.toMinutes(age);
|
||||
long seconds = TimeUnit.MILLISECONDS.toSeconds(age) - TimeUnit.MINUTES.toSeconds(minutes);
|
||||
this.age = String.format("%dm %ds", minutes, seconds);
|
||||
} else {
|
||||
this.age = "connecting...";
|
||||
}
|
||||
|
||||
PeerChainTipData peerChainTipData = peer.getChainTipData();
|
||||
if (peerChainTipData != null) {
|
||||
|
File diff suppressed because it is too large
Load Diff
@ -1,28 +1,8 @@
|
||||
package org.qortal.network;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetAddress;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.SocketTimeoutException;
|
||||
import java.net.StandardSocketOptions;
|
||||
import java.net.UnknownHostException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.SelectionKey;
|
||||
import java.nio.channels.Selector;
|
||||
import java.nio.channels.SocketChannel;
|
||||
import java.security.SecureRandom;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Random;
|
||||
import java.util.Arrays;
|
||||
import java.util.concurrent.ArrayBlockingQueue;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
import com.google.common.hash.HashCode;
|
||||
import com.google.common.net.HostAndPort;
|
||||
import com.google.common.net.InetAddresses;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.qortal.controller.Controller;
|
||||
@ -31,26 +11,40 @@ import org.qortal.data.network.PeerChainTipData;
|
||||
import org.qortal.data.network.PeerData;
|
||||
import org.qortal.network.message.ChallengeMessage;
|
||||
import org.qortal.network.message.Message;
|
||||
import org.qortal.network.message.PingMessage;
|
||||
import org.qortal.network.message.Message.MessageException;
|
||||
import org.qortal.network.message.Message.MessageType;
|
||||
import org.qortal.network.message.PingMessage;
|
||||
import org.qortal.settings.Settings;
|
||||
import org.qortal.utils.ExecuteProduceConsume;
|
||||
import org.qortal.utils.NTP;
|
||||
|
||||
import com.google.common.hash.HashCode;
|
||||
import com.google.common.net.HostAndPort;
|
||||
import com.google.common.net.InetAddresses;
|
||||
import java.io.IOException;
|
||||
import java.net.*;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.SelectionKey;
|
||||
import java.nio.channels.Selector;
|
||||
import java.nio.channels.SocketChannel;
|
||||
import java.security.SecureRandom;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.ArrayBlockingQueue;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
// For managing one peer
|
||||
public class Peer {
|
||||
|
||||
private static final Logger LOGGER = LogManager.getLogger(Peer.class);
|
||||
|
||||
/** Maximum time to allow <tt>connect()</tt> to remote peer to complete. (ms) */
|
||||
/**
|
||||
* Maximum time to allow <tt>connect()</tt> to remote peer to complete. (ms)
|
||||
*/
|
||||
private static final int CONNECT_TIMEOUT = 2000; // ms
|
||||
|
||||
/** Maximum time to wait for a message reply to arrive from peer. (ms) */
|
||||
/**
|
||||
* Maximum time to wait for a message reply to arrive from peer. (ms)
|
||||
*/
|
||||
private static final int RESPONSE_TIMEOUT = 3000; // ms
|
||||
|
||||
/**
|
||||
@ -64,34 +58,47 @@ public class Peer {
|
||||
|
||||
private SocketChannel socketChannel = null;
|
||||
private InetSocketAddress resolvedAddress = null;
|
||||
/** True if remote address is loopback/link-local/site-local, false otherwise. */
|
||||
/**
|
||||
* True if remote address is loopback/link-local/site-local, false otherwise.
|
||||
*/
|
||||
private boolean isLocal;
|
||||
|
||||
private final UUID peerConnectionId = UUID.randomUUID();
|
||||
private final Object byteBufferLock = new Object();
|
||||
private ByteBuffer byteBuffer;
|
||||
|
||||
private Map<Integer, BlockingQueue<Message>> replyQueues;
|
||||
private LinkedBlockingQueue<Message> pendingMessages;
|
||||
|
||||
/** True if we created connection to peer, false if we accepted incoming connection from peer. */
|
||||
/**
|
||||
* True if we created connection to peer, false if we accepted incoming connection from peer.
|
||||
*/
|
||||
private final boolean isOutbound;
|
||||
|
||||
private final Object handshakingLock = new Object();
|
||||
private Handshake handshakeStatus = Handshake.STARTED;
|
||||
private volatile boolean handshakeMessagePending = false;
|
||||
private long handshakeComplete = -1L;
|
||||
|
||||
/** Timestamp of when socket was accepted, or connected. */
|
||||
/**
|
||||
* Timestamp of when socket was accepted, or connected.
|
||||
*/
|
||||
private Long connectionTimestamp = null;
|
||||
|
||||
/** Last PING message round-trip time (ms). */
|
||||
/**
|
||||
* Last PING message round-trip time (ms).
|
||||
*/
|
||||
private Long lastPing = null;
|
||||
/** When last PING message was sent, or null if pings not started yet. */
|
||||
/**
|
||||
* When last PING message was sent, or null if pings not started yet.
|
||||
*/
|
||||
private Long lastPingSent;
|
||||
|
||||
byte[] ourChallenge;
|
||||
|
||||
// Versioning
|
||||
public static final Pattern VERSION_PATTERN = Pattern.compile(Controller.VERSION_PREFIX + "(\\d{1,3})\\.(\\d{1,5})\\.(\\d{1,5})");
|
||||
public static final Pattern VERSION_PATTERN = Pattern.compile(Controller.VERSION_PREFIX
|
||||
+ "(\\d{1,3})\\.(\\d{1,5})\\.(\\d{1,5})");
|
||||
|
||||
// Peer info
|
||||
|
||||
@ -103,29 +110,43 @@ public class Peer {
|
||||
|
||||
private PeerData peerData = null;
|
||||
|
||||
/** Peer's value of connectionTimestamp. */
|
||||
/**
|
||||
* Peer's value of connectionTimestamp.
|
||||
*/
|
||||
private Long peersConnectionTimestamp = null;
|
||||
|
||||
/** Version string as reported by peer. */
|
||||
/**
|
||||
* Version string as reported by peer.
|
||||
*/
|
||||
private String peersVersionString = null;
|
||||
/** Numeric version of peer. */
|
||||
/**
|
||||
* Numeric version of peer.
|
||||
*/
|
||||
private Long peersVersion = null;
|
||||
|
||||
/** Latest block info as reported by peer. */
|
||||
/**
|
||||
* Latest block info as reported by peer.
|
||||
*/
|
||||
private PeerChainTipData peersChainTipData;
|
||||
|
||||
/** Our common block with this peer */
|
||||
/**
|
||||
* Our common block with this peer
|
||||
*/
|
||||
private CommonBlockData commonBlockData;
|
||||
|
||||
// Constructors
|
||||
|
||||
/** Construct unconnected, outbound Peer using socket address in peer data */
|
||||
/**
|
||||
* Construct unconnected, outbound Peer using socket address in peer data
|
||||
*/
|
||||
public Peer(PeerData peerData) {
|
||||
this.isOutbound = true;
|
||||
this.peerData = peerData;
|
||||
}
|
||||
|
||||
/** Construct Peer using existing, connected socket */
|
||||
/**
|
||||
* Construct Peer using existing, connected socket
|
||||
*/
|
||||
public Peer(SocketChannel socketChannel, Selector channelSelector) throws IOException {
|
||||
this.isOutbound = false;
|
||||
this.socketChannel = socketChannel;
|
||||
@ -166,13 +187,16 @@ public class Peer {
|
||||
}
|
||||
}
|
||||
|
||||
/*package*/ void setHandshakeStatus(Handshake handshakeStatus) {
|
||||
protected void setHandshakeStatus(Handshake handshakeStatus) {
|
||||
synchronized (this.handshakingLock) {
|
||||
this.handshakeStatus = handshakeStatus;
|
||||
if (handshakeStatus.equals(Handshake.COMPLETED)) {
|
||||
this.handshakeComplete = System.currentTimeMillis();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/*package*/ void resetHandshakeMessagePending() {
|
||||
protected void resetHandshakeMessagePending() {
|
||||
this.handshakeMessagePending = false;
|
||||
}
|
||||
|
||||
@ -200,7 +224,7 @@ public class Peer {
|
||||
}
|
||||
}
|
||||
|
||||
/*package*/ void setPeersVersion(String versionString, long version) {
|
||||
protected void setPeersVersion(String versionString, long version) {
|
||||
synchronized (this.peerInfoLock) {
|
||||
this.peersVersionString = versionString;
|
||||
this.peersVersion = version;
|
||||
@ -213,7 +237,7 @@ public class Peer {
|
||||
}
|
||||
}
|
||||
|
||||
/*package*/ void setPeersConnectionTimestamp(Long peersConnectionTimestamp) {
|
||||
protected void setPeersConnectionTimestamp(Long peersConnectionTimestamp) {
|
||||
synchronized (this.peerInfoLock) {
|
||||
this.peersConnectionTimestamp = peersConnectionTimestamp;
|
||||
}
|
||||
@ -225,13 +249,13 @@ public class Peer {
|
||||
}
|
||||
}
|
||||
|
||||
/*package*/ void setLastPing(long lastPing) {
|
||||
protected void setLastPing(long lastPing) {
|
||||
synchronized (this.peerInfoLock) {
|
||||
this.lastPing = lastPing;
|
||||
}
|
||||
}
|
||||
|
||||
/*package*/ byte[] getOurChallenge() {
|
||||
protected byte[] getOurChallenge() {
|
||||
return this.ourChallenge;
|
||||
}
|
||||
|
||||
@ -241,7 +265,7 @@ public class Peer {
|
||||
}
|
||||
}
|
||||
|
||||
/*package*/ void setPeersNodeId(String peersNodeId) {
|
||||
protected void setPeersNodeId(String peersNodeId) {
|
||||
synchronized (this.peerInfoLock) {
|
||||
this.peersNodeId = peersNodeId;
|
||||
}
|
||||
@ -253,7 +277,7 @@ public class Peer {
|
||||
}
|
||||
}
|
||||
|
||||
/*package*/ void setPeersPublicKey(byte[] peerPublicKey) {
|
||||
protected void setPeersPublicKey(byte[] peerPublicKey) {
|
||||
synchronized (this.peerInfoLock) {
|
||||
this.peersPublicKey = peerPublicKey;
|
||||
}
|
||||
@ -265,7 +289,7 @@ public class Peer {
|
||||
}
|
||||
}
|
||||
|
||||
/*package*/ void setPeersChallenge(byte[] peersChallenge) {
|
||||
protected void setPeersChallenge(byte[] peersChallenge) {
|
||||
synchronized (this.peerInfoLock) {
|
||||
this.peersChallenge = peersChallenge;
|
||||
}
|
||||
@ -295,9 +319,10 @@ public class Peer {
|
||||
}
|
||||
}
|
||||
|
||||
/*package*/ void queueMessage(Message message) {
|
||||
if (!this.pendingMessages.offer(message))
|
||||
LOGGER.info(() -> String.format("No room to queue message from peer %s - discarding", this));
|
||||
protected void queueMessage(Message message) {
|
||||
if (!this.pendingMessages.offer(message)) {
|
||||
LOGGER.info("[{}] No room to queue message from peer {} - discarding", this.peerConnectionId, this);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -323,7 +348,7 @@ public class Peer {
|
||||
}
|
||||
|
||||
public SocketChannel connect(Selector channelSelector) {
|
||||
LOGGER.trace(() -> String.format("Connecting to peer %s", this));
|
||||
LOGGER.trace("[{}] Connecting to peer {}", this.peerConnectionId, this);
|
||||
|
||||
try {
|
||||
this.resolvedAddress = this.peerData.getAddress().toSocketAddress();
|
||||
@ -332,22 +357,22 @@ public class Peer {
|
||||
this.socketChannel = SocketChannel.open();
|
||||
this.socketChannel.socket().connect(resolvedAddress, CONNECT_TIMEOUT);
|
||||
} catch (SocketTimeoutException e) {
|
||||
LOGGER.trace(String.format("Connection timed out to peer %s", this));
|
||||
LOGGER.trace("[{}] Connection timed out to peer {}", this.peerConnectionId, this);
|
||||
return null;
|
||||
} catch (UnknownHostException e) {
|
||||
LOGGER.trace(String.format("Connection failed to unresolved peer %s", this));
|
||||
LOGGER.trace("[{}] Connection failed to unresolved peer {}", this.peerConnectionId, this);
|
||||
return null;
|
||||
} catch (IOException e) {
|
||||
LOGGER.trace(String.format("Connection failed to peer %s", this));
|
||||
LOGGER.trace("[{}] Connection failed to peer {}", this.peerConnectionId, this);
|
||||
return null;
|
||||
}
|
||||
|
||||
try {
|
||||
LOGGER.debug(() -> String.format("Connected to peer %s", this));
|
||||
LOGGER.debug("[{}] Connected to peer {}", this.peerConnectionId, this);
|
||||
sharedSetup(channelSelector);
|
||||
return socketChannel;
|
||||
} catch (IOException e) {
|
||||
LOGGER.trace(String.format("Post-connection setup failed, peer %s", this));
|
||||
LOGGER.trace("[{}] Post-connection setup failed, peer {}", this.peerConnectionId, this);
|
||||
try {
|
||||
socketChannel.close();
|
||||
} catch (IOException ce) {
|
||||
@ -360,40 +385,42 @@ public class Peer {
|
||||
/**
|
||||
* Attempt to buffer bytes from socketChannel.
|
||||
*
|
||||
* @throws IOException
|
||||
* @throws IOException If this channel is not yet connected
|
||||
*/
|
||||
/* package */ void readChannel() throws IOException {
|
||||
protected void readChannel() throws IOException {
|
||||
synchronized (this.byteBufferLock) {
|
||||
while (true) {
|
||||
if (!this.socketChannel.isOpen() || this.socketChannel.socket().isClosed())
|
||||
if (!this.socketChannel.isOpen() || this.socketChannel.socket().isClosed()) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Do we need to allocate byteBuffer?
|
||||
if (this.byteBuffer == null)
|
||||
if (this.byteBuffer == null) {
|
||||
this.byteBuffer = ByteBuffer.allocate(Network.getInstance().getMaxMessageSize());
|
||||
}
|
||||
|
||||
final int priorPosition = this.byteBuffer.position();
|
||||
final int bytesRead = this.socketChannel.read(this.byteBuffer);
|
||||
if (bytesRead == -1) {
|
||||
this.disconnect("EOF");
|
||||
if (priorPosition > 0) {
|
||||
this.disconnect("EOF - read " + priorPosition + " bytes");
|
||||
} else {
|
||||
this.disconnect("EOF - failed to read any data");
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
LOGGER.trace(() -> {
|
||||
if (bytesRead > 0) {
|
||||
byte[] leadingBytes = new byte[Math.min(bytesRead, 8)];
|
||||
this.byteBuffer.asReadOnlyBuffer().position(priorPosition).get(leadingBytes);
|
||||
String leadingHex = HashCode.fromBytes(leadingBytes).toString();
|
||||
|
||||
return String.format("Received %d bytes, starting %s, into byteBuffer[%d] from peer %s",
|
||||
bytesRead,
|
||||
leadingHex,
|
||||
priorPosition,
|
||||
this);
|
||||
LOGGER.trace("[{}] Received {} bytes, starting {}, into byteBuffer[{}] from peer {}",
|
||||
this.peerConnectionId, bytesRead, leadingHex, priorPosition, this);
|
||||
} else {
|
||||
return String.format("Received %d bytes into byteBuffer[%d] from peer %s", bytesRead, priorPosition, this);
|
||||
LOGGER.trace("[{}] Received {} bytes into byteBuffer[{}] from peer {}", this.peerConnectionId,
|
||||
bytesRead, priorPosition, this);
|
||||
}
|
||||
});
|
||||
final boolean wasByteBufferFull = !this.byteBuffer.hasRemaining();
|
||||
|
||||
while (true) {
|
||||
@ -404,34 +431,39 @@ public class Peer {
|
||||
try {
|
||||
message = Message.fromByteBuffer(readOnlyBuffer);
|
||||
} catch (MessageException e) {
|
||||
LOGGER.debug(String.format("%s, from peer %s", e.getMessage(), this));
|
||||
LOGGER.debug("[{}] {}, from peer {}", this.peerConnectionId, e.getMessage(), this);
|
||||
this.disconnect(e.getMessage());
|
||||
return;
|
||||
}
|
||||
|
||||
if (message == null && bytesRead == 0 && !wasByteBufferFull) {
|
||||
// No complete message in buffer, no more bytes to read from socket even though there was room to read bytes
|
||||
// No complete message in buffer, no more bytes to read from socket
|
||||
// even though there was room to read bytes
|
||||
|
||||
/* DISABLED
|
||||
// If byteBuffer is empty then we can deallocate it, to save memory, albeit costing GC
|
||||
if (this.byteBuffer.remaining() == this.byteBuffer.capacity())
|
||||
if (this.byteBuffer.remaining() == this.byteBuffer.capacity()) {
|
||||
this.byteBuffer = null;
|
||||
}
|
||||
*/
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
if (message == null)
|
||||
if (message == null) {
|
||||
// No complete message in buffer, but maybe more bytes to read from socket
|
||||
break;
|
||||
}
|
||||
|
||||
LOGGER.trace(() -> String.format("Received %s message with ID %d from peer %s", message.getType().name(), message.getId(), this));
|
||||
LOGGER.trace("[{}] Received {} message with ID {} from peer {}", this.peerConnectionId,
|
||||
message.getType().name(), message.getId(), this);
|
||||
|
||||
// Tidy up buffers:
|
||||
this.byteBuffer.flip();
|
||||
// Read-only, flipped buffer's position will be after end of message, so copy that
|
||||
this.byteBuffer.position(readOnlyBuffer.position());
|
||||
// Copy bytes after read message to front of buffer, adjusting position accordingly, reset limit to capacity
|
||||
// Copy bytes after read message to front of buffer,
|
||||
// adjusting position accordingly, reset limit to capacity
|
||||
this.byteBuffer.compact();
|
||||
|
||||
BlockingQueue<Message> queue = this.replyQueues.get(message.getId());
|
||||
@ -446,7 +478,8 @@ public class Peer {
|
||||
|
||||
// Add message to pending queue
|
||||
if (!this.pendingMessages.offer(message)) {
|
||||
LOGGER.info(() -> String.format("No room to queue message from peer %s - discarding", this));
|
||||
LOGGER.info("[{}] No room to queue message from peer {} - discarding",
|
||||
this.peerConnectionId, this);
|
||||
return;
|
||||
}
|
||||
|
||||
@ -458,24 +491,28 @@ public class Peer {
|
||||
}
|
||||
}
|
||||
|
||||
/* package */ ExecuteProduceConsume.Task getMessageTask() {
|
||||
protected ExecuteProduceConsume.Task getMessageTask() {
|
||||
/*
|
||||
* If we are still handshaking and there is a message yet to be processed then
|
||||
* don't produce another message task. This allows us to process handshake
|
||||
* messages sequentially.
|
||||
*/
|
||||
if (this.handshakeMessagePending)
|
||||
if (this.handshakeMessagePending) {
|
||||
return null;
|
||||
}
|
||||
|
||||
final Message nextMessage = this.pendingMessages.poll();
|
||||
|
||||
if (nextMessage == null)
|
||||
if (nextMessage == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
LOGGER.trace(() -> String.format("Produced %s message task from peer %s", nextMessage.getType().name(), this));
|
||||
LOGGER.trace("[{}] Produced {} message task from peer {}", this.peerConnectionId,
|
||||
nextMessage.getType().name(), this);
|
||||
|
||||
if (this.handshakeStatus != Handshake.COMPLETED)
|
||||
if (this.handshakeStatus != Handshake.COMPLETED) {
|
||||
this.handshakeMessagePending = true;
|
||||
}
|
||||
|
||||
// Return a task to process message in queue
|
||||
return () -> Network.getInstance().onMessage(this, nextMessage);
|
||||
@ -484,16 +521,18 @@ public class Peer {
|
||||
/**
|
||||
* Attempt to send Message to peer.
|
||||
*
|
||||
* @param message
|
||||
* @param message message to be sent
|
||||
* @return <code>true</code> if message successfully sent; <code>false</code> otherwise
|
||||
*/
|
||||
public boolean sendMessage(Message message) {
|
||||
if (!this.socketChannel.isOpen())
|
||||
if (!this.socketChannel.isOpen()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
try {
|
||||
// Send message
|
||||
LOGGER.trace(() -> String.format("Sending %s message with ID %d to peer %s", message.getType().name(), message.getId(), this));
|
||||
LOGGER.trace("[{}] Sending {} message with ID {} to peer {}", this.peerConnectionId,
|
||||
message.getType().name(), message.getId(), this);
|
||||
|
||||
ByteBuffer outputBuffer = ByteBuffer.wrap(message.toBytes());
|
||||
|
||||
@ -503,11 +542,8 @@ public class Peer {
|
||||
while (outputBuffer.hasRemaining()) {
|
||||
int bytesWritten = this.socketChannel.write(outputBuffer);
|
||||
|
||||
LOGGER.trace(() -> String.format("Sent %d bytes of %s message with ID %d to peer %s",
|
||||
bytesWritten,
|
||||
message.getType().name(),
|
||||
message.getId(),
|
||||
this));
|
||||
LOGGER.trace("[{}] Sent {} bytes of {} message with ID {} to peer {}", this.peerConnectionId,
|
||||
bytesWritten, message.getType().name(), message.getId(), this);
|
||||
|
||||
if (bytesWritten == 0) {
|
||||
// Underlying socket's internal buffer probably full,
|
||||
@ -522,21 +558,20 @@ public class Peer {
|
||||
*/
|
||||
Thread.sleep(1L); //NOSONAR squid:S2276
|
||||
|
||||
if (System.currentTimeMillis() - sendStart > RESPONSE_TIMEOUT)
|
||||
if (System.currentTimeMillis() - sendStart > RESPONSE_TIMEOUT) {
|
||||
// We've taken too long to send this message
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (MessageException e) {
|
||||
LOGGER.warn(String.format("Failed to send %s message with ID %d to peer %s: %s", message.getType().name(), message.getId(), this, e.getMessage()));
|
||||
LOGGER.warn("[{}] Failed to send {} message with ID {} to peer {}: {}", this.peerConnectionId,
|
||||
message.getType().name(), message.getId(), this, e.getMessage());
|
||||
return false;
|
||||
} catch (IOException e) {
|
||||
} catch (IOException | InterruptedException e) {
|
||||
// Send failure
|
||||
return false;
|
||||
} catch (InterruptedException e) {
|
||||
// Likely shutdown scenario - so exit
|
||||
return false;
|
||||
}
|
||||
|
||||
// Sent OK
|
||||
@ -546,14 +581,16 @@ public class Peer {
|
||||
/**
|
||||
* Send message to peer and await response.
|
||||
* <p>
|
||||
* Message is assigned a random ID and sent. If a response with matching ID is received then it is returned to caller.
|
||||
* Message is assigned a random ID and sent.
|
||||
* If a response with matching ID is received then it is returned to caller.
|
||||
* <p>
|
||||
* If no response with matching ID within timeout, or some other error/exception occurs, then return <code>null</code>.<br>
|
||||
* If no response with matching ID within timeout, or some other error/exception occurs,
|
||||
* then return <code>null</code>.<br>
|
||||
* (Assume peer will be rapidly disconnected after this).
|
||||
*
|
||||
* @param message
|
||||
* @param message message to send
|
||||
* @return <code>Message</code> if valid response received; <code>null</code> if not or error/exception occurs
|
||||
* @throws InterruptedException
|
||||
* @throws InterruptedException if interrupted while waiting
|
||||
*/
|
||||
public Message getResponse(Message message) throws InterruptedException {
|
||||
BlockingQueue<Message> blockingQueue = new ArrayBlockingQueue<>(1);
|
||||
@ -582,20 +619,22 @@ public class Peer {
|
||||
}
|
||||
}
|
||||
|
||||
/* package */ void startPings() {
|
||||
protected void startPings() {
|
||||
// Replacing initial null value allows getPingTask() to start sending pings.
|
||||
LOGGER.trace(() -> String.format("Enabling pings for peer %s", this));
|
||||
LOGGER.trace("[{}] Enabling pings for peer {}", this.peerConnectionId, this);
|
||||
this.lastPingSent = NTP.getTime();
|
||||
}
|
||||
|
||||
/* package */ ExecuteProduceConsume.Task getPingTask(Long now) {
|
||||
protected ExecuteProduceConsume.Task getPingTask(Long now) {
|
||||
// Pings not enabled yet?
|
||||
if (now == null || this.lastPingSent == null)
|
||||
if (now == null || this.lastPingSent == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
// Time to send another ping?
|
||||
if (now < this.lastPingSent + PING_INTERVAL)
|
||||
if (now < this.lastPingSent + PING_INTERVAL) {
|
||||
return null; // Not yet
|
||||
}
|
||||
|
||||
// Not strictly true, but prevents this peer from being immediately chosen again
|
||||
this.lastPingSent = now;
|
||||
@ -605,7 +644,8 @@ public class Peer {
|
||||
Message message = this.getResponse(pingMessage);
|
||||
|
||||
if (message == null || message.getType() != MessageType.PING) {
|
||||
LOGGER.debug(() -> String.format("Didn't receive reply from %s for PING ID %d", this, pingMessage.getId()));
|
||||
LOGGER.debug("[{}] Didn't receive reply from {} for PING ID {}", this.peerConnectionId, this,
|
||||
pingMessage.getId());
|
||||
this.disconnect("no ping received");
|
||||
return;
|
||||
}
|
||||
@ -615,18 +655,19 @@ public class Peer {
|
||||
}
|
||||
|
||||
public void disconnect(String reason) {
|
||||
if (!isStopping)
|
||||
LOGGER.debug(() -> String.format("Disconnecting peer %s: %s", this, reason));
|
||||
|
||||
if (!isStopping) {
|
||||
LOGGER.debug("[{}] Disconnecting peer {} after {}: {}", this.peerConnectionId, this,
|
||||
getConnectionAge(), reason);
|
||||
}
|
||||
this.shutdown();
|
||||
|
||||
Network.getInstance().onDisconnect(this);
|
||||
}
|
||||
|
||||
public void shutdown() {
|
||||
if (!isStopping)
|
||||
LOGGER.debug(() -> String.format("Shutting down peer %s", this));
|
||||
|
||||
if (!isStopping) {
|
||||
LOGGER.debug("[{}] Shutting down peer {}", this.peerConnectionId, this);
|
||||
}
|
||||
isStopping = true;
|
||||
|
||||
if (this.socketChannel.isOpen()) {
|
||||
@ -634,7 +675,7 @@ public class Peer {
|
||||
this.socketChannel.shutdownOutput();
|
||||
this.socketChannel.close();
|
||||
} catch (IOException e) {
|
||||
LOGGER.debug(String.format("IOException while trying to close peer %s", this));
|
||||
LOGGER.debug("[{}] IOException while trying to close peer {}", this.peerConnectionId, this);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -643,23 +684,26 @@ public class Peer {
|
||||
// Minimum version
|
||||
|
||||
public boolean isAtLeastVersion(String minVersionString) {
|
||||
if (minVersionString == null)
|
||||
if (minVersionString == null) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// Add the version prefix
|
||||
minVersionString = Controller.VERSION_PREFIX + minVersionString;
|
||||
|
||||
Matcher matcher = VERSION_PATTERN.matcher(minVersionString);
|
||||
if (!matcher.lookingAt())
|
||||
if (!matcher.lookingAt()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// We're expecting 3 positive shorts, so we can convert 1.2.3 into 0x0100020003
|
||||
long minVersion = 0;
|
||||
for (int g = 1; g <= 3; ++g) {
|
||||
long value = Long.parseLong(matcher.group(g));
|
||||
|
||||
if (value < 0 || value > Short.MAX_VALUE)
|
||||
if (value < 0 || value > Short.MAX_VALUE) {
|
||||
return false;
|
||||
}
|
||||
|
||||
minVersion <<= 16;
|
||||
minVersion |= value;
|
||||
@ -677,8 +721,10 @@ public class Peer {
|
||||
|
||||
if (peerChainTipData != null && commonBlockData != null) {
|
||||
PeerChainTipData commonBlockChainTipData = commonBlockData.getChainTipData();
|
||||
if (peerChainTipData.getLastBlockSignature() != null && commonBlockChainTipData != null && commonBlockChainTipData.getLastBlockSignature() != null) {
|
||||
if (Arrays.equals(peerChainTipData.getLastBlockSignature(), commonBlockChainTipData.getLastBlockSignature())) {
|
||||
if (peerChainTipData.getLastBlockSignature() != null && commonBlockChainTipData != null
|
||||
&& commonBlockChainTipData.getLastBlockSignature() != null) {
|
||||
if (Arrays.equals(peerChainTipData.getLastBlockSignature(),
|
||||
commonBlockChainTipData.getLastBlockSignature())) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
@ -689,10 +735,13 @@ public class Peer {
|
||||
|
||||
// Utility methods
|
||||
|
||||
/** Returns true if ports and addresses (or hostnames) match */
|
||||
/**
|
||||
* Returns true if ports and addresses (or hostnames) match
|
||||
*/
|
||||
public static boolean addressEquals(InetSocketAddress knownAddress, InetSocketAddress peerAddress) {
|
||||
if (knownAddress.getPort() != peerAddress.getPort())
|
||||
if (knownAddress.getPort() != peerAddress.getPort()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return knownAddress.getHostString().equalsIgnoreCase(peerAddress.getHostString());
|
||||
}
|
||||
@ -703,12 +752,29 @@ public class Peer {
|
||||
// HostAndPort doesn't try to validate host so we do extra checking here
|
||||
InetAddress address = InetAddresses.forString(hostAndPort.getHost());
|
||||
|
||||
return new InetSocketAddress(address, hostAndPort.getPortOrDefault(Settings.getInstance().getDefaultListenPort()));
|
||||
int defaultPort = Settings.getInstance().getDefaultListenPort();
|
||||
return new InetSocketAddress(address, hostAndPort.getPortOrDefault(defaultPort));
|
||||
}
|
||||
|
||||
/** Returns true if address is loopback/link-local/site-local, false otherwise. */
|
||||
/**
|
||||
* Returns true if address is loopback/link-local/site-local, false otherwise.
|
||||
*/
|
||||
public static boolean isAddressLocal(InetAddress address) {
|
||||
return address.isLoopbackAddress() || address.isLinkLocalAddress() || address.isSiteLocalAddress();
|
||||
}
|
||||
|
||||
public UUID getPeerConnectionId() {
|
||||
return peerConnectionId;
|
||||
}
|
||||
|
||||
public long getConnectionEstablishedTime() {
|
||||
return handshakeComplete;
|
||||
}
|
||||
|
||||
public long getConnectionAge() {
|
||||
if (handshakeComplete > 0L) {
|
||||
return System.currentTimeMillis() - handshakeComplete;
|
||||
}
|
||||
return handshakeComplete;
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user