diff --git a/src/main/java/org/qortal/api/ApiService.java b/src/main/java/org/qortal/api/ApiService.java index 697543c7..78c9250c 100644 --- a/src/main/java/org/qortal/api/ApiService.java +++ b/src/main/java/org/qortal/api/ApiService.java @@ -40,13 +40,7 @@ import org.glassfish.jersey.server.ResourceConfig; import org.glassfish.jersey.servlet.ServletContainer; import org.qortal.api.resource.AnnotationPostProcessor; import org.qortal.api.resource.ApiDefinition; -import org.qortal.api.websocket.ActiveChatsWebSocket; -import org.qortal.api.websocket.AdminStatusWebSocket; -import org.qortal.api.websocket.BlocksWebSocket; -import org.qortal.api.websocket.ChatMessagesWebSocket; -import org.qortal.api.websocket.PresenceWebSocket; -import org.qortal.api.websocket.TradeBotWebSocket; -import org.qortal.api.websocket.TradeOffersWebSocket; +import org.qortal.api.websocket.*; import org.qortal.settings.Settings; public class ApiService { @@ -212,6 +206,9 @@ public class ApiService { context.addServlet(ChatMessagesWebSocket.class, "/websockets/chat/messages"); context.addServlet(TradeOffersWebSocket.class, "/websockets/crosschain/tradeoffers"); context.addServlet(TradeBotWebSocket.class, "/websockets/crosschain/tradebot"); + context.addServlet(TradePresenceWebSocket.class, "/websockets/crosschain/tradepresence"); + + // Deprecated context.addServlet(PresenceWebSocket.class, "/websockets/presence"); // Start server diff --git a/src/main/java/org/qortal/api/resource/CrossChainResource.java b/src/main/java/org/qortal/api/resource/CrossChainResource.java index 47eee301..bb7c70a5 100644 --- a/src/main/java/org/qortal/api/resource/CrossChainResource.java +++ b/src/main/java/org/qortal/api/resource/CrossChainResource.java @@ -25,6 +25,7 @@ import org.qortal.api.ApiExceptionFactory; import org.qortal.api.Security; import org.qortal.api.model.CrossChainCancelRequest; import org.qortal.api.model.CrossChainTradeSummary; +import org.qortal.controller.tradebot.TradeBot; import org.qortal.crosschain.SupportedBlockchain; import org.qortal.crosschain.ACCT; import org.qortal.crosschain.AcctMode; @@ -120,6 +121,8 @@ public class CrossChainResource { crossChainTrades = crossChainTrades.subList(0, upperLimit); } + crossChainTrades.stream().forEach(CrossChainResource::decorateTradeDataWithPresence); + return crossChainTrades; } catch (DataException e) { throw ApiExceptionFactory.INSTANCE.createException(request, ApiError.REPOSITORY_ISSUE, e); @@ -151,7 +154,11 @@ public class CrossChainResource { if (acct == null) throw ApiExceptionFactory.INSTANCE.createException(request, ApiError.INVALID_CRITERIA); - return acct.populateTradeData(repository, atData); + CrossChainTradeData crossChainTradeData = acct.populateTradeData(repository, atData); + + decorateTradeDataWithPresence(crossChainTradeData); + + return crossChainTradeData; } catch (DataException e) { throw ApiExceptionFactory.INSTANCE.createException(request, ApiError.REPOSITORY_ISSUE, e); } @@ -486,4 +493,7 @@ public class CrossChainResource { } } + private static void decorateTradeDataWithPresence(CrossChainTradeData crossChainTradeData) { + TradeBot.getInstance().decorateTradeDataWithPresence(crossChainTradeData); + } } diff --git a/src/main/java/org/qortal/api/websocket/TradePresenceWebSocket.java b/src/main/java/org/qortal/api/websocket/TradePresenceWebSocket.java new file mode 100644 index 00000000..e9558599 --- /dev/null +++ b/src/main/java/org/qortal/api/websocket/TradePresenceWebSocket.java @@ -0,0 +1,137 @@ +package org.qortal.api.websocket; + +import org.eclipse.jetty.websocket.api.Session; +import org.eclipse.jetty.websocket.api.annotations.*; +import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory; +import org.qortal.controller.Controller; +import org.qortal.controller.tradebot.TradeBot; +import org.qortal.data.network.TradePresenceData; +import org.qortal.event.Event; +import org.qortal.event.EventBus; +import org.qortal.event.Listener; +import org.qortal.utils.Base58; +import org.qortal.utils.NTP; + +import java.io.IOException; +import java.io.StringWriter; +import java.util.*; + +@WebSocket +@SuppressWarnings("serial") +public class TradePresenceWebSocket extends ApiWebSocket implements Listener { + + /** Map key is public key in base58, map value is trade presence */ + private static final Map currentEntries = Collections.synchronizedMap(new HashMap<>()); + + @Override + public void configure(WebSocketServletFactory factory) { + factory.register(TradePresenceWebSocket.class); + + populateCurrentInfo(); + + EventBus.INSTANCE.addListener(this::listen); + } + + @Override + public void listen(Event event) { + // XXX - Suggest we change this to something like Synchronizer.NewChainTipEvent? + // We use NewBlockEvent as a proxy for 1-minute timer + if (!(event instanceof TradeBot.TradePresenceEvent) && !(event instanceof Controller.NewBlockEvent)) + return; + + removeOldEntries(); + + if (event instanceof Controller.NewBlockEvent) + // We only wanted a chance to cull old entries + return; + + TradePresenceData tradePresence = ((TradeBot.TradePresenceEvent) event).getTradePresenceData(); + + boolean somethingChanged = mergePresence(tradePresence); + + if (!somethingChanged) + // nothing changed + return; + + List tradePresences = Collections.singletonList(tradePresence); + + // Notify sessions + for (Session session : getSessions()) { + sendTradePresences(session, tradePresences); + } + } + + @OnWebSocketConnect + @Override + public void onWebSocketConnect(Session session) { + Map> queryParams = session.getUpgradeRequest().getParameterMap(); + + List tradePresences; + + synchronized (currentEntries) { + tradePresences = List.copyOf(currentEntries.values()); + } + + if (!sendTradePresences(session, tradePresences)) { + session.close(4002, "websocket issue"); + return; + } + + super.onWebSocketConnect(session); + } + + @OnWebSocketClose + @Override + public void onWebSocketClose(Session session, int statusCode, String reason) { + // clean up + super.onWebSocketClose(session, statusCode, reason); + } + + @OnWebSocketError + public void onWebSocketError(Session session, Throwable throwable) { + /* ignored */ + } + + @OnWebSocketMessage + public void onWebSocketMessage(Session session, String message) { + /* ignored */ + } + + private boolean sendTradePresences(Session session, List tradePresences) { + try { + StringWriter stringWriter = new StringWriter(); + marshall(stringWriter, tradePresences); + + String output = stringWriter.toString(); + session.getRemote().sendStringByFuture(output); + } catch (IOException e) { + // No output this time? + return false; + } + + return true; + } + + private static void populateCurrentInfo() { + // We want ALL trade presences + TradeBot.getInstance().getAllTradePresences().stream() + .forEach(TradePresenceWebSocket::mergePresence); + } + + /** Merge trade presence into cache of current entries, returns true if cache was updated. */ + private static boolean mergePresence(TradePresenceData tradePresence) { + // Put/replace for this publickey making sure we keep newest timestamp + String pubKey58 = Base58.encode(tradePresence.getPublicKey()); + + TradePresenceData newEntry = currentEntries.compute(pubKey58, (k, v) -> v == null || v.getTimestamp() < tradePresence.getTimestamp() ? tradePresence : v); + + return newEntry == tradePresence; + } + + private static void removeOldEntries() { + long now = NTP.getTime(); + + currentEntries.values().removeIf(v -> v.getTimestamp() < now); + } + +} diff --git a/src/main/java/org/qortal/controller/Controller.java b/src/main/java/org/qortal/controller/Controller.java index 542a2889..31d18665 100644 --- a/src/main/java/org/qortal/controller/Controller.java +++ b/src/main/java/org/qortal/controller/Controller.java @@ -1312,6 +1312,14 @@ public class Controller extends Thread { ArbitraryDataManager.getInstance().onNetworkArbitrarySignaturesMessage(peer, message); break; + case GET_TRADE_PRESENCES: + TradeBot.getInstance().onGetTradePresencesMessage(peer, message); + break; + + case TRADE_PRESENCES: + TradeBot.getInstance().onTradePresencesMessage(peer, message); + break; + default: LOGGER.debug(() -> String.format("Unhandled %s message [ID %d] from peer %s", message.getType().name(), message.getId(), peer)); break; diff --git a/src/main/java/org/qortal/controller/tradebot/TradeBot.java b/src/main/java/org/qortal/controller/tradebot/TradeBot.java index 6996acbb..1786a130 100644 --- a/src/main/java/org/qortal/controller/tradebot/TradeBot.java +++ b/src/main/java/org/qortal/controller/tradebot/TradeBot.java @@ -2,16 +2,11 @@ package org.qortal.controller.tradebot; import java.awt.TrayIcon.MessageType; import java.security.SecureRandom; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Random; -import java.util.concurrent.locks.ReentrantLock; +import java.util.*; +import java.util.function.Supplier; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.util.Supplier; import org.bitcoinj.core.ECKey; import org.qortal.account.PrivateKeyAccount; import org.qortal.api.model.crosschain.TradeBotCreateRequest; @@ -19,25 +14,26 @@ import org.qortal.controller.Controller; import org.qortal.controller.Synchronizer; import org.qortal.controller.tradebot.AcctTradeBot.ResponseResult; import org.qortal.crosschain.*; +import org.qortal.crypto.Crypto; import org.qortal.data.at.ATData; import org.qortal.data.crosschain.CrossChainTradeData; import org.qortal.data.crosschain.TradeBotData; -import org.qortal.data.transaction.BaseTransactionData; -import org.qortal.data.transaction.PresenceTransactionData; +import org.qortal.data.network.TradePresenceData; import org.qortal.event.Event; import org.qortal.event.EventBus; import org.qortal.event.Listener; -import org.qortal.group.Group; import org.qortal.gui.SysTray; +import org.qortal.network.Network; +import org.qortal.network.Peer; +import org.qortal.network.message.GetTradePresencesMessage; +import org.qortal.network.message.Message; +import org.qortal.network.message.TradePresencesMessage; import org.qortal.repository.DataException; import org.qortal.repository.Repository; import org.qortal.repository.RepositoryManager; import org.qortal.repository.hsqldb.HSQLDBImportExport; import org.qortal.settings.Settings; -import org.qortal.transaction.PresenceTransaction; -import org.qortal.transaction.PresenceTransaction.PresenceType; -import org.qortal.transaction.Transaction.ValidationResult; -import org.qortal.transform.transaction.TransactionTransformer; +import org.qortal.utils.ByteArray; import org.qortal.utils.NTP; import com.google.common.primitives.Longs; @@ -57,6 +53,15 @@ public class TradeBot implements Listener { private static final Logger LOGGER = LogManager.getLogger(TradeBot.class); private static final Random RANDOM = new SecureRandom(); + /** Maximum lifetime of trade presence timestamp. 30 mins in ms. */ + private static final long PRESENCE_LIFETIME = 30 * 60 * 1000L; + /** How soon before expiry of our own trade presence timestamp that we want to trigger renewal. 5 mins in ms. */ + private static final long EARLY_RENEWAL_PERIOD = 5 * 60 * 1000L; + /** Trade presence timestamps are rounded up to this nearest interval. Bigger values improve grouping of entries in [GET_]TRADE_PRESENCES network messages. 15 mins in ms. */ + private static final long EXPIRY_ROUNDING = 15 * 60 * 1000L; + /** How often we want to broadcast our list of all known trade presences to peers. 5 mins in ms. */ + private static final long PRESENCE_BROADCAST_INTERVAL = 5 * 60 * 1000L; + public interface StateNameAndValueSupplier { public String getState(); public int getStateValue(); @@ -74,6 +79,18 @@ public class TradeBot implements Listener { } } + public static class TradePresenceEvent implements Event { + private final TradePresenceData tradePresenceData; + + public TradePresenceEvent(TradePresenceData tradePresenceData) { + this.tradePresenceData = tradePresenceData; + } + + public TradePresenceData getTradePresenceData() { + return this.tradePresenceData; + } + } + private static final Map, Supplier> acctTradeBotSuppliers = new HashMap<>(); static { acctTradeBotSuppliers.put(BitcoinACCTv1.class, BitcoinACCTv1TradeBot::getInstance); @@ -87,7 +104,12 @@ public class TradeBot implements Listener { private static TradeBot instance; - private final Map presenceTimestampsByAtAddress = Collections.synchronizedMap(new HashMap<>()); + private final Map ourTradePresenceTimestampsByPubkey = Collections.synchronizedMap(new HashMap<>()); + private final List pendingTradePresences = Collections.synchronizedList(new ArrayList<>()); + + private final Map allTradePresencesByPubkey = Collections.synchronizedMap(new HashMap<>()); + private Map safeAllTradePresencesByPubkey = Collections.emptyMap(); + private long nextTradePresenceBroadcastTimestamp = 0L; private TradeBot() { EventBus.INSTANCE.addListener(event -> TradeBot.getInstance().listen(event)); @@ -218,6 +240,8 @@ public class TradeBot implements Listener { return; synchronized (this) { + expireOldPresenceTimestamps(); + List allTradeBotData; try (final Repository repository = RepositoryManager.getRepository()) { @@ -248,6 +272,8 @@ public class TradeBot implements Listener { } catch (ForeignBlockchainException e) { LOGGER.warn(() -> String.format("Foreign blockchain issue processing trade-bot entry for AT %s: %s", tradeBotData.getAtAddress(), e.getMessage())); } + + broadcastPresenceTimestamps(); } } @@ -325,6 +351,33 @@ public class TradeBot implements Listener { } // PRESENCE-related + + public Collection getAllTradePresences() { + return this.safeAllTradePresencesByPubkey.values(); + } + + /** Trade presence timestamps expire in the 'future' so any that reach 'now' have expired and are removed. */ + private void expireOldPresenceTimestamps() { + long now = NTP.getTime(); + + int allRemovedCount = 0; + synchronized (this.allTradePresencesByPubkey) { + int preRemoveCount = this.allTradePresencesByPubkey.size(); + this.allTradePresencesByPubkey.values().removeIf(tradePresenceData -> tradePresenceData.getTimestamp() <= now); + allRemovedCount = this.allTradePresencesByPubkey.size() - preRemoveCount; + } + + int ourRemovedCount = 0; + synchronized (this.ourTradePresenceTimestampsByPubkey) { + int preRemoveCount = this.ourTradePresenceTimestampsByPubkey.size(); + this.ourTradePresenceTimestampsByPubkey.values().removeIf(timestamp -> timestamp < now); + ourRemovedCount = this.ourTradePresenceTimestampsByPubkey.size() - preRemoveCount; + } + + if (allRemovedCount > 0) + LOGGER.debug("Removed {} expired trade presences, of which {} ours", allRemovedCount, ourRemovedCount); + } + /*package*/ void updatePresence(Repository repository, TradeBotData tradeBotData, CrossChainTradeData tradeData) throws DataException { String atAddress = tradeBotData.getAtAddress(); @@ -333,44 +386,292 @@ public class TradeBot implements Listener { String signerAddress = tradeNativeAccount.getAddress(); /* - * There's no point in Alice trying to build a PRESENCE transaction - * for an AT that isn't locked to her, as other peers won't be able - * to validate the PRESENCE transaction as signing public key won't - * be visible. - */ - if (!signerAddress.equals(tradeData.qortalCreatorTradeAddress) && !signerAddress.equals(tradeData.qortalPartnerAddress)) - // Signer is neither Bob, nor Alice, or trade not yet locked to Alice + * There's no point in Alice trying to broadcast presence for an AT that isn't locked to her, + * as other peers won't be able to verify as signing public key isn't yet in the AT's data segment. + */ + if (!signerAddress.equals(tradeData.qortalCreatorTradeAddress) && !signerAddress.equals(tradeData.qortalPartnerAddress)) { + // Signer is neither Bob, nor trade locked to Alice + LOGGER.trace("Can't provide trade presence for our AT {} as it's not yet locked to Alice", atAddress); return; + } long now = NTP.getTime(); - long threshold = now - PresenceType.TRADE_BOT.getLifetime(); + long newExpiry = generateExpiry(now); + ByteArray pubkeyByteArray = ByteArray.of(tradeNativeAccount.getPublicKey()); - long timestamp = presenceTimestampsByAtAddress.compute(atAddress, (k, v) -> (v == null || v < threshold) ? now : v); + // If map entry's timestamp is missing, or within early renewal period, use the new expiry - otherwise use existing timestamp. + synchronized (this.ourTradePresenceTimestampsByPubkey) { + Long currentTimestamp = this.ourTradePresenceTimestampsByPubkey.get(pubkeyByteArray); - // If timestamp hasn't been updated then nothing to do - if (timestamp != now) + if (currentTimestamp != null && currentTimestamp - now > EARLY_RENEWAL_PERIOD) { + // timestamp still good + LOGGER.trace("Current trade presence timestamp {} still good for our trade {}", currentTimestamp, atAddress); + return; + } + + this.ourTradePresenceTimestampsByPubkey.put(pubkeyByteArray, newExpiry); + } + + // Create signature + byte[] signature = tradeNativeAccount.sign(Longs.toByteArray(newExpiry)); + + // Add new trade presence to queue to be broadcast around network + TradePresenceData tradePresenceData = new TradePresenceData(newExpiry, tradeNativeAccount.getPublicKey(), signature, atAddress); + this.pendingTradePresences.add(tradePresenceData); + + this.allTradePresencesByPubkey.put(pubkeyByteArray, tradePresenceData); + rebuildSafeAllTradePresences(); + + LOGGER.trace("New trade presence timestamp {} for our trade {}", newExpiry, atAddress); + + EventBus.INSTANCE.notify(new TradePresenceEvent(tradePresenceData)); + } + + private void rebuildSafeAllTradePresences() { + synchronized (this.allTradePresencesByPubkey) { + // Collect into a *new* unmodifiable map. + this.safeAllTradePresencesByPubkey = Map.copyOf(this.allTradePresencesByPubkey); + } + } + + private void broadcastPresenceTimestamps() { + // If we have new trade presences that are pending broadcast, send those as a priority + if (!this.pendingTradePresences.isEmpty()) { + // Create a copy for Network to safely use in another thread + List safeTradePresences; + synchronized (this.pendingTradePresences) { + safeTradePresences = List.copyOf(this.pendingTradePresences); + this.pendingTradePresences.clear(); + } + + LOGGER.debug("Broadcasting {} new trade presences", safeTradePresences.size()); + + TradePresencesMessage tradePresencesMessage = new TradePresencesMessage(safeTradePresences); + Network.getInstance().broadcast(peer -> tradePresencesMessage); + + return; + } + + // As we have no new trade presences, check whether it's time to do a general broadcast + Long now = NTP.getTime(); + if (now == null || now < nextTradePresenceBroadcastTimestamp) return; - int txGroupId = Group.NO_GROUP; - byte[] reference = new byte[TransactionTransformer.SIGNATURE_LENGTH]; - byte[] creatorPublicKey = tradeNativeAccount.getPublicKey(); - long fee = 0L; + nextTradePresenceBroadcastTimestamp = now + PRESENCE_BROADCAST_INTERVAL; - BaseTransactionData baseTransactionData = new BaseTransactionData(timestamp, txGroupId, reference, creatorPublicKey, fee, null); + List safeTradePresences = List.copyOf(this.safeAllTradePresencesByPubkey.values()); - int nonce = 0; - byte[] timestampSignature = tradeNativeAccount.sign(Longs.toByteArray(timestamp)); + if (safeTradePresences.isEmpty()) + return; - PresenceTransactionData transactionData = new PresenceTransactionData(baseTransactionData, nonce, PresenceType.TRADE_BOT, timestampSignature); + LOGGER.debug("Broadcasting all {} known trade presences. Next broadcast timestamp: {}", + safeTradePresences.size(), nextTradePresenceBroadcastTimestamp + ); - PresenceTransaction presenceTransaction = new PresenceTransaction(repository, transactionData); - presenceTransaction.computeNonce(); + GetTradePresencesMessage getTradePresencesMessage = new GetTradePresencesMessage(safeTradePresences); + Network.getInstance().broadcast(peer -> getTradePresencesMessage); + } - presenceTransaction.sign(tradeNativeAccount); + // Network message processing - ValidationResult result = presenceTransaction.importAsUnconfirmed(); - if (result != ValidationResult.OK) - LOGGER.debug(() -> String.format("Unable to build trade-bot PRESENCE transaction for %s: %s", tradeBotData.getAtAddress(), result.name())); + public void onGetTradePresencesMessage(Peer peer, Message message) { + GetTradePresencesMessage getTradePresencesMessage = (GetTradePresencesMessage) message; + + List peersTradePresences = getTradePresencesMessage.getTradePresences(); + + // Create mutable copy from safe snapshot + Map entriesUnknownToPeer = new HashMap<>(this.safeAllTradePresencesByPubkey); + int knownCount = entriesUnknownToPeer.size(); + + for (TradePresenceData peersTradePresence : peersTradePresences) { + ByteArray pubkeyByteArray = ByteArray.of(peersTradePresence.getPublicKey()); + + TradePresenceData ourEntry = entriesUnknownToPeer.get(pubkeyByteArray); + + if (ourEntry != null && ourEntry.getTimestamp() == peersTradePresence.getTimestamp()) + entriesUnknownToPeer.remove(pubkeyByteArray); + } + + if (entriesUnknownToPeer.isEmpty()) + return; + + LOGGER.debug("Sending {} trade presences to peer {} after excluding their {} from known {}", + entriesUnknownToPeer.size(), peer, peersTradePresences.size(), knownCount + ); + + // Send complement to peer + List safeTradePresences = List.copyOf(entriesUnknownToPeer.values()); + Message responseMessage = new TradePresencesMessage(safeTradePresences); + if (!peer.sendMessage(responseMessage)) { + peer.disconnect("failed to send TRADE_PRESENCES response"); + return; + } + } + + public void onTradePresencesMessage(Peer peer, Message message) { + TradePresencesMessage tradePresencesMessage = (TradePresencesMessage) message; + + List peersTradePresences = tradePresencesMessage.getTradePresences(); + + long now = NTP.getTime(); + // Timestamps before this are too far into the past + long pastThreshold = now; + // Timestamps after this are too far into the future + long futureThreshold = now + PRESENCE_LIFETIME; + + Map> acctSuppliersByCodeHash = SupportedBlockchain.getAcctMap(); + + int newCount = 0; + + try (final Repository repository = RepositoryManager.getRepository()) { + for (TradePresenceData peersTradePresence : peersTradePresences) { + long timestamp = peersTradePresence.getTimestamp(); + + // Ignore if timestamp is out of bounds + if (timestamp < pastThreshold || timestamp > futureThreshold) { + if (timestamp < pastThreshold) + LOGGER.trace("Ignoring trade presence {} from peer {} as timestamp {} is too old vs {}", + peersTradePresence.getAtAddress(), peer, timestamp, pastThreshold + ); + else + LOGGER.trace("Ignoring trade presence {} from peer {} as timestamp {} is too new vs {}", + peersTradePresence.getAtAddress(), peer, timestamp, pastThreshold + ); + + continue; + } + + ByteArray pubkeyByteArray = ByteArray.of(peersTradePresence.getPublicKey()); + + // Ignore if we've previously verified this timestamp+publickey combo or sent timestamp is older + TradePresenceData existingTradeData = this.safeAllTradePresencesByPubkey.get(pubkeyByteArray); + if (existingTradeData != null && timestamp <= existingTradeData.getTimestamp()) { + if (timestamp == existingTradeData.getTimestamp()) + LOGGER.trace("Ignoring trade presence {} from peer {} as we have verified timestamp {} before", + peersTradePresence.getAtAddress(), peer, timestamp + ); + else + LOGGER.trace("Ignoring trade presence {} from peer {} as timestamp {} is older than latest {}", + peersTradePresence.getAtAddress(), peer, timestamp, existingTradeData.getTimestamp() + ); + + continue; + } + + // Check timestamp signature + byte[] timestampSignature = peersTradePresence.getSignature(); + byte[] timestampBytes = Longs.toByteArray(timestamp); + byte[] publicKey = peersTradePresence.getPublicKey(); + if (!Crypto.verify(publicKey, timestampSignature, timestampBytes)) { + LOGGER.trace("Ignoring trade presence {} from peer {} as signature failed to verify", + peersTradePresence.getAtAddress(), peer + ); + + continue; + } + + ATData atData = repository.getATRepository().fromATAddress(peersTradePresence.getAtAddress()); + if (atData == null || atData.getIsFrozen() || atData.getIsFinished()) { + if (atData == null) + LOGGER.trace("Ignoring trade presence {} from peer {} as AT doesn't exist", + peersTradePresence.getAtAddress(), peer + ); + else + LOGGER.trace("Ignoring trade presence {} from peer {} as AT is frozen or finished", + peersTradePresence.getAtAddress(), peer + ); + + continue; + } + + ByteArray atCodeHash = new ByteArray(atData.getCodeHash()); + Supplier acctSupplier = acctSuppliersByCodeHash.get(atCodeHash); + if (acctSupplier == null) { + LOGGER.trace("Ignoring trade presence {} from peer {} as AT isn't a known ACCT?", + peersTradePresence.getAtAddress(), peer + ); + + continue; + } + + CrossChainTradeData tradeData = acctSupplier.get().populateTradeData(repository, atData); + if (tradeData == null) { + LOGGER.trace("Ignoring trade presence {} from peer {} as trade data not found?", + peersTradePresence.getAtAddress(), peer + ); + + continue; + } + + // Convert signer's public key to address form + String signerAddress = peersTradePresence.getTradeAddress(); + + // Signer's public key (in address form) must match Bob's / Alice's trade public key (in address form) + if (!signerAddress.equals(tradeData.qortalCreatorTradeAddress) && !signerAddress.equals(tradeData.qortalPartnerAddress)) { + LOGGER.trace("Ignoring trade presence {} from peer {} as signer isn't Alice or Bob?", + peersTradePresence.getAtAddress(), peer + ); + + continue; + } + + // This is new to us + this.allTradePresencesByPubkey.put(pubkeyByteArray, peersTradePresence); + ++newCount; + + LOGGER.trace("Added trade presence {} from peer {} with timestamp {}", + peersTradePresence.getAtAddress(), peer, timestamp + ); + + EventBus.INSTANCE.notify(new TradePresenceEvent(peersTradePresence)); + } + } catch (DataException e) { + LOGGER.error("Couldn't process TRADE_PRESENCES message due to repository issue", e); + } + + if (newCount > 0) { + LOGGER.debug("New trade presences: {}", newCount); + rebuildSafeAllTradePresences(); + } + } + + public void bridgePresence(long timestamp, byte[] publicKey, byte[] signature, String atAddress) { + long expiry = generateExpiry(timestamp); + ByteArray pubkeyByteArray = ByteArray.of(publicKey); + + TradePresenceData fakeTradePresenceData = new TradePresenceData(expiry, publicKey, signature, atAddress); + + // Only bridge if trade presence expiry timestamp is newer + TradePresenceData computedTradePresenceData = this.allTradePresencesByPubkey.compute(pubkeyByteArray, (k, v) -> + v == null || v.getTimestamp() < expiry ? fakeTradePresenceData : v + ); + + if (computedTradePresenceData == fakeTradePresenceData) { + LOGGER.trace("Bridged PRESENCE transaction for trade {} with timestamp {}", atAddress, expiry); + rebuildSafeAllTradePresences(); + + EventBus.INSTANCE.notify(new TradePresenceEvent(fakeTradePresenceData)); + } + } + + /** Decorates a CrossChainTradeData object with Alice / Bob trade-bot presence timestamp, if available. */ + public void decorateTradeDataWithPresence(CrossChainTradeData crossChainTradeData) { + // Match by AT address, then check for Bob vs Alice + this.safeAllTradePresencesByPubkey.values().stream() + .filter(tradePresenceData -> tradePresenceData.getAtAddress().equals(crossChainTradeData.qortalAtAddress)) + .forEach(tradePresenceData -> { + String signerAddress = tradePresenceData.getTradeAddress(); + + // Signer's public key (in address form) must match Bob's / Alice's trade public key (in address form) + if (signerAddress.equals(crossChainTradeData.qortalCreatorTradeAddress)) + crossChainTradeData.creatorPresenceExpiry = tradePresenceData.getTimestamp(); + else if (signerAddress.equals(crossChainTradeData.qortalPartnerAddress)) + crossChainTradeData.partnerPresenceExpiry = tradePresenceData.getTimestamp(); + }); + } + + private long generateExpiry(long timestamp) { + return ((timestamp - 1) / EXPIRY_ROUNDING) * EXPIRY_ROUNDING + PRESENCE_LIFETIME; } } diff --git a/src/main/java/org/qortal/data/crosschain/CrossChainTradeData.java b/src/main/java/org/qortal/data/crosschain/CrossChainTradeData.java index 69250e54..edc115b6 100644 --- a/src/main/java/org/qortal/data/crosschain/CrossChainTradeData.java +++ b/src/main/java/org/qortal/data/crosschain/CrossChainTradeData.java @@ -94,6 +94,12 @@ public class CrossChainTradeData { public String acctName; + @Schema(description = "Timestamp when AT creator's trade-bot presence expires") + public Long creatorPresenceExpiry; + + @Schema(description = "Timestamp when trade partner's trade-bot presence expires") + public Long partnerPresenceExpiry; + // Constructors // Necessary for JAXB diff --git a/src/main/java/org/qortal/data/network/TradePresenceData.java b/src/main/java/org/qortal/data/network/TradePresenceData.java new file mode 100644 index 00000000..9bd9ce29 --- /dev/null +++ b/src/main/java/org/qortal/data/network/TradePresenceData.java @@ -0,0 +1,114 @@ +package org.qortal.data.network; + +import org.qortal.crypto.Crypto; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlElement; +import javax.xml.bind.annotation.XmlTransient; +import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter; +import java.util.Arrays; + +// All properties to be converted to JSON via JAXB +@XmlAccessorType(XmlAccessType.FIELD) +public class TradePresenceData { + + protected long timestamp; + + @XmlJavaTypeAdapter( + type = byte[].class, + value = org.qortal.api.Base58TypeAdapter.class + ) + protected byte[] publicKey; // Could be BOB's or ALICE's + + // No need to send this via websocket / API + @XmlTransient + protected byte[] signature; // Not always present + + protected String atAddress; // Not always present + + // Have JAXB use getter instead + @XmlTransient + protected String tradeAddress; // Lazily instantiated + + // Constructors + + // necessary for JAXB serialization + protected TradePresenceData() { + } + + public TradePresenceData(long timestamp, byte[] publicKey, byte[] signature, String atAddress) { + this.timestamp = timestamp; + this.publicKey = publicKey; + this.signature = signature; + this.atAddress = atAddress; + } + + public TradePresenceData(long timestamp, byte[] publicKey) { + this(timestamp, publicKey, null, null); + } + + public long getTimestamp() { + return this.timestamp; + } + + public byte[] getPublicKey() { + return this.publicKey; + } + + public byte[] getSignature() { + return this.signature; + } + + public String getAtAddress() { + return this.atAddress; + } + + // Probably doesn't need synchronization + @XmlElement + public String getTradeAddress() { + if (tradeAddress != null) + return tradeAddress; + + tradeAddress = Crypto.toAddress(this.publicKey); + return tradeAddress; + } + + // Comparison + + @Override + public boolean equals(Object other) { + if (other == this) + return true; + + if (!(other instanceof TradePresenceData)) + return false; + + TradePresenceData otherTradePresenceData = (TradePresenceData) other; + + // Very quick comparison + if (otherTradePresenceData.timestamp != this.timestamp) + return false; + + if (!Arrays.equals(otherTradePresenceData.publicKey, this.publicKey)) + return false; + + if (otherTradePresenceData.atAddress != null && !otherTradePresenceData.atAddress.equals(this.atAddress)) + return false; + + if (this.atAddress != null && !this.atAddress.equals(otherTradePresenceData.atAddress)) + return false; + + if (!Arrays.equals(otherTradePresenceData.signature, this.signature)) + return false; + + return true; + } + + @Override + public int hashCode() { + // Pretty lazy implementation + return (int) this.timestamp; + } + +} diff --git a/src/main/java/org/qortal/network/message/GetTradePresencesMessage.java b/src/main/java/org/qortal/network/message/GetTradePresencesMessage.java new file mode 100644 index 00000000..d9be3c1b --- /dev/null +++ b/src/main/java/org/qortal/network/message/GetTradePresencesMessage.java @@ -0,0 +1,110 @@ +package org.qortal.network.message; + +import com.google.common.primitives.Ints; +import com.google.common.primitives.Longs; +import org.qortal.data.network.TradePresenceData; +import org.qortal.transform.Transformer; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * For requesting trade presences from remote peer, given our list of known trade presences. + * + * Groups of: number of entries, timestamp, then AT trade pubkey for each entry. + */ +public class GetTradePresencesMessage extends Message { + private List tradePresences; + private byte[] cachedData; + + public GetTradePresencesMessage(List tradePresences) { + this(-1, tradePresences); + } + + private GetTradePresencesMessage(int id, List tradePresences) { + super(id, MessageType.GET_TRADE_PRESENCES); + + this.tradePresences = tradePresences; + } + + public List getTradePresences() { + return this.tradePresences; + } + + public static Message fromByteBuffer(int id, ByteBuffer bytes) throws UnsupportedEncodingException { + int groupedEntriesCount = bytes.getInt(); + + List tradePresences = new ArrayList<>(groupedEntriesCount); + + while (groupedEntriesCount > 0) { + long timestamp = bytes.getLong(); + + for (int i = 0; i < groupedEntriesCount; ++i) { + byte[] publicKey = new byte[Transformer.PUBLIC_KEY_LENGTH]; + bytes.get(publicKey); + + tradePresences.add(new TradePresenceData(timestamp, publicKey)); + } + + if (bytes.hasRemaining()) { + groupedEntriesCount = bytes.getInt(); + } else { + // we've finished + groupedEntriesCount = 0; + } + } + + return new GetTradePresencesMessage(id, tradePresences); + } + + @Override + protected synchronized byte[] toData() { + if (this.cachedData != null) + return this.cachedData; + + // Shortcut in case we have no trade presences + if (this.tradePresences.isEmpty()) { + this.cachedData = Ints.toByteArray(0); + return this.cachedData; + } + + // How many of each timestamp + Map countByTimestamp = new HashMap<>(); + + for (TradePresenceData tradePresenceData : this.tradePresences) { + Long timestamp = tradePresenceData.getTimestamp(); + countByTimestamp.compute(timestamp, (k, v) -> v == null ? 1 : ++v); + } + + // We should know exactly how many bytes to allocate now + int byteSize = countByTimestamp.size() * (Transformer.INT_LENGTH + Transformer.TIMESTAMP_LENGTH) + + this.tradePresences.size() * Transformer.PUBLIC_KEY_LENGTH; + + try { + ByteArrayOutputStream bytes = new ByteArrayOutputStream(byteSize); + + for (long timestamp : countByTimestamp.keySet()) { + bytes.write(Ints.toByteArray(countByTimestamp.get(timestamp))); + + bytes.write(Longs.toByteArray(timestamp)); + + for (TradePresenceData tradePresenceData : this.tradePresences) { + if (tradePresenceData.getTimestamp() == timestamp) + bytes.write(tradePresenceData.getPublicKey()); + } + } + + this.cachedData = bytes.toByteArray(); + return this.cachedData; + } catch (IOException e) { + return null; + } + } + +} diff --git a/src/main/java/org/qortal/network/message/Message.java b/src/main/java/org/qortal/network/message/Message.java index 6c89a0dd..d119725d 100644 --- a/src/main/java/org/qortal/network/message/Message.java +++ b/src/main/java/org/qortal/network/message/Message.java @@ -93,7 +93,11 @@ public abstract class Message { ARBITRARY_DATA_FILE_LIST(120), GET_ARBITRARY_DATA_FILE_LIST(121), - ARBITRARY_SIGNATURES(130); + ARBITRARY_SIGNATURES(130), + + TRADE_PRESENCES(140), + GET_TRADE_PRESENCES(141), + ; public final int value; public final Method fromByteBufferMethod; diff --git a/src/main/java/org/qortal/network/message/TradePresencesMessage.java b/src/main/java/org/qortal/network/message/TradePresencesMessage.java new file mode 100644 index 00000000..9d846722 --- /dev/null +++ b/src/main/java/org/qortal/network/message/TradePresencesMessage.java @@ -0,0 +1,123 @@ +package org.qortal.network.message; + +import com.google.common.primitives.Ints; +import com.google.common.primitives.Longs; +import org.qortal.data.network.TradePresenceData; +import org.qortal.transform.Transformer; +import org.qortal.utils.Base58; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * For sending list of trade presences to remote peer. + * + * Groups of: number of entries, timestamp, then pubkey + sig + AT address for each entry. + */ +public class TradePresencesMessage extends Message { + private List tradePresences; + private byte[] cachedData; + + public TradePresencesMessage(List tradePresences) { + this(-1, tradePresences); + } + + private TradePresencesMessage(int id, List tradePresences) { + super(id, MessageType.TRADE_PRESENCES); + + this.tradePresences = tradePresences; + } + + public List getTradePresences() { + return this.tradePresences; + } + + public static Message fromByteBuffer(int id, ByteBuffer bytes) throws UnsupportedEncodingException { + int groupedEntriesCount = bytes.getInt(); + + List tradePresences = new ArrayList<>(groupedEntriesCount); + + while (groupedEntriesCount > 0) { + long timestamp = bytes.getLong(); + + for (int i = 0; i < groupedEntriesCount; ++i) { + byte[] publicKey = new byte[Transformer.PUBLIC_KEY_LENGTH]; + bytes.get(publicKey); + + byte[] signature = new byte[Transformer.SIGNATURE_LENGTH]; + bytes.get(signature); + + byte[] atAddressBytes = new byte[Transformer.ADDRESS_LENGTH]; + bytes.get(atAddressBytes); + String atAddress = Base58.encode(atAddressBytes); + + tradePresences.add(new TradePresenceData(timestamp, publicKey, signature, atAddress)); + } + + if (bytes.hasRemaining()) { + groupedEntriesCount = bytes.getInt(); + } else { + // we've finished + groupedEntriesCount = 0; + } + } + + return new TradePresencesMessage(id, tradePresences); + } + + @Override + protected synchronized byte[] toData() { + if (this.cachedData != null) + return this.cachedData; + + // Shortcut in case we have no trade presences + if (this.tradePresences.isEmpty()) { + this.cachedData = Ints.toByteArray(0); + return this.cachedData; + } + + // How many of each timestamp + Map countByTimestamp = new HashMap<>(); + + for (TradePresenceData tradePresenceData : this.tradePresences) { + Long timestamp = tradePresenceData.getTimestamp(); + countByTimestamp.compute(timestamp, (k, v) -> v == null ? 1 : ++v); + } + + // We should know exactly how many bytes to allocate now + int byteSize = countByTimestamp.size() * (Transformer.INT_LENGTH + Transformer.TIMESTAMP_LENGTH) + + this.tradePresences.size() * (Transformer.PUBLIC_KEY_LENGTH + Transformer.SIGNATURE_LENGTH + Transformer.ADDRESS_LENGTH); + + try { + ByteArrayOutputStream bytes = new ByteArrayOutputStream(byteSize); + + for (long timestamp : countByTimestamp.keySet()) { + bytes.write(Ints.toByteArray(countByTimestamp.get(timestamp))); + + bytes.write(Longs.toByteArray(timestamp)); + + for (TradePresenceData tradePresenceData : this.tradePresences) { + if (tradePresenceData.getTimestamp() == timestamp) { + bytes.write(tradePresenceData.getPublicKey()); + + bytes.write(tradePresenceData.getSignature()); + + bytes.write(Base58.decode(tradePresenceData.getAtAddress())); + } + } + } + + this.cachedData = bytes.toByteArray(); + return this.cachedData; + } catch (IOException e) { + return null; + } + } + +} diff --git a/src/main/java/org/qortal/transaction/PresenceTransaction.java b/src/main/java/org/qortal/transaction/PresenceTransaction.java index 0d28d382..566c6979 100644 --- a/src/main/java/org/qortal/transaction/PresenceTransaction.java +++ b/src/main/java/org/qortal/transaction/PresenceTransaction.java @@ -13,6 +13,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.qortal.account.Account; import org.qortal.controller.Controller; +import org.qortal.controller.tradebot.TradeBot; import org.qortal.crosschain.ACCT; import org.qortal.crosschain.SupportedBlockchain; import org.qortal.crypto.Crypto; @@ -191,12 +192,16 @@ public class PresenceTransaction extends Transaction { CrossChainTradeData crossChainTradeData = acctSupplier.get().populateTradeData(repository, atData); // OK if signer's public key (in address form) matches Bob's trade public key (in address form) - if (signerAddress.equals(crossChainTradeData.qortalCreatorTradeAddress)) + if (signerAddress.equals(crossChainTradeData.qortalCreatorTradeAddress)) { + TradeBot.getInstance().bridgePresence(this.presenceTransactionData.getTimestamp(), this.transactionData.getCreatorPublicKey(), timestampSignature, atData.getATAddress()); return ValidationResult.OK; + } // OK if signer's public key (in address form) matches Alice's trade public key (in address form) - if (signerAddress.equals(crossChainTradeData.qortalPartnerAddress)) + if (signerAddress.equals(crossChainTradeData.qortalPartnerAddress)) { + TradeBot.getInstance().bridgePresence(this.presenceTransactionData.getTimestamp(), this.transactionData.getCreatorPublicKey(), timestampSignature, atData.getATAddress()); return ValidationResult.OK; + } } return ValidationResult.AT_UNKNOWN; diff --git a/src/test/java/org/qortal/test/crosschain/TradeBotPresenceTests.java b/src/test/java/org/qortal/test/crosschain/TradeBotPresenceTests.java new file mode 100644 index 00000000..c60a046b --- /dev/null +++ b/src/test/java/org/qortal/test/crosschain/TradeBotPresenceTests.java @@ -0,0 +1,112 @@ +package org.qortal.test.crosschain; + +import org.junit.Test; +import org.qortal.utils.ByteArray; + +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Map; + +public class TradeBotPresenceTests { + + public static final long ROUNDING = 15 * 60 * 1000L; // to nearest X mins + public static final long LIFETIME = 30 * 60 * 1000L; // lifetime: X mins + public static final long EARLY_RENEWAL_LIFETIME = 5 * 60 * 1000L; // X mins before expiry + public static final long CHECK_INTERVAL = 5 * 60 * 1000L; // X mins + public static final long MAX_TIMESTAMP = 100 * 60 * 1000L; // run tests for X mins + + // We want to generate timestamps that expire 30 mins into the future, but also round to nearest X min? + // We want to regenerate timestamps early (e.g. 15 mins before expiry) to allow for network propagation + + // We want to keep the latest timestamp for any given public key + // We want to reject out-of-bound timestamps from peers (>30 mins into future, not now/past) + + // We want to make sure that we don't incorrectly delete an entry at 15-min and 30-min boundaries + + @Test + public void testGeneratedExpiryTimestamps() { + for (long timestamp = 0; timestamp <= MAX_TIMESTAMP; timestamp += CHECK_INTERVAL) { + long expiry = generateExpiry(timestamp); + + System.out.println(String.format("time: % 3dm, expiry: % 3dm", + timestamp / 60_000L, + expiry / 60_000L + )); + } + } + + @Test + public void testEarlyRenewal() { + Long currentExpiry = null; + + for (long timestamp = 0; timestamp <= MAX_TIMESTAMP; timestamp += CHECK_INTERVAL) { + long newExpiry = generateExpiry(timestamp); + + if (currentExpiry == null || currentExpiry - timestamp <= EARLY_RENEWAL_LIFETIME) { + currentExpiry = newExpiry; + } + + System.out.println(String.format("time: % 3dm, expiry: % 3dm", + timestamp / 60_000L, + currentExpiry / 60_000L + )); + } + } + + @Test + public void testEnforceLatestTimestamp() { + ByteArray pubkeyByteArray = ByteArray.of("publickey".getBytes(StandardCharsets.UTF_8)); + + Map timestampsByPublicKey = new HashMap<>(); + + // Working backwards this time + for (long timestamp = MAX_TIMESTAMP; timestamp >= 0; timestamp -= CHECK_INTERVAL){ + long newExpiry = generateExpiry(timestamp); + + timestampsByPublicKey.compute(pubkeyByteArray, (k, v) -> + v == null || v < newExpiry ? newExpiry : v + ); + + Long currentExpiry = timestampsByPublicKey.get(pubkeyByteArray); + + System.out.println(String.format("time: % 3dm, expiry: % 3dm", + timestamp / 60_000L, + currentExpiry / 60_000L + )); + } + } + + @Test + public void testEnforcePeerExpiryBounds() { + System.out.println(String.format("%40s", "Our time")); + + for (long ourTimestamp = 0; ourTimestamp <= MAX_TIMESTAMP; ourTimestamp += CHECK_INTERVAL) { + System.out.print(String.format("%s% 3dm ", + ourTimestamp != 0 ? "| " : " ", + ourTimestamp / 60_000L + )); + } + System.out.println(); + + for (long peerTimestamp = 0; peerTimestamp <= MAX_TIMESTAMP; peerTimestamp += CHECK_INTERVAL) { + System.out.print(String.format("% 4dm ", peerTimestamp / 60_000L)); + + for (long ourTimestamp = 0; ourTimestamp <= MAX_TIMESTAMP; ourTimestamp += CHECK_INTERVAL) { + System.out.print(String.format("| %s ", + isPeerExpiryValid(ourTimestamp, peerTimestamp) ? "✔" : "✘" + )); + } + System.out.println(); + } + + System.out.println("Peer's expiry time"); + } + + private long generateExpiry(long timestamp) { + return ((timestamp - 1) / ROUNDING) * ROUNDING + LIFETIME; + } + + private boolean isPeerExpiryValid(long nowTimestamp, long peerExpiry) { + return peerExpiry > nowTimestamp && peerExpiry <= LIFETIME + nowTimestamp; + } +}