OnlineAccountsV3:

Adding support for GET_ONLINE_ACCOUNTS_V3 to Controller, which calls OnlineAccountsManager.

With OnlineAccountsV3, instead of nodes sending their list of known online accounts (public keys),
nodes now send a summary which contains hashes of known online accounts, one per timestamp + leading-byte combo.
Thus outgoing messages are much smaller and scale better with more users.
Remote peers compare the hashes and send back lists of online accounts (for that timestamp + leading-byte combo) where hashes do not match.

Massive rewrite of OnlineAccountsManager to maintain online accounts.
Now there are three caches:
1. all online accounts, but split into sets by timestamp
2. 'hashes' of all online accounts, one hash per timestamp+leading-byte combination
Mainly for efficient use by GetOnlineAccountsV3 message constructor.
3. online accounts for the highest blocks on our chain to speed up block processing
Note that highest blocks might be way older than 'current' blocks if we're somewhat behind in syncing.

Other OnlineAccountsManager changes:
* Use scheduling executor service to manage subtasks
* Switch from 'synchronized' to 'concurrent' collections
* Generally switch from Lists to Sets - requires improved OnlineAccountData.hashCode() - further work needed
* Only send V3 messages to peers with version >= 3.2.203 (for testing)
* More info on which online accounts lists are returned depending on use-cases

To test, change your peer's version (in pom.xml?) to v3.2.203.
This commit is contained in:
catbref 2022-04-30 17:17:55 +01:00
parent f2060fe7a1
commit fbdc1e1cdb
5 changed files with 447 additions and 340 deletions

View File

