Created /websockets/crosschain/tradepresence to replace /websockets/presence

This commit is contained in:
catbref 2022-02-23 21:53:48 +00:00
parent c53dd31765
commit 1d59feeb72
4 changed files with 164 additions and 8 deletions

View File

@ -40,13 +40,7 @@ import org.glassfish.jersey.server.ResourceConfig;
import org.glassfish.jersey.servlet.ServletContainer; import org.glassfish.jersey.servlet.ServletContainer;
import org.qortal.api.resource.AnnotationPostProcessor; import org.qortal.api.resource.AnnotationPostProcessor;
import org.qortal.api.resource.ApiDefinition; import org.qortal.api.resource.ApiDefinition;
import org.qortal.api.websocket.ActiveChatsWebSocket; import org.qortal.api.websocket.*;
import org.qortal.api.websocket.AdminStatusWebSocket;
import org.qortal.api.websocket.BlocksWebSocket;
import org.qortal.api.websocket.ChatMessagesWebSocket;
import org.qortal.api.websocket.PresenceWebSocket;
import org.qortal.api.websocket.TradeBotWebSocket;
import org.qortal.api.websocket.TradeOffersWebSocket;
import org.qortal.settings.Settings; import org.qortal.settings.Settings;
public class ApiService { public class ApiService {
@ -212,6 +206,9 @@ public class ApiService {
context.addServlet(ChatMessagesWebSocket.class, "/websockets/chat/messages"); context.addServlet(ChatMessagesWebSocket.class, "/websockets/chat/messages");
context.addServlet(TradeOffersWebSocket.class, "/websockets/crosschain/tradeoffers"); context.addServlet(TradeOffersWebSocket.class, "/websockets/crosschain/tradeoffers");
context.addServlet(TradeBotWebSocket.class, "/websockets/crosschain/tradebot"); context.addServlet(TradeBotWebSocket.class, "/websockets/crosschain/tradebot");
context.addServlet(TradePresenceWebSocket.class, "/websockets/crosschain/tradepresence");
// Deprecated
context.addServlet(PresenceWebSocket.class, "/websockets/presence"); context.addServlet(PresenceWebSocket.class, "/websockets/presence");
// Start server // Start server

View File

@ -0,0 +1,137 @@
package org.qortal.api.websocket;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.annotations.*;
import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
import org.qortal.controller.Controller;
import org.qortal.controller.tradebot.TradeBot;
import org.qortal.data.network.TradePresenceData;
import org.qortal.event.Event;
import org.qortal.event.EventBus;
import org.qortal.event.Listener;
import org.qortal.utils.Base58;
import org.qortal.utils.NTP;
import java.io.IOException;
import java.io.StringWriter;
import java.util.*;
@WebSocket
@SuppressWarnings("serial")
public class TradePresenceWebSocket extends ApiWebSocket implements Listener {
/** Map key is public key in base58, map value is trade presence */
private static final Map<String, TradePresenceData> currentEntries = Collections.synchronizedMap(new HashMap<>());
@Override
public void configure(WebSocketServletFactory factory) {
factory.register(TradePresenceWebSocket.class);
populateCurrentInfo();
EventBus.INSTANCE.addListener(this::listen);
}
@Override
public void listen(Event event) {
// XXX - Suggest we change this to something like Synchronizer.NewChainTipEvent?
// We use NewBlockEvent as a proxy for 1-minute timer
if (!(event instanceof TradeBot.TradePresenceEvent) && !(event instanceof Controller.NewBlockEvent))
return;
removeOldEntries();
if (event instanceof Controller.NewBlockEvent)
// We only wanted a chance to cull old entries
return;
TradePresenceData tradePresence = ((TradeBot.TradePresenceEvent) event).getTradePresenceData();
boolean somethingChanged = mergePresence(tradePresence);
if (!somethingChanged)
// nothing changed
return;
List<TradePresenceData> tradePresences = Collections.singletonList(tradePresence);
// Notify sessions
for (Session session : getSessions()) {
sendTradePresences(session, tradePresences);
}
}
@OnWebSocketConnect
@Override
public void onWebSocketConnect(Session session) {
Map<String, List<String>> queryParams = session.getUpgradeRequest().getParameterMap();
List<TradePresenceData> tradePresences;
synchronized (currentEntries) {
tradePresences = List.copyOf(currentEntries.values());
}
if (!sendTradePresences(session, tradePresences)) {
session.close(4002, "websocket issue");
return;
}
super.onWebSocketConnect(session);
}
@OnWebSocketClose
@Override
public void onWebSocketClose(Session session, int statusCode, String reason) {
// clean up
super.onWebSocketClose(session, statusCode, reason);
}
@OnWebSocketError
public void onWebSocketError(Session session, Throwable throwable) {
/* ignored */
}
@OnWebSocketMessage
public void onWebSocketMessage(Session session, String message) {
/* ignored */
}
private boolean sendTradePresences(Session session, List<TradePresenceData> tradePresences) {
try {
StringWriter stringWriter = new StringWriter();
marshall(stringWriter, tradePresences);
String output = stringWriter.toString();
session.getRemote().sendStringByFuture(output);
} catch (IOException e) {
// No output this time?
return false;
}
return true;
}
private static void populateCurrentInfo() {
// We want ALL trade presences
TradeBot.getInstance().getAllTradePresences().stream()
.forEach(TradePresenceWebSocket::mergePresence);
}
/** Merge trade presence into cache of current entries, returns true if cache was updated. */
private static boolean mergePresence(TradePresenceData tradePresence) {
// Put/replace for this publickey making sure we keep newest timestamp
String pubKey58 = Base58.encode(tradePresence.getPublicKey());
TradePresenceData newEntry = currentEntries.compute(pubKey58, (k, v) -> v == null || v.getTimestamp() < tradePresence.getTimestamp() ? tradePresence : v);
return newEntry != tradePresence;
}
private static void removeOldEntries() {
long now = NTP.getTime();
currentEntries.values().removeIf(v -> v.getTimestamp() < now);
}
}

View File

@ -79,6 +79,18 @@ public class TradeBot implements Listener {
} }
} }
public static class TradePresenceEvent implements Event {
private final TradePresenceData tradePresenceData;
public TradePresenceEvent(TradePresenceData tradePresenceData) {
this.tradePresenceData = tradePresenceData;
}
public TradePresenceData getTradePresenceData() {
return this.tradePresenceData;
}
}
private static final Map<Class<? extends ACCT>, Supplier<AcctTradeBot>> acctTradeBotSuppliers = new HashMap<>(); private static final Map<Class<? extends ACCT>, Supplier<AcctTradeBot>> acctTradeBotSuppliers = new HashMap<>();
static { static {
acctTradeBotSuppliers.put(BitcoinACCTv1.class, BitcoinACCTv1TradeBot::getInstance); acctTradeBotSuppliers.put(BitcoinACCTv1.class, BitcoinACCTv1TradeBot::getInstance);
@ -340,6 +352,10 @@ public class TradeBot implements Listener {
// PRESENCE-related // PRESENCE-related
public Collection<TradePresenceData> getAllTradePresences() {
return this.safeAllTradePresencesByPubkey.values();
}
/** Trade presence timestamps expire in the 'future' so any that reach 'now' have expired and are removed. */ /** Trade presence timestamps expire in the 'future' so any that reach 'now' have expired and are removed. */
private void expireOldPresenceTimestamps() { private void expireOldPresenceTimestamps() {
long now = NTP.getTime(); long now = NTP.getTime();
@ -407,6 +423,8 @@ public class TradeBot implements Listener {
rebuildSafeAllTradePresences(); rebuildSafeAllTradePresences();
LOGGER.trace("New trade presence timestamp {} for our trade {}", newExpiry, atAddress); LOGGER.trace("New trade presence timestamp {} for our trade {}", newExpiry, atAddress);
EventBus.INSTANCE.notify(new TradePresenceEvent(tradePresenceData));
} }
private void rebuildSafeAllTradePresences() { private void rebuildSafeAllTradePresences() {
@ -604,6 +622,8 @@ public class TradeBot implements Listener {
LOGGER.trace("Added trade presence {} from peer {} with timestamp {}", LOGGER.trace("Added trade presence {} from peer {} with timestamp {}",
peersTradePresence.getAtAddress(), peer, timestamp peersTradePresence.getAtAddress(), peer, timestamp
); );
EventBus.INSTANCE.notify(new TradePresenceEvent(peersTradePresence));
} }
} catch (DataException e) { } catch (DataException e) {
LOGGER.error("Couldn't process TRADE_PRESENCES message due to repository issue", e); LOGGER.error("Couldn't process TRADE_PRESENCES message due to repository issue", e);

View File

@ -4,6 +4,7 @@ import org.qortal.crypto.Crypto;
import javax.xml.bind.annotation.XmlAccessType; import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType; import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlElement;
import java.util.Arrays; import java.util.Arrays;
// All properties to be converted to JSON via JAXB // All properties to be converted to JSON via JAXB
@ -49,7 +50,8 @@ public class TradePresenceData {
return this.atAddress; return this.atAddress;
} }
// Probably don't need synchronization // Probably doesn't need synchronization
@XmlElement
public String getTradeAddress() { public String getTradeAddress() {
if (tradeAddress != null) if (tradeAddress != null)
return tradeAddress; return tradeAddress;