Interim commit: online accounts and account levels

Safety commit in case of data loss!

Lots of changes to do with "online accounts", including:

* networking
	+ GET_ONLINE_ACCOUNTS * ONLINE_ACCOUNTS messages & handling
	+ Changes to serialization of block data to include online accounts info
* block-related
	+ Adding online accounts info when generating blocks
	+ Validating online accounts info in block data
	+ Repository changes to store online accounts info
* Controller
	+ Managing in-memory cache of online accounts
	+ Updating/broadcasting our online accounts
* BlockChain config

Added "account levels", so new code/changes required in the usual places, like:

* transaction data object
* repository
* transaction transformer
* transaction processing
This commit is contained in:
catbref
2019-09-13 14:21:04 +01:00
parent 2cc926666b
commit 504cfc6a74
30 changed files with 1781 additions and 177 deletions

View File

@@ -0,0 +1,316 @@
package org.qora.test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.junit.Before;
import org.junit.Test;
import org.qora.account.PrivateKeyAccount;
import org.qora.account.PublicKeyAccount;
import org.qora.data.network.OnlineAccount;
import org.qora.network.message.GetOnlineAccountsMessage;
import org.qora.network.message.Message;
import org.qora.network.message.OnlineAccountsMessage;
import org.qora.repository.DataException;
import org.qora.test.common.Common;
import org.qora.test.common.FakePeer;
import org.qora.utils.ByteArray;
import com.google.common.primitives.Longs;
public class OnlineTests extends Common {
@Before
public void beforeTest() throws DataException {
Common.useDefaultSettings();
}
private static final int MAX_PEERS = 100;
private static final int MAX_RUNNING_PEERS = 20;
private static final boolean LOG_CONNECTION_CHANGES = false;
private static final boolean LOG_ACCOUNT_CHANGES = true;
private static final boolean GET_ONLINE_UNICAST_NOT_BROADCAST = false;
private static final long ONLINE_TIMESTAMP_MODULUS = 5 * 60 * 1000;
private static List<PrivateKeyAccount> allKnownAccounts;
private static final Random random = new Random();
static class OnlinePeer extends FakePeer {
private static final long LAST_SEEN_EXPIRY_PERIOD = 6 * 60 * 1000;
private static final long ONLINE_REFRESH_INTERVAL = 4 * 60 * 1000;
private static final int MAX_CONNECTED_PEERS = 5;
private final PrivateKeyAccount account;
private List<OnlineAccount> onlineAccounts;
private long nextOnlineRefresh = 0;
public OnlinePeer(int id, PrivateKeyAccount account) {
super(id);
this.account = account;
this.onlineAccounts = Collections.synchronizedList(new ArrayList<>());
}
@Override
protected void processMessage(FakePeer peer, Message message) throws InterruptedException {
switch (message.getType()) {
case GET_ONLINE_ACCOUNTS: {
GetOnlineAccountsMessage getOnlineAccountsMessage = (GetOnlineAccountsMessage) message;
List<OnlineAccount> excludeAccounts = getOnlineAccountsMessage.getOnlineAccounts();
// Send online accounts info, excluding entries with matching timestamp & public key from excludeAccounts
List<OnlineAccount> accountsToSend;
synchronized (this.onlineAccounts) {
accountsToSend = new ArrayList<>(this.onlineAccounts);
}
Iterator<OnlineAccount> iterator = accountsToSend.iterator();
SEND_ITERATOR:
while (iterator.hasNext()) {
OnlineAccount onlineAccount = iterator.next();
for (int i = 0; i < excludeAccounts.size(); ++i) {
OnlineAccount excludeAccount = excludeAccounts.get(i);
if (onlineAccount.getTimestamp() == excludeAccount.getTimestamp() && Arrays.equals(onlineAccount.getPublicKey(), excludeAccount.getPublicKey())) {
iterator.remove();
continue SEND_ITERATOR;
}
}
}
Message onlineAccountsMessage = new OnlineAccountsMessage(accountsToSend);
this.send(peer, onlineAccountsMessage);
if (LOG_ACCOUNT_CHANGES)
System.out.println(String.format("[%d] sent %d of our %d online accounts to %d", this.getId(), accountsToSend.size(), onlineAccounts.size(), peer.getId()));
break;
}
case ONLINE_ACCOUNTS: {
OnlineAccountsMessage onlineAccountsMessage = (OnlineAccountsMessage) message;
List<OnlineAccount> onlineAccounts = onlineAccountsMessage.getOnlineAccounts();
if (LOG_ACCOUNT_CHANGES)
System.out.println(String.format("[%d] received %d online accounts from %d", this.getId(), onlineAccounts.size(), peer.getId()));
for (OnlineAccount onlineAccount : onlineAccounts)
verifyAndAddAccount(onlineAccount);
break;
}
default:
break;
}
}
private void verifyAndAddAccount(OnlineAccount onlineAccount) {
// we would check timestamp is 'recent' here
// Verify
byte[] data = Longs.toByteArray(onlineAccount.getTimestamp());
PublicKeyAccount otherAccount = new PublicKeyAccount(null, onlineAccount.getPublicKey());
if (!otherAccount.verify(onlineAccount.getSignature(), data)) {
System.out.println(String.format("[%d] rejecting invalid online account %s", this.getId(), otherAccount.getAddress()));
return;
}
ByteArray publicKeyBA = new ByteArray(onlineAccount.getPublicKey());
synchronized (this.onlineAccounts) {
OnlineAccount existingAccount = this.onlineAccounts.stream().filter(account -> new ByteArray(account.getPublicKey()).equals(publicKeyBA)).findFirst().orElse(null);
if (existingAccount != null) {
if (existingAccount.getTimestamp() < onlineAccount.getTimestamp()) {
this.onlineAccounts.remove(existingAccount);
if (LOG_ACCOUNT_CHANGES)
System.out.println(String.format("[%d] updated online account %s with timestamp %d (was %d)", this.getId(), otherAccount.getAddress(), onlineAccount.getTimestamp(), existingAccount.getTimestamp()));
} else {
if (LOG_ACCOUNT_CHANGES)
System.out.println(String.format("[%d] ignoring existing online account %s", this.getId(), otherAccount.getAddress()));
return;
}
} else {
if (LOG_ACCOUNT_CHANGES)
System.out.println(String.format("[%d] added online account %s with timestamp %d", this.getId(), otherAccount.getAddress(), onlineAccount.getTimestamp()));
}
this.onlineAccounts.add(onlineAccount);
}
}
@Override
protected void performIdleTasks() {
final long now = System.currentTimeMillis();
// Expire old entries
final long cutoffThreshold = now - LAST_SEEN_EXPIRY_PERIOD;
synchronized (this.onlineAccounts) {
Iterator<OnlineAccount> iterator = this.onlineAccounts.iterator();
while (iterator.hasNext()) {
OnlineAccount onlineAccount = iterator.next();
if (onlineAccount.getTimestamp() < cutoffThreshold) {
iterator.remove();
if (LOG_ACCOUNT_CHANGES) {
PublicKeyAccount otherAccount = new PublicKeyAccount(null, onlineAccount.getPublicKey());
System.out.println(String.format("[%d] removed expired online account %s with timestamp %d", this.getId(), otherAccount.getAddress(), onlineAccount.getTimestamp()));
}
}
}
}
// Request data from another peer
Message message;
synchronized (this.onlineAccounts) {
message = new GetOnlineAccountsMessage(this.onlineAccounts);
}
if (GET_ONLINE_UNICAST_NOT_BROADCAST) {
FakePeer peer = this.pickRandomPeer();
if (peer != null)
this.send(peer, message);
} else {
this.broadcast(message);
}
// Refresh our onlineness?
if (now >= this.nextOnlineRefresh) {
this.nextOnlineRefresh = now + ONLINE_REFRESH_INTERVAL;
refreshOnlineness();
}
// Log our online list
synchronized (this.onlineAccounts) {
System.out.println(String.format("[%d] Connections: %d, online accounts: %d", this.getId(), this.peers.size(), this.onlineAccounts.size()));
}
}
private void refreshOnlineness() {
// Broadcast signed timestamp
final long timestamp = (System.currentTimeMillis() / ONLINE_TIMESTAMP_MODULUS) * ONLINE_TIMESTAMP_MODULUS;
byte[] data = Longs.toByteArray(timestamp);
byte[] signature = this.account.sign(data);
byte[] publicKey = this.account.getPublicKey();
// Our account is online
OnlineAccount onlineAccount = new OnlineAccount(timestamp, signature, publicKey);
synchronized (this.onlineAccounts) {
this.onlineAccounts.removeIf(account -> account.getPublicKey() == this.account.getPublicKey());
this.onlineAccounts.add(onlineAccount);
}
Message message = new OnlineAccountsMessage(Arrays.asList(onlineAccount));
this.broadcast(message);
if (LOG_ACCOUNT_CHANGES)
System.out.println(String.format("[%d] broadcasted online account %s with timestamp %d", this.getId(), this.account.getAddress(), timestamp));
}
@Override
public void connect(FakePeer otherPeer) {
int totalPeers;
synchronized (this.peers) {
totalPeers = this.peers.size();
}
if (totalPeers >= MAX_CONNECTED_PEERS)
return;
super.connect(otherPeer);
if (LOG_CONNECTION_CHANGES)
System.out.println(String.format("[%d] Connected to peer %d, total peers: %d", this.getId(), otherPeer.getId(), totalPeers + 1));
}
public void randomDisconnect() {
FakePeer peer;
int totalPeers;
synchronized (this.peers) {
peer = this.pickRandomPeer();
if (peer == null)
return;
totalPeers = this.peers.size();
}
this.disconnect(peer);
if (LOG_CONNECTION_CHANGES)
System.out.println(String.format("[%d] Disconnected peer %d, total peers: %d", this.getId(), peer.getId(), totalPeers - 1));
}
}
@Test
public void testOnlineness() throws InterruptedException {
allKnownAccounts = new ArrayList<>();
List<OnlinePeer> allPeers = new ArrayList<>();
for (int i = 0; i < MAX_PEERS; ++i) {
byte[] seed = new byte[32];
random.nextBytes(seed);
PrivateKeyAccount account = new PrivateKeyAccount(null, seed);
allKnownAccounts.add(account);
OnlinePeer peer = new OnlinePeer(i, account);
allPeers.add(peer);
}
// Start up some peers
List<OnlinePeer> runningPeers = new ArrayList<>();
ExecutorService peerExecutor = Executors.newCachedThreadPool();
for (int c = 0; c < MAX_RUNNING_PEERS; ++c) {
OnlinePeer newPeer;
do {
int i = random.nextInt(allPeers.size());
newPeer = allPeers.get(i);
} while (runningPeers.contains(newPeer));
runningPeers.add(newPeer);
peerExecutor.execute(newPeer);
}
// Randomly connect/disconnect peers
while (true) {
int i = random.nextInt(runningPeers.size());
OnlinePeer peer = runningPeers.get(i);
if ((random.nextInt() & 0xf) != 0) {
// Connect
OnlinePeer otherPeer;
do {
int j = random.nextInt(runningPeers.size());
otherPeer = runningPeers.get(j);
} while (otherPeer == peer);
peer.connect(otherPeer);
} else {
peer.randomDisconnect();
}
Thread.sleep(100);
}
}
}