@ -1023,8 +1023,8 @@ public class Block {
// If this block is much older than current online timestamp, then there's no point checking current online accounts
List<OnlineAccountData> currentOnlineAccounts = onlineTimestamp < NTP.getTime() - OnlineAccountsManager.ONLINE_TIMESTAMP_MODULUS
? null
: OnlineAccountsManager.getInstance().getOnlineAccounts();
List<OnlineAccountData> latestBlocksOnlineAccounts = OnlineAccountsManager.getInstance().getLatestBlocksOnlineAccounts();
: OnlineAccountsManager.getInstance().getOnlineAccounts(onlineTimestamp);
List<OnlineAccountData> latestBlocksOnlineAccounts = OnlineAccountsManager.getInstance().getLatestBlocksOnlineAccounts(onlineTimestamp);
// Extract online accounts' timestamp signatures from block data
List<byte[]> onlineAccountsSignatures = BlockTransformer.decodeTimestampSignatures(this.blockData.getOnlineAccountsSignatures());

View File

@ -1229,6 +1229,10 @@ public class Controller extends Thread {
OnlineAccountsManager.getInstance().onNetworkOnlineAccountsV2Message(peer, message);
break;
case GET_ONLINE_ACCOUNTS_V3:
OnlineAccountsManager.getInstance().onNetworkGetOnlineAccountsV3Message(peer, message);
break;
case GET_ARBITRARY_DATA:
// Not currently supported
break;

View File

@ -1,12 +1,13 @@
package org.qortal.controller;
import com.google.common.hash.HashCode;
import com.google.common.primitives.Longs;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.qortal.account.Account;
import org.qortal.account.PrivateKeyAccount;
import org.qortal.account.PublicKeyAccount;
import org.qortal.block.BlockChain;
import org.qortal.crypto.Crypto;
import org.qortal.data.account.MintingAccountData;
import org.qortal.data.account.RewardShareData;
import org.qortal.data.network.OnlineAccountData;
@ -18,212 +19,101 @@ import org.qortal.repository.Repository;
import org.qortal.repository.RepositoryManager;
import org.qortal.utils.Base58;
import org.qortal.utils.NTP;
import org.qortal.utils.NamedThreadFactory;
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.Collectors;
public class OnlineAccountsManager extends Thread {
private class OurOnlineAccountsThread extends Thread {
public void run() {
try {
while (!isStopping) {
Thread.sleep(10000L);
// Refresh our online accounts signatures?
sendOurOnlineAccountsInfo();
}
} catch (InterruptedException e) {
// Fall through to exit thread
}
}
}
public class OnlineAccountsManager {
private static final Logger LOGGER = LogManager.getLogger(OnlineAccountsManager.class);
private static OnlineAccountsManager instance;
// 'Current' as in 'now'
/**
* How long online accounts signatures last before they expire.
*/
public static final long ONLINE_TIMESTAMP_MODULUS = 5 * 60 * 1000L;
/**
* How many 'current' timestamp-sets of online accounts we cache.
*/
private static final int MAX_CACHED_TIMESTAMP_SETS = 2;
/**
* How many timestamp-sets of online accounts we cache for 'latest blocks'.
*/
private static final int MAX_BLOCKS_CACHED_ONLINE_ACCOUNTS = 3;
private static final long ONLINE_ACCOUNTS_QUEUE_INTERVAL = 100L; //ms
private static final long ONLINE_ACCOUNTS_TASKS_INTERVAL = 10 * 1000L; // ms
private static final long ONLINE_ACCOUNTS_LEGACY_BROADCAST_INTERVAL = 60 * 1000L; // ms
private static final long ONLINE_ACCOUNTS_BROADCAST_INTERVAL = 10 * 1000L; // ms
private static final long ONLINE_ACCOUNTS_V2_PEER_VERSION = 0x0300020000L; // v3.2.0
private static final long ONLINE_ACCOUNTS_V3_PEER_VERSION = 0x03000200cbL; // v3.2.203
private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(4, new NamedThreadFactory("OnlineAccounts"));
private volatile boolean isStopping = false;
// To do with online accounts list
private static final long ONLINE_ACCOUNTS_TASKS_INTERVAL = 10 * 1000L; // ms
private static final long ONLINE_ACCOUNTS_BROADCAST_INTERVAL = 1 * 60 * 1000L; // ms
public static final long ONLINE_TIMESTAMP_MODULUS = 5 * 60 * 1000L;
private static final long LAST_SEEN_EXPIRY_PERIOD = (ONLINE_TIMESTAMP_MODULUS * 2) + (1 * 60 * 1000L);
/** How many (latest) blocks' worth of online accounts we cache */
private static final int MAX_BLOCKS_CACHED_ONLINE_ACCOUNTS = 2;
private static final long ONLINE_ACCOUNTS_V2_PEER_VERSION = 0x0300020000L;
private final Set<OnlineAccountData> onlineAccountsImportQueue = ConcurrentHashMap.newKeySet();
private long onlineAccountsTasksTimestamp = Controller.startTime + ONLINE_ACCOUNTS_TASKS_INTERVAL; // ms
/**
* Cache of 'current' online accounts, keyed by timestamp
*/
private final Map<Long, Set<OnlineAccountData>> currentOnlineAccounts = new ConcurrentHashMap<>();
/**
* Cache of hash-summary of 'current' online accounts, keyed by timestamp, then leading byte of public key.
* <p>
* Inner map is also sorted using {@code Byte::compareUnsigned} as a comparator.
* This is critical for proper function of GET_ONLINE_ACCOUNTS_V3 protocol.
*/
private final Map<Long, Map<Byte, byte[]>> currentOnlineAccountsHashes = new ConcurrentHashMap<>();
private final List<OnlineAccountData> onlineAccountsImportQueue = Collections.synchronizedList(new ArrayList<>());
/**
* Cache of online accounts for latest blocks - not necessarily 'current' / now.
* Probably only accessed / modified by a single Synchronizer thread.
*/
private final Map<Long, Set<OnlineAccountData>> latestBlocksOnlineAccounts = new ConcurrentHashMap<>();
/** Cache of current 'online accounts' */
List<OnlineAccountData> onlineAccounts = new ArrayList<>();
/** Cache of latest blocks' online accounts */
Deque<List<OnlineAccountData>> latestBlocksOnlineAccounts = new ArrayDeque<>(MAX_BLOCKS_CACHED_ONLINE_ACCOUNTS);
public OnlineAccountsManager() {
// TODO: make private, add these tasks to scheduled executor:
// send our online accounts every 10s
// expireOnlineAccounts every ONLINE_ACCOUNTS_CHECK_INTERVAL
// broadcastOnlineAccountsQuery every ONLINE_ACCOUNTS_BROADCAST_INTERVAL
// processOnlineAccountsImportQueue every 100ms?
public static long toOnlineAccountTimestamp(long timestamp) {
return (timestamp / ONLINE_TIMESTAMP_MODULUS) * ONLINE_TIMESTAMP_MODULUS;
}
// TODO: convert to SingletonContainer a-la Network
public static synchronized OnlineAccountsManager getInstance() {
if (instance == null) {
instance = new OnlineAccountsManager();
}
return instance;
private OnlineAccountsManager() {
}
// TODO: see constructor for more info
public void run() {
private static class SingletonContainer {
private static final OnlineAccountsManager INSTANCE = new OnlineAccountsManager();
}
// Start separate thread to prepare our online accounts
// This could be converted to a thread pool later if more concurrency is needed
OurOnlineAccountsThread ourOnlineAccountsThread = new OurOnlineAccountsThread();
ourOnlineAccountsThread.start();
public static OnlineAccountsManager getInstance() {
return SingletonContainer.INSTANCE;
}
try {
while (!Controller.isStopping()) {
Thread.sleep(100L);
public void start() {
// Expire old online accounts signatures
executor.scheduleAtFixedRate(this::expireOldOnlineAccounts, ONLINE_ACCOUNTS_TASKS_INTERVAL, ONLINE_ACCOUNTS_TASKS_INTERVAL, TimeUnit.MILLISECONDS);
final Long now = NTP.getTime();
if (now == null) {
continue;
}
// Send our online accounts
executor.scheduleAtFixedRate(this::sendOurOnlineAccountsInfo, ONLINE_ACCOUNTS_BROADCAST_INTERVAL, ONLINE_ACCOUNTS_BROADCAST_INTERVAL, TimeUnit.MILLISECONDS);
// Perform tasks to do with managing online accounts list
if (now >= onlineAccountsTasksTimestamp) {
onlineAccountsTasksTimestamp = now + ONLINE_ACCOUNTS_TASKS_INTERVAL;
performOnlineAccountsTasks();
}
// Request online accounts from peers (legacy)
executor.scheduleAtFixedRate(this::requestLegacyRemoteOnlineAccounts, ONLINE_ACCOUNTS_LEGACY_BROADCAST_INTERVAL, ONLINE_ACCOUNTS_LEGACY_BROADCAST_INTERVAL, TimeUnit.MILLISECONDS);
// Request online accounts from peers (V3+)
executor.scheduleAtFixedRate(this::requestRemoteOnlineAccounts, ONLINE_ACCOUNTS_BROADCAST_INTERVAL, ONLINE_ACCOUNTS_BROADCAST_INTERVAL, TimeUnit.MILLISECONDS);
// Process queued online account verifications
this.processOnlineAccountsImportQueue();
}
} catch (InterruptedException e) {
// Fall through to exit thread
}
ourOnlineAccountsThread.interrupt();
// Process import queue
executor.scheduleWithFixedDelay(this::processOnlineAccountsImportQueue, ONLINE_ACCOUNTS_QUEUE_INTERVAL, ONLINE_ACCOUNTS_QUEUE_INTERVAL, TimeUnit.MILLISECONDS);
}
public void shutdown() {
isStopping = true;
// TODO: convert interrrupt to executor.shutdownNow();
this.interrupt();
}
// Online accounts import queue
private void processOnlineAccountsImportQueue() {
if (this.onlineAccountsImportQueue.isEmpty()) {
// Nothing to do
return;
}
LOGGER.debug("Processing online accounts import queue (size: {})", this.onlineAccountsImportQueue.size());
try (final Repository repository = RepositoryManager.getRepository()) {
List<OnlineAccountData> onlineAccountDataCopy = new ArrayList<>(this.onlineAccountsImportQueue);
for (OnlineAccountData onlineAccountData : onlineAccountDataCopy) {
if (isStopping) {
return;
}
this.verifyAndAddAccount(repository, onlineAccountData);
// Remove from queue
onlineAccountsImportQueue.remove(onlineAccountData);
}
LOGGER.debug("Finished processing online accounts import queue");
} catch (DataException e) {
LOGGER.error(String.format("Repository issue while verifying online accounts"), e);
}
}
// Utilities
// TODO: split this into validateAccount() and addAccount()
private void verifyAndAddAccount(Repository repository, OnlineAccountData onlineAccountData) throws DataException {
final Long now = NTP.getTime();
if (now == null)
return;
// TODO: don't create otherAccount, instead:
// byte[] rewardSharePublicKey = onlineAccountData.getPublicKey();
PublicKeyAccount otherAccount = new PublicKeyAccount(repository, onlineAccountData.getPublicKey());
// Check timestamp is 'recent' here
if (Math.abs(onlineAccountData.getTimestamp() - now) > ONLINE_TIMESTAMP_MODULUS * 2) {
LOGGER.trace(() -> String.format("Rejecting online account %s with out of range timestamp %d", otherAccount.getAddress(), onlineAccountData.getTimestamp()));
return;
}
// Verify
byte[] data = Longs.toByteArray(onlineAccountData.getTimestamp());
// TODO: use Crypto.verify() static method directly
if (!otherAccount.verify(onlineAccountData.getSignature(), data)) {
LOGGER.trace(() -> String.format("Rejecting invalid online account %s", otherAccount.getAddress()));
return;
}
// Qortal: check online account is actually reward-share
// TODO: use "rewardSharePublicKey" from above TODO
RewardShareData rewardShareData = repository.getAccountRepository().getRewardShare(onlineAccountData.getPublicKey());
if (rewardShareData == null) {
// Reward-share doesn't even exist - probably not a good sign
LOGGER.trace(() -> String.format("Rejecting unknown online reward-share public key %s", Base58.encode(onlineAccountData.getPublicKey())));
return;
}
Account mintingAccount = new Account(repository, rewardShareData.getMinter());
if (!mintingAccount.canMint()) {
// Minting-account component of reward-share can no longer mint - disregard
LOGGER.trace(() -> String.format("Rejecting online reward-share with non-minting account %s", mintingAccount.getAddress()));
return;
}
// TODO: change this.onlineAccounts to a ConcurrentMap? Keyed by timestamp?
synchronized (this.onlineAccounts) {
OnlineAccountData existingAccountData = this.onlineAccounts.stream().filter(account -> Arrays.equals(account.getPublicKey(), onlineAccountData.getPublicKey())).findFirst().orElse(null);
if (existingAccountData != null) {
if (existingAccountData.getTimestamp() < onlineAccountData.getTimestamp()) {
this.onlineAccounts.remove(existingAccountData);
// TODO: change otherAccount.getAddress() to rewardSharePublicKey in Base58?
LOGGER.trace(() -> String.format("Updated online account %s with timestamp %d (was %d)", otherAccount.getAddress(), onlineAccountData.getTimestamp(), existingAccountData.getTimestamp()));
} else {
// TODO: change otherAccount.getAddress() to rewardSharePublicKey in Base58?
LOGGER.trace(() -> String.format("Not updating existing online account %s", otherAccount.getAddress()));
return;
}
} else {
// TODO: change otherAccount.getAddress() to rewardSharePublicKey in Base58?
LOGGER.trace(() -> String.format("Added online account %s with timestamp %d", otherAccount.getAddress(), onlineAccountData.getTimestamp()));
}
this.onlineAccounts.add(onlineAccountData);
// TODO: if we actually added a new account, then we need to rebuild our hashes-by-timestamp-then-byte for rewardSharePublicKey's leading byte also
}
executor.shutdownNow();
}
// Testing support
public void ensureTestingAccountsOnline(PrivateKeyAccount... onlineAccounts) {
if (!BlockChain.getInstance().isTestChain()) {
LOGGER.warn("Ignoring attempt to ensure test account is online for non-test chain!");
@ -237,61 +127,222 @@ public class OnlineAccountsManager extends Thread {
final long onlineAccountsTimestamp = toOnlineAccountTimestamp(now);
byte[] timestampBytes = Longs.toByteArray(onlineAccountsTimestamp);
// TODO: use new addAccount() method
synchronized (this.onlineAccounts) {
this.onlineAccounts.clear();
Set<OnlineAccountData> replacementAccounts = new HashSet<>();
for (PrivateKeyAccount onlineAccount : onlineAccounts) {
// Check mintingAccount is actually reward-share?
for (PrivateKeyAccount onlineAccount : onlineAccounts) {
// Check mintingAccount is actually reward-share?
byte[] signature = onlineAccount.sign(timestampBytes);
byte[] publicKey = onlineAccount.getPublicKey();
byte[] signature = onlineAccount.sign(timestampBytes);
byte[] publicKey = onlineAccount.getPublicKey();
OnlineAccountData ourOnlineAccountData = new OnlineAccountData(onlineAccountsTimestamp, signature, publicKey);
replacementAccounts.add(ourOnlineAccountData);
}
OnlineAccountData ourOnlineAccountData = new OnlineAccountData(onlineAccountsTimestamp, signature, publicKey);
this.onlineAccounts.add(ourOnlineAccountData);
this.currentOnlineAccounts.clear();
addAccounts(replacementAccounts);
}
// Online accounts import queue
private void processOnlineAccountsImportQueue() {
if (this.onlineAccountsImportQueue.isEmpty())
// Nothing to do
return;
LOGGER.debug("Processing online accounts import queue (size: {})", this.onlineAccountsImportQueue.size());
Set<OnlineAccountData> onlineAccountsToAdd = new HashSet<>();
try (final Repository repository = RepositoryManager.getRepository()) {
for (OnlineAccountData onlineAccountData : this.onlineAccountsImportQueue) {
if (isStopping)
return;
boolean isValid = this.validateAccount(repository, onlineAccountData);
if (isValid)
onlineAccountsToAdd.add(onlineAccountData);
// Remove from queue
onlineAccountsImportQueue.remove(onlineAccountData);
}
LOGGER.debug("Finished validating online accounts import queue");
} catch (DataException e) {
LOGGER.error("Repository issue while verifying online accounts", e);
}
LOGGER.debug("Adding {} validated online accounts from import queue", onlineAccountsToAdd.size());
addAccounts(onlineAccountsToAdd);
}
// Utilities
public static byte[] xorByteArrayInPlace(byte[] inplaceArray, byte[] otherArray) {
if (inplaceArray == null)
return Arrays.copyOf(otherArray, otherArray.length);
// Start from index 1 to enforce static leading byte
for (int i = 1; i < otherArray.length; i++)
// inplaceArray[i] ^= otherArray[otherArray.length - i - 1];
inplaceArray[i] ^= otherArray[i];
return inplaceArray;
}
private boolean validateAccount(Repository repository, OnlineAccountData onlineAccountData) throws DataException {
final Long now = NTP.getTime();
if (now == null)
return false;
byte[] rewardSharePublicKey = onlineAccountData.getPublicKey();
long onlineAccountTimestamp = onlineAccountData.getTimestamp();
// Check timestamp is 'recent' here
if (Math.abs(onlineAccountTimestamp - now) > ONLINE_TIMESTAMP_MODULUS * 2) {
LOGGER.trace(() -> String.format("Rejecting online account %s with out of range timestamp %d", Base58.encode(rewardSharePublicKey), onlineAccountTimestamp));
return false;
}
// Verify
byte[] data = Longs.toByteArray(onlineAccountData.getTimestamp());
if (!Crypto.verify(rewardSharePublicKey, onlineAccountData.getSignature(), data)) {
LOGGER.trace(() -> String.format("Rejecting invalid online account %s", Base58.encode(rewardSharePublicKey)));
return false;
}
// Qortal: check online account is actually reward-share
RewardShareData rewardShareData = repository.getAccountRepository().getRewardShare(rewardSharePublicKey);
if (rewardShareData == null) {
// Reward-share doesn't even exist - probably not a good sign
LOGGER.trace(() -> String.format("Rejecting unknown online reward-share public key %s", Base58.encode(rewardSharePublicKey)));
return false;
}
Account mintingAccount = new Account(repository, rewardShareData.getMinter());
if (!mintingAccount.canMint()) {
// Minting-account component of reward-share can no longer mint - disregard
LOGGER.trace(() -> String.format("Rejecting online reward-share with non-minting account %s", mintingAccount.getAddress()));
return false;
}
return true;
}
private void addAccounts(Set<OnlineAccountData> onlineAccountsToAdd) {
// For keeping track of which hashes to rebuild
Map<Long, Set<Byte>> hashesToRebuild = new HashMap<>();
for (OnlineAccountData onlineAccountData : onlineAccountsToAdd) {
boolean isNewEntry = this.addAccount(onlineAccountData);
if (isNewEntry)
hashesToRebuild.computeIfAbsent(onlineAccountData.getTimestamp(), k -> new HashSet<>()).add(onlineAccountData.getPublicKey()[0]);
}
for (var entry : hashesToRebuild.entrySet()) {
Long timestamp = entry.getKey();
LOGGER.debug(String.format("Rehashing for timestamp %d and leading bytes %s",
timestamp,
entry.getValue().stream().sorted(Byte::compareUnsigned).map(leadingByte -> String.format("%02x", leadingByte)).collect(Collectors.joining(", "))
)
);
for (Byte leadingByte : entry.getValue()) {
byte[] pubkeyHash = currentOnlineAccounts.get(timestamp).stream()
.map(OnlineAccountData::getPublicKey)
.filter(publicKey -> leadingByte == publicKey[0])
.reduce(null, OnlineAccountsManager::xorByteArrayInPlace);
currentOnlineAccountsHashes.computeIfAbsent(timestamp, k -> new ConcurrentSkipListMap<>(Byte::compareUnsigned)).put(leadingByte, pubkeyHash);
LOGGER.trace(() -> String.format("Rebuilt hash %s for timestamp %d and leading byte %02x using %d public keys",
HashCode.fromBytes(pubkeyHash),
timestamp,
leadingByte,
currentOnlineAccounts.get(timestamp).stream()
.map(OnlineAccountData::getPublicKey)
.filter(publicKey -> leadingByte == publicKey[0])
.count()
));
}
}
}
private void performOnlineAccountsTasks() {
private boolean addAccount(OnlineAccountData onlineAccountData) {
byte[] rewardSharePublicKey = onlineAccountData.getPublicKey();
long onlineAccountTimestamp = onlineAccountData.getTimestamp();
Set<OnlineAccountData> onlineAccounts = this.currentOnlineAccounts.computeIfAbsent(onlineAccountTimestamp, k -> ConcurrentHashMap.newKeySet());
boolean isNewEntry = onlineAccounts.add(onlineAccountData);
if (isNewEntry)
LOGGER.trace(() -> String.format("Added online account %s with timestamp %d", Base58.encode(rewardSharePublicKey), onlineAccountTimestamp));
else
LOGGER.trace(() -> String.format("Not updating existing online account %s with timestamp %d", Base58.encode(rewardSharePublicKey), onlineAccountTimestamp));
return isNewEntry;
}
/**
* Expire old entries.
*/
private void expireOldOnlineAccounts() {
final Long now = NTP.getTime();
if (now == null)
return;
// Expire old entries
final long cutoffThreshold = now - LAST_SEEN_EXPIRY_PERIOD;
synchronized (this.onlineAccounts) {
Iterator<OnlineAccountData> iterator = this.onlineAccounts.iterator();
while (iterator.hasNext()) {
OnlineAccountData onlineAccountData = iterator.next();
if (onlineAccountData.getTimestamp() < cutoffThreshold) {
iterator.remove();
LOGGER.trace(() -> {
PublicKeyAccount otherAccount = new PublicKeyAccount(null, onlineAccountData.getPublicKey());
return String.format("Removed expired online account %s with timestamp %d", otherAccount.getAddress(), onlineAccountData.getTimestamp());
});
}
}
}
// Request data from other peers?
if ((this.onlineAccountsTasksTimestamp % ONLINE_ACCOUNTS_BROADCAST_INTERVAL) < ONLINE_ACCOUNTS_TASKS_INTERVAL) {
List<OnlineAccountData> safeOnlineAccounts;
synchronized (this.onlineAccounts) {
safeOnlineAccounts = new ArrayList<>(this.onlineAccounts);
}
Message messageV1 = new GetOnlineAccountsMessage(safeOnlineAccounts);
Message messageV2 = new GetOnlineAccountsV2Message(safeOnlineAccounts);
Network.getInstance().broadcast(peer ->
peer.getPeersVersion() >= ONLINE_ACCOUNTS_V2_PEER_VERSION ? messageV2 : messageV1
);
}
final long cutoffThreshold = now - MAX_CACHED_TIMESTAMP_SETS * ONLINE_TIMESTAMP_MODULUS;
this.currentOnlineAccounts.keySet().removeIf(timestamp -> timestamp < cutoffThreshold);
this.currentOnlineAccountsHashes.keySet().removeIf(timestamp -> timestamp < cutoffThreshold);
}
/**
* Request data from other peers. (Pre-V3)
*/
private void requestLegacyRemoteOnlineAccounts() {
final Long now = NTP.getTime();
if (now == null)
return;
// Don't bother if we're not up to date
if (!Controller.getInstance().isUpToDate())
return;
List<OnlineAccountData> mergedOnlineAccounts = Set.copyOf(this.currentOnlineAccounts.values()).stream().flatMap(Set::stream).collect(Collectors.toList());
Message messageV2 = new GetOnlineAccountsV2Message(mergedOnlineAccounts);
Network.getInstance().broadcast(peer ->
peer.getPeersVersion() < ONLINE_ACCOUNTS_V3_PEER_VERSION
? messageV2
: null
);
}
/**
* Request data from other peers. V3+
*/
private void requestRemoteOnlineAccounts() {
final Long now = NTP.getTime();
if (now == null)
return;
// Don't bother if we're not up to date
if (!Controller.getInstance().isUpToDate())
return;
Message messageV3 = new GetOnlineAccountsV3Message(currentOnlineAccountsHashes);
Network.getInstance().broadcast(peer ->
peer.getPeersVersion() >= ONLINE_ACCOUNTS_V3_PEER_VERSION
? messageV3
: null
);
}
/**
* Send online accounts that are minting on this node.
*/
private void sendOurOnlineAccountsInfo() {
final Long now = NTP.getTime();
if (now == null) {
@ -302,13 +353,12 @@ public class OnlineAccountsManager extends Thread {
try (final Repository repository = RepositoryManager.getRepository()) {
mintingAccounts = repository.getAccountRepository().getMintingAccounts();
// We have no accounts, but don't reset timestamp
// We have no accounts to send
if (mintingAccounts.isEmpty())
return;
// Only reward-share accounts allowed
// Only active reward-shares allowed
Iterator<MintingAccountData> iterator = mintingAccounts.iterator();
int i = 0;
while (iterator.hasNext()) {
MintingAccountData mintingAccountData = iterator.next();
@ -325,11 +375,6 @@ public class OnlineAccountsManager extends Thread {
iterator.remove();
continue;
}
if (++i > 1+1) {
iterator.remove();
continue;
}
}
} catch (DataException e) {
LOGGER.warn(String.format("Repository issue trying to fetch minting accounts: %s", e.getMessage()));
@ -343,7 +388,6 @@ public class OnlineAccountsManager extends Thread {
byte[] timestampBytes = Longs.toByteArray(onlineAccountsTimestamp);
List<OnlineAccountData> ourOnlineAccounts = new ArrayList<>();
MINTING_ACCOUNTS:
for (MintingAccountData mintingAccountData : mintingAccounts) {
PrivateKeyAccount mintingAccount = new PrivateKeyAccount(null, mintingAccountData.getPrivateKey());
@ -352,28 +396,13 @@ public class OnlineAccountsManager extends Thread {
// Our account is online
OnlineAccountData ourOnlineAccountData = new OnlineAccountData(onlineAccountsTimestamp, signature, publicKey);
synchronized (this.onlineAccounts) {
Iterator<OnlineAccountData> iterator = this.onlineAccounts.iterator();
while (iterator.hasNext()) {
OnlineAccountData existingOnlineAccountData = iterator.next();
if (Arrays.equals(existingOnlineAccountData.getPublicKey(), ourOnlineAccountData.getPublicKey())) {
// If our online account is already present, with same timestamp, then move on to next mintingAccount
if (existingOnlineAccountData.getTimestamp() == onlineAccountsTimestamp)
continue MINTING_ACCOUNTS;
// If our online account is already present, but with older timestamp, then remove it
iterator.remove();
break;
}
}
this.onlineAccounts.add(ourOnlineAccountData);
boolean isNewEntry = addAccount(ourOnlineAccountData);
if (isNewEntry) {
LOGGER.trace(() -> String.format("Added our online account %s with timestamp %d", Base58.encode(mintingAccount.getPublicKey()), onlineAccountsTimestamp));
ourOnlineAccounts.add(ourOnlineAccountData);
hasInfoChanged = true;
}
LOGGER.trace(() -> String.format("Added our online account %s with timestamp %d", mintingAccount.getAddress(), onlineAccountsTimestamp));
ourOnlineAccounts.add(ourOnlineAccountData);
hasInfoChanged = true;
}
if (!hasInfoChanged)
@ -381,52 +410,81 @@ public class OnlineAccountsManager extends Thread {
Message messageV1 = new OnlineAccountsMessage(ourOnlineAccounts);
Message messageV2 = new OnlineAccountsV2Message(ourOnlineAccounts);
Message messageV3 = new OnlineAccountsV2Message(ourOnlineAccounts); // TODO: V3 message
Network.getInstance().broadcast(peer ->
peer.getPeersVersion() >= ONLINE_ACCOUNTS_V2_PEER_VERSION ? messageV2 : messageV1
peer.getPeersVersion() >= ONLINE_ACCOUNTS_V3_PEER_VERSION
? messageV3
: peer.getPeersVersion() >= ONLINE_ACCOUNTS_V2_PEER_VERSION
? messageV2
: messageV1
);
LOGGER.trace(() -> String.format("Broadcasted %d online account%s with timestamp %d", ourOnlineAccounts.size(), (ourOnlineAccounts.size() != 1 ? "s" : ""), onlineAccountsTimestamp));
LOGGER.debug("Broadcasted {} online account{} with timestamp {}", ourOnlineAccounts.size(), (ourOnlineAccounts.size() != 1 ? "s" : ""), onlineAccountsTimestamp);
}
public static long toOnlineAccountTimestamp(long timestamp) {
return (timestamp / ONLINE_TIMESTAMP_MODULUS) * ONLINE_TIMESTAMP_MODULUS;
/**
* Returns list of online accounts matching given timestamp.
*/
// Block::mint() - only wants online accounts with timestamp that matches block's timestamp so they can be added to new block
// Block::areOnlineAccountsValid() - only wants online accounts with timestamp that matches block's timestamp to avoid re-verifying sigs
public List<OnlineAccountData> getOnlineAccounts(long onlineTimestamp) {
return new ArrayList<>(Set.copyOf(this.currentOnlineAccounts.getOrDefault(onlineTimestamp, Collections.emptySet())));
}
/** Returns list of online accounts with timestamp recent enough to be considered currently online. */
/**
* Returns list of online accounts with timestamp recent enough to be considered currently online.
*/
// API: calls this to return list of online accounts - probably expects ALL timestamps - but going to get 'current' from now on
// BlockMinter: only calls this to check whether returned list is empty or not, to determine whether minting is even possible or not
public List<OnlineAccountData> getOnlineAccounts() {
final long onlineTimestamp = toOnlineAccountTimestamp(NTP.getTime());
final Long now = NTP.getTime();
if (now == null)
return Collections.emptyList();
synchronized (this.onlineAccounts) {
return this.onlineAccounts.stream().filter(account -> account.getTimestamp() == onlineTimestamp).collect(Collectors.toList());
}
final long onlineTimestamp = toOnlineAccountTimestamp(now);
return getOnlineAccounts(onlineTimestamp);
}
/**
* Returns cached, unmodifiable list of latest block's online accounts.
*/
// TODO: this needs tidying up - do we change method to only return latest timestamp's set?
// Block::areOnlineAccountsValid() - only wants online accounts with timestamp that matches latest / previous block's timestamp to avoid re-verifying sigs
public List<OnlineAccountData> getLatestBlocksOnlineAccounts(long blockOnlineTimestamp) {
Set<OnlineAccountData> onlineAccounts = this.latestBlocksOnlineAccounts.getOrDefault(blockOnlineTimestamp, Collections.emptySet());
/** Returns cached, unmodifiable list of latest block's online accounts. */
public List<OnlineAccountData> getLatestBlocksOnlineAccounts() {
synchronized (this.latestBlocksOnlineAccounts) {
return this.latestBlocksOnlineAccounts.peekFirst();
}
return List.copyOf(onlineAccounts);
}
/** Caches list of latest block's online accounts. Typically called by Block.process() */
/**
* Caches list of latest block's online accounts. Typically called by Block.process()
*/
// TODO: is this simply a bulk add, like the import queue but blocking? Used by Synchronizer but could be for blocks that are quite historic?
// Block::process() - basically for adding latest block's online accounts to cache to avoid re-verifying when processing another block in the future
public void pushLatestBlocksOnlineAccounts(List<OnlineAccountData> latestBlocksOnlineAccounts) {
synchronized (this.latestBlocksOnlineAccounts) {
if (this.latestBlocksOnlineAccounts.size() == MAX_BLOCKS_CACHED_ONLINE_ACCOUNTS)
this.latestBlocksOnlineAccounts.pollLast();
if (latestBlocksOnlineAccounts == null || latestBlocksOnlineAccounts.isEmpty())
return;
this.latestBlocksOnlineAccounts.addFirst(latestBlocksOnlineAccounts == null
? Collections.emptyList()
: Collections.unmodifiableList(latestBlocksOnlineAccounts));
}
long timestamp = latestBlocksOnlineAccounts.get(0).getTimestamp();
this.latestBlocksOnlineAccounts.computeIfAbsent(timestamp, k -> ConcurrentHashMap.newKeySet()).addAll(latestBlocksOnlineAccounts);
if (this.latestBlocksOnlineAccounts.size() > MAX_BLOCKS_CACHED_ONLINE_ACCOUNTS)
this.latestBlocksOnlineAccounts.keySet().stream()
.sorted()
.findFirst()
.ifPresent(this.latestBlocksOnlineAccounts::remove);
}
/** Reverts list of latest block's online accounts. Typically called by Block.orphan() */
/**
* Reverts list of latest block's online accounts. Typically called by Block.orphan()
*/
// TODO: see above
// Block::orphan() - for removing latest block's online accounts from cache
public void popLatestBlocksOnlineAccounts() {
synchronized (this.latestBlocksOnlineAccounts) {
this.latestBlocksOnlineAccounts.pollFirst();
}
// NO-OP
}
@ -438,45 +496,48 @@ public class OnlineAccountsManager extends Thread {
List<OnlineAccountData> excludeAccounts = getOnlineAccountsMessage.getOnlineAccounts();
// Send online accounts info, excluding entries with matching timestamp & public key from excludeAccounts
List<OnlineAccountData> accountsToSend;
synchronized (this.onlineAccounts) {
accountsToSend = new ArrayList<>(this.onlineAccounts);
}
List<OnlineAccountData> accountsToSend = Set.copyOf(this.currentOnlineAccounts.values()).stream().flatMap(Set::stream).collect(Collectors.toList());
int prefilterSize = accountsToSend.size();
Iterator<OnlineAccountData> iterator = accountsToSend.iterator();
SEND_ITERATOR:
while (iterator.hasNext()) {
OnlineAccountData onlineAccountData = iterator.next();
for (int i = 0; i < excludeAccounts.size(); ++i) {
OnlineAccountData excludeAccountData = excludeAccounts.get(i);
for (OnlineAccountData excludeAccountData : excludeAccounts) {
if (onlineAccountData.getTimestamp() == excludeAccountData.getTimestamp() && Arrays.equals(onlineAccountData.getPublicKey(), excludeAccountData.getPublicKey())) {
iterator.remove();
continue SEND_ITERATOR;
break;
}
}
}
if (accountsToSend.isEmpty())
return;
Message onlineAccountsMessage = new OnlineAccountsMessage(accountsToSend);
peer.sendMessage(onlineAccountsMessage);
LOGGER.trace(() -> String.format("Sent %d of our %d online accounts to %s", accountsToSend.size(), this.onlineAccounts.size(), peer));
LOGGER.debug("Sent {} of our {} online accounts to {}", accountsToSend.size(), prefilterSize, peer);
}
public void onNetworkOnlineAccountsMessage(Peer peer, Message message) {
OnlineAccountsMessage onlineAccountsMessage = (OnlineAccountsMessage) message;
List<OnlineAccountData> peersOnlineAccounts = onlineAccountsMessage.getOnlineAccounts();
LOGGER.trace(() -> String.format("Received %d online accounts from %s", peersOnlineAccounts.size(), peer));
LOGGER.debug("Received {} online accounts from {}", peersOnlineAccounts.size(), peer);
try (final Repository repository = RepositoryManager.getRepository()) {
for (OnlineAccountData onlineAccountData : peersOnlineAccounts)
this.verifyAndAddAccount(repository, onlineAccountData);
} catch (DataException e) {
LOGGER.error(String.format("Repository issue while verifying online accounts from peer %s", peer), e);
int importCount = 0;
// Add any online accounts to the queue that aren't already present
for (OnlineAccountData onlineAccountData : peersOnlineAccounts) {
boolean isNewEntry = onlineAccountsImportQueue.add(onlineAccountData);
if (isNewEntry)
importCount++;
}
if (importCount > 0)
LOGGER.debug("Added {} online accounts to queue", importCount);
}
public void onNetworkGetOnlineAccountsV2Message(Peer peer, Message message) {
@ -485,58 +546,106 @@ public class OnlineAccountsManager extends Thread {
List<OnlineAccountData> excludeAccounts = getOnlineAccountsMessage.getOnlineAccounts();
// Send online accounts info, excluding entries with matching timestamp & public key from excludeAccounts
List<OnlineAccountData> accountsToSend;
synchronized (this.onlineAccounts) {
accountsToSend = new ArrayList<>(this.onlineAccounts);
}
List<OnlineAccountData> accountsToSend = Set.copyOf(this.currentOnlineAccounts.values()).stream().flatMap(Set::stream).collect(Collectors.toList());
int prefilterSize = accountsToSend.size();
Iterator<OnlineAccountData> iterator = accountsToSend.iterator();
SEND_ITERATOR:
while (iterator.hasNext()) {
OnlineAccountData onlineAccountData = iterator.next();
for (int i = 0; i < excludeAccounts.size(); ++i) {
OnlineAccountData excludeAccountData = excludeAccounts.get(i);
for (OnlineAccountData excludeAccountData : excludeAccounts) {
if (onlineAccountData.getTimestamp() == excludeAccountData.getTimestamp() && Arrays.equals(onlineAccountData.getPublicKey(), excludeAccountData.getPublicKey())) {
iterator.remove();
continue SEND_ITERATOR;
break;
}
}
}
if (accountsToSend.isEmpty())
return;
Message onlineAccountsMessage = new OnlineAccountsV2Message(accountsToSend);
peer.sendMessage(onlineAccountsMessage);
LOGGER.trace(() -> String.format("Sent %d of our %d online accounts to %s", accountsToSend.size(), this.onlineAccounts.size(), peer));
LOGGER.debug("Sent {} of our {} online accounts to {}", accountsToSend.size(), prefilterSize, peer);
}
public void onNetworkOnlineAccountsV2Message(Peer peer, Message message) {
OnlineAccountsV2Message onlineAccountsMessage = (OnlineAccountsV2Message) message;
List<OnlineAccountData> peersOnlineAccounts = onlineAccountsMessage.getOnlineAccounts();
LOGGER.debug(String.format("Received %d online accounts from %s", peersOnlineAccounts.size(), peer));
LOGGER.debug("Received {} online accounts from {}", peersOnlineAccounts.size(), peer);
int importCount = 0;
// Add any online accounts to the queue that aren't already present
for (OnlineAccountData onlineAccountData : peersOnlineAccounts) {
boolean isNewEntry = onlineAccountsImportQueue.add(onlineAccountData);
// Do we already know about this online account data?
if (onlineAccounts.contains(onlineAccountData)) {
continue;
}
// Is it already in the import queue?
if (onlineAccountsImportQueue.contains(onlineAccountData)) {
continue;
}
onlineAccountsImportQueue.add(onlineAccountData);
importCount++;
if (isNewEntry)
importCount++;
}
LOGGER.debug(String.format("Added %d online accounts to queue", importCount));
if (importCount > 0)
LOGGER.debug("Added {} online accounts to queue", importCount);
}
public void onNetworkGetOnlineAccountsV3Message(Peer peer, Message message) {
GetOnlineAccountsV3Message getOnlineAccountsMessage = (GetOnlineAccountsV3Message) message;
Map<Long, Map<Byte, byte[]>> peersHashes = getOnlineAccountsMessage.getHashesByTimestampThenByte();
List<OnlineAccountData> outgoingOnlineAccounts = new ArrayList<>();
// Warning: no double-checking/fetching - we must be ConcurrentMap compatible!
// So no contains()-then-get() or multiple get()s on the same key/map.
// We also use getOrDefault() with emptySet() on currentOnlineAccounts in case corresponding timestamp entry isn't there.
for (var ourOuterMapEntry : currentOnlineAccountsHashes.entrySet()) {
Long timestamp = ourOuterMapEntry.getKey();
var ourInnerMap = ourOuterMapEntry.getValue();
var peersInnerMap = peersHashes.get(timestamp);
if (peersInnerMap == null) {
// Peer doesn't have this timestamp, so if it's valid (i.e. not too old) then we'd have to send all of ours
Set<OnlineAccountData> timestampsOnlineAccounts = this.currentOnlineAccounts.getOrDefault(timestamp, Collections.emptySet());
outgoingOnlineAccounts.addAll(timestampsOnlineAccounts);
LOGGER.debug(() -> String.format("Going to send all %d online accounts for timestamp %d", timestampsOnlineAccounts.size(), timestamp));
} else {
// Quick cache of which leading bytes to send so we only have to filter once
Set<Byte> outgoingLeadingBytes = new HashSet<>();
// We have entries for this timestamp so compare against peer's entries
for (var ourInnerMapEntry : ourInnerMap.entrySet()) {
Byte leadingByte = ourInnerMapEntry.getKey();
byte[] peersHash = peersInnerMap.get(leadingByte);
if (!Arrays.equals(ourInnerMapEntry.getValue(), peersHash)) {
// For this leading byte: hashes don't match or peer doesn't have entry
// Send all online accounts for this timestamp and leading byte
outgoingLeadingBytes.add(leadingByte);
}
}
int beforeAddSize = outgoingOnlineAccounts.size();
this.currentOnlineAccounts.getOrDefault(timestamp, Collections.emptySet()).stream()
.filter(account -> outgoingLeadingBytes.contains(account.getPublicKey()[0]))
.forEach(outgoingOnlineAccounts::add);
if (outgoingLeadingBytes.size() > beforeAddSize)
LOGGER.debug(String.format("Going to send %d online accounts for timestamp %d and leading bytes %s",
outgoingOnlineAccounts.size() - beforeAddSize,
timestamp,
outgoingLeadingBytes.stream().sorted(Byte::compareUnsigned).map(leadingByte -> String.format("%02x", leadingByte)).collect(Collectors.joining(", "))
)
);
}
}
Message onlineAccountsMessage = new OnlineAccountsV2Message(outgoingOnlineAccounts); // TODO: V3 message
peer.sendMessage(onlineAccountsMessage);
LOGGER.debug("Sent {} online accounts to {}", outgoingOnlineAccounts.size(), peer);
}
}

View File

@ -5,6 +5,7 @@ import java.util.Arrays;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlTransient;
import org.qortal.account.PublicKeyAccount;
@ -16,6 +17,9 @@ public class OnlineAccountData {
protected byte[] signature;
protected byte[] publicKey;
@XmlTransient
private int hash;
// Constructors
// necessary for JAXB serialization
@ -74,8 +78,13 @@ public class OnlineAccountData {
@Override
public int hashCode() {
// Pretty lazy implementation
return (int) this.timestamp;
int h = this.hash;
if (h == 0) {
this.hash = h = Long.hashCode(this.timestamp)
^ Arrays.hashCode(this.publicKey)
^ Arrays.hashCode(this.signature);
}
return h;
}
}

View File

@ -1,17 +1,14 @@
package org.qortal.test.network;
import com.google.common.primitives.Ints;
import com.google.common.primitives.Longs;
import org.bouncycastle.jce.provider.BouncyCastleProvider;
import org.bouncycastle.jsse.provider.BouncyCastleJsseProvider;
import org.junit.Ignore;
import org.junit.Test;
import org.qortal.controller.OnlineAccountsManager;
import org.qortal.data.network.OnlineAccountData;
import org.qortal.network.message.*;
import org.qortal.transform.Transformer;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.security.Security;
import java.util.*;
@ -74,24 +71,12 @@ public class OnlineAccountsV3Tests {
hashesByTimestampThenByte
.computeIfAbsent(timestamp, k -> new HashMap<>())
.compute(leadingByte, (k, v) -> xorByteArrayInPlace(v, onlineAccountData.getPublicKey()));
.compute(leadingByte, (k, v) -> OnlineAccountsManager.xorByteArrayInPlace(v, onlineAccountData.getPublicKey()));
}
return hashesByTimestampThenByte;
}
// TODO: This needs to be moved - probably to be OnlineAccountsManager
private static byte[] xorByteArrayInPlace(byte[] inplaceArray, byte[] otherArray) {
if (inplaceArray == null)
return Arrays.copyOf(otherArray, otherArray.length);
// Start from index 1 to enforce static leading byte
for (int i = 1; i < otherArray.length; i++)
inplaceArray[i] ^= otherArray[otherArray.length - i - 1];
return inplaceArray;
}
@Test
public void testOnGetOnlineAccountsV3() {
List<OnlineAccountData> ourOnlineAccounts = generateOnlineAccounts(false);