Add preferred-blockchain filtering to /websockets/crosschain/tradeoffers via foreignBlockchain query param

This commit is contained in:
catbref 2020-12-10 16:00:01 +00:00
parent 31fa916156
commit 68e3d3b989
2 changed files with 149 additions and 93 deletions

View File

@ -3,6 +3,8 @@ package org.qortal.api.websocket;
import java.io.IOException;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -42,18 +44,23 @@ public class TradeOffersWebSocket extends ApiWebSocket implements Listener {
private static final Logger LOGGER = LogManager.getLogger(TradeOffersWebSocket.class);
private static final Map<String, AcctMode> previousAtModes = new HashMap<>();
private static class CachedOfferInfo {
public final Map<String, AcctMode> previousAtModes = new HashMap<>();
// OFFERING
private static final Map<String, CrossChainOfferSummary> currentSummaries = new HashMap<>();
// REDEEMED/REFUNDED/CANCELLED
private static final Map<String, CrossChainOfferSummary> historicSummaries = new HashMap<>();
// OFFERING
public final Map<String, CrossChainOfferSummary> currentSummaries = new HashMap<>();
// REDEEMED/REFUNDED/CANCELLED
public final Map<String, CrossChainOfferSummary> historicSummaries = new HashMap<>();
}
// Manual synchronization
private static final Map<String, CachedOfferInfo> cachedInfoByBlockchain = new HashMap<>();
private static final Predicate<CrossChainOfferSummary> isHistoric = offerSummary
-> offerSummary.getMode() == AcctMode.REDEEMED
|| offerSummary.getMode() == AcctMode.REFUNDED
|| offerSummary.getMode() == AcctMode.CANCELLED;
private static final Map<Session, String> sessionBlockchain = Collections.synchronizedMap(new HashMap<>());
@Override
public void configure(WebSocketServletFactory factory) {
@ -79,7 +86,6 @@ public class TradeOffersWebSocket extends ApiWebSocket implements Listener {
BlockData blockData = ((Controller.NewBlockEvent) event).getBlockData();
// Process any new info
List<CrossChainOfferSummary> crossChainOfferSummaries = new ArrayList<>();
try (final Repository repository = RepositoryManager.getRepository()) {
// Find any new/changed trade ATs since this block
@ -88,64 +94,75 @@ public class TradeOffersWebSocket extends ApiWebSocket implements Listener {
final Long expectedValue = null;
final Integer minimumFinalHeight = blockData.getHeight();
// Loop for all different types of trade offer?
Map<ByteArray, Supplier<ACCT>> acctsByCodeHash = SupportedBlockchain.getAcctMap();
for (Map.Entry<ByteArray, Supplier<ACCT>> acctInfo : acctsByCodeHash.entrySet()) {
byte[] codeHash = acctInfo.getKey().value;
ACCT acct = acctInfo.getValue().get();
for (SupportedBlockchain blockchain : SupportedBlockchain.values()) {
Map<ByteArray, Supplier<ACCT>> acctsByCodeHash = SupportedBlockchain.getFilteredAcctMap(blockchain);
List<ATStateData> atStates = repository.getATRepository().getMatchingFinalATStates(codeHash,
isFinished, dataByteOffset, expectedValue, minimumFinalHeight,
null, null, null);
List<CrossChainOfferSummary> crossChainOfferSummaries = new ArrayList<>();
synchronized (cachedInfoByBlockchain) {
CachedOfferInfo cachedInfo = cachedInfoByBlockchain.computeIfAbsent(blockchain.name(), k -> new CachedOfferInfo());
for (Map.Entry<ByteArray, Supplier<ACCT>> acctInfo : acctsByCodeHash.entrySet()) {
byte[] codeHash = acctInfo.getKey().value;
ACCT acct = acctInfo.getValue().get();
List<ATStateData> atStates = repository.getATRepository().getMatchingFinalATStates(codeHash,
isFinished, dataByteOffset, expectedValue, minimumFinalHeight,
null, null, null);
crossChainOfferSummaries.addAll(produceSummaries(repository, acct, atStates, blockData.getTimestamp()));
}
// Remove any entries unchanged from last time
crossChainOfferSummaries.removeIf(offerSummary -> cachedInfo.previousAtModes.get(offerSummary.getQortalAtAddress()) == offerSummary.getMode());
// Skip to next blockchain if nothing has changed (for this blockchain)
if (crossChainOfferSummaries.isEmpty())
continue;
// Update
for (CrossChainOfferSummary offerSummary : crossChainOfferSummaries) {
cachedInfo.previousAtModes.put(offerSummary.qortalAtAddress, offerSummary.getMode());
LOGGER.trace(() -> String.format("Block height: %d, AT: %s, mode: %s", blockData.getHeight(), offerSummary.qortalAtAddress, offerSummary.getMode().name()));
switch (offerSummary.getMode()) {
case OFFERING:
cachedInfo.currentSummaries.put(offerSummary.qortalAtAddress, offerSummary);
cachedInfo.historicSummaries.remove(offerSummary.qortalAtAddress);
break;
case REDEEMED:
case REFUNDED:
case CANCELLED:
cachedInfo.currentSummaries.remove(offerSummary.qortalAtAddress);
cachedInfo.historicSummaries.put(offerSummary.qortalAtAddress, offerSummary);
break;
case TRADING:
cachedInfo.currentSummaries.remove(offerSummary.qortalAtAddress);
cachedInfo.historicSummaries.remove(offerSummary.qortalAtAddress);
break;
}
}
// Remove any historic offers that are over 24 hours old
final long tooOldTimestamp = NTP.getTime() - 24 * 60 * 60 * 1000L;
cachedInfo.historicSummaries.values().removeIf(historicSummary -> historicSummary.getTimestamp() < tooOldTimestamp);
}
// Notify sessions
for (Session session : getSessions()) {
// Only send if this session has this/no preferred blockchain
String preferredBlockchain = sessionBlockchain.get(session);
if (preferredBlockchain == null || preferredBlockchain.equals(blockchain.name()))
sendOfferSummaries(session, crossChainOfferSummaries);
}
crossChainOfferSummaries.addAll(produceSummaries(repository, acct, atStates, blockData.getTimestamp()));
}
} catch (DataException e) {
// No output this time
return;
}
synchronized (previousAtModes) {
// Remove any entries unchanged from last time
crossChainOfferSummaries.removeIf(offerSummary -> previousAtModes.get(offerSummary.getQortalAtAddress()) == offerSummary.getMode());
// Don't send anything if no results
if (crossChainOfferSummaries.isEmpty())
return;
// Update
for (CrossChainOfferSummary offerSummary : crossChainOfferSummaries) {
previousAtModes.put(offerSummary.qortalAtAddress, offerSummary.getMode());
LOGGER.trace(() -> String.format("Block height: %d, AT: %s, mode: %s", blockData.getHeight(), offerSummary.qortalAtAddress, offerSummary.getMode().name()));
switch (offerSummary.getMode()) {
case OFFERING:
currentSummaries.put(offerSummary.qortalAtAddress, offerSummary);
historicSummaries.remove(offerSummary.qortalAtAddress);
break;
case REDEEMED:
case REFUNDED:
case CANCELLED:
currentSummaries.remove(offerSummary.qortalAtAddress);
historicSummaries.put(offerSummary.qortalAtAddress, offerSummary);
break;
case TRADING:
currentSummaries.remove(offerSummary.qortalAtAddress);
historicSummaries.remove(offerSummary.qortalAtAddress);
break;
}
}
// Remove any historic offers that are over 24 hours old
final long tooOldTimestamp = NTP.getTime() - 24 * 60 * 60 * 1000L;
historicSummaries.values().removeIf(historicSummary -> historicSummary.getTimestamp() < tooOldTimestamp);
}
// Notify sessions
for (Session session : getSessions())
sendOfferSummaries(session, crossChainOfferSummaries);
}
@OnWebSocketConnect
@ -154,13 +171,35 @@ public class TradeOffersWebSocket extends ApiWebSocket implements Listener {
Map<String, List<String>> queryParams = session.getUpgradeRequest().getParameterMap();
final boolean includeHistoric = queryParams.get("includeHistoric") != null;
List<String> foreignBlockchains = queryParams.get("foreignBlockchain");
final String foreignBlockchain = foreignBlockchains == null ? null : foreignBlockchains.get(0);
// Make sure blockchain (if any) is valid
if (foreignBlockchain != null && SupportedBlockchain.fromString(foreignBlockchain) == null) {
session.close(4003, "unknown blockchain: " + foreignBlockchain);
return;
}
// save session's preferred blockchain (if any)
sessionBlockchain.put(session, foreignBlockchain);
List<CrossChainOfferSummary> crossChainOfferSummaries = new ArrayList<>();
synchronized (previousAtModes) {
crossChainOfferSummaries.addAll(currentSummaries.values());
synchronized (cachedInfoByBlockchain) {
Collection<CachedOfferInfo> cachedInfos;
if (includeHistoric)
crossChainOfferSummaries.addAll(historicSummaries.values());
if (foreignBlockchain == null)
// No preferred blockchain, so iterate through all of them
cachedInfos = cachedInfoByBlockchain.values();
else
cachedInfos = Collections.singleton(cachedInfoByBlockchain.computeIfAbsent(foreignBlockchain, k -> new CachedOfferInfo()));
for (CachedOfferInfo cachedInfo : cachedInfos) {
crossChainOfferSummaries.addAll(cachedInfo.currentSummaries.values());
if (includeHistoric)
crossChainOfferSummaries.addAll(cachedInfo.historicSummaries.values());
}
}
if (!sendOfferSummaries(session, crossChainOfferSummaries)) {
@ -174,6 +213,9 @@ public class TradeOffersWebSocket extends ApiWebSocket implements Listener {
@OnWebSocketClose
@Override
public void onWebSocketClose(Session session, int statusCode, String reason) {
// clean up
sessionBlockchain.remove(session);
super.onWebSocketClose(session, statusCode, reason);
}
@ -208,25 +250,30 @@ public class TradeOffersWebSocket extends ApiWebSocket implements Listener {
Long expectedValue = (long) AcctMode.OFFERING.value;
Integer minimumFinalHeight = null;
Map<ByteArray, Supplier<ACCT>> acctsByCodeHash = SupportedBlockchain.getAcctMap();
for (Map.Entry<ByteArray, Supplier<ACCT>> acctInfo : acctsByCodeHash.entrySet()) {
byte[] codeHash = acctInfo.getKey().value;
ACCT acct = acctInfo.getValue().get();
for (SupportedBlockchain blockchain : SupportedBlockchain.values()) {
Map<ByteArray, Supplier<ACCT>> acctsByCodeHash = SupportedBlockchain.getFilteredAcctMap(blockchain);
Integer dataByteOffset = acct.getModeByteOffset();
List<ATStateData> initialAtStates = repository.getATRepository().getMatchingFinalATStates(codeHash,
isFinished, dataByteOffset, expectedValue, minimumFinalHeight,
null, null, null);
CachedOfferInfo cachedInfo = cachedInfoByBlockchain.computeIfAbsent(blockchain.name(), k -> new CachedOfferInfo());
if (initialAtStates == null)
throw new DataException("Couldn't fetch current trades from repository");
for (Map.Entry<ByteArray, Supplier<ACCT>> acctInfo : acctsByCodeHash.entrySet()) {
byte[] codeHash = acctInfo.getKey().value;
ACCT acct = acctInfo.getValue().get();
// Save initial AT modes
previousAtModes.putAll(initialAtStates.stream().collect(Collectors.toMap(ATStateData::getATAddress, atState -> AcctMode.OFFERING)));
Integer dataByteOffset = acct.getModeByteOffset();
List<ATStateData> initialAtStates = repository.getATRepository().getMatchingFinalATStates(codeHash,
isFinished, dataByteOffset, expectedValue, minimumFinalHeight,
null, null, null);
// Convert to offer summaries
currentSummaries.putAll(produceSummaries(repository, acct, initialAtStates, null).stream()
if (initialAtStates == null)
throw new DataException("Couldn't fetch current trades from repository");
// Save initial AT modes
cachedInfo.previousAtModes.putAll(initialAtStates.stream().collect(Collectors.toMap(ATStateData::getATAddress, atState -> AcctMode.OFFERING)));
// Convert to offer summaries
cachedInfo.currentSummaries.putAll(produceSummaries(repository, acct, initialAtStates, null).stream()
.collect(Collectors.toMap(CrossChainOfferSummary::getQortalAtAddress, offerSummary -> offerSummary)));
}
}
}
@ -243,29 +290,34 @@ public class TradeOffersWebSocket extends ApiWebSocket implements Listener {
Long expectedValue = null;
++minimumFinalHeight; // because height is just *before* timestamp
Map<ByteArray, Supplier<ACCT>> acctsByCodeHash = SupportedBlockchain.getAcctMap();
for (Map.Entry<ByteArray, Supplier<ACCT>> acctInfo : acctsByCodeHash.entrySet()) {
byte[] codeHash = acctInfo.getKey().value;
ACCT acct = acctInfo.getValue().get();
for (SupportedBlockchain blockchain : SupportedBlockchain.values()) {
Map<ByteArray, Supplier<ACCT>> acctsByCodeHash = SupportedBlockchain.getFilteredAcctMap(blockchain);
List<ATStateData> historicAtStates = repository.getATRepository().getMatchingFinalATStates(codeHash,
isFinished, dataByteOffset, expectedValue, minimumFinalHeight,
null, null, null);
CachedOfferInfo cachedInfo = cachedInfoByBlockchain.computeIfAbsent(blockchain.name(), k -> new CachedOfferInfo());
if (historicAtStates == null)
throw new DataException("Couldn't fetch historic trades from repository");
for (Map.Entry<ByteArray, Supplier<ACCT>> acctInfo : acctsByCodeHash.entrySet()) {
byte[] codeHash = acctInfo.getKey().value;
ACCT acct = acctInfo.getValue().get();
for (ATStateData historicAtState : historicAtStates) {
CrossChainOfferSummary historicOfferSummary = produceSummary(repository, acct, historicAtState, null);
List<ATStateData> historicAtStates = repository.getATRepository().getMatchingFinalATStates(codeHash,
isFinished, dataByteOffset, expectedValue, minimumFinalHeight,
null, null, null);
if (!isHistoric.test(historicOfferSummary))
continue;
if (historicAtStates == null)
throw new DataException("Couldn't fetch historic trades from repository");
// Add summary to initial burst
historicSummaries.put(historicOfferSummary.getQortalAtAddress(), historicOfferSummary);
for (ATStateData historicAtState : historicAtStates) {
CrossChainOfferSummary historicOfferSummary = produceSummary(repository, acct, historicAtState, null);
// Save initial AT mode
previousAtModes.put(historicOfferSummary.getQortalAtAddress(), historicOfferSummary.getMode());
if (!isHistoric.test(historicOfferSummary))
continue;
// Add summary to initial burst
cachedInfo.historicSummaries.put(historicOfferSummary.getQortalAtAddress(), historicOfferSummary);
// Save initial AT mode
cachedInfo.previousAtModes.put(historicOfferSummary.getQortalAtAddress(), historicOfferSummary.getMode());
}
}
}
}

View File

@ -67,6 +67,10 @@ public enum SupportedBlockchain {
return supportedAcctsByCodeHash;
}
public static SupportedBlockchain fromString(String name) {
return blockchainsByName.get(name);
}
public static Map<ByteArray, Supplier<ACCT>> getFilteredAcctMap(SupportedBlockchain blockchain) {
if (blockchain == null)
return getAcctMap();