forked from Qortal/qortal
Reorganized some controller methods.
This commit is contained in:
parent
3e0306f646
commit
99f6bb5ac6
@ -822,6 +822,103 @@ public class Controller extends Thread {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Incoming transactions queue
|
||||||
|
|
||||||
|
private void processIncomingTransactionsQueue() {
|
||||||
|
if (this.incomingTransactions.size() == 0) {
|
||||||
|
// Don't bother locking if there are no new transactions to process
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (Synchronizer.getInstance().isSyncRequested() || Synchronizer.getInstance().isSynchronizing()) {
|
||||||
|
// Prioritize syncing, and don't attempt to lock
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
ReentrantLock blockchainLock = Controller.getInstance().getBlockchainLock();
|
||||||
|
if (!blockchainLock.tryLock(2, TimeUnit.SECONDS)) {
|
||||||
|
LOGGER.trace(() -> String.format("Too busy to process incoming transactions queue"));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
LOGGER.info("Interrupted when trying to acquire blockchain lock");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
try (final Repository repository = RepositoryManager.getRepository()) {
|
||||||
|
|
||||||
|
// Iterate through incoming transactions list
|
||||||
|
synchronized (this.incomingTransactions) { // Required in order to safely iterate a synchronizedList()
|
||||||
|
Iterator iterator = this.incomingTransactions.iterator();
|
||||||
|
while (iterator.hasNext()) {
|
||||||
|
if (isStopping) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
TransactionData transactionData = (TransactionData) iterator.next();
|
||||||
|
Transaction transaction = Transaction.fromData(repository, transactionData);
|
||||||
|
|
||||||
|
// Check signature
|
||||||
|
if (!transaction.isSignatureValid()) {
|
||||||
|
LOGGER.trace(() -> String.format("Ignoring %s transaction %s with invalid signature", transactionData.getType().name(), Base58.encode(transactionData.getSignature())));
|
||||||
|
iterator.remove();
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
ValidationResult validationResult = transaction.importAsUnconfirmed();
|
||||||
|
|
||||||
|
if (validationResult == ValidationResult.TRANSACTION_ALREADY_EXISTS) {
|
||||||
|
LOGGER.trace(() -> String.format("Ignoring existing transaction %s", Base58.encode(transactionData.getSignature())));
|
||||||
|
iterator.remove();
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (validationResult == ValidationResult.NO_BLOCKCHAIN_LOCK) {
|
||||||
|
LOGGER.trace(() -> String.format("Couldn't lock blockchain to import unconfirmed transaction", Base58.encode(transactionData.getSignature())));
|
||||||
|
iterator.remove();
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (validationResult != ValidationResult.OK) {
|
||||||
|
final String signature58 = Base58.encode(transactionData.getSignature());
|
||||||
|
LOGGER.trace(() -> String.format("Ignoring invalid (%s) %s transaction %s", validationResult.name(), transactionData.getType().name(), signature58));
|
||||||
|
Long now = NTP.getTime();
|
||||||
|
if (now != null && now - transactionData.getTimestamp() > INVALID_TRANSACTION_STALE_TIMEOUT) {
|
||||||
|
Long expiryLength = INVALID_TRANSACTION_RECHECK_INTERVAL;
|
||||||
|
if (validationResult == ValidationResult.TIMESTAMP_TOO_OLD) {
|
||||||
|
// Use shorter recheck interval for expired transactions
|
||||||
|
expiryLength = EXPIRED_TRANSACTION_RECHECK_INTERVAL;
|
||||||
|
}
|
||||||
|
Long expiry = now + expiryLength;
|
||||||
|
LOGGER.debug("Adding stale invalid transaction {} to invalidUnconfirmedTransactions...", signature58);
|
||||||
|
// Invalid, unconfirmed transaction has become stale - add to invalidUnconfirmedTransactions so that we don't keep requesting it
|
||||||
|
invalidUnconfirmedTransactions.put(signature58, expiry);
|
||||||
|
}
|
||||||
|
iterator.remove();
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
LOGGER.debug(() -> String.format("Imported %s transaction %s", transactionData.getType().name(), Base58.encode(transactionData.getSignature())));
|
||||||
|
iterator.remove();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (DataException e) {
|
||||||
|
LOGGER.error(String.format("Repository issue while processing incoming transactions", e));
|
||||||
|
} finally {
|
||||||
|
blockchainLock.unlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void cleanupInvalidTransactionsList(Long now) {
|
||||||
|
if (now == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
// Periodically remove invalid unconfirmed transactions from the list, so that they can be fetched again
|
||||||
|
invalidUnconfirmedTransactions.entrySet().removeIf(entry -> entry.getValue() == null || entry.getValue() < now);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
// Shutdown
|
// Shutdown
|
||||||
|
|
||||||
public void shutdown() {
|
public void shutdown() {
|
||||||
@ -1295,100 +1392,6 @@ public class Controller extends Thread {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void processIncomingTransactionsQueue() {
|
|
||||||
if (this.incomingTransactions.size() == 0) {
|
|
||||||
// Don't bother locking if there are no new transactions to process
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (Synchronizer.getInstance().isSyncRequested() || Synchronizer.getInstance().isSynchronizing()) {
|
|
||||||
// Prioritize syncing, and don't attempt to lock
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
|
||||||
ReentrantLock blockchainLock = Controller.getInstance().getBlockchainLock();
|
|
||||||
if (!blockchainLock.tryLock(2, TimeUnit.SECONDS)) {
|
|
||||||
LOGGER.trace(() -> String.format("Too busy to process incoming transactions queue"));
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
LOGGER.info("Interrupted when trying to acquire blockchain lock");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
try (final Repository repository = RepositoryManager.getRepository()) {
|
|
||||||
|
|
||||||
// Iterate through incoming transactions list
|
|
||||||
synchronized (this.incomingTransactions) { // Required in order to safely iterate a synchronizedList()
|
|
||||||
Iterator iterator = this.incomingTransactions.iterator();
|
|
||||||
while (iterator.hasNext()) {
|
|
||||||
if (isStopping) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
TransactionData transactionData = (TransactionData) iterator.next();
|
|
||||||
Transaction transaction = Transaction.fromData(repository, transactionData);
|
|
||||||
|
|
||||||
// Check signature
|
|
||||||
if (!transaction.isSignatureValid()) {
|
|
||||||
LOGGER.trace(() -> String.format("Ignoring %s transaction %s with invalid signature", transactionData.getType().name(), Base58.encode(transactionData.getSignature())));
|
|
||||||
iterator.remove();
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
ValidationResult validationResult = transaction.importAsUnconfirmed();
|
|
||||||
|
|
||||||
if (validationResult == ValidationResult.TRANSACTION_ALREADY_EXISTS) {
|
|
||||||
LOGGER.trace(() -> String.format("Ignoring existing transaction %s", Base58.encode(transactionData.getSignature())));
|
|
||||||
iterator.remove();
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (validationResult == ValidationResult.NO_BLOCKCHAIN_LOCK) {
|
|
||||||
LOGGER.trace(() -> String.format("Couldn't lock blockchain to import unconfirmed transaction", Base58.encode(transactionData.getSignature())));
|
|
||||||
iterator.remove();
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (validationResult != ValidationResult.OK) {
|
|
||||||
final String signature58 = Base58.encode(transactionData.getSignature());
|
|
||||||
LOGGER.trace(() -> String.format("Ignoring invalid (%s) %s transaction %s", validationResult.name(), transactionData.getType().name(), signature58));
|
|
||||||
Long now = NTP.getTime();
|
|
||||||
if (now != null && now - transactionData.getTimestamp() > INVALID_TRANSACTION_STALE_TIMEOUT) {
|
|
||||||
Long expiryLength = INVALID_TRANSACTION_RECHECK_INTERVAL;
|
|
||||||
if (validationResult == ValidationResult.TIMESTAMP_TOO_OLD) {
|
|
||||||
// Use shorter recheck interval for expired transactions
|
|
||||||
expiryLength = EXPIRED_TRANSACTION_RECHECK_INTERVAL;
|
|
||||||
}
|
|
||||||
Long expiry = now + expiryLength;
|
|
||||||
LOGGER.debug("Adding stale invalid transaction {} to invalidUnconfirmedTransactions...", signature58);
|
|
||||||
// Invalid, unconfirmed transaction has become stale - add to invalidUnconfirmedTransactions so that we don't keep requesting it
|
|
||||||
invalidUnconfirmedTransactions.put(signature58, expiry);
|
|
||||||
}
|
|
||||||
iterator.remove();
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
LOGGER.debug(() -> String.format("Imported %s transaction %s", transactionData.getType().name(), Base58.encode(transactionData.getSignature())));
|
|
||||||
iterator.remove();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} catch (DataException e) {
|
|
||||||
LOGGER.error(String.format("Repository issue while processing incoming transactions", e));
|
|
||||||
} finally {
|
|
||||||
blockchainLock.unlock();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void cleanupInvalidTransactionsList(Long now) {
|
|
||||||
if (now == null) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
// Periodically remove invalid unconfirmed transactions from the list, so that they can be fetched again
|
|
||||||
invalidUnconfirmedTransactions.entrySet().removeIf(entry -> entry.getValue() == null || entry.getValue() < now);
|
|
||||||
}
|
|
||||||
|
|
||||||
private void onNetworkGetBlockSummariesMessage(Peer peer, Message message) {
|
private void onNetworkGetBlockSummariesMessage(Peer peer, Message message) {
|
||||||
GetBlockSummariesMessage getBlockSummariesMessage = (GetBlockSummariesMessage) message;
|
GetBlockSummariesMessage getBlockSummariesMessage = (GetBlockSummariesMessage) message;
|
||||||
final byte[] parentSignature = getBlockSummariesMessage.getParentSignature();
|
final byte[] parentSignature = getBlockSummariesMessage.getParentSignature();
|
||||||
@ -1784,88 +1787,94 @@ public class Controller extends Thread {
|
|||||||
|
|
||||||
private void sendOurOnlineAccountsInfo() {
|
private void sendOurOnlineAccountsInfo() {
|
||||||
final Long now = NTP.getTime();
|
final Long now = NTP.getTime();
|
||||||
if (now == null)
|
if (now != null) {
|
||||||
return;
|
|
||||||
|
|
||||||
List<MintingAccountData> mintingAccounts;
|
List<MintingAccountData> mintingAccounts;
|
||||||
try (final Repository repository = RepositoryManager.getRepository()) {
|
try (final Repository repository = RepositoryManager.getRepository()) {
|
||||||
mintingAccounts = repository.getAccountRepository().getMintingAccounts();
|
mintingAccounts = repository.getAccountRepository().getMintingAccounts();
|
||||||
|
|
||||||
// We have no accounts, but don't reset timestamp
|
// We have no accounts, but don't reset timestamp
|
||||||
if (mintingAccounts.isEmpty())
|
if (mintingAccounts.isEmpty())
|
||||||
return;
|
return;
|
||||||
|
|
||||||
// Only reward-share accounts allowed
|
// Only reward-share accounts allowed
|
||||||
Iterator<MintingAccountData> iterator = mintingAccounts.iterator();
|
Iterator<MintingAccountData> iterator = mintingAccounts.iterator();
|
||||||
while (iterator.hasNext()) {
|
int i = 0;
|
||||||
MintingAccountData mintingAccountData = iterator.next();
|
|
||||||
|
|
||||||
RewardShareData rewardShareData = repository.getAccountRepository().getRewardShare(mintingAccountData.getPublicKey());
|
|
||||||
if (rewardShareData == null) {
|
|
||||||
// Reward-share doesn't even exist - probably not a good sign
|
|
||||||
iterator.remove();
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
Account mintingAccount = new Account(repository, rewardShareData.getMinter());
|
|
||||||
if (!mintingAccount.canMint()) {
|
|
||||||
// Minting-account component of reward-share can no longer mint - disregard
|
|
||||||
iterator.remove();
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} catch (DataException e) {
|
|
||||||
LOGGER.warn(String.format("Repository issue trying to fetch minting accounts: %s", e.getMessage()));
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
// 'current' timestamp
|
|
||||||
final long onlineAccountsTimestamp = Controller.toOnlineAccountTimestamp(now);
|
|
||||||
boolean hasInfoChanged = false;
|
|
||||||
|
|
||||||
byte[] timestampBytes = Longs.toByteArray(onlineAccountsTimestamp);
|
|
||||||
List<OnlineAccountData> ourOnlineAccounts = new ArrayList<>();
|
|
||||||
|
|
||||||
MINTING_ACCOUNTS:
|
|
||||||
for (MintingAccountData mintingAccountData : mintingAccounts) {
|
|
||||||
PrivateKeyAccount mintingAccount = new PrivateKeyAccount(null, mintingAccountData.getPrivateKey());
|
|
||||||
|
|
||||||
byte[] signature = mintingAccount.sign(timestampBytes);
|
|
||||||
byte[] publicKey = mintingAccount.getPublicKey();
|
|
||||||
|
|
||||||
// Our account is online
|
|
||||||
OnlineAccountData ourOnlineAccountData = new OnlineAccountData(onlineAccountsTimestamp, signature, publicKey);
|
|
||||||
synchronized (this.onlineAccounts) {
|
|
||||||
Iterator<OnlineAccountData> iterator = this.onlineAccounts.iterator();
|
|
||||||
while (iterator.hasNext()) {
|
while (iterator.hasNext()) {
|
||||||
OnlineAccountData existingOnlineAccountData = iterator.next();
|
MintingAccountData mintingAccountData = iterator.next();
|
||||||
|
|
||||||
if (Arrays.equals(existingOnlineAccountData.getPublicKey(), ourOnlineAccountData.getPublicKey())) {
|
RewardShareData rewardShareData = repository.getAccountRepository().getRewardShare(mintingAccountData.getPublicKey());
|
||||||
// If our online account is already present, with same timestamp, then move on to next mintingAccount
|
if (rewardShareData == null) {
|
||||||
if (existingOnlineAccountData.getTimestamp() == onlineAccountsTimestamp)
|
// Reward-share doesn't even exist - probably not a good sign
|
||||||
continue MINTING_ACCOUNTS;
|
|
||||||
|
|
||||||
// If our online account is already present, but with older timestamp, then remove it
|
|
||||||
iterator.remove();
|
iterator.remove();
|
||||||
break;
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
Account mintingAccount = new Account(repository, rewardShareData.getMinter());
|
||||||
|
if (!mintingAccount.canMint()) {
|
||||||
|
// Minting-account component of reward-share can no longer mint - disregard
|
||||||
|
iterator.remove();
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (++i > 2) {
|
||||||
|
iterator.remove();
|
||||||
|
continue;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
} catch (DataException e) {
|
||||||
this.onlineAccounts.add(ourOnlineAccountData);
|
LOGGER.warn(String.format("Repository issue trying to fetch minting accounts: %s", e.getMessage()));
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
LOGGER.trace(() -> String.format("Added our online account %s with timestamp %d", mintingAccount.getAddress(), onlineAccountsTimestamp));
|
// 'current' timestamp
|
||||||
ourOnlineAccounts.add(ourOnlineAccountData);
|
final long onlineAccountsTimestamp = Controller.toOnlineAccountTimestamp(now);
|
||||||
hasInfoChanged = true;
|
boolean hasInfoChanged = false;
|
||||||
|
|
||||||
|
byte[] timestampBytes = Longs.toByteArray(onlineAccountsTimestamp);
|
||||||
|
List<OnlineAccountData> ourOnlineAccounts = new ArrayList<>();
|
||||||
|
|
||||||
|
MINTING_ACCOUNTS:
|
||||||
|
for (MintingAccountData mintingAccountData : mintingAccounts) {
|
||||||
|
PrivateKeyAccount mintingAccount = new PrivateKeyAccount(null, mintingAccountData.getPrivateKey());
|
||||||
|
|
||||||
|
byte[] signature = mintingAccount.sign(timestampBytes);
|
||||||
|
byte[] publicKey = mintingAccount.getPublicKey();
|
||||||
|
|
||||||
|
// Our account is online
|
||||||
|
OnlineAccountData ourOnlineAccountData = new OnlineAccountData(onlineAccountsTimestamp, signature, publicKey);
|
||||||
|
synchronized (this.onlineAccounts) {
|
||||||
|
Iterator<OnlineAccountData> iterator = this.onlineAccounts.iterator();
|
||||||
|
while (iterator.hasNext()) {
|
||||||
|
OnlineAccountData existingOnlineAccountData = iterator.next();
|
||||||
|
|
||||||
|
if (Arrays.equals(existingOnlineAccountData.getPublicKey(), ourOnlineAccountData.getPublicKey())) {
|
||||||
|
// If our online account is already present, with same timestamp, then move on to next mintingAccount
|
||||||
|
if (existingOnlineAccountData.getTimestamp() == onlineAccountsTimestamp)
|
||||||
|
continue MINTING_ACCOUNTS;
|
||||||
|
|
||||||
|
// If our online account is already present, but with older timestamp, then remove it
|
||||||
|
iterator.remove();
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
this.onlineAccounts.add(ourOnlineAccountData);
|
||||||
|
}
|
||||||
|
|
||||||
|
LOGGER.trace(() -> String.format("Added our online account %s with timestamp %d", mintingAccount.getAddress(), onlineAccountsTimestamp));
|
||||||
|
ourOnlineAccounts.add(ourOnlineAccountData);
|
||||||
|
hasInfoChanged = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!hasInfoChanged)
|
||||||
|
return;
|
||||||
|
|
||||||
|
Message message = new OnlineAccountsMessage(ourOnlineAccounts);
|
||||||
|
Network.getInstance().broadcast(peer -> message);
|
||||||
|
|
||||||
|
LOGGER.trace(() -> String.format("Broadcasted %d online account%s with timestamp %d", ourOnlineAccounts.size(), (ourOnlineAccounts.size() != 1 ? "s" : ""), onlineAccountsTimestamp));
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!hasInfoChanged)
|
|
||||||
return;
|
|
||||||
|
|
||||||
Message message = new OnlineAccountsMessage(ourOnlineAccounts);
|
|
||||||
Network.getInstance().broadcast(peer -> message);
|
|
||||||
|
|
||||||
LOGGER.trace(()-> String.format("Broadcasted %d online account%s with timestamp %d", ourOnlineAccounts.size(), (ourOnlineAccounts.size() != 1 ? "s" : ""), onlineAccountsTimestamp));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public static long toOnlineAccountTimestamp(long timestamp) {
|
public static long toOnlineAccountTimestamp(long timestamp) {
|
||||||
|
Loading…
Reference in New Issue
Block a user