Code formatting and logging changes for networking

This commit is contained in:
Istvan Szabo 2021-05-27 09:03:18 +01:00
parent b64e52c0c0
commit 265d40f04a
3 changed files with 2023 additions and 1852 deletions

View File

@ -1,14 +1,14 @@
package org.qortal.api.model; package org.qortal.api.model;
import javax.xml.bind.annotation.XmlAccessType; import io.swagger.v3.oas.annotations.media.Schema;
import javax.xml.bind.annotation.XmlAccessorType;
import org.qortal.data.network.PeerChainTipData; import org.qortal.data.network.PeerChainTipData;
import org.qortal.data.network.PeerData; import org.qortal.data.network.PeerData;
import org.qortal.network.Handshake; import org.qortal.network.Handshake;
import org.qortal.network.Peer; import org.qortal.network.Peer;
import io.swagger.v3.oas.annotations.media.Schema; import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import java.util.UUID;
@XmlAccessorType(XmlAccessType.FIELD) @XmlAccessorType(XmlAccessType.FIELD)
public class ConnectedPeer { public class ConnectedPeer {
@ -17,6 +17,7 @@ public class ConnectedPeer {
INBOUND, INBOUND,
OUTBOUND; OUTBOUND;
} }
public Direction direction; public Direction direction;
public Handshake handshakeStatus; public Handshake handshakeStatus;
public Long lastPing; public Long lastPing;
@ -32,6 +33,7 @@ public class ConnectedPeer {
@Schema(example = "base58") @Schema(example = "base58")
public byte[] lastBlockSignature; public byte[] lastBlockSignature;
public Long lastBlockTimestamp; public Long lastBlockTimestamp;
public UUID connectionId;
protected ConnectedPeer() { protected ConnectedPeer() {
} }
@ -49,6 +51,7 @@ public class ConnectedPeer {
this.version = peer.getPeersVersionString(); this.version = peer.getPeersVersionString();
this.nodeId = peer.getPeersNodeId(); this.nodeId = peer.getPeersNodeId();
this.connectionId = peer.getPeerConnectionId();
PeerChainTipData peerChainTipData = peer.getChainTipData(); PeerChainTipData peerChainTipData = peer.getChainTipData();
if (peerChainTipData != null) { if (peerChainTipData != null) {

File diff suppressed because it is too large Load Diff

View File

@ -1,28 +1,8 @@
package org.qortal.network; package org.qortal.network;
import java.io.IOException; import com.google.common.hash.HashCode;
import java.net.InetAddress; import com.google.common.net.HostAndPort;
import java.net.InetSocketAddress; import com.google.common.net.InetAddresses;
import java.net.SocketTimeoutException;
import java.net.StandardSocketOptions;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.security.SecureRandom;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.qortal.controller.Controller; import org.qortal.controller.Controller;
@ -31,26 +11,40 @@ import org.qortal.data.network.PeerChainTipData;
import org.qortal.data.network.PeerData; import org.qortal.data.network.PeerData;
import org.qortal.network.message.ChallengeMessage; import org.qortal.network.message.ChallengeMessage;
import org.qortal.network.message.Message; import org.qortal.network.message.Message;
import org.qortal.network.message.PingMessage;
import org.qortal.network.message.Message.MessageException; import org.qortal.network.message.Message.MessageException;
import org.qortal.network.message.Message.MessageType; import org.qortal.network.message.Message.MessageType;
import org.qortal.network.message.PingMessage;
import org.qortal.settings.Settings; import org.qortal.settings.Settings;
import org.qortal.utils.ExecuteProduceConsume; import org.qortal.utils.ExecuteProduceConsume;
import org.qortal.utils.NTP; import org.qortal.utils.NTP;
import com.google.common.hash.HashCode; import java.io.IOException;
import com.google.common.net.HostAndPort; import java.net.*;
import com.google.common.net.InetAddresses; import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.security.SecureRandom;
import java.util.*;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
// For managing one peer // For managing one peer
public class Peer { public class Peer {
private static final Logger LOGGER = LogManager.getLogger(Peer.class); private static final Logger LOGGER = LogManager.getLogger(Peer.class);
/** Maximum time to allow <tt>connect()</tt> to remote peer to complete. (ms) */ /**
* Maximum time to allow <tt>connect()</tt> to remote peer to complete. (ms)
*/
private static final int CONNECT_TIMEOUT = 2000; // ms private static final int CONNECT_TIMEOUT = 2000; // ms
/** Maximum time to wait for a message reply to arrive from peer. (ms) */ /**
* Maximum time to wait for a message reply to arrive from peer. (ms)
*/
private static final int RESPONSE_TIMEOUT = 3000; // ms private static final int RESPONSE_TIMEOUT = 3000; // ms
/** /**
@ -64,34 +58,47 @@ public class Peer {
private SocketChannel socketChannel = null; private SocketChannel socketChannel = null;
private InetSocketAddress resolvedAddress = null; private InetSocketAddress resolvedAddress = null;
/** True if remote address is loopback/link-local/site-local, false otherwise. */ /**
* True if remote address is loopback/link-local/site-local, false otherwise.
*/
private boolean isLocal; private boolean isLocal;
private final UUID peerConnectionId = UUID.randomUUID();
private final Object byteBufferLock = new Object(); private final Object byteBufferLock = new Object();
private ByteBuffer byteBuffer; private ByteBuffer byteBuffer;
private Map<Integer, BlockingQueue<Message>> replyQueues; private Map<Integer, BlockingQueue<Message>> replyQueues;
private LinkedBlockingQueue<Message> pendingMessages; private LinkedBlockingQueue<Message> pendingMessages;
/** True if we created connection to peer, false if we accepted incoming connection from peer. */ /**
* True if we created connection to peer, false if we accepted incoming connection from peer.
*/
private final boolean isOutbound; private final boolean isOutbound;
private final Object handshakingLock = new Object(); private final Object handshakingLock = new Object();
private Handshake handshakeStatus = Handshake.STARTED; private Handshake handshakeStatus = Handshake.STARTED;
private volatile boolean handshakeMessagePending = false; private volatile boolean handshakeMessagePending = false;
private long handshakeComplete = -1L;
/** Timestamp of when socket was accepted, or connected. */ /**
* Timestamp of when socket was accepted, or connected.
*/
private Long connectionTimestamp = null; private Long connectionTimestamp = null;
/** Last PING message round-trip time (ms). */ /**
* Last PING message round-trip time (ms).
*/
private Long lastPing = null; private Long lastPing = null;
/** When last PING message was sent, or null if pings not started yet. */ /**
* When last PING message was sent, or null if pings not started yet.
*/
private Long lastPingSent; private Long lastPingSent;
byte[] ourChallenge; byte[] ourChallenge;
// Versioning // Versioning
public static final Pattern VERSION_PATTERN = Pattern.compile(Controller.VERSION_PREFIX + "(\\d{1,3})\\.(\\d{1,5})\\.(\\d{1,5})"); public static final Pattern VERSION_PATTERN = Pattern.compile(Controller.VERSION_PREFIX
+ "(\\d{1,3})\\.(\\d{1,5})\\.(\\d{1,5})");
// Peer info // Peer info
@ -103,29 +110,43 @@ public class Peer {
private PeerData peerData = null; private PeerData peerData = null;
/** Peer's value of connectionTimestamp. */ /**
* Peer's value of connectionTimestamp.
*/
private Long peersConnectionTimestamp = null; private Long peersConnectionTimestamp = null;
/** Version string as reported by peer. */ /**
* Version string as reported by peer.
*/
private String peersVersionString = null; private String peersVersionString = null;
/** Numeric version of peer. */ /**
* Numeric version of peer.
*/
private Long peersVersion = null; private Long peersVersion = null;
/** Latest block info as reported by peer. */ /**
* Latest block info as reported by peer.
*/
private PeerChainTipData peersChainTipData; private PeerChainTipData peersChainTipData;
/** Our common block with this peer */ /**
* Our common block with this peer
*/
private CommonBlockData commonBlockData; private CommonBlockData commonBlockData;
// Constructors // Constructors
/** Construct unconnected, outbound Peer using socket address in peer data */ /**
* Construct unconnected, outbound Peer using socket address in peer data
*/
public Peer(PeerData peerData) { public Peer(PeerData peerData) {
this.isOutbound = true; this.isOutbound = true;
this.peerData = peerData; this.peerData = peerData;
} }
/** Construct Peer using existing, connected socket */ /**
* Construct Peer using existing, connected socket
*/
public Peer(SocketChannel socketChannel, Selector channelSelector) throws IOException { public Peer(SocketChannel socketChannel, Selector channelSelector) throws IOException {
this.isOutbound = false; this.isOutbound = false;
this.socketChannel = socketChannel; this.socketChannel = socketChannel;
@ -166,13 +187,16 @@ public class Peer {
} }
} }
/*package*/ void setHandshakeStatus(Handshake handshakeStatus) { protected void setHandshakeStatus(Handshake handshakeStatus) {
synchronized (this.handshakingLock) { synchronized (this.handshakingLock) {
this.handshakeStatus = handshakeStatus; this.handshakeStatus = handshakeStatus;
if (handshakeStatus.equals(Handshake.COMPLETED)) {
this.handshakeComplete = System.currentTimeMillis();
}
} }
} }
/*package*/ void resetHandshakeMessagePending() { protected void resetHandshakeMessagePending() {
this.handshakeMessagePending = false; this.handshakeMessagePending = false;
} }
@ -200,7 +224,7 @@ public class Peer {
} }
} }
/*package*/ void setPeersVersion(String versionString, long version) { protected void setPeersVersion(String versionString, long version) {
synchronized (this.peerInfoLock) { synchronized (this.peerInfoLock) {
this.peersVersionString = versionString; this.peersVersionString = versionString;
this.peersVersion = version; this.peersVersion = version;
@ -213,7 +237,7 @@ public class Peer {
} }
} }
/*package*/ void setPeersConnectionTimestamp(Long peersConnectionTimestamp) { protected void setPeersConnectionTimestamp(Long peersConnectionTimestamp) {
synchronized (this.peerInfoLock) { synchronized (this.peerInfoLock) {
this.peersConnectionTimestamp = peersConnectionTimestamp; this.peersConnectionTimestamp = peersConnectionTimestamp;
} }
@ -225,13 +249,13 @@ public class Peer {
} }
} }
/*package*/ void setLastPing(long lastPing) { protected void setLastPing(long lastPing) {
synchronized (this.peerInfoLock) { synchronized (this.peerInfoLock) {
this.lastPing = lastPing; this.lastPing = lastPing;
} }
} }
/*package*/ byte[] getOurChallenge() { protected byte[] getOurChallenge() {
return this.ourChallenge; return this.ourChallenge;
} }
@ -241,7 +265,7 @@ public class Peer {
} }
} }
/*package*/ void setPeersNodeId(String peersNodeId) { protected void setPeersNodeId(String peersNodeId) {
synchronized (this.peerInfoLock) { synchronized (this.peerInfoLock) {
this.peersNodeId = peersNodeId; this.peersNodeId = peersNodeId;
} }
@ -253,7 +277,7 @@ public class Peer {
} }
} }
/*package*/ void setPeersPublicKey(byte[] peerPublicKey) { protected void setPeersPublicKey(byte[] peerPublicKey) {
synchronized (this.peerInfoLock) { synchronized (this.peerInfoLock) {
this.peersPublicKey = peerPublicKey; this.peersPublicKey = peerPublicKey;
} }
@ -265,7 +289,7 @@ public class Peer {
} }
} }
/*package*/ void setPeersChallenge(byte[] peersChallenge) { protected void setPeersChallenge(byte[] peersChallenge) {
synchronized (this.peerInfoLock) { synchronized (this.peerInfoLock) {
this.peersChallenge = peersChallenge; this.peersChallenge = peersChallenge;
} }
@ -295,9 +319,10 @@ public class Peer {
} }
} }
/*package*/ void queueMessage(Message message) { protected void queueMessage(Message message) {
if (!this.pendingMessages.offer(message)) if (!this.pendingMessages.offer(message)) {
LOGGER.info(() -> String.format("No room to queue message from peer %s - discarding", this)); LOGGER.info("[{}] No room to queue message from peer {} - discarding", this.peerConnectionId, this);
}
} }
@Override @Override
@ -323,7 +348,7 @@ public class Peer {
} }
public SocketChannel connect(Selector channelSelector) { public SocketChannel connect(Selector channelSelector) {
LOGGER.trace(() -> String.format("Connecting to peer %s", this)); LOGGER.trace("[{}] Connecting to peer {}", this.peerConnectionId, this);
try { try {
this.resolvedAddress = this.peerData.getAddress().toSocketAddress(); this.resolvedAddress = this.peerData.getAddress().toSocketAddress();
@ -332,22 +357,22 @@ public class Peer {
this.socketChannel = SocketChannel.open(); this.socketChannel = SocketChannel.open();
this.socketChannel.socket().connect(resolvedAddress, CONNECT_TIMEOUT); this.socketChannel.socket().connect(resolvedAddress, CONNECT_TIMEOUT);
} catch (SocketTimeoutException e) { } catch (SocketTimeoutException e) {
LOGGER.trace(String.format("Connection timed out to peer %s", this)); LOGGER.trace("[{}] Connection timed out to peer {}", this.peerConnectionId, this);
return null; return null;
} catch (UnknownHostException e) { } catch (UnknownHostException e) {
LOGGER.trace(String.format("Connection failed to unresolved peer %s", this)); LOGGER.trace("[{}] Connection failed to unresolved peer {}", this.peerConnectionId, this);
return null; return null;
} catch (IOException e) { } catch (IOException e) {
LOGGER.trace(String.format("Connection failed to peer %s", this)); LOGGER.trace("[{}] Connection failed to peer {}", this.peerConnectionId, this);
return null; return null;
} }
try { try {
LOGGER.debug(() -> String.format("Connected to peer %s", this)); LOGGER.debug("[{}] Connected to peer {}", this.peerConnectionId, this);
sharedSetup(channelSelector); sharedSetup(channelSelector);
return socketChannel; return socketChannel;
} catch (IOException e) { } catch (IOException e) {
LOGGER.trace(String.format("Post-connection setup failed, peer %s", this)); LOGGER.trace("[{}] Post-connection setup failed, peer {}", this.peerConnectionId, this);
try { try {
socketChannel.close(); socketChannel.close();
} catch (IOException ce) { } catch (IOException ce) {
@ -360,40 +385,42 @@ public class Peer {
/** /**
* Attempt to buffer bytes from socketChannel. * Attempt to buffer bytes from socketChannel.
* *
* @throws IOException * @throws IOException If this channel is not yet connected
*/ */
/* package */ void readChannel() throws IOException { protected void readChannel() throws IOException {
synchronized (this.byteBufferLock) { synchronized (this.byteBufferLock) {
while (true) { while (true) {
if (!this.socketChannel.isOpen() || this.socketChannel.socket().isClosed()) if (!this.socketChannel.isOpen() || this.socketChannel.socket().isClosed()) {
return; return;
}
// Do we need to allocate byteBuffer? // Do we need to allocate byteBuffer?
if (this.byteBuffer == null) if (this.byteBuffer == null) {
this.byteBuffer = ByteBuffer.allocate(Network.getInstance().getMaxMessageSize()); this.byteBuffer = ByteBuffer.allocate(Network.getInstance().getMaxMessageSize());
}
final int priorPosition = this.byteBuffer.position(); final int priorPosition = this.byteBuffer.position();
final int bytesRead = this.socketChannel.read(this.byteBuffer); final int bytesRead = this.socketChannel.read(this.byteBuffer);
if (bytesRead == -1) { if (bytesRead == -1) {
this.disconnect("EOF"); if (priorPosition > 0) {
this.disconnect("EOF - read " + priorPosition + " bytes");
} else {
this.disconnect("EOF - failed to read any data");
}
return; return;
} }
LOGGER.trace(() -> {
if (bytesRead > 0) { if (bytesRead > 0) {
byte[] leadingBytes = new byte[Math.min(bytesRead, 8)]; byte[] leadingBytes = new byte[Math.min(bytesRead, 8)];
this.byteBuffer.asReadOnlyBuffer().position(priorPosition).get(leadingBytes); this.byteBuffer.asReadOnlyBuffer().position(priorPosition).get(leadingBytes);
String leadingHex = HashCode.fromBytes(leadingBytes).toString(); String leadingHex = HashCode.fromBytes(leadingBytes).toString();
return String.format("Received %d bytes, starting %s, into byteBuffer[%d] from peer %s", LOGGER.trace("[{}] Received {} bytes, starting {}, into byteBuffer[{}] from peer {}",
bytesRead, this.peerConnectionId, bytesRead, leadingHex, priorPosition, this);
leadingHex,
priorPosition,
this);
} else { } else {
return String.format("Received %d bytes into byteBuffer[%d] from peer %s", bytesRead, priorPosition, this); LOGGER.trace("[{}] Received {} bytes into byteBuffer[{}] from peer {}", this.peerConnectionId,
bytesRead, priorPosition, this);
} }
});
final boolean wasByteBufferFull = !this.byteBuffer.hasRemaining(); final boolean wasByteBufferFull = !this.byteBuffer.hasRemaining();
while (true) { while (true) {
@ -404,34 +431,39 @@ public class Peer {
try { try {
message = Message.fromByteBuffer(readOnlyBuffer); message = Message.fromByteBuffer(readOnlyBuffer);
} catch (MessageException e) { } catch (MessageException e) {
LOGGER.debug(String.format("%s, from peer %s", e.getMessage(), this)); LOGGER.debug("[{}] {}, from peer {}", this.peerConnectionId, e.getMessage(), this);
this.disconnect(e.getMessage()); this.disconnect(e.getMessage());
return; return;
} }
if (message == null && bytesRead == 0 && !wasByteBufferFull) { if (message == null && bytesRead == 0 && !wasByteBufferFull) {
// No complete message in buffer, no more bytes to read from socket even though there was room to read bytes // No complete message in buffer, no more bytes to read from socket
// even though there was room to read bytes
/* DISABLED /* DISABLED
// If byteBuffer is empty then we can deallocate it, to save memory, albeit costing GC // If byteBuffer is empty then we can deallocate it, to save memory, albeit costing GC
if (this.byteBuffer.remaining() == this.byteBuffer.capacity()) if (this.byteBuffer.remaining() == this.byteBuffer.capacity()) {
this.byteBuffer = null; this.byteBuffer = null;
}
*/ */
return; return;
} }
if (message == null) if (message == null) {
// No complete message in buffer, but maybe more bytes to read from socket // No complete message in buffer, but maybe more bytes to read from socket
break; break;
}
LOGGER.trace(() -> String.format("Received %s message with ID %d from peer %s", message.getType().name(), message.getId(), this)); LOGGER.trace("[{}] Received {} message with ID {} from peer {}", this.peerConnectionId,
message.getType().name(), message.getId(), this);
// Tidy up buffers: // Tidy up buffers:
this.byteBuffer.flip(); this.byteBuffer.flip();
// Read-only, flipped buffer's position will be after end of message, so copy that // Read-only, flipped buffer's position will be after end of message, so copy that
this.byteBuffer.position(readOnlyBuffer.position()); this.byteBuffer.position(readOnlyBuffer.position());
// Copy bytes after read message to front of buffer, adjusting position accordingly, reset limit to capacity // Copy bytes after read message to front of buffer,
// adjusting position accordingly, reset limit to capacity
this.byteBuffer.compact(); this.byteBuffer.compact();
BlockingQueue<Message> queue = this.replyQueues.get(message.getId()); BlockingQueue<Message> queue = this.replyQueues.get(message.getId());
@ -446,7 +478,8 @@ public class Peer {
// Add message to pending queue // Add message to pending queue
if (!this.pendingMessages.offer(message)) { if (!this.pendingMessages.offer(message)) {
LOGGER.info(() -> String.format("No room to queue message from peer %s - discarding", this)); LOGGER.info("[{}] No room to queue message from peer {} - discarding",
this.peerConnectionId, this);
return; return;
} }
@ -458,24 +491,28 @@ public class Peer {
} }
} }
/* package */ ExecuteProduceConsume.Task getMessageTask() { protected ExecuteProduceConsume.Task getMessageTask() {
/* /*
* If we are still handshaking and there is a message yet to be processed then * If we are still handshaking and there is a message yet to be processed then
* don't produce another message task. This allows us to process handshake * don't produce another message task. This allows us to process handshake
* messages sequentially. * messages sequentially.
*/ */
if (this.handshakeMessagePending) if (this.handshakeMessagePending) {
return null; return null;
}
final Message nextMessage = this.pendingMessages.poll(); final Message nextMessage = this.pendingMessages.poll();
if (nextMessage == null) if (nextMessage == null) {
return null; return null;
}
LOGGER.trace(() -> String.format("Produced %s message task from peer %s", nextMessage.getType().name(), this)); LOGGER.trace("[{}] Produced {} message task from peer {}", this.peerConnectionId,
nextMessage.getType().name(), this);
if (this.handshakeStatus != Handshake.COMPLETED) if (this.handshakeStatus != Handshake.COMPLETED) {
this.handshakeMessagePending = true; this.handshakeMessagePending = true;
}
// Return a task to process message in queue // Return a task to process message in queue
return () -> Network.getInstance().onMessage(this, nextMessage); return () -> Network.getInstance().onMessage(this, nextMessage);
@ -484,16 +521,18 @@ public class Peer {
/** /**
* Attempt to send Message to peer. * Attempt to send Message to peer.
* *
* @param message * @param message message to be sent
* @return <code>true</code> if message successfully sent; <code>false</code> otherwise * @return <code>true</code> if message successfully sent; <code>false</code> otherwise
*/ */
public boolean sendMessage(Message message) { public boolean sendMessage(Message message) {
if (!this.socketChannel.isOpen()) if (!this.socketChannel.isOpen()) {
return false; return false;
}
try { try {
// Send message // Send message
LOGGER.trace(() -> String.format("Sending %s message with ID %d to peer %s", message.getType().name(), message.getId(), this)); LOGGER.trace("[{}] Sending {} message with ID {} to peer {}", this.peerConnectionId,
message.getType().name(), message.getId(), this);
ByteBuffer outputBuffer = ByteBuffer.wrap(message.toBytes()); ByteBuffer outputBuffer = ByteBuffer.wrap(message.toBytes());
@ -503,11 +542,8 @@ public class Peer {
while (outputBuffer.hasRemaining()) { while (outputBuffer.hasRemaining()) {
int bytesWritten = this.socketChannel.write(outputBuffer); int bytesWritten = this.socketChannel.write(outputBuffer);
LOGGER.trace(() -> String.format("Sent %d bytes of %s message with ID %d to peer %s", LOGGER.trace("[{}] Sent {} bytes of {} message with ID {} to peer {}", this.peerConnectionId,
bytesWritten, bytesWritten, message.getType().name(), message.getId(), this);
message.getType().name(),
message.getId(),
this));
if (bytesWritten == 0) { if (bytesWritten == 0) {
// Underlying socket's internal buffer probably full, // Underlying socket's internal buffer probably full,
@ -522,21 +558,20 @@ public class Peer {
*/ */
Thread.sleep(1L); //NOSONAR squid:S2276 Thread.sleep(1L); //NOSONAR squid:S2276
if (System.currentTimeMillis() - sendStart > RESPONSE_TIMEOUT) if (System.currentTimeMillis() - sendStart > RESPONSE_TIMEOUT) {
// We've taken too long to send this message // We've taken too long to send this message
return false; return false;
} }
} }
} }
}
} catch (MessageException e) { } catch (MessageException e) {
LOGGER.warn(String.format("Failed to send %s message with ID %d to peer %s: %s", message.getType().name(), message.getId(), this, e.getMessage())); LOGGER.warn("[{}] Failed to send {} message with ID {} to peer {}: {}", this.peerConnectionId,
message.getType().name(), message.getId(), this, e.getMessage());
return false; return false;
} catch (IOException e) { } catch (IOException | InterruptedException e) {
// Send failure // Send failure
return false; return false;
} catch (InterruptedException e) {
// Likely shutdown scenario - so exit
return false;
} }
// Sent OK // Sent OK
@ -546,14 +581,16 @@ public class Peer {
/** /**
* Send message to peer and await response. * Send message to peer and await response.
* <p> * <p>
* Message is assigned a random ID and sent. If a response with matching ID is received then it is returned to caller. * Message is assigned a random ID and sent.
* If a response with matching ID is received then it is returned to caller.
* <p> * <p>
* If no response with matching ID within timeout, or some other error/exception occurs, then return <code>null</code>.<br> * If no response with matching ID within timeout, or some other error/exception occurs,
* then return <code>null</code>.<br>
* (Assume peer will be rapidly disconnected after this). * (Assume peer will be rapidly disconnected after this).
* *
* @param message * @param message message to send
* @return <code>Message</code> if valid response received; <code>null</code> if not or error/exception occurs * @return <code>Message</code> if valid response received; <code>null</code> if not or error/exception occurs
* @throws InterruptedException * @throws InterruptedException if interrupted while waiting
*/ */
public Message getResponse(Message message) throws InterruptedException { public Message getResponse(Message message) throws InterruptedException {
BlockingQueue<Message> blockingQueue = new ArrayBlockingQueue<>(1); BlockingQueue<Message> blockingQueue = new ArrayBlockingQueue<>(1);
@ -582,20 +619,22 @@ public class Peer {
} }
} }
/* package */ void startPings() { protected void startPings() {
// Replacing initial null value allows getPingTask() to start sending pings. // Replacing initial null value allows getPingTask() to start sending pings.
LOGGER.trace(() -> String.format("Enabling pings for peer %s", this)); LOGGER.trace("[{}] Enabling pings for peer {}", this.peerConnectionId, this);
this.lastPingSent = NTP.getTime(); this.lastPingSent = NTP.getTime();
} }
/* package */ ExecuteProduceConsume.Task getPingTask(Long now) { protected ExecuteProduceConsume.Task getPingTask(Long now) {
// Pings not enabled yet? // Pings not enabled yet?
if (now == null || this.lastPingSent == null) if (now == null || this.lastPingSent == null) {
return null; return null;
}
// Time to send another ping? // Time to send another ping?
if (now < this.lastPingSent + PING_INTERVAL) if (now < this.lastPingSent + PING_INTERVAL) {
return null; // Not yet return null; // Not yet
}
// Not strictly true, but prevents this peer from being immediately chosen again // Not strictly true, but prevents this peer from being immediately chosen again
this.lastPingSent = now; this.lastPingSent = now;
@ -605,7 +644,8 @@ public class Peer {
Message message = this.getResponse(pingMessage); Message message = this.getResponse(pingMessage);
if (message == null || message.getType() != MessageType.PING) { if (message == null || message.getType() != MessageType.PING) {
LOGGER.debug(() -> String.format("Didn't receive reply from %s for PING ID %d", this, pingMessage.getId())); LOGGER.debug("[{}] Didn't receive reply from {} for PING ID {}", this.peerConnectionId, this,
pingMessage.getId());
this.disconnect("no ping received"); this.disconnect("no ping received");
return; return;
} }
@ -615,18 +655,19 @@ public class Peer {
} }
public void disconnect(String reason) { public void disconnect(String reason) {
if (!isStopping) if (!isStopping) {
LOGGER.debug(() -> String.format("Disconnecting peer %s: %s", this, reason)); LOGGER.debug("[{}] Disconnecting peer {} after {}: {}", this.peerConnectionId, this,
getConnectionAge(), reason);
}
this.shutdown(); this.shutdown();
Network.getInstance().onDisconnect(this); Network.getInstance().onDisconnect(this);
} }
public void shutdown() { public void shutdown() {
if (!isStopping) if (!isStopping) {
LOGGER.debug(() -> String.format("Shutting down peer %s", this)); LOGGER.debug("[{}] Shutting down peer {}", this.peerConnectionId, this);
}
isStopping = true; isStopping = true;
if (this.socketChannel.isOpen()) { if (this.socketChannel.isOpen()) {
@ -634,7 +675,7 @@ public class Peer {
this.socketChannel.shutdownOutput(); this.socketChannel.shutdownOutput();
this.socketChannel.close(); this.socketChannel.close();
} catch (IOException e) { } catch (IOException e) {
LOGGER.debug(String.format("IOException while trying to close peer %s", this)); LOGGER.debug("[{}] IOException while trying to close peer {}", this.peerConnectionId, this);
} }
} }
} }
@ -643,23 +684,26 @@ public class Peer {
// Minimum version // Minimum version
public boolean isAtLeastVersion(String minVersionString) { public boolean isAtLeastVersion(String minVersionString) {
if (minVersionString == null) if (minVersionString == null) {
return false; return false;
}
// Add the version prefix // Add the version prefix
minVersionString = Controller.VERSION_PREFIX + minVersionString; minVersionString = Controller.VERSION_PREFIX + minVersionString;
Matcher matcher = VERSION_PATTERN.matcher(minVersionString); Matcher matcher = VERSION_PATTERN.matcher(minVersionString);
if (!matcher.lookingAt()) if (!matcher.lookingAt()) {
return false; return false;
}
// We're expecting 3 positive shorts, so we can convert 1.2.3 into 0x0100020003 // We're expecting 3 positive shorts, so we can convert 1.2.3 into 0x0100020003
long minVersion = 0; long minVersion = 0;
for (int g = 1; g <= 3; ++g) { for (int g = 1; g <= 3; ++g) {
long value = Long.parseLong(matcher.group(g)); long value = Long.parseLong(matcher.group(g));
if (value < 0 || value > Short.MAX_VALUE) if (value < 0 || value > Short.MAX_VALUE) {
return false; return false;
}
minVersion <<= 16; minVersion <<= 16;
minVersion |= value; minVersion |= value;
@ -677,8 +721,10 @@ public class Peer {
if (peerChainTipData != null && commonBlockData != null) { if (peerChainTipData != null && commonBlockData != null) {
PeerChainTipData commonBlockChainTipData = commonBlockData.getChainTipData(); PeerChainTipData commonBlockChainTipData = commonBlockData.getChainTipData();
if (peerChainTipData.getLastBlockSignature() != null && commonBlockChainTipData != null && commonBlockChainTipData.getLastBlockSignature() != null) { if (peerChainTipData.getLastBlockSignature() != null && commonBlockChainTipData != null
if (Arrays.equals(peerChainTipData.getLastBlockSignature(), commonBlockChainTipData.getLastBlockSignature())) { && commonBlockChainTipData.getLastBlockSignature() != null) {
if (Arrays.equals(peerChainTipData.getLastBlockSignature(),
commonBlockChainTipData.getLastBlockSignature())) {
return true; return true;
} }
} }
@ -689,10 +735,13 @@ public class Peer {
// Utility methods // Utility methods
/** Returns true if ports and addresses (or hostnames) match */ /**
* Returns true if ports and addresses (or hostnames) match
*/
public static boolean addressEquals(InetSocketAddress knownAddress, InetSocketAddress peerAddress) { public static boolean addressEquals(InetSocketAddress knownAddress, InetSocketAddress peerAddress) {
if (knownAddress.getPort() != peerAddress.getPort()) if (knownAddress.getPort() != peerAddress.getPort()) {
return false; return false;
}
return knownAddress.getHostString().equalsIgnoreCase(peerAddress.getHostString()); return knownAddress.getHostString().equalsIgnoreCase(peerAddress.getHostString());
} }
@ -703,12 +752,29 @@ public class Peer {
// HostAndPort doesn't try to validate host so we do extra checking here // HostAndPort doesn't try to validate host so we do extra checking here
InetAddress address = InetAddresses.forString(hostAndPort.getHost()); InetAddress address = InetAddresses.forString(hostAndPort.getHost());
return new InetSocketAddress(address, hostAndPort.getPortOrDefault(Settings.getInstance().getDefaultListenPort())); int defaultPort = Settings.getInstance().getDefaultListenPort();
return new InetSocketAddress(address, hostAndPort.getPortOrDefault(defaultPort));
} }
/** Returns true if address is loopback/link-local/site-local, false otherwise. */ /**
* Returns true if address is loopback/link-local/site-local, false otherwise.
*/
public static boolean isAddressLocal(InetAddress address) { public static boolean isAddressLocal(InetAddress address) {
return address.isLoopbackAddress() || address.isLinkLocalAddress() || address.isSiteLocalAddress(); return address.isLoopbackAddress() || address.isLinkLocalAddress() || address.isSiteLocalAddress();
} }
public UUID getPeerConnectionId() {
return peerConnectionId;
}
public long getConnectionEstablishedTime() {
return handshakeComplete;
}
public long getConnectionAge() {
if (handshakeComplete > 0L) {
return System.currentTimeMillis() - handshakeComplete;
}
return handshakeComplete;
}
} }