From 01d810fc008ce30b151e54ffed8ba66081f322e6 Mon Sep 17 00:00:00 2001 From: catbref Date: Sat, 19 Feb 2022 16:05:10 +0000 Subject: [PATCH] Initial effort at migrating PRESENCE transactions to dedicated network messages --- .../org/qortal/controller/Controller.java | 10 + .../qortal/controller/tradebot/TradeBot.java | 225 ++++++++++++++---- .../qortal/data/network/OnlineTradeData.java | 86 +++++++ .../message/GetOnlineTradesMessage.java | 110 +++++++++ .../org/qortal/network/message/Message.java | 5 +- .../network/message/OnlineTradesMessage.java | 123 ++++++++++ 6 files changed, 516 insertions(+), 43 deletions(-) create mode 100644 src/main/java/org/qortal/data/network/OnlineTradeData.java create mode 100644 src/main/java/org/qortal/network/message/GetOnlineTradesMessage.java create mode 100644 src/main/java/org/qortal/network/message/OnlineTradesMessage.java diff --git a/src/main/java/org/qortal/controller/Controller.java b/src/main/java/org/qortal/controller/Controller.java index 542a2889..5c956956 100644 --- a/src/main/java/org/qortal/controller/Controller.java +++ b/src/main/java/org/qortal/controller/Controller.java @@ -1222,6 +1222,8 @@ public class Controller extends Thread { } public void onPeerHandshakeCompleted(Peer peer) { + // XXX: we could turn this into an EventBus event so that listeners like TradeBot get a look-in + // Only send if outbound if (peer.isOutbound()) { // Request peer's unconfirmed transactions @@ -1312,6 +1314,14 @@ public class Controller extends Thread { ArbitraryDataManager.getInstance().onNetworkArbitrarySignaturesMessage(peer, message); break; + case GET_ONLINE_TRADES: + TradeBot.getInstance().onGetOnlineTradesMessage(peer, message); + break; + + case ONLINE_TRADES: + TradeBot.getInstance().onOnlineTradesMessage(peer, message); + break; + default: LOGGER.debug(() -> String.format("Unhandled %s message [ID %d] from peer %s", message.getType().name(), message.getId(), peer)); break; diff --git a/src/main/java/org/qortal/controller/tradebot/TradeBot.java b/src/main/java/org/qortal/controller/tradebot/TradeBot.java index 6996acbb..904b87b5 100644 --- a/src/main/java/org/qortal/controller/tradebot/TradeBot.java +++ b/src/main/java/org/qortal/controller/tradebot/TradeBot.java @@ -2,16 +2,11 @@ package org.qortal.controller.tradebot; import java.awt.TrayIcon.MessageType; import java.security.SecureRandom; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Random; -import java.util.concurrent.locks.ReentrantLock; +import java.util.*; +import java.util.function.Supplier; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.util.Supplier; import org.bitcoinj.core.ECKey; import org.qortal.account.PrivateKeyAccount; import org.qortal.api.model.crosschain.TradeBotCreateRequest; @@ -19,25 +14,26 @@ import org.qortal.controller.Controller; import org.qortal.controller.Synchronizer; import org.qortal.controller.tradebot.AcctTradeBot.ResponseResult; import org.qortal.crosschain.*; +import org.qortal.crypto.Crypto; import org.qortal.data.at.ATData; import org.qortal.data.crosschain.CrossChainTradeData; import org.qortal.data.crosschain.TradeBotData; -import org.qortal.data.transaction.BaseTransactionData; -import org.qortal.data.transaction.PresenceTransactionData; +import org.qortal.data.network.OnlineTradeData; import org.qortal.event.Event; import org.qortal.event.EventBus; import org.qortal.event.Listener; -import org.qortal.group.Group; import org.qortal.gui.SysTray; +import org.qortal.network.Network; +import org.qortal.network.Peer; +import org.qortal.network.message.GetOnlineTradesMessage; +import org.qortal.network.message.Message; +import org.qortal.network.message.OnlineTradesMessage; import org.qortal.repository.DataException; import org.qortal.repository.Repository; import org.qortal.repository.RepositoryManager; import org.qortal.repository.hsqldb.HSQLDBImportExport; import org.qortal.settings.Settings; -import org.qortal.transaction.PresenceTransaction; -import org.qortal.transaction.PresenceTransaction.PresenceType; -import org.qortal.transaction.Transaction.ValidationResult; -import org.qortal.transform.transaction.TransactionTransformer; +import org.qortal.utils.ByteArray; import org.qortal.utils.NTP; import com.google.common.primitives.Longs; @@ -57,6 +53,10 @@ public class TradeBot implements Listener { private static final Logger LOGGER = LogManager.getLogger(TradeBot.class); private static final Random RANDOM = new SecureRandom(); + private static final long ONLINE_LIFETIME = 30 * 60 * 1000L; // 30 minutes in ms + + private static final long ONLINE_BROADCAST_INTERVAL = 5 * 60 * 1000L; // 5 minutes in ms + public interface StateNameAndValueSupplier { public String getState(); public int getStateValue(); @@ -87,7 +87,12 @@ public class TradeBot implements Listener { private static TradeBot instance; - private final Map presenceTimestampsByAtAddress = Collections.synchronizedMap(new HashMap<>()); + private final Map ourTimestampsByPubkey = Collections.synchronizedMap(new HashMap<>()); + private final List pendingOnlineSignatures = Collections.synchronizedList(new ArrayList<>()); + + private final Map allOnlineByPubkey = Collections.synchronizedMap(new HashMap<>()); + private Map safeAllOnlineByPubkey = Collections.emptyMap(); + private long nextBroadcastTimestamp = 0L; private TradeBot() { EventBus.INSTANCE.addListener(event -> TradeBot.getInstance().listen(event)); @@ -218,6 +223,8 @@ public class TradeBot implements Listener { return; synchronized (this) { + expireOldOnlineSignatures(); + List allTradeBotData; try (final Repository repository = RepositoryManager.getRepository()) { @@ -248,6 +255,8 @@ public class TradeBot implements Listener { } catch (ForeignBlockchainException e) { LOGGER.warn(() -> String.format("Foreign blockchain issue processing trade-bot entry for AT %s: %s", tradeBotData.getAtAddress(), e.getMessage())); } + + broadcastOnlineSignatures(); } } @@ -325,6 +334,15 @@ public class TradeBot implements Listener { } // PRESENCE-related + + private void expireOldOnlineSignatures() { + long now = NTP.getTime(); + + synchronized (this.pendingOnlineSignatures) { + this.pendingOnlineSignatures.removeIf(onlineTradeData -> onlineTradeData.getTimestamp() <= now); + } + } + /*package*/ void updatePresence(Repository repository, TradeBotData tradeBotData, CrossChainTradeData tradeData) throws DataException { String atAddress = tradeBotData.getAtAddress(); @@ -333,44 +351,167 @@ public class TradeBot implements Listener { String signerAddress = tradeNativeAccount.getAddress(); /* - * There's no point in Alice trying to build a PRESENCE transaction - * for an AT that isn't locked to her, as other peers won't be able - * to validate the PRESENCE transaction as signing public key won't - * be visible. + * We only broadcast trade entry online signatures for BOB when OFFERING + * so that buyers don't click on offline / expired entries that would waste their time. */ - if (!signerAddress.equals(tradeData.qortalCreatorTradeAddress) && !signerAddress.equals(tradeData.qortalPartnerAddress)) - // Signer is neither Bob, nor Alice, or trade not yet locked to Alice + if (tradeData.mode != AcctMode.OFFERING || !signerAddress.equals(tradeData.qortalCreatorTradeAddress)) return; long now = NTP.getTime(); - long threshold = now - PresenceType.TRADE_BOT.getLifetime(); - long timestamp = presenceTimestampsByAtAddress.compute(atAddress, (k, v) -> (v == null || v < threshold) ? now : v); + // Timestamps are considered good for full lifetime... + long expiry = (now + ONLINE_LIFETIME) % ONLINE_LIFETIME; + // ... but refresh if older than half-lifetime + long threshold = (now + ONLINE_LIFETIME / 2) % (ONLINE_LIFETIME / 2); + + ByteArray pubkeyByteArray = ByteArray.of(tradeNativeAccount.getPublicKey()); + // If map's timestamp is missing, or too old, use the new timestamp - otherwise use existing timestamp. + long timestamp = ourTimestampsByPubkey.compute(pubkeyByteArray, (k, v) -> (v == null || v <= threshold) ? expiry : v); // If timestamp hasn't been updated then nothing to do - if (timestamp != now) + if (timestamp != expiry) return; - int txGroupId = Group.NO_GROUP; - byte[] reference = new byte[TransactionTransformer.SIGNATURE_LENGTH]; - byte[] creatorPublicKey = tradeNativeAccount.getPublicKey(); - long fee = 0L; + // Create signature + byte[] signature = tradeNativeAccount.sign(Longs.toByteArray(timestamp)); - BaseTransactionData baseTransactionData = new BaseTransactionData(timestamp, txGroupId, reference, creatorPublicKey, fee, null); + // Add new online info to queue to be broadcast around network + OnlineTradeData onlineTradeData = new OnlineTradeData(timestamp, tradeNativeAccount.getPublicKey(), signature, atAddress); + this.pendingOnlineSignatures.add(onlineTradeData); - int nonce = 0; - byte[] timestampSignature = tradeNativeAccount.sign(Longs.toByteArray(timestamp)); - - PresenceTransactionData transactionData = new PresenceTransactionData(baseTransactionData, nonce, PresenceType.TRADE_BOT, timestampSignature); - - PresenceTransaction presenceTransaction = new PresenceTransaction(repository, transactionData); - presenceTransaction.computeNonce(); - - presenceTransaction.sign(tradeNativeAccount); - - ValidationResult result = presenceTransaction.importAsUnconfirmed(); - if (result != ValidationResult.OK) - LOGGER.debug(() -> String.format("Unable to build trade-bot PRESENCE transaction for %s: %s", tradeBotData.getAtAddress(), result.name())); + this.allOnlineByPubkey.put(pubkeyByteArray, onlineTradeData); + rebuildSafeAllOnline(); } + private void rebuildSafeAllOnline() { + synchronized (this.allOnlineByPubkey) { + // Collect into a *new* unmodifiable map. + this.safeAllOnlineByPubkey = Map.copyOf(this.allOnlineByPubkey); + } + } + + private void broadcastOnlineSignatures() { + // If we have new online signatures that are pending broadcast, send those as a priority + if (!this.pendingOnlineSignatures.isEmpty()) { + // Create a copy for Network to safely use in another thread + List safeOnlineSignatures; + synchronized (this.pendingOnlineSignatures) { + safeOnlineSignatures = List.copyOf(this.pendingOnlineSignatures); + this.pendingOnlineSignatures.clear(); + } + + OnlineTradesMessage onlineTradesMessage = new OnlineTradesMessage(safeOnlineSignatures); + Network.getInstance().broadcast(peer -> onlineTradesMessage); + + return; + } + + // As we have no new online signatures, check whether it's time to do a general broadcast + Long now = NTP.getTime(); + if (now == null || now < nextBroadcastTimestamp) + return; + + nextBroadcastTimestamp = now + ONLINE_BROADCAST_INTERVAL; + + List safeOnlineSignatures = List.copyOf(this.safeAllOnlineByPubkey.values()); + + GetOnlineTradesMessage getOnlineTradesMessage = new GetOnlineTradesMessage(safeOnlineSignatures); + Network.getInstance().broadcast(peer -> getOnlineTradesMessage); + } + + // Network message processing + + public void onGetOnlineTradesMessage(Peer peer, Message message) { + GetOnlineTradesMessage getOnlineTradesMessage = (GetOnlineTradesMessage) message; + + List peersOnlineTrades = getOnlineTradesMessage.getOnlineTrades(); + + Map entriesUnknownToPeer = new HashMap<>(this.safeAllOnlineByPubkey); + for (OnlineTradeData peersOnlineTrade : peersOnlineTrades) { + ByteArray pubkeyByteArray = ByteArray.of(peersOnlineTrade.getPublicKey()); + + OnlineTradeData ourEntry = entriesUnknownToPeer.get(pubkeyByteArray); + + if (ourEntry != null && ourEntry.getTimestamp() == peersOnlineTrade.getTimestamp()) + entriesUnknownToPeer.remove(pubkeyByteArray); + } + + // Send complement to peer + List safeOnlineSignatures = List.copyOf(entriesUnknownToPeer.values()); + Message responseMessage = new OnlineTradesMessage(safeOnlineSignatures); + if (!peer.sendMessage(responseMessage)) { + peer.disconnect("failed to send online trades response"); + return; + } + } + + public void onOnlineTradesMessage(Peer peer, Message message) { + OnlineTradesMessage onlineTradesMessage = (OnlineTradesMessage) message; + + List peersOnlineTrades = onlineTradesMessage.getOnlineTrades(); + + long now = NTP.getTime(); + // Timestamps after this are too far into the future + long futureThreshold = (now % ONLINE_LIFETIME) + ONLINE_LIFETIME + ONLINE_LIFETIME / 2; + // Timestamps before this are too far into the past + long pastThreshold = now; + + Map> acctSuppliersByCodeHash = SupportedBlockchain.getAcctMap(); + + int newCount = 0; + + try (final Repository repository = RepositoryManager.getRepository()) { + for (OnlineTradeData peersOnlineTrade : peersOnlineTrades) { + long timestamp = peersOnlineTrade.getTimestamp(); + + if (timestamp < pastThreshold || timestamp > futureThreshold) + continue; + + ByteArray pubkeyByteArray = ByteArray.of(peersOnlineTrade.getPublicKey()); + + // Ignore if we've previously verified this timestamp+publickey combo + OnlineTradeData existingTradeData = this.safeAllOnlineByPubkey.get(pubkeyByteArray); + if (existingTradeData != null && existingTradeData.getTimestamp() == timestamp) + continue; + + // Check timestamp signature + byte[] timestampSignature = peersOnlineTrade.getSignature(); + byte[] timestampBytes = Longs.toByteArray(timestamp); + byte[] publicKey = peersOnlineTrade.getPublicKey(); + if (!Crypto.verify(publicKey, timestampSignature, timestampBytes)) + continue; + + ATData atData = repository.getATRepository().fromATAddress(peersOnlineTrade.getAtAddress()); + if (atData == null || atData.getIsFrozen() || atData.getIsFinished()) + continue; + + ByteArray atCodeHash = new ByteArray(atData.getCodeHash()); + Supplier acctSupplier = acctSuppliersByCodeHash.get(atCodeHash); + if (acctSupplier == null) + continue; + + CrossChainTradeData tradeData = acctSupplier.get().populateTradeData(repository, atData); + if (tradeData == null) + continue; + + // Convert signer's public key to address form + String signerAddress = Crypto.toAddress(publicKey); + + // Signer's public key (in address form) must match Bob's / Alice's trade public key (in address form) + if (!signerAddress.equals(tradeData.qortalCreatorTradeAddress) && !signerAddress.equals(tradeData.qortalPartnerAddress)) + continue; + + // This is new to us + this.allOnlineByPubkey.put(pubkeyByteArray, peersOnlineTrade); + ++newCount; + } + } catch (DataException e) { + LOGGER.error("Couldn't process ONLINE_TRADES message due to repository issue", e); + } + + if (newCount > 0) { + LOGGER.debug("New online trade signatures: {}", newCount); + rebuildSafeAllOnline(); + } + } } diff --git a/src/main/java/org/qortal/data/network/OnlineTradeData.java b/src/main/java/org/qortal/data/network/OnlineTradeData.java new file mode 100644 index 00000000..d370c3a3 --- /dev/null +++ b/src/main/java/org/qortal/data/network/OnlineTradeData.java @@ -0,0 +1,86 @@ +package org.qortal.data.network; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import java.util.Arrays; + +// All properties to be converted to JSON via JAXB +@XmlAccessorType(XmlAccessType.FIELD) +public class OnlineTradeData { + + protected long timestamp; + protected byte[] publicKey; // Could be BOB's or ALICE's + protected byte[] signature; // Not always present + protected String atAddress; // Not always present + + // Constructors + + // necessary for JAXB serialization + protected OnlineTradeData() { + } + + public OnlineTradeData(long timestamp, byte[] publicKey, byte[] signature, String address) { + this.timestamp = timestamp; + this.publicKey = publicKey; + this.signature = signature; + this.atAddress = address; + } + + public OnlineTradeData(long timestamp, byte[] publicKey) { + this(timestamp, publicKey, null, null); + } + + public long getTimestamp() { + return this.timestamp; + } + + public byte[] getPublicKey() { + return this.publicKey; + } + + public byte[] getSignature() { + return this.signature; + } + + public String getAtAddress() { + return this.atAddress; + } + + // Comparison + + @Override + public boolean equals(Object other) { + if (other == this) + return true; + + if (!(other instanceof OnlineTradeData)) + return false; + + OnlineTradeData otherOnlineTradeData = (OnlineTradeData) other; + + // Very quick comparison + if (otherOnlineTradeData.timestamp != this.timestamp) + return false; + + if (!Arrays.equals(otherOnlineTradeData.publicKey, this.publicKey)) + return false; + + if (otherOnlineTradeData.atAddress != null && !otherOnlineTradeData.atAddress.equals(this.atAddress)) + return false; + + if (this.atAddress != null && !this.atAddress.equals(otherOnlineTradeData.atAddress)) + return false; + + if (!Arrays.equals(otherOnlineTradeData.signature, this.signature)) + return false; + + return true; + } + + @Override + public int hashCode() { + // Pretty lazy implementation + return (int) this.timestamp; + } + +} diff --git a/src/main/java/org/qortal/network/message/GetOnlineTradesMessage.java b/src/main/java/org/qortal/network/message/GetOnlineTradesMessage.java new file mode 100644 index 00000000..588f6a0b --- /dev/null +++ b/src/main/java/org/qortal/network/message/GetOnlineTradesMessage.java @@ -0,0 +1,110 @@ +package org.qortal.network.message; + +import com.google.common.primitives.Ints; +import com.google.common.primitives.Longs; +import org.qortal.data.network.OnlineTradeData; +import org.qortal.transform.Transformer; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * For requesting which trades are online from remote peer, given our list of online trades. + * + * Groups of: number of entries, timestamp, then AT trade pubkey for each entry. + */ +public class GetOnlineTradesMessage extends Message { + private List onlineTrades; + private byte[] cachedData; + + public GetOnlineTradesMessage(List onlineTrades) { + this(-1, onlineTrades); + } + + private GetOnlineTradesMessage(int id, List onlineTrades) { + super(id, MessageType.GET_ONLINE_TRADES); + + this.onlineTrades = onlineTrades; + } + + public List getOnlineTrades() { + return this.onlineTrades; + } + + public static Message fromByteBuffer(int id, ByteBuffer bytes) throws UnsupportedEncodingException { + int tradeCount = bytes.getInt(); + + List onlineTrades = new ArrayList<>(tradeCount); + + while (tradeCount > 0) { + long timestamp = bytes.getLong(); + + for (int i = 0; i < tradeCount; ++i) { + byte[] publicKey = new byte[Transformer.PUBLIC_KEY_LENGTH]; + bytes.get(publicKey); + + onlineTrades.add(new OnlineTradeData(timestamp, publicKey)); + } + + if (bytes.hasRemaining()) { + tradeCount = bytes.getInt(); + } else { + // we've finished + tradeCount = 0; + } + } + + return new GetOnlineTradesMessage(id, onlineTrades); + } + + @Override + protected synchronized byte[] toData() { + if (this.cachedData != null) + return this.cachedData; + + // Shortcut in case we have no online accounts + if (this.onlineTrades.isEmpty()) { + this.cachedData = Ints.toByteArray(0); + return this.cachedData; + } + + // How many of each timestamp + Map countByTimestamp = new HashMap<>(); + + for (OnlineTradeData onlineTradeData : this.onlineTrades) { + Long timestamp = onlineTradeData.getTimestamp(); + countByTimestamp.compute(timestamp, (k, v) -> v == null ? 1 : ++v); + } + + // We should know exactly how many bytes to allocate now + int byteSize = countByTimestamp.size() * (Transformer.INT_LENGTH + Transformer.TIMESTAMP_LENGTH) + + this.onlineTrades.size() * Transformer.PUBLIC_KEY_LENGTH; + + try { + ByteArrayOutputStream bytes = new ByteArrayOutputStream(byteSize); + + for (long timestamp : countByTimestamp.keySet()) { + bytes.write(Ints.toByteArray(countByTimestamp.get(timestamp))); + + bytes.write(Longs.toByteArray(timestamp)); + + for (OnlineTradeData onlineTradeData : this.onlineTrades) { + if (onlineTradeData.getTimestamp() == timestamp) + bytes.write(onlineTradeData.getPublicKey()); + } + } + + this.cachedData = bytes.toByteArray(); + return this.cachedData; + } catch (IOException e) { + return null; + } + } + +} diff --git a/src/main/java/org/qortal/network/message/Message.java b/src/main/java/org/qortal/network/message/Message.java index 6c89a0dd..1ce45aca 100644 --- a/src/main/java/org/qortal/network/message/Message.java +++ b/src/main/java/org/qortal/network/message/Message.java @@ -93,7 +93,10 @@ public abstract class Message { ARBITRARY_DATA_FILE_LIST(120), GET_ARBITRARY_DATA_FILE_LIST(121), - ARBITRARY_SIGNATURES(130); + ARBITRARY_SIGNATURES(130), + + GET_ONLINE_TRADES(140), + ONLINE_TRADES(141); public final int value; public final Method fromByteBufferMethod; diff --git a/src/main/java/org/qortal/network/message/OnlineTradesMessage.java b/src/main/java/org/qortal/network/message/OnlineTradesMessage.java new file mode 100644 index 00000000..4feef670 --- /dev/null +++ b/src/main/java/org/qortal/network/message/OnlineTradesMessage.java @@ -0,0 +1,123 @@ +package org.qortal.network.message; + +import com.google.common.primitives.Ints; +import com.google.common.primitives.Longs; +import org.qortal.data.network.OnlineTradeData; +import org.qortal.transform.Transformer; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * For sending list of which trades are online to remote peer. + * + * Groups of: number of entries, timestamp, then pubkey + sig + AT address for each entry. + */ +public class OnlineTradesMessage extends Message { + private List onlineTrades; + private byte[] cachedData; + + public OnlineTradesMessage(List onlineTrades) { + this(-1, onlineTrades); + } + + private OnlineTradesMessage(int id, List onlineTrades) { + super(id, MessageType.ONLINE_TRADES); + + this.onlineTrades = onlineTrades; + } + + public List getOnlineTrades() { + return this.onlineTrades; + } + + public static Message fromByteBuffer(int id, ByteBuffer bytes) throws UnsupportedEncodingException { + int tradeCount = bytes.getInt(); + + List onlineTrades = new ArrayList<>(tradeCount); + + while (tradeCount > 0) { + long timestamp = bytes.getLong(); + + for (int i = 0; i < tradeCount; ++i) { + byte[] publicKey = new byte[Transformer.PUBLIC_KEY_LENGTH]; + bytes.get(publicKey); + + byte[] signature = new byte[Transformer.SIGNATURE_LENGTH]; + bytes.get(signature); + + byte[] addressBytes = new byte[Transformer.ADDRESS_LENGTH]; + bytes.get(addressBytes); + String address = new String(addressBytes, StandardCharsets.UTF_8); + + onlineTrades.add(new OnlineTradeData(timestamp, publicKey, signature, address)); + } + + if (bytes.hasRemaining()) { + tradeCount = bytes.getInt(); + } else { + // we've finished + tradeCount = 0; + } + } + + return new OnlineTradesMessage(id, onlineTrades); + } + + @Override + protected synchronized byte[] toData() { + if (this.cachedData != null) + return this.cachedData; + + // Shortcut in case we have no online trade entries + if (this.onlineTrades.isEmpty()) { + this.cachedData = Ints.toByteArray(0); + return this.cachedData; + } + + // How many of each timestamp + Map countByTimestamp = new HashMap<>(); + + for (OnlineTradeData onlineTradeData : this.onlineTrades) { + Long timestamp = onlineTradeData.getTimestamp(); + countByTimestamp.compute(timestamp, (k, v) -> v == null ? 1 : ++v); + } + + // We should know exactly how many bytes to allocate now + int byteSize = countByTimestamp.size() * (Transformer.INT_LENGTH + Transformer.TIMESTAMP_LENGTH) + + this.onlineTrades.size() * (Transformer.PUBLIC_KEY_LENGTH + Transformer.SIGNATURE_LENGTH + Transformer.ADDRESS_LENGTH); + + try { + ByteArrayOutputStream bytes = new ByteArrayOutputStream(byteSize); + + for (long timestamp : countByTimestamp.keySet()) { + bytes.write(Ints.toByteArray(countByTimestamp.get(timestamp))); + + bytes.write(Longs.toByteArray(timestamp)); + + for (OnlineTradeData onlineTradeData : this.onlineTrades) { + if (onlineTradeData.getTimestamp() == timestamp) { + bytes.write(onlineTradeData.getPublicKey()); + + bytes.write(onlineTradeData.getSignature()); + + bytes.write(onlineTradeData.getAtAddress().getBytes(StandardCharsets.UTF_8)); + } + } + } + + this.cachedData = bytes.toByteArray(); + return this.cachedData; + } catch (IOException e) { + return null; + } + } + +}