forked from Qortal/qortal
Refactored various websockets to event bus from old BlockNotifier/StatusNotifier
This commit is contained in:
parent
0b5e5832c4
commit
1cd4bbc078
@ -13,59 +13,87 @@ import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;
|
|||||||
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
|
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
|
||||||
import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
|
import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
|
||||||
import org.qortal.api.model.NodeStatus;
|
import org.qortal.api.model.NodeStatus;
|
||||||
import org.qortal.controller.StatusNotifier;
|
import org.qortal.controller.Controller;
|
||||||
import org.qortal.repository.DataException;
|
import org.qortal.event.Event;
|
||||||
import org.qortal.repository.Repository;
|
import org.qortal.event.EventBus;
|
||||||
import org.qortal.repository.RepositoryManager;
|
import org.qortal.event.Listener;
|
||||||
|
|
||||||
@WebSocket
|
@WebSocket
|
||||||
@SuppressWarnings("serial")
|
@SuppressWarnings("serial")
|
||||||
public class AdminStatusWebSocket extends ApiWebSocket {
|
public class AdminStatusWebSocket extends ApiWebSocket implements Listener {
|
||||||
|
|
||||||
|
private static final AtomicReference<String> previousOutput = new AtomicReference<>(null);
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void configure(WebSocketServletFactory factory) {
|
public void configure(WebSocketServletFactory factory) {
|
||||||
factory.register(AdminStatusWebSocket.class);
|
factory.register(AdminStatusWebSocket.class);
|
||||||
|
|
||||||
|
try {
|
||||||
|
previousOutput.set(buildStatusString());
|
||||||
|
} catch (IOException e) {
|
||||||
|
// How to fail properly?
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
EventBus.INSTANCE.addListener(this::listen);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void listen(Event event) {
|
||||||
|
if (!(event instanceof Controller.StatusChangeEvent))
|
||||||
|
return;
|
||||||
|
|
||||||
|
String newOutput;
|
||||||
|
try {
|
||||||
|
newOutput = buildStatusString();
|
||||||
|
} catch (IOException e) {
|
||||||
|
// Ignore this time?
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (previousOutput.getAndUpdate(currentValue -> newOutput).equals(newOutput))
|
||||||
|
// Output hasn't changed, so don't send anything
|
||||||
|
return;
|
||||||
|
|
||||||
|
for (Session session : getSessions())
|
||||||
|
this.sendStatus(session, newOutput);
|
||||||
}
|
}
|
||||||
|
|
||||||
@OnWebSocketConnect
|
@OnWebSocketConnect
|
||||||
|
@Override
|
||||||
public void onWebSocketConnect(Session session) {
|
public void onWebSocketConnect(Session session) {
|
||||||
AtomicReference<String> previousOutput = new AtomicReference<>(null);
|
this.sendStatus(session, previousOutput.get());
|
||||||
|
|
||||||
StatusNotifier.Listener listener = timestamp -> onNotify(session, previousOutput);
|
super.onWebSocketConnect(session);
|
||||||
StatusNotifier.getInstance().register(session, listener);
|
|
||||||
|
|
||||||
this.onNotify(session, previousOutput);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@OnWebSocketClose
|
@OnWebSocketClose
|
||||||
|
@Override
|
||||||
public void onWebSocketClose(Session session, int statusCode, String reason) {
|
public void onWebSocketClose(Session session, int statusCode, String reason) {
|
||||||
StatusNotifier.getInstance().deregister(session);
|
super.onWebSocketClose(session, statusCode, reason);
|
||||||
}
|
}
|
||||||
|
|
||||||
@OnWebSocketError
|
@OnWebSocketError
|
||||||
public void onWebSocketError(Session session, Throwable throwable) {
|
public void onWebSocketError(Session session, Throwable throwable) {
|
||||||
|
/* We ignore errors for now, but method here to silence log spam */
|
||||||
}
|
}
|
||||||
|
|
||||||
@OnWebSocketMessage
|
@OnWebSocketMessage
|
||||||
public void onWebSocketMessage(Session session, String message) {
|
public void onWebSocketMessage(Session session, String message) {
|
||||||
|
/* ignored */
|
||||||
}
|
}
|
||||||
|
|
||||||
private void onNotify(Session session,AtomicReference<String> previousOutput) {
|
private static String buildStatusString() throws IOException {
|
||||||
try (final Repository repository = RepositoryManager.getRepository()) {
|
|
||||||
NodeStatus nodeStatus = new NodeStatus();
|
NodeStatus nodeStatus = new NodeStatus();
|
||||||
|
|
||||||
StringWriter stringWriter = new StringWriter();
|
StringWriter stringWriter = new StringWriter();
|
||||||
|
|
||||||
marshall(stringWriter, nodeStatus);
|
marshall(stringWriter, nodeStatus);
|
||||||
|
return stringWriter.toString();
|
||||||
|
}
|
||||||
|
|
||||||
// Only output if something has changed
|
private void sendStatus(Session session, String status) {
|
||||||
String output = stringWriter.toString();
|
try {
|
||||||
if (output.equals(previousOutput.get()))
|
session.getRemote().sendStringByFuture(status);
|
||||||
return;
|
} catch (WebSocketException e) {
|
||||||
|
|
||||||
previousOutput.set(output);
|
|
||||||
session.getRemote().sendStringByFuture(output);
|
|
||||||
} catch (DataException | IOException | WebSocketException e) {
|
|
||||||
// No output this time?
|
// No output this time?
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -14,7 +14,11 @@ import org.eclipse.jetty.websocket.api.annotations.WebSocket;
|
|||||||
import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
|
import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
|
||||||
import org.qortal.api.ApiError;
|
import org.qortal.api.ApiError;
|
||||||
import org.qortal.api.model.BlockInfo;
|
import org.qortal.api.model.BlockInfo;
|
||||||
import org.qortal.controller.BlockNotifier;
|
import org.qortal.controller.Controller;
|
||||||
|
import org.qortal.data.block.BlockData;
|
||||||
|
import org.qortal.event.Event;
|
||||||
|
import org.qortal.event.EventBus;
|
||||||
|
import org.qortal.event.Listener;
|
||||||
import org.qortal.repository.DataException;
|
import org.qortal.repository.DataException;
|
||||||
import org.qortal.repository.Repository;
|
import org.qortal.repository.Repository;
|
||||||
import org.qortal.repository.RepositoryManager;
|
import org.qortal.repository.RepositoryManager;
|
||||||
@ -22,26 +26,42 @@ import org.qortal.utils.Base58;
|
|||||||
|
|
||||||
@WebSocket
|
@WebSocket
|
||||||
@SuppressWarnings("serial")
|
@SuppressWarnings("serial")
|
||||||
public class BlocksWebSocket extends ApiWebSocket {
|
public class BlocksWebSocket extends ApiWebSocket implements Listener {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void configure(WebSocketServletFactory factory) {
|
public void configure(WebSocketServletFactory factory) {
|
||||||
factory.register(BlocksWebSocket.class);
|
factory.register(BlocksWebSocket.class);
|
||||||
|
|
||||||
|
EventBus.INSTANCE.addListener(this::listen);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void listen(Event event) {
|
||||||
|
if (!(event instanceof Controller.NewBlockEvent))
|
||||||
|
return;
|
||||||
|
|
||||||
|
BlockData blockData = ((Controller.NewBlockEvent) event).getBlockData();
|
||||||
|
BlockInfo blockInfo = new BlockInfo(blockData);
|
||||||
|
|
||||||
|
for (Session session : getSessions())
|
||||||
|
sendBlockInfo(session, blockInfo);
|
||||||
}
|
}
|
||||||
|
|
||||||
@OnWebSocketConnect
|
@OnWebSocketConnect
|
||||||
|
@Override
|
||||||
public void onWebSocketConnect(Session session) {
|
public void onWebSocketConnect(Session session) {
|
||||||
BlockNotifier.Listener listener = blockInfo -> onNotify(session, blockInfo);
|
super.onWebSocketConnect(session);
|
||||||
BlockNotifier.getInstance().register(session, listener);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@OnWebSocketClose
|
@OnWebSocketClose
|
||||||
|
@Override
|
||||||
public void onWebSocketClose(Session session, int statusCode, String reason) {
|
public void onWebSocketClose(Session session, int statusCode, String reason) {
|
||||||
BlockNotifier.getInstance().deregister(session);
|
super.onWebSocketClose(session, statusCode, reason);
|
||||||
}
|
}
|
||||||
|
|
||||||
@OnWebSocketError
|
@OnWebSocketError
|
||||||
public void onWebSocketError(Session session, Throwable throwable) {
|
public void onWebSocketError(Session session, Throwable throwable) {
|
||||||
|
/* We ignore errors for now, but method here to silence log spam */
|
||||||
}
|
}
|
||||||
|
|
||||||
@OnWebSocketMessage
|
@OnWebSocketMessage
|
||||||
@ -71,7 +91,7 @@ public class BlocksWebSocket extends ApiWebSocket {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
onNotify(session, blockInfos.get(0));
|
sendBlockInfo(session, blockInfos.get(0));
|
||||||
} catch (DataException e) {
|
} catch (DataException e) {
|
||||||
sendError(session, ApiError.REPOSITORY_ISSUE);
|
sendError(session, ApiError.REPOSITORY_ISSUE);
|
||||||
}
|
}
|
||||||
@ -100,13 +120,13 @@ public class BlocksWebSocket extends ApiWebSocket {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
onNotify(session, blockInfos.get(0));
|
sendBlockInfo(session, blockInfos.get(0));
|
||||||
} catch (DataException e) {
|
} catch (DataException e) {
|
||||||
sendError(session, ApiError.REPOSITORY_ISSUE);
|
sendError(session, ApiError.REPOSITORY_ISSUE);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void onNotify(Session session, BlockInfo blockInfo) {
|
private void sendBlockInfo(Session session, BlockInfo blockInfo) {
|
||||||
StringWriter stringWriter = new StringWriter();
|
StringWriter stringWriter = new StringWriter();
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
@ -6,6 +6,7 @@ import java.util.ArrayList;
|
|||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.function.Predicate;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
import org.eclipse.jetty.websocket.api.Session;
|
import org.eclipse.jetty.websocket.api.Session;
|
||||||
@ -14,12 +15,15 @@ import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect;
|
|||||||
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;
|
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;
|
||||||
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
|
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
|
||||||
import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
|
import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
|
||||||
import org.qortal.api.model.BlockInfo;
|
|
||||||
import org.qortal.api.model.CrossChainOfferSummary;
|
import org.qortal.api.model.CrossChainOfferSummary;
|
||||||
import org.qortal.controller.BlockNotifier;
|
import org.qortal.controller.Controller;
|
||||||
import org.qortal.crosschain.BTCACCT;
|
import org.qortal.crosschain.BTCACCT;
|
||||||
import org.qortal.data.at.ATStateData;
|
import org.qortal.data.at.ATStateData;
|
||||||
|
import org.qortal.data.block.BlockData;
|
||||||
import org.qortal.data.crosschain.CrossChainTradeData;
|
import org.qortal.data.crosschain.CrossChainTradeData;
|
||||||
|
import org.qortal.event.Event;
|
||||||
|
import org.qortal.event.EventBus;
|
||||||
|
import org.qortal.event.Listener;
|
||||||
import org.qortal.repository.DataException;
|
import org.qortal.repository.DataException;
|
||||||
import org.qortal.repository.Repository;
|
import org.qortal.repository.Repository;
|
||||||
import org.qortal.repository.RepositoryManager;
|
import org.qortal.repository.RepositoryManager;
|
||||||
@ -27,120 +31,56 @@ import org.qortal.utils.NTP;
|
|||||||
|
|
||||||
@WebSocket
|
@WebSocket
|
||||||
@SuppressWarnings("serial")
|
@SuppressWarnings("serial")
|
||||||
public class TradeOffersWebSocket extends ApiWebSocket {
|
public class TradeOffersWebSocket extends ApiWebSocket implements Listener {
|
||||||
|
|
||||||
|
private static final Map<String, BTCACCT.Mode> previousAtModes = new HashMap<>();
|
||||||
|
|
||||||
|
// OFFERING
|
||||||
|
private static final List<CrossChainOfferSummary> currentSummaries = new ArrayList<>();
|
||||||
|
// REDEEMED/REFUNDED/CANCELLED
|
||||||
|
private static final List<CrossChainOfferSummary> historicSummaries = new ArrayList<>();
|
||||||
|
|
||||||
|
private static final Predicate<CrossChainOfferSummary> isCurrent = offerSummary
|
||||||
|
-> offerSummary.getMode() == BTCACCT.Mode.OFFERING;
|
||||||
|
|
||||||
|
private static final Predicate<CrossChainOfferSummary> isHistoric = offerSummary
|
||||||
|
-> offerSummary.getMode() == BTCACCT.Mode.REDEEMED
|
||||||
|
|| offerSummary.getMode() == BTCACCT.Mode.REFUNDED
|
||||||
|
|| offerSummary.getMode() == BTCACCT.Mode.CANCELLED;
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void configure(WebSocketServletFactory factory) {
|
public void configure(WebSocketServletFactory factory) {
|
||||||
factory.register(TradeOffersWebSocket.class);
|
factory.register(TradeOffersWebSocket.class);
|
||||||
}
|
|
||||||
|
|
||||||
@OnWebSocketConnect
|
|
||||||
public void onWebSocketConnect(Session session) {
|
|
||||||
Map<String, List<String>> queryParams = session.getUpgradeRequest().getParameterMap();
|
|
||||||
|
|
||||||
final boolean includeHistoric = queryParams.get("includeHistoric") != null;
|
|
||||||
final Map<String, BTCACCT.Mode> previousAtModes = new HashMap<>();
|
|
||||||
List<CrossChainOfferSummary> crossChainOfferSummaries;
|
|
||||||
|
|
||||||
try (final Repository repository = RepositoryManager.getRepository()) {
|
try (final Repository repository = RepositoryManager.getRepository()) {
|
||||||
List<ATStateData> initialAtStates;
|
populateCurrentSummaries(repository);
|
||||||
|
|
||||||
// We want ALL OFFERING trades
|
|
||||||
Boolean isFinished = Boolean.FALSE;
|
|
||||||
Integer dataByteOffset = BTCACCT.MODE_BYTE_OFFSET;
|
|
||||||
Long expectedValue = (long) BTCACCT.Mode.OFFERING.value;
|
|
||||||
Integer minimumFinalHeight = null;
|
|
||||||
|
|
||||||
initialAtStates = repository.getATRepository().getMatchingFinalATStates(BTCACCT.CODE_BYTES_HASH,
|
|
||||||
isFinished, dataByteOffset, expectedValue, minimumFinalHeight,
|
|
||||||
null, null, null);
|
|
||||||
|
|
||||||
if (initialAtStates == null) {
|
|
||||||
session.close(4001, "repository issue fetching OFFERING trades");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Save initial AT modes
|
|
||||||
previousAtModes.putAll(initialAtStates.stream().collect(Collectors.toMap(ATStateData::getATAddress, atState -> BTCACCT.Mode.OFFERING)));
|
|
||||||
|
|
||||||
// Convert to offer summaries
|
|
||||||
crossChainOfferSummaries = produceSummaries(repository, initialAtStates, null);
|
|
||||||
|
|
||||||
if (includeHistoric) {
|
|
||||||
// We also want REDEEMED/REFUNDED/CANCELLED trades over the last 24 hours
|
|
||||||
long timestamp = NTP.getTime() - 24 * 60 * 60 * 1000L;
|
|
||||||
minimumFinalHeight = repository.getBlockRepository().getHeightFromTimestamp(timestamp);
|
|
||||||
|
|
||||||
if (minimumFinalHeight != 0) {
|
|
||||||
isFinished = Boolean.TRUE;
|
|
||||||
dataByteOffset = null;
|
|
||||||
expectedValue = null;
|
|
||||||
++minimumFinalHeight; // because height is just *before* timestamp
|
|
||||||
|
|
||||||
List<ATStateData> historicAtStates = repository.getATRepository().getMatchingFinalATStates(BTCACCT.CODE_BYTES_HASH,
|
|
||||||
isFinished, dataByteOffset, expectedValue, minimumFinalHeight,
|
|
||||||
null, null, null);
|
|
||||||
|
|
||||||
if (historicAtStates == null) {
|
|
||||||
session.close(4002, "repository issue fetching historic trades");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
for (ATStateData historicAtState : historicAtStates) {
|
|
||||||
CrossChainOfferSummary historicOfferSummary = produceSummary(repository, historicAtState, null);
|
|
||||||
|
|
||||||
switch (historicOfferSummary.getMode()) {
|
|
||||||
case REDEEMED:
|
|
||||||
case REFUNDED:
|
|
||||||
case CANCELLED:
|
|
||||||
break;
|
|
||||||
|
|
||||||
default:
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Add summary to initial burst
|
|
||||||
crossChainOfferSummaries.add(historicOfferSummary);
|
|
||||||
|
|
||||||
// Save initial AT mode
|
|
||||||
previousAtModes.put(historicAtState.getATAddress(), historicOfferSummary.getMode());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
populateHistoricSummaries(repository);
|
||||||
} catch (DataException e) {
|
} catch (DataException e) {
|
||||||
session.close(4003, "generic repository issue");
|
// How to fail properly?
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!sendOfferSummaries(session, crossChainOfferSummaries)) {
|
EventBus.INSTANCE.addListener(this::listen);
|
||||||
session.close(4004, "websocket issue");
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void listen(Event event) {
|
||||||
|
if (!(event instanceof Controller.NewBlockEvent))
|
||||||
return;
|
return;
|
||||||
}
|
|
||||||
|
|
||||||
BlockNotifier.Listener listener = blockInfo -> onNotify(session, blockInfo, previousAtModes);
|
BlockData blockData = ((Controller.NewBlockEvent) event).getBlockData();
|
||||||
BlockNotifier.getInstance().register(session, listener);
|
|
||||||
}
|
|
||||||
|
|
||||||
@OnWebSocketClose
|
// Process any new info
|
||||||
public void onWebSocketClose(Session session, int statusCode, String reason) {
|
List<CrossChainOfferSummary> crossChainOfferSummaries;
|
||||||
BlockNotifier.getInstance().deregister(session);
|
|
||||||
}
|
|
||||||
|
|
||||||
@OnWebSocketMessage
|
|
||||||
public void onWebSocketMessage(Session session, String message) {
|
|
||||||
/* ignored */
|
|
||||||
}
|
|
||||||
|
|
||||||
private void onNotify(Session session, BlockInfo blockInfo, final Map<String, BTCACCT.Mode> previousAtModes) {
|
|
||||||
List<CrossChainOfferSummary> crossChainOfferSummaries = null;
|
|
||||||
|
|
||||||
try (final Repository repository = RepositoryManager.getRepository()) {
|
try (final Repository repository = RepositoryManager.getRepository()) {
|
||||||
// Find any new trade ATs since this block
|
// Find any new trade ATs since this block
|
||||||
final Boolean isFinished = null;
|
final Boolean isFinished = null;
|
||||||
final Integer dataByteOffset = null;
|
final Integer dataByteOffset = null;
|
||||||
final Long expectedValue = null;
|
final Long expectedValue = null;
|
||||||
final Integer minimumFinalHeight = blockInfo.getHeight();
|
final Integer minimumFinalHeight = blockData.getHeight();
|
||||||
|
|
||||||
List<ATStateData> atStates = repository.getATRepository().getMatchingFinalATStates(BTCACCT.CODE_BYTES_HASH,
|
List<ATStateData> atStates = repository.getATRepository().getMatchingFinalATStates(BTCACCT.CODE_BYTES_HASH,
|
||||||
isFinished, dataByteOffset, expectedValue, minimumFinalHeight,
|
isFinished, dataByteOffset, expectedValue, minimumFinalHeight,
|
||||||
@ -149,12 +89,13 @@ public class TradeOffersWebSocket extends ApiWebSocket {
|
|||||||
if (atStates == null)
|
if (atStates == null)
|
||||||
return;
|
return;
|
||||||
|
|
||||||
crossChainOfferSummaries = produceSummaries(repository, atStates, blockInfo.getTimestamp());
|
crossChainOfferSummaries = produceSummaries(repository, atStates, blockData.getTimestamp());
|
||||||
} catch (DataException e) {
|
} catch (DataException e) {
|
||||||
// No output this time
|
// No output this time
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
synchronized (previousAtModes) { //NOSONAR squid:S2445 suppressed because previousAtModes is final and curried in lambda
|
synchronized (previousAtModes) {
|
||||||
// Remove any entries unchanged from last time
|
// Remove any entries unchanged from last time
|
||||||
crossChainOfferSummaries.removeIf(offerSummary -> previousAtModes.get(offerSummary.getQortalAtAddress()) == offerSummary.getMode());
|
crossChainOfferSummaries.removeIf(offerSummary -> previousAtModes.get(offerSummary.getQortalAtAddress()) == offerSummary.getMode());
|
||||||
|
|
||||||
@ -162,13 +103,63 @@ public class TradeOffersWebSocket extends ApiWebSocket {
|
|||||||
if (crossChainOfferSummaries.isEmpty())
|
if (crossChainOfferSummaries.isEmpty())
|
||||||
return;
|
return;
|
||||||
|
|
||||||
final boolean wasSent = sendOfferSummaries(session, crossChainOfferSummaries);
|
// Update
|
||||||
|
|
||||||
if (!wasSent)
|
|
||||||
return;
|
|
||||||
|
|
||||||
previousAtModes.putAll(crossChainOfferSummaries.stream().collect(Collectors.toMap(CrossChainOfferSummary::getQortalAtAddress, CrossChainOfferSummary::getMode)));
|
previousAtModes.putAll(crossChainOfferSummaries.stream().collect(Collectors.toMap(CrossChainOfferSummary::getQortalAtAddress, CrossChainOfferSummary::getMode)));
|
||||||
|
|
||||||
|
synchronized (currentSummaries) {
|
||||||
|
// Add any OFFERING to 'current'
|
||||||
|
currentSummaries.addAll(crossChainOfferSummaries.stream().filter(isCurrent).collect(Collectors.toList()));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
final long tooOldTimestamp = NTP.getTime() - 24 * 60 * 60 * 1000L;
|
||||||
|
synchronized (historicSummaries) {
|
||||||
|
// Add any REDEEMED/REFUNDED/CANCELLED
|
||||||
|
historicSummaries.addAll(crossChainOfferSummaries.stream().filter(isHistoric).collect(Collectors.toList()));
|
||||||
|
|
||||||
|
// But also remove any that are over 24 hours old
|
||||||
|
historicSummaries.removeIf(offerSummary -> offerSummary.getTimestamp() < tooOldTimestamp);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Notify sessions
|
||||||
|
for (Session session : getSessions())
|
||||||
|
sendOfferSummaries(session, crossChainOfferSummaries);
|
||||||
|
}
|
||||||
|
|
||||||
|
@OnWebSocketConnect
|
||||||
|
@Override
|
||||||
|
public void onWebSocketConnect(Session session) {
|
||||||
|
Map<String, List<String>> queryParams = session.getUpgradeRequest().getParameterMap();
|
||||||
|
final boolean includeHistoric = queryParams.get("includeHistoric") != null;
|
||||||
|
|
||||||
|
List<CrossChainOfferSummary> crossChainOfferSummaries;
|
||||||
|
|
||||||
|
synchronized (currentSummaries) {
|
||||||
|
crossChainOfferSummaries = new ArrayList<>(currentSummaries);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (includeHistoric)
|
||||||
|
synchronized (historicSummaries) {
|
||||||
|
crossChainOfferSummaries.addAll(historicSummaries);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!sendOfferSummaries(session, crossChainOfferSummaries)) {
|
||||||
|
session.close(4002, "websocket issue");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
super.onWebSocketConnect(session);
|
||||||
|
}
|
||||||
|
|
||||||
|
@OnWebSocketClose
|
||||||
|
@Override
|
||||||
|
public void onWebSocketClose(Session session, int statusCode, String reason) {
|
||||||
|
super.onWebSocketClose(session, statusCode, reason);
|
||||||
|
}
|
||||||
|
|
||||||
|
@OnWebSocketMessage
|
||||||
|
public void onWebSocketMessage(Session session, String message) {
|
||||||
|
/* ignored */
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean sendOfferSummaries(Session session, List<CrossChainOfferSummary> crossChainOfferSummaries) {
|
private boolean sendOfferSummaries(Session session, List<CrossChainOfferSummary> crossChainOfferSummaries) {
|
||||||
@ -186,6 +177,68 @@ public class TradeOffersWebSocket extends ApiWebSocket {
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static void populateCurrentSummaries(Repository repository) throws DataException {
|
||||||
|
// We want ALL OFFERING trades
|
||||||
|
Boolean isFinished = Boolean.FALSE;
|
||||||
|
Integer dataByteOffset = BTCACCT.MODE_BYTE_OFFSET;
|
||||||
|
Long expectedValue = (long) BTCACCT.Mode.OFFERING.value;
|
||||||
|
Integer minimumFinalHeight = null;
|
||||||
|
|
||||||
|
List<ATStateData> initialAtStates = repository.getATRepository().getMatchingFinalATStates(BTCACCT.CODE_BYTES_HASH,
|
||||||
|
isFinished, dataByteOffset, expectedValue, minimumFinalHeight,
|
||||||
|
null, null, null);
|
||||||
|
|
||||||
|
if (initialAtStates == null)
|
||||||
|
throw new DataException("Couldn't fetch current trades from repository");
|
||||||
|
|
||||||
|
// Save initial AT modes
|
||||||
|
previousAtModes.putAll(initialAtStates.stream().collect(Collectors.toMap(ATStateData::getATAddress, atState -> BTCACCT.Mode.OFFERING)));
|
||||||
|
|
||||||
|
// Convert to offer summaries
|
||||||
|
currentSummaries.addAll(produceSummaries(repository, initialAtStates, null));
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void populateHistoricSummaries(Repository repository) throws DataException {
|
||||||
|
// We want REDEEMED/REFUNDED/CANCELLED trades over the last 24 hours
|
||||||
|
long timestamp = System.currentTimeMillis() - 24 * 60 * 60 * 1000L;
|
||||||
|
int minimumFinalHeight = repository.getBlockRepository().getHeightFromTimestamp(timestamp);
|
||||||
|
|
||||||
|
if (minimumFinalHeight == 0)
|
||||||
|
throw new DataException("Couldn't fetch block timestamp from repository");
|
||||||
|
|
||||||
|
Boolean isFinished = Boolean.TRUE;
|
||||||
|
Integer dataByteOffset = null;
|
||||||
|
Long expectedValue = null;
|
||||||
|
++minimumFinalHeight; // because height is just *before* timestamp
|
||||||
|
|
||||||
|
List<ATStateData> historicAtStates = repository.getATRepository().getMatchingFinalATStates(BTCACCT.CODE_BYTES_HASH,
|
||||||
|
isFinished, dataByteOffset, expectedValue, minimumFinalHeight,
|
||||||
|
null, null, null);
|
||||||
|
|
||||||
|
if (historicAtStates == null)
|
||||||
|
throw new DataException("Couldn't fetch historic trades from repository");
|
||||||
|
|
||||||
|
for (ATStateData historicAtState : historicAtStates) {
|
||||||
|
CrossChainOfferSummary historicOfferSummary = produceSummary(repository, historicAtState, null);
|
||||||
|
|
||||||
|
switch (historicOfferSummary.getMode()) {
|
||||||
|
case REDEEMED:
|
||||||
|
case REFUNDED:
|
||||||
|
case CANCELLED:
|
||||||
|
break;
|
||||||
|
|
||||||
|
default:
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add summary to initial burst
|
||||||
|
historicSummaries.add(historicOfferSummary);
|
||||||
|
|
||||||
|
// Save initial AT mode
|
||||||
|
previousAtModes.put(historicAtState.getATAddress(), historicOfferSummary.getMode());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private static CrossChainOfferSummary produceSummary(Repository repository, ATStateData atState, Long timestamp) throws DataException {
|
private static CrossChainOfferSummary produceSummary(Repository repository, ATStateData atState, Long timestamp) throws DataException {
|
||||||
CrossChainTradeData crossChainTradeData = BTCACCT.populateTradeData(repository, atState);
|
CrossChainTradeData crossChainTradeData = BTCACCT.populateTradeData(repository, atState);
|
||||||
|
|
||||||
|
@ -1,61 +0,0 @@
|
|||||||
package org.qortal.controller;
|
|
||||||
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.Collection;
|
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.Map;
|
|
||||||
|
|
||||||
import org.eclipse.jetty.websocket.api.Session;
|
|
||||||
import org.qortal.api.model.BlockInfo;
|
|
||||||
import org.qortal.data.block.BlockData;
|
|
||||||
|
|
||||||
public class BlockNotifier {
|
|
||||||
|
|
||||||
private static BlockNotifier instance;
|
|
||||||
|
|
||||||
@FunctionalInterface
|
|
||||||
public interface Listener {
|
|
||||||
void notify(BlockInfo blockInfo);
|
|
||||||
}
|
|
||||||
|
|
||||||
private Map<Session, Listener> listenersBySession = new HashMap<>();
|
|
||||||
|
|
||||||
private BlockNotifier() {
|
|
||||||
}
|
|
||||||
|
|
||||||
public static synchronized BlockNotifier getInstance() {
|
|
||||||
if (instance == null)
|
|
||||||
instance = new BlockNotifier();
|
|
||||||
|
|
||||||
return instance;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void register(Session session, Listener listener) {
|
|
||||||
synchronized (this.listenersBySession) {
|
|
||||||
this.listenersBySession.put(session, listener);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public void deregister(Session session) {
|
|
||||||
synchronized (this.listenersBySession) {
|
|
||||||
this.listenersBySession.remove(session);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public void onNewBlock(BlockData blockData) {
|
|
||||||
// Convert BlockData to BlockInfo
|
|
||||||
BlockInfo blockInfo = new BlockInfo(blockData);
|
|
||||||
|
|
||||||
for (Listener listener : getAllListeners())
|
|
||||||
listener.notify(blockInfo);
|
|
||||||
}
|
|
||||||
|
|
||||||
private Collection<Listener> getAllListeners() {
|
|
||||||
// Make a copy of listeners to both avoid concurrent modification
|
|
||||||
// and reduce synchronization time
|
|
||||||
synchronized (this.listenersBySession) {
|
|
||||||
return new ArrayList<>(this.listenersBySession.values());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
@ -50,6 +50,8 @@ import org.qortal.data.network.PeerData;
|
|||||||
import org.qortal.data.transaction.ArbitraryTransactionData;
|
import org.qortal.data.transaction.ArbitraryTransactionData;
|
||||||
import org.qortal.data.transaction.TransactionData;
|
import org.qortal.data.transaction.TransactionData;
|
||||||
import org.qortal.data.transaction.ArbitraryTransactionData.DataType;
|
import org.qortal.data.transaction.ArbitraryTransactionData.DataType;
|
||||||
|
import org.qortal.event.Event;
|
||||||
|
import org.qortal.event.EventBus;
|
||||||
import org.qortal.data.transaction.ChatTransactionData;
|
import org.qortal.data.transaction.ChatTransactionData;
|
||||||
import org.qortal.globalization.Translator;
|
import org.qortal.globalization.Translator;
|
||||||
import org.qortal.gui.Gui;
|
import org.qortal.gui.Gui;
|
||||||
@ -629,6 +631,11 @@ public class Controller extends Thread {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static class StatusChangeEvent implements Event {
|
||||||
|
public StatusChangeEvent() {
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private void updateSysTray() {
|
private void updateSysTray() {
|
||||||
if (NTP.getTime() == null) {
|
if (NTP.getTime() == null) {
|
||||||
SysTray.getInstance().setToolTipText(Translator.INSTANCE.translate("SysTray", "SYNCHRONIZING_CLOCK"));
|
SysTray.getInstance().setToolTipText(Translator.INSTANCE.translate("SysTray", "SYNCHRONIZING_CLOCK"));
|
||||||
@ -656,7 +663,7 @@ public class Controller extends Thread {
|
|||||||
SysTray.getInstance().setToolTipText(tooltip);
|
SysTray.getInstance().setToolTipText(tooltip);
|
||||||
|
|
||||||
this.callbackExecutor.execute(() -> {
|
this.callbackExecutor.execute(() -> {
|
||||||
StatusNotifier.getInstance().onStatusChange(NTP.getTime());
|
EventBus.INSTANCE.notify(new StatusChangeEvent());
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -783,6 +790,18 @@ public class Controller extends Thread {
|
|||||||
requestSysTrayUpdate = true;
|
requestSysTrayUpdate = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static class NewBlockEvent implements Event {
|
||||||
|
private final BlockData blockData;
|
||||||
|
|
||||||
|
public NewBlockEvent(BlockData blockData) {
|
||||||
|
this.blockData = blockData;
|
||||||
|
}
|
||||||
|
|
||||||
|
public BlockData getBlockData() {
|
||||||
|
return this.blockData;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public void onNewBlock(BlockData latestBlockData) {
|
public void onNewBlock(BlockData latestBlockData) {
|
||||||
this.setChainTip(latestBlockData);
|
this.setChainTip(latestBlockData);
|
||||||
requestSysTrayUpdate = true;
|
requestSysTrayUpdate = true;
|
||||||
@ -792,7 +811,8 @@ public class Controller extends Thread {
|
|||||||
Network network = Network.getInstance();
|
Network network = Network.getInstance();
|
||||||
network.broadcast(peer -> network.buildHeightMessage(peer, latestBlockData));
|
network.broadcast(peer -> network.buildHeightMessage(peer, latestBlockData));
|
||||||
|
|
||||||
BlockNotifier.getInstance().onNewBlock(latestBlockData);
|
// Notify listeners of new block
|
||||||
|
EventBus.INSTANCE.notify(new NewBlockEvent(latestBlockData));
|
||||||
|
|
||||||
if (this.notifyGroupMembershipChange) {
|
if (this.notifyGroupMembershipChange) {
|
||||||
this.notifyGroupMembershipChange = false;
|
this.notifyGroupMembershipChange = false;
|
||||||
|
@ -1,56 +0,0 @@
|
|||||||
package org.qortal.controller;
|
|
||||||
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.Collection;
|
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.Map;
|
|
||||||
|
|
||||||
import org.eclipse.jetty.websocket.api.Session;
|
|
||||||
|
|
||||||
public class StatusNotifier {
|
|
||||||
|
|
||||||
private static StatusNotifier instance;
|
|
||||||
|
|
||||||
@FunctionalInterface
|
|
||||||
public interface Listener {
|
|
||||||
void notify(long timestamp);
|
|
||||||
}
|
|
||||||
|
|
||||||
private Map<Session, Listener> listenersBySession = new HashMap<>();
|
|
||||||
|
|
||||||
private StatusNotifier() {
|
|
||||||
}
|
|
||||||
|
|
||||||
public static synchronized StatusNotifier getInstance() {
|
|
||||||
if (instance == null)
|
|
||||||
instance = new StatusNotifier();
|
|
||||||
|
|
||||||
return instance;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void register(Session session, Listener listener) {
|
|
||||||
synchronized (this.listenersBySession) {
|
|
||||||
this.listenersBySession.put(session, listener);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public void deregister(Session session) {
|
|
||||||
synchronized (this.listenersBySession) {
|
|
||||||
this.listenersBySession.remove(session);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public void onStatusChange(long now) {
|
|
||||||
for (Listener listener : getAllListeners())
|
|
||||||
listener.notify(now);
|
|
||||||
}
|
|
||||||
|
|
||||||
private Collection<Listener> getAllListeners() {
|
|
||||||
// Make a copy of listeners to both avoid concurrent modification
|
|
||||||
// and reduce synchronization time
|
|
||||||
synchronized (this.listenersBySession) {
|
|
||||||
return new ArrayList<>(this.listenersBySession.values());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
Loading…
x
Reference in New Issue
Block a user