WIP: trade-bot: tradeoffers websocket initial message with OFFERING/REDEEMED and fixed subsequent messages

This commit is contained in:
catbref 2020-08-04 11:21:57 +01:00
parent a8743b1bd3
commit 25bf315e23
2 changed files with 122 additions and 46 deletions

View File

@ -3,7 +3,9 @@ package org.qortal.api.websocket;
import java.io.IOException;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.eclipse.jetty.websocket.api.Session;
@ -18,10 +20,10 @@ import org.qortal.controller.BlockNotifier;
import org.qortal.crosschain.BTCACCT;
import org.qortal.data.at.ATStateData;
import org.qortal.data.block.BlockData;
import org.qortal.data.crosschain.CrossChainTradeData;
import org.qortal.repository.DataException;
import org.qortal.repository.Repository;
import org.qortal.repository.RepositoryManager;
import org.qortal.utils.NTP;
@WebSocket
@SuppressWarnings("serial")
@ -34,10 +36,72 @@ public class TradeOffersWebSocket extends WebSocketServlet implements ApiWebSock
@OnWebSocketConnect
public void onWebSocketConnect(Session session) {
BlockNotifier.Listener listener = blockData -> onNotify(session, blockData);
BlockNotifier.getInstance().register(session, listener);
Map<String, List<String>> queryParams = session.getUpgradeRequest().getParameterMap();
this.onNotify(session, null);
final boolean includeHistoric = queryParams.get("includeHistoric") != null;
final Map<String, BTCACCT.Mode> previousAtModes = new HashMap<>();
List<CrossChainOfferSummary> crossChainOfferSummaries;
try (final Repository repository = RepositoryManager.getRepository()) {
List<ATStateData> initialAtStates;
// We want ALL OFFERING trades
Boolean isFinished = Boolean.FALSE;
Integer dataByteOffset = BTCACCT.MODE_BYTE_OFFSET;
Long expectedValue = (long) BTCACCT.Mode.OFFERING.value;
Integer minimumFinalHeight = null;
initialAtStates = repository.getATRepository().getMatchingFinalATStates(BTCACCT.CODE_BYTES_HASH,
isFinished, dataByteOffset, expectedValue, minimumFinalHeight,
null, null, null);
if (initialAtStates == null) {
session.close(4001, "repository issue fetching OFFERING trades");
return;
}
// Save initial AT modes
previousAtModes.putAll(initialAtStates.stream().collect(Collectors.toMap(ATStateData::getATAddress, atState -> BTCACCT.Mode.OFFERING)));
if (includeHistoric) {
// We also want REDEEMED trades over the last 24 hours
long timestamp = NTP.getTime() - 24 * 60 * 60 * 1000L;
minimumFinalHeight = repository.getBlockRepository().getHeightFromTimestamp(timestamp);
if (minimumFinalHeight != 0) {
isFinished = Boolean.TRUE;
expectedValue = (long) BTCACCT.Mode.REDEEMED.value;
++minimumFinalHeight; // because height is just *before* timestamp
List<ATStateData> historicAtStates = repository.getATRepository().getMatchingFinalATStates(BTCACCT.CODE_BYTES_HASH,
isFinished, dataByteOffset, expectedValue, minimumFinalHeight,
null, null, null);
if (historicAtStates == null) {
session.close(4002, "repository issue fetching REDEEMED trades");
return;
}
initialAtStates.addAll(historicAtStates);
// Save initial AT modes
previousAtModes.putAll(historicAtStates.stream().collect(Collectors.toMap(ATStateData::getATAddress, atState -> BTCACCT.Mode.REDEEMED)));
}
}
crossChainOfferSummaries = produceSummaries(repository, initialAtStates);
} catch (DataException e) {
session.close(4003, "generic repository issue");
return;
}
if (!sendOfferSummaries(session, crossChainOfferSummaries)) {
session.close(4004, "websocket issue");
return;
}
BlockNotifier.Listener listener = blockData -> onNotify(session, blockData, previousAtModes);
BlockNotifier.getInstance().register(session, listener);
}
@OnWebSocketClose
@ -47,54 +111,68 @@ public class TradeOffersWebSocket extends WebSocketServlet implements ApiWebSock
@OnWebSocketMessage
public void onWebSocketMessage(Session session, String message) {
/* ignored */
}
private void onNotify(Session session, BlockData blockData) {
List<CrossChainTradeData> crossChainTradeDataList = new ArrayList<>();
try (final Repository repository = RepositoryManager.getRepository()) {
Integer minimumFinalHeight;
if (blockData == null) {
// If blockData is null then we send all known trade offers
minimumFinalHeight = null;
} else {
private void onNotify(Session session, BlockData blockData, final Map<String, BTCACCT.Mode> previousAtModes) {
synchronized (previousAtModes) { //NOSONAR squid:S2445 suppressed because previousAtModes is final and curried in lambda
try (final Repository repository = RepositoryManager.getRepository()) {
// Find any new trade ATs since this block
minimumFinalHeight = blockData.getHeight();
final Boolean isFinished = null;
final Integer dataByteOffset = null;
final Long expectedValue = null;
final Integer minimumFinalHeight = blockData.getHeight();
List<ATStateData> atStates = repository.getATRepository().getMatchingFinalATStates(BTCACCT.CODE_BYTES_HASH,
isFinished, dataByteOffset, expectedValue, minimumFinalHeight,
null, null, null);
if (atStates == null)
return;
List<CrossChainOfferSummary> crossChainOfferSummaries = produceSummaries(repository, atStates);
// Remove any entries unchanged from last time
crossChainOfferSummaries.removeIf(offerSummary -> previousAtModes.get(offerSummary.getQortalAtAddress()) == offerSummary.getMode());
// Don't send anything if no results
if (crossChainOfferSummaries.isEmpty())
return;
final boolean wasSent = sendOfferSummaries(session, crossChainOfferSummaries);
if (!wasSent)
return;
previousAtModes.putAll(crossChainOfferSummaries.stream().collect(Collectors.toMap(CrossChainOfferSummary::getQortalAtAddress, CrossChainOfferSummary::getMode)));
} catch (DataException e) {
// No output this time
}
final Boolean isFinished = Boolean.FALSE;
List<ATStateData> atStates = repository.getATRepository().getMatchingFinalATStates(BTCACCT.CODE_BYTES_HASH,
isFinished,
BTCACCT.MODE_BYTE_OFFSET, (long) BTCACCT.Mode.OFFERING.value,
minimumFinalHeight,
null, null, null);
// Don't send anything if no results and this isn't initial on-connection message
if (atStates == null || (atStates.isEmpty() && blockData != null))
return;
for (ATStateData atState : atStates) {
CrossChainTradeData crossChainTradeData = BTCACCT.populateTradeData(repository, atState);
crossChainTradeDataList.add(crossChainTradeData);
}
} catch (DataException e) {
// No output this time?
return;
}
}
private boolean sendOfferSummaries(Session session, List<CrossChainOfferSummary> crossChainOfferSummaries) {
try {
List<CrossChainOfferSummary> crossChainOffers = crossChainTradeDataList.stream().map(crossChainTradeData -> new CrossChainOfferSummary(crossChainTradeData)).collect(Collectors.toList());
StringWriter stringWriter = new StringWriter();
this.marshall(stringWriter, crossChainOffers);
this.marshall(stringWriter, crossChainOfferSummaries);
String output = stringWriter.toString();
session.getRemote().sendString(output);
session.getRemote().sendStringByFuture(output);
} catch (IOException e) {
// No output this time?
return false;
}
return true;
}
private static List<CrossChainOfferSummary> produceSummaries(Repository repository, List<ATStateData> atStates) throws DataException {
List<CrossChainOfferSummary> offerSummaries = new ArrayList<>();
for (ATStateData atState : atStates)
offerSummaries.add(new CrossChainOfferSummary(BTCACCT.populateTradeData(repository, atState)));
return offerSummaries;
}
}

View File

@ -17,7 +17,6 @@ import org.ciyam.at.OpCode;
import org.ciyam.at.Timestamp;
import org.qortal.account.Account;
import org.qortal.asset.Asset;
import org.qortal.at.QortalAtLoggerFactory;
import org.qortal.at.QortalFunctionCode;
import org.qortal.crypto.Crypto;
import org.qortal.data.at.ATData;
@ -614,13 +613,11 @@ public class BTCACCT {
* @throws DataException
*/
public static CrossChainTradeData populateTradeData(Repository repository, byte[] creatorPublicKey, ATStateData atStateData) throws DataException {
byte[] addressBytes = new byte[25]; // for general use
String atAddress = atStateData.getATAddress();
QortalAtLoggerFactory loggerFactory = QortalAtLoggerFactory.getInstance();
byte[] stateData = atStateData.getStateData();
byte[] dataBytes = MachineState.extractDataBytes(loggerFactory, stateData);
CrossChainTradeData tradeData = new CrossChainTradeData();
tradeData.qortalAtAddress = atAddress;
tradeData.qortalCreator = Crypto.toAddress(creatorPublicKey);
tradeData.creationTimestamp = atStateData.getCreation();
@ -628,8 +625,9 @@ public class BTCACCT {
Account atAccount = new Account(repository, atAddress);
tradeData.qortBalance = atAccount.getConfirmedBalance(Asset.QORT);
ByteBuffer dataByteBuffer = ByteBuffer.wrap(dataBytes);
byte[] addressBytes = new byte[25];
byte[] stateData = atStateData.getStateData();
ByteBuffer dataByteBuffer = ByteBuffer.wrap(stateData);
dataByteBuffer.position(MachineState.HEADER_LENGTH);
/* Constants */