Merge pull request #72 from catbref/presence-v2

Presence v2
This commit is contained in:
CalDescent 2022-02-27 22:01:59 +00:00 committed by GitHub
commit 5903607363
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 978 additions and 51 deletions

View File

@ -40,13 +40,7 @@ import org.glassfish.jersey.server.ResourceConfig;
import org.glassfish.jersey.servlet.ServletContainer;
import org.qortal.api.resource.AnnotationPostProcessor;
import org.qortal.api.resource.ApiDefinition;
import org.qortal.api.websocket.ActiveChatsWebSocket;
import org.qortal.api.websocket.AdminStatusWebSocket;
import org.qortal.api.websocket.BlocksWebSocket;
import org.qortal.api.websocket.ChatMessagesWebSocket;
import org.qortal.api.websocket.PresenceWebSocket;
import org.qortal.api.websocket.TradeBotWebSocket;
import org.qortal.api.websocket.TradeOffersWebSocket;
import org.qortal.api.websocket.*;
import org.qortal.settings.Settings;
public class ApiService {
@ -212,6 +206,9 @@ public class ApiService {
context.addServlet(ChatMessagesWebSocket.class, "/websockets/chat/messages");
context.addServlet(TradeOffersWebSocket.class, "/websockets/crosschain/tradeoffers");
context.addServlet(TradeBotWebSocket.class, "/websockets/crosschain/tradebot");
context.addServlet(TradePresenceWebSocket.class, "/websockets/crosschain/tradepresence");
// Deprecated
context.addServlet(PresenceWebSocket.class, "/websockets/presence");
// Start server

View File

@ -25,6 +25,7 @@ import org.qortal.api.ApiExceptionFactory;
import org.qortal.api.Security;
import org.qortal.api.model.CrossChainCancelRequest;
import org.qortal.api.model.CrossChainTradeSummary;
import org.qortal.controller.tradebot.TradeBot;
import org.qortal.crosschain.SupportedBlockchain;
import org.qortal.crosschain.ACCT;
import org.qortal.crosschain.AcctMode;
@ -120,6 +121,8 @@ public class CrossChainResource {
crossChainTrades = crossChainTrades.subList(0, upperLimit);
}
crossChainTrades.stream().forEach(CrossChainResource::decorateTradeDataWithPresence);
return crossChainTrades;
} catch (DataException e) {
throw ApiExceptionFactory.INSTANCE.createException(request, ApiError.REPOSITORY_ISSUE, e);
@ -151,7 +154,11 @@ public class CrossChainResource {
if (acct == null)
throw ApiExceptionFactory.INSTANCE.createException(request, ApiError.INVALID_CRITERIA);
return acct.populateTradeData(repository, atData);
CrossChainTradeData crossChainTradeData = acct.populateTradeData(repository, atData);
decorateTradeDataWithPresence(crossChainTradeData);
return crossChainTradeData;
} catch (DataException e) {
throw ApiExceptionFactory.INSTANCE.createException(request, ApiError.REPOSITORY_ISSUE, e);
}
@ -486,4 +493,7 @@ public class CrossChainResource {
}
}
private static void decorateTradeDataWithPresence(CrossChainTradeData crossChainTradeData) {
TradeBot.getInstance().decorateTradeDataWithPresence(crossChainTradeData);
}
}

View File

@ -0,0 +1,137 @@
package org.qortal.api.websocket;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.annotations.*;
import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
import org.qortal.controller.Controller;
import org.qortal.controller.tradebot.TradeBot;
import org.qortal.data.network.TradePresenceData;
import org.qortal.event.Event;
import org.qortal.event.EventBus;
import org.qortal.event.Listener;
import org.qortal.utils.Base58;
import org.qortal.utils.NTP;
import java.io.IOException;
import java.io.StringWriter;
import java.util.*;
@WebSocket
@SuppressWarnings("serial")
public class TradePresenceWebSocket extends ApiWebSocket implements Listener {
/** Map key is public key in base58, map value is trade presence */
private static final Map<String, TradePresenceData> currentEntries = Collections.synchronizedMap(new HashMap<>());
@Override
public void configure(WebSocketServletFactory factory) {
factory.register(TradePresenceWebSocket.class);
populateCurrentInfo();
EventBus.INSTANCE.addListener(this::listen);
}
@Override
public void listen(Event event) {
// XXX - Suggest we change this to something like Synchronizer.NewChainTipEvent?
// We use NewBlockEvent as a proxy for 1-minute timer
if (!(event instanceof TradeBot.TradePresenceEvent) && !(event instanceof Controller.NewBlockEvent))
return;
removeOldEntries();
if (event instanceof Controller.NewBlockEvent)
// We only wanted a chance to cull old entries
return;
TradePresenceData tradePresence = ((TradeBot.TradePresenceEvent) event).getTradePresenceData();
boolean somethingChanged = mergePresence(tradePresence);
if (!somethingChanged)
// nothing changed
return;
List<TradePresenceData> tradePresences = Collections.singletonList(tradePresence);
// Notify sessions
for (Session session : getSessions()) {
sendTradePresences(session, tradePresences);
}
}
@OnWebSocketConnect
@Override
public void onWebSocketConnect(Session session) {
Map<String, List<String>> queryParams = session.getUpgradeRequest().getParameterMap();
List<TradePresenceData> tradePresences;
synchronized (currentEntries) {
tradePresences = List.copyOf(currentEntries.values());
}
if (!sendTradePresences(session, tradePresences)) {
session.close(4002, "websocket issue");
return;
}
super.onWebSocketConnect(session);
}
@OnWebSocketClose
@Override
public void onWebSocketClose(Session session, int statusCode, String reason) {
// clean up
super.onWebSocketClose(session, statusCode, reason);
}
@OnWebSocketError
public void onWebSocketError(Session session, Throwable throwable) {
/* ignored */
}
@OnWebSocketMessage
public void onWebSocketMessage(Session session, String message) {
/* ignored */
}
private boolean sendTradePresences(Session session, List<TradePresenceData> tradePresences) {
try {
StringWriter stringWriter = new StringWriter();
marshall(stringWriter, tradePresences);
String output = stringWriter.toString();
session.getRemote().sendStringByFuture(output);
} catch (IOException e) {
// No output this time?
return false;
}
return true;
}
private static void populateCurrentInfo() {
// We want ALL trade presences
TradeBot.getInstance().getAllTradePresences().stream()
.forEach(TradePresenceWebSocket::mergePresence);
}
/** Merge trade presence into cache of current entries, returns true if cache was updated. */
private static boolean mergePresence(TradePresenceData tradePresence) {
// Put/replace for this publickey making sure we keep newest timestamp
String pubKey58 = Base58.encode(tradePresence.getPublicKey());
TradePresenceData newEntry = currentEntries.compute(pubKey58, (k, v) -> v == null || v.getTimestamp() < tradePresence.getTimestamp() ? tradePresence : v);
return newEntry == tradePresence;
}
private static void removeOldEntries() {
long now = NTP.getTime();
currentEntries.values().removeIf(v -> v.getTimestamp() < now);
}
}

