New network handshaking. NOT backwards compatible!

Old Qora v1 message types removed.
Message type values changed.

Network handshaking reworked to fix multiple-connections issue.
Instead of using some random peerID, we now use proper keypairs and a challenge-response handshake to prevent doppelgangers/ID-theft.
This results in simpler handshaking code as we don't have to perform some arcane doppelganger resolution.

Handshaking still uses proof-of-work for challenge-response, but switched to newer MemoryPoW.

API call GET /peers no longer has 'buildTimestamp' field, but does now have 'nodeId' field.

Network no longer has a whole raft of getXXXpeers() due to simplified handshaking.
Quite a few method calls changed to simply Network.getHandshakedPeers(), which is also faster.
This commit is contained in:
catbref 2020-05-22 17:16:45 +01:00
parent bd543a526b
commit 0c32afa07f
23 changed files with 622 additions and 1237 deletions

View File

@ -25,7 +25,8 @@ public class ConnectedPeer {
public String address;
public String version;
public Long buildTimestamp;
public String nodeId;
public Integer lastHeight;
@Schema(example = "base58")
@ -45,10 +46,9 @@ public class ConnectedPeer {
this.peersConnectedWhen = peer.getPeersConnectionTimestamp();
this.address = peerData.getAddress().toString();
if (peer.getVersionMessage() != null) {
this.version = peer.getVersionMessage().getVersionString();
this.buildTimestamp = peer.getVersionMessage().getBuildTimestamp();
}
this.version = peer.getPeersVersionString();
this.nodeId = peer.getPeersNodeId();
PeerChainTipData peerChainTipData = peer.getChainTipData();
if (peerChainTipData != null) {

View File

@ -10,6 +10,7 @@ public class NodeInfo {
public long uptime;
public String buildVersion;
public long buildTimestamp;
public String nodeId;
public NodeInfo() {
}

View File

@ -117,6 +117,7 @@ public class AdminResource {
nodeInfo.uptime = System.currentTimeMillis() - Controller.startTime;
nodeInfo.buildVersion = Controller.getInstance().getVersionString();
nodeInfo.buildTimestamp = Controller.getInstance().getBuildTimestamp();
nodeInfo.nodeId = Network.getInstance().getOurNodeId();
return nodeInfo;
}

View File

@ -128,7 +128,7 @@ public class BlockMinter extends Thread {
}
}
List<Peer> peers = Network.getInstance().getUniqueHandshakedPeers();
List<Peer> peers = Network.getInstance().getHandshakedPeers();
BlockData lastBlockData = blockRepository.getLastBlock();
// Disregard peers that have "misbehaved" recently

View File

@ -60,11 +60,9 @@ import org.qortal.network.message.GetBlockMessage;
import org.qortal.network.message.GetBlockSummariesMessage;
import org.qortal.network.message.GetOnlineAccountsMessage;
import org.qortal.network.message.GetPeersMessage;
import org.qortal.network.message.GetSignaturesMessage;
import org.qortal.network.message.GetSignaturesV2Message;
import org.qortal.network.message.GetTransactionMessage;
import org.qortal.network.message.GetUnconfirmedTransactionsMessage;
import org.qortal.network.message.HeightMessage;
import org.qortal.network.message.HeightV2Message;
import org.qortal.network.message.Message;
import org.qortal.network.message.OnlineAccountsMessage;
@ -501,7 +499,7 @@ public class Controller extends Thread {
};
private void potentiallySynchronize() throws InterruptedException {
List<Peer> peers = Network.getInstance().getUniqueHandshakedPeers();
List<Peer> peers = Network.getInstance().getHandshakedPeers();
// Disregard peers that have "misbehaved" recently
peers.removeIf(hasMisbehaved);
@ -626,7 +624,7 @@ public class Controller extends Thread {
return;
}
final int numberOfPeers = Network.getInstance().getUniqueHandshakedPeers().size();
final int numberOfPeers = Network.getInstance().getHandshakedPeers().size();
final int height = getChainHeight();
@ -782,26 +780,6 @@ public class Controller extends Thread {
public void onPeerHandshakeCompleted(Peer peer) {
// Only send if outbound
if (peer.isOutbound()) {
if (peer.getVersion() < 2) {
// Legacy mode
// Send our unconfirmed transactions
try (final Repository repository = RepositoryManager.getRepository()) {
List<TransactionData> transactions = repository.getTransactionRepository().getUnconfirmedTransactions();
for (TransactionData transactionData : transactions) {
Message transactionMessage = new TransactionMessage(transactionData);
if (!peer.sendMessage(transactionMessage)) {
peer.disconnect("failed to send unconfirmed transaction");
return;
}
}
} catch (DataException e) {
LOGGER.error("Repository issue while sending unconfirmed transactions", e);
}
} else {
// V2 protocol
// Request peer's unconfirmed transactions
Message message = new GetUnconfirmedTransactionsMessage();
if (!peer.sendMessage(message)) {
@ -809,7 +787,6 @@ public class Controller extends Thread {
return;
}
}
}
requestSysTrayUpdate = true;
}
@ -823,22 +800,10 @@ public class Controller extends Thread {
// Ordered by message type value
switch (message.getType()) {
case HEIGHT:
onNetworkHeightMessage(peer, message);
break;
case GET_SIGNATURES:
onNetworkGetSignaturesMessage(peer, message);
break;
case GET_BLOCK:
onNetworkGetBlockMessage(peer, message);
break;
case BLOCK:
onNetworkBlockMessage(peer, message);
break;
case TRANSACTION:
onNetworkTransactionMessage(peer, message);
break;
@ -884,56 +849,11 @@ public class Controller extends Thread {
break;
default:
LOGGER.debug(String.format("Unhandled %s message [ID %d] from peer %s", message.getType().name(), message.getId(), peer));
LOGGER.debug(() -> String.format("Unhandled %s message [ID %d] from peer %s", message.getType().name(), message.getId(), peer));
break;
}
}
private void onNetworkHeightMessage(Peer peer, Message message) {
HeightMessage heightMessage = (HeightMessage) message;
// Update all peers with same ID
List<Peer> connectedPeers = Network.getInstance().getHandshakedPeers();
for (Peer connectedPeer : connectedPeers) {
if (connectedPeer.getPeerId() == null || !Arrays.equals(connectedPeer.getPeerId(), peer.getPeerId()))
continue;
// Update peer chain tip data
PeerChainTipData newChainTipData = new PeerChainTipData(heightMessage.getHeight(), null, null, null);
connectedPeer.setChainTipData(newChainTipData);
}
// Potentially synchronize
requestSync = true;
}
private void onNetworkGetSignaturesMessage(Peer peer, Message message) {
GetSignaturesMessage getSignaturesMessage = (GetSignaturesMessage) message;
byte[] parentSignature = getSignaturesMessage.getParentSignature();
try (final Repository repository = RepositoryManager.getRepository()) {
List<byte[]> signatures = new ArrayList<>();
do {
BlockData blockData = repository.getBlockRepository().fromReference(parentSignature);
if (blockData == null)
break;
parentSignature = blockData.getSignature();
signatures.add(parentSignature);
} while (signatures.size() < Network.MAX_SIGNATURES_PER_REPLY);
Message signaturesMessage = new SignaturesMessage(signatures);
signaturesMessage.setId(message.getId());
if (!peer.sendMessage(signaturesMessage))
peer.disconnect("failed to send signatures");
} catch (DataException e) {
LOGGER.error(String.format("Repository issue while sending signatures after %s to peer %s", Base58.encode(parentSignature), peer), e);
}
}
private void onNetworkGetBlockMessage(Peer peer, Message message) {
GetBlockMessage getBlockMessage = (GetBlockMessage) message;
byte[] signature = getBlockMessage.getSignature();
@ -957,40 +877,6 @@ public class Controller extends Thread {
}
}
private void onNetworkBlockMessage(Peer peer, Message message) {
// From a v1 peer, with no message ID, this is a broadcast of peer's latest block
// v2 peers announce new blocks using HEIGHT_V2
// Not version 1?
if (peer.getVersion() == null || peer.getVersion() > 1)
return;
// Message ID present?
// XXX Why is this test here? If BLOCK had an ID then surely it would be a response to GET_BLOCK
// and hence captured by Peer's reply queue?
if (message.hasId())
return;
BlockMessage blockMessage = (BlockMessage) message;
BlockData blockData = blockMessage.getBlockData();
// Update all peers with same ID
List<Peer> connectedPeers = Network.getInstance().getHandshakedPeers();
for (Peer connectedPeer : connectedPeers) {
// Skip connectedPeer if they have no ID or their ID doesn't match sender's ID
if (connectedPeer.getPeerId() == null || !Arrays.equals(connectedPeer.getPeerId(), peer.getPeerId()))
continue;
// Update peer chain tip data
PeerChainTipData newChainTipData = new PeerChainTipData(blockData.getHeight(), blockData.getSignature(), blockData.getTimestamp(), blockData.getMinterPublicKey());
connectedPeer.setChainTipData(newChainTipData);
}
// Potentially synchronize
requestSync = true;
}
private void onNetworkTransactionMessage(Peer peer, Message message) {
TransactionMessage transactionMessage = (TransactionMessage) message;
TransactionData transactionData = transactionMessage.getTransactionData();
@ -1096,18 +982,9 @@ public class Controller extends Thread {
if (!peer.isOutbound() && (peer.getChainTipData() == null || peer.getChainTipData().getLastHeight() == null))
peer.sendMessage(Network.getInstance().buildHeightMessage(peer, getChainTip()));
// Update all peers with same ID
List<Peer> connectedPeers = Network.getInstance().getHandshakedPeers();
for (Peer connectedPeer : connectedPeers) {
// Skip connectedPeer if they have no ID or their ID doesn't match sender's ID
if (connectedPeer.getPeerId() == null || !Arrays.equals(connectedPeer.getPeerId(), peer.getPeerId()))
continue;
// Update peer chain tip data
PeerChainTipData newChainTipData = new PeerChainTipData(heightV2Message.getHeight(), heightV2Message.getSignature(), heightV2Message.getTimestamp(), heightV2Message.getMinterPublicKey());
connectedPeer.setChainTipData(newChainTipData);
}
peer.setChainTipData(newChainTipData);
// Potentially synchronize
requestSync = true;
@ -1561,7 +1438,7 @@ public class Controller extends Thread {
getArbitraryDataMessage.setId(id);
// Broadcast request
Network.getInstance().broadcast(peer -> peer.getVersion() < 2 ? null : getArbitraryDataMessage);
Network.getInstance().broadcast(peer -> getArbitraryDataMessage);
// Poll to see if data has arrived
final long singleWait = 100;
@ -1593,7 +1470,7 @@ public class Controller extends Thread {
if (minLatestBlockTimestamp == null)
return null;
List<Peer> peers = Network.getInstance().getUniqueHandshakedPeers();
List<Peer> peers = Network.getInstance().getHandshakedPeers();
// Filter out unsuitable peers
Iterator<Peer> iterator = peers.iterator();
@ -1639,7 +1516,7 @@ public class Controller extends Thread {
if (latestBlockData == null || latestBlockData.getTimestamp() < minLatestBlockTimestamp)
return false;
List<Peer> peers = Network.getInstance().getUniqueHandshakedPeers();
List<Peer> peers = Network.getInstance().getHandshakedPeers();
if (peers == null)
return false;

View File

@ -22,7 +22,6 @@ import org.qortal.network.message.BlockMessage;
import org.qortal.network.message.BlockSummariesMessage;
import org.qortal.network.message.GetBlockMessage;
import org.qortal.network.message.GetBlockSummariesMessage;
import org.qortal.network.message.GetSignaturesMessage;
import org.qortal.network.message.GetSignaturesV2Message;
import org.qortal.network.message.Message;
import org.qortal.network.message.SignaturesMessage;
@ -372,12 +371,7 @@ public class Synchronizer {
return SynchronizationResult.TOO_DIVERGENT;
}
if (peer.getVersion() >= 2) {
step <<= 1;
} else {
// Old v1 peers are hard-coded to return 500 signatures so we might as well go backward by 500 too
step = 500;
}
step = Math.min(step, MAXIMUM_BLOCK_STEP);
testHeight = Math.max(testHeight - step, 1);
@ -415,8 +409,7 @@ public class Synchronizer {
}
private List<byte[]> getBlockSignatures(Peer peer, byte[] parentSignature, int numberRequested) throws InterruptedException {
// numberRequested is v2+ feature
Message getSignaturesMessage = peer.getVersion() >= 2 ? new GetSignaturesV2Message(parentSignature, numberRequested) : new GetSignaturesMessage(parentSignature);
Message getSignaturesMessage = new GetSignaturesV2Message(parentSignature, numberRequested);
Message message = peer.getResponse(getSignaturesMessage);
if (message == null || message.getType() != MessageType.SIGNATURES)

View File

@ -1,158 +1,193 @@
package org.qortal.network;
import java.util.Arrays;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.qortal.controller.Controller;
import org.qortal.crypto.Crypto;
import org.qortal.crypto.MemoryPoW;
import org.qortal.network.message.ChallengeMessage;
import org.qortal.network.message.HelloMessage;
import org.qortal.network.message.Message;
import org.qortal.network.message.PeerIdMessage;
import org.qortal.network.message.PeerVerifyMessage;
import org.qortal.network.message.ProofMessage;
import org.qortal.network.message.VerificationCodesMessage;
import org.qortal.network.message.VersionMessage;
import org.qortal.network.message.Message.MessageType;
import org.qortal.network.message.ResponseMessage;
import org.qortal.utils.NTP;
import com.google.common.primitives.Bytes;
public enum Handshake {
STARTED(null) {
@Override
public Handshake onMessage(Peer peer, Message message) {
return SELF_CHECK;
return HELLO;
}
@Override
public void action(Peer peer) {
}
},
SELF_CHECK(MessageType.PEER_ID) {
HELLO(MessageType.HELLO) {
@Override
public Handshake onMessage(Peer peer, Message message) {
PeerIdMessage peerIdMessage = (PeerIdMessage) message;
byte[] peerId = peerIdMessage.getPeerId();
HelloMessage helloMessage = (HelloMessage) message;
if (Arrays.equals(peerId, Network.ZERO_PEER_ID)) {
if (peer.isOutbound()) {
// Peer has indicated they already have an outbound connection to us
LOGGER.trace(String.format("Peer %s already connected to us - discarding this connection", peer));
} else {
// Not sure this should occur so log it
LOGGER.info(String.format("Inbound peer %s claims we also have outbound connection to them?", peer));
}
long peersConnectionTimestamp = helloMessage.getTimestamp();
long now = NTP.getTime();
long timestampDelta = Math.abs(peersConnectionTimestamp - now);
if (timestampDelta > MAX_TIMESTAMP_DELTA) {
LOGGER.debug(() -> String.format("Peer %s HELLO timestamp %d too divergent (± %d > %d) from ours %d",
peer, peersConnectionTimestamp, timestampDelta, MAX_TIMESTAMP_DELTA, now));
return null;
}
if (Arrays.equals(peerId, Network.getInstance().getOurPeerId())) {
// Connected to self!
String versionString = helloMessage.getVersionString();
Matcher matcher = VERSION_PATTERN.matcher(versionString);
if (!matcher.lookingAt()) {
LOGGER.debug(() -> String.format("Peer %s sent invalid HELLO version string '%s'", peer, versionString));
return null;
}
// We're expecting 3 positive shorts, so we can convert 1.2.3 into 0x0100020003
long version = 0;
for (int g = 1; g <= 3; ++g) {
long value = Long.parseLong(matcher.group(g));
if (value < 0 || value > Short.MAX_VALUE)
return null;
version <<= 16;
version |= value;
}
peer.setPeersConnectionTimestamp(peersConnectionTimestamp);
peer.setPeersVersion(versionString, version);
return CHALLENGE;
}
@Override
public void action(Peer peer) {
String versionString = Controller.getInstance().getVersionString();
long timestamp = NTP.getTime();
Message helloMessage = new HelloMessage(timestamp, versionString);
if (!peer.sendMessage(helloMessage))
peer.disconnect("failed to send HELLO");
}
},
CHALLENGE(MessageType.CHALLENGE) {
@Override
public Handshake onMessage(Peer peer, Message message) {
ChallengeMessage challengeMessage = (ChallengeMessage) message;
byte[] peersPublicKey = challengeMessage.getPublicKey();
byte[] peersChallenge = challengeMessage.getChallenge();
// If public key matches our public key then we've connected to self
byte[] ourPublicKey = Network.getInstance().getOurPublicKey();
if (Arrays.equals(ourPublicKey, peersPublicKey)) {
// If outgoing connection then record destination as self so we don't try again
if (peer.isOutbound()) {
Network.getInstance().noteToSelf(peer);
// Handshake failure - caller will deal with disconnect
// Handshake failure, caller will handle disconnect
return null;
} else {
// We still need to send our ID so our outbound connection can mark their address as 'self'
sendMyId(peer);
// We return SELF_CHECK here to prevent us from closing connection, which currently preempts
// remote end from reading any pending messages, specifically the PEER_ID message we just sent above.
// When our 'remote' outbound counterpart reads our message, they will close both connections.
// Failing that, our connection will timeout or a future handshake error will occur.
return SELF_CHECK;
challengeMessage = new ChallengeMessage(ourPublicKey, ZERO_CHALLENGE);
if (!peer.sendMessage(challengeMessage))
peer.disconnect("failed to send CHALLENGE to self");
/*
* We return CHALLENGE here to prevent us from closing connection. Closing
* connection currently preempts remote end from reading any pending messages,
* specifically the CHALLENGE message we just sent above. When our 'remote'
* outbound counterpart reads our message, they will close both connections.
* Failing that, our connection will timeout or a future handshake error will
* occur.
*/
return CHALLENGE;
}
}
// Is this ID already connected inbound or outbound?
Peer otherInboundPeer = Network.getInstance().getInboundPeerWithId(peerId);
Peer otherOutboundPeer = Network.getInstance().getOutboundHandshakedPeerWithId(peerId);
// Extra checks on inbound peers with known IDs, to prevent ID stealing
if (!peer.isOutbound() && otherInboundPeer != null) {
if (otherOutboundPeer == null) {
// We already have an inbound peer with this ID, but no outgoing peer with which to request verification
LOGGER.trace(String.format("Discarding inbound peer %s with existing ID", peer));
// Let peer know by sending special zero peer ID. This avoids peer keeping connection open until timeout.
peerIdMessage = new PeerIdMessage(Network.ZERO_PEER_ID);
peer.sendMessage(peerIdMessage);
return null;
} else {
// Use corresponding outbound peer to verify inbound
LOGGER.trace(String.format("We will be using outbound peer %s to verify inbound peer %s with same ID", otherOutboundPeer, peer));
// Discard peer's ID
// peer.setPeerId(peerId);
// Generate verification codes for later
peer.generateVerificationCodes();
}
} else if (peer.isOutbound() && otherOutboundPeer != null) {
// We already have an outbound connection to this peer?
LOGGER.info(String.format("We already have another outbound connection to peer %s - discarding", peer));
// Are we already connected to this peer?
Peer existingPeer = Network.getInstance().getHandshakedPeerWithPublicKey(peersPublicKey);
if (existingPeer != null) {
LOGGER.info(() -> String.format("We already have a connection with peer %s - discarding", peer));
// Handshake failure - caller will deal with disconnect
return null;
} else {
// Set peer's ID
peer.setPeerId(peerId);
}
return VERSION;
peer.setPeersPublicKey(peersPublicKey);
peer.setPeersChallenge(peersChallenge);
return RESPONSE;
}
@Override
public void action(Peer peer) {
sendMyId(peer);
// Send challenge
byte[] publicKey = Network.getInstance().getOurPublicKey();
byte[] challenge = peer.getOurChallenge();
Message challengeMessage = new ChallengeMessage(publicKey, challenge);
if (!peer.sendMessage(challengeMessage))
peer.disconnect("failed to send CHALLENGE");
}
},
VERSION(MessageType.VERSION) {
RESPONSE(MessageType.RESPONSE) {
@Override
public Handshake onMessage(Peer peer, Message message) {
peer.setVersionMessage((VersionMessage) message);
ResponseMessage responseMessage = (ResponseMessage) message;
// If we're both version 2 peers then next stage is proof
if (peer.getVersion() >= 2)
return PROOF;
byte[] peersPublicKey = peer.getPeersPublicKey();
byte[] ourChallenge = peer.getOurChallenge();
byte[] sharedSecret = Network.getInstance().getSharedSecret(peersPublicKey);
final byte[] expectedData = Crypto.digest(Bytes.concat(sharedSecret, ourChallenge));
byte[] data = responseMessage.getData();
if (!Arrays.equals(expectedData, data)) {
LOGGER.debug(() -> String.format("Peer %s sent incorrect RESPONSE data", peer));
return null;
}
int nonce = responseMessage.getNonce();
if (!MemoryPoW.verify2(data, POW_BUFFER_SIZE, POW_DIFFICULTY, nonce)) {
LOGGER.debug(() -> String.format("Peer %s sent incorrect RESPONSE nonce", peer));
return null;
}
peer.setPeersNodeId(Crypto.toNodeAddress(peersPublicKey));
// Fall-back for older clients (for now)
return COMPLETED;
}
@Override
public void action(Peer peer) {
sendVersion(peer);
}
},
PROOF(MessageType.PROOF) {
@Override
public Handshake onMessage(Peer peer, Message message) {
ProofMessage proofMessage = (ProofMessage) message;
// Send response
// Check peer's timestamp is within acceptable bounds
if (Math.abs(proofMessage.getTimestamp() - peer.getConnectionTimestamp()) > MAX_TIMESTAMP_DELTA) {
LOGGER.debug(String.format("Rejecting PROOF from %s as timestamp delta %d greater than max %d", peer, Math.abs(proofMessage.getTimestamp() - peer.getConnectionTimestamp()), MAX_TIMESTAMP_DELTA));
return null;
}
byte[] peersPublicKey = peer.getPeersPublicKey();
byte[] peersChallenge = peer.getPeersChallenge();
// Save peer's value for connectionTimestamp
peer.setPeersConnectionTimestamp(proofMessage.getTimestamp());
byte[] sharedSecret = Network.getInstance().getSharedSecret(peersPublicKey);
final byte[] data = Crypto.digest(Bytes.concat(sharedSecret, peersChallenge));
// If we connected outbound to peer, then this is a faked confirmation response, so we're good
if (peer.isOutbound())
return COMPLETED;
// We do this in a new thread as it can take a while...
Thread responseThread = new Thread(() -> {
Integer nonce = MemoryPoW.compute2(data, POW_BUFFER_SIZE, POW_DIFFICULTY);
// Check salt hasn't been seen before - this stops multiple peers reusing same nonce in a Sybil-like attack
if (Proof.seenSalt(proofMessage.getSalt()))
return null;
Message responseMessage = new ResponseMessage(nonce, data);
if (!peer.sendMessage(responseMessage))
peer.disconnect("failed to send RESPONSE");
});
if (!Proof.check(proofMessage.getTimestamp(), proofMessage.getSalt(), proofMessage.getNonce()))
return null;
// Proof valid
return COMPLETED;
}
@Override
public void action(Peer peer) {
sendProof(peer);
responseThread.setDaemon(true);
responseThread.start();
}
},
COMPLETED(null) {
@ -166,40 +201,6 @@ public enum Handshake {
public void action(Peer peer) {
// Note: this is only called when we've made outbound connection
}
},
PEER_VERIFY(null) {
@Override
public Handshake onMessage(Peer peer, Message message) {
// We only accept PEER_VERIFY messages
if (message.getType() != Message.MessageType.PEER_VERIFY)
return PEER_VERIFY;
// Check returned code against expected
PeerVerifyMessage peerVerifyMessage = (PeerVerifyMessage) message;
if (!Arrays.equals(peerVerifyMessage.getVerificationCode(), peer.getVerificationCodeExpected()))
return null;
// Drop other inbound peers with the same ID
for (Peer otherPeer : Network.getInstance().getConnectedPeers())
if (!otherPeer.isOutbound() && otherPeer.getPeerId() != null && Arrays.equals(otherPeer.getPeerId(), peer.getPendingPeerId()))
otherPeer.disconnect("doppelganger");
// Tidy up
peer.setVerificationCodes(null, null);
peer.setPeerId(peer.getPendingPeerId());
peer.setPendingPeerId(null);
// Completed for real this time
return COMPLETED;
}
@Override
public void action(Peer peer) {
// Send VERIFICATION_CODE to other peer (that we connected to)
// Send PEER_VERIFY to peer
sendVerificationCodes(peer);
}
};
private static final Logger LOGGER = LogManager.getLogger(Handshake.class);
@ -207,6 +208,13 @@ public enum Handshake {
/** Maximum allowed difference between peer's reported timestamp and when they connected, in milliseconds. */
private static final long MAX_TIMESTAMP_DELTA = 30 * 1000L; // ms
private static final Pattern VERSION_PATTERN = Pattern.compile(Controller.VERSION_PREFIX + "(\\d{1,3})\\.(\\d{1,5})\\.(\\d{1,5})");
private static final int POW_BUFFER_SIZE = 8 * 1024 * 1024; // bytes
private static final int POW_DIFFICULTY = 12; // leading zero bits
private static final byte[] ZERO_CHALLENGE = new byte[ChallengeMessage.CHALLENGE_LENGTH];
public final MessageType expectedMessageType;
private Handshake(MessageType expectedMessageType) {
@ -217,47 +225,4 @@ public enum Handshake {
public abstract void action(Peer peer);
private static void sendVersion(Peer peer) {
long buildTimestamp = Controller.getInstance().getBuildTimestamp();
String versionString = Controller.getInstance().getVersionString();
Message versionMessage = new VersionMessage(buildTimestamp, versionString);
if (!peer.sendMessage(versionMessage))
peer.disconnect("failed to send version");
}
private static void sendMyId(Peer peer) {
Message peerIdMessage = new PeerIdMessage(Network.getInstance().getOurPeerId());
if (!peer.sendMessage(peerIdMessage))
peer.disconnect("failed to send peer ID");
}
private static void sendProof(Peer peer) {
if (peer.isOutbound()) {
// For outbound connections we need to generate real proof
new Proof(peer).start(); // Calculate & send in a new thread to free up networking processing
} else {
// For incoming connections we only need to send a fake proof message as confirmation
Message proofMessage = new ProofMessage(peer.getConnectionTimestamp(), 0, 0);
if (!peer.sendMessage(proofMessage))
peer.disconnect("failed to send proof");
}
}
private static void sendVerificationCodes(Peer peer) {
Peer otherOutboundPeer = Network.getInstance().getOutboundHandshakedPeerWithId(peer.getPendingPeerId());
// Send VERIFICATION_CODES to peer
Message verificationCodesMessage = new VerificationCodesMessage(peer.getVerificationCodeSent(), peer.getVerificationCodeExpected());
if (!otherOutboundPeer.sendMessage(verificationCodesMessage)) {
peer.disconnect("failed to send verification codes"); // give up with this peer instead
return;
}
// Send PEER_VERIFY to peer
Message peerVerifyMessage = new PeerVerifyMessage(peer.getVerificationCodeSent());
if (!peer.sendMessage(peerVerifyMessage))
peer.disconnect("failed to send verification code");
}
}

View File

@ -1,7 +1,6 @@
package org.qortal.network;
import java.io.IOException;
import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.StandardSocketOptions;
@ -32,28 +31,26 @@ import java.util.stream.Collectors;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.bouncycastle.crypto.params.Ed25519PrivateKeyParameters;
import org.bouncycastle.crypto.params.Ed25519PublicKeyParameters;
import org.qortal.block.BlockChain;
import org.qortal.controller.Controller;
import org.qortal.crypto.Crypto;
import org.qortal.data.block.BlockData;
import org.qortal.data.network.PeerData;
import org.qortal.data.transaction.TransactionData;
import org.qortal.network.message.GetPeersMessage;
import org.qortal.network.message.GetUnconfirmedTransactionsMessage;
import org.qortal.network.message.HeightMessage;
import org.qortal.network.message.HeightV2Message;
import org.qortal.network.message.Message;
import org.qortal.network.message.PeerVerifyMessage;
import org.qortal.network.message.PeersMessage;
import org.qortal.network.message.PeersV2Message;
import org.qortal.network.message.PingMessage;
import org.qortal.network.message.TransactionMessage;
import org.qortal.network.message.TransactionSignaturesMessage;
import org.qortal.network.message.VerificationCodesMessage;
import org.qortal.network.message.Message.MessageType;
import org.qortal.repository.DataException;
import org.qortal.repository.Repository;
import org.qortal.repository.RepositoryManager;
import org.qortal.settings.Settings;
import org.qortal.transform.Transformer;
import org.qortal.utils.ExecuteProduceConsume;
// import org.qortal.utils.ExecutorDumper;
import org.qortal.utils.ExecuteProduceConsume.StatsSnapshot;
@ -99,10 +96,11 @@ public class Network {
public static final int MAX_SIGNATURES_PER_REPLY = 500;
public static final int MAX_BLOCK_SUMMARIES_PER_REPLY = 500;
public static final int PEER_ID_LENGTH = 128;
public static final byte[] ZERO_PEER_ID = new byte[PEER_ID_LENGTH];
private final byte[] ourPeerId;
private final Ed25519PrivateKeyParameters edPrivateKeyParams;
private final Ed25519PublicKeyParameters edPublicKeyParams;
private final String ourNodeId;
private final int maxMessageSize;
private List<PeerData> allKnownPeers;
@ -129,10 +127,13 @@ public class Network {
connectedPeers = new ArrayList<>();
selfPeers = new ArrayList<>();
ourPeerId = new byte[PEER_ID_LENGTH];
new SecureRandom().nextBytes(ourPeerId);
// Set bit to make sure our peer ID is not 0
ourPeerId[ourPeerId.length - 1] |= 0x01;
// Generate our ID
byte[] seed = new byte[Transformer.PRIVATE_KEY_LENGTH];
new SecureRandom().nextBytes(seed);
edPrivateKeyParams = new Ed25519PrivateKeyParameters(seed, 0);
edPublicKeyParams = edPrivateKeyParams.generatePublicKey();
ourNodeId = Crypto.toNodeAddress(edPublicKeyParams.getEncoded());
maxMessageSize = 4 + 1 + 4 + BlockChain.getInstance().getMaxBlockSize();
@ -201,8 +202,12 @@ public class Network {
return Settings.getInstance().isTestNet() ? TESTNET_MESSAGE_MAGIC : MAINNET_MESSAGE_MAGIC;
}
public byte[] getOurPeerId() {
return this.ourPeerId;
public String getOurNodeId() {
return this.ourNodeId;
}
/*package*/ byte[] getOurPublicKey() {
return this.edPublicKeyParams.getEncoded();
}
/** Maximum message size (bytes). Needs to be at least maximum block size + MAGIC + message type, etc. */
@ -241,25 +246,6 @@ public class Network {
}
}
/** Returns list of connected peers that have completed handshaking, with inbound duplicates removed. */
public List<Peer> getUniqueHandshakedPeers() {
List<Peer> peers = getHandshakedPeers();
// Returns true if this peer is inbound and has corresponding outbound peer with same ID
Predicate<Peer> hasOutboundWithSameId = peer -> {
// Peer is outbound so return fast
if (peer.isOutbound())
return false;
return peers.stream().anyMatch(otherPeer -> otherPeer.isOutbound() && Arrays.equals(otherPeer.getPeerId(), peer.getPeerId()));
};
// Filter out inbound peers that have corresponding outbound peer with the same ID
peers.removeIf(hasOutboundWithSameId);
return peers;
}
/** Returns list of peers we connected to that have completed handshaking. */
public List<Peer> getOutboundHandshakedPeers() {
synchronized (this.connectedPeers) {
@ -267,21 +253,13 @@ public class Network {
}
}
/** Returns Peer with inbound connection and matching ID, or null if none found. */
public Peer getInboundPeerWithId(byte[] peerId) {
/** Returns first peer that has completed handshaking and has matching public key. */
public Peer getHandshakedPeerWithPublicKey(byte[] publicKey) {
synchronized (this.connectedPeers) {
return this.connectedPeers.stream().filter(peer -> !peer.isOutbound() && peer.getPeerId() != null && Arrays.equals(peer.getPeerId(), peerId)).findAny().orElse(null);
return this.connectedPeers.stream().filter(peer -> peer.getHandshakeStatus() == Handshake.COMPLETED && Arrays.equals(peer.getPeersPublicKey(), publicKey)).findFirst().orElse(null);
}
}
/** Returns handshake-completed Peer with outbound connection and matching ID, or null if none found. */
public Peer getOutboundHandshakedPeerWithId(byte[] peerId) {
synchronized (this.connectedPeers) {
return this.connectedPeers.stream().filter(peer -> peer.isOutbound() && peer.getHandshakeStatus() == Handshake.COMPLETED && peer.getPeerId() != null && Arrays.equals(peer.getPeerId(), peerId)).findAny().orElse(null);
}
}
// Peer list filters
/** Must be inside <tt>synchronized (this.selfPeers) {...}</tt> */
@ -639,10 +617,22 @@ public class Network {
// Peer callbacks
/* package */ void wakeupChannelSelector() {
/*package*/ void wakeupChannelSelector() {
this.channelSelector.wakeup();
}
/*package*/ boolean verify(byte[] signature, byte[] message) {
return Crypto.verify(this.edPublicKeyParams.getEncoded(), signature, message);
}
/*package*/ byte[] sign(byte[] message) {
return Crypto.sign(this.edPrivateKeyParams, message);
}
/*package*/ byte[] getSharedSecret(byte[] publicKey) {
return Crypto.getSharedSecret(this.edPrivateKeyParams.getEncoded(), publicKey);
}
/** Called when Peer's thread has setup and is ready to process messages */
public void onPeerReady(Peer peer) {
this.onMessage(peer, null);
@ -692,17 +682,13 @@ public class Network {
onGetPeersMessage(peer, message);
break;
case PEERS:
onPeersMessage(peer, message);
break;
case PING:
onPingMessage(peer, message);
break;
case VERSION:
case PEER_ID:
case PROOF:
case HELLO:
case CHALLENGE:
case RESPONSE:
LOGGER.debug(() -> String.format("Unexpected handshaking message %s from peer %s", message.getType().name(), peer));
peer.disconnect("unexpected handshaking message");
return;
@ -711,14 +697,6 @@ public class Network {
onPeersV2Message(peer, message);
break;
case PEER_VERIFY:
onPeerVerifyMessage(peer, message);
break;
case VERIFICATION_CODES:
onVerificationCodesMessage(peer, message);
break;
default:
// Bump up to controller for possible action
Controller.getInstance().onNetworkMessage(peer, message);
@ -731,12 +709,6 @@ public class Network {
// Still handshaking
LOGGER.trace(() -> String.format("Handshake status %s, message %s from peer %s", handshakeStatus.name(), (message != null ? message.getType().name() : "null"), peer));
// v1 nodes are keen on sending PINGs early. Send to back of queue so we'll process right after handshake
if (message != null && message.getType() == MessageType.PING) {
peer.queueMessage(message);
return;
}
// Check message type is as expected
if (handshakeStatus.expectedMessageType != null && message.getType() != handshakeStatus.expectedMessageType) {
LOGGER.debug(() -> String.format("Unexpected %s message from %s, expected %s", message.getType().name(), peer, handshakeStatus.expectedMessageType));
@ -775,22 +747,6 @@ public class Network {
peer.disconnect("failed to send peers list");
}
private void onPeersMessage(Peer peer, Message message) {
PeersMessage peersMessage = (PeersMessage) message;
List<PeerAddress> peerAddresses = new ArrayList<>();
// v1 PEERS message doesn't support port numbers so we have to add default port
for (InetAddress peerAddress : peersMessage.getPeerAddresses())
// This is always IPv4 so we don't have to worry about bracketing IPv6.
peerAddresses.add(PeerAddress.fromString(peerAddress.getHostAddress()));
// Also add peer's details
peerAddresses.add(PeerAddress.fromString(peer.getPeerData().getAddress().getHost()));
opportunisticMergePeers(peer.toString(), peerAddresses);
}
private void onPingMessage(Peer peer, Message message) {
PingMessage pingMessage = (PingMessage) message;
@ -821,45 +777,7 @@ public class Network {
opportunisticMergePeers(peer.toString(), peerV2Addresses);
}
private void onPeerVerifyMessage(Peer peer, Message message) {
// Remote peer wants extra verification
possibleVerificationResponse(peer);
}
private void onVerificationCodesMessage(Peer peer, Message message) {
VerificationCodesMessage verificationCodesMessage = (VerificationCodesMessage) message;
// Remote peer is sending the code it wants to receive back via our outbound connection to it
Peer ourUnverifiedPeer = Network.getInstance().getInboundPeerWithId(Network.getInstance().getOurPeerId());
ourUnverifiedPeer.setVerificationCodes(verificationCodesMessage.getVerificationCodeSent(), verificationCodesMessage.getVerificationCodeExpected());
possibleVerificationResponse(ourUnverifiedPeer);
}
private void possibleVerificationResponse(Peer peer) {
// Can't respond if we don't have the codes (yet?)
if (peer.getVerificationCodeExpected() == null)
return;
PeerVerifyMessage peerVerifyMessage = new PeerVerifyMessage(peer.getVerificationCodeExpected());
if (!peer.sendMessage(peerVerifyMessage)) {
peer.disconnect("failed to send verification code");
return;
}
peer.setVerificationCodes(null, null);
peer.setHandshakeStatus(Handshake.COMPLETED);
this.onHandshakeCompleted(peer);
}
private void onHandshakeCompleted(Peer peer) {
// Do we need extra handshaking because of peer doppelgangers?
if (peer.getPendingPeerId() != null) {
peer.setHandshakeStatus(Handshake.PEER_VERIFY);
peer.getHandshakeStatus().action(peer);
return;
}
LOGGER.debug(String.format("Handshake completed with peer %s", peer));
// Make a note that we've successfully completed handshake (and when)
@ -930,7 +848,6 @@ public class Network {
};
knownPeers.removeIf(notRecentlyConnected);
if (peer.getVersion() >= 2) {
List<PeerAddress> peerAddresses = new ArrayList<>();
for (PeerData peerData : knownPeers) {
@ -938,7 +855,7 @@ public class Network {
InetAddress address = InetAddress.getByName(peerData.getAddress().getHost());
// Don't send 'local' addresses if peer is not 'local'. e.g. don't send localhost:9084 to node4.qortal.org
if (!peer.getIsLocal() && Peer.isAddressLocal(address))
if (!peer.isLocal() && Peer.isAddressLocal(address))
continue;
peerAddresses.add(peerData.getAddress());
@ -949,67 +866,26 @@ public class Network {
// New format PEERS_V2 message that supports hostnames, IPv6 and ports
return new PeersV2Message(peerAddresses);
} else {
// Map to socket addresses
List<InetAddress> peerAddresses = new ArrayList<>();
for (PeerData peerData : knownPeers) {
try {
// We have to resolve to literal IP address to check for IPv4-ness.
// This isn't great if hostnames have both IPv6 and IPv4 DNS entries.
InetAddress address = InetAddress.getByName(peerData.getAddress().getHost());
// Legacy PEERS message doesn't support IPv6
if (address instanceof Inet6Address)
continue;
// Don't send 'local' addresses if peer is not 'local'. e.g. don't send localhost:9084 to node4.qortal.org
if (!peer.getIsLocal() && !Peer.isAddressLocal(address))
continue;
peerAddresses.add(address);
} catch (UnknownHostException e) {
// Couldn't resolve hostname to IP address so discard
}
}
// Legacy PEERS message that only sends IPv4 addresses
return new PeersMessage(peerAddresses);
}
}
public Message buildHeightMessage(Peer peer, BlockData blockData) {
if (peer.getVersion() < 2) {
// Legacy height message
return new HeightMessage(blockData.getHeight());
}
// HEIGHT_V2 contains way more useful info
return new HeightV2Message(blockData.getHeight(), blockData.getSignature(), blockData.getTimestamp(), blockData.getMinterPublicKey());
}
public Message buildNewTransactionMessage(Peer peer, TransactionData transactionData) {
if (peer.getVersion() < 2) {
// Legacy TRANSACTION message
return new TransactionMessage(transactionData);
}
// In V2 we send out transaction signature only and peers can decide whether to request the full transaction
return new TransactionSignaturesMessage(Collections.singletonList(transactionData.getSignature()));
}
public Message buildGetUnconfirmedTransactionsMessage(Peer peer) {
// V2 only
if (peer.getVersion() < 2)
return null;
return new GetUnconfirmedTransactionsMessage();
}
// Peer-management calls
public void noteToSelf(Peer peer) {
LOGGER.info(String.format("No longer considering peer address %s as it connects to self", peer));
LOGGER.info(() -> String.format("No longer considering peer address %s as it connects to self", peer));
synchronized (this.selfPeers) {
this.selfPeers.add(peer.getPeerData().getAddress());
@ -1230,7 +1106,7 @@ public class Network {
}
try {
broadcastExecutor.execute(new Broadcaster(this.getUniqueHandshakedPeers(), peerMessageBuilder));
broadcastExecutor.execute(new Broadcaster(this.getHandshakedPeers(), peerMessageBuilder));
} catch (RejectedExecutionException e) {
// Can't execute - probably because we're shutting down, so ignore
}

View File

@ -24,9 +24,9 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.qortal.data.network.PeerChainTipData;
import org.qortal.data.network.PeerData;
import org.qortal.network.message.ChallengeMessage;
import org.qortal.network.message.Message;
import org.qortal.network.message.PingMessage;
import org.qortal.network.message.VersionMessage;
import org.qortal.network.message.Message.MessageException;
import org.qortal.network.message.Message.MessageType;
import org.qortal.settings.Settings;
@ -42,7 +42,7 @@ public class Peer {
private static final Logger LOGGER = LogManager.getLogger(Peer.class);
/** Maximum time to allow <tt>connect()</tt> to remote peer to complete. (ms) */
private static final int CONNECT_TIMEOUT = 1000; // ms
private static final int CONNECT_TIMEOUT = 2000; // ms
/** Maximum time to wait for a message reply to arrive from peer. (ms) */
private static final int RESPONSE_TIMEOUT = 2000; // ms
@ -54,9 +54,6 @@ public class Peer {
*/
private static final int PING_INTERVAL = 20_000; // ms
/** Threshold for buildTimestamp in VERSION messages where we consider peer to be using v2 protocol. */
private static final long V2_PROTOCOL_TIMESTAMP_THRESHOLD = 1546300800L; // midnight starting 1st Jan 2019
private volatile boolean isStopping = false;
private SocketChannel socketChannel = null;
@ -65,41 +62,48 @@ public class Peer {
private boolean isLocal;
private final Object byteBufferLock = new Object();
private volatile ByteBuffer byteBuffer;
private ByteBuffer byteBuffer;
private Map<Integer, BlockingQueue<Message>> replyQueues;
private LinkedBlockingQueue<Message> pendingMessages;
/** True if we created connection to peer, false if we accepted incoming connection from peer. */
private final boolean isOutbound;
/** Numeric protocol version, typically 1 or 2. */
private volatile Integer version;
private volatile byte[] peerId;
private volatile Handshake handshakeStatus = Handshake.STARTED;
private final Object handshakingLock = new Object();
private Handshake handshakeStatus = Handshake.STARTED;
private volatile boolean handshakeMessagePending = false;
private volatile byte[] pendingPeerId;
private volatile byte[] verificationCodeSent;
private volatile byte[] verificationCodeExpected;
private volatile PeerData peerData = null;
/** Timestamp of when socket was accepted, or connected. */
private volatile Long connectionTimestamp = null;
/** Peer's value of connectionTimestamp. */
private volatile Long peersConnectionTimestamp = null;
/** Version info as reported by peer. */
private volatile VersionMessage versionMessage = null;
private Long connectionTimestamp = null;
/** Last PING message round-trip time (ms). */
private volatile Long lastPing = null;
private Long lastPing = null;
/** When last PING message was sent, or null if pings not started yet. */
private volatile Long lastPingSent;
private Long lastPingSent;
byte[] ourChallenge;
// Peer info
private final Object peerInfoLock = new Object();
private String peersNodeId;
private byte[] peersPublicKey;
private byte[] peersChallenge;
private PeerData peerData = null;
/** Peer's value of connectionTimestamp. */
private Long peersConnectionTimestamp = null;
/** Version string as reported by peer. */
private String peersVersionString = null;
/** Numeric version of peer. */
private Long peersVersion = null;
/** Latest block info as reported by peer. */
private volatile PeerChainTipData chainTipData;
private PeerChainTipData peersChainTipData;
// Constructors
@ -124,16 +128,20 @@ public class Peer {
// Getters / setters
public SocketChannel getSocketChannel() {
return this.socketChannel;
}
public boolean isStopping() {
return this.isStopping;
}
public PeerData getPeerData() {
return this.peerData;
public SocketChannel getSocketChannel() {
return this.socketChannel;
}
public InetSocketAddress getResolvedAddress() {
return this.resolvedAddress;
}
public boolean isLocal() {
return this.isLocal;
}
public boolean isOutbound() {
@ -141,103 +149,131 @@ public class Peer {
}
public Handshake getHandshakeStatus() {
synchronized (this.handshakingLock) {
return this.handshakeStatus;
}
public void setHandshakeStatus(Handshake handshakeStatus) {
this.handshakeStatus = handshakeStatus;
}
public void resetHandshakeMessagePending() {
/*package*/ void setHandshakeStatus(Handshake handshakeStatus) {
synchronized (this.handshakingLock) {
this.handshakeStatus = handshakeStatus;
}
}
/*package*/ void resetHandshakeMessagePending() {
this.handshakeMessagePending = false;
}
public VersionMessage getVersionMessage() {
return this.versionMessage;
public PeerData getPeerData() {
synchronized (this.peerInfoLock) {
return this.peerData;
}
public void setVersionMessage(VersionMessage versionMessage) {
this.versionMessage = versionMessage;
if (this.versionMessage.getBuildTimestamp() >= V2_PROTOCOL_TIMESTAMP_THRESHOLD) {
this.version = 2; // enhanced protocol
} else {
this.version = 1; // legacy protocol
}
}
public Integer getVersion() {
return this.version;
}
public Long getConnectionTimestamp() {
synchronized (this.peerInfoLock) {
return this.connectionTimestamp;
}
}
public String getPeersVersionString() {
synchronized (this.peerInfoLock) {
return this.peersVersionString;
}
}
public Long getPeersVersion() {
synchronized (this.peerInfoLock) {
return this.peersVersion;
}
}
/*package*/ void setPeersVersion(String versionString, long version) {
synchronized (this.peerInfoLock) {
this.peersVersionString = versionString;
this.peersVersion = version;
}
}
public Long getPeersConnectionTimestamp() {
synchronized (this.peerInfoLock) {
return this.peersConnectionTimestamp;
}
}
/* package */ void setPeersConnectionTimestamp(Long peersConnectionTimestamp) {
/*package*/ void setPeersConnectionTimestamp(Long peersConnectionTimestamp) {
synchronized (this.peerInfoLock) {
this.peersConnectionTimestamp = peersConnectionTimestamp;
}
}
public Long getLastPing() {
synchronized (this.peerInfoLock) {
return this.lastPing;
}
}
public void setLastPing(long lastPing) {
/*package*/ void setLastPing(long lastPing) {
synchronized (this.peerInfoLock) {
this.lastPing = lastPing;
}
public InetSocketAddress getResolvedAddress() {
return this.resolvedAddress;
}
public boolean getIsLocal() {
return this.isLocal;
/*package*/ byte[] getOurChallenge() {
return this.ourChallenge;
}
public byte[] getPeerId() {
return this.peerId;
public String getPeersNodeId() {
synchronized (this.peerInfoLock) {
return this.peersNodeId;
}
}
public void setPeerId(byte[] peerId) {
this.peerId = peerId;
/*package*/ void setPeersNodeId(String peersNodeId) {
synchronized (this.peerInfoLock) {
this.peersNodeId = peersNodeId;
}
}
public byte[] getPendingPeerId() {
return this.pendingPeerId;
public byte[] getPeersPublicKey() {
synchronized (this.peerInfoLock) {
return this.peersPublicKey;
}
}
public void setPendingPeerId(byte[] peerId) {
this.pendingPeerId = peerId;
/*package*/ void setPeersPublicKey(byte[] peerPublicKey) {
synchronized (this.peerInfoLock) {
this.peersPublicKey = peerPublicKey;
}
}
public byte[] getVerificationCodeSent() {
return this.verificationCodeSent;
public byte[] getPeersChallenge() {
synchronized (this.peerInfoLock) {
return this.peersChallenge;
}
}
public byte[] getVerificationCodeExpected() {
return this.verificationCodeExpected;
/*package*/ void setPeersChallenge(byte[] peersChallenge) {
synchronized (this.peerInfoLock) {
this.peersChallenge = peersChallenge;
}
public void setVerificationCodes(byte[] sent, byte[] expected) {
this.verificationCodeSent = sent;
this.verificationCodeExpected = expected;
}
public PeerChainTipData getChainTipData() {
return this.chainTipData;
synchronized (this.peerInfoLock) {
return this.peersChainTipData;
}
}
public void setChainTipData(PeerChainTipData chainTipData) {
this.chainTipData = chainTipData;
synchronized (this.peerInfoLock) {
this.peersChainTipData = chainTipData;
}
}
/* package */ void queueMessage(Message message) {
/*package*/ void queueMessage(Message message) {
if (!this.pendingMessages.offer(message))
LOGGER.info(String.format("No room to queue message from peer %s - discarding", this));
LOGGER.info(() -> String.format("No room to queue message from peer %s - discarding", this));
}
@Override
@ -248,14 +284,6 @@ public class Peer {
// Processing
public void generateVerificationCodes() {
verificationCodeSent = new byte[Network.PEER_ID_LENGTH];
new SecureRandom().nextBytes(verificationCodeSent);
verificationCodeExpected = new byte[Network.PEER_ID_LENGTH];
new SecureRandom().nextBytes(verificationCodeExpected);
}
private void sharedSetup(Selector channelSelector) throws IOException {
this.connectionTimestamp = NTP.getTime();
this.socketChannel.setOption(StandardSocketOptions.TCP_NODELAY, true);
@ -264,6 +292,10 @@ public class Peer {
this.byteBuffer = null; // Defer allocation to when we need it, to save memory. Sorry GC!
this.replyQueues = Collections.synchronizedMap(new HashMap<Integer, BlockingQueue<Message>>());
this.pendingMessages = new LinkedBlockingQueue<>();
Random random = new SecureRandom();
this.ourChallenge = new byte[ChallengeMessage.CHALLENGE_LENGTH];
random.nextBytes(this.ourChallenge);
}
public SocketChannel connect(Selector channelSelector) {
@ -378,9 +410,11 @@ public class Peer {
}
/* package */ ExecuteProduceConsume.Task getMessageTask() {
// If we are still handshaking and there is a message yet to be processed
// then don't produce another message task.
// This allows us to process handshake messages sequentially.
/*
* If we are still handshaking and there is a message yet to be processed then
* don't produce another message task. This allows us to process handshake
* messages sequentially.
*/
if (this.handshakeMessagePending)
return null;
@ -454,9 +488,10 @@ public class Peer {
BlockingQueue<Message> blockingQueue = new ArrayBlockingQueue<>(1);
// Assign random ID to this message
Random random = new Random();
int id;
do {
id = new Random().nextInt(Integer.MAX_VALUE - 1) + 1;
id = random.nextInt(Integer.MAX_VALUE - 1) + 1;
// Put queue into map (keyed by message ID) so we can poll for a response
// If putIfAbsent() doesn't return null, then this ID is already taken

View File

@ -1,126 +0,0 @@
package org.qortal.network;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.util.HashSet;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.qortal.network.message.ProofMessage;
import com.google.common.primitives.Longs;
public class Proof extends Thread {
private static final int MIN_PROOF_ZEROS = 2;
private static final HashSet<Long> seenSalts = new HashSet<>();
private static final Logger LOGGER = LogManager.getLogger(Proof.class);
private Peer peer;
public Proof(Peer peer) {
this.peer = peer;
setDaemon(true);
}
public static boolean seenSalt(long salt) {
synchronized (seenSalts) {
return seenSalts.contains(salt);
}
}
public static void addSalt(long salt) {
synchronized (seenSalts) {
seenSalts.add(salt);
}
}
@Override
public void run() {
setName("Proof for peer " + this.peer);
// Do proof-of-work calculation to gain acceptance with remote end
final long startTime = LOGGER.isTraceEnabled() ? System.currentTimeMillis() : 0;
// Remote end knows this (approximately)
long timestamp = this.peer.getConnectionTimestamp();
// Needs to be unique on the remote end
long salt = new SecureRandom().nextLong();
byte[] message = new byte[8 + 8 + 8]; // nonce + salt + timestamp
byte[] saltBytes = Longs.toByteArray(salt);
System.arraycopy(saltBytes, 0, message, 8, saltBytes.length);
byte[] timestampBytes = Longs.toByteArray(timestamp);
System.arraycopy(timestampBytes, 0, message, 8 + 8, timestampBytes.length);
MessageDigest sha256;
try {
sha256 = MessageDigest.getInstance("SHA256");
} catch (NoSuchAlgorithmException e) {
// Can't progress
throw new RuntimeException("Message digest SHA256 not available");
}
long nonce;
for (nonce = 0; nonce < Long.MAX_VALUE; ++nonce) {
// Check whether we're shutting down every so often
if ((nonce & 0xff) == 0 && (this.peer.isStopping() || Thread.currentThread().isInterrupted()))
// throw new InterruptedException("Interrupted during peer proof calculation");
return;
byte[] nonceBytes = Longs.toByteArray(nonce);
System.arraycopy(nonceBytes, 0, message, 0, nonceBytes.length);
byte[] digest = sha256.digest(message);
if (check(digest))
break;
sha256.reset();
}
LOGGER.trace(() -> String.format("Proof for peer %s took %dms", this.peer, System.currentTimeMillis() - startTime));
ProofMessage proofMessage = new ProofMessage(timestamp, salt, nonce);
peer.sendMessage(proofMessage);
}
private static boolean check(byte[] digest) {
int idx;
for (idx = 0; idx < MIN_PROOF_ZEROS; ++idx)
if (digest[idx] != 0)
break;
return idx == MIN_PROOF_ZEROS;
}
public static boolean check(long timestamp, long salt, long nonce) {
byte[] message = new byte[8 + 8 + 8];
byte[] saltBytes = Longs.toByteArray(salt);
System.arraycopy(saltBytes, 0, message, 8, saltBytes.length);
byte[] timestampBytes = Longs.toByteArray(timestamp);
System.arraycopy(timestampBytes, 0, message, 8 + 8, timestampBytes.length);
byte[] nonceBytes = Longs.toByteArray(nonce);
System.arraycopy(nonceBytes, 0, message, 0, nonceBytes.length);
MessageDigest sha256;
try {
sha256 = MessageDigest.getInstance("SHA256");
} catch (NoSuchAlgorithmException e) {
// Can't progress
throw new RuntimeException("Message digest SHA256 not available");
}
byte[] digest = sha256.digest(message);
return check(digest);
}
}

View File

@ -0,0 +1,56 @@
package org.qortal.network.message;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.qortal.transform.Transformer;
public class ChallengeMessage extends Message {
public static final int CHALLENGE_LENGTH = 32;
private final byte[] publicKey;
private final byte[] challenge;
private ChallengeMessage(int id, byte[] publicKey, byte[] challenge) {
super(id, MessageType.CHALLENGE);
this.publicKey = publicKey;
this.challenge = challenge;
}
public ChallengeMessage(byte[] publicKey, byte[] challenge) {
this(-1, publicKey, challenge);
}
public byte[] getPublicKey() {
return this.publicKey;
}
public byte[] getChallenge() {
return this.challenge;
}
public static Message fromByteBuffer(int id, ByteBuffer byteBuffer) {
byte[] publicKey = new byte[Transformer.PUBLIC_KEY_LENGTH];
byteBuffer.get(publicKey);
byte[] challenge = new byte[CHALLENGE_LENGTH];
byteBuffer.get(challenge);
return new ChallengeMessage(id, publicKey, challenge);
}
@Override
protected byte[] toData() throws IOException {
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
bytes.write(this.publicKey);
bytes.write(this.challenge);
return bytes.toByteArray();
}
}

View File

@ -1,54 +0,0 @@
package org.qortal.network.message;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import org.qortal.transform.block.BlockTransformer;
public class GetSignaturesMessage extends Message {
private static final int BLOCK_SIGNATURE_LENGTH = BlockTransformer.BLOCK_SIGNATURE_LENGTH;
private byte[] parentSignature;
public GetSignaturesMessage(byte[] parentSignature) {
this(-1, parentSignature);
}
private GetSignaturesMessage(int id, byte[] parentSignature) {
super(id, MessageType.GET_SIGNATURES);
this.parentSignature = parentSignature;
}
public byte[] getParentSignature() {
return this.parentSignature;
}
public static Message fromByteBuffer(int id, ByteBuffer bytes) throws UnsupportedEncodingException {
if (bytes.remaining() != BLOCK_SIGNATURE_LENGTH)
return null;
byte[] parentSignature = new byte[BLOCK_SIGNATURE_LENGTH];
bytes.get(parentSignature);
return new GetSignaturesMessage(id, parentSignature);
}
@Override
protected byte[] toData() {
try {
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
bytes.write(this.parentSignature);
return bytes.toByteArray();
} catch (IOException e) {
return null;
}
}
}

View File

@ -0,0 +1,65 @@
package org.qortal.network.message;
import static java.util.Arrays.stream;
import static java.util.stream.Collectors.toMap;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Map;
import com.google.common.primitives.Ints;
public class GoodbyeMessage extends Message {
public enum Reason {
NO_HELLO(1),
BAD_HELLO(2),
BAD_HELLO_VERSION(3),
BAD_HELLO_TIMESTAMP(4);
public final int value;
private static final Map<Integer, Reason> map = stream(Reason.values())
.collect(toMap(reason -> reason.value, reason -> reason));
private Reason(int value) {
this.value = value;
}
public static Reason valueOf(int value) {
return map.get(value);
}
}
private final Reason reason;
private GoodbyeMessage(int id, Reason reason) {
super(id, MessageType.GOODBYE);
this.reason = reason;
}
public GoodbyeMessage(Reason reason) {
this(-1, reason);
}
public Reason getReason() {
return this.reason;
}
public static Message fromByteBuffer(int id, ByteBuffer byteBuffer) {
int reasonValue = byteBuffer.getInt();
Reason reason = Reason.valueOf(reasonValue);
if (reason == null)
return null;
return new GoodbyeMessage(id, reason);
}
@Override
protected byte[] toData() throws IOException {
return Ints.toByteArray(this.reason.value);
}
}

View File

@ -1,47 +0,0 @@
package org.qortal.network.message;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import com.google.common.primitives.Ints;
public class HeightMessage extends Message {
private int height;
public HeightMessage(int height) {
this(-1, height);
}
private HeightMessage(int id, int height) {
super(id, MessageType.HEIGHT);
this.height = height;
}
public int getHeight() {
return this.height;
}
public static Message fromByteBuffer(int id, ByteBuffer bytes) throws UnsupportedEncodingException {
int height = bytes.getInt();
return new HeightMessage(id, height);
}
@Override
protected byte[] toData() {
try {
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
bytes.write(Ints.toByteArray(this.height));
return bytes.toByteArray();
} catch (IOException e) {
return null;
}
}
}

View File

@ -0,0 +1,55 @@
package org.qortal.network.message;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.qortal.transform.TransformationException;
import org.qortal.utils.Serialization;
import com.google.common.primitives.Longs;
public class HelloMessage extends Message {
private final long timestamp;
private final String versionString;
private HelloMessage(int id, long timestamp, String versionString) {
super(id, MessageType.HELLO);
this.timestamp = timestamp;
this.versionString = versionString;
}
public HelloMessage(long timestamp, String versionString) {
this(-1, timestamp, versionString);
}
public long getTimestamp() {
return this.timestamp;
}
public String getVersionString() {
return this.versionString;
}
public static Message fromByteBuffer(int id, ByteBuffer byteBuffer) throws TransformationException {
long timestamp = byteBuffer.getLong();
String versionString = Serialization.deserializeSizedString(byteBuffer, 255);
return new HelloMessage(id, timestamp, versionString);
}
@Override
protected byte[] toData() throws IOException {
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
bytes.write(Longs.toByteArray(this.timestamp));
Serialization.serializeSizedString(bytes, this.versionString);
return bytes.toByteArray();
}
}

View File

@ -4,6 +4,7 @@ import java.util.Map;
import org.qortal.crypto.Crypto;
import org.qortal.network.Network;
import org.qortal.transform.TransformationException;
import com.google.common.primitives.Ints;
@ -45,32 +46,41 @@ public abstract class Message {
}
public enum MessageType {
GET_PEERS(1),
PEERS(2),
HEIGHT(3),
GET_SIGNATURES(4),
SIGNATURES(5),
GET_BLOCK(6),
BLOCK(7),
TRANSACTION(8),
PING(9),
VERSION(10),
PEER_ID(11),
PROOF(12),
PEERS_V2(13),
GET_BLOCK_SUMMARIES(14),
BLOCK_SUMMARIES(15),
GET_SIGNATURES_V2(16),
PEER_VERIFY(17),
VERIFICATION_CODES(18),
HEIGHT_V2(19),
GET_TRANSACTION(20),
GET_UNCONFIRMED_TRANSACTIONS(21),
TRANSACTION_SIGNATURES(22),
GET_ARBITRARY_DATA(23),
ARBITRARY_DATA(24),
GET_ONLINE_ACCOUNTS(25),
ONLINE_ACCOUNTS(26);
// Handshaking
HELLO(0),
GOODBYE(1),
CHALLENGE(2),
RESPONSE(3),
// Status / notifications
HEIGHT_V2(10),
PING(11),
PONG(12),
// Requesting data
PEERS_V2(20),
GET_PEERS(21),
TRANSACTION(30),
GET_TRANSACTION(31),
TRANSACTION_SIGNATURES(40),
GET_UNCONFIRMED_TRANSACTIONS(41),
BLOCK(50),
GET_BLOCK(51),
SIGNATURES(60),
GET_SIGNATURES_V2(61),
BLOCK_SUMMARIES(70),
GET_BLOCK_SUMMARIES(71),
ONLINE_ACCOUNTS(80),
GET_ONLINE_ACCOUNTS(81),
ARBITRARY_DATA(90),
GET_ARBITRARY_DATA(91);
public final int value;
public final Method fromByteBufferMethod;
@ -263,11 +273,11 @@ public abstract class Message {
throw new MessageException(String.format("About to send message with length %d larger than allowed %d", bytes.size(), MAX_DATA_SIZE));
return bytes.toByteArray();
} catch (IOException e) {
} catch (IOException | TransformationException e) {
throw new MessageException("Failed to serialize message", e);
}
}
protected abstract byte[] toData();
protected abstract byte[] toData() throws IOException, TransformationException;
}

View File

@ -1,52 +0,0 @@
package org.qortal.network.message;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import org.qortal.network.Network;
public class PeerIdMessage extends Message {
private byte[] peerId;
public PeerIdMessage(byte[] peerId) {
this(-1, peerId);
}
private PeerIdMessage(int id, byte[] peerId) {
super(id, MessageType.PEER_ID);
this.peerId = peerId;
}
public byte[] getPeerId() {
return this.peerId;
}
public static Message fromByteBuffer(int id, ByteBuffer bytes) throws UnsupportedEncodingException {
if (bytes.remaining() != Network.PEER_ID_LENGTH)
return null;
byte[] peerId = new byte[Network.PEER_ID_LENGTH];
bytes.get(peerId);
return new PeerIdMessage(id, peerId);
}
@Override
protected byte[] toData() {
try {
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
bytes.write(this.peerId);
return bytes.toByteArray();
} catch (IOException e) {
return null;
}
}
}

View File

@ -1,51 +0,0 @@
package org.qortal.network.message;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import org.qortal.network.Network;
public class PeerVerifyMessage extends Message {
private byte[] verificationCode;
public PeerVerifyMessage(byte[] verificationCode) {
this(-1, verificationCode);
}
private PeerVerifyMessage(int id, byte[] verificationCode) {
super(id, MessageType.PEER_VERIFY);
this.verificationCode = verificationCode;
}
public byte[] getVerificationCode() {
return this.verificationCode;
}
public static Message fromByteBuffer(int id, ByteBuffer bytes) throws UnsupportedEncodingException {
if (bytes.remaining() != Network.PEER_ID_LENGTH)
return null;
byte[] verificationCode = new byte[Network.PEER_ID_LENGTH];
bytes.get(verificationCode);
return new PeerVerifyMessage(id, verificationCode);
}
@Override
protected byte[] toData() {
try {
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
bytes.write(this.verificationCode);
return bytes.toByteArray();
} catch (IOException e) {
return null;
}
}
}

View File

@ -1,79 +0,0 @@
package org.qortal.network.message;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import com.google.common.primitives.Ints;
// NOTE: this legacy message only supports 4-byte IPv4 addresses and doesn't send port number either
public class PeersMessage extends Message {
private static final int ADDRESS_LENGTH = 4;
private List<InetAddress> peerAddresses;
public PeersMessage(List<InetAddress> peerAddresses) {
super(MessageType.PEERS);
this.peerAddresses = new ArrayList<>(peerAddresses);
// Legacy PEERS message doesn't support IPv6
this.peerAddresses.removeIf(address -> address instanceof Inet6Address);
}
private PeersMessage(int id, List<InetAddress> peerAddresses) {
super(id, MessageType.PEERS);
this.peerAddresses = peerAddresses;
}
public List<InetAddress> getPeerAddresses() {
return this.peerAddresses;
}
public static Message fromByteBuffer(int id, ByteBuffer bytes) throws UnsupportedEncodingException {
int count = bytes.getInt();
if (bytes.remaining() != count * ADDRESS_LENGTH)
return null;
List<InetAddress> peerAddresses = new ArrayList<>();
byte[] addressBytes = new byte[ADDRESS_LENGTH];
try {
for (int i = 0; i < count; ++i) {
bytes.get(addressBytes);
peerAddresses.add(InetAddress.getByAddress(addressBytes));
}
} catch (UnknownHostException e) {
return null;
}
return new PeersMessage(id, peerAddresses);
}
@Override
protected byte[] toData() {
try {
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
bytes.write(Ints.toByteArray(this.peerAddresses.size()));
for (InetAddress peerAddress : this.peerAddresses)
bytes.write(peerAddress.getAddress());
return bytes.toByteArray();
} catch (IOException e) {
return null;
}
}
}

View File

@ -1,63 +0,0 @@
package org.qortal.network.message;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import com.google.common.primitives.Longs;
public class ProofMessage extends Message {
private long timestamp;
private long salt;
private long nonce;
public ProofMessage(long timestamp, long salt, long nonce) {
this(-1, timestamp, salt, nonce);
}
private ProofMessage(int id, long timestamp, long salt, long nonce) {
super(id, MessageType.PROOF);
this.timestamp = timestamp;
this.salt = salt;
this.nonce = nonce;
}
public long getTimestamp() {
return this.timestamp;
}
public long getSalt() {
return this.salt;
}
public long getNonce() {
return this.nonce;
}
public static Message fromByteBuffer(int id, ByteBuffer bytes) throws UnsupportedEncodingException {
long timestamp = bytes.getLong();
long salt = bytes.getLong();
long nonce = bytes.getLong();
return new ProofMessage(id, timestamp, salt, nonce);
}
@Override
protected byte[] toData() {
try {
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
bytes.write(Longs.toByteArray(this.timestamp));
bytes.write(Longs.toByteArray(this.salt));
bytes.write(Longs.toByteArray(this.nonce));
return bytes.toByteArray();
} catch (IOException e) {
return null;
}
}
}

View File

@ -0,0 +1,55 @@
package org.qortal.network.message;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import com.google.common.primitives.Ints;
public class ResponseMessage extends Message {
public static final int DATA_LENGTH = 32;
private final int nonce;
private final byte[] data;
private ResponseMessage(int id, int nonce, byte[] data) {
super(id, MessageType.RESPONSE);
this.nonce = nonce;
this.data = data;
}
public ResponseMessage(int nonce, byte[] data) {
this(-1, nonce, data);
}
public int getNonce() {
return this.nonce;
}
public byte[] getData() {
return this.data;
}
public static Message fromByteBuffer(int id, ByteBuffer byteBuffer) {
int nonce = byteBuffer.getInt();
byte[] data = new byte[DATA_LENGTH];
byteBuffer.get(data);
return new ResponseMessage(id, nonce, data);
}
@Override
protected byte[] toData() throws IOException {
ByteArrayOutputStream bytes = new ByteArrayOutputStream(4 + DATA_LENGTH);
bytes.write(Ints.toByteArray(this.nonce));
bytes.write(data);
return bytes.toByteArray();
}
}

View File

@ -1,64 +0,0 @@
package org.qortal.network.message;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import org.qortal.network.Network;
public class VerificationCodesMessage extends Message {
private static final int TOTAL_LENGTH = Network.PEER_ID_LENGTH + Network.PEER_ID_LENGTH;
private byte[] verificationCodeSent;
private byte[] verificationCodeExpected;
public VerificationCodesMessage(byte[] verificationCodeSent, byte[] verificationCodeExpected) {
this(-1, verificationCodeSent, verificationCodeExpected);
}
private VerificationCodesMessage(int id, byte[] verificationCodeSent, byte[] verificationCodeExpected) {
super(id, MessageType.VERIFICATION_CODES);
this.verificationCodeSent = verificationCodeSent;
this.verificationCodeExpected = verificationCodeExpected;
}
public byte[] getVerificationCodeSent() {
return this.verificationCodeSent;
}
public byte[] getVerificationCodeExpected() {
return this.verificationCodeExpected;
}
public static Message fromByteBuffer(int id, ByteBuffer bytes) throws UnsupportedEncodingException {
if (bytes.remaining() != TOTAL_LENGTH)
return null;
byte[] verificationCodeSent = new byte[Network.PEER_ID_LENGTH];
bytes.get(verificationCodeSent);
byte[] verificationCodeExpected = new byte[Network.PEER_ID_LENGTH];
bytes.get(verificationCodeExpected);
return new VerificationCodesMessage(id, verificationCodeSent, verificationCodeExpected);
}
@Override
protected byte[] toData() {
try {
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
bytes.write(this.verificationCodeSent);
bytes.write(this.verificationCodeExpected);
return bytes.toByteArray();
} catch (IOException e) {
return null;
}
}
}

View File

@ -1,68 +0,0 @@
package org.qortal.network.message;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import org.qortal.utils.Serialization;
import com.google.common.primitives.Longs;
public class VersionMessage extends Message {
private long buildTimestamp;
private String versionString;
public VersionMessage(long buildTimestamp, String versionString) {
this(-1, buildTimestamp, versionString);
}
private VersionMessage(int id, long buildTimestamp, String versionString) {
super(id, MessageType.VERSION);
this.buildTimestamp = buildTimestamp;
this.versionString = versionString;
}
public long getBuildTimestamp() {
return this.buildTimestamp;
}
public String getVersionString() {
return this.versionString;
}
public static Message fromByteBuffer(int id, ByteBuffer bytes) throws UnsupportedEncodingException {
long buildTimestamp = bytes.getLong();
int versionStringLength = bytes.getInt();
if (versionStringLength != bytes.remaining())
return null;
byte[] versionBytes = new byte[versionStringLength];
bytes.get(versionBytes);
String versionString = new String(versionBytes, StandardCharsets.UTF_8);
return new VersionMessage(id, buildTimestamp, versionString);
}
@Override
protected byte[] toData() {
try {
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
bytes.write(Longs.toByteArray(this.buildTimestamp));
Serialization.serializeSizedString(bytes, this.versionString);
return bytes.toByteArray();
} catch (IOException e) {
return null;
}
}
}