View File

@@ -15,8 +15,14 @@ import org.qora.utils.Base58;
import com.google.common.hash.HashCode;
import io.druid.extendedset.intset.ConciseSet;
import static org.junit.Assert.*;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import org.junit.Before;
public class SerializationTests extends Common {
@@ -70,4 +76,69 @@ public class SerializationTests extends Common {
}
}
@Test
public void testAccountBitMap() {
Random random = new Random();
final int numberOfKnownAccounts = random.nextInt(1 << 17) + 1;
System.out.println(String.format("Number of known accounts: %d", numberOfKnownAccounts));
// 5% to 15%
final int numberOfAccountsToEncode = random.nextInt((numberOfKnownAccounts / 10) + numberOfKnownAccounts / 5);
System.out.println(String.format("Number of accounts to encode: %d", numberOfAccountsToEncode));
final int bitsLength = numberOfKnownAccounts;
System.out.println(String.format("Bits to fit all accounts: %d", bitsLength));
// Enough bytes to fit at least bitsLength bits
final int byteLength = ((bitsLength - 1) >> 3) + 1;
System.out.println(String.format("Uncompressed bytes to fit all accounts: %d", byteLength));
List<Integer> accountIndexes = new LinkedList<>();
for (int i = 0; i < numberOfAccountsToEncode; ++i) {
final int accountIndex = random.nextInt(numberOfKnownAccounts);
accountIndexes.add(accountIndex);
// System.out.println(String.format("Account [%d]: %d / 0x%08x", i, accountIndex, accountIndex));
}
ConciseSet compressedSet = new ConciseSet();
for (Integer accountIndex : accountIndexes)
compressedSet.add(accountIndex);
int compressedSize = compressedSet.toByteBuffer().remaining();
System.out.println(String.format("Out of %d known accounts, encoding %d accounts needs %d uncompressed bytes but only %d compressed bytes",
numberOfKnownAccounts, numberOfAccountsToEncode, byteLength, compressedSize));
}
@Test
public void benchmarkBitSetCompression() {
Random random = new Random();
System.out.println(String.format("Known Online UncompressedBitSet UncompressedIntList Compressed"));
for (int run = 0; run < 1000; ++run) {
final int numberOfKnownAccounts = random.nextInt(1 << 17) + 1;
// 5% to 25%
final int numberOfAccountsToEncode = random.nextInt((numberOfKnownAccounts / 20) + numberOfKnownAccounts / 5);
// Enough uncompressed bytes to fit one bit per known account
final int uncompressedBitSetSize = ((numberOfKnownAccounts - 1) >> 3) + 1;
// Size of a simple list of ints
final int uncompressedIntListSize = numberOfAccountsToEncode * 4;
ConciseSet compressedSet = new ConciseSet();
for (int i = 0; i < numberOfAccountsToEncode; ++i)
compressedSet.add(random.nextInt(numberOfKnownAccounts));
int compressedSize = compressedSet.toByteBuffer().remaining();
System.out.println(String.format("%d %d %d %d %d", numberOfKnownAccounts, numberOfAccountsToEncode, uncompressedBitSetSize, uncompressedIntListSize, compressedSize));
}
}
}

