Initial work on online-accounts-v3 network messages to drastically reduce network load.

Lots of TODOs to action.
This commit is contained in:
catbref
2022-04-23 16:04:35 +01:00
parent 6950c6bf69
commit f2060fe7a1
5 changed files with 358 additions and 2 deletions

View File

@@ -67,9 +67,14 @@ public class OnlineAccountsManager extends Thread {
Deque<List<OnlineAccountData>> latestBlocksOnlineAccounts = new ArrayDeque<>(MAX_BLOCKS_CACHED_ONLINE_ACCOUNTS);
public OnlineAccountsManager() {
// TODO: make private, add these tasks to scheduled executor:
// send our online accounts every 10s
// expireOnlineAccounts every ONLINE_ACCOUNTS_CHECK_INTERVAL
// broadcastOnlineAccountsQuery every ONLINE_ACCOUNTS_BROADCAST_INTERVAL
// processOnlineAccountsImportQueue every 100ms?
}
// TODO: convert to SingletonContainer a-la Network
public static synchronized OnlineAccountsManager getInstance() {
if (instance == null) {
instance = new OnlineAccountsManager();
@@ -78,6 +83,7 @@ public class OnlineAccountsManager extends Thread {
return instance;
}
// TODO: see constructor for more info
public void run() {
// Start separate thread to prepare our online accounts
@@ -113,6 +119,7 @@ public class OnlineAccountsManager extends Thread {
public void shutdown() {
isStopping = true;
// TODO: convert interrrupt to executor.shutdownNow();
this.interrupt();
}
@@ -151,11 +158,14 @@ public class OnlineAccountsManager extends Thread {
// Utilities
// TODO: split this into validateAccount() and addAccount()
private void verifyAndAddAccount(Repository repository, OnlineAccountData onlineAccountData) throws DataException {
final Long now = NTP.getTime();
if (now == null)
return;
// TODO: don't create otherAccount, instead:
// byte[] rewardSharePublicKey = onlineAccountData.getPublicKey();
PublicKeyAccount otherAccount = new PublicKeyAccount(repository, onlineAccountData.getPublicKey());
// Check timestamp is 'recent' here
@@ -166,12 +176,14 @@ public class OnlineAccountsManager extends Thread {
// Verify
byte[] data = Longs.toByteArray(onlineAccountData.getTimestamp());
// TODO: use Crypto.verify() static method directly
if (!otherAccount.verify(onlineAccountData.getSignature(), data)) {
LOGGER.trace(() -> String.format("Rejecting invalid online account %s", otherAccount.getAddress()));
return;
}
// Qortal: check online account is actually reward-share
// TODO: use "rewardSharePublicKey" from above TODO
RewardShareData rewardShareData = repository.getAccountRepository().getRewardShare(onlineAccountData.getPublicKey());
if (rewardShareData == null) {
// Reward-share doesn't even exist - probably not a good sign
@@ -186,6 +198,7 @@ public class OnlineAccountsManager extends Thread {
return;
}
// TODO: change this.onlineAccounts to a ConcurrentMap? Keyed by timestamp?
synchronized (this.onlineAccounts) {
OnlineAccountData existingAccountData = this.onlineAccounts.stream().filter(account -> Arrays.equals(account.getPublicKey(), onlineAccountData.getPublicKey())).findFirst().orElse(null);
@@ -193,17 +206,21 @@ public class OnlineAccountsManager extends Thread {
if (existingAccountData.getTimestamp() < onlineAccountData.getTimestamp()) {
this.onlineAccounts.remove(existingAccountData);
// TODO: change otherAccount.getAddress() to rewardSharePublicKey in Base58?
LOGGER.trace(() -> String.format("Updated online account %s with timestamp %d (was %d)", otherAccount.getAddress(), onlineAccountData.getTimestamp(), existingAccountData.getTimestamp()));
} else {
// TODO: change otherAccount.getAddress() to rewardSharePublicKey in Base58?
LOGGER.trace(() -> String.format("Not updating existing online account %s", otherAccount.getAddress()));
return;
}
} else {
// TODO: change otherAccount.getAddress() to rewardSharePublicKey in Base58?
LOGGER.trace(() -> String.format("Added online account %s with timestamp %d", otherAccount.getAddress(), onlineAccountData.getTimestamp()));
}
this.onlineAccounts.add(onlineAccountData);
// TODO: if we actually added a new account, then we need to rebuild our hashes-by-timestamp-then-byte for rewardSharePublicKey's leading byte also
}
}
@@ -220,6 +237,7 @@ public class OnlineAccountsManager extends Thread {
final long onlineAccountsTimestamp = toOnlineAccountTimestamp(now);
byte[] timestampBytes = Longs.toByteArray(onlineAccountsTimestamp);
// TODO: use new addAccount() method
synchronized (this.onlineAccounts) {
this.onlineAccounts.clear();

View File

@@ -0,0 +1,110 @@
package org.qortal.network.message;
import com.google.common.primitives.Ints;
import com.google.common.primitives.Longs;
import org.qortal.transform.Transformer;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.*;
/**
* For requesting online accounts info from remote peer, given our list of online accounts.
*
* Different format to V1 and V2:
* V1 is: number of entries, then timestamp + pubkey for each entry
* V2 is: groups of: number of entries, timestamp, then pubkey for each entry
* V3 is: groups of: timestamp, number of entries (one per leading byte), then hash(pubkeys) for each entry
*/
public class GetOnlineAccountsV3Message extends Message {
private static final Map<Long, Map<Byte, byte[]>> EMPTY_ONLINE_ACCOUNTS = Collections.emptyMap();
private Map<Long, Map<Byte, byte[]>> hashesByTimestampThenByte;
public GetOnlineAccountsV3Message(Map<Long, Map<Byte, byte[]>> hashesByTimestampThenByte) {
super(MessageType.GET_ONLINE_ACCOUNTS_V3);
// If we don't have ANY online accounts then it's an easier construction...
if (hashesByTimestampThenByte.isEmpty()) {
this.dataBytes = EMPTY_DATA_BYTES;
return;
}
// We should know exactly how many bytes to allocate now
int byteSize = hashesByTimestampThenByte.size() * (Transformer.TIMESTAMP_LENGTH + Transformer.INT_LENGTH)
+ Transformer.TIMESTAMP_LENGTH /* trailing zero entry indicates end of entries */;
byteSize += hashesByTimestampThenByte.values()
.stream()
.mapToInt(map -> map.size() * Transformer.PUBLIC_KEY_LENGTH)
.sum();
ByteArrayOutputStream bytes = new ByteArrayOutputStream(byteSize);
// Warning: no double-checking/fetching! We must be ConcurrentMap compatible.
// So no contains() then get() or multiple get()s on the same key/map.
try {
for (var outerMapEntry : hashesByTimestampThenByte.entrySet()) {
bytes.write(Longs.toByteArray(outerMapEntry.getKey()));
var innerMap = outerMapEntry.getValue();
bytes.write(Ints.toByteArray(innerMap.size()));
for (byte[] hashBytes : innerMap.values()) {
bytes.write(hashBytes);
}
}
// end of records
bytes.write(Longs.toByteArray(0L));
} catch (IOException e) {
throw new AssertionError("IOException shouldn't occur with ByteArrayOutputStream");
}
this.dataBytes = bytes.toByteArray();
this.checksumBytes = Message.generateChecksum(this.dataBytes);
}
private GetOnlineAccountsV3Message(int id, Map<Long, Map<Byte, byte[]>> hashesByTimestampThenByte) {
super(id, MessageType.GET_ONLINE_ACCOUNTS_V3);
this.hashesByTimestampThenByte = hashesByTimestampThenByte;
}
public Map<Long, Map<Byte, byte[]>> getHashesByTimestampThenByte() {
return this.hashesByTimestampThenByte;
}
public static Message fromByteBuffer(int id, ByteBuffer bytes) {
// 'empty' case
if (!bytes.hasRemaining()) {
return new GetOnlineAccountsV3Message(id, EMPTY_ONLINE_ACCOUNTS);
}
Map<Long, Map<Byte, byte[]>> hashesByTimestampThenByte = new HashMap<>();
while (true) {
long timestamp = bytes.getLong();
if (timestamp == 0)
// Zero timestamp indicates end of records
break;
int hashCount = bytes.getInt();
Map<Byte, byte[]> hashesByByte = new HashMap<>();
for (int i = 0; i < hashCount; ++i) {
byte[] publicKeyHash = new byte[Transformer.PUBLIC_KEY_LENGTH];
bytes.get(publicKeyHash);
hashesByByte.put(publicKeyHash[0], publicKeyHash);
}
hashesByTimestampThenByte.put(timestamp, hashesByByte);
}
return new GetOnlineAccountsV3Message(id, hashesByTimestampThenByte);
}
}

View File

@@ -46,6 +46,7 @@ public abstract class Message {
private static final int MAX_DATA_SIZE = 10 * 1024 * 1024; // 10MB
protected static final byte[] EMPTY_DATA_BYTES = new byte[0];
private static final ByteBuffer EMPTY_READ_ONLY_BYTE_BUFFER = ByteBuffer.wrap(EMPTY_DATA_BYTES).asReadOnlyBuffer();
protected int id;
protected final MessageType type;
@@ -126,7 +127,7 @@ public abstract class Message {
if (dataSize > 0 && dataSize + CHECKSUM_LENGTH > readOnlyBuffer.remaining())
return null;
ByteBuffer dataSlice = null;
ByteBuffer dataSlice = EMPTY_READ_ONLY_BYTE_BUFFER;
if (dataSize > 0) {
byte[] expectedChecksum = new byte[CHECKSUM_LENGTH];
readOnlyBuffer.get(expectedChecksum);

View File

@@ -46,6 +46,8 @@ public enum MessageType {
GET_ONLINE_ACCOUNTS(81, GetOnlineAccountsMessage::fromByteBuffer),
ONLINE_ACCOUNTS_V2(82, OnlineAccountsV2Message::fromByteBuffer),
GET_ONLINE_ACCOUNTS_V2(83, GetOnlineAccountsV2Message::fromByteBuffer),
// ONLINE_ACCOUNTS_V3(84, OnlineAccountsV3Message::fromByteBuffer),
GET_ONLINE_ACCOUNTS_V3(85, GetOnlineAccountsV3Message::fromByteBuffer),
ARBITRARY_DATA(90, ArbitraryDataMessage::fromByteBuffer),
GET_ARBITRARY_DATA(91, GetArbitraryDataMessage::fromByteBuffer),