WIP: EPC-fixes

BlockMessage was broken because the repository 'connection' associated with the message's Block object was closed between message queuing and message sending.

The fix was to serialize Message subclasses on construction, thus freeing reliance on objects passed into constructor.
The serialized byte[] is held by the message between queuing and sending.
This forces messages into one of two 'modes': outgoing or incoming.
Outgoing messages contain serialized byte[] whereas incoming messages unpack a ByteBuffer into Message subclass fields.
As a result, all network message types have been refactored in this way.
More details in Message's class comment.

A knock-on effect is that incoming messages cannot then be sent out - a new message needs to be constructed.
Some changes needed to Arbitrary controller package classes in this respect.

Bonus: Network no longer needs broadcast threads because 'broadcasting' is now simply the act of queuing a message for many peers.
This commit is contained in:
catbref 2022-04-02 15:51:00 +01:00
parent 8e09567221
commit 22aa5c41b5
43 changed files with 827 additions and 953 deletions

View File

@ -58,6 +58,7 @@ import org.qortal.repository.hsqldb.HSQLDBRepositoryFactory;
import org.qortal.settings.Settings;
import org.qortal.transaction.Transaction;
import org.qortal.transaction.Transaction.TransactionType;
import org.qortal.transform.TransformationException;
import org.qortal.utils.*;
public class Controller extends Thread {
@ -1236,7 +1237,7 @@ public class Controller extends Thread {
this.stats.getBlockMessageStats.cacheHits.incrementAndGet();
// We need to duplicate it to prevent multiple threads setting ID on the same message
CachedBlockMessage clonedBlockMessage = cachedBlockMessage.cloneWithNewId(message.getId());
CachedBlockMessage clonedBlockMessage = Message.cloneWithNewId(cachedBlockMessage, message.getId());
if (!peer.sendMessage(clonedBlockMessage))
peer.disconnect("failed to send block");
@ -1295,7 +1296,6 @@ public class Controller extends Thread {
CachedBlockMessage blockMessage = new CachedBlockMessage(block);
blockMessage.setId(message.getId());
// This call also causes the other needed data to be pulled in from repository
if (!peer.sendMessage(blockMessage)) {
peer.disconnect("failed to send block");
// Don't fall-through to caching because failure to send might be from failure to build message
@ -1309,7 +1309,9 @@ public class Controller extends Thread {
this.blockMessageCache.put(ByteArray.wrap(blockData.getSignature()), blockMessage);
}
} catch (DataException e) {
LOGGER.error(String.format("Repository issue while send block %s to peer %s", Base58.encode(signature), peer), e);
LOGGER.error(String.format("Repository issue while sending block %s to peer %s", Base58.encode(signature), peer), e);
} catch (TransformationException e) {
LOGGER.error(String.format("Serialization issue while sending block %s to peer %s", Base58.encode(signature), peer), e);
}
}

View File

@ -12,6 +12,7 @@ import org.qortal.repository.DataException;
import org.qortal.repository.Repository;
import org.qortal.repository.RepositoryManager;
import org.qortal.transaction.Transaction;
import org.qortal.transform.TransformationException;
import org.qortal.utils.Base58;
import org.qortal.utils.NTP;
@ -289,7 +290,9 @@ public class TransactionImporter extends Thread {
if (!peer.sendMessage(transactionMessage))
peer.disconnect("failed to send transaction");
} catch (DataException e) {
LOGGER.error(String.format("Repository issue while send transaction %s to peer %s", Base58.encode(signature), peer), e);
LOGGER.error(String.format("Repository issue while sending transaction %s to peer %s", Base58.encode(signature), peer), e);
} catch (TransformationException e) {
LOGGER.error(String.format("Serialization issue while sending transaction %s to peer %s", Base58.encode(signature), peer), e);
}
}

View File

@ -511,18 +511,23 @@ public class ArbitraryDataFileListManager {
// Bump requestHops if it exists
if (requestHops != null) {
arbitraryDataFileListMessage.setRequestHops(++requestHops);
requestHops++;
}
ArbitraryDataFileListMessage forwardArbitraryDataFileListMessage;
// Remove optional parameters if the requesting peer doesn't support it yet
// A message with less statistical data is better than no message at all
if (!requestingPeer.isAtLeastVersion(MIN_PEER_VERSION_FOR_FILE_LIST_STATS)) {
arbitraryDataFileListMessage.removeOptionalStats();
forwardArbitraryDataFileListMessage = new ArbitraryDataFileListMessage(signature, hashes);
} else {
forwardArbitraryDataFileListMessage = new ArbitraryDataFileListMessage(signature, hashes, requestTime, requestHops,
arbitraryDataFileListMessage.getPeerAddress(), arbitraryDataFileListMessage.isRelayPossible());
}
// Forward to requesting peer
LOGGER.debug("Forwarding file list with {} hashes to requesting peer: {}", hashes.size(), requestingPeer);
if (!requestingPeer.sendMessage(arbitraryDataFileListMessage)) {
if (!requestingPeer.sendMessage(forwardArbitraryDataFileListMessage)) {
requestingPeer.disconnect("failed to forward arbitrary data file list");
}
}
@ -639,16 +644,19 @@ public class ArbitraryDataFileListManager {
}
String ourAddress = Network.getInstance().getOurExternalIpAddressAndPort();
ArbitraryDataFileListMessage arbitraryDataFileListMessage = new ArbitraryDataFileListMessage(signature,
hashes, NTP.getTime(), 0, ourAddress, true);
arbitraryDataFileListMessage.setId(message.getId());
ArbitraryDataFileListMessage arbitraryDataFileListMessage;
// Remove optional parameters if the requesting peer doesn't support it yet
// A message with less statistical data is better than no message at all
if (!peer.isAtLeastVersion(MIN_PEER_VERSION_FOR_FILE_LIST_STATS)) {
arbitraryDataFileListMessage.removeOptionalStats();
arbitraryDataFileListMessage = new ArbitraryDataFileListMessage(signature, hashes);
} else {
arbitraryDataFileListMessage = new ArbitraryDataFileListMessage(signature,
hashes, NTP.getTime(), 0, ourAddress, true);
}
arbitraryDataFileListMessage.setId(message.getId());
if (!peer.sendMessage(arbitraryDataFileListMessage)) {
LOGGER.debug("Couldn't send list of hashes");
peer.disconnect("failed to send list of hashes");
@ -670,8 +678,7 @@ public class ArbitraryDataFileListManager {
// In relay mode - so ask our other peers if they have it
long requestTime = getArbitraryDataFileListMessage.getRequestTime();
int requestHops = getArbitraryDataFileListMessage.getRequestHops();
getArbitraryDataFileListMessage.setRequestHops(++requestHops);
int requestHops = getArbitraryDataFileListMessage.getRequestHops() + 1;
long totalRequestTime = now - requestTime;
if (totalRequestTime < RELAY_REQUEST_MAX_DURATION) {
@ -679,11 +686,13 @@ public class ArbitraryDataFileListManager {
if (requestHops < RELAY_REQUEST_MAX_HOPS) {
// Relay request hasn't reached the maximum number of hops yet, so can be rebroadcast
Message relayGetArbitraryDataFileListMessage = new GetArbitraryDataFileListMessage(signature, hashes, requestTime, requestHops, requestingPeer);
LOGGER.debug("Rebroadcasting hash list request from peer {} for signature {} to our other peers... totalRequestTime: {}, requestHops: {}", peer, Base58.encode(signature), totalRequestTime, requestHops);
Network.getInstance().broadcast(
broadcastPeer -> broadcastPeer == peer ||
Objects.equals(broadcastPeer.getPeerData().getAddress().getHost(), peer.getPeerData().getAddress().getHost())
? null : getArbitraryDataFileListMessage);
? null : relayGetArbitraryDataFileListMessage);
}
else {

View File

@ -186,7 +186,7 @@ public class ArbitraryDataFileManager extends Thread {
ArbitraryDataFile existingFile = ArbitraryDataFile.fromHash(hash, signature);
boolean fileAlreadyExists = existingFile.exists();
String hash58 = Base58.encode(hash);
Message message = null;
ArbitraryDataFileMessage arbitraryDataFileMessage;
// Fetch the file if it doesn't exist locally
if (!fileAlreadyExists) {
@ -194,10 +194,11 @@ public class ArbitraryDataFileManager extends Thread {
arbitraryDataFileRequests.put(hash58, NTP.getTime());
Message getArbitraryDataFileMessage = new GetArbitraryDataFileMessage(signature, hash);
Message response = null;
try {
message = peer.getResponseWithTimeout(getArbitraryDataFileMessage, (int) ArbitraryDataManager.ARBITRARY_REQUEST_TIMEOUT);
response = peer.getResponseWithTimeout(getArbitraryDataFileMessage, (int) ArbitraryDataManager.ARBITRARY_REQUEST_TIMEOUT);
} catch (InterruptedException e) {
// Will return below due to null message
// Will return below due to null response
}
arbitraryDataFileRequests.remove(hash58);
LOGGER.trace(String.format("Removed hash %.8s from arbitraryDataFileRequests", hash58));
@ -205,22 +206,24 @@ public class ArbitraryDataFileManager extends Thread {
// We may need to remove the file list request, if we have all the files for this transaction
this.handleFileListRequests(signature);
if (message == null) {
LOGGER.debug("Received null message from peer {}", peer);
if (response == null) {
LOGGER.debug("Received null response from peer {}", peer);
return null;
}
if (message.getType() != MessageType.ARBITRARY_DATA_FILE) {
LOGGER.debug("Received message with invalid type: {} from peer {}", message.getType(), peer);
if (response.getType() != MessageType.ARBITRARY_DATA_FILE) {
LOGGER.debug("Received response with invalid type: {} from peer {}", response.getType(), peer);
return null;
}
}
else {
ArbitraryDataFileMessage peersArbitraryDataFileMessage = (ArbitraryDataFileMessage) response;
arbitraryDataFileMessage = new ArbitraryDataFileMessage(signature, peersArbitraryDataFileMessage.getArbitraryDataFile());
} else {
LOGGER.debug(String.format("File hash %s already exists, so skipping the request", hash58));
arbitraryDataFileMessage = new ArbitraryDataFileMessage(signature, existingFile);
}
ArbitraryDataFileMessage arbitraryDataFileMessage = (ArbitraryDataFileMessage) message;
// We might want to forward the request to the peer that originally requested it
this.handleArbitraryDataFileForwarding(requestingPeer, message, originalMessage);
this.handleArbitraryDataFileForwarding(requestingPeer, arbitraryDataFileMessage, originalMessage);
boolean isRelayRequest = (requestingPeer != null);
if (isRelayRequest) {

View File

@ -338,9 +338,11 @@ public class ArbitraryMetadataManager {
Peer requestingPeer = request.getB();
if (requestingPeer != null) {
ArbitraryMetadataMessage forwardArbitraryMetadataMessage = new ArbitraryMetadataMessage(signature, arbitraryMetadataMessage.getArbitraryMetadataFile());
// Forward to requesting peer
LOGGER.debug("Forwarding metadata to requesting peer: {}", requestingPeer);
if (!requestingPeer.sendMessage(arbitraryMetadataMessage)) {
if (!requestingPeer.sendMessage(forwardArbitraryMetadataMessage)) {
requestingPeer.disconnect("failed to forward arbitrary metadata");
}
}
@ -423,8 +425,7 @@ public class ArbitraryMetadataManager {
// In relay mode - so ask our other peers if they have it
long requestTime = getArbitraryMetadataMessage.getRequestTime();
int requestHops = getArbitraryMetadataMessage.getRequestHops();
getArbitraryMetadataMessage.setRequestHops(++requestHops);
int requestHops = getArbitraryMetadataMessage.getRequestHops() + 1;
long totalRequestTime = now - requestTime;
if (totalRequestTime < RELAY_REQUEST_MAX_DURATION) {
@ -432,11 +433,13 @@ public class ArbitraryMetadataManager {
if (requestHops < RELAY_REQUEST_MAX_HOPS) {
// Relay request hasn't reached the maximum number of hops yet, so can be rebroadcast
Message relayGetArbitraryMetadataMessage = new GetArbitraryMetadataMessage(signature, requestTime, requestHops);
LOGGER.debug("Rebroadcasting metadata request from peer {} for signature {} to our other peers... totalRequestTime: {}, requestHops: {}", peer, Base58.encode(signature), totalRequestTime, requestHops);
Network.getInstance().broadcast(
broadcastPeer -> broadcastPeer == peer ||
Objects.equals(broadcastPeer.getPeerData().getAddress().getHost(), peer.getPeerData().getAddress().getHost())
? null : getArbitraryMetadataMessage);
? null : relayGetArbitraryMetadataMessage);
}
else {

View File

@ -126,8 +126,6 @@ public class Network {
private SelectionKey serverSelectionKey;
private final Set<SelectableChannel> channelsPendingWrite = ConcurrentHashMap.newKeySet();
private final ExecutorService broadcastExecutor = Executors.newCachedThreadPool();
private final Lock mergePeersLock = new ReentrantLock();
private List<String> ourExternalIpAddressHistory = new ArrayList<>();
@ -1455,49 +1453,17 @@ public class Network {
}
public void broadcast(Function<Peer, Message> peerMessageBuilder) {
class Broadcaster implements Runnable {
private final Random random = new Random();
for (Peer peer : getImmutableHandshakedPeers()) {
Message message = peerMessageBuilder.apply(peer);
private List<Peer> targetPeers;
private Function<Peer, Message> peerMessageBuilder;
Broadcaster(List<Peer> targetPeers, Function<Peer, Message> peerMessageBuilder) {
this.targetPeers = targetPeers;
this.peerMessageBuilder = peerMessageBuilder;
if (message == null) {
continue;
}
@Override
public void run() {
Thread.currentThread().setName("Network Broadcast");
for (Peer peer : targetPeers) {
// Very short sleep to reduce strain, improve multi-threading and catch interrupts
try {
Thread.sleep(random.nextInt(20) + 20L);
} catch (InterruptedException e) {
break;
}
Message message = peerMessageBuilder.apply(peer);
if (message == null) {
continue;
}
if (!peer.sendMessage(message)) {
peer.disconnect("failed to broadcast message");
}
}
Thread.currentThread().setName("Network Broadcast (dormant)");
if (!peer.sendMessage(message)) {
peer.disconnect("failed to broadcast message");
}
}
try {
broadcastExecutor.execute(new Broadcaster(this.getImmutableHandshakedPeers(), peerMessageBuilder));
} catch (RejectedExecutionException e) {
// Can't execute - probably because we're shutting down, so ignore
}
}
// Shutdown
@ -1521,16 +1487,6 @@ public class Network {
LOGGER.warn("Interrupted while waiting for networking threads to terminate");
}
// Stop broadcasts
this.broadcastExecutor.shutdownNow();
try {
if (!this.broadcastExecutor.awaitTermination(1000, TimeUnit.MILLISECONDS)) {
LOGGER.warn("Broadcast threads failed to terminate");
}
} catch (InterruptedException e) {
LOGGER.warn("Interrupted while waiting for broadcast threads failed to terminate");
}
// Close all peer connections
for (Peer peer : this.getImmutableConnectedPeers()) {
peer.shutdown();

View File

@ -664,6 +664,9 @@ public class Peer {
LOGGER.trace("[{}] Queuing {} message with ID {} to peer {}", this.peerConnectionId,
message.getType().name(), message.getId(), this);
// Check message properly constructed
message.checkValidOutgoing();
// Possible race condition:
// We set OP_WRITE, EPC creates ChannelWriteTask which calls Peer.writeChannel, writeChannel's poll() finds no message to send
// Avoided by poll-with-timeout in writeChannel() above.
@ -672,6 +675,9 @@ public class Peer {
} catch (InterruptedException e) {
// Send failure
return false;
} catch (MessageException e) {
LOGGER.error(e.getMessage(), e);
return false;
}
}

View File

@ -15,27 +15,53 @@ import java.util.List;
public class ArbitraryDataFileListMessage extends Message {
private final byte[] signature;
private final List<byte[]> hashes;
private byte[] signature;
private List<byte[]> hashes;
private Long requestTime;
private Integer requestHops;
private String peerAddress;
private Boolean isRelayPossible;
public ArbitraryDataFileListMessage(byte[] signature, List<byte[]> hashes, Long requestTime,
Integer requestHops, String peerAddress, boolean isRelayPossible) {
Integer requestHops, String peerAddress, Boolean isRelayPossible) {
super(MessageType.ARBITRARY_DATA_FILE_LIST);
this.signature = signature;
this.hashes = hashes;
this.requestTime = requestTime;
this.requestHops = requestHops;
this.peerAddress = peerAddress;
this.isRelayPossible = isRelayPossible;
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
try {
bytes.write(signature);
bytes.write(Ints.toByteArray(hashes.size()));
for (byte[] hash : hashes) {
bytes.write(hash);
}
if (requestTime != null) {
// The remaining fields are optional
bytes.write(Longs.toByteArray(requestTime));
bytes.write(Ints.toByteArray(requestHops));
Serialization.serializeSizedStringV2(bytes, peerAddress);
bytes.write(Ints.toByteArray(Boolean.TRUE.equals(isRelayPossible) ? 1 : 0));
}
} catch (IOException e) {
throw new AssertionError("IOException shouldn't occur with ByteArrayOutputStream");
}
this.dataBytes = bytes.toByteArray();
this.checksumBytes = Message.generateChecksum(this.dataBytes);
}
public ArbitraryDataFileListMessage(int id, byte[] signature, List<byte[]> hashes, Long requestTime,
/** Legacy version */
public ArbitraryDataFileListMessage(byte[] signature, List<byte[]> hashes) {
this(signature, hashes, null, null, null, null);
}
private ArbitraryDataFileListMessage(int id, byte[] signature, List<byte[]> hashes, Long requestTime,
Integer requestHops, String peerAddress, boolean isRelayPossible) {
super(id, MessageType.ARBITRARY_DATA_FILE_LIST);
@ -47,12 +73,28 @@ public class ArbitraryDataFileListMessage extends Message {
this.isRelayPossible = isRelayPossible;
}
public byte[] getSignature() {
return this.signature;
}
public List<byte[]> getHashes() {
return this.hashes;
}
public byte[] getSignature() {
return this.signature;
public Long getRequestTime() {
return this.requestTime;
}
public Integer getRequestHops() {
return this.requestHops;
}
public String getPeerAddress() {
return this.peerAddress;
}
public Boolean isRelayPossible() {
return this.isRelayPossible;
}
public static Message fromByteBuffer(int id, ByteBuffer bytes) throws MessageException {
@ -74,7 +116,6 @@ public class ArbitraryDataFileListMessage extends Message {
boolean isRelayPossible = true; // Legacy versions only send this message when relaying is possible
// The remaining fields are optional
if (bytes.hasRemaining()) {
try {
requestTime = bytes.getLong();
@ -92,79 +133,4 @@ public class ArbitraryDataFileListMessage extends Message {
return new ArbitraryDataFileListMessage(id, signature, hashes, requestTime, requestHops, peerAddress, isRelayPossible);
}
@Override
protected byte[] toData() throws IOException {
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
bytes.write(this.signature);
bytes.write(Ints.toByteArray(this.hashes.size()));
for (byte[] hash : this.hashes) {
bytes.write(hash);
}
if (this.requestTime == null) { // To maintain backwards support
return bytes.toByteArray();
}
// The remaining fields are optional
bytes.write(Longs.toByteArray(this.requestTime));
bytes.write(Ints.toByteArray(this.requestHops));
Serialization.serializeSizedStringV2(bytes, this.peerAddress);
bytes.write(Ints.toByteArray(this.isRelayPossible ? 1 : 0));
return bytes.toByteArray();
}
public ArbitraryDataFileListMessage cloneWithNewId(int newId) {
ArbitraryDataFileListMessage clone = new ArbitraryDataFileListMessage(this.signature, this.hashes,
this.requestTime, this.requestHops, this.peerAddress, this.isRelayPossible);
clone.setId(newId);
return clone;
}
public void removeOptionalStats() {
this.requestTime = null;
this.requestHops = null;
this.peerAddress = null;
this.isRelayPossible = null;
}
public Long getRequestTime() {
return this.requestTime;
}
public void setRequestTime(Long requestTime) {
this.requestTime = requestTime;
}
public Integer getRequestHops() {
return this.requestHops;
}
public void setRequestHops(Integer requestHops) {
this.requestHops = requestHops;
}
public String getPeerAddress() {
return this.peerAddress;
}
public void setPeerAddress(String peerAddress) {
this.peerAddress = peerAddress;
}
public Boolean isRelayPossible() {
return this.isRelayPossible;
}
public void setIsRelayPossible(Boolean isRelayPossible) {
this.isRelayPossible = isRelayPossible;
}
}

View File

@ -16,21 +16,39 @@ public class ArbitraryDataFileMessage extends Message {
private static final Logger LOGGER = LogManager.getLogger(ArbitraryDataFileMessage.class);
private final byte[] signature;
private final ArbitraryDataFile arbitraryDataFile;
private byte[] signature;
private ArbitraryDataFile arbitraryDataFile;
public ArbitraryDataFileMessage(byte[] signature, ArbitraryDataFile arbitraryDataFile) {
super(MessageType.ARBITRARY_DATA_FILE);
byte[] data = arbitraryDataFile.getBytes();
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
try {
bytes.write(signature);
bytes.write(Ints.toByteArray(data.length));
bytes.write(data);
} catch (IOException e) {
throw new AssertionError("IOException shouldn't occur with ByteArrayOutputStream");
}
this.dataBytes = bytes.toByteArray();
this.checksumBytes = Message.generateChecksum(this.dataBytes);
}
private ArbitraryDataFileMessage(int id, byte[] signature, ArbitraryDataFile arbitraryDataFile) {
super(id, MessageType.ARBITRARY_DATA_FILE);
this.signature = signature;
this.arbitraryDataFile = arbitraryDataFile;
}
public ArbitraryDataFileMessage(int id, byte[] signature, ArbitraryDataFile arbitraryDataFile) {
super(id, MessageType.ARBITRARY_DATA_FILE);
this.signature = signature;
this.arbitraryDataFile = arbitraryDataFile;
public byte[] getSignature() {
return this.signature;
}
public ArbitraryDataFile getArbitraryDataFile() {
@ -58,32 +76,4 @@ public class ArbitraryDataFileMessage extends Message {
}
}
@Override
protected byte[] toData() throws IOException {
if (this.arbitraryDataFile == null) {
return null;
}
byte[] data = this.arbitraryDataFile.getBytes();
if (data == null) {
return null;
}
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
bytes.write(signature);
bytes.write(Ints.toByteArray(data.length));
bytes.write(data);
return bytes.toByteArray();
}
public ArbitraryDataFileMessage cloneWithNewId(int newId) {
ArbitraryDataFileMessage clone = new ArbitraryDataFileMessage(this.signature, this.arbitraryDataFile);
clone.setId(newId);
return clone;
}
}

View File

@ -11,11 +11,26 @@ import com.google.common.primitives.Ints;
public class ArbitraryDataMessage extends Message {
private final byte[] signature;
private final byte[] data;
private byte[] signature;
private byte[] data;
public ArbitraryDataMessage(byte[] signature, byte[] data) {
this(-1, signature, data);
super(MessageType.ARBITRARY_DATA);
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
try {
bytes.write(signature);
bytes.write(Ints.toByteArray(data.length));
bytes.write(data);
} catch (IOException e) {
throw new AssertionError("IOException shouldn't occur with ByteArrayOutputStream");
}
this.dataBytes = bytes.toByteArray();
this.checksumBytes = Message.generateChecksum(this.dataBytes);
}
private ArbitraryDataMessage(int id, byte[] signature, byte[] data) {
@ -48,20 +63,4 @@ public class ArbitraryDataMessage extends Message {
return new ArbitraryDataMessage(id, signature, data);
}
@Override
protected byte[] toData() throws IOException {
if (this.data == null)
return null;
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
bytes.write(this.signature);
bytes.write(Ints.toByteArray(this.data.length));
bytes.write(this.data);
return bytes.toByteArray();
}
}

View File

@ -12,21 +12,35 @@ import java.nio.ByteBuffer;
public class ArbitraryMetadataMessage extends Message {
private final byte[] signature;
private final ArbitraryDataFile arbitraryMetadataFile;
private byte[] signature;
private ArbitraryDataFile arbitraryMetadataFile;
public ArbitraryMetadataMessage(byte[] signature, ArbitraryDataFile arbitraryDataFile) {
public ArbitraryMetadataMessage(byte[] signature, ArbitraryDataFile arbitraryMetadataFile) {
super(MessageType.ARBITRARY_METADATA);
this.signature = signature;
this.arbitraryMetadataFile = arbitraryDataFile;
byte[] data = arbitraryMetadataFile.getBytes();
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
try {
bytes.write(signature);
bytes.write(Ints.toByteArray(data.length));
bytes.write(data);
} catch (IOException e) {
throw new AssertionError("IOException shouldn't occur with ByteArrayOutputStream");
}
this.dataBytes = bytes.toByteArray();
this.checksumBytes = Message.generateChecksum(this.dataBytes);
}
public ArbitraryMetadataMessage(int id, byte[] signature, ArbitraryDataFile arbitraryDataFile) {
private ArbitraryMetadataMessage(int id, byte[] signature, ArbitraryDataFile arbitraryMetadataFile) {
super(id, MessageType.ARBITRARY_METADATA);
this.signature = signature;
this.arbitraryMetadataFile = arbitraryDataFile;
this.arbitraryMetadataFile = arbitraryMetadataFile;
}
public byte[] getSignature() {
@ -57,32 +71,4 @@ public class ArbitraryMetadataMessage extends Message {
}
}
@Override
protected byte[] toData() throws IOException {
if (this.arbitraryMetadataFile == null) {
return null;
}
byte[] data = this.arbitraryMetadataFile.getBytes();
if (data == null) {
return null;
}
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
bytes.write(signature);
bytes.write(Ints.toByteArray(data.length));
bytes.write(data);
return bytes.toByteArray();
}
public ArbitraryMetadataMessage cloneWithNewId(int newId) {
ArbitraryMetadataMessage clone = new ArbitraryMetadataMessage(this.signature, this.arbitraryMetadataFile);
clone.setId(newId);
return clone;
}
}

View File

@ -20,7 +20,25 @@ public class ArbitrarySignaturesMessage extends Message {
private List<byte[]> signatures;
public ArbitrarySignaturesMessage(String peerAddress, int requestHops, List<byte[]> signatures) {
this(-1, peerAddress, requestHops, signatures);
super(MessageType.ARBITRARY_SIGNATURES);
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
try {
Serialization.serializeSizedStringV2(bytes, peerAddress);
bytes.write(Ints.toByteArray(requestHops));
bytes.write(Ints.toByteArray(signatures.size()));
for (byte[] signature : signatures)
bytes.write(signature);
} catch (IOException e) {
throw new AssertionError("IOException shouldn't occur with ByteArrayOutputStream");
}
this.dataBytes = bytes.toByteArray();
this.checksumBytes = Message.generateChecksum(this.dataBytes);
}
private ArbitrarySignaturesMessage(int id, String peerAddress, int requestHops, List<byte[]> signatures) {
@ -39,14 +57,6 @@ public class ArbitrarySignaturesMessage extends Message {
return this.signatures;
}
public int getRequestHops() {
return this.requestHops;
}
public void setRequestHops(int requestHops) {
this.requestHops = requestHops;
}
public static Message fromByteBuffer(int id, ByteBuffer bytes) throws MessageException {
String peerAddress;
try {
@ -72,20 +82,4 @@ public class ArbitrarySignaturesMessage extends Message {
return new ArbitrarySignaturesMessage(id, peerAddress, requestHops, signatures);
}
@Override
protected byte[] toData() throws IOException {
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
Serialization.serializeSizedStringV2(bytes, this.peerAddress);
bytes.write(Ints.toByteArray(this.requestHops));
bytes.write(Ints.toByteArray(this.signatures.size()));
for (byte[] signature : this.signatures)
bytes.write(signature);
return bytes.toByteArray();
}
}

View File

@ -1,13 +1,10 @@
package org.qortal.network.message;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.qortal.block.Block;
import org.qortal.data.at.ATStateData;
import org.qortal.data.block.BlockData;
import org.qortal.data.transaction.TransactionData;
@ -15,27 +12,15 @@ import org.qortal.transform.TransformationException;
import org.qortal.transform.block.BlockTransformer;
import org.qortal.utils.Triple;
import com.google.common.primitives.Ints;
public class BlockMessage extends Message {
private static final Logger LOGGER = LogManager.getLogger(BlockMessage.class);
private Block block = null;
private final BlockData blockData;
private final List<TransactionData> transactions;
private final List<ATStateData> atStates;
private BlockData blockData = null;
private List<TransactionData> transactions = null;
private List<ATStateData> atStates = null;
private final int height;
public BlockMessage(Block block) {
super(MessageType.BLOCK);
this.block = block;
this.blockData = block.getBlockData();
this.height = block.getBlockData().getHeight();
}
// No public constructor as we're an incoming-only message type.
private BlockMessage(int id, BlockData blockData, List<TransactionData> transactions, List<ATStateData> atStates) {
super(id, MessageType.BLOCK);
@ -43,8 +28,6 @@ public class BlockMessage extends Message {
this.blockData = blockData;
this.transactions = transactions;
this.atStates = atStates;
this.height = blockData.getHeight();
}
public BlockData getBlockData() {
@ -75,24 +58,4 @@ public class BlockMessage extends Message {
}
}
@Override
protected byte[] toData() throws IOException, TransformationException {
if (this.block == null)
return null;
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
bytes.write(Ints.toByteArray(this.height));
bytes.write(BlockTransformer.toBytes(this.block));
return bytes.toByteArray();
}
public BlockMessage cloneWithNewId(int newId) {
BlockMessage clone = new BlockMessage(this.block);
clone.setId(newId);
return clone;
}
}

View File

@ -17,10 +17,28 @@ public class BlockSummariesMessage extends Message {
private static final int BLOCK_SUMMARY_LENGTH = BlockTransformer.BLOCK_SIGNATURE_LENGTH + Transformer.INT_LENGTH + Transformer.PUBLIC_KEY_LENGTH + Transformer.INT_LENGTH;
private final List<BlockSummaryData> blockSummaries;
private List<BlockSummaryData> blockSummaries;
public BlockSummariesMessage(List<BlockSummaryData> blockSummaries) {
this(-1, blockSummaries);
super(MessageType.BLOCK_SUMMARIES);
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
try {
bytes.write(Ints.toByteArray(blockSummaries.size()));
for (BlockSummaryData blockSummary : blockSummaries) {
bytes.write(Ints.toByteArray(blockSummary.getHeight()));
bytes.write(blockSummary.getSignature());
bytes.write(blockSummary.getMinterPublicKey());
bytes.write(Ints.toByteArray(blockSummary.getOnlineAccountsCount()));
}
} catch (IOException e) {
throw new AssertionError("IOException shouldn't occur with ByteArrayOutputStream");
}
this.dataBytes = bytes.toByteArray();
this.checksumBytes = Message.generateChecksum(this.dataBytes);
}
private BlockSummariesMessage(int id, List<BlockSummaryData> blockSummaries) {
@ -58,20 +76,4 @@ public class BlockSummariesMessage extends Message {
return new BlockSummariesMessage(id, blockSummaries);
}
@Override
protected byte[] toData() throws IOException {
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
bytes.write(Ints.toByteArray(this.blockSummaries.size()));
for (BlockSummaryData blockSummary : this.blockSummaries) {
bytes.write(Ints.toByteArray(blockSummary.getHeight()));
bytes.write(blockSummary.getSignature());
bytes.write(blockSummary.getMinterPublicKey());
bytes.write(Ints.toByteArray(blockSummary.getOnlineAccountsCount()));
}
return bytes.toByteArray();
}
}

View File

@ -11,55 +11,34 @@ import org.qortal.transform.block.BlockTransformer;
import com.google.common.primitives.Ints;
// This is an OUTGOING-only Message which more readily lends itself to being cached
public class CachedBlockMessage extends Message {
public class CachedBlockMessage extends Message implements Cloneable {
private Block block;
private byte[] cachedBytes = null;
public CachedBlockMessage(Block block) {
public CachedBlockMessage(Block block) throws TransformationException {
super(MessageType.BLOCK);
this.block = block;
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
try {
bytes.write(Ints.toByteArray(block.getBlockData().getHeight()));
bytes.write(BlockTransformer.toBytes(block));
} catch (IOException e) {
throw new AssertionError("IOException shouldn't occur with ByteArrayOutputStream");
}
this.dataBytes = bytes.toByteArray();
this.checksumBytes = Message.generateChecksum(this.dataBytes);
}
public CachedBlockMessage(byte[] cachedBytes) {
super(MessageType.BLOCK);
this.block = null;
this.cachedBytes = cachedBytes;
this.dataBytes = cachedBytes;
this.checksumBytes = Message.generateChecksum(this.dataBytes);
}
public static Message fromByteBuffer(int id, ByteBuffer byteBuffer) {
throw new UnsupportedOperationException("CachedBlockMessage is for outgoing messages only");
}
@Override
protected byte[] toData() throws IOException, TransformationException {
// Already serialized?
if (this.cachedBytes != null)
return cachedBytes;
if (this.block == null)
return null;
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
bytes.write(Ints.toByteArray(this.block.getBlockData().getHeight()));
bytes.write(BlockTransformer.toBytes(this.block));
this.cachedBytes = bytes.toByteArray();
// We no longer need source Block
// and Block contains repository handle which is highly likely to be invalid after this call
this.block = null;
return this.cachedBytes;
}
public CachedBlockMessage cloneWithNewId(int newId) {
CachedBlockMessage clone = new CachedBlockMessage(this.cachedBytes);
clone.setId(newId);
return clone;
}
}

View File

@ -10,8 +10,25 @@ public class ChallengeMessage extends Message {
public static final int CHALLENGE_LENGTH = 32;
private final byte[] publicKey;
private final byte[] challenge;
private byte[] publicKey;
private byte[] challenge;
public ChallengeMessage(byte[] publicKey, byte[] challenge) {
super(MessageType.CHALLENGE);
ByteArrayOutputStream bytes = new ByteArrayOutputStream(publicKey.length + challenge.length);
try {
bytes.write(publicKey);
bytes.write(challenge);
} catch (IOException e) {
throw new AssertionError("IOException shouldn't occur with ByteArrayOutputStream");
}
this.dataBytes = bytes.toByteArray();
this.checksumBytes = Message.generateChecksum(this.dataBytes);
}
private ChallengeMessage(int id, byte[] publicKey, byte[] challenge) {
super(id, MessageType.CHALLENGE);
@ -20,10 +37,6 @@ public class ChallengeMessage extends Message {
this.challenge = challenge;
}
public ChallengeMessage(byte[] publicKey, byte[] challenge) {
this(-1, publicKey, challenge);
}
public byte[] getPublicKey() {
return this.publicKey;
}
@ -42,15 +55,4 @@ public class ChallengeMessage extends Message {
return new ChallengeMessage(id, publicKey, challenge);
}
@Override
protected byte[] toData() throws IOException {
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
bytes.write(this.publicKey);
bytes.write(this.challenge);
return bytes.toByteArray();
}
}

View File

@ -15,14 +15,44 @@ import java.util.List;
public class GetArbitraryDataFileListMessage extends Message {
private final byte[] signature;
private byte[] signature;
private List<byte[]> hashes;
private final long requestTime;
private long requestTime;
private int requestHops;
private String requestingPeer;
public GetArbitraryDataFileListMessage(byte[] signature, List<byte[]> hashes, long requestTime, int requestHops, String requestingPeer) {
this(-1, signature, hashes, requestTime, requestHops, requestingPeer);
super(MessageType.GET_ARBITRARY_DATA_FILE_LIST);
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
try {
bytes.write(signature);
bytes.write(Longs.toByteArray(requestTime));
bytes.write(Ints.toByteArray(requestHops));
if (hashes != null) {
bytes.write(Ints.toByteArray(hashes.size()));
for (byte[] hash : hashes) {
bytes.write(hash);
}
}
else {
bytes.write(Ints.toByteArray(0));
}
if (requestingPeer != null) {
Serialization.serializeSizedStringV2(bytes, requestingPeer);
}
} catch (IOException e) {
throw new AssertionError("IOException shouldn't occur with ByteArrayOutputStream");
}
this.dataBytes = bytes.toByteArray();
this.checksumBytes = Message.generateChecksum(this.dataBytes);
}
private GetArbitraryDataFileListMessage(int id, byte[] signature, List<byte[]> hashes, long requestTime, int requestHops, String requestingPeer) {
@ -43,6 +73,18 @@ public class GetArbitraryDataFileListMessage extends Message {
return this.hashes;
}
public long getRequestTime() {
return this.requestTime;
}
public int getRequestHops() {
return this.requestHops;
}
public String getRequestingPeer() {
return this.requestingPeer;
}
public static Message fromByteBuffer(int id, ByteBuffer bytes) throws MessageException {
byte[] signature = new byte[Transformer.SIGNATURE_LENGTH];
@ -76,47 +118,4 @@ public class GetArbitraryDataFileListMessage extends Message {
return new GetArbitraryDataFileListMessage(id, signature, hashes, requestTime, requestHops, requestingPeer);
}
@Override
protected byte[] toData() throws IOException {
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
bytes.write(this.signature);
bytes.write(Longs.toByteArray(this.requestTime));
bytes.write(Ints.toByteArray(this.requestHops));
if (this.hashes != null) {
bytes.write(Ints.toByteArray(this.hashes.size()));
for (byte[] hash : this.hashes) {
bytes.write(hash);
}
}
else {
bytes.write(Ints.toByteArray(0));
}
if (this.requestingPeer != null) {
Serialization.serializeSizedStringV2(bytes, this.requestingPeer);
}
return bytes.toByteArray();
}
public long getRequestTime() {
return this.requestTime;
}
public int getRequestHops() {
return this.requestHops;
}
public void setRequestHops(int requestHops) {
this.requestHops = requestHops;
}
public String getRequestingPeer() {
return this.requestingPeer;
}
}

View File

@ -8,11 +8,24 @@ import java.nio.ByteBuffer;
public class GetArbitraryDataFileMessage extends Message {
private final byte[] signature;
private final byte[] hash;
private byte[] signature;
private byte[] hash;
public GetArbitraryDataFileMessage(byte[] signature, byte[] hash) {
this(-1, signature, hash);
super(MessageType.GET_ARBITRARY_DATA_FILE);
ByteArrayOutputStream bytes = new ByteArrayOutputStream(signature.length + hash.length);
try {
bytes.write(signature);
bytes.write(hash);
} catch (IOException e) {
throw new AssertionError("IOException shouldn't occur with ByteArrayOutputStream");
}
this.dataBytes = bytes.toByteArray();
this.checksumBytes = Message.generateChecksum(this.dataBytes);
}
private GetArbitraryDataFileMessage(int id, byte[] signature, byte[] hash) {
@ -40,15 +53,4 @@ public class GetArbitraryDataFileMessage extends Message {
return new GetArbitraryDataFileMessage(id, signature, hash);
}
@Override
protected byte[] toData() throws IOException {
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
bytes.write(this.signature);
bytes.write(this.hash);
return bytes.toByteArray();
}
}

View File

@ -1,17 +1,19 @@
package org.qortal.network.message;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import org.qortal.transform.Transformer;
public class GetArbitraryDataMessage extends Message {
private final byte[] signature;
private byte[] signature;
public GetArbitraryDataMessage(byte[] signature) {
this(-1, signature);
super(MessageType.GET_ARBITRARY_DATA);
this.dataBytes = Arrays.copyOf(signature, signature.length);
this.checksumBytes = Message.generateChecksum(this.dataBytes);
}
private GetArbitraryDataMessage(int id, byte[] signature) {
@ -32,13 +34,4 @@ public class GetArbitraryDataMessage extends Message {
return new GetArbitraryDataMessage(id, signature);
}
@Override
protected byte[] toData() throws IOException {
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
bytes.write(this.signature);
return bytes.toByteArray();
}
}

View File

@ -10,12 +10,27 @@ import java.nio.ByteBuffer;
public class GetArbitraryMetadataMessage extends Message {
private final byte[] signature;
private final long requestTime;
private byte[] signature;
private long requestTime;
private int requestHops;
public GetArbitraryMetadataMessage(byte[] signature, long requestTime, int requestHops) {
this(-1, signature, requestTime, requestHops);
super(MessageType.GET_ARBITRARY_METADATA);
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
try {
bytes.write(signature);
bytes.write(Longs.toByteArray(requestTime));
bytes.write(Ints.toByteArray(requestHops));
} catch (IOException e) {
throw new AssertionError("IOException shouldn't occur with ByteArrayOutputStream");
}
this.dataBytes = bytes.toByteArray();
this.checksumBytes = Message.generateChecksum(this.dataBytes);
}
private GetArbitraryMetadataMessage(int id, byte[] signature, long requestTime, int requestHops) {
@ -30,6 +45,14 @@ public class GetArbitraryMetadataMessage extends Message {
return this.signature;
}
public long getRequestTime() {
return this.requestTime;
}
public int getRequestHops() {
return this.requestHops;
}
public static Message fromByteBuffer(int id, ByteBuffer bytes) {
byte[] signature = new byte[Transformer.SIGNATURE_LENGTH];
bytes.get(signature);
@ -41,29 +64,4 @@ public class GetArbitraryMetadataMessage extends Message {
return new GetArbitraryMetadataMessage(id, signature, requestTime, requestHops);
}
@Override
protected byte[] toData() throws IOException {
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
bytes.write(this.signature);
bytes.write(Longs.toByteArray(this.requestTime));
bytes.write(Ints.toByteArray(this.requestHops));
return bytes.toByteArray();
}
public long getRequestTime() {
return this.requestTime;
}
public int getRequestHops() {
return this.requestHops;
}
public void setRequestHops(int requestHops) {
this.requestHops = requestHops;
}
}

View File

@ -1,17 +1,19 @@
package org.qortal.network.message;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import org.qortal.transform.block.BlockTransformer;
public class GetBlockMessage extends Message {
private final byte[] signature;
private byte[] signature;
public GetBlockMessage(byte[] signature) {
this(-1, signature);
super(MessageType.GET_BLOCK);
this.dataBytes = Arrays.copyOf(signature, signature.length);
this.checksumBytes = Message.generateChecksum(this.dataBytes);
}
private GetBlockMessage(int id, byte[] signature) {
@ -31,13 +33,4 @@ public class GetBlockMessage extends Message {
return new GetBlockMessage(id, signature);
}
@Override
protected byte[] toData() throws IOException {
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
bytes.write(this.signature);
return bytes.toByteArray();
}
}

View File

@ -10,11 +10,24 @@ import com.google.common.primitives.Ints;
public class GetBlockSummariesMessage extends Message {
private final byte[] parentSignature;
private final int numberRequested;
private byte[] parentSignature;
private int numberRequested;
public GetBlockSummariesMessage(byte[] parentSignature, int numberRequested) {
this(-1, parentSignature, numberRequested);
super(MessageType.GET_BLOCK_SUMMARIES);
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
try {
bytes.write(parentSignature);
bytes.write(Ints.toByteArray(numberRequested));
} catch (IOException e) {
throw new AssertionError("IOException shouldn't occur with ByteArrayOutputStream");
}
this.dataBytes = bytes.toByteArray();
this.checksumBytes = Message.generateChecksum(this.dataBytes);
}
private GetBlockSummariesMessage(int id, byte[] parentSignature, int numberRequested) {
@ -41,15 +54,4 @@ public class GetBlockSummariesMessage extends Message {
return new GetBlockSummariesMessage(id, parentSignature, numberRequested);
}
@Override
protected byte[] toData() throws IOException {
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
bytes.write(this.parentSignature);
bytes.write(Ints.toByteArray(this.numberRequested));
return bytes.toByteArray();
}
}

View File

@ -16,10 +16,27 @@ import com.google.common.primitives.Longs;
public class GetOnlineAccountsMessage extends Message {
private static final int MAX_ACCOUNT_COUNT = 5000;
private final List<OnlineAccountData> onlineAccounts;
private List<OnlineAccountData> onlineAccounts;
public GetOnlineAccountsMessage(List<OnlineAccountData> onlineAccounts) {
this(-1, onlineAccounts);
super(MessageType.GET_ONLINE_ACCOUNTS);
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
try {
bytes.write(Ints.toByteArray(onlineAccounts.size()));
for (OnlineAccountData onlineAccountData : onlineAccounts) {
bytes.write(Longs.toByteArray(onlineAccountData.getTimestamp()));
bytes.write(onlineAccountData.getPublicKey());
}
} catch (IOException e) {
throw new AssertionError("IOException shouldn't occur with ByteArrayOutputStream");
}
this.dataBytes = bytes.toByteArray();
this.checksumBytes = Message.generateChecksum(this.dataBytes);
}
private GetOnlineAccountsMessage(int id, List<OnlineAccountData> onlineAccounts) {
@ -49,19 +66,4 @@ public class GetOnlineAccountsMessage extends Message {
return new GetOnlineAccountsMessage(id, onlineAccounts);
}
@Override
protected byte[] toData() throws IOException {
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
bytes.write(Ints.toByteArray(this.onlineAccounts.size()));
for (OnlineAccountData onlineAccountData : this.onlineAccounts) {
bytes.write(Longs.toByteArray(onlineAccountData.getTimestamp()));
bytes.write(onlineAccountData.getPublicKey());
}
return bytes.toByteArray();
}
}

View File

@ -23,11 +23,43 @@ import java.util.Map;
* Also V2 only builds online accounts message once!
*/
public class GetOnlineAccountsV2Message extends Message {
private final List<OnlineAccountData> onlineAccounts;
private byte[] cachedData;
private List<OnlineAccountData> onlineAccounts;
public GetOnlineAccountsV2Message(List<OnlineAccountData> onlineAccounts) {
this(-1, onlineAccounts);
super(MessageType.GET_ONLINE_ACCOUNTS_V2);
// How many of each timestamp
Map<Long, Integer> countByTimestamp = new HashMap<>();
for (OnlineAccountData onlineAccountData : onlineAccounts) {
Long timestamp = onlineAccountData.getTimestamp();
countByTimestamp.compute(timestamp, (k, v) -> v == null ? 1 : ++v);
}
// We should know exactly how many bytes to allocate now
int byteSize = countByTimestamp.size() * (Transformer.INT_LENGTH + Transformer.TIMESTAMP_LENGTH)
+ onlineAccounts.size() * Transformer.PUBLIC_KEY_LENGTH;
ByteArrayOutputStream bytes = new ByteArrayOutputStream(byteSize);
try {
for (long timestamp : countByTimestamp.keySet()) {
bytes.write(Ints.toByteArray(countByTimestamp.get(timestamp)));
bytes.write(Longs.toByteArray(timestamp));
for (OnlineAccountData onlineAccountData : onlineAccounts) {
if (onlineAccountData.getTimestamp() == timestamp)
bytes.write(onlineAccountData.getPublicKey());
}
}
} catch (IOException e) {
throw new AssertionError("IOException shouldn't occur with ByteArrayOutputStream");
}
this.dataBytes = bytes.toByteArray();
this.checksumBytes = Message.generateChecksum(this.dataBytes);
}
private GetOnlineAccountsV2Message(int id, List<OnlineAccountData> onlineAccounts) {
@ -66,44 +98,4 @@ public class GetOnlineAccountsV2Message extends Message {
return new GetOnlineAccountsV2Message(id, onlineAccounts);
}
@Override
protected synchronized byte[] toData() throws IOException {
if (this.cachedData != null)
return this.cachedData;
// Shortcut in case we have no online accounts
if (this.onlineAccounts.isEmpty()) {
this.cachedData = Ints.toByteArray(0);
return this.cachedData;
}
// How many of each timestamp
Map<Long, Integer> countByTimestamp = new HashMap<>();
for (OnlineAccountData onlineAccountData : this.onlineAccounts) {
Long timestamp = onlineAccountData.getTimestamp();
countByTimestamp.compute(timestamp, (k, v) -> v == null ? 1 : ++v);
}
// We should know exactly how many bytes to allocate now
int byteSize = countByTimestamp.size() * (Transformer.INT_LENGTH + Transformer.TIMESTAMP_LENGTH)
+ this.onlineAccounts.size() * Transformer.PUBLIC_KEY_LENGTH;
ByteArrayOutputStream bytes = new ByteArrayOutputStream(byteSize);
for (long timestamp : countByTimestamp.keySet()) {
bytes.write(Ints.toByteArray(countByTimestamp.get(timestamp)));
bytes.write(Longs.toByteArray(timestamp));
for (OnlineAccountData onlineAccountData : this.onlineAccounts) {
if (onlineAccountData.getTimestamp() == timestamp)
bytes.write(onlineAccountData.getPublicKey());
}
}
this.cachedData = bytes.toByteArray();
return this.cachedData;
}
}

View File

@ -5,7 +5,9 @@ import java.nio.ByteBuffer;
public class GetPeersMessage extends Message {
public GetPeersMessage() {
this(-1);
super(MessageType.GET_PEERS);
this.dataBytes = EMPTY_DATA_BYTES;
}
private GetPeersMessage(int id) {
@ -16,9 +18,4 @@ public class GetPeersMessage extends Message {
return new GetPeersMessage(id);
}
@Override
protected byte[] toData() {
return new byte[0];
}
}

View File

@ -10,11 +10,24 @@ import com.google.common.primitives.Ints;
public class GetSignaturesV2Message extends Message {
private final byte[] parentSignature;
private final int numberRequested;
private byte[] parentSignature;
private int numberRequested;
public GetSignaturesV2Message(byte[] parentSignature, int numberRequested) {
this(-1, parentSignature, numberRequested);
super(MessageType.GET_SIGNATURES_V2);
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
try {
bytes.write(parentSignature);
bytes.write(Ints.toByteArray(numberRequested));
} catch (IOException e) {
throw new AssertionError("IOException shouldn't occur with ByteArrayOutputStream");
}
this.dataBytes = bytes.toByteArray();
this.checksumBytes = Message.generateChecksum(this.dataBytes);
}
private GetSignaturesV2Message(int id, byte[] parentSignature, int numberRequested) {
@ -41,15 +54,4 @@ public class GetSignaturesV2Message extends Message {
return new GetSignaturesV2Message(id, parentSignature, numberRequested);
}
@Override
protected byte[] toData() throws IOException {
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
bytes.write(this.parentSignature);
bytes.write(Ints.toByteArray(this.numberRequested));
return bytes.toByteArray();
}
}

View File

@ -19,11 +19,49 @@ import java.util.Map;
* Groups of: number of entries, timestamp, then AT trade pubkey for each entry.
*/
public class GetTradePresencesMessage extends Message {
private final List<TradePresenceData> tradePresences;
private byte[] cachedData;
private List<TradePresenceData> tradePresences;
public GetTradePresencesMessage(List<TradePresenceData> tradePresences) {
this(-1, tradePresences);
super(MessageType.GET_TRADE_PRESENCES);
// Shortcut in case we have no trade presences
if (tradePresences.isEmpty()) {
this.dataBytes = Ints.toByteArray(0);
this.checksumBytes = Message.generateChecksum(this.dataBytes);
return;
}
// How many of each timestamp
Map<Long, Integer> countByTimestamp = new HashMap<>();
for (TradePresenceData tradePresenceData : tradePresences) {
Long timestamp = tradePresenceData.getTimestamp();
countByTimestamp.compute(timestamp, (k, v) -> v == null ? 1 : ++v);
}
// We should know exactly how many bytes to allocate now
int byteSize = countByTimestamp.size() * (Transformer.INT_LENGTH + Transformer.TIMESTAMP_LENGTH)
+ tradePresences.size() * Transformer.PUBLIC_KEY_LENGTH;
ByteArrayOutputStream bytes = new ByteArrayOutputStream(byteSize);
try {
for (long timestamp : countByTimestamp.keySet()) {
bytes.write(Ints.toByteArray(countByTimestamp.get(timestamp)));
bytes.write(Longs.toByteArray(timestamp));
for (TradePresenceData tradePresenceData : tradePresences) {
if (tradePresenceData.getTimestamp() == timestamp)
bytes.write(tradePresenceData.getPublicKey());
}
}
} catch (IOException e) {
throw new AssertionError("IOException shouldn't occur with ByteArrayOutputStream");
}
this.dataBytes = bytes.toByteArray();
this.checksumBytes = Message.generateChecksum(this.dataBytes);
}
private GetTradePresencesMessage(int id, List<TradePresenceData> tradePresences) {
@ -62,44 +100,4 @@ public class GetTradePresencesMessage extends Message {
return new GetTradePresencesMessage(id, tradePresences);
}
@Override
protected synchronized byte[] toData() throws IOException {
if (this.cachedData != null)
return this.cachedData;
// Shortcut in case we have no trade presences
if (this.tradePresences.isEmpty()) {
this.cachedData = Ints.toByteArray(0);
return this.cachedData;
}
// How many of each timestamp
Map<Long, Integer> countByTimestamp = new HashMap<>();
for (TradePresenceData tradePresenceData : this.tradePresences) {
Long timestamp = tradePresenceData.getTimestamp();
countByTimestamp.compute(timestamp, (k, v) -> v == null ? 1 : ++v);
}
// We should know exactly how many bytes to allocate now
int byteSize = countByTimestamp.size() * (Transformer.INT_LENGTH + Transformer.TIMESTAMP_LENGTH)
+ this.tradePresences.size() * Transformer.PUBLIC_KEY_LENGTH;
ByteArrayOutputStream bytes = new ByteArrayOutputStream(byteSize);
for (long timestamp : countByTimestamp.keySet()) {
bytes.write(Ints.toByteArray(countByTimestamp.get(timestamp)));
bytes.write(Longs.toByteArray(timestamp));
for (TradePresenceData tradePresenceData : this.tradePresences) {
if (tradePresenceData.getTimestamp() == timestamp)
bytes.write(tradePresenceData.getPublicKey());
}
}
this.cachedData = bytes.toByteArray();
return this.cachedData;
}
}

View File

@ -1,17 +1,19 @@
package org.qortal.network.message;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import org.qortal.transform.Transformer;
public class GetTransactionMessage extends Message {
private final byte[] signature;
private byte[] signature;
public GetTransactionMessage(byte[] signature) {
this(-1, signature);
super(MessageType.GET_TRANSACTION);
this.dataBytes = Arrays.copyOf(signature, signature.length);
this.checksumBytes = Message.generateChecksum(this.dataBytes);
}
private GetTransactionMessage(int id, byte[] signature) {
@ -32,13 +34,4 @@ public class GetTransactionMessage extends Message {
return new GetTransactionMessage(id, signature);
}
@Override
protected byte[] toData() throws IOException {
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
bytes.write(this.signature);
return bytes.toByteArray();
}
}

View File

@ -5,7 +5,9 @@ import java.nio.ByteBuffer;
public class GetUnconfirmedTransactionsMessage extends Message {
public GetUnconfirmedTransactionsMessage() {
this(-1);
super(MessageType.GET_UNCONFIRMED_TRANSACTIONS);
this.dataBytes = EMPTY_DATA_BYTES;
}
private GetUnconfirmedTransactionsMessage(int id) {
@ -16,9 +18,4 @@ public class GetUnconfirmedTransactionsMessage extends Message {
return new GetUnconfirmedTransactionsMessage(id);
}
@Override
protected byte[] toData() {
return new byte[0];
}
}

View File

@ -3,7 +3,6 @@ package org.qortal.network.message;
import static java.util.Arrays.stream;
import static java.util.stream.Collectors.toMap;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Map;
@ -31,7 +30,14 @@ public class GoodbyeMessage extends Message {
}
}
private final Reason reason;
private Reason reason;
public GoodbyeMessage(Reason reason) {
super(MessageType.GOODBYE);
this.dataBytes = Ints.toByteArray(reason.value);
this.checksumBytes = Message.generateChecksum(this.dataBytes);
}
private GoodbyeMessage(int id, Reason reason) {
super(id, MessageType.GOODBYE);
@ -39,10 +45,6 @@ public class GoodbyeMessage extends Message {
this.reason = reason;
}
public GoodbyeMessage(Reason reason) {
this(-1, reason);
}
public Reason getReason() {
return this.reason;
}
@ -57,9 +59,4 @@ public class GoodbyeMessage extends Message {
return new GoodbyeMessage(id, reason);
}
@Override
protected byte[] toData() throws IOException {
return Ints.toByteArray(this.reason.value);
}
}

View File

@ -12,13 +12,30 @@ import com.google.common.primitives.Longs;
public class HeightV2Message extends Message {
private final int height;
private final byte[] signature;
private final long timestamp;
private final byte[] minterPublicKey;
private int height;
private byte[] signature;
private long timestamp;
private byte[] minterPublicKey;
public HeightV2Message(int height, byte[] signature, long timestamp, byte[] minterPublicKey) {
this(-1, height, signature, timestamp, minterPublicKey);
super(MessageType.HEIGHT_V2);
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
try {
bytes.write(Ints.toByteArray(height));
bytes.write(signature);
bytes.write(Longs.toByteArray(timestamp));
bytes.write(minterPublicKey);
} catch (IOException e) {
throw new AssertionError("IOException shouldn't occur with ByteArrayOutputStream");
}
this.dataBytes = bytes.toByteArray();
this.checksumBytes = Message.generateChecksum(this.dataBytes);
}
private HeightV2Message(int id, int height, byte[] signature, long timestamp, byte[] minterPublicKey) {
@ -60,19 +77,4 @@ public class HeightV2Message extends Message {
return new HeightV2Message(id, height, signature, timestamp, minterPublicKey);
}
@Override
protected byte[] toData() throws IOException {
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
bytes.write(Ints.toByteArray(this.height));
bytes.write(this.signature);
bytes.write(Longs.toByteArray(this.timestamp));
bytes.write(this.minterPublicKey);
return bytes.toByteArray();
}
}

View File

@ -11,9 +11,28 @@ import com.google.common.primitives.Longs;
public class HelloMessage extends Message {
private final long timestamp;
private final String versionString;
private final String senderPeerAddress;
private long timestamp;
private String versionString;
private String senderPeerAddress;
public HelloMessage(long timestamp, String versionString, String senderPeerAddress) {
super(MessageType.HELLO);
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
try {
bytes.write(Longs.toByteArray(timestamp));
Serialization.serializeSizedString(bytes, versionString);
Serialization.serializeSizedString(bytes, senderPeerAddress);
} catch (IOException e) {
throw new AssertionError("IOException shouldn't occur with ByteArrayOutputStream");
}
this.dataBytes = bytes.toByteArray();
this.checksumBytes = Message.generateChecksum(this.dataBytes);
}
private HelloMessage(int id, long timestamp, String versionString, String senderPeerAddress) {
super(id, MessageType.HELLO);
@ -23,10 +42,6 @@ public class HelloMessage extends Message {
this.senderPeerAddress = senderPeerAddress;
}
public HelloMessage(long timestamp, String versionString, String senderPeerAddress) {
this(-1, timestamp, versionString, senderPeerAddress);
}
public long getTimestamp() {
return this.timestamp;
}
@ -58,17 +73,4 @@ public class HelloMessage extends Message {
return new HelloMessage(id, timestamp, versionString, senderPeerAddress);
}
@Override
protected byte[] toData() throws IOException {
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
bytes.write(Longs.toByteArray(this.timestamp));
Serialization.serializeSizedString(bytes, this.versionString);
Serialization.serializeSizedString(bytes, this.senderPeerAddress);
return bytes.toByteArray();
}
}

View File

@ -2,35 +2,66 @@ package org.qortal.network.message;
import org.qortal.crypto.Crypto;
import org.qortal.network.Network;
import org.qortal.transform.TransformationException;
import com.google.common.primitives.Ints;
import static java.util.Arrays.stream;
import static java.util.stream.Collectors.toMap;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.util.Arrays;
/**
* Network message for sending over network, or unpacked data received from network.
* <p></p>
* <p>
* For messages received from network, subclass's {@code fromByteBuffer()} method is used
* to construct a subclassed instance. Original bytes from network are not retained.
* Access to deserialized data should be via subclass's getters. Ideally there should be NO setters!
* </p>
* <p></p>
* <p>
* Each subclass's <b>public</b> constructor is for building a message to send <b>only</b>.
* The constructor will serialize into byte form but <b>not</b> store the passed args.
* Serialized bytes are saved into superclass (Message) {@code dataBytes} and, if not empty,
* a checksum is created and saved into {@code checksumBytes}.
* Therefore: <i>do not use subclass's getters after using constructor!</i>
* </p>
* <p></p>
* <p>
* For subclasses where outgoing versions might be usefully cached, they can implement Clonable
* as long if they are safe to use {@link Object#clone()}.
* </p>
*/
public abstract class Message {
// MAGIC(4) + TYPE(4) + HAS-ID(1) + ID?(4) + DATA-SIZE(4) + CHECKSUM?(4) + DATA?(*)
private static final int MAGIC_LENGTH = 4;
private static final int TYPE_LENGTH = 4;
private static final int HAS_ID_LENGTH = 1;
private static final int ID_LENGTH = 4;
private static final int DATA_SIZE_LENGTH = 4;
private static final int CHECKSUM_LENGTH = 4;
private static final int MAX_DATA_SIZE = 10 * 1024 * 1024; // 10MB
private int id;
private MessageType type;
protected static final byte[] EMPTY_DATA_BYTES = new byte[0];
protected int id;
protected final MessageType type;
/** Serialized outgoing message data. Expected to be written to by subclass. */
protected byte[] dataBytes;
/** Serialized outgoing message checksum. Expected to be written to by subclass. */
protected byte[] checksumBytes;
/** Typically called by subclass when constructing message from received network data. */
protected Message(int id, MessageType type) {
this.id = id;
this.type = type;
}
/** Typically called by subclass when constructing outgoing message. */
protected Message(MessageType type) {
this(-1, type);
}
@ -54,7 +85,7 @@ public abstract class Message {
/**
* Attempt to read a message from byte buffer.
*
* @param readOnlyBuffer
* @param readOnlyBuffer ByteBuffer containing bytes read from network
* @return null if no complete message can be read
* @throws MessageException if message could not be decoded or is invalid
*/
@ -131,9 +162,27 @@ public abstract class Message {
return Arrays.copyOfRange(Crypto.digest(dataBuffer), 0, CHECKSUM_LENGTH);
}
public void checkValidOutgoing() throws MessageException {
// We expect subclass to have initialized these
if (this.dataBytes == null)
throw new MessageException("Missing data payload");
if (this.dataBytes.length > 0 && this.checksumBytes == null)
throw new MessageException("Missing data checksum");
}
public byte[] toBytes() throws MessageException {
checkValidOutgoing();
// We can calculate exact length
int messageLength = MAGIC_LENGTH + TYPE_LENGTH + HAS_ID_LENGTH;
messageLength += this.hasId() ? ID_LENGTH : 0;
messageLength += DATA_SIZE_LENGTH + this.dataBytes.length > 0 ? CHECKSUM_LENGTH + this.dataBytes.length : 0;
if (messageLength > MAX_DATA_SIZE)
throw new MessageException(String.format("About to send message with length %d larger than allowed %d", messageLength, MAX_DATA_SIZE));
try {
ByteArrayOutputStream bytes = new ByteArrayOutputStream(256);
ByteArrayOutputStream bytes = new ByteArrayOutputStream(messageLength);
// Magic
bytes.write(Network.getInstance().getMessageMagic());
@ -148,32 +197,30 @@ public abstract class Message {
bytes.write(0);
}
byte[] data = this.toData();
if (data == null)
throw new MessageException("Missing data payload");
bytes.write(Ints.toByteArray(this.dataBytes.length));
bytes.write(Ints.toByteArray(data.length));
if (data.length > 0) {
bytes.write(generateChecksum(data));
bytes.write(data);
if (this.dataBytes.length > 0) {
bytes.write(this.checksumBytes);
bytes.write(this.dataBytes);
}
if (bytes.size() > MAX_DATA_SIZE)
throw new MessageException(String.format("About to send message with length %d larger than allowed %d", bytes.size(), MAX_DATA_SIZE));
return bytes.toByteArray();
} catch (IOException | TransformationException e) {
} catch (IOException e) {
throw new MessageException("Failed to serialize message", e);
}
}
/** Serialize message into bytes.
*
* @return message as byte array, or null if message is missing payload data / uninitialized somehow
* @throws IOException if unable / failed to serialize
* @throws TransformationException if unable / failed to serialize
*/
protected abstract byte[] toData() throws IOException, TransformationException;
public static <M extends Message> M cloneWithNewId(M message, int newId) {
M clone;
try {
clone = (M) message.clone();
} catch (CloneNotSupportedException e) {
throw new UnsupportedOperationException("Message sub-class not cloneable");
}
clone.setId(newId);
return clone;
}
}

View File

@ -16,10 +16,29 @@ import com.google.common.primitives.Longs;
public class OnlineAccountsMessage extends Message {
private static final int MAX_ACCOUNT_COUNT = 5000;
private final List<OnlineAccountData> onlineAccounts;
private List<OnlineAccountData> onlineAccounts;
public OnlineAccountsMessage(List<OnlineAccountData> onlineAccounts) {
this(-1, onlineAccounts);
super(MessageType.ONLINE_ACCOUNTS);
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
try {
bytes.write(Ints.toByteArray(onlineAccounts.size()));
for (OnlineAccountData onlineAccountData : onlineAccounts) {
bytes.write(Longs.toByteArray(onlineAccountData.getTimestamp()));
bytes.write(onlineAccountData.getSignature());
bytes.write(onlineAccountData.getPublicKey());
}
} catch (IOException e) {
throw new AssertionError("IOException shouldn't occur with ByteArrayOutputStream");
}
this.dataBytes = bytes.toByteArray();
this.checksumBytes = Message.generateChecksum(this.dataBytes);
}
private OnlineAccountsMessage(int id, List<OnlineAccountData> onlineAccounts) {
@ -53,21 +72,4 @@ public class OnlineAccountsMessage extends Message {
return new OnlineAccountsMessage(id, onlineAccounts);
}
@Override
protected byte[] toData() throws IOException {
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
bytes.write(Ints.toByteArray(this.onlineAccounts.size()));
for (OnlineAccountData onlineAccountData : this.onlineAccounts) {
bytes.write(Longs.toByteArray(onlineAccountData.getTimestamp()));
bytes.write(onlineAccountData.getSignature());
bytes.write(onlineAccountData.getPublicKey());
}
return bytes.toByteArray();
}
}

View File

@ -23,11 +23,52 @@ import java.util.Map;
* Also V2 only builds online accounts message once!
*/
public class OnlineAccountsV2Message extends Message {
private final List<OnlineAccountData> onlineAccounts;
private byte[] cachedData;
private List<OnlineAccountData> onlineAccounts;
public OnlineAccountsV2Message(List<OnlineAccountData> onlineAccounts) {
this(-1, onlineAccounts);
super(MessageType.ONLINE_ACCOUNTS_V2);
// Shortcut in case we have no online accounts
if (onlineAccounts.isEmpty()) {
this.dataBytes = Ints.toByteArray(0);
this.checksumBytes = Message.generateChecksum(this.dataBytes);
return;
}
// How many of each timestamp
Map<Long, Integer> countByTimestamp = new HashMap<>();
for (OnlineAccountData onlineAccountData : onlineAccounts) {
Long timestamp = onlineAccountData.getTimestamp();
countByTimestamp.compute(timestamp, (k, v) -> v == null ? 1 : ++v);
}
// We should know exactly how many bytes to allocate now
int byteSize = countByTimestamp.size() * (Transformer.INT_LENGTH + Transformer.TIMESTAMP_LENGTH)
+ onlineAccounts.size() * (Transformer.SIGNATURE_LENGTH + Transformer.PUBLIC_KEY_LENGTH);
ByteArrayOutputStream bytes = new ByteArrayOutputStream(byteSize);
try {
for (long timestamp : countByTimestamp.keySet()) {
bytes.write(Ints.toByteArray(countByTimestamp.get(timestamp)));
bytes.write(Longs.toByteArray(timestamp));
for (OnlineAccountData onlineAccountData : onlineAccounts) {
if (onlineAccountData.getTimestamp() == timestamp) {
bytes.write(onlineAccountData.getSignature());
bytes.write(onlineAccountData.getPublicKey());
}
}
}
} catch (IOException e) {
throw new AssertionError("IOException shouldn't occur with ByteArrayOutputStream");
}
this.dataBytes = bytes.toByteArray();
this.checksumBytes = Message.generateChecksum(this.dataBytes);
}
private OnlineAccountsV2Message(int id, List<OnlineAccountData> onlineAccounts) {
@ -69,46 +110,4 @@ public class OnlineAccountsV2Message extends Message {
return new OnlineAccountsV2Message(id, onlineAccounts);
}
@Override
protected synchronized byte[] toData() throws IOException {
if (this.cachedData != null)
return this.cachedData;
// Shortcut in case we have no online accounts
if (this.onlineAccounts.isEmpty()) {
this.cachedData = Ints.toByteArray(0);
return this.cachedData;
}
// How many of each timestamp
Map<Long, Integer> countByTimestamp = new HashMap<>();
for (OnlineAccountData onlineAccountData : this.onlineAccounts) {
Long timestamp = onlineAccountData.getTimestamp();
countByTimestamp.compute(timestamp, (k, v) -> v == null ? 1 : ++v);
}
// We should know exactly how many bytes to allocate now
int byteSize = countByTimestamp.size() * (Transformer.INT_LENGTH + Transformer.TIMESTAMP_LENGTH)
+ this.onlineAccounts.size() * (Transformer.SIGNATURE_LENGTH + Transformer.PUBLIC_KEY_LENGTH);
ByteArrayOutputStream bytes = new ByteArrayOutputStream(byteSize);
for (long timestamp : countByTimestamp.keySet()) {
bytes.write(Ints.toByteArray(countByTimestamp.get(timestamp)));
bytes.write(Longs.toByteArray(timestamp));
for (OnlineAccountData onlineAccountData : this.onlineAccounts) {
if (onlineAccountData.getTimestamp() == timestamp) {
bytes.write(onlineAccountData.getSignature());
bytes.write(onlineAccountData.getPublicKey());
}
}
}
this.cachedData = bytes.toByteArray();
return this.cachedData;
}
}

View File

@ -15,10 +15,38 @@ import com.google.common.primitives.Ints;
// NOTE: this message supports hostnames, literal IP addresses (IPv4 and IPv6) with port numbers
public class PeersV2Message extends Message {
private final List<PeerAddress> peerAddresses;
private List<PeerAddress> peerAddresses;
public PeersV2Message(List<PeerAddress> peerAddresses) {
this(-1, peerAddresses);
super(MessageType.PEERS_V2);
List<byte[]> addresses = new ArrayList<>();
// First entry represents sending node but contains only port number with empty address.
addresses.add(("0.0.0.0:" + Settings.getInstance().getListenPort()).getBytes(StandardCharsets.UTF_8));
for (PeerAddress peerAddress : peerAddresses)
addresses.add(peerAddress.toString().getBytes(StandardCharsets.UTF_8));
// We can't send addresses that are longer than 255 bytes as length itself is encoded in one byte.
addresses.removeIf(addressString -> addressString.length > 255);
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
try {
// Number of entries
bytes.write(Ints.toByteArray(addresses.size()));
for (byte[] address : addresses) {
bytes.write(address.length);
bytes.write(address);
}
} catch (IOException e) {
throw new AssertionError("IOException shouldn't occur with ByteArrayOutputStream");
}
this.dataBytes = bytes.toByteArray();
this.checksumBytes = Message.generateChecksum(this.dataBytes);
}
private PeersV2Message(int id, List<PeerAddress> peerAddresses) {
@ -55,32 +83,4 @@ public class PeersV2Message extends Message {
return new PeersV2Message(id, peerAddresses);
}
@Override
protected byte[] toData() throws IOException {
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
List<byte[]> addresses = new ArrayList<>();
// First entry represents sending node but contains only port number with empty address.
addresses.add(("0.0.0.0:" + Settings.getInstance().getListenPort()).getBytes(StandardCharsets.UTF_8));
for (PeerAddress peerAddress : this.peerAddresses)
addresses.add(peerAddress.toString().getBytes(StandardCharsets.UTF_8));
// We can't send addresses that are longer than 255 bytes as length itself is encoded in one byte.
addresses.removeIf(addressString -> addressString.length > 255);
// Serialize
// Number of entries
bytes.write(Ints.toByteArray(addresses.size()));
for (byte[] address : addresses) {
bytes.write(address.length);
bytes.write(address);
}
return bytes.toByteArray();
}
}

View File

@ -5,7 +5,9 @@ import java.nio.ByteBuffer;
public class PingMessage extends Message {
public PingMessage() {
this(-1);
super(MessageType.PING);
this.dataBytes = EMPTY_DATA_BYTES;
}
private PingMessage(int id) {
@ -16,9 +18,4 @@ public class PingMessage extends Message {
return new PingMessage(id);
}
@Override
protected byte[] toData() {
return new byte[0];
}
}

View File

@ -5,7 +5,9 @@ import java.nio.ByteBuffer;
public class PongMessage extends Message {
public PongMessage() {
this(-1);
super(MessageType.PONG);
this.dataBytes = EMPTY_DATA_BYTES;
}
private PongMessage(int id) {
@ -16,9 +18,4 @@ public class PongMessage extends Message {
return new PongMessage(id);
}
@Override
protected byte[] toData() {
return new byte[0];
}
}

View File

@ -10,8 +10,25 @@ public class ResponseMessage extends Message {
public static final int DATA_LENGTH = 32;
private final int nonce;
private final byte[] data;
private int nonce;
private byte[] data;
public ResponseMessage(int nonce, byte[] data) {
super(MessageType.RESPONSE);
ByteArrayOutputStream bytes = new ByteArrayOutputStream(4 + DATA_LENGTH);
try {
bytes.write(Ints.toByteArray(nonce));
bytes.write(data);
} catch (IOException e) {
throw new AssertionError("IOException shouldn't occur with ByteArrayOutputStream");
}
this.dataBytes = bytes.toByteArray();
this.checksumBytes = Message.generateChecksum(this.dataBytes);
}
private ResponseMessage(int id, int nonce, byte[] data) {
super(id, MessageType.RESPONSE);
@ -20,10 +37,6 @@ public class ResponseMessage extends Message {
this.data = data;
}
public ResponseMessage(int nonce, byte[] data) {
this(-1, nonce, data);
}
public int getNonce() {
return this.nonce;
}
@ -41,15 +54,4 @@ public class ResponseMessage extends Message {
return new ResponseMessage(id, nonce, data);
}
@Override
protected byte[] toData() throws IOException {
ByteArrayOutputStream bytes = new ByteArrayOutputStream(4 + DATA_LENGTH);
bytes.write(Ints.toByteArray(this.nonce));
bytes.write(data);
return bytes.toByteArray();
}
}

View File

@ -13,10 +13,24 @@ import com.google.common.primitives.Ints;
public class SignaturesMessage extends Message {
private final List<byte[]> signatures;
private List<byte[]> signatures;
public SignaturesMessage(List<byte[]> signatures) {
this(-1, signatures);
super(MessageType.SIGNATURES);
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
try {
bytes.write(Ints.toByteArray(signatures.size()));
for (byte[] signature : signatures)
bytes.write(signature);
} catch (IOException e) {
throw new AssertionError("IOException shouldn't occur with ByteArrayOutputStream");
}
this.dataBytes = bytes.toByteArray();
this.checksumBytes = Message.generateChecksum(this.dataBytes);
}
private SignaturesMessage(int id, List<byte[]> signatures) {
@ -45,16 +59,4 @@ public class SignaturesMessage extends Message {
return new SignaturesMessage(id, signatures);
}
@Override
protected byte[] toData() throws IOException {
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
bytes.write(Ints.toByteArray(this.signatures.size()));
for (byte[] signature : this.signatures)
bytes.write(signature);
return bytes.toByteArray();
}
}

View File

@ -20,11 +20,55 @@ import java.util.Map;
* Groups of: number of entries, timestamp, then pubkey + sig + AT address for each entry.
*/
public class TradePresencesMessage extends Message {
private final List<TradePresenceData> tradePresences;
private byte[] cachedData;
private List<TradePresenceData> tradePresences;
public TradePresencesMessage(List<TradePresenceData> tradePresences) {
this(-1, tradePresences);
super(MessageType.TRADE_PRESENCES);
// Shortcut in case we have no trade presences
if (tradePresences.isEmpty()) {
this.dataBytes = Ints.toByteArray(0);
this.checksumBytes = Message.generateChecksum(this.dataBytes);
return;
}
// How many of each timestamp
Map<Long, Integer> countByTimestamp = new HashMap<>();
for (TradePresenceData tradePresenceData : tradePresences) {
Long timestamp = tradePresenceData.getTimestamp();
countByTimestamp.compute(timestamp, (k, v) -> v == null ? 1 : ++v);
}
// We should know exactly how many bytes to allocate now
int byteSize = countByTimestamp.size() * (Transformer.INT_LENGTH + Transformer.TIMESTAMP_LENGTH)
+ tradePresences.size() * (Transformer.PUBLIC_KEY_LENGTH + Transformer.SIGNATURE_LENGTH + Transformer.ADDRESS_LENGTH);
ByteArrayOutputStream bytes = new ByteArrayOutputStream(byteSize);
try {
for (long timestamp : countByTimestamp.keySet()) {
bytes.write(Ints.toByteArray(countByTimestamp.get(timestamp)));
bytes.write(Longs.toByteArray(timestamp));
for (TradePresenceData tradePresenceData : tradePresences) {
if (tradePresenceData.getTimestamp() == timestamp) {
bytes.write(tradePresenceData.getPublicKey());
bytes.write(tradePresenceData.getSignature());
bytes.write(Base58.decode(tradePresenceData.getAtAddress()));
}
}
}
} catch (IOException e) {
throw new AssertionError("IOException shouldn't occur with ByteArrayOutputStream");
}
this.dataBytes = bytes.toByteArray();
this.checksumBytes = Message.generateChecksum(this.dataBytes);
}
private TradePresencesMessage(int id, List<TradePresenceData> tradePresences) {
@ -70,49 +114,4 @@ public class TradePresencesMessage extends Message {
return new TradePresencesMessage(id, tradePresences);
}
@Override
protected synchronized byte[] toData() throws IOException {
if (this.cachedData != null)
return this.cachedData;
// Shortcut in case we have no trade presences
if (this.tradePresences.isEmpty()) {
this.cachedData = Ints.toByteArray(0);
return this.cachedData;
}
// How many of each timestamp
Map<Long, Integer> countByTimestamp = new HashMap<>();
for (TradePresenceData tradePresenceData : this.tradePresences) {
Long timestamp = tradePresenceData.getTimestamp();
countByTimestamp.compute(timestamp, (k, v) -> v == null ? 1 : ++v);
}
// We should know exactly how many bytes to allocate now
int byteSize = countByTimestamp.size() * (Transformer.INT_LENGTH + Transformer.TIMESTAMP_LENGTH)
+ this.tradePresences.size() * (Transformer.PUBLIC_KEY_LENGTH + Transformer.SIGNATURE_LENGTH + Transformer.ADDRESS_LENGTH);
ByteArrayOutputStream bytes = new ByteArrayOutputStream(byteSize);
for (long timestamp : countByTimestamp.keySet()) {
bytes.write(Ints.toByteArray(countByTimestamp.get(timestamp)));
bytes.write(Longs.toByteArray(timestamp));
for (TradePresenceData tradePresenceData : this.tradePresences) {
if (tradePresenceData.getTimestamp() == timestamp) {
bytes.write(tradePresenceData.getPublicKey());
bytes.write(tradePresenceData.getSignature());
bytes.write(Base58.decode(tradePresenceData.getAtAddress()));
}
}
}
this.cachedData = bytes.toByteArray();
return this.cachedData;
}
}

View File

@ -10,8 +10,11 @@ public class TransactionMessage extends Message {
private TransactionData transactionData;
public TransactionMessage(TransactionData transactionData) {
this(-1, transactionData);
public TransactionMessage(TransactionData transactionData) throws TransformationException {
super(MessageType.TRANSACTION);
this.dataBytes = TransactionTransformer.toBytes(transactionData);
this.checksumBytes = Message.generateChecksum(this.dataBytes);
}
private TransactionMessage(int id, TransactionData transactionData) {
@ -36,12 +39,4 @@ public class TransactionMessage extends Message {
return new TransactionMessage(id, transactionData);
}
@Override
protected byte[] toData() throws TransformationException {
if (this.transactionData == null)
return null;
return TransactionTransformer.toBytes(this.transactionData);
}
}

View File

@ -13,10 +13,24 @@ import com.google.common.primitives.Ints;
public class TransactionSignaturesMessage extends Message {
private final List<byte[]> signatures;
private List<byte[]> signatures;
public TransactionSignaturesMessage(List<byte[]> signatures) {
this(-1, signatures);
super(MessageType.TRANSACTION_SIGNATURES);
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
try {
bytes.write(Ints.toByteArray(signatures.size()));
for (byte[] signature : signatures)
bytes.write(signature);
} catch (IOException e) {
throw new AssertionError("IOException shouldn't occur with ByteArrayOutputStream");
}
this.dataBytes = bytes.toByteArray();
this.checksumBytes = Message.generateChecksum(this.dataBytes);
}
private TransactionSignaturesMessage(int id, List<byte[]> signatures) {
@ -45,16 +59,4 @@ public class TransactionSignaturesMessage extends Message {
return new TransactionSignaturesMessage(id, signatures);
}
@Override
protected byte[] toData() throws IOException {
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
bytes.write(Ints.toByteArray(this.signatures.size()));
for (byte[] signature : this.signatures)
bytes.write(signature);
return bytes.toByteArray();
}
}