View File

@ -1312,6 +1312,14 @@ public class Controller extends Thread {
ArbitraryDataManager.getInstance().onNetworkArbitrarySignaturesMessage(peer, message);
break;
case GET_TRADE_PRESENCES:
TradeBot.getInstance().onGetTradePresencesMessage(peer, message);
break;
case TRADE_PRESENCES:
TradeBot.getInstance().onTradePresencesMessage(peer, message);
break;
default:
LOGGER.debug(() -> String.format("Unhandled %s message [ID %d] from peer %s", message.getType().name(), message.getId(), peer));
break;

View File

@ -2,16 +2,11 @@ package org.qortal.controller.tradebot;
import java.awt.TrayIcon.MessageType;
import java.security.SecureRandom;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.locks.ReentrantLock;
import java.util.*;
import java.util.function.Supplier;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.util.Supplier;
import org.bitcoinj.core.ECKey;
import org.qortal.account.PrivateKeyAccount;
import org.qortal.api.model.crosschain.TradeBotCreateRequest;
@ -19,25 +14,26 @@ import org.qortal.controller.Controller;
import org.qortal.controller.Synchronizer;
import org.qortal.controller.tradebot.AcctTradeBot.ResponseResult;
import org.qortal.crosschain.*;
import org.qortal.crypto.Crypto;
import org.qortal.data.at.ATData;
import org.qortal.data.crosschain.CrossChainTradeData;
import org.qortal.data.crosschain.TradeBotData;
import org.qortal.data.transaction.BaseTransactionData;
import org.qortal.data.transaction.PresenceTransactionData;
import org.qortal.data.network.TradePresenceData;
import org.qortal.event.Event;
import org.qortal.event.EventBus;
import org.qortal.event.Listener;
import org.qortal.group.Group;
import org.qortal.gui.SysTray;
import org.qortal.network.Network;
import org.qortal.network.Peer;
import org.qortal.network.message.GetTradePresencesMessage;
import org.qortal.network.message.Message;
import org.qortal.network.message.TradePresencesMessage;
import org.qortal.repository.DataException;
import org.qortal.repository.Repository;
import org.qortal.repository.RepositoryManager;
import org.qortal.repository.hsqldb.HSQLDBImportExport;
import org.qortal.settings.Settings;
import org.qortal.transaction.PresenceTransaction;
import org.qortal.transaction.PresenceTransaction.PresenceType;
import org.qortal.transaction.Transaction.ValidationResult;
import org.qortal.transform.transaction.TransactionTransformer;
import org.qortal.utils.ByteArray;
import org.qortal.utils.NTP;
import com.google.common.primitives.Longs;
@ -57,6 +53,15 @@ public class TradeBot implements Listener {
private static final Logger LOGGER = LogManager.getLogger(TradeBot.class);
private static final Random RANDOM = new SecureRandom();
/** Maximum lifetime of trade presence timestamp. 30 mins in ms. */
private static final long PRESENCE_LIFETIME = 30 * 60 * 1000L;
/** How soon before expiry of our own trade presence timestamp that we want to trigger renewal. 5 mins in ms. */
private static final long EARLY_RENEWAL_PERIOD = 5 * 60 * 1000L;
/** Trade presence timestamps are rounded up to this nearest interval. Bigger values improve grouping of entries in [GET_]TRADE_PRESENCES network messages. 15 mins in ms. */
private static final long EXPIRY_ROUNDING = 15 * 60 * 1000L;
/** How often we want to broadcast our list of all known trade presences to peers. 5 mins in ms. */
private static final long PRESENCE_BROADCAST_INTERVAL = 5 * 60 * 1000L;
public interface StateNameAndValueSupplier {
public String getState();
public int getStateValue();
@ -74,6 +79,18 @@ public class TradeBot implements Listener {
}
}
public static class TradePresenceEvent implements Event {
private final TradePresenceData tradePresenceData;
public TradePresenceEvent(TradePresenceData tradePresenceData) {
this.tradePresenceData = tradePresenceData;
}
public TradePresenceData getTradePresenceData() {
return this.tradePresenceData;
}
}
private static final Map<Class<? extends ACCT>, Supplier<AcctTradeBot>> acctTradeBotSuppliers = new HashMap<>();
static {
acctTradeBotSuppliers.put(BitcoinACCTv1.class, BitcoinACCTv1TradeBot::getInstance);
@ -87,7 +104,12 @@ public class TradeBot implements Listener {
private static TradeBot instance;
private final Map<String, Long> presenceTimestampsByAtAddress = Collections.synchronizedMap(new HashMap<>());
private final Map<ByteArray, Long> ourTradePresenceTimestampsByPubkey = Collections.synchronizedMap(new HashMap<>());
private final List<TradePresenceData> pendingTradePresences = Collections.synchronizedList(new ArrayList<>());
private final Map<ByteArray, TradePresenceData> allTradePresencesByPubkey = Collections.synchronizedMap(new HashMap<>());
private Map<ByteArray, TradePresenceData> safeAllTradePresencesByPubkey = Collections.emptyMap();
private long nextTradePresenceBroadcastTimestamp = 0L;
private TradeBot() {
EventBus.INSTANCE.addListener(event -> TradeBot.getInstance().listen(event));
@ -218,6 +240,8 @@ public class TradeBot implements Listener {
return;
synchronized (this) {
expireOldPresenceTimestamps();
List<TradeBotData> allTradeBotData;
try (final Repository repository = RepositoryManager.getRepository()) {
@ -248,6 +272,8 @@ public class TradeBot implements Listener {
} catch (ForeignBlockchainException e) {
LOGGER.warn(() -> String.format("Foreign blockchain issue processing trade-bot entry for AT %s: %s", tradeBotData.getAtAddress(), e.getMessage()));
}
broadcastPresenceTimestamps();
}
}
@ -325,6 +351,33 @@ public class TradeBot implements Listener {
}
// PRESENCE-related
public Collection<TradePresenceData> getAllTradePresences() {
return this.safeAllTradePresencesByPubkey.values();
}
/** Trade presence timestamps expire in the 'future' so any that reach 'now' have expired and are removed. */
private void expireOldPresenceTimestamps() {
long now = NTP.getTime();
int allRemovedCount = 0;
synchronized (this.allTradePresencesByPubkey) {
int preRemoveCount = this.allTradePresencesByPubkey.size();
this.allTradePresencesByPubkey.values().removeIf(tradePresenceData -> tradePresenceData.getTimestamp() <= now);
allRemovedCount = this.allTradePresencesByPubkey.size() - preRemoveCount;
}
int ourRemovedCount = 0;
synchronized (this.ourTradePresenceTimestampsByPubkey) {
int preRemoveCount = this.ourTradePresenceTimestampsByPubkey.size();
this.ourTradePresenceTimestampsByPubkey.values().removeIf(timestamp -> timestamp < now);
ourRemovedCount = this.ourTradePresenceTimestampsByPubkey.size() - preRemoveCount;
}
if (allRemovedCount > 0)
LOGGER.debug("Removed {} expired trade presences, of which {} ours", allRemovedCount, ourRemovedCount);
}
/*package*/ void updatePresence(Repository repository, TradeBotData tradeBotData, CrossChainTradeData tradeData)
throws DataException {
String atAddress = tradeBotData.getAtAddress();
@ -333,44 +386,292 @@ public class TradeBot implements Listener {
String signerAddress = tradeNativeAccount.getAddress();
/*
* There's no point in Alice trying to build a PRESENCE transaction
* for an AT that isn't locked to her, as other peers won't be able
* to validate the PRESENCE transaction as signing public key won't
* be visible.
* There's no point in Alice trying to broadcast presence for an AT that isn't locked to her,
* as other peers won't be able to verify as signing public key isn't yet in the AT's data segment.
*/
if (!signerAddress.equals(tradeData.qortalCreatorTradeAddress) && !signerAddress.equals(tradeData.qortalPartnerAddress))
// Signer is neither Bob, nor Alice, or trade not yet locked to Alice
if (!signerAddress.equals(tradeData.qortalCreatorTradeAddress) && !signerAddress.equals(tradeData.qortalPartnerAddress)) {
// Signer is neither Bob, nor trade locked to Alice
LOGGER.trace("Can't provide trade presence for our AT {} as it's not yet locked to Alice", atAddress);
return;
}
long now = NTP.getTime();
long threshold = now - PresenceType.TRADE_BOT.getLifetime();
long newExpiry = generateExpiry(now);
ByteArray pubkeyByteArray = ByteArray.of(tradeNativeAccount.getPublicKey());
long timestamp = presenceTimestampsByAtAddress.compute(atAddress, (k, v) -> (v == null || v < threshold) ? now : v);
// If map entry's timestamp is missing, or within early renewal period, use the new expiry - otherwise use existing timestamp.
synchronized (this.ourTradePresenceTimestampsByPubkey) {
Long currentTimestamp = this.ourTradePresenceTimestampsByPubkey.get(pubkeyByteArray);
// If timestamp hasn't been updated then nothing to do
if (timestamp != now)
if (currentTimestamp != null && currentTimestamp - now > EARLY_RENEWAL_PERIOD) {
// timestamp still good
LOGGER.trace("Current trade presence timestamp {} still good for our trade {}", currentTimestamp, atAddress);
return;
}
this.ourTradePresenceTimestampsByPubkey.put(pubkeyByteArray, newExpiry);
}
// Create signature
byte[] signature = tradeNativeAccount.sign(Longs.toByteArray(newExpiry));
// Add new trade presence to queue to be broadcast around network
TradePresenceData tradePresenceData = new TradePresenceData(newExpiry, tradeNativeAccount.getPublicKey(), signature, atAddress);
this.pendingTradePresences.add(tradePresenceData);
this.allTradePresencesByPubkey.put(pubkeyByteArray, tradePresenceData);
rebuildSafeAllTradePresences();
LOGGER.trace("New trade presence timestamp {} for our trade {}", newExpiry, atAddress);
EventBus.INSTANCE.notify(new TradePresenceEvent(tradePresenceData));
}
private void rebuildSafeAllTradePresences() {
synchronized (this.allTradePresencesByPubkey) {
// Collect into a *new* unmodifiable map.
this.safeAllTradePresencesByPubkey = Map.copyOf(this.allTradePresencesByPubkey);
}
}
private void broadcastPresenceTimestamps() {
// If we have new trade presences that are pending broadcast, send those as a priority
if (!this.pendingTradePresences.isEmpty()) {
// Create a copy for Network to safely use in another thread
List<TradePresenceData> safeTradePresences;
synchronized (this.pendingTradePresences) {
safeTradePresences = List.copyOf(this.pendingTradePresences);
this.pendingTradePresences.clear();
}
LOGGER.debug("Broadcasting {} new trade presences", safeTradePresences.size());
TradePresencesMessage tradePresencesMessage = new TradePresencesMessage(safeTradePresences);
Network.getInstance().broadcast(peer -> tradePresencesMessage);
return;
}
// As we have no new trade presences, check whether it's time to do a general broadcast
Long now = NTP.getTime();
if (now == null || now < nextTradePresenceBroadcastTimestamp)
return;
int txGroupId = Group.NO_GROUP;
byte[] reference = new byte[TransactionTransformer.SIGNATURE_LENGTH];
byte[] creatorPublicKey = tradeNativeAccount.getPublicKey();
long fee = 0L;
nextTradePresenceBroadcastTimestamp = now + PRESENCE_BROADCAST_INTERVAL;
BaseTransactionData baseTransactionData = new BaseTransactionData(timestamp, txGroupId, reference, creatorPublicKey, fee, null);
List<TradePresenceData> safeTradePresences = List.copyOf(this.safeAllTradePresencesByPubkey.values());
int nonce = 0;
byte[] timestampSignature = tradeNativeAccount.sign(Longs.toByteArray(timestamp));
if (safeTradePresences.isEmpty())
return;
PresenceTransactionData transactionData = new PresenceTransactionData(baseTransactionData, nonce, PresenceType.TRADE_BOT, timestampSignature);
LOGGER.debug("Broadcasting all {} known trade presences. Next broadcast timestamp: {}",
safeTradePresences.size(), nextTradePresenceBroadcastTimestamp
);
PresenceTransaction presenceTransaction = new PresenceTransaction(repository, transactionData);
presenceTransaction.computeNonce();
GetTradePresencesMessage getTradePresencesMessage = new GetTradePresencesMessage(safeTradePresences);
Network.getInstance().broadcast(peer -> getTradePresencesMessage);
}
presenceTransaction.sign(tradeNativeAccount);
// Network message processing
ValidationResult result = presenceTransaction.importAsUnconfirmed();
if (result != ValidationResult.OK)
LOGGER.debug(() -> String.format("Unable to build trade-bot PRESENCE transaction for %s: %s", tradeBotData.getAtAddress(), result.name()));
public void onGetTradePresencesMessage(Peer peer, Message message) {
GetTradePresencesMessage getTradePresencesMessage = (GetTradePresencesMessage) message;
List<TradePresenceData> peersTradePresences = getTradePresencesMessage.getTradePresences();
// Create mutable copy from safe snapshot
Map<ByteArray, TradePresenceData> entriesUnknownToPeer = new HashMap<>(this.safeAllTradePresencesByPubkey);
int knownCount = entriesUnknownToPeer.size();
for (TradePresenceData peersTradePresence : peersTradePresences) {
ByteArray pubkeyByteArray = ByteArray.of(peersTradePresence.getPublicKey());
TradePresenceData ourEntry = entriesUnknownToPeer.get(pubkeyByteArray);
if (ourEntry != null && ourEntry.getTimestamp() == peersTradePresence.getTimestamp())
entriesUnknownToPeer.remove(pubkeyByteArray);
}
if (entriesUnknownToPeer.isEmpty())
return;
LOGGER.debug("Sending {} trade presences to peer {} after excluding their {} from known {}",
entriesUnknownToPeer.size(), peer, peersTradePresences.size(), knownCount
);
// Send complement to peer
List<TradePresenceData> safeTradePresences = List.copyOf(entriesUnknownToPeer.values());
Message responseMessage = new TradePresencesMessage(safeTradePresences);
if (!peer.sendMessage(responseMessage)) {
peer.disconnect("failed to send TRADE_PRESENCES response");
return;
}
}
public void onTradePresencesMessage(Peer peer, Message message) {
TradePresencesMessage tradePresencesMessage = (TradePresencesMessage) message;
List<TradePresenceData> peersTradePresences = tradePresencesMessage.getTradePresences();
long now = NTP.getTime();
// Timestamps before this are too far into the past
long pastThreshold = now;
// Timestamps after this are too far into the future
long futureThreshold = now + PRESENCE_LIFETIME;
Map<ByteArray, Supplier<ACCT>> acctSuppliersByCodeHash = SupportedBlockchain.getAcctMap();
int newCount = 0;
try (final Repository repository = RepositoryManager.getRepository()) {
for (TradePresenceData peersTradePresence : peersTradePresences) {
long timestamp = peersTradePresence.getTimestamp();
// Ignore if timestamp is out of bounds
if (timestamp < pastThreshold || timestamp > futureThreshold) {
if (timestamp < pastThreshold)
LOGGER.trace("Ignoring trade presence {} from peer {} as timestamp {} is too old vs {}",
peersTradePresence.getAtAddress(), peer, timestamp, pastThreshold
);
else
LOGGER.trace("Ignoring trade presence {} from peer {} as timestamp {} is too new vs {}",
peersTradePresence.getAtAddress(), peer, timestamp, pastThreshold
);
continue;
}
ByteArray pubkeyByteArray = ByteArray.of(peersTradePresence.getPublicKey());
// Ignore if we've previously verified this timestamp+publickey combo or sent timestamp is older
TradePresenceData existingTradeData = this.safeAllTradePresencesByPubkey.get(pubkeyByteArray);
if (existingTradeData != null && timestamp <= existingTradeData.getTimestamp()) {
if (timestamp == existingTradeData.getTimestamp())
LOGGER.trace("Ignoring trade presence {} from peer {} as we have verified timestamp {} before",
peersTradePresence.getAtAddress(), peer, timestamp
);
else
LOGGER.trace("Ignoring trade presence {} from peer {} as timestamp {} is older than latest {}",
peersTradePresence.getAtAddress(), peer, timestamp, existingTradeData.getTimestamp()
);
continue;
}
// Check timestamp signature
byte[] timestampSignature = peersTradePresence.getSignature();
byte[] timestampBytes = Longs.toByteArray(timestamp);
byte[] publicKey = peersTradePresence.getPublicKey();
if (!Crypto.verify(publicKey, timestampSignature, timestampBytes)) {
LOGGER.trace("Ignoring trade presence {} from peer {} as signature failed to verify",
peersTradePresence.getAtAddress(), peer
);
continue;
}
ATData atData = repository.getATRepository().fromATAddress(peersTradePresence.getAtAddress());
if (atData == null || atData.getIsFrozen() || atData.getIsFinished()) {
if (atData == null)
LOGGER.trace("Ignoring trade presence {} from peer {} as AT doesn't exist",
peersTradePresence.getAtAddress(), peer
);
else
LOGGER.trace("Ignoring trade presence {} from peer {} as AT is frozen or finished",
peersTradePresence.getAtAddress(), peer
);
continue;
}
ByteArray atCodeHash = new ByteArray(atData.getCodeHash());
Supplier<ACCT> acctSupplier = acctSuppliersByCodeHash.get(atCodeHash);
if (acctSupplier == null) {
LOGGER.trace("Ignoring trade presence {} from peer {} as AT isn't a known ACCT?",
peersTradePresence.getAtAddress(), peer
);
continue;
}
CrossChainTradeData tradeData = acctSupplier.get().populateTradeData(repository, atData);
if (tradeData == null) {
LOGGER.trace("Ignoring trade presence {} from peer {} as trade data not found?",
peersTradePresence.getAtAddress(), peer
);
continue;
}
// Convert signer's public key to address form
String signerAddress = peersTradePresence.getTradeAddress();
// Signer's public key (in address form) must match Bob's / Alice's trade public key (in address form)
if (!signerAddress.equals(tradeData.qortalCreatorTradeAddress) && !signerAddress.equals(tradeData.qortalPartnerAddress)) {
LOGGER.trace("Ignoring trade presence {} from peer {} as signer isn't Alice or Bob?",
peersTradePresence.getAtAddress(), peer
);
continue;
}
// This is new to us
this.allTradePresencesByPubkey.put(pubkeyByteArray, peersTradePresence);
++newCount;
LOGGER.trace("Added trade presence {} from peer {} with timestamp {}",
peersTradePresence.getAtAddress(), peer, timestamp
);
EventBus.INSTANCE.notify(new TradePresenceEvent(peersTradePresence));
}
} catch (DataException e) {
LOGGER.error("Couldn't process TRADE_PRESENCES message due to repository issue", e);
}
if (newCount > 0) {
LOGGER.debug("New trade presences: {}", newCount);
rebuildSafeAllTradePresences();
}
}
public void bridgePresence(long timestamp, byte[] publicKey, byte[] signature, String atAddress) {
long expiry = generateExpiry(timestamp);
ByteArray pubkeyByteArray = ByteArray.of(publicKey);
TradePresenceData fakeTradePresenceData = new TradePresenceData(expiry, publicKey, signature, atAddress);
// Only bridge if trade presence expiry timestamp is newer
TradePresenceData computedTradePresenceData = this.allTradePresencesByPubkey.compute(pubkeyByteArray, (k, v) ->
v == null || v.getTimestamp() < expiry ? fakeTradePresenceData : v
);
if (computedTradePresenceData == fakeTradePresenceData) {
LOGGER.trace("Bridged PRESENCE transaction for trade {} with timestamp {}", atAddress, expiry);
rebuildSafeAllTradePresences();
EventBus.INSTANCE.notify(new TradePresenceEvent(fakeTradePresenceData));
}
}
/** Decorates a CrossChainTradeData object with Alice / Bob trade-bot presence timestamp, if available. */
public void decorateTradeDataWithPresence(CrossChainTradeData crossChainTradeData) {
// Match by AT address, then check for Bob vs Alice
this.safeAllTradePresencesByPubkey.values().stream()
.filter(tradePresenceData -> tradePresenceData.getAtAddress().equals(crossChainTradeData.qortalAtAddress))
.forEach(tradePresenceData -> {
String signerAddress = tradePresenceData.getTradeAddress();
// Signer's public key (in address form) must match Bob's / Alice's trade public key (in address form)
if (signerAddress.equals(crossChainTradeData.qortalCreatorTradeAddress))
crossChainTradeData.creatorPresenceExpiry = tradePresenceData.getTimestamp();
else if (signerAddress.equals(crossChainTradeData.qortalPartnerAddress))
crossChainTradeData.partnerPresenceExpiry = tradePresenceData.getTimestamp();
});
}
private long generateExpiry(long timestamp) {
return ((timestamp - 1) / EXPIRY_ROUNDING) * EXPIRY_ROUNDING + PRESENCE_LIFETIME;
}
}

View File

@ -94,6 +94,12 @@ public class CrossChainTradeData {
public String acctName;
@Schema(description = "Timestamp when AT creator's trade-bot presence expires")
public Long creatorPresenceExpiry;
@Schema(description = "Timestamp when trade partner's trade-bot presence expires")
public Long partnerPresenceExpiry;
// Constructors
// Necessary for JAXB

View File

@ -0,0 +1,114 @@
package org.qortal.data.network;
import org.qortal.crypto.Crypto;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlTransient;
import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
import java.util.Arrays;
// All properties to be converted to JSON via JAXB
@XmlAccessorType(XmlAccessType.FIELD)
public class TradePresenceData {
protected long timestamp;
@XmlJavaTypeAdapter(
type = byte[].class,
value = org.qortal.api.Base58TypeAdapter.class
)
protected byte[] publicKey; // Could be BOB's or ALICE's
// No need to send this via websocket / API
@XmlTransient
protected byte[] signature; // Not always present
protected String atAddress; // Not always present
// Have JAXB use getter instead
@XmlTransient
protected String tradeAddress; // Lazily instantiated
// Constructors
// necessary for JAXB serialization
protected TradePresenceData() {
}
public TradePresenceData(long timestamp, byte[] publicKey, byte[] signature, String atAddress) {
this.timestamp = timestamp;
this.publicKey = publicKey;
this.signature = signature;
this.atAddress = atAddress;
}
public TradePresenceData(long timestamp, byte[] publicKey) {
this(timestamp, publicKey, null, null);
}
public long getTimestamp() {
return this.timestamp;
}
public byte[] getPublicKey() {
return this.publicKey;
}
public byte[] getSignature() {
return this.signature;
}
public String getAtAddress() {
return this.atAddress;
}
// Probably doesn't need synchronization
@XmlElement
public String getTradeAddress() {
if (tradeAddress != null)
return tradeAddress;
tradeAddress = Crypto.toAddress(this.publicKey);
return tradeAddress;
}
// Comparison
@Override
public boolean equals(Object other) {
if (other == this)
return true;
if (!(other instanceof TradePresenceData))
return false;
TradePresenceData otherTradePresenceData = (TradePresenceData) other;
// Very quick comparison
if (otherTradePresenceData.timestamp != this.timestamp)
return false;
if (!Arrays.equals(otherTradePresenceData.publicKey, this.publicKey))
return false;
if (otherTradePresenceData.atAddress != null && !otherTradePresenceData.atAddress.equals(this.atAddress))
return false;
if (this.atAddress != null && !this.atAddress.equals(otherTradePresenceData.atAddress))
return false;
if (!Arrays.equals(otherTradePresenceData.signature, this.signature))
return false;
return true;
}
@Override
public int hashCode() {
// Pretty lazy implementation
return (int) this.timestamp;
}
}

View File

@ -0,0 +1,110 @@
package org.qortal.network.message;
import com.google.common.primitives.Ints;
import com.google.common.primitives.Longs;
import org.qortal.data.network.TradePresenceData;
import org.qortal.transform.Transformer;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* For requesting trade presences from remote peer, given our list of known trade presences.
*
* Groups of: number of entries, timestamp, then AT trade pubkey for each entry.
*/
public class GetTradePresencesMessage extends Message {
private List<TradePresenceData> tradePresences;
private byte[] cachedData;
public GetTradePresencesMessage(List<TradePresenceData> tradePresences) {
this(-1, tradePresences);
}
private GetTradePresencesMessage(int id, List<TradePresenceData> tradePresences) {
super(id, MessageType.GET_TRADE_PRESENCES);
this.tradePresences = tradePresences;
}
public List<TradePresenceData> getTradePresences() {
return this.tradePresences;
}
public static Message fromByteBuffer(int id, ByteBuffer bytes) throws UnsupportedEncodingException {
int groupedEntriesCount = bytes.getInt();
List<TradePresenceData> tradePresences = new ArrayList<>(groupedEntriesCount);
while (groupedEntriesCount > 0) {
long timestamp = bytes.getLong();
for (int i = 0; i < groupedEntriesCount; ++i) {
byte[] publicKey = new byte[Transformer.PUBLIC_KEY_LENGTH];
bytes.get(publicKey);
tradePresences.add(new TradePresenceData(timestamp, publicKey));
}
if (bytes.hasRemaining()) {
groupedEntriesCount = bytes.getInt();
} else {
// we've finished
groupedEntriesCount = 0;
}
}
return new GetTradePresencesMessage(id, tradePresences);
}
@Override
protected synchronized byte[] toData() {
if (this.cachedData != null)
return this.cachedData;
// Shortcut in case we have no trade presences
if (this.tradePresences.isEmpty()) {
this.cachedData = Ints.toByteArray(0);
return this.cachedData;
}
// How many of each timestamp
Map<Long, Integer> countByTimestamp = new HashMap<>();
for (TradePresenceData tradePresenceData : this.tradePresences) {
Long timestamp = tradePresenceData.getTimestamp();
countByTimestamp.compute(timestamp, (k, v) -> v == null ? 1 : ++v);
}
// We should know exactly how many bytes to allocate now
int byteSize = countByTimestamp.size() * (Transformer.INT_LENGTH + Transformer.TIMESTAMP_LENGTH)
+ this.tradePresences.size() * Transformer.PUBLIC_KEY_LENGTH;
try {
ByteArrayOutputStream bytes = new ByteArrayOutputStream(byteSize);
for (long timestamp : countByTimestamp.keySet()) {
bytes.write(Ints.toByteArray(countByTimestamp.get(timestamp)));
bytes.write(Longs.toByteArray(timestamp));
for (TradePresenceData tradePresenceData : this.tradePresences) {
if (tradePresenceData.getTimestamp() == timestamp)
bytes.write(tradePresenceData.getPublicKey());
}
}
this.cachedData = bytes.toByteArray();
return this.cachedData;
} catch (IOException e) {
return null;
}
}
}

View File

@ -93,7 +93,11 @@ public abstract class Message {
ARBITRARY_DATA_FILE_LIST(120),
GET_ARBITRARY_DATA_FILE_LIST(121),
ARBITRARY_SIGNATURES(130);
ARBITRARY_SIGNATURES(130),
TRADE_PRESENCES(140),
GET_TRADE_PRESENCES(141),
;
public final int value;
public final Method fromByteBufferMethod;

View File

@ -0,0 +1,123 @@
package org.qortal.network.message;
import com.google.common.primitives.Ints;
import com.google.common.primitives.Longs;
import org.qortal.data.network.TradePresenceData;
import org.qortal.transform.Transformer;
import org.qortal.utils.Base58;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* For sending list of trade presences to remote peer.
*
* Groups of: number of entries, timestamp, then pubkey + sig + AT address for each entry.
*/
public class TradePresencesMessage extends Message {
private List<TradePresenceData> tradePresences;
private byte[] cachedData;
public TradePresencesMessage(List<TradePresenceData> tradePresences) {
this(-1, tradePresences);
}
private TradePresencesMessage(int id, List<TradePresenceData> tradePresences) {
super(id, MessageType.TRADE_PRESENCES);
this.tradePresences = tradePresences;
}
public List<TradePresenceData> getTradePresences() {
return this.tradePresences;
}
public static Message fromByteBuffer(int id, ByteBuffer bytes) throws UnsupportedEncodingException {
int groupedEntriesCount = bytes.getInt();
List<TradePresenceData> tradePresences = new ArrayList<>(groupedEntriesCount);
while (groupedEntriesCount > 0) {
long timestamp = bytes.getLong();
for (int i = 0; i < groupedEntriesCount; ++i) {
byte[] publicKey = new byte[Transformer.PUBLIC_KEY_LENGTH];
bytes.get(publicKey);
byte[] signature = new byte[Transformer.SIGNATURE_LENGTH];
bytes.get(signature);
byte[] atAddressBytes = new byte[Transformer.ADDRESS_LENGTH];
bytes.get(atAddressBytes);
String atAddress = Base58.encode(atAddressBytes);
tradePresences.add(new TradePresenceData(timestamp, publicKey, signature, atAddress));
}
if (bytes.hasRemaining()) {
groupedEntriesCount = bytes.getInt();
} else {
// we've finished
groupedEntriesCount = 0;
}
}
return new TradePresencesMessage(id, tradePresences);
}
@Override
protected synchronized byte[] toData() {
if (this.cachedData != null)
return this.cachedData;
// Shortcut in case we have no trade presences
if (this.tradePresences.isEmpty()) {
this.cachedData = Ints.toByteArray(0);
return this.cachedData;
}
// How many of each timestamp
Map<Long, Integer> countByTimestamp = new HashMap<>();
for (TradePresenceData tradePresenceData : this.tradePresences) {
Long timestamp = tradePresenceData.getTimestamp();
countByTimestamp.compute(timestamp, (k, v) -> v == null ? 1 : ++v);
}
// We should know exactly how many bytes to allocate now
int byteSize = countByTimestamp.size() * (Transformer.INT_LENGTH + Transformer.TIMESTAMP_LENGTH)
+ this.tradePresences.size() * (Transformer.PUBLIC_KEY_LENGTH + Transformer.SIGNATURE_LENGTH + Transformer.ADDRESS_LENGTH);
try {
ByteArrayOutputStream bytes = new ByteArrayOutputStream(byteSize);
for (long timestamp : countByTimestamp.keySet()) {
bytes.write(Ints.toByteArray(countByTimestamp.get(timestamp)));
bytes.write(Longs.toByteArray(timestamp));
for (TradePresenceData tradePresenceData : this.tradePresences) {
if (tradePresenceData.getTimestamp() == timestamp) {
bytes.write(tradePresenceData.getPublicKey());
bytes.write(tradePresenceData.getSignature());
bytes.write(Base58.decode(tradePresenceData.getAtAddress()));
}
}
}
this.cachedData = bytes.toByteArray();
return this.cachedData;
} catch (IOException e) {
return null;
}
}
}

View File

@ -13,6 +13,7 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.qortal.account.Account;
import org.qortal.controller.Controller;
import org.qortal.controller.tradebot.TradeBot;
import org.qortal.crosschain.ACCT;
import org.qortal.crosschain.SupportedBlockchain;
import org.qortal.crypto.Crypto;
@ -191,13 +192,17 @@ public class PresenceTransaction extends Transaction {
CrossChainTradeData crossChainTradeData = acctSupplier.get().populateTradeData(repository, atData);
// OK if signer's public key (in address form) matches Bob's trade public key (in address form)
if (signerAddress.equals(crossChainTradeData.qortalCreatorTradeAddress))
if (signerAddress.equals(crossChainTradeData.qortalCreatorTradeAddress)) {
TradeBot.getInstance().bridgePresence(this.presenceTransactionData.getTimestamp(), this.transactionData.getCreatorPublicKey(), timestampSignature, atData.getATAddress());
return ValidationResult.OK;
}
// OK if signer's public key (in address form) matches Alice's trade public key (in address form)
if (signerAddress.equals(crossChainTradeData.qortalPartnerAddress))
if (signerAddress.equals(crossChainTradeData.qortalPartnerAddress)) {
TradeBot.getInstance().bridgePresence(this.presenceTransactionData.getTimestamp(), this.transactionData.getCreatorPublicKey(), timestampSignature, atData.getATAddress());
return ValidationResult.OK;
}
}
return ValidationResult.AT_UNKNOWN;
}

View File

@ -0,0 +1,112 @@
package org.qortal.test.crosschain;
import org.junit.Test;
import org.qortal.utils.ByteArray;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
public class TradeBotPresenceTests {
public static final long ROUNDING = 15 * 60 * 1000L; // to nearest X mins
public static final long LIFETIME = 30 * 60 * 1000L; // lifetime: X mins
public static final long EARLY_RENEWAL_LIFETIME = 5 * 60 * 1000L; // X mins before expiry
public static final long CHECK_INTERVAL = 5 * 60 * 1000L; // X mins
public static final long MAX_TIMESTAMP = 100 * 60 * 1000L; // run tests for X mins
// We want to generate timestamps that expire 30 mins into the future, but also round to nearest X min?
// We want to regenerate timestamps early (e.g. 15 mins before expiry) to allow for network propagation
// We want to keep the latest timestamp for any given public key
// We want to reject out-of-bound timestamps from peers (>30 mins into future, not now/past)
// We want to make sure that we don't incorrectly delete an entry at 15-min and 30-min boundaries
@Test
public void testGeneratedExpiryTimestamps() {
for (long timestamp = 0; timestamp <= MAX_TIMESTAMP; timestamp += CHECK_INTERVAL) {
long expiry = generateExpiry(timestamp);
System.out.println(String.format("time: % 3dm, expiry: % 3dm",
timestamp / 60_000L,
expiry / 60_000L
));
}
}
@Test
public void testEarlyRenewal() {
Long currentExpiry = null;
for (long timestamp = 0; timestamp <= MAX_TIMESTAMP; timestamp += CHECK_INTERVAL) {
long newExpiry = generateExpiry(timestamp);
if (currentExpiry == null || currentExpiry - timestamp <= EARLY_RENEWAL_LIFETIME) {
currentExpiry = newExpiry;
}
System.out.println(String.format("time: % 3dm, expiry: % 3dm",
timestamp / 60_000L,
currentExpiry / 60_000L
));
}
}
@Test
public void testEnforceLatestTimestamp() {
ByteArray pubkeyByteArray = ByteArray.of("publickey".getBytes(StandardCharsets.UTF_8));
Map<ByteArray, Long> timestampsByPublicKey = new HashMap<>();
// Working backwards this time
for (long timestamp = MAX_TIMESTAMP; timestamp >= 0; timestamp -= CHECK_INTERVAL){
long newExpiry = generateExpiry(timestamp);
timestampsByPublicKey.compute(pubkeyByteArray, (k, v) ->
v == null || v < newExpiry ? newExpiry : v
);
Long currentExpiry = timestampsByPublicKey.get(pubkeyByteArray);
System.out.println(String.format("time: % 3dm, expiry: % 3dm",
timestamp / 60_000L,
currentExpiry / 60_000L
));
}
}
@Test
public void testEnforcePeerExpiryBounds() {
System.out.println(String.format("%40s", "Our time"));
for (long ourTimestamp = 0; ourTimestamp <= MAX_TIMESTAMP; ourTimestamp += CHECK_INTERVAL) {
System.out.print(String.format("%s% 3dm ",
ourTimestamp != 0 ? "| " : " ",
ourTimestamp / 60_000L
));
}
System.out.println();
for (long peerTimestamp = 0; peerTimestamp <= MAX_TIMESTAMP; peerTimestamp += CHECK_INTERVAL) {
System.out.print(String.format("% 4dm ", peerTimestamp / 60_000L));
for (long ourTimestamp = 0; ourTimestamp <= MAX_TIMESTAMP; ourTimestamp += CHECK_INTERVAL) {
System.out.print(String.format("| %s ",
isPeerExpiryValid(ourTimestamp, peerTimestamp) ? "" : ""
));
}
System.out.println();
}
System.out.println("Peer's expiry time");
}
private long generateExpiry(long timestamp) {
return ((timestamp - 1) / ROUNDING) * ROUNDING + LIFETIME;
}
private boolean isPeerExpiryValid(long nowTimestamp, long peerExpiry) {
return peerExpiry > nowTimestamp && peerExpiry <= LIFETIME + nowTimestamp;
}
}