From 01d810fc008ce30b151e54ffed8ba66081f322e6 Mon Sep 17 00:00:00 2001 From: catbref Date: Sat, 19 Feb 2022 16:05:10 +0000 Subject: [PATCH 1/9] Initial effort at migrating PRESENCE transactions to dedicated network messages --- .../org/qortal/controller/Controller.java | 10 + .../qortal/controller/tradebot/TradeBot.java | 225 ++++++++++++++---- .../qortal/data/network/OnlineTradeData.java | 86 +++++++ .../message/GetOnlineTradesMessage.java | 110 +++++++++ .../org/qortal/network/message/Message.java | 5 +- .../network/message/OnlineTradesMessage.java | 123 ++++++++++ 6 files changed, 516 insertions(+), 43 deletions(-) create mode 100644 src/main/java/org/qortal/data/network/OnlineTradeData.java create mode 100644 src/main/java/org/qortal/network/message/GetOnlineTradesMessage.java create mode 100644 src/main/java/org/qortal/network/message/OnlineTradesMessage.java diff --git a/src/main/java/org/qortal/controller/Controller.java b/src/main/java/org/qortal/controller/Controller.java index 542a2889..5c956956 100644 --- a/src/main/java/org/qortal/controller/Controller.java +++ b/src/main/java/org/qortal/controller/Controller.java @@ -1222,6 +1222,8 @@ public class Controller extends Thread { } public void onPeerHandshakeCompleted(Peer peer) { + // XXX: we could turn this into an EventBus event so that listeners like TradeBot get a look-in + // Only send if outbound if (peer.isOutbound()) { // Request peer's unconfirmed transactions @@ -1312,6 +1314,14 @@ public class Controller extends Thread { ArbitraryDataManager.getInstance().onNetworkArbitrarySignaturesMessage(peer, message); break; + case GET_ONLINE_TRADES: + TradeBot.getInstance().onGetOnlineTradesMessage(peer, message); + break; + + case ONLINE_TRADES: + TradeBot.getInstance().onOnlineTradesMessage(peer, message); + break; + default: LOGGER.debug(() -> String.format("Unhandled %s message [ID %d] from peer %s", message.getType().name(), message.getId(), peer)); break; diff --git a/src/main/java/org/qortal/controller/tradebot/TradeBot.java b/src/main/java/org/qortal/controller/tradebot/TradeBot.java index 6996acbb..904b87b5 100644 --- a/src/main/java/org/qortal/controller/tradebot/TradeBot.java +++ b/src/main/java/org/qortal/controller/tradebot/TradeBot.java @@ -2,16 +2,11 @@ package org.qortal.controller.tradebot; import java.awt.TrayIcon.MessageType; import java.security.SecureRandom; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Random; -import java.util.concurrent.locks.ReentrantLock; +import java.util.*; +import java.util.function.Supplier; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.util.Supplier; import org.bitcoinj.core.ECKey; import org.qortal.account.PrivateKeyAccount; import org.qortal.api.model.crosschain.TradeBotCreateRequest; @@ -19,25 +14,26 @@ import org.qortal.controller.Controller; import org.qortal.controller.Synchronizer; import org.qortal.controller.tradebot.AcctTradeBot.ResponseResult; import org.qortal.crosschain.*; +import org.qortal.crypto.Crypto; import org.qortal.data.at.ATData; import org.qortal.data.crosschain.CrossChainTradeData; import org.qortal.data.crosschain.TradeBotData; -import org.qortal.data.transaction.BaseTransactionData; -import org.qortal.data.transaction.PresenceTransactionData; +import org.qortal.data.network.OnlineTradeData; import org.qortal.event.Event; import org.qortal.event.EventBus; import org.qortal.event.Listener; -import org.qortal.group.Group; import org.qortal.gui.SysTray; +import org.qortal.network.Network; +import org.qortal.network.Peer; +import org.qortal.network.message.GetOnlineTradesMessage; +import org.qortal.network.message.Message; +import org.qortal.network.message.OnlineTradesMessage; import org.qortal.repository.DataException; import org.qortal.repository.Repository; import org.qortal.repository.RepositoryManager; import org.qortal.repository.hsqldb.HSQLDBImportExport; import org.qortal.settings.Settings; -import org.qortal.transaction.PresenceTransaction; -import org.qortal.transaction.PresenceTransaction.PresenceType; -import org.qortal.transaction.Transaction.ValidationResult; -import org.qortal.transform.transaction.TransactionTransformer; +import org.qortal.utils.ByteArray; import org.qortal.utils.NTP; import com.google.common.primitives.Longs; @@ -57,6 +53,10 @@ public class TradeBot implements Listener { private static final Logger LOGGER = LogManager.getLogger(TradeBot.class); private static final Random RANDOM = new SecureRandom(); + private static final long ONLINE_LIFETIME = 30 * 60 * 1000L; // 30 minutes in ms + + private static final long ONLINE_BROADCAST_INTERVAL = 5 * 60 * 1000L; // 5 minutes in ms + public interface StateNameAndValueSupplier { public String getState(); public int getStateValue(); @@ -87,7 +87,12 @@ public class TradeBot implements Listener { private static TradeBot instance; - private final Map presenceTimestampsByAtAddress = Collections.synchronizedMap(new HashMap<>()); + private final Map ourTimestampsByPubkey = Collections.synchronizedMap(new HashMap<>()); + private final List pendingOnlineSignatures = Collections.synchronizedList(new ArrayList<>()); + + private final Map allOnlineByPubkey = Collections.synchronizedMap(new HashMap<>()); + private Map safeAllOnlineByPubkey = Collections.emptyMap(); + private long nextBroadcastTimestamp = 0L; private TradeBot() { EventBus.INSTANCE.addListener(event -> TradeBot.getInstance().listen(event)); @@ -218,6 +223,8 @@ public class TradeBot implements Listener { return; synchronized (this) { + expireOldOnlineSignatures(); + List allTradeBotData; try (final Repository repository = RepositoryManager.getRepository()) { @@ -248,6 +255,8 @@ public class TradeBot implements Listener { } catch (ForeignBlockchainException e) { LOGGER.warn(() -> String.format("Foreign blockchain issue processing trade-bot entry for AT %s: %s", tradeBotData.getAtAddress(), e.getMessage())); } + + broadcastOnlineSignatures(); } } @@ -325,6 +334,15 @@ public class TradeBot implements Listener { } // PRESENCE-related + + private void expireOldOnlineSignatures() { + long now = NTP.getTime(); + + synchronized (this.pendingOnlineSignatures) { + this.pendingOnlineSignatures.removeIf(onlineTradeData -> onlineTradeData.getTimestamp() <= now); + } + } + /*package*/ void updatePresence(Repository repository, TradeBotData tradeBotData, CrossChainTradeData tradeData) throws DataException { String atAddress = tradeBotData.getAtAddress(); @@ -333,44 +351,167 @@ public class TradeBot implements Listener { String signerAddress = tradeNativeAccount.getAddress(); /* - * There's no point in Alice trying to build a PRESENCE transaction - * for an AT that isn't locked to her, as other peers won't be able - * to validate the PRESENCE transaction as signing public key won't - * be visible. + * We only broadcast trade entry online signatures for BOB when OFFERING + * so that buyers don't click on offline / expired entries that would waste their time. */ - if (!signerAddress.equals(tradeData.qortalCreatorTradeAddress) && !signerAddress.equals(tradeData.qortalPartnerAddress)) - // Signer is neither Bob, nor Alice, or trade not yet locked to Alice + if (tradeData.mode != AcctMode.OFFERING || !signerAddress.equals(tradeData.qortalCreatorTradeAddress)) return; long now = NTP.getTime(); - long threshold = now - PresenceType.TRADE_BOT.getLifetime(); - long timestamp = presenceTimestampsByAtAddress.compute(atAddress, (k, v) -> (v == null || v < threshold) ? now : v); + // Timestamps are considered good for full lifetime... + long expiry = (now + ONLINE_LIFETIME) % ONLINE_LIFETIME; + // ... but refresh if older than half-lifetime + long threshold = (now + ONLINE_LIFETIME / 2) % (ONLINE_LIFETIME / 2); + + ByteArray pubkeyByteArray = ByteArray.of(tradeNativeAccount.getPublicKey()); + // If map's timestamp is missing, or too old, use the new timestamp - otherwise use existing timestamp. + long timestamp = ourTimestampsByPubkey.compute(pubkeyByteArray, (k, v) -> (v == null || v <= threshold) ? expiry : v); // If timestamp hasn't been updated then nothing to do - if (timestamp != now) + if (timestamp != expiry) return; - int txGroupId = Group.NO_GROUP; - byte[] reference = new byte[TransactionTransformer.SIGNATURE_LENGTH]; - byte[] creatorPublicKey = tradeNativeAccount.getPublicKey(); - long fee = 0L; + // Create signature + byte[] signature = tradeNativeAccount.sign(Longs.toByteArray(timestamp)); - BaseTransactionData baseTransactionData = new BaseTransactionData(timestamp, txGroupId, reference, creatorPublicKey, fee, null); + // Add new online info to queue to be broadcast around network + OnlineTradeData onlineTradeData = new OnlineTradeData(timestamp, tradeNativeAccount.getPublicKey(), signature, atAddress); + this.pendingOnlineSignatures.add(onlineTradeData); - int nonce = 0; - byte[] timestampSignature = tradeNativeAccount.sign(Longs.toByteArray(timestamp)); - - PresenceTransactionData transactionData = new PresenceTransactionData(baseTransactionData, nonce, PresenceType.TRADE_BOT, timestampSignature); - - PresenceTransaction presenceTransaction = new PresenceTransaction(repository, transactionData); - presenceTransaction.computeNonce(); - - presenceTransaction.sign(tradeNativeAccount); - - ValidationResult result = presenceTransaction.importAsUnconfirmed(); - if (result != ValidationResult.OK) - LOGGER.debug(() -> String.format("Unable to build trade-bot PRESENCE transaction for %s: %s", tradeBotData.getAtAddress(), result.name())); + this.allOnlineByPubkey.put(pubkeyByteArray, onlineTradeData); + rebuildSafeAllOnline(); } + private void rebuildSafeAllOnline() { + synchronized (this.allOnlineByPubkey) { + // Collect into a *new* unmodifiable map. + this.safeAllOnlineByPubkey = Map.copyOf(this.allOnlineByPubkey); + } + } + + private void broadcastOnlineSignatures() { + // If we have new online signatures that are pending broadcast, send those as a priority + if (!this.pendingOnlineSignatures.isEmpty()) { + // Create a copy for Network to safely use in another thread + List safeOnlineSignatures; + synchronized (this.pendingOnlineSignatures) { + safeOnlineSignatures = List.copyOf(this.pendingOnlineSignatures); + this.pendingOnlineSignatures.clear(); + } + + OnlineTradesMessage onlineTradesMessage = new OnlineTradesMessage(safeOnlineSignatures); + Network.getInstance().broadcast(peer -> onlineTradesMessage); + + return; + } + + // As we have no new online signatures, check whether it's time to do a general broadcast + Long now = NTP.getTime(); + if (now == null || now < nextBroadcastTimestamp) + return; + + nextBroadcastTimestamp = now + ONLINE_BROADCAST_INTERVAL; + + List safeOnlineSignatures = List.copyOf(this.safeAllOnlineByPubkey.values()); + + GetOnlineTradesMessage getOnlineTradesMessage = new GetOnlineTradesMessage(safeOnlineSignatures); + Network.getInstance().broadcast(peer -> getOnlineTradesMessage); + } + + // Network message processing + + public void onGetOnlineTradesMessage(Peer peer, Message message) { + GetOnlineTradesMessage getOnlineTradesMessage = (GetOnlineTradesMessage) message; + + List peersOnlineTrades = getOnlineTradesMessage.getOnlineTrades(); + + Map entriesUnknownToPeer = new HashMap<>(this.safeAllOnlineByPubkey); + for (OnlineTradeData peersOnlineTrade : peersOnlineTrades) { + ByteArray pubkeyByteArray = ByteArray.of(peersOnlineTrade.getPublicKey()); + + OnlineTradeData ourEntry = entriesUnknownToPeer.get(pubkeyByteArray); + + if (ourEntry != null && ourEntry.getTimestamp() == peersOnlineTrade.getTimestamp()) + entriesUnknownToPeer.remove(pubkeyByteArray); + } + + // Send complement to peer + List safeOnlineSignatures = List.copyOf(entriesUnknownToPeer.values()); + Message responseMessage = new OnlineTradesMessage(safeOnlineSignatures); + if (!peer.sendMessage(responseMessage)) { + peer.disconnect("failed to send online trades response"); + return; + } + } + + public void onOnlineTradesMessage(Peer peer, Message message) { + OnlineTradesMessage onlineTradesMessage = (OnlineTradesMessage) message; + + List peersOnlineTrades = onlineTradesMessage.getOnlineTrades(); + + long now = NTP.getTime(); + // Timestamps after this are too far into the future + long futureThreshold = (now % ONLINE_LIFETIME) + ONLINE_LIFETIME + ONLINE_LIFETIME / 2; + // Timestamps before this are too far into the past + long pastThreshold = now; + + Map> acctSuppliersByCodeHash = SupportedBlockchain.getAcctMap(); + + int newCount = 0; + + try (final Repository repository = RepositoryManager.getRepository()) { + for (OnlineTradeData peersOnlineTrade : peersOnlineTrades) { + long timestamp = peersOnlineTrade.getTimestamp(); + + if (timestamp < pastThreshold || timestamp > futureThreshold) + continue; + + ByteArray pubkeyByteArray = ByteArray.of(peersOnlineTrade.getPublicKey()); + + // Ignore if we've previously verified this timestamp+publickey combo + OnlineTradeData existingTradeData = this.safeAllOnlineByPubkey.get(pubkeyByteArray); + if (existingTradeData != null && existingTradeData.getTimestamp() == timestamp) + continue; + + // Check timestamp signature + byte[] timestampSignature = peersOnlineTrade.getSignature(); + byte[] timestampBytes = Longs.toByteArray(timestamp); + byte[] publicKey = peersOnlineTrade.getPublicKey(); + if (!Crypto.verify(publicKey, timestampSignature, timestampBytes)) + continue; + + ATData atData = repository.getATRepository().fromATAddress(peersOnlineTrade.getAtAddress()); + if (atData == null || atData.getIsFrozen() || atData.getIsFinished()) + continue; + + ByteArray atCodeHash = new ByteArray(atData.getCodeHash()); + Supplier acctSupplier = acctSuppliersByCodeHash.get(atCodeHash); + if (acctSupplier == null) + continue; + + CrossChainTradeData tradeData = acctSupplier.get().populateTradeData(repository, atData); + if (tradeData == null) + continue; + + // Convert signer's public key to address form + String signerAddress = Crypto.toAddress(publicKey); + + // Signer's public key (in address form) must match Bob's / Alice's trade public key (in address form) + if (!signerAddress.equals(tradeData.qortalCreatorTradeAddress) && !signerAddress.equals(tradeData.qortalPartnerAddress)) + continue; + + // This is new to us + this.allOnlineByPubkey.put(pubkeyByteArray, peersOnlineTrade); + ++newCount; + } + } catch (DataException e) { + LOGGER.error("Couldn't process ONLINE_TRADES message due to repository issue", e); + } + + if (newCount > 0) { + LOGGER.debug("New online trade signatures: {}", newCount); + rebuildSafeAllOnline(); + } + } } diff --git a/src/main/java/org/qortal/data/network/OnlineTradeData.java b/src/main/java/org/qortal/data/network/OnlineTradeData.java new file mode 100644 index 00000000..d370c3a3 --- /dev/null +++ b/src/main/java/org/qortal/data/network/OnlineTradeData.java @@ -0,0 +1,86 @@ +package org.qortal.data.network; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import java.util.Arrays; + +// All properties to be converted to JSON via JAXB +@XmlAccessorType(XmlAccessType.FIELD) +public class OnlineTradeData { + + protected long timestamp; + protected byte[] publicKey; // Could be BOB's or ALICE's + protected byte[] signature; // Not always present + protected String atAddress; // Not always present + + // Constructors + + // necessary for JAXB serialization + protected OnlineTradeData() { + } + + public OnlineTradeData(long timestamp, byte[] publicKey, byte[] signature, String address) { + this.timestamp = timestamp; + this.publicKey = publicKey; + this.signature = signature; + this.atAddress = address; + } + + public OnlineTradeData(long timestamp, byte[] publicKey) { + this(timestamp, publicKey, null, null); + } + + public long getTimestamp() { + return this.timestamp; + } + + public byte[] getPublicKey() { + return this.publicKey; + } + + public byte[] getSignature() { + return this.signature; + } + + public String getAtAddress() { + return this.atAddress; + } + + // Comparison + + @Override + public boolean equals(Object other) { + if (other == this) + return true; + + if (!(other instanceof OnlineTradeData)) + return false; + + OnlineTradeData otherOnlineTradeData = (OnlineTradeData) other; + + // Very quick comparison + if (otherOnlineTradeData.timestamp != this.timestamp) + return false; + + if (!Arrays.equals(otherOnlineTradeData.publicKey, this.publicKey)) + return false; + + if (otherOnlineTradeData.atAddress != null && !otherOnlineTradeData.atAddress.equals(this.atAddress)) + return false; + + if (this.atAddress != null && !this.atAddress.equals(otherOnlineTradeData.atAddress)) + return false; + + if (!Arrays.equals(otherOnlineTradeData.signature, this.signature)) + return false; + + return true; + } + + @Override + public int hashCode() { + // Pretty lazy implementation + return (int) this.timestamp; + } + +} diff --git a/src/main/java/org/qortal/network/message/GetOnlineTradesMessage.java b/src/main/java/org/qortal/network/message/GetOnlineTradesMessage.java new file mode 100644 index 00000000..588f6a0b --- /dev/null +++ b/src/main/java/org/qortal/network/message/GetOnlineTradesMessage.java @@ -0,0 +1,110 @@ +package org.qortal.network.message; + +import com.google.common.primitives.Ints; +import com.google.common.primitives.Longs; +import org.qortal.data.network.OnlineTradeData; +import org.qortal.transform.Transformer; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * For requesting which trades are online from remote peer, given our list of online trades. + * + * Groups of: number of entries, timestamp, then AT trade pubkey for each entry. + */ +public class GetOnlineTradesMessage extends Message { + private List onlineTrades; + private byte[] cachedData; + + public GetOnlineTradesMessage(List onlineTrades) { + this(-1, onlineTrades); + } + + private GetOnlineTradesMessage(int id, List onlineTrades) { + super(id, MessageType.GET_ONLINE_TRADES); + + this.onlineTrades = onlineTrades; + } + + public List getOnlineTrades() { + return this.onlineTrades; + } + + public static Message fromByteBuffer(int id, ByteBuffer bytes) throws UnsupportedEncodingException { + int tradeCount = bytes.getInt(); + + List onlineTrades = new ArrayList<>(tradeCount); + + while (tradeCount > 0) { + long timestamp = bytes.getLong(); + + for (int i = 0; i < tradeCount; ++i) { + byte[] publicKey = new byte[Transformer.PUBLIC_KEY_LENGTH]; + bytes.get(publicKey); + + onlineTrades.add(new OnlineTradeData(timestamp, publicKey)); + } + + if (bytes.hasRemaining()) { + tradeCount = bytes.getInt(); + } else { + // we've finished + tradeCount = 0; + } + } + + return new GetOnlineTradesMessage(id, onlineTrades); + } + + @Override + protected synchronized byte[] toData() { + if (this.cachedData != null) + return this.cachedData; + + // Shortcut in case we have no online accounts + if (this.onlineTrades.isEmpty()) { + this.cachedData = Ints.toByteArray(0); + return this.cachedData; + } + + // How many of each timestamp + Map countByTimestamp = new HashMap<>(); + + for (OnlineTradeData onlineTradeData : this.onlineTrades) { + Long timestamp = onlineTradeData.getTimestamp(); + countByTimestamp.compute(timestamp, (k, v) -> v == null ? 1 : ++v); + } + + // We should know exactly how many bytes to allocate now + int byteSize = countByTimestamp.size() * (Transformer.INT_LENGTH + Transformer.TIMESTAMP_LENGTH) + + this.onlineTrades.size() * Transformer.PUBLIC_KEY_LENGTH; + + try { + ByteArrayOutputStream bytes = new ByteArrayOutputStream(byteSize); + + for (long timestamp : countByTimestamp.keySet()) { + bytes.write(Ints.toByteArray(countByTimestamp.get(timestamp))); + + bytes.write(Longs.toByteArray(timestamp)); + + for (OnlineTradeData onlineTradeData : this.onlineTrades) { + if (onlineTradeData.getTimestamp() == timestamp) + bytes.write(onlineTradeData.getPublicKey()); + } + } + + this.cachedData = bytes.toByteArray(); + return this.cachedData; + } catch (IOException e) { + return null; + } + } + +} diff --git a/src/main/java/org/qortal/network/message/Message.java b/src/main/java/org/qortal/network/message/Message.java index 6c89a0dd..1ce45aca 100644 --- a/src/main/java/org/qortal/network/message/Message.java +++ b/src/main/java/org/qortal/network/message/Message.java @@ -93,7 +93,10 @@ public abstract class Message { ARBITRARY_DATA_FILE_LIST(120), GET_ARBITRARY_DATA_FILE_LIST(121), - ARBITRARY_SIGNATURES(130); + ARBITRARY_SIGNATURES(130), + + GET_ONLINE_TRADES(140), + ONLINE_TRADES(141); public final int value; public final Method fromByteBufferMethod; diff --git a/src/main/java/org/qortal/network/message/OnlineTradesMessage.java b/src/main/java/org/qortal/network/message/OnlineTradesMessage.java new file mode 100644 index 00000000..4feef670 --- /dev/null +++ b/src/main/java/org/qortal/network/message/OnlineTradesMessage.java @@ -0,0 +1,123 @@ +package org.qortal.network.message; + +import com.google.common.primitives.Ints; +import com.google.common.primitives.Longs; +import org.qortal.data.network.OnlineTradeData; +import org.qortal.transform.Transformer; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * For sending list of which trades are online to remote peer. + * + * Groups of: number of entries, timestamp, then pubkey + sig + AT address for each entry. + */ +public class OnlineTradesMessage extends Message { + private List onlineTrades; + private byte[] cachedData; + + public OnlineTradesMessage(List onlineTrades) { + this(-1, onlineTrades); + } + + private OnlineTradesMessage(int id, List onlineTrades) { + super(id, MessageType.ONLINE_TRADES); + + this.onlineTrades = onlineTrades; + } + + public List getOnlineTrades() { + return this.onlineTrades; + } + + public static Message fromByteBuffer(int id, ByteBuffer bytes) throws UnsupportedEncodingException { + int tradeCount = bytes.getInt(); + + List onlineTrades = new ArrayList<>(tradeCount); + + while (tradeCount > 0) { + long timestamp = bytes.getLong(); + + for (int i = 0; i < tradeCount; ++i) { + byte[] publicKey = new byte[Transformer.PUBLIC_KEY_LENGTH]; + bytes.get(publicKey); + + byte[] signature = new byte[Transformer.SIGNATURE_LENGTH]; + bytes.get(signature); + + byte[] addressBytes = new byte[Transformer.ADDRESS_LENGTH]; + bytes.get(addressBytes); + String address = new String(addressBytes, StandardCharsets.UTF_8); + + onlineTrades.add(new OnlineTradeData(timestamp, publicKey, signature, address)); + } + + if (bytes.hasRemaining()) { + tradeCount = bytes.getInt(); + } else { + // we've finished + tradeCount = 0; + } + } + + return new OnlineTradesMessage(id, onlineTrades); + } + + @Override + protected synchronized byte[] toData() { + if (this.cachedData != null) + return this.cachedData; + + // Shortcut in case we have no online trade entries + if (this.onlineTrades.isEmpty()) { + this.cachedData = Ints.toByteArray(0); + return this.cachedData; + } + + // How many of each timestamp + Map countByTimestamp = new HashMap<>(); + + for (OnlineTradeData onlineTradeData : this.onlineTrades) { + Long timestamp = onlineTradeData.getTimestamp(); + countByTimestamp.compute(timestamp, (k, v) -> v == null ? 1 : ++v); + } + + // We should know exactly how many bytes to allocate now + int byteSize = countByTimestamp.size() * (Transformer.INT_LENGTH + Transformer.TIMESTAMP_LENGTH) + + this.onlineTrades.size() * (Transformer.PUBLIC_KEY_LENGTH + Transformer.SIGNATURE_LENGTH + Transformer.ADDRESS_LENGTH); + + try { + ByteArrayOutputStream bytes = new ByteArrayOutputStream(byteSize); + + for (long timestamp : countByTimestamp.keySet()) { + bytes.write(Ints.toByteArray(countByTimestamp.get(timestamp))); + + bytes.write(Longs.toByteArray(timestamp)); + + for (OnlineTradeData onlineTradeData : this.onlineTrades) { + if (onlineTradeData.getTimestamp() == timestamp) { + bytes.write(onlineTradeData.getPublicKey()); + + bytes.write(onlineTradeData.getSignature()); + + bytes.write(onlineTradeData.getAtAddress().getBytes(StandardCharsets.UTF_8)); + } + } + } + + this.cachedData = bytes.toByteArray(); + return this.cachedData; + } catch (IOException e) { + return null; + } + } + +} From cb57af3c532c51a2e631527b392a4e2f9cccc2a3 Mon Sep 17 00:00:00 2001 From: catbref Date: Sun, 20 Feb 2022 17:27:11 +0000 Subject: [PATCH 2/9] Bugfixes to online trade sigs + bridging from PRESENCE transactions --- .../qortal/controller/tradebot/TradeBot.java | 137 +++++++++++++++--- .../qortal/data/network/OnlineTradeData.java | 4 +- .../org/qortal/network/message/Message.java | 5 +- .../network/message/OnlineTradesMessage.java | 11 +- .../transaction/PresenceTransaction.java | 9 +- 5 files changed, 134 insertions(+), 32 deletions(-) diff --git a/src/main/java/org/qortal/controller/tradebot/TradeBot.java b/src/main/java/org/qortal/controller/tradebot/TradeBot.java index 904b87b5..88c720b2 100644 --- a/src/main/java/org/qortal/controller/tradebot/TradeBot.java +++ b/src/main/java/org/qortal/controller/tradebot/TradeBot.java @@ -338,9 +338,15 @@ public class TradeBot implements Listener { private void expireOldOnlineSignatures() { long now = NTP.getTime(); - synchronized (this.pendingOnlineSignatures) { - this.pendingOnlineSignatures.removeIf(onlineTradeData -> onlineTradeData.getTimestamp() <= now); + int removedCount = 0; + synchronized (this.allOnlineByPubkey) { + int preRemoveCount = this.allOnlineByPubkey.size(); + this.allOnlineByPubkey.values().removeIf(onlineTradeData -> onlineTradeData.getTimestamp() <= now); + removedCount = this.allOnlineByPubkey.size() - preRemoveCount; } + + if (removedCount > 0) + LOGGER.trace("Removed {} old online trade signatures", removedCount); } /*package*/ void updatePresence(Repository repository, TradeBotData tradeBotData, CrossChainTradeData tradeData) @@ -359,28 +365,33 @@ public class TradeBot implements Listener { long now = NTP.getTime(); - // Timestamps are considered good for full lifetime... - long expiry = (now + ONLINE_LIFETIME) % ONLINE_LIFETIME; - // ... but refresh if older than half-lifetime - long threshold = (now + ONLINE_LIFETIME / 2) % (ONLINE_LIFETIME / 2); + // Timestamps are considered good for full lifetime, but we'll refresh if older than half-lifetime + long threshold = (now / (ONLINE_LIFETIME / 2)) * (ONLINE_LIFETIME / 2); + long newExpiry = threshold + ONLINE_LIFETIME / 2; ByteArray pubkeyByteArray = ByteArray.of(tradeNativeAccount.getPublicKey()); // If map's timestamp is missing, or too old, use the new timestamp - otherwise use existing timestamp. - long timestamp = ourTimestampsByPubkey.compute(pubkeyByteArray, (k, v) -> (v == null || v <= threshold) ? expiry : v); + synchronized (this.ourTimestampsByPubkey) { + Long currentTimestamp = this.ourTimestampsByPubkey.get(pubkeyByteArray); - // If timestamp hasn't been updated then nothing to do - if (timestamp != expiry) - return; + if (currentTimestamp != null && currentTimestamp > threshold) + // timestamp still good + return; + + this.ourTimestampsByPubkey.put(pubkeyByteArray, newExpiry); + } // Create signature - byte[] signature = tradeNativeAccount.sign(Longs.toByteArray(timestamp)); + byte[] signature = tradeNativeAccount.sign(Longs.toByteArray(newExpiry)); // Add new online info to queue to be broadcast around network - OnlineTradeData onlineTradeData = new OnlineTradeData(timestamp, tradeNativeAccount.getPublicKey(), signature, atAddress); + OnlineTradeData onlineTradeData = new OnlineTradeData(newExpiry, tradeNativeAccount.getPublicKey(), signature, atAddress); this.pendingOnlineSignatures.add(onlineTradeData); this.allOnlineByPubkey.put(pubkeyByteArray, onlineTradeData); rebuildSafeAllOnline(); + + LOGGER.trace("New signed timestamp {} for our online trade {}", newExpiry, atAddress); } private void rebuildSafeAllOnline() { @@ -400,6 +411,8 @@ public class TradeBot implements Listener { this.pendingOnlineSignatures.clear(); } + LOGGER.trace("Broadcasting {} new online trades", safeOnlineSignatures.size()); + OnlineTradesMessage onlineTradesMessage = new OnlineTradesMessage(safeOnlineSignatures); Network.getInstance().broadcast(peer -> onlineTradesMessage); @@ -415,6 +428,13 @@ public class TradeBot implements Listener { List safeOnlineSignatures = List.copyOf(this.safeAllOnlineByPubkey.values()); + if (safeOnlineSignatures.isEmpty()) + return; + + LOGGER.trace("Broadcasting all {} known online trades. Next broadcast timestamp: {}", + safeOnlineSignatures.size(), nextBroadcastTimestamp + ); + GetOnlineTradesMessage getOnlineTradesMessage = new GetOnlineTradesMessage(safeOnlineSignatures); Network.getInstance().broadcast(peer -> getOnlineTradesMessage); } @@ -427,6 +447,8 @@ public class TradeBot implements Listener { List peersOnlineTrades = getOnlineTradesMessage.getOnlineTrades(); Map entriesUnknownToPeer = new HashMap<>(this.safeAllOnlineByPubkey); + int knownCount = entriesUnknownToPeer.size(); + for (OnlineTradeData peersOnlineTrade : peersOnlineTrades) { ByteArray pubkeyByteArray = ByteArray.of(peersOnlineTrade.getPublicKey()); @@ -436,6 +458,10 @@ public class TradeBot implements Listener { entriesUnknownToPeer.remove(pubkeyByteArray); } + LOGGER.trace("Sending {} known \\ {} peers = {} online trades to peer {}", + knownCount, peersOnlineTrades.size(), entriesUnknownToPeer.size() + ); + // Send complement to peer List safeOnlineSignatures = List.copyOf(entriesUnknownToPeer.values()); Message responseMessage = new OnlineTradesMessage(safeOnlineSignatures); @@ -452,7 +478,7 @@ public class TradeBot implements Listener { long now = NTP.getTime(); // Timestamps after this are too far into the future - long futureThreshold = (now % ONLINE_LIFETIME) + ONLINE_LIFETIME + ONLINE_LIFETIME / 2; + long futureThreshold = (now / ONLINE_LIFETIME + 1) * ONLINE_LIFETIME; // Timestamps before this are too far into the past long pastThreshold = now; @@ -464,46 +490,101 @@ public class TradeBot implements Listener { for (OnlineTradeData peersOnlineTrade : peersOnlineTrades) { long timestamp = peersOnlineTrade.getTimestamp(); - if (timestamp < pastThreshold || timestamp > futureThreshold) + // Ignore if timestamp is out of bounds + if (timestamp < pastThreshold || timestamp > futureThreshold) { + if (timestamp < pastThreshold) + LOGGER.trace("Ignoring online trade {} from peer {} as timestamp {} is too old vs {}", + peersOnlineTrade.getAtAddress(), peer, timestamp, pastThreshold + ); + else + LOGGER.trace("Ignoring online trade {} from peer {} as timestamp {} is too new vs {}", + peersOnlineTrade.getAtAddress(), peer, timestamp, pastThreshold + ); + continue; + } ByteArray pubkeyByteArray = ByteArray.of(peersOnlineTrade.getPublicKey()); - // Ignore if we've previously verified this timestamp+publickey combo + // Ignore if we've previously verified this timestamp+publickey combo or sent timestamp is older OnlineTradeData existingTradeData = this.safeAllOnlineByPubkey.get(pubkeyByteArray); - if (existingTradeData != null && existingTradeData.getTimestamp() == timestamp) + if (existingTradeData != null && timestamp <= existingTradeData.getTimestamp()) { + if (timestamp == existingTradeData.getTimestamp()) + LOGGER.trace("Ignoring online trade {} from peer {} as we have verified timestamp {} before", + peersOnlineTrade.getAtAddress(), peer, timestamp + ); + else + LOGGER.trace("Ignoring online trade {} from peer {} as timestamp {} is older than latest {}", + peersOnlineTrade.getAtAddress(), peer, timestamp, existingTradeData.getTimestamp() + ); + continue; + } // Check timestamp signature byte[] timestampSignature = peersOnlineTrade.getSignature(); byte[] timestampBytes = Longs.toByteArray(timestamp); byte[] publicKey = peersOnlineTrade.getPublicKey(); - if (!Crypto.verify(publicKey, timestampSignature, timestampBytes)) + if (!Crypto.verify(publicKey, timestampSignature, timestampBytes)) { + LOGGER.trace("Ignoring online trade {} from peer {} as signature failed to verify", + peersOnlineTrade.getAtAddress(), peer + ); + continue; + } ATData atData = repository.getATRepository().fromATAddress(peersOnlineTrade.getAtAddress()); - if (atData == null || atData.getIsFrozen() || atData.getIsFinished()) + if (atData == null || atData.getIsFrozen() || atData.getIsFinished()) { + if (atData == null) + LOGGER.trace("Ignoring online trade {} from peer {} as AT doesn't exist", + peersOnlineTrade.getAtAddress(), peer + ); + else + LOGGER.trace("Ignoring online trade {} from peer {} as AT is frozen or finished", + peersOnlineTrade.getAtAddress(), peer + ); + continue; + } ByteArray atCodeHash = new ByteArray(atData.getCodeHash()); Supplier acctSupplier = acctSuppliersByCodeHash.get(atCodeHash); - if (acctSupplier == null) + if (acctSupplier == null) { + LOGGER.trace("Ignoring online trade {} from peer {} as AT isn't a known ACCT?", + peersOnlineTrade.getAtAddress(), peer + ); + continue; + } CrossChainTradeData tradeData = acctSupplier.get().populateTradeData(repository, atData); - if (tradeData == null) + if (tradeData == null) { + LOGGER.trace("Ignoring online trade {} from peer {} as trade data not found?", + peersOnlineTrade.getAtAddress(), peer + ); + continue; + } // Convert signer's public key to address form String signerAddress = Crypto.toAddress(publicKey); // Signer's public key (in address form) must match Bob's / Alice's trade public key (in address form) - if (!signerAddress.equals(tradeData.qortalCreatorTradeAddress) && !signerAddress.equals(tradeData.qortalPartnerAddress)) + if (!signerAddress.equals(tradeData.qortalCreatorTradeAddress) && !signerAddress.equals(tradeData.qortalPartnerAddress)) { + LOGGER.trace("Ignoring online trade {} from peer {} as signer isn't Alice or Bob?", + peersOnlineTrade.getAtAddress(), peer + ); + continue; + } // This is new to us this.allOnlineByPubkey.put(pubkeyByteArray, peersOnlineTrade); ++newCount; + + LOGGER.trace("Added online trade {} from peer {} with timestamp {}", + peersOnlineTrade.getAtAddress(), peer, timestamp + ); } } catch (DataException e) { LOGGER.error("Couldn't process ONLINE_TRADES message due to repository issue", e); @@ -514,4 +595,18 @@ public class TradeBot implements Listener { rebuildSafeAllOnline(); } } + + public void bridgePresence(long timestamp, byte[] publicKey, byte[] signature, String atAddress) { + long expiry = (timestamp / ONLINE_LIFETIME + 1) * ONLINE_LIFETIME; + ByteArray pubkeyByteArray = ByteArray.of(publicKey); + + OnlineTradeData fakeOnlineTradeData = new OnlineTradeData(expiry, publicKey, signature, atAddress); + + OnlineTradeData computedOnlineTradeData = this.allOnlineByPubkey.compute(pubkeyByteArray, (k, v) -> (v == null || v.getTimestamp() < expiry) ? fakeOnlineTradeData : v); + + if (computedOnlineTradeData == fakeOnlineTradeData) { + LOGGER.trace("Bridged online trade {} with timestamp {}", atAddress, expiry); + rebuildSafeAllOnline(); + } + } } diff --git a/src/main/java/org/qortal/data/network/OnlineTradeData.java b/src/main/java/org/qortal/data/network/OnlineTradeData.java index d370c3a3..102030e1 100644 --- a/src/main/java/org/qortal/data/network/OnlineTradeData.java +++ b/src/main/java/org/qortal/data/network/OnlineTradeData.java @@ -19,11 +19,11 @@ public class OnlineTradeData { protected OnlineTradeData() { } - public OnlineTradeData(long timestamp, byte[] publicKey, byte[] signature, String address) { + public OnlineTradeData(long timestamp, byte[] publicKey, byte[] signature, String atAddress) { this.timestamp = timestamp; this.publicKey = publicKey; this.signature = signature; - this.atAddress = address; + this.atAddress = atAddress; } public OnlineTradeData(long timestamp, byte[] publicKey) { diff --git a/src/main/java/org/qortal/network/message/Message.java b/src/main/java/org/qortal/network/message/Message.java index 1ce45aca..ae7e68c9 100644 --- a/src/main/java/org/qortal/network/message/Message.java +++ b/src/main/java/org/qortal/network/message/Message.java @@ -95,8 +95,9 @@ public abstract class Message { ARBITRARY_SIGNATURES(130), - GET_ONLINE_TRADES(140), - ONLINE_TRADES(141); + ONLINE_TRADES(140), + GET_ONLINE_TRADES(141), + ; public final int value; public final Method fromByteBufferMethod; diff --git a/src/main/java/org/qortal/network/message/OnlineTradesMessage.java b/src/main/java/org/qortal/network/message/OnlineTradesMessage.java index 4feef670..74912d8e 100644 --- a/src/main/java/org/qortal/network/message/OnlineTradesMessage.java +++ b/src/main/java/org/qortal/network/message/OnlineTradesMessage.java @@ -4,6 +4,7 @@ import com.google.common.primitives.Ints; import com.google.common.primitives.Longs; import org.qortal.data.network.OnlineTradeData; import org.qortal.transform.Transformer; +import org.qortal.utils.Base58; import java.io.ByteArrayOutputStream; import java.io.IOException; @@ -53,11 +54,11 @@ public class OnlineTradesMessage extends Message { byte[] signature = new byte[Transformer.SIGNATURE_LENGTH]; bytes.get(signature); - byte[] addressBytes = new byte[Transformer.ADDRESS_LENGTH]; - bytes.get(addressBytes); - String address = new String(addressBytes, StandardCharsets.UTF_8); + byte[] atAddressBytes = new byte[Transformer.ADDRESS_LENGTH]; + bytes.get(atAddressBytes); + String atAddress = Base58.encode(atAddressBytes); - onlineTrades.add(new OnlineTradeData(timestamp, publicKey, signature, address)); + onlineTrades.add(new OnlineTradeData(timestamp, publicKey, signature, atAddress)); } if (bytes.hasRemaining()) { @@ -108,7 +109,7 @@ public class OnlineTradesMessage extends Message { bytes.write(onlineTradeData.getSignature()); - bytes.write(onlineTradeData.getAtAddress().getBytes(StandardCharsets.UTF_8)); + bytes.write(Base58.decode(onlineTradeData.getAtAddress())); } } } diff --git a/src/main/java/org/qortal/transaction/PresenceTransaction.java b/src/main/java/org/qortal/transaction/PresenceTransaction.java index 0d28d382..566c6979 100644 --- a/src/main/java/org/qortal/transaction/PresenceTransaction.java +++ b/src/main/java/org/qortal/transaction/PresenceTransaction.java @@ -13,6 +13,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.qortal.account.Account; import org.qortal.controller.Controller; +import org.qortal.controller.tradebot.TradeBot; import org.qortal.crosschain.ACCT; import org.qortal.crosschain.SupportedBlockchain; import org.qortal.crypto.Crypto; @@ -191,12 +192,16 @@ public class PresenceTransaction extends Transaction { CrossChainTradeData crossChainTradeData = acctSupplier.get().populateTradeData(repository, atData); // OK if signer's public key (in address form) matches Bob's trade public key (in address form) - if (signerAddress.equals(crossChainTradeData.qortalCreatorTradeAddress)) + if (signerAddress.equals(crossChainTradeData.qortalCreatorTradeAddress)) { + TradeBot.getInstance().bridgePresence(this.presenceTransactionData.getTimestamp(), this.transactionData.getCreatorPublicKey(), timestampSignature, atData.getATAddress()); return ValidationResult.OK; + } // OK if signer's public key (in address form) matches Alice's trade public key (in address form) - if (signerAddress.equals(crossChainTradeData.qortalPartnerAddress)) + if (signerAddress.equals(crossChainTradeData.qortalPartnerAddress)) { + TradeBot.getInstance().bridgePresence(this.presenceTransactionData.getTimestamp(), this.transactionData.getCreatorPublicKey(), timestampSignature, atData.getATAddress()); return ValidationResult.OK; + } } return ValidationResult.AT_UNKNOWN; From 4c020819926c21a0eef54020c1cfb833f1b0d941 Mon Sep 17 00:00:00 2001 From: catbref Date: Mon, 21 Feb 2022 20:05:29 +0000 Subject: [PATCH 3/9] Tidy up TradeBot presence logging. Decorate API endpoints /crosschain/tradeoffers and /crosschain/trade with presence expiry timestamps --- .../api/resource/CrossChainResource.java | 12 +++++++- .../qortal/controller/tradebot/TradeBot.java | 30 +++++++++++++++---- .../data/crosschain/CrossChainTradeData.java | 6 ++++ .../qortal/data/network/OnlineTradeData.java | 12 ++++++++ 4 files changed, 53 insertions(+), 7 deletions(-) diff --git a/src/main/java/org/qortal/api/resource/CrossChainResource.java b/src/main/java/org/qortal/api/resource/CrossChainResource.java index 47eee301..bb7c70a5 100644 --- a/src/main/java/org/qortal/api/resource/CrossChainResource.java +++ b/src/main/java/org/qortal/api/resource/CrossChainResource.java @@ -25,6 +25,7 @@ import org.qortal.api.ApiExceptionFactory; import org.qortal.api.Security; import org.qortal.api.model.CrossChainCancelRequest; import org.qortal.api.model.CrossChainTradeSummary; +import org.qortal.controller.tradebot.TradeBot; import org.qortal.crosschain.SupportedBlockchain; import org.qortal.crosschain.ACCT; import org.qortal.crosschain.AcctMode; @@ -120,6 +121,8 @@ public class CrossChainResource { crossChainTrades = crossChainTrades.subList(0, upperLimit); } + crossChainTrades.stream().forEach(CrossChainResource::decorateTradeDataWithPresence); + return crossChainTrades; } catch (DataException e) { throw ApiExceptionFactory.INSTANCE.createException(request, ApiError.REPOSITORY_ISSUE, e); @@ -151,7 +154,11 @@ public class CrossChainResource { if (acct == null) throw ApiExceptionFactory.INSTANCE.createException(request, ApiError.INVALID_CRITERIA); - return acct.populateTradeData(repository, atData); + CrossChainTradeData crossChainTradeData = acct.populateTradeData(repository, atData); + + decorateTradeDataWithPresence(crossChainTradeData); + + return crossChainTradeData; } catch (DataException e) { throw ApiExceptionFactory.INSTANCE.createException(request, ApiError.REPOSITORY_ISSUE, e); } @@ -486,4 +493,7 @@ public class CrossChainResource { } } + private static void decorateTradeDataWithPresence(CrossChainTradeData crossChainTradeData) { + TradeBot.getInstance().decorateTradeDataWithPresence(crossChainTradeData); + } } diff --git a/src/main/java/org/qortal/controller/tradebot/TradeBot.java b/src/main/java/org/qortal/controller/tradebot/TradeBot.java index 88c720b2..9182ff6e 100644 --- a/src/main/java/org/qortal/controller/tradebot/TradeBot.java +++ b/src/main/java/org/qortal/controller/tradebot/TradeBot.java @@ -346,7 +346,7 @@ public class TradeBot implements Listener { } if (removedCount > 0) - LOGGER.trace("Removed {} old online trade signatures", removedCount); + LOGGER.debug("Removed {} old online trade signatures", removedCount); } /*package*/ void updatePresence(Repository repository, TradeBotData tradeBotData, CrossChainTradeData tradeData) @@ -411,7 +411,7 @@ public class TradeBot implements Listener { this.pendingOnlineSignatures.clear(); } - LOGGER.trace("Broadcasting {} new online trades", safeOnlineSignatures.size()); + LOGGER.debug("Broadcasting {} new online trades", safeOnlineSignatures.size()); OnlineTradesMessage onlineTradesMessage = new OnlineTradesMessage(safeOnlineSignatures); Network.getInstance().broadcast(peer -> onlineTradesMessage); @@ -431,7 +431,7 @@ public class TradeBot implements Listener { if (safeOnlineSignatures.isEmpty()) return; - LOGGER.trace("Broadcasting all {} known online trades. Next broadcast timestamp: {}", + LOGGER.debug("Broadcasting all {} known online trades. Next broadcast timestamp: {}", safeOnlineSignatures.size(), nextBroadcastTimestamp ); @@ -458,8 +458,11 @@ public class TradeBot implements Listener { entriesUnknownToPeer.remove(pubkeyByteArray); } - LOGGER.trace("Sending {} known \\ {} peers = {} online trades to peer {}", - knownCount, peersOnlineTrades.size(), entriesUnknownToPeer.size() + if (entriesUnknownToPeer.isEmpty()) + return; + + LOGGER.debug("Sending {} online trades to peer {} after excluding their {} from known {}", + entriesUnknownToPeer.size(), peer, peersOnlineTrades.size(), knownCount ); // Send complement to peer @@ -567,7 +570,7 @@ public class TradeBot implements Listener { } // Convert signer's public key to address form - String signerAddress = Crypto.toAddress(publicKey); + String signerAddress = peersOnlineTrade.getTradeAddress(); // Signer's public key (in address form) must match Bob's / Alice's trade public key (in address form) if (!signerAddress.equals(tradeData.qortalCreatorTradeAddress) && !signerAddress.equals(tradeData.qortalPartnerAddress)) { @@ -609,4 +612,19 @@ public class TradeBot implements Listener { rebuildSafeAllOnline(); } } + + public void decorateTradeDataWithPresence(CrossChainTradeData crossChainTradeData) { + // Match by AT address, then check for Bob vs Alice + this.safeAllOnlineByPubkey.values().stream() + .filter(onlineTradeData -> onlineTradeData.getAtAddress().equals(crossChainTradeData.qortalAtAddress)) + .forEach(onlineTradeData -> { + String signerAddress = onlineTradeData.getTradeAddress(); + + // Signer's public key (in address form) must match Bob's / Alice's trade public key (in address form) + if (signerAddress.equals(crossChainTradeData.qortalCreatorTradeAddress)) + crossChainTradeData.creatorPresenceExpiry = onlineTradeData.getTimestamp(); + else if (signerAddress.equals(crossChainTradeData.qortalPartnerAddress)) + crossChainTradeData.partnerPresenceExpiry = onlineTradeData.getTimestamp(); + }); + } } diff --git a/src/main/java/org/qortal/data/crosschain/CrossChainTradeData.java b/src/main/java/org/qortal/data/crosschain/CrossChainTradeData.java index 69250e54..edc115b6 100644 --- a/src/main/java/org/qortal/data/crosschain/CrossChainTradeData.java +++ b/src/main/java/org/qortal/data/crosschain/CrossChainTradeData.java @@ -94,6 +94,12 @@ public class CrossChainTradeData { public String acctName; + @Schema(description = "Timestamp when AT creator's trade-bot presence expires") + public Long creatorPresenceExpiry; + + @Schema(description = "Timestamp when trade partner's trade-bot presence expires") + public Long partnerPresenceExpiry; + // Constructors // Necessary for JAXB diff --git a/src/main/java/org/qortal/data/network/OnlineTradeData.java b/src/main/java/org/qortal/data/network/OnlineTradeData.java index 102030e1..1ae48718 100644 --- a/src/main/java/org/qortal/data/network/OnlineTradeData.java +++ b/src/main/java/org/qortal/data/network/OnlineTradeData.java @@ -1,5 +1,7 @@ package org.qortal.data.network; +import org.qortal.crypto.Crypto; + import javax.xml.bind.annotation.XmlAccessType; import javax.xml.bind.annotation.XmlAccessorType; import java.util.Arrays; @@ -12,6 +14,7 @@ public class OnlineTradeData { protected byte[] publicKey; // Could be BOB's or ALICE's protected byte[] signature; // Not always present protected String atAddress; // Not always present + protected String tradeAddress; // Lazily instantiated // Constructors @@ -46,6 +49,15 @@ public class OnlineTradeData { return this.atAddress; } + // Probably don't need synchronization + public String getTradeAddress() { + if (tradeAddress != null) + return tradeAddress; + + tradeAddress = Crypto.toAddress(this.publicKey); + return tradeAddress; + } + // Comparison @Override From c53dd31765a1e32c264f99d15c8f56fec9e11f67 Mon Sep 17 00:00:00 2001 From: catbref Date: Wed, 23 Feb 2022 20:22:01 +0000 Subject: [PATCH 4/9] Tidy up of trade presence timestamp generation & checking. Added tests. Renamed "online trades" to "trade presences" --- .../org/qortal/controller/Controller.java | 8 +- .../qortal/controller/tradebot/TradeBot.java | 279 ++++++++++-------- ...eTradeData.java => TradePresenceData.java} | 22 +- ...age.java => GetTradePresencesMessage.java} | 54 ++-- .../org/qortal/network/message/Message.java | 4 +- ...essage.java => TradePresencesMessage.java} | 59 ++-- .../crosschain/TradeBotPresenceTests.java | 112 +++++++ 7 files changed, 337 insertions(+), 201 deletions(-) rename src/main/java/org/qortal/data/network/{OnlineTradeData.java => TradePresenceData.java} (67%) rename src/main/java/org/qortal/network/message/{GetOnlineTradesMessage.java => GetTradePresencesMessage.java} (54%) rename src/main/java/org/qortal/network/message/{OnlineTradesMessage.java => TradePresencesMessage.java} (55%) create mode 100644 src/test/java/org/qortal/test/crosschain/TradeBotPresenceTests.java diff --git a/src/main/java/org/qortal/controller/Controller.java b/src/main/java/org/qortal/controller/Controller.java index 5c956956..29c0992b 100644 --- a/src/main/java/org/qortal/controller/Controller.java +++ b/src/main/java/org/qortal/controller/Controller.java @@ -1314,12 +1314,12 @@ public class Controller extends Thread { ArbitraryDataManager.getInstance().onNetworkArbitrarySignaturesMessage(peer, message); break; - case GET_ONLINE_TRADES: - TradeBot.getInstance().onGetOnlineTradesMessage(peer, message); + case GET_TRADE_PRESENCES: + TradeBot.getInstance().onGetTradePresencesMessage(peer, message); break; - case ONLINE_TRADES: - TradeBot.getInstance().onOnlineTradesMessage(peer, message); + case TRADE_PRESENCES: + TradeBot.getInstance().onTradePresencesMessage(peer, message); break; default: diff --git a/src/main/java/org/qortal/controller/tradebot/TradeBot.java b/src/main/java/org/qortal/controller/tradebot/TradeBot.java index 9182ff6e..4caeeca5 100644 --- a/src/main/java/org/qortal/controller/tradebot/TradeBot.java +++ b/src/main/java/org/qortal/controller/tradebot/TradeBot.java @@ -18,16 +18,16 @@ import org.qortal.crypto.Crypto; import org.qortal.data.at.ATData; import org.qortal.data.crosschain.CrossChainTradeData; import org.qortal.data.crosschain.TradeBotData; -import org.qortal.data.network.OnlineTradeData; +import org.qortal.data.network.TradePresenceData; import org.qortal.event.Event; import org.qortal.event.EventBus; import org.qortal.event.Listener; import org.qortal.gui.SysTray; import org.qortal.network.Network; import org.qortal.network.Peer; -import org.qortal.network.message.GetOnlineTradesMessage; +import org.qortal.network.message.GetTradePresencesMessage; import org.qortal.network.message.Message; -import org.qortal.network.message.OnlineTradesMessage; +import org.qortal.network.message.TradePresencesMessage; import org.qortal.repository.DataException; import org.qortal.repository.Repository; import org.qortal.repository.RepositoryManager; @@ -53,9 +53,14 @@ public class TradeBot implements Listener { private static final Logger LOGGER = LogManager.getLogger(TradeBot.class); private static final Random RANDOM = new SecureRandom(); - private static final long ONLINE_LIFETIME = 30 * 60 * 1000L; // 30 minutes in ms - - private static final long ONLINE_BROADCAST_INTERVAL = 5 * 60 * 1000L; // 5 minutes in ms + /** Maximum lifetime of trade presence timestamp. 30 mins in ms. */ + private static final long PRESENCE_LIFETIME = 30 * 60 * 1000L; + /** How soon before expiry of our own trade presence timestamp that we want to trigger renewal. 5 mins in ms. */ + private static final long EARLY_RENEWAL_PERIOD = 5 * 60 * 1000L; + /** Trade presence timestamps are rounded up to this nearest interval. Bigger values improve grouping of entries in [GET_]TRADE_PRESENCES network messages. 15 mins in ms. */ + private static final long EXPIRY_ROUNDING = 15 * 60 * 1000L; + /** How often we want to broadcast our list of all known trade presences to peers. 5 mins in ms. */ + private static final long PRESENCE_BROADCAST_INTERVAL = 5 * 60 * 1000L; public interface StateNameAndValueSupplier { public String getState(); @@ -87,12 +92,12 @@ public class TradeBot implements Listener { private static TradeBot instance; - private final Map ourTimestampsByPubkey = Collections.synchronizedMap(new HashMap<>()); - private final List pendingOnlineSignatures = Collections.synchronizedList(new ArrayList<>()); + private final Map ourTradePresenceTimestampsByPubkey = Collections.synchronizedMap(new HashMap<>()); + private final List pendingTradePresences = Collections.synchronizedList(new ArrayList<>()); - private final Map allOnlineByPubkey = Collections.synchronizedMap(new HashMap<>()); - private Map safeAllOnlineByPubkey = Collections.emptyMap(); - private long nextBroadcastTimestamp = 0L; + private final Map allTradePresencesByPubkey = Collections.synchronizedMap(new HashMap<>()); + private Map safeAllTradePresencesByPubkey = Collections.emptyMap(); + private long nextTradePresenceBroadcastTimestamp = 0L; private TradeBot() { EventBus.INSTANCE.addListener(event -> TradeBot.getInstance().listen(event)); @@ -223,7 +228,7 @@ public class TradeBot implements Listener { return; synchronized (this) { - expireOldOnlineSignatures(); + expireOldPresenceTimestamps(); List allTradeBotData; @@ -256,7 +261,7 @@ public class TradeBot implements Listener { LOGGER.warn(() -> String.format("Foreign blockchain issue processing trade-bot entry for AT %s: %s", tradeBotData.getAtAddress(), e.getMessage())); } - broadcastOnlineSignatures(); + broadcastPresenceTimestamps(); } } @@ -335,18 +340,26 @@ public class TradeBot implements Listener { // PRESENCE-related - private void expireOldOnlineSignatures() { + /** Trade presence timestamps expire in the 'future' so any that reach 'now' have expired and are removed. */ + private void expireOldPresenceTimestamps() { long now = NTP.getTime(); - int removedCount = 0; - synchronized (this.allOnlineByPubkey) { - int preRemoveCount = this.allOnlineByPubkey.size(); - this.allOnlineByPubkey.values().removeIf(onlineTradeData -> onlineTradeData.getTimestamp() <= now); - removedCount = this.allOnlineByPubkey.size() - preRemoveCount; + int allRemovedCount = 0; + synchronized (this.allTradePresencesByPubkey) { + int preRemoveCount = this.allTradePresencesByPubkey.size(); + this.allTradePresencesByPubkey.values().removeIf(tradePresenceData -> tradePresenceData.getTimestamp() <= now); + allRemovedCount = this.allTradePresencesByPubkey.size() - preRemoveCount; } - if (removedCount > 0) - LOGGER.debug("Removed {} old online trade signatures", removedCount); + int ourRemovedCount = 0; + synchronized (this.ourTradePresenceTimestampsByPubkey) { + int preRemoveCount = this.ourTradePresenceTimestampsByPubkey.size(); + this.ourTradePresenceTimestampsByPubkey.values().removeIf(timestamp -> timestamp < now); + ourRemovedCount = this.ourTradePresenceTimestampsByPubkey.size() - preRemoveCount; + } + + if (allRemovedCount > 0) + LOGGER.debug("Removed {} expired trade presences, of which {} ours", allRemovedCount, ourRemovedCount); } /*package*/ void updatePresence(Repository repository, TradeBotData tradeBotData, CrossChainTradeData tradeData) @@ -357,194 +370,197 @@ public class TradeBot implements Listener { String signerAddress = tradeNativeAccount.getAddress(); /* - * We only broadcast trade entry online signatures for BOB when OFFERING - * so that buyers don't click on offline / expired entries that would waste their time. - */ - if (tradeData.mode != AcctMode.OFFERING || !signerAddress.equals(tradeData.qortalCreatorTradeAddress)) + * There's no point in Alice trying to broadcast presence for an AT that isn't locked to her, + * as other peers won't be able to verify as signing public key isn't yet in the AT's data segment. + */ + if (!signerAddress.equals(tradeData.qortalCreatorTradeAddress) && !signerAddress.equals(tradeData.qortalPartnerAddress)) { + // Signer is neither Bob, nor trade locked to Alice + LOGGER.trace("Can't provide trade presence for our AT {} as it's not yet locked to Alice", atAddress); return; + } long now = NTP.getTime(); - - // Timestamps are considered good for full lifetime, but we'll refresh if older than half-lifetime - long threshold = (now / (ONLINE_LIFETIME / 2)) * (ONLINE_LIFETIME / 2); - long newExpiry = threshold + ONLINE_LIFETIME / 2; - + long newExpiry = generateExpiry(now); ByteArray pubkeyByteArray = ByteArray.of(tradeNativeAccount.getPublicKey()); - // If map's timestamp is missing, or too old, use the new timestamp - otherwise use existing timestamp. - synchronized (this.ourTimestampsByPubkey) { - Long currentTimestamp = this.ourTimestampsByPubkey.get(pubkeyByteArray); - if (currentTimestamp != null && currentTimestamp > threshold) + // If map entry's timestamp is missing, or within early renewal period, use the new expiry - otherwise use existing timestamp. + synchronized (this.ourTradePresenceTimestampsByPubkey) { + Long currentTimestamp = this.ourTradePresenceTimestampsByPubkey.get(pubkeyByteArray); + + if (currentTimestamp != null && currentTimestamp - now > EARLY_RENEWAL_PERIOD) { // timestamp still good + LOGGER.trace("Current trade presence timestamp {} still good for our trade {}", currentTimestamp, atAddress); return; + } - this.ourTimestampsByPubkey.put(pubkeyByteArray, newExpiry); + this.ourTradePresenceTimestampsByPubkey.put(pubkeyByteArray, newExpiry); } // Create signature byte[] signature = tradeNativeAccount.sign(Longs.toByteArray(newExpiry)); - // Add new online info to queue to be broadcast around network - OnlineTradeData onlineTradeData = new OnlineTradeData(newExpiry, tradeNativeAccount.getPublicKey(), signature, atAddress); - this.pendingOnlineSignatures.add(onlineTradeData); + // Add new trade presence to queue to be broadcast around network + TradePresenceData tradePresenceData = new TradePresenceData(newExpiry, tradeNativeAccount.getPublicKey(), signature, atAddress); + this.pendingTradePresences.add(tradePresenceData); - this.allOnlineByPubkey.put(pubkeyByteArray, onlineTradeData); - rebuildSafeAllOnline(); + this.allTradePresencesByPubkey.put(pubkeyByteArray, tradePresenceData); + rebuildSafeAllTradePresences(); - LOGGER.trace("New signed timestamp {} for our online trade {}", newExpiry, atAddress); + LOGGER.trace("New trade presence timestamp {} for our trade {}", newExpiry, atAddress); } - private void rebuildSafeAllOnline() { - synchronized (this.allOnlineByPubkey) { + private void rebuildSafeAllTradePresences() { + synchronized (this.allTradePresencesByPubkey) { // Collect into a *new* unmodifiable map. - this.safeAllOnlineByPubkey = Map.copyOf(this.allOnlineByPubkey); + this.safeAllTradePresencesByPubkey = Map.copyOf(this.allTradePresencesByPubkey); } } - private void broadcastOnlineSignatures() { - // If we have new online signatures that are pending broadcast, send those as a priority - if (!this.pendingOnlineSignatures.isEmpty()) { + private void broadcastPresenceTimestamps() { + // If we have new trade presences that are pending broadcast, send those as a priority + if (!this.pendingTradePresences.isEmpty()) { // Create a copy for Network to safely use in another thread - List safeOnlineSignatures; - synchronized (this.pendingOnlineSignatures) { - safeOnlineSignatures = List.copyOf(this.pendingOnlineSignatures); - this.pendingOnlineSignatures.clear(); + List safeTradePresences; + synchronized (this.pendingTradePresences) { + safeTradePresences = List.copyOf(this.pendingTradePresences); + this.pendingTradePresences.clear(); } - LOGGER.debug("Broadcasting {} new online trades", safeOnlineSignatures.size()); + LOGGER.debug("Broadcasting {} new trade presences", safeTradePresences.size()); - OnlineTradesMessage onlineTradesMessage = new OnlineTradesMessage(safeOnlineSignatures); - Network.getInstance().broadcast(peer -> onlineTradesMessage); + TradePresencesMessage tradePresencesMessage = new TradePresencesMessage(safeTradePresences); + Network.getInstance().broadcast(peer -> tradePresencesMessage); return; } - // As we have no new online signatures, check whether it's time to do a general broadcast + // As we have no new trade presences, check whether it's time to do a general broadcast Long now = NTP.getTime(); - if (now == null || now < nextBroadcastTimestamp) + if (now == null || now < nextTradePresenceBroadcastTimestamp) return; - nextBroadcastTimestamp = now + ONLINE_BROADCAST_INTERVAL; + nextTradePresenceBroadcastTimestamp = now + PRESENCE_BROADCAST_INTERVAL; - List safeOnlineSignatures = List.copyOf(this.safeAllOnlineByPubkey.values()); + List safeTradePresences = List.copyOf(this.safeAllTradePresencesByPubkey.values()); - if (safeOnlineSignatures.isEmpty()) + if (safeTradePresences.isEmpty()) return; - LOGGER.debug("Broadcasting all {} known online trades. Next broadcast timestamp: {}", - safeOnlineSignatures.size(), nextBroadcastTimestamp + LOGGER.debug("Broadcasting all {} known trade presences. Next broadcast timestamp: {}", + safeTradePresences.size(), nextTradePresenceBroadcastTimestamp ); - GetOnlineTradesMessage getOnlineTradesMessage = new GetOnlineTradesMessage(safeOnlineSignatures); - Network.getInstance().broadcast(peer -> getOnlineTradesMessage); + GetTradePresencesMessage getTradePresencesMessage = new GetTradePresencesMessage(safeTradePresences); + Network.getInstance().broadcast(peer -> getTradePresencesMessage); } // Network message processing - public void onGetOnlineTradesMessage(Peer peer, Message message) { - GetOnlineTradesMessage getOnlineTradesMessage = (GetOnlineTradesMessage) message; + public void onGetTradePresencesMessage(Peer peer, Message message) { + GetTradePresencesMessage getTradePresencesMessage = (GetTradePresencesMessage) message; - List peersOnlineTrades = getOnlineTradesMessage.getOnlineTrades(); + List peersTradePresences = getTradePresencesMessage.getTradePresences(); - Map entriesUnknownToPeer = new HashMap<>(this.safeAllOnlineByPubkey); + // Create mutable copy from safe snapshot + Map entriesUnknownToPeer = new HashMap<>(this.safeAllTradePresencesByPubkey); int knownCount = entriesUnknownToPeer.size(); - for (OnlineTradeData peersOnlineTrade : peersOnlineTrades) { - ByteArray pubkeyByteArray = ByteArray.of(peersOnlineTrade.getPublicKey()); + for (TradePresenceData peersTradePresence : peersTradePresences) { + ByteArray pubkeyByteArray = ByteArray.of(peersTradePresence.getPublicKey()); - OnlineTradeData ourEntry = entriesUnknownToPeer.get(pubkeyByteArray); + TradePresenceData ourEntry = entriesUnknownToPeer.get(pubkeyByteArray); - if (ourEntry != null && ourEntry.getTimestamp() == peersOnlineTrade.getTimestamp()) + if (ourEntry != null && ourEntry.getTimestamp() == peersTradePresence.getTimestamp()) entriesUnknownToPeer.remove(pubkeyByteArray); } if (entriesUnknownToPeer.isEmpty()) return; - LOGGER.debug("Sending {} online trades to peer {} after excluding their {} from known {}", - entriesUnknownToPeer.size(), peer, peersOnlineTrades.size(), knownCount + LOGGER.debug("Sending {} trade presences to peer {} after excluding their {} from known {}", + entriesUnknownToPeer.size(), peer, peersTradePresences.size(), knownCount ); // Send complement to peer - List safeOnlineSignatures = List.copyOf(entriesUnknownToPeer.values()); - Message responseMessage = new OnlineTradesMessage(safeOnlineSignatures); + List safeTradePresences = List.copyOf(entriesUnknownToPeer.values()); + Message responseMessage = new TradePresencesMessage(safeTradePresences); if (!peer.sendMessage(responseMessage)) { - peer.disconnect("failed to send online trades response"); + peer.disconnect("failed to send TRADE_PRESENCES response"); return; } } - public void onOnlineTradesMessage(Peer peer, Message message) { - OnlineTradesMessage onlineTradesMessage = (OnlineTradesMessage) message; + public void onTradePresencesMessage(Peer peer, Message message) { + TradePresencesMessage tradePresencesMessage = (TradePresencesMessage) message; - List peersOnlineTrades = onlineTradesMessage.getOnlineTrades(); + List peersTradePresences = tradePresencesMessage.getTradePresences(); long now = NTP.getTime(); - // Timestamps after this are too far into the future - long futureThreshold = (now / ONLINE_LIFETIME + 1) * ONLINE_LIFETIME; // Timestamps before this are too far into the past long pastThreshold = now; + // Timestamps after this are too far into the future + long futureThreshold = now + PRESENCE_LIFETIME; Map> acctSuppliersByCodeHash = SupportedBlockchain.getAcctMap(); int newCount = 0; try (final Repository repository = RepositoryManager.getRepository()) { - for (OnlineTradeData peersOnlineTrade : peersOnlineTrades) { - long timestamp = peersOnlineTrade.getTimestamp(); + for (TradePresenceData peersTradePresence : peersTradePresences) { + long timestamp = peersTradePresence.getTimestamp(); // Ignore if timestamp is out of bounds if (timestamp < pastThreshold || timestamp > futureThreshold) { if (timestamp < pastThreshold) - LOGGER.trace("Ignoring online trade {} from peer {} as timestamp {} is too old vs {}", - peersOnlineTrade.getAtAddress(), peer, timestamp, pastThreshold + LOGGER.trace("Ignoring trade presence {} from peer {} as timestamp {} is too old vs {}", + peersTradePresence.getAtAddress(), peer, timestamp, pastThreshold ); else - LOGGER.trace("Ignoring online trade {} from peer {} as timestamp {} is too new vs {}", - peersOnlineTrade.getAtAddress(), peer, timestamp, pastThreshold + LOGGER.trace("Ignoring trade presence {} from peer {} as timestamp {} is too new vs {}", + peersTradePresence.getAtAddress(), peer, timestamp, pastThreshold ); continue; } - ByteArray pubkeyByteArray = ByteArray.of(peersOnlineTrade.getPublicKey()); + ByteArray pubkeyByteArray = ByteArray.of(peersTradePresence.getPublicKey()); // Ignore if we've previously verified this timestamp+publickey combo or sent timestamp is older - OnlineTradeData existingTradeData = this.safeAllOnlineByPubkey.get(pubkeyByteArray); + TradePresenceData existingTradeData = this.safeAllTradePresencesByPubkey.get(pubkeyByteArray); if (existingTradeData != null && timestamp <= existingTradeData.getTimestamp()) { if (timestamp == existingTradeData.getTimestamp()) - LOGGER.trace("Ignoring online trade {} from peer {} as we have verified timestamp {} before", - peersOnlineTrade.getAtAddress(), peer, timestamp + LOGGER.trace("Ignoring trade presence {} from peer {} as we have verified timestamp {} before", + peersTradePresence.getAtAddress(), peer, timestamp ); else - LOGGER.trace("Ignoring online trade {} from peer {} as timestamp {} is older than latest {}", - peersOnlineTrade.getAtAddress(), peer, timestamp, existingTradeData.getTimestamp() + LOGGER.trace("Ignoring trade presence {} from peer {} as timestamp {} is older than latest {}", + peersTradePresence.getAtAddress(), peer, timestamp, existingTradeData.getTimestamp() ); continue; } // Check timestamp signature - byte[] timestampSignature = peersOnlineTrade.getSignature(); + byte[] timestampSignature = peersTradePresence.getSignature(); byte[] timestampBytes = Longs.toByteArray(timestamp); - byte[] publicKey = peersOnlineTrade.getPublicKey(); + byte[] publicKey = peersTradePresence.getPublicKey(); if (!Crypto.verify(publicKey, timestampSignature, timestampBytes)) { - LOGGER.trace("Ignoring online trade {} from peer {} as signature failed to verify", - peersOnlineTrade.getAtAddress(), peer + LOGGER.trace("Ignoring trade presence {} from peer {} as signature failed to verify", + peersTradePresence.getAtAddress(), peer ); continue; } - ATData atData = repository.getATRepository().fromATAddress(peersOnlineTrade.getAtAddress()); + ATData atData = repository.getATRepository().fromATAddress(peersTradePresence.getAtAddress()); if (atData == null || atData.getIsFrozen() || atData.getIsFinished()) { if (atData == null) - LOGGER.trace("Ignoring online trade {} from peer {} as AT doesn't exist", - peersOnlineTrade.getAtAddress(), peer + LOGGER.trace("Ignoring trade presence {} from peer {} as AT doesn't exist", + peersTradePresence.getAtAddress(), peer ); else - LOGGER.trace("Ignoring online trade {} from peer {} as AT is frozen or finished", - peersOnlineTrade.getAtAddress(), peer + LOGGER.trace("Ignoring trade presence {} from peer {} as AT is frozen or finished", + peersTradePresence.getAtAddress(), peer ); continue; @@ -553,8 +569,8 @@ public class TradeBot implements Listener { ByteArray atCodeHash = new ByteArray(atData.getCodeHash()); Supplier acctSupplier = acctSuppliersByCodeHash.get(atCodeHash); if (acctSupplier == null) { - LOGGER.trace("Ignoring online trade {} from peer {} as AT isn't a known ACCT?", - peersOnlineTrade.getAtAddress(), peer + LOGGER.trace("Ignoring trade presence {} from peer {} as AT isn't a known ACCT?", + peersTradePresence.getAtAddress(), peer ); continue; @@ -562,69 +578,78 @@ public class TradeBot implements Listener { CrossChainTradeData tradeData = acctSupplier.get().populateTradeData(repository, atData); if (tradeData == null) { - LOGGER.trace("Ignoring online trade {} from peer {} as trade data not found?", - peersOnlineTrade.getAtAddress(), peer + LOGGER.trace("Ignoring trade presence {} from peer {} as trade data not found?", + peersTradePresence.getAtAddress(), peer ); continue; } // Convert signer's public key to address form - String signerAddress = peersOnlineTrade.getTradeAddress(); + String signerAddress = peersTradePresence.getTradeAddress(); // Signer's public key (in address form) must match Bob's / Alice's trade public key (in address form) if (!signerAddress.equals(tradeData.qortalCreatorTradeAddress) && !signerAddress.equals(tradeData.qortalPartnerAddress)) { - LOGGER.trace("Ignoring online trade {} from peer {} as signer isn't Alice or Bob?", - peersOnlineTrade.getAtAddress(), peer + LOGGER.trace("Ignoring trade presence {} from peer {} as signer isn't Alice or Bob?", + peersTradePresence.getAtAddress(), peer ); continue; } // This is new to us - this.allOnlineByPubkey.put(pubkeyByteArray, peersOnlineTrade); + this.allTradePresencesByPubkey.put(pubkeyByteArray, peersTradePresence); ++newCount; - LOGGER.trace("Added online trade {} from peer {} with timestamp {}", - peersOnlineTrade.getAtAddress(), peer, timestamp + LOGGER.trace("Added trade presence {} from peer {} with timestamp {}", + peersTradePresence.getAtAddress(), peer, timestamp ); } } catch (DataException e) { - LOGGER.error("Couldn't process ONLINE_TRADES message due to repository issue", e); + LOGGER.error("Couldn't process TRADE_PRESENCES message due to repository issue", e); } if (newCount > 0) { - LOGGER.debug("New online trade signatures: {}", newCount); - rebuildSafeAllOnline(); + LOGGER.debug("New trade presences: {}", newCount); + rebuildSafeAllTradePresences(); } } public void bridgePresence(long timestamp, byte[] publicKey, byte[] signature, String atAddress) { - long expiry = (timestamp / ONLINE_LIFETIME + 1) * ONLINE_LIFETIME; + long expiry = generateExpiry(timestamp); ByteArray pubkeyByteArray = ByteArray.of(publicKey); - OnlineTradeData fakeOnlineTradeData = new OnlineTradeData(expiry, publicKey, signature, atAddress); + TradePresenceData fakeTradePresenceData = new TradePresenceData(expiry, publicKey, signature, atAddress); - OnlineTradeData computedOnlineTradeData = this.allOnlineByPubkey.compute(pubkeyByteArray, (k, v) -> (v == null || v.getTimestamp() < expiry) ? fakeOnlineTradeData : v); + // Only bridge if trade presence expiry timestamp is newer + TradePresenceData computedTradePresenceData = this.allTradePresencesByPubkey.compute(pubkeyByteArray, (k, v) -> + v == null || v.getTimestamp() < expiry ? fakeTradePresenceData : v + ); - if (computedOnlineTradeData == fakeOnlineTradeData) { - LOGGER.trace("Bridged online trade {} with timestamp {}", atAddress, expiry); - rebuildSafeAllOnline(); + if (computedTradePresenceData == fakeTradePresenceData) { + LOGGER.trace("Bridged PRESENCE transaction for trade {} with timestamp {}", atAddress, expiry); + rebuildSafeAllTradePresences(); } } + /** Decorates a CrossChainTradeData object with Alice / Bob trade-bot presence timestamp, if available. */ public void decorateTradeDataWithPresence(CrossChainTradeData crossChainTradeData) { // Match by AT address, then check for Bob vs Alice - this.safeAllOnlineByPubkey.values().stream() - .filter(onlineTradeData -> onlineTradeData.getAtAddress().equals(crossChainTradeData.qortalAtAddress)) - .forEach(onlineTradeData -> { - String signerAddress = onlineTradeData.getTradeAddress(); + this.safeAllTradePresencesByPubkey.values().stream() + .filter(tradePresenceData -> tradePresenceData.getAtAddress().equals(crossChainTradeData.qortalAtAddress)) + .forEach(tradePresenceData -> { + String signerAddress = tradePresenceData.getTradeAddress(); // Signer's public key (in address form) must match Bob's / Alice's trade public key (in address form) if (signerAddress.equals(crossChainTradeData.qortalCreatorTradeAddress)) - crossChainTradeData.creatorPresenceExpiry = onlineTradeData.getTimestamp(); + crossChainTradeData.creatorPresenceExpiry = tradePresenceData.getTimestamp(); else if (signerAddress.equals(crossChainTradeData.qortalPartnerAddress)) - crossChainTradeData.partnerPresenceExpiry = onlineTradeData.getTimestamp(); + crossChainTradeData.partnerPresenceExpiry = tradePresenceData.getTimestamp(); }); } + + private long generateExpiry(long timestamp) { + return ((timestamp - 1) / EXPIRY_ROUNDING) * EXPIRY_ROUNDING + PRESENCE_LIFETIME; + } + } diff --git a/src/main/java/org/qortal/data/network/OnlineTradeData.java b/src/main/java/org/qortal/data/network/TradePresenceData.java similarity index 67% rename from src/main/java/org/qortal/data/network/OnlineTradeData.java rename to src/main/java/org/qortal/data/network/TradePresenceData.java index 1ae48718..089cc742 100644 --- a/src/main/java/org/qortal/data/network/OnlineTradeData.java +++ b/src/main/java/org/qortal/data/network/TradePresenceData.java @@ -8,7 +8,7 @@ import java.util.Arrays; // All properties to be converted to JSON via JAXB @XmlAccessorType(XmlAccessType.FIELD) -public class OnlineTradeData { +public class TradePresenceData { protected long timestamp; protected byte[] publicKey; // Could be BOB's or ALICE's @@ -19,17 +19,17 @@ public class OnlineTradeData { // Constructors // necessary for JAXB serialization - protected OnlineTradeData() { + protected TradePresenceData() { } - public OnlineTradeData(long timestamp, byte[] publicKey, byte[] signature, String atAddress) { + public TradePresenceData(long timestamp, byte[] publicKey, byte[] signature, String atAddress) { this.timestamp = timestamp; this.publicKey = publicKey; this.signature = signature; this.atAddress = atAddress; } - public OnlineTradeData(long timestamp, byte[] publicKey) { + public TradePresenceData(long timestamp, byte[] publicKey) { this(timestamp, publicKey, null, null); } @@ -65,25 +65,25 @@ public class OnlineTradeData { if (other == this) return true; - if (!(other instanceof OnlineTradeData)) + if (!(other instanceof TradePresenceData)) return false; - OnlineTradeData otherOnlineTradeData = (OnlineTradeData) other; + TradePresenceData otherTradePresenceData = (TradePresenceData) other; // Very quick comparison - if (otherOnlineTradeData.timestamp != this.timestamp) + if (otherTradePresenceData.timestamp != this.timestamp) return false; - if (!Arrays.equals(otherOnlineTradeData.publicKey, this.publicKey)) + if (!Arrays.equals(otherTradePresenceData.publicKey, this.publicKey)) return false; - if (otherOnlineTradeData.atAddress != null && !otherOnlineTradeData.atAddress.equals(this.atAddress)) + if (otherTradePresenceData.atAddress != null && !otherTradePresenceData.atAddress.equals(this.atAddress)) return false; - if (this.atAddress != null && !this.atAddress.equals(otherOnlineTradeData.atAddress)) + if (this.atAddress != null && !this.atAddress.equals(otherTradePresenceData.atAddress)) return false; - if (!Arrays.equals(otherOnlineTradeData.signature, this.signature)) + if (!Arrays.equals(otherTradePresenceData.signature, this.signature)) return false; return true; diff --git a/src/main/java/org/qortal/network/message/GetOnlineTradesMessage.java b/src/main/java/org/qortal/network/message/GetTradePresencesMessage.java similarity index 54% rename from src/main/java/org/qortal/network/message/GetOnlineTradesMessage.java rename to src/main/java/org/qortal/network/message/GetTradePresencesMessage.java index 588f6a0b..d9be3c1b 100644 --- a/src/main/java/org/qortal/network/message/GetOnlineTradesMessage.java +++ b/src/main/java/org/qortal/network/message/GetTradePresencesMessage.java @@ -2,7 +2,7 @@ package org.qortal.network.message; import com.google.common.primitives.Ints; import com.google.common.primitives.Longs; -import org.qortal.data.network.OnlineTradeData; +import org.qortal.data.network.TradePresenceData; import org.qortal.transform.Transformer; import java.io.ByteArrayOutputStream; @@ -15,52 +15,52 @@ import java.util.List; import java.util.Map; /** - * For requesting which trades are online from remote peer, given our list of online trades. + * For requesting trade presences from remote peer, given our list of known trade presences. * * Groups of: number of entries, timestamp, then AT trade pubkey for each entry. */ -public class GetOnlineTradesMessage extends Message { - private List onlineTrades; +public class GetTradePresencesMessage extends Message { + private List tradePresences; private byte[] cachedData; - public GetOnlineTradesMessage(List onlineTrades) { - this(-1, onlineTrades); + public GetTradePresencesMessage(List tradePresences) { + this(-1, tradePresences); } - private GetOnlineTradesMessage(int id, List onlineTrades) { - super(id, MessageType.GET_ONLINE_TRADES); + private GetTradePresencesMessage(int id, List tradePresences) { + super(id, MessageType.GET_TRADE_PRESENCES); - this.onlineTrades = onlineTrades; + this.tradePresences = tradePresences; } - public List getOnlineTrades() { - return this.onlineTrades; + public List getTradePresences() { + return this.tradePresences; } public static Message fromByteBuffer(int id, ByteBuffer bytes) throws UnsupportedEncodingException { - int tradeCount = bytes.getInt(); + int groupedEntriesCount = bytes.getInt(); - List onlineTrades = new ArrayList<>(tradeCount); + List tradePresences = new ArrayList<>(groupedEntriesCount); - while (tradeCount > 0) { + while (groupedEntriesCount > 0) { long timestamp = bytes.getLong(); - for (int i = 0; i < tradeCount; ++i) { + for (int i = 0; i < groupedEntriesCount; ++i) { byte[] publicKey = new byte[Transformer.PUBLIC_KEY_LENGTH]; bytes.get(publicKey); - onlineTrades.add(new OnlineTradeData(timestamp, publicKey)); + tradePresences.add(new TradePresenceData(timestamp, publicKey)); } if (bytes.hasRemaining()) { - tradeCount = bytes.getInt(); + groupedEntriesCount = bytes.getInt(); } else { // we've finished - tradeCount = 0; + groupedEntriesCount = 0; } } - return new GetOnlineTradesMessage(id, onlineTrades); + return new GetTradePresencesMessage(id, tradePresences); } @Override @@ -68,8 +68,8 @@ public class GetOnlineTradesMessage extends Message { if (this.cachedData != null) return this.cachedData; - // Shortcut in case we have no online accounts - if (this.onlineTrades.isEmpty()) { + // Shortcut in case we have no trade presences + if (this.tradePresences.isEmpty()) { this.cachedData = Ints.toByteArray(0); return this.cachedData; } @@ -77,14 +77,14 @@ public class GetOnlineTradesMessage extends Message { // How many of each timestamp Map countByTimestamp = new HashMap<>(); - for (OnlineTradeData onlineTradeData : this.onlineTrades) { - Long timestamp = onlineTradeData.getTimestamp(); + for (TradePresenceData tradePresenceData : this.tradePresences) { + Long timestamp = tradePresenceData.getTimestamp(); countByTimestamp.compute(timestamp, (k, v) -> v == null ? 1 : ++v); } // We should know exactly how many bytes to allocate now int byteSize = countByTimestamp.size() * (Transformer.INT_LENGTH + Transformer.TIMESTAMP_LENGTH) - + this.onlineTrades.size() * Transformer.PUBLIC_KEY_LENGTH; + + this.tradePresences.size() * Transformer.PUBLIC_KEY_LENGTH; try { ByteArrayOutputStream bytes = new ByteArrayOutputStream(byteSize); @@ -94,9 +94,9 @@ public class GetOnlineTradesMessage extends Message { bytes.write(Longs.toByteArray(timestamp)); - for (OnlineTradeData onlineTradeData : this.onlineTrades) { - if (onlineTradeData.getTimestamp() == timestamp) - bytes.write(onlineTradeData.getPublicKey()); + for (TradePresenceData tradePresenceData : this.tradePresences) { + if (tradePresenceData.getTimestamp() == timestamp) + bytes.write(tradePresenceData.getPublicKey()); } } diff --git a/src/main/java/org/qortal/network/message/Message.java b/src/main/java/org/qortal/network/message/Message.java index ae7e68c9..d119725d 100644 --- a/src/main/java/org/qortal/network/message/Message.java +++ b/src/main/java/org/qortal/network/message/Message.java @@ -95,8 +95,8 @@ public abstract class Message { ARBITRARY_SIGNATURES(130), - ONLINE_TRADES(140), - GET_ONLINE_TRADES(141), + TRADE_PRESENCES(140), + GET_TRADE_PRESENCES(141), ; public final int value; diff --git a/src/main/java/org/qortal/network/message/OnlineTradesMessage.java b/src/main/java/org/qortal/network/message/TradePresencesMessage.java similarity index 55% rename from src/main/java/org/qortal/network/message/OnlineTradesMessage.java rename to src/main/java/org/qortal/network/message/TradePresencesMessage.java index 74912d8e..9d846722 100644 --- a/src/main/java/org/qortal/network/message/OnlineTradesMessage.java +++ b/src/main/java/org/qortal/network/message/TradePresencesMessage.java @@ -2,7 +2,7 @@ package org.qortal.network.message; import com.google.common.primitives.Ints; import com.google.common.primitives.Longs; -import org.qortal.data.network.OnlineTradeData; +import org.qortal.data.network.TradePresenceData; import org.qortal.transform.Transformer; import org.qortal.utils.Base58; @@ -10,44 +10,43 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer; -import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; /** - * For sending list of which trades are online to remote peer. + * For sending list of trade presences to remote peer. * * Groups of: number of entries, timestamp, then pubkey + sig + AT address for each entry. */ -public class OnlineTradesMessage extends Message { - private List onlineTrades; +public class TradePresencesMessage extends Message { + private List tradePresences; private byte[] cachedData; - public OnlineTradesMessage(List onlineTrades) { - this(-1, onlineTrades); + public TradePresencesMessage(List tradePresences) { + this(-1, tradePresences); } - private OnlineTradesMessage(int id, List onlineTrades) { - super(id, MessageType.ONLINE_TRADES); + private TradePresencesMessage(int id, List tradePresences) { + super(id, MessageType.TRADE_PRESENCES); - this.onlineTrades = onlineTrades; + this.tradePresences = tradePresences; } - public List getOnlineTrades() { - return this.onlineTrades; + public List getTradePresences() { + return this.tradePresences; } public static Message fromByteBuffer(int id, ByteBuffer bytes) throws UnsupportedEncodingException { - int tradeCount = bytes.getInt(); + int groupedEntriesCount = bytes.getInt(); - List onlineTrades = new ArrayList<>(tradeCount); + List tradePresences = new ArrayList<>(groupedEntriesCount); - while (tradeCount > 0) { + while (groupedEntriesCount > 0) { long timestamp = bytes.getLong(); - for (int i = 0; i < tradeCount; ++i) { + for (int i = 0; i < groupedEntriesCount; ++i) { byte[] publicKey = new byte[Transformer.PUBLIC_KEY_LENGTH]; bytes.get(publicKey); @@ -58,18 +57,18 @@ public class OnlineTradesMessage extends Message { bytes.get(atAddressBytes); String atAddress = Base58.encode(atAddressBytes); - onlineTrades.add(new OnlineTradeData(timestamp, publicKey, signature, atAddress)); + tradePresences.add(new TradePresenceData(timestamp, publicKey, signature, atAddress)); } if (bytes.hasRemaining()) { - tradeCount = bytes.getInt(); + groupedEntriesCount = bytes.getInt(); } else { // we've finished - tradeCount = 0; + groupedEntriesCount = 0; } } - return new OnlineTradesMessage(id, onlineTrades); + return new TradePresencesMessage(id, tradePresences); } @Override @@ -77,8 +76,8 @@ public class OnlineTradesMessage extends Message { if (this.cachedData != null) return this.cachedData; - // Shortcut in case we have no online trade entries - if (this.onlineTrades.isEmpty()) { + // Shortcut in case we have no trade presences + if (this.tradePresences.isEmpty()) { this.cachedData = Ints.toByteArray(0); return this.cachedData; } @@ -86,14 +85,14 @@ public class OnlineTradesMessage extends Message { // How many of each timestamp Map countByTimestamp = new HashMap<>(); - for (OnlineTradeData onlineTradeData : this.onlineTrades) { - Long timestamp = onlineTradeData.getTimestamp(); + for (TradePresenceData tradePresenceData : this.tradePresences) { + Long timestamp = tradePresenceData.getTimestamp(); countByTimestamp.compute(timestamp, (k, v) -> v == null ? 1 : ++v); } // We should know exactly how many bytes to allocate now int byteSize = countByTimestamp.size() * (Transformer.INT_LENGTH + Transformer.TIMESTAMP_LENGTH) - + this.onlineTrades.size() * (Transformer.PUBLIC_KEY_LENGTH + Transformer.SIGNATURE_LENGTH + Transformer.ADDRESS_LENGTH); + + this.tradePresences.size() * (Transformer.PUBLIC_KEY_LENGTH + Transformer.SIGNATURE_LENGTH + Transformer.ADDRESS_LENGTH); try { ByteArrayOutputStream bytes = new ByteArrayOutputStream(byteSize); @@ -103,13 +102,13 @@ public class OnlineTradesMessage extends Message { bytes.write(Longs.toByteArray(timestamp)); - for (OnlineTradeData onlineTradeData : this.onlineTrades) { - if (onlineTradeData.getTimestamp() == timestamp) { - bytes.write(onlineTradeData.getPublicKey()); + for (TradePresenceData tradePresenceData : this.tradePresences) { + if (tradePresenceData.getTimestamp() == timestamp) { + bytes.write(tradePresenceData.getPublicKey()); - bytes.write(onlineTradeData.getSignature()); + bytes.write(tradePresenceData.getSignature()); - bytes.write(Base58.decode(onlineTradeData.getAtAddress())); + bytes.write(Base58.decode(tradePresenceData.getAtAddress())); } } } diff --git a/src/test/java/org/qortal/test/crosschain/TradeBotPresenceTests.java b/src/test/java/org/qortal/test/crosschain/TradeBotPresenceTests.java new file mode 100644 index 00000000..c60a046b --- /dev/null +++ b/src/test/java/org/qortal/test/crosschain/TradeBotPresenceTests.java @@ -0,0 +1,112 @@ +package org.qortal.test.crosschain; + +import org.junit.Test; +import org.qortal.utils.ByteArray; + +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Map; + +public class TradeBotPresenceTests { + + public static final long ROUNDING = 15 * 60 * 1000L; // to nearest X mins + public static final long LIFETIME = 30 * 60 * 1000L; // lifetime: X mins + public static final long EARLY_RENEWAL_LIFETIME = 5 * 60 * 1000L; // X mins before expiry + public static final long CHECK_INTERVAL = 5 * 60 * 1000L; // X mins + public static final long MAX_TIMESTAMP = 100 * 60 * 1000L; // run tests for X mins + + // We want to generate timestamps that expire 30 mins into the future, but also round to nearest X min? + // We want to regenerate timestamps early (e.g. 15 mins before expiry) to allow for network propagation + + // We want to keep the latest timestamp for any given public key + // We want to reject out-of-bound timestamps from peers (>30 mins into future, not now/past) + + // We want to make sure that we don't incorrectly delete an entry at 15-min and 30-min boundaries + + @Test + public void testGeneratedExpiryTimestamps() { + for (long timestamp = 0; timestamp <= MAX_TIMESTAMP; timestamp += CHECK_INTERVAL) { + long expiry = generateExpiry(timestamp); + + System.out.println(String.format("time: % 3dm, expiry: % 3dm", + timestamp / 60_000L, + expiry / 60_000L + )); + } + } + + @Test + public void testEarlyRenewal() { + Long currentExpiry = null; + + for (long timestamp = 0; timestamp <= MAX_TIMESTAMP; timestamp += CHECK_INTERVAL) { + long newExpiry = generateExpiry(timestamp); + + if (currentExpiry == null || currentExpiry - timestamp <= EARLY_RENEWAL_LIFETIME) { + currentExpiry = newExpiry; + } + + System.out.println(String.format("time: % 3dm, expiry: % 3dm", + timestamp / 60_000L, + currentExpiry / 60_000L + )); + } + } + + @Test + public void testEnforceLatestTimestamp() { + ByteArray pubkeyByteArray = ByteArray.of("publickey".getBytes(StandardCharsets.UTF_8)); + + Map timestampsByPublicKey = new HashMap<>(); + + // Working backwards this time + for (long timestamp = MAX_TIMESTAMP; timestamp >= 0; timestamp -= CHECK_INTERVAL){ + long newExpiry = generateExpiry(timestamp); + + timestampsByPublicKey.compute(pubkeyByteArray, (k, v) -> + v == null || v < newExpiry ? newExpiry : v + ); + + Long currentExpiry = timestampsByPublicKey.get(pubkeyByteArray); + + System.out.println(String.format("time: % 3dm, expiry: % 3dm", + timestamp / 60_000L, + currentExpiry / 60_000L + )); + } + } + + @Test + public void testEnforcePeerExpiryBounds() { + System.out.println(String.format("%40s", "Our time")); + + for (long ourTimestamp = 0; ourTimestamp <= MAX_TIMESTAMP; ourTimestamp += CHECK_INTERVAL) { + System.out.print(String.format("%s% 3dm ", + ourTimestamp != 0 ? "| " : " ", + ourTimestamp / 60_000L + )); + } + System.out.println(); + + for (long peerTimestamp = 0; peerTimestamp <= MAX_TIMESTAMP; peerTimestamp += CHECK_INTERVAL) { + System.out.print(String.format("% 4dm ", peerTimestamp / 60_000L)); + + for (long ourTimestamp = 0; ourTimestamp <= MAX_TIMESTAMP; ourTimestamp += CHECK_INTERVAL) { + System.out.print(String.format("| %s ", + isPeerExpiryValid(ourTimestamp, peerTimestamp) ? "✔" : "✘" + )); + } + System.out.println(); + } + + System.out.println("Peer's expiry time"); + } + + private long generateExpiry(long timestamp) { + return ((timestamp - 1) / ROUNDING) * ROUNDING + LIFETIME; + } + + private boolean isPeerExpiryValid(long nowTimestamp, long peerExpiry) { + return peerExpiry > nowTimestamp && peerExpiry <= LIFETIME + nowTimestamp; + } +} From 1d59feeb7216d958507e7786d29edb9ac5d8dc19 Mon Sep 17 00:00:00 2001 From: catbref Date: Wed, 23 Feb 2022 21:53:48 +0000 Subject: [PATCH 5/9] Created /websockets/crosschain/tradepresence to replace /websockets/presence --- src/main/java/org/qortal/api/ApiService.java | 11 +- .../api/websocket/TradePresenceWebSocket.java | 137 ++++++++++++++++++ .../qortal/controller/tradebot/TradeBot.java | 20 +++ .../data/network/TradePresenceData.java | 4 +- 4 files changed, 164 insertions(+), 8 deletions(-) create mode 100644 src/main/java/org/qortal/api/websocket/TradePresenceWebSocket.java diff --git a/src/main/java/org/qortal/api/ApiService.java b/src/main/java/org/qortal/api/ApiService.java index 697543c7..78c9250c 100644 --- a/src/main/java/org/qortal/api/ApiService.java +++ b/src/main/java/org/qortal/api/ApiService.java @@ -40,13 +40,7 @@ import org.glassfish.jersey.server.ResourceConfig; import org.glassfish.jersey.servlet.ServletContainer; import org.qortal.api.resource.AnnotationPostProcessor; import org.qortal.api.resource.ApiDefinition; -import org.qortal.api.websocket.ActiveChatsWebSocket; -import org.qortal.api.websocket.AdminStatusWebSocket; -import org.qortal.api.websocket.BlocksWebSocket; -import org.qortal.api.websocket.ChatMessagesWebSocket; -import org.qortal.api.websocket.PresenceWebSocket; -import org.qortal.api.websocket.TradeBotWebSocket; -import org.qortal.api.websocket.TradeOffersWebSocket; +import org.qortal.api.websocket.*; import org.qortal.settings.Settings; public class ApiService { @@ -212,6 +206,9 @@ public class ApiService { context.addServlet(ChatMessagesWebSocket.class, "/websockets/chat/messages"); context.addServlet(TradeOffersWebSocket.class, "/websockets/crosschain/tradeoffers"); context.addServlet(TradeBotWebSocket.class, "/websockets/crosschain/tradebot"); + context.addServlet(TradePresenceWebSocket.class, "/websockets/crosschain/tradepresence"); + + // Deprecated context.addServlet(PresenceWebSocket.class, "/websockets/presence"); // Start server diff --git a/src/main/java/org/qortal/api/websocket/TradePresenceWebSocket.java b/src/main/java/org/qortal/api/websocket/TradePresenceWebSocket.java new file mode 100644 index 00000000..808accbf --- /dev/null +++ b/src/main/java/org/qortal/api/websocket/TradePresenceWebSocket.java @@ -0,0 +1,137 @@ +package org.qortal.api.websocket; + +import org.eclipse.jetty.websocket.api.Session; +import org.eclipse.jetty.websocket.api.annotations.*; +import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory; +import org.qortal.controller.Controller; +import org.qortal.controller.tradebot.TradeBot; +import org.qortal.data.network.TradePresenceData; +import org.qortal.event.Event; +import org.qortal.event.EventBus; +import org.qortal.event.Listener; +import org.qortal.utils.Base58; +import org.qortal.utils.NTP; + +import java.io.IOException; +import java.io.StringWriter; +import java.util.*; + +@WebSocket +@SuppressWarnings("serial") +public class TradePresenceWebSocket extends ApiWebSocket implements Listener { + + /** Map key is public key in base58, map value is trade presence */ + private static final Map currentEntries = Collections.synchronizedMap(new HashMap<>()); + + @Override + public void configure(WebSocketServletFactory factory) { + factory.register(TradePresenceWebSocket.class); + + populateCurrentInfo(); + + EventBus.INSTANCE.addListener(this::listen); + } + + @Override + public void listen(Event event) { + // XXX - Suggest we change this to something like Synchronizer.NewChainTipEvent? + // We use NewBlockEvent as a proxy for 1-minute timer + if (!(event instanceof TradeBot.TradePresenceEvent) && !(event instanceof Controller.NewBlockEvent)) + return; + + removeOldEntries(); + + if (event instanceof Controller.NewBlockEvent) + // We only wanted a chance to cull old entries + return; + + TradePresenceData tradePresence = ((TradeBot.TradePresenceEvent) event).getTradePresenceData(); + + boolean somethingChanged = mergePresence(tradePresence); + + if (!somethingChanged) + // nothing changed + return; + + List tradePresences = Collections.singletonList(tradePresence); + + // Notify sessions + for (Session session : getSessions()) { + sendTradePresences(session, tradePresences); + } + } + + @OnWebSocketConnect + @Override + public void onWebSocketConnect(Session session) { + Map> queryParams = session.getUpgradeRequest().getParameterMap(); + + List tradePresences; + + synchronized (currentEntries) { + tradePresences = List.copyOf(currentEntries.values()); + } + + if (!sendTradePresences(session, tradePresences)) { + session.close(4002, "websocket issue"); + return; + } + + super.onWebSocketConnect(session); + } + + @OnWebSocketClose + @Override + public void onWebSocketClose(Session session, int statusCode, String reason) { + // clean up + super.onWebSocketClose(session, statusCode, reason); + } + + @OnWebSocketError + public void onWebSocketError(Session session, Throwable throwable) { + /* ignored */ + } + + @OnWebSocketMessage + public void onWebSocketMessage(Session session, String message) { + /* ignored */ + } + + private boolean sendTradePresences(Session session, List tradePresences) { + try { + StringWriter stringWriter = new StringWriter(); + marshall(stringWriter, tradePresences); + + String output = stringWriter.toString(); + session.getRemote().sendStringByFuture(output); + } catch (IOException e) { + // No output this time? + return false; + } + + return true; + } + + private static void populateCurrentInfo() { + // We want ALL trade presences + TradeBot.getInstance().getAllTradePresences().stream() + .forEach(TradePresenceWebSocket::mergePresence); + } + + /** Merge trade presence into cache of current entries, returns true if cache was updated. */ + private static boolean mergePresence(TradePresenceData tradePresence) { + // Put/replace for this publickey making sure we keep newest timestamp + String pubKey58 = Base58.encode(tradePresence.getPublicKey()); + + TradePresenceData newEntry = currentEntries.compute(pubKey58, (k, v) -> v == null || v.getTimestamp() < tradePresence.getTimestamp() ? tradePresence : v); + + return newEntry != tradePresence; + } + + private static void removeOldEntries() { + long now = NTP.getTime(); + + currentEntries.values().removeIf(v -> v.getTimestamp() < now); + } + +} diff --git a/src/main/java/org/qortal/controller/tradebot/TradeBot.java b/src/main/java/org/qortal/controller/tradebot/TradeBot.java index 4caeeca5..bdedf831 100644 --- a/src/main/java/org/qortal/controller/tradebot/TradeBot.java +++ b/src/main/java/org/qortal/controller/tradebot/TradeBot.java @@ -79,6 +79,18 @@ public class TradeBot implements Listener { } } + public static class TradePresenceEvent implements Event { + private final TradePresenceData tradePresenceData; + + public TradePresenceEvent(TradePresenceData tradePresenceData) { + this.tradePresenceData = tradePresenceData; + } + + public TradePresenceData getTradePresenceData() { + return this.tradePresenceData; + } + } + private static final Map, Supplier> acctTradeBotSuppliers = new HashMap<>(); static { acctTradeBotSuppliers.put(BitcoinACCTv1.class, BitcoinACCTv1TradeBot::getInstance); @@ -340,6 +352,10 @@ public class TradeBot implements Listener { // PRESENCE-related + public Collection getAllTradePresences() { + return this.safeAllTradePresencesByPubkey.values(); + } + /** Trade presence timestamps expire in the 'future' so any that reach 'now' have expired and are removed. */ private void expireOldPresenceTimestamps() { long now = NTP.getTime(); @@ -407,6 +423,8 @@ public class TradeBot implements Listener { rebuildSafeAllTradePresences(); LOGGER.trace("New trade presence timestamp {} for our trade {}", newExpiry, atAddress); + + EventBus.INSTANCE.notify(new TradePresenceEvent(tradePresenceData)); } private void rebuildSafeAllTradePresences() { @@ -604,6 +622,8 @@ public class TradeBot implements Listener { LOGGER.trace("Added trade presence {} from peer {} with timestamp {}", peersTradePresence.getAtAddress(), peer, timestamp ); + + EventBus.INSTANCE.notify(new TradePresenceEvent(peersTradePresence)); } } catch (DataException e) { LOGGER.error("Couldn't process TRADE_PRESENCES message due to repository issue", e); diff --git a/src/main/java/org/qortal/data/network/TradePresenceData.java b/src/main/java/org/qortal/data/network/TradePresenceData.java index 089cc742..c1fafa84 100644 --- a/src/main/java/org/qortal/data/network/TradePresenceData.java +++ b/src/main/java/org/qortal/data/network/TradePresenceData.java @@ -4,6 +4,7 @@ import org.qortal.crypto.Crypto; import javax.xml.bind.annotation.XmlAccessType; import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlElement; import java.util.Arrays; // All properties to be converted to JSON via JAXB @@ -49,7 +50,8 @@ public class TradePresenceData { return this.atAddress; } - // Probably don't need synchronization + // Probably doesn't need synchronization + @XmlElement public String getTradeAddress() { if (tradeAddress != null) return tradeAddress; From e2ef5b2ef3a8e468f606e2f8fa1d1e3339385dd2 Mon Sep 17 00:00:00 2001 From: catbref Date: Thu, 24 Feb 2022 08:41:41 +0000 Subject: [PATCH 6/9] Missed change from last commit: incorrect logic in TradePresenceWebSocket --- .../java/org/qortal/api/websocket/TradePresenceWebSocket.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/org/qortal/api/websocket/TradePresenceWebSocket.java b/src/main/java/org/qortal/api/websocket/TradePresenceWebSocket.java index 808accbf..e9558599 100644 --- a/src/main/java/org/qortal/api/websocket/TradePresenceWebSocket.java +++ b/src/main/java/org/qortal/api/websocket/TradePresenceWebSocket.java @@ -125,7 +125,7 @@ public class TradePresenceWebSocket extends ApiWebSocket implements Listener { TradePresenceData newEntry = currentEntries.compute(pubKey58, (k, v) -> v == null || v.getTimestamp() < tradePresence.getTimestamp() ? tradePresence : v); - return newEntry != tradePresence; + return newEntry == tradePresence; } private static void removeOldEntries() { From 3b477ef637e208702bce2b6e356c36e405e0ccb8 Mon Sep 17 00:00:00 2001 From: catbref Date: Thu, 24 Feb 2022 17:30:27 +0000 Subject: [PATCH 7/9] Fix JAXB marshalling error (duplicate tradeAddress) in TradePresenceWebSocket. No need to send signature. Make sure publicKey is sent in Base58, not Base64. --- .../org/qortal/data/network/TradePresenceData.java | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/src/main/java/org/qortal/data/network/TradePresenceData.java b/src/main/java/org/qortal/data/network/TradePresenceData.java index c1fafa84..9bd9ce29 100644 --- a/src/main/java/org/qortal/data/network/TradePresenceData.java +++ b/src/main/java/org/qortal/data/network/TradePresenceData.java @@ -5,6 +5,8 @@ import org.qortal.crypto.Crypto; import javax.xml.bind.annotation.XmlAccessType; import javax.xml.bind.annotation.XmlAccessorType; import javax.xml.bind.annotation.XmlElement; +import javax.xml.bind.annotation.XmlTransient; +import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter; import java.util.Arrays; // All properties to be converted to JSON via JAXB @@ -12,9 +14,21 @@ import java.util.Arrays; public class TradePresenceData { protected long timestamp; + + @XmlJavaTypeAdapter( + type = byte[].class, + value = org.qortal.api.Base58TypeAdapter.class + ) protected byte[] publicKey; // Could be BOB's or ALICE's + + // No need to send this via websocket / API + @XmlTransient protected byte[] signature; // Not always present + protected String atAddress; // Not always present + + // Have JAXB use getter instead + @XmlTransient protected String tradeAddress; // Lazily instantiated // Constructors From ecac47d1bcf13e75ed5f3c6327377c9472b93bac Mon Sep 17 00:00:00 2001 From: catbref Date: Sat, 26 Feb 2022 17:57:47 +0000 Subject: [PATCH 8/9] Also notify TradePresenceWebsocket (using TradePresenceEvent) when bridging old PRESENCE txns --- src/main/java/org/qortal/controller/tradebot/TradeBot.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/main/java/org/qortal/controller/tradebot/TradeBot.java b/src/main/java/org/qortal/controller/tradebot/TradeBot.java index bdedf831..1786a130 100644 --- a/src/main/java/org/qortal/controller/tradebot/TradeBot.java +++ b/src/main/java/org/qortal/controller/tradebot/TradeBot.java @@ -649,6 +649,8 @@ public class TradeBot implements Listener { if (computedTradePresenceData == fakeTradePresenceData) { LOGGER.trace("Bridged PRESENCE transaction for trade {} with timestamp {}", atAddress, expiry); rebuildSafeAllTradePresences(); + + EventBus.INSTANCE.notify(new TradePresenceEvent(fakeTradePresenceData)); } } From 590a8f52db549a06b2a21781493a33d8a9f0be9f Mon Sep 17 00:00:00 2001 From: catbref Date: Sun, 27 Feb 2022 16:57:26 +0000 Subject: [PATCH 9/9] Remove future work comment from Controller --- src/main/java/org/qortal/controller/Controller.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/main/java/org/qortal/controller/Controller.java b/src/main/java/org/qortal/controller/Controller.java index 29c0992b..31d18665 100644 --- a/src/main/java/org/qortal/controller/Controller.java +++ b/src/main/java/org/qortal/controller/Controller.java @@ -1222,8 +1222,6 @@ public class Controller extends Thread { } public void onPeerHandshakeCompleted(Peer peer) { - // XXX: we could turn this into an EventBus event so that listeners like TradeBot get a look-in - // Only send if outbound if (peer.isOutbound()) { // Request peer's unconfirmed transactions