From 1d59feeb7216d958507e7786d29edb9ac5d8dc19 Mon Sep 17 00:00:00 2001 From: catbref Date: Wed, 23 Feb 2022 21:53:48 +0000 Subject: [PATCH] Created /websockets/crosschain/tradepresence to replace /websockets/presence --- src/main/java/org/qortal/api/ApiService.java | 11 +- .../api/websocket/TradePresenceWebSocket.java | 137 ++++++++++++++++++ .../qortal/controller/tradebot/TradeBot.java | 20 +++ .../data/network/TradePresenceData.java | 4 +- 4 files changed, 164 insertions(+), 8 deletions(-) create mode 100644 src/main/java/org/qortal/api/websocket/TradePresenceWebSocket.java diff --git a/src/main/java/org/qortal/api/ApiService.java b/src/main/java/org/qortal/api/ApiService.java index 697543c7..78c9250c 100644 --- a/src/main/java/org/qortal/api/ApiService.java +++ b/src/main/java/org/qortal/api/ApiService.java @@ -40,13 +40,7 @@ import org.glassfish.jersey.server.ResourceConfig; import org.glassfish.jersey.servlet.ServletContainer; import org.qortal.api.resource.AnnotationPostProcessor; import org.qortal.api.resource.ApiDefinition; -import org.qortal.api.websocket.ActiveChatsWebSocket; -import org.qortal.api.websocket.AdminStatusWebSocket; -import org.qortal.api.websocket.BlocksWebSocket; -import org.qortal.api.websocket.ChatMessagesWebSocket; -import org.qortal.api.websocket.PresenceWebSocket; -import org.qortal.api.websocket.TradeBotWebSocket; -import org.qortal.api.websocket.TradeOffersWebSocket; +import org.qortal.api.websocket.*; import org.qortal.settings.Settings; public class ApiService { @@ -212,6 +206,9 @@ public class ApiService { context.addServlet(ChatMessagesWebSocket.class, "/websockets/chat/messages"); context.addServlet(TradeOffersWebSocket.class, "/websockets/crosschain/tradeoffers"); context.addServlet(TradeBotWebSocket.class, "/websockets/crosschain/tradebot"); + context.addServlet(TradePresenceWebSocket.class, "/websockets/crosschain/tradepresence"); + + // Deprecated context.addServlet(PresenceWebSocket.class, "/websockets/presence"); // Start server diff --git a/src/main/java/org/qortal/api/websocket/TradePresenceWebSocket.java b/src/main/java/org/qortal/api/websocket/TradePresenceWebSocket.java new file mode 100644 index 00000000..808accbf --- /dev/null +++ b/src/main/java/org/qortal/api/websocket/TradePresenceWebSocket.java @@ -0,0 +1,137 @@ +package org.qortal.api.websocket; + +import org.eclipse.jetty.websocket.api.Session; +import org.eclipse.jetty.websocket.api.annotations.*; +import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory; +import org.qortal.controller.Controller; +import org.qortal.controller.tradebot.TradeBot; +import org.qortal.data.network.TradePresenceData; +import org.qortal.event.Event; +import org.qortal.event.EventBus; +import org.qortal.event.Listener; +import org.qortal.utils.Base58; +import org.qortal.utils.NTP; + +import java.io.IOException; +import java.io.StringWriter; +import java.util.*; + +@WebSocket +@SuppressWarnings("serial") +public class TradePresenceWebSocket extends ApiWebSocket implements Listener { + + /** Map key is public key in base58, map value is trade presence */ + private static final Map currentEntries = Collections.synchronizedMap(new HashMap<>()); + + @Override + public void configure(WebSocketServletFactory factory) { + factory.register(TradePresenceWebSocket.class); + + populateCurrentInfo(); + + EventBus.INSTANCE.addListener(this::listen); + } + + @Override + public void listen(Event event) { + // XXX - Suggest we change this to something like Synchronizer.NewChainTipEvent? + // We use NewBlockEvent as a proxy for 1-minute timer + if (!(event instanceof TradeBot.TradePresenceEvent) && !(event instanceof Controller.NewBlockEvent)) + return; + + removeOldEntries(); + + if (event instanceof Controller.NewBlockEvent) + // We only wanted a chance to cull old entries + return; + + TradePresenceData tradePresence = ((TradeBot.TradePresenceEvent) event).getTradePresenceData(); + + boolean somethingChanged = mergePresence(tradePresence); + + if (!somethingChanged) + // nothing changed + return; + + List tradePresences = Collections.singletonList(tradePresence); + + // Notify sessions + for (Session session : getSessions()) { + sendTradePresences(session, tradePresences); + } + } + + @OnWebSocketConnect + @Override + public void onWebSocketConnect(Session session) { + Map> queryParams = session.getUpgradeRequest().getParameterMap(); + + List tradePresences; + + synchronized (currentEntries) { + tradePresences = List.copyOf(currentEntries.values()); + } + + if (!sendTradePresences(session, tradePresences)) { + session.close(4002, "websocket issue"); + return; + } + + super.onWebSocketConnect(session); + } + + @OnWebSocketClose + @Override + public void onWebSocketClose(Session session, int statusCode, String reason) { + // clean up + super.onWebSocketClose(session, statusCode, reason); + } + + @OnWebSocketError + public void onWebSocketError(Session session, Throwable throwable) { + /* ignored */ + } + + @OnWebSocketMessage + public void onWebSocketMessage(Session session, String message) { + /* ignored */ + } + + private boolean sendTradePresences(Session session, List tradePresences) { + try { + StringWriter stringWriter = new StringWriter(); + marshall(stringWriter, tradePresences); + + String output = stringWriter.toString(); + session.getRemote().sendStringByFuture(output); + } catch (IOException e) { + // No output this time? + return false; + } + + return true; + } + + private static void populateCurrentInfo() { + // We want ALL trade presences + TradeBot.getInstance().getAllTradePresences().stream() + .forEach(TradePresenceWebSocket::mergePresence); + } + + /** Merge trade presence into cache of current entries, returns true if cache was updated. */ + private static boolean mergePresence(TradePresenceData tradePresence) { + // Put/replace for this publickey making sure we keep newest timestamp + String pubKey58 = Base58.encode(tradePresence.getPublicKey()); + + TradePresenceData newEntry = currentEntries.compute(pubKey58, (k, v) -> v == null || v.getTimestamp() < tradePresence.getTimestamp() ? tradePresence : v); + + return newEntry != tradePresence; + } + + private static void removeOldEntries() { + long now = NTP.getTime(); + + currentEntries.values().removeIf(v -> v.getTimestamp() < now); + } + +} diff --git a/src/main/java/org/qortal/controller/tradebot/TradeBot.java b/src/main/java/org/qortal/controller/tradebot/TradeBot.java index 4caeeca5..bdedf831 100644 --- a/src/main/java/org/qortal/controller/tradebot/TradeBot.java +++ b/src/main/java/org/qortal/controller/tradebot/TradeBot.java @@ -79,6 +79,18 @@ public class TradeBot implements Listener { } } + public static class TradePresenceEvent implements Event { + private final TradePresenceData tradePresenceData; + + public TradePresenceEvent(TradePresenceData tradePresenceData) { + this.tradePresenceData = tradePresenceData; + } + + public TradePresenceData getTradePresenceData() { + return this.tradePresenceData; + } + } + private static final Map, Supplier> acctTradeBotSuppliers = new HashMap<>(); static { acctTradeBotSuppliers.put(BitcoinACCTv1.class, BitcoinACCTv1TradeBot::getInstance); @@ -340,6 +352,10 @@ public class TradeBot implements Listener { // PRESENCE-related + public Collection getAllTradePresences() { + return this.safeAllTradePresencesByPubkey.values(); + } + /** Trade presence timestamps expire in the 'future' so any that reach 'now' have expired and are removed. */ private void expireOldPresenceTimestamps() { long now = NTP.getTime(); @@ -407,6 +423,8 @@ public class TradeBot implements Listener { rebuildSafeAllTradePresences(); LOGGER.trace("New trade presence timestamp {} for our trade {}", newExpiry, atAddress); + + EventBus.INSTANCE.notify(new TradePresenceEvent(tradePresenceData)); } private void rebuildSafeAllTradePresences() { @@ -604,6 +622,8 @@ public class TradeBot implements Listener { LOGGER.trace("Added trade presence {} from peer {} with timestamp {}", peersTradePresence.getAtAddress(), peer, timestamp ); + + EventBus.INSTANCE.notify(new TradePresenceEvent(peersTradePresence)); } } catch (DataException e) { LOGGER.error("Couldn't process TRADE_PRESENCES message due to repository issue", e); diff --git a/src/main/java/org/qortal/data/network/TradePresenceData.java b/src/main/java/org/qortal/data/network/TradePresenceData.java index 089cc742..c1fafa84 100644 --- a/src/main/java/org/qortal/data/network/TradePresenceData.java +++ b/src/main/java/org/qortal/data/network/TradePresenceData.java @@ -4,6 +4,7 @@ import org.qortal.crypto.Crypto; import javax.xml.bind.annotation.XmlAccessType; import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlElement; import java.util.Arrays; // All properties to be converted to JSON via JAXB @@ -49,7 +50,8 @@ public class TradePresenceData { return this.atAddress; } - // Probably don't need synchronization + // Probably doesn't need synchronization + @XmlElement public String getTradeAddress() { if (tradeAddress != null) return tradeAddress;