diff --git a/src/main/java/org/qortal/api/websocket/TradeOffersWebSocket.java b/src/main/java/org/qortal/api/websocket/TradeOffersWebSocket.java index d5e90583..59a16344 100644 --- a/src/main/java/org/qortal/api/websocket/TradeOffersWebSocket.java +++ b/src/main/java/org/qortal/api/websocket/TradeOffersWebSocket.java @@ -3,6 +3,8 @@ package org.qortal.api.websocket; import java.io.IOException; import java.io.StringWriter; import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -42,18 +44,23 @@ public class TradeOffersWebSocket extends ApiWebSocket implements Listener { private static final Logger LOGGER = LogManager.getLogger(TradeOffersWebSocket.class); - private static final Map previousAtModes = new HashMap<>(); + private static class CachedOfferInfo { + public final Map previousAtModes = new HashMap<>(); - // OFFERING - private static final Map currentSummaries = new HashMap<>(); - // REDEEMED/REFUNDED/CANCELLED - private static final Map historicSummaries = new HashMap<>(); + // OFFERING + public final Map currentSummaries = new HashMap<>(); + // REDEEMED/REFUNDED/CANCELLED + public final Map historicSummaries = new HashMap<>(); + } + // Manual synchronization + private static final Map cachedInfoByBlockchain = new HashMap<>(); private static final Predicate isHistoric = offerSummary -> offerSummary.getMode() == AcctMode.REDEEMED || offerSummary.getMode() == AcctMode.REFUNDED || offerSummary.getMode() == AcctMode.CANCELLED; + private static final Map sessionBlockchain = Collections.synchronizedMap(new HashMap<>()); @Override public void configure(WebSocketServletFactory factory) { @@ -79,7 +86,6 @@ public class TradeOffersWebSocket extends ApiWebSocket implements Listener { BlockData blockData = ((Controller.NewBlockEvent) event).getBlockData(); // Process any new info - List crossChainOfferSummaries = new ArrayList<>(); try (final Repository repository = RepositoryManager.getRepository()) { // Find any new/changed trade ATs since this block @@ -88,64 +94,75 @@ public class TradeOffersWebSocket extends ApiWebSocket implements Listener { final Long expectedValue = null; final Integer minimumFinalHeight = blockData.getHeight(); - // Loop for all different types of trade offer? - Map> acctsByCodeHash = SupportedBlockchain.getAcctMap(); - for (Map.Entry> acctInfo : acctsByCodeHash.entrySet()) { - byte[] codeHash = acctInfo.getKey().value; - ACCT acct = acctInfo.getValue().get(); + for (SupportedBlockchain blockchain : SupportedBlockchain.values()) { + Map> acctsByCodeHash = SupportedBlockchain.getFilteredAcctMap(blockchain); - List atStates = repository.getATRepository().getMatchingFinalATStates(codeHash, - isFinished, dataByteOffset, expectedValue, minimumFinalHeight, - null, null, null); + List crossChainOfferSummaries = new ArrayList<>(); + + synchronized (cachedInfoByBlockchain) { + CachedOfferInfo cachedInfo = cachedInfoByBlockchain.computeIfAbsent(blockchain.name(), k -> new CachedOfferInfo()); + + for (Map.Entry> acctInfo : acctsByCodeHash.entrySet()) { + byte[] codeHash = acctInfo.getKey().value; + ACCT acct = acctInfo.getValue().get(); + + List atStates = repository.getATRepository().getMatchingFinalATStates(codeHash, + isFinished, dataByteOffset, expectedValue, minimumFinalHeight, + null, null, null); + + crossChainOfferSummaries.addAll(produceSummaries(repository, acct, atStates, blockData.getTimestamp())); + } + + // Remove any entries unchanged from last time + crossChainOfferSummaries.removeIf(offerSummary -> cachedInfo.previousAtModes.get(offerSummary.getQortalAtAddress()) == offerSummary.getMode()); + + // Skip to next blockchain if nothing has changed (for this blockchain) + if (crossChainOfferSummaries.isEmpty()) + continue; + + // Update + for (CrossChainOfferSummary offerSummary : crossChainOfferSummaries) { + cachedInfo.previousAtModes.put(offerSummary.qortalAtAddress, offerSummary.getMode()); + LOGGER.trace(() -> String.format("Block height: %d, AT: %s, mode: %s", blockData.getHeight(), offerSummary.qortalAtAddress, offerSummary.getMode().name())); + + switch (offerSummary.getMode()) { + case OFFERING: + cachedInfo.currentSummaries.put(offerSummary.qortalAtAddress, offerSummary); + cachedInfo.historicSummaries.remove(offerSummary.qortalAtAddress); + break; + + case REDEEMED: + case REFUNDED: + case CANCELLED: + cachedInfo.currentSummaries.remove(offerSummary.qortalAtAddress); + cachedInfo.historicSummaries.put(offerSummary.qortalAtAddress, offerSummary); + break; + + case TRADING: + cachedInfo.currentSummaries.remove(offerSummary.qortalAtAddress); + cachedInfo.historicSummaries.remove(offerSummary.qortalAtAddress); + break; + } + } + + // Remove any historic offers that are over 24 hours old + final long tooOldTimestamp = NTP.getTime() - 24 * 60 * 60 * 1000L; + cachedInfo.historicSummaries.values().removeIf(historicSummary -> historicSummary.getTimestamp() < tooOldTimestamp); + } + + // Notify sessions + for (Session session : getSessions()) { + // Only send if this session has this/no preferred blockchain + String preferredBlockchain = sessionBlockchain.get(session); + + if (preferredBlockchain == null || preferredBlockchain.equals(blockchain.name())) + sendOfferSummaries(session, crossChainOfferSummaries); + } - crossChainOfferSummaries.addAll(produceSummaries(repository, acct, atStates, blockData.getTimestamp())); } } catch (DataException e) { // No output this time - return; } - - synchronized (previousAtModes) { - // Remove any entries unchanged from last time - crossChainOfferSummaries.removeIf(offerSummary -> previousAtModes.get(offerSummary.getQortalAtAddress()) == offerSummary.getMode()); - - // Don't send anything if no results - if (crossChainOfferSummaries.isEmpty()) - return; - - // Update - for (CrossChainOfferSummary offerSummary : crossChainOfferSummaries) { - previousAtModes.put(offerSummary.qortalAtAddress, offerSummary.getMode()); - LOGGER.trace(() -> String.format("Block height: %d, AT: %s, mode: %s", blockData.getHeight(), offerSummary.qortalAtAddress, offerSummary.getMode().name())); - - switch (offerSummary.getMode()) { - case OFFERING: - currentSummaries.put(offerSummary.qortalAtAddress, offerSummary); - historicSummaries.remove(offerSummary.qortalAtAddress); - break; - - case REDEEMED: - case REFUNDED: - case CANCELLED: - currentSummaries.remove(offerSummary.qortalAtAddress); - historicSummaries.put(offerSummary.qortalAtAddress, offerSummary); - break; - - case TRADING: - currentSummaries.remove(offerSummary.qortalAtAddress); - historicSummaries.remove(offerSummary.qortalAtAddress); - break; - } - } - - // Remove any historic offers that are over 24 hours old - final long tooOldTimestamp = NTP.getTime() - 24 * 60 * 60 * 1000L; - historicSummaries.values().removeIf(historicSummary -> historicSummary.getTimestamp() < tooOldTimestamp); - } - - // Notify sessions - for (Session session : getSessions()) - sendOfferSummaries(session, crossChainOfferSummaries); } @OnWebSocketConnect @@ -154,13 +171,35 @@ public class TradeOffersWebSocket extends ApiWebSocket implements Listener { Map> queryParams = session.getUpgradeRequest().getParameterMap(); final boolean includeHistoric = queryParams.get("includeHistoric") != null; + List foreignBlockchains = queryParams.get("foreignBlockchain"); + final String foreignBlockchain = foreignBlockchains == null ? null : foreignBlockchains.get(0); + + // Make sure blockchain (if any) is valid + if (foreignBlockchain != null && SupportedBlockchain.fromString(foreignBlockchain) == null) { + session.close(4003, "unknown blockchain: " + foreignBlockchain); + return; + } + + // save session's preferred blockchain (if any) + sessionBlockchain.put(session, foreignBlockchain); + List crossChainOfferSummaries = new ArrayList<>(); - synchronized (previousAtModes) { - crossChainOfferSummaries.addAll(currentSummaries.values()); + synchronized (cachedInfoByBlockchain) { + Collection cachedInfos; - if (includeHistoric) - crossChainOfferSummaries.addAll(historicSummaries.values()); + if (foreignBlockchain == null) + // No preferred blockchain, so iterate through all of them + cachedInfos = cachedInfoByBlockchain.values(); + else + cachedInfos = Collections.singleton(cachedInfoByBlockchain.computeIfAbsent(foreignBlockchain, k -> new CachedOfferInfo())); + + for (CachedOfferInfo cachedInfo : cachedInfos) { + crossChainOfferSummaries.addAll(cachedInfo.currentSummaries.values()); + + if (includeHistoric) + crossChainOfferSummaries.addAll(cachedInfo.historicSummaries.values()); + } } if (!sendOfferSummaries(session, crossChainOfferSummaries)) { @@ -174,6 +213,9 @@ public class TradeOffersWebSocket extends ApiWebSocket implements Listener { @OnWebSocketClose @Override public void onWebSocketClose(Session session, int statusCode, String reason) { + // clean up + sessionBlockchain.remove(session); + super.onWebSocketClose(session, statusCode, reason); } @@ -208,25 +250,30 @@ public class TradeOffersWebSocket extends ApiWebSocket implements Listener { Long expectedValue = (long) AcctMode.OFFERING.value; Integer minimumFinalHeight = null; - Map> acctsByCodeHash = SupportedBlockchain.getAcctMap(); - for (Map.Entry> acctInfo : acctsByCodeHash.entrySet()) { - byte[] codeHash = acctInfo.getKey().value; - ACCT acct = acctInfo.getValue().get(); + for (SupportedBlockchain blockchain : SupportedBlockchain.values()) { + Map> acctsByCodeHash = SupportedBlockchain.getFilteredAcctMap(blockchain); - Integer dataByteOffset = acct.getModeByteOffset(); - List initialAtStates = repository.getATRepository().getMatchingFinalATStates(codeHash, - isFinished, dataByteOffset, expectedValue, minimumFinalHeight, - null, null, null); + CachedOfferInfo cachedInfo = cachedInfoByBlockchain.computeIfAbsent(blockchain.name(), k -> new CachedOfferInfo()); - if (initialAtStates == null) - throw new DataException("Couldn't fetch current trades from repository"); + for (Map.Entry> acctInfo : acctsByCodeHash.entrySet()) { + byte[] codeHash = acctInfo.getKey().value; + ACCT acct = acctInfo.getValue().get(); - // Save initial AT modes - previousAtModes.putAll(initialAtStates.stream().collect(Collectors.toMap(ATStateData::getATAddress, atState -> AcctMode.OFFERING))); + Integer dataByteOffset = acct.getModeByteOffset(); + List initialAtStates = repository.getATRepository().getMatchingFinalATStates(codeHash, + isFinished, dataByteOffset, expectedValue, minimumFinalHeight, + null, null, null); - // Convert to offer summaries - currentSummaries.putAll(produceSummaries(repository, acct, initialAtStates, null).stream() + if (initialAtStates == null) + throw new DataException("Couldn't fetch current trades from repository"); + + // Save initial AT modes + cachedInfo.previousAtModes.putAll(initialAtStates.stream().collect(Collectors.toMap(ATStateData::getATAddress, atState -> AcctMode.OFFERING))); + + // Convert to offer summaries + cachedInfo.currentSummaries.putAll(produceSummaries(repository, acct, initialAtStates, null).stream() .collect(Collectors.toMap(CrossChainOfferSummary::getQortalAtAddress, offerSummary -> offerSummary))); + } } } @@ -243,29 +290,34 @@ public class TradeOffersWebSocket extends ApiWebSocket implements Listener { Long expectedValue = null; ++minimumFinalHeight; // because height is just *before* timestamp - Map> acctsByCodeHash = SupportedBlockchain.getAcctMap(); - for (Map.Entry> acctInfo : acctsByCodeHash.entrySet()) { - byte[] codeHash = acctInfo.getKey().value; - ACCT acct = acctInfo.getValue().get(); + for (SupportedBlockchain blockchain : SupportedBlockchain.values()) { + Map> acctsByCodeHash = SupportedBlockchain.getFilteredAcctMap(blockchain); - List historicAtStates = repository.getATRepository().getMatchingFinalATStates(codeHash, - isFinished, dataByteOffset, expectedValue, minimumFinalHeight, - null, null, null); + CachedOfferInfo cachedInfo = cachedInfoByBlockchain.computeIfAbsent(blockchain.name(), k -> new CachedOfferInfo()); - if (historicAtStates == null) - throw new DataException("Couldn't fetch historic trades from repository"); + for (Map.Entry> acctInfo : acctsByCodeHash.entrySet()) { + byte[] codeHash = acctInfo.getKey().value; + ACCT acct = acctInfo.getValue().get(); - for (ATStateData historicAtState : historicAtStates) { - CrossChainOfferSummary historicOfferSummary = produceSummary(repository, acct, historicAtState, null); + List historicAtStates = repository.getATRepository().getMatchingFinalATStates(codeHash, + isFinished, dataByteOffset, expectedValue, minimumFinalHeight, + null, null, null); - if (!isHistoric.test(historicOfferSummary)) - continue; + if (historicAtStates == null) + throw new DataException("Couldn't fetch historic trades from repository"); - // Add summary to initial burst - historicSummaries.put(historicOfferSummary.getQortalAtAddress(), historicOfferSummary); + for (ATStateData historicAtState : historicAtStates) { + CrossChainOfferSummary historicOfferSummary = produceSummary(repository, acct, historicAtState, null); - // Save initial AT mode - previousAtModes.put(historicOfferSummary.getQortalAtAddress(), historicOfferSummary.getMode()); + if (!isHistoric.test(historicOfferSummary)) + continue; + + // Add summary to initial burst + cachedInfo.historicSummaries.put(historicOfferSummary.getQortalAtAddress(), historicOfferSummary); + + // Save initial AT mode + cachedInfo.previousAtModes.put(historicOfferSummary.getQortalAtAddress(), historicOfferSummary.getMode()); + } } } } diff --git a/src/main/java/org/qortal/crosschain/SupportedBlockchain.java b/src/main/java/org/qortal/crosschain/SupportedBlockchain.java index e7230ff5..7b6f91f5 100644 --- a/src/main/java/org/qortal/crosschain/SupportedBlockchain.java +++ b/src/main/java/org/qortal/crosschain/SupportedBlockchain.java @@ -67,6 +67,10 @@ public enum SupportedBlockchain { return supportedAcctsByCodeHash; } + public static SupportedBlockchain fromString(String name) { + return blockchainsByName.get(name); + } + public static Map> getFilteredAcctMap(SupportedBlockchain blockchain) { if (blockchain == null) return getAcctMap();