View File

@@ -105,7 +105,7 @@ public class TransactionTests extends Common {
// Create test generator account
generator = new PrivateKeyAccount(repository, generatorSeed);
accountRepository.setLastReference(new AccountData(generator.getAddress(), generatorSeed, generator.getPublicKey(), Group.NO_GROUP, 0, null));
accountRepository.setLastReference(new AccountData(generator.getAddress(), generatorSeed, generator.getPublicKey(), Group.NO_GROUP, 0, null, 0));
accountRepository.save(new AccountBalanceData(generator.getAddress(), Asset.QORA, initialGeneratorBalance));
// Create test sender account
@@ -113,7 +113,7 @@ public class TransactionTests extends Common {
// Mock account
reference = senderSeed;
accountRepository.setLastReference(new AccountData(sender.getAddress(), reference, sender.getPublicKey(), Group.NO_GROUP, 0, null));
accountRepository.setLastReference(new AccountData(sender.getAddress(), reference, sender.getPublicKey(), Group.NO_GROUP, 0, null, 0));
// Mock balance
accountRepository.save(new AccountBalanceData(sender.getAddress(), Asset.QORA, initialSenderBalance));

View File

@@ -0,0 +1,110 @@
package org.qora.test.common;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.qora.network.message.Message;
public abstract class FakePeer implements Runnable {
private static final long DEFAULT_BROADCAST_INTERVAL = 60 * 1000;
protected final int id;
protected final long startedWhen;
protected final LinkedBlockingQueue<PeerMessage> pendingMessages;
protected final List<FakePeer> peers;
private long nextBroadcast;
public FakePeer(int id) {
this.id = id;
this.startedWhen = System.currentTimeMillis();
this.pendingMessages = new LinkedBlockingQueue<>();
this.peers = Collections.synchronizedList(new ArrayList<>());
this.nextBroadcast = this.startedWhen;
}
protected static long getBroadcastInterval() {
return DEFAULT_BROADCAST_INTERVAL;
}
public int getId() {
return this.id;
}
public void run() {
try {
while (true) {
PeerMessage peerMessage = this.pendingMessages.poll(1, TimeUnit.SECONDS);
if (peerMessage != null)
processMessage(peerMessage.peer, peerMessage.message);
else
idleTasksCheck();
}
} catch (InterruptedException e) {
// fall-through to exit
}
}
protected abstract void processMessage(FakePeer peer, Message message) throws InterruptedException;
protected void idleTasksCheck() {
final long now = System.currentTimeMillis();
if (now < this.nextBroadcast)
return;
this.nextBroadcast = now + getBroadcastInterval();
this.performIdleTasks();
}
protected abstract void performIdleTasks();
public void connect(FakePeer peer) {
synchronized (this.peers) {
if (this.peers.contains(peer))
return;
this.peers.add(peer);
}
}
protected void send(FakePeer otherPeer, Message message) {
otherPeer.receive(this, message);
}
protected void broadcast(Message message) {
synchronized (this.peers) {
for (int i = 0; i < this.peers.size(); ++i)
this.send(this.peers.get(i), message);
}
}
public void receive(FakePeer sendingPeer, Message message) {
this.pendingMessages.add(new PeerMessage(sendingPeer, message));
}
public void disconnect(FakePeer peer) {
this.peers.remove(peer);
}
public FakePeer pickRandomPeer() {
synchronized (this.peers) {
if (this.peers.isEmpty())
return null;
Random random = new Random();
int i = random.nextInt(this.peers.size());
return this.peers.get(i);
}
}
}

View File

@@ -0,0 +1,17 @@
package org.qora.test.common;
import org.qora.network.message.Message;
import org.qora.test.common.FakePeer;
public class PeerMessage {
public final FakePeer peer;
public final Message message;
public final long sentWhen;
public Long processedWhen = null;
public PeerMessage(FakePeer peer, Message message) {
this.peer = peer;
this.message = message;
this.sentWhen = System.currentTimeMillis();
}
}