diff --git a/src/main/java/org/qortal/account/Account.java b/src/main/java/org/qortal/account/Account.java index 85fc47d2..df1c2db9 100644 --- a/src/main/java/org/qortal/account/Account.java +++ b/src/main/java/org/qortal/account/Account.java @@ -204,11 +204,15 @@ public class Account { * @throws DataException */ public boolean canMint() throws DataException { - Integer level = this.getLevel(); + AccountData accountData = this.repository.getAccountRepository().getAccount(this.address); + if (accountData == null) + return false; + + Integer level = accountData.getLevel(); if (level != null && level >= BlockChain.getInstance().getMinAccountLevelToMint()) return true; - if (this.isFounder()) + if (Account.isFounder(accountData.getFlags())) return true; return false; @@ -226,11 +230,15 @@ public class Account { * @throws DataException */ public boolean canRewardShare() throws DataException { - Integer level = this.getLevel(); + AccountData accountData = this.repository.getAccountRepository().getAccount(this.address); + if (accountData == null) + return false; + + Integer level = accountData.getLevel(); if (level != null && level >= BlockChain.getInstance().getMinAccountLevelToRewardShare()) return true; - if (this.isFounder()) + if (Account.isFounder(accountData.getFlags())) return true; return false; @@ -264,10 +272,14 @@ public class Account { * @throws DataException */ public int getEffectiveMintingLevel() throws DataException { - if (this.isFounder()) + AccountData accountData = this.repository.getAccountRepository().getAccount(this.address); + if (accountData == null) + return 0; + + if (Account.isFounder(accountData.getFlags())) return BlockChain.getInstance().getFounderEffectiveMintingLevel(); - Integer level = this.getLevel(); + Integer level = accountData.getLevel(); if (level == null) return 0; @@ -290,7 +302,7 @@ public class Account { if (rewardShareData == null) return 0; - PublicKeyAccount rewardShareMinter = new PublicKeyAccount(repository, rewardShareData.getMinterPublicKey()); + Account rewardShareMinter = new Account(repository, rewardShareData.getMinter()); return rewardShareMinter.getEffectiveMintingLevel(); } diff --git a/src/main/java/org/qortal/api/model/BlockMinterSummary.java b/src/main/java/org/qortal/api/model/BlockMinterSummary.java index 570d7580..33b45be1 100644 --- a/src/main/java/org/qortal/api/model/BlockMinterSummary.java +++ b/src/main/java/org/qortal/api/model/BlockMinterSummary.java @@ -32,10 +32,13 @@ public class BlockMinterSummary { } /** Constructs BlockMinterSummary in reward-share context. */ - public BlockMinterSummary(byte[] rewardSharePublicKey, int blockCount, byte[] mintingAccountPublicKey, String recipientAccount) { - this(mintingAccountPublicKey, blockCount); - + public BlockMinterSummary(byte[] rewardSharePublicKey, int blockCount, byte[] mintingAccountPublicKey, String minterAccount, String recipientAccount) { this.rewardSharePublicKey = rewardSharePublicKey; + this.blockCount = blockCount; + + this.mintingAccountPublicKey = mintingAccountPublicKey; + this.mintingAccount = minterAccount; + this.recipientAccount = recipientAccount; } diff --git a/src/main/java/org/qortal/api/resource/AdminResource.java b/src/main/java/org/qortal/api/resource/AdminResource.java index 5277bedf..7ba536ce 100644 --- a/src/main/java/org/qortal/api/resource/AdminResource.java +++ b/src/main/java/org/qortal/api/resource/AdminResource.java @@ -36,8 +36,8 @@ import javax.ws.rs.core.MediaType; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.core.LoggerContext; import org.apache.logging.log4j.core.appender.RollingFileAppender; +import org.qortal.account.Account; import org.qortal.account.PrivateKeyAccount; -import org.qortal.account.PublicKeyAccount; import org.qortal.api.ApiError; import org.qortal.api.ApiErrors; import org.qortal.api.ApiException; @@ -240,7 +240,7 @@ public class AdminResource { // ignore } - return new MintingAccountData(mintingAccountData.getPrivateKey(), rewardShareData); + return new MintingAccountData(mintingAccountData, rewardShareData); }).collect(Collectors.toList()); return mintingAccounts; @@ -284,11 +284,11 @@ public class AdminResource { throw ApiExceptionFactory.INSTANCE.createException(request, ApiError.INVALID_PRIVATE_KEY); // Qortal: check reward-share's minting account is still allowed to mint - PublicKeyAccount rewardShareMintingAccount = new PublicKeyAccount(repository, rewardShareData.getMinterPublicKey()); + Account rewardShareMintingAccount = new Account(repository, rewardShareData.getMinter()); if (!rewardShareMintingAccount.canMint()) throw ApiExceptionFactory.INSTANCE.createException(request, ApiError.CANNOT_MINT); - MintingAccountData mintingAccountData = new MintingAccountData(seed); + MintingAccountData mintingAccountData = new MintingAccountData(mintingAccount.getPrivateKey(), mintingAccount.getPublicKey()); repository.getAccountRepository().save(mintingAccountData); repository.saveChanges(); diff --git a/src/main/java/org/qortal/block/Block.java b/src/main/java/org/qortal/block/Block.java index 16713605..d0eaeab7 100644 --- a/src/main/java/org/qortal/block/Block.java +++ b/src/main/java/org/qortal/block/Block.java @@ -145,7 +145,7 @@ public class Block { this.repository = repository; this.rewardShareData = repository.getAccountRepository().getRewardShareByIndex(accountIndex); - this.mintingAccount = new PublicKeyAccount(repository, this.rewardShareData.getMinterPublicKey()); + this.mintingAccount = new Account(repository, this.rewardShareData.getMinter()); this.mintingAccountData = repository.getAccountRepository().getAccount(this.mintingAccount.getAddress()); this.isMinterFounder = Account.isFounder(mintingAccountData.getFlags()); diff --git a/src/main/java/org/qortal/block/BlockMinter.java b/src/main/java/org/qortal/block/BlockMinter.java index eb5df334..34c50556 100644 --- a/src/main/java/org/qortal/block/BlockMinter.java +++ b/src/main/java/org/qortal/block/BlockMinter.java @@ -12,7 +12,6 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.qortal.account.Account; import org.qortal.account.PrivateKeyAccount; -import org.qortal.account.PublicKeyAccount; import org.qortal.block.Block.ValidationResult; import org.qortal.controller.Controller; import org.qortal.data.account.MintingAccountData; @@ -123,7 +122,7 @@ public class BlockMinter extends Thread { continue; } - PublicKeyAccount mintingAccount = new PublicKeyAccount(repository, rewardShareData.getMinterPublicKey()); + Account mintingAccount = new Account(repository, rewardShareData.getMinter()); if (!mintingAccount.canMint()) { // Minting-account component of reward-share can no longer mint - disregard madi.remove(); @@ -158,12 +157,12 @@ public class BlockMinter extends Thread { newBlocks.clear(); } + // Discard accounts we have already built blocks with + mintingAccountsData.removeIf(mintingAccountData -> newBlocks.stream().anyMatch(newBlock -> Arrays.equals(newBlock.getBlockData().getMinterPublicKey(), mintingAccountData.getPublicKey()))); + // Do we need to build any potential new blocks? List mintingAccounts = mintingAccountsData.stream().map(accountData -> new PrivateKeyAccount(repository, accountData.getPrivateKey())).collect(Collectors.toList()); - // Discard accounts we have blocks for - mintingAccounts.removeIf(account -> newBlocks.stream().anyMatch(newBlock -> newBlock.getMinter().getAddress().equals(account.getAddress()))); - for (PrivateKeyAccount mintingAccount : mintingAccounts) { // First block does the AT heavy-lifting if (newBlocks.isEmpty()) { @@ -257,11 +256,10 @@ public class BlockMinter extends Thread { RewardShareData rewardShareData = repository.getAccountRepository().getRewardShare(newBlock.getBlockData().getMinterPublicKey()); if (rewardShareData != null) { - PublicKeyAccount mintingAccount = new PublicKeyAccount(repository, rewardShareData.getMinterPublicKey()); LOGGER.info(String.format("Minted block %d, sig %.8s by %s on behalf of %s", newBlock.getBlockData().getHeight(), Base58.encode(newBlock.getBlockData().getSignature()), - mintingAccount.getAddress(), + rewardShareData.getMinter(), rewardShareData.getRecipient())); } else { LOGGER.info(String.format("Minted block %d, sig %.8s by %s", diff --git a/src/main/java/org/qortal/controller/Controller.java b/src/main/java/org/qortal/controller/Controller.java index 94fa0c28..77a9d169 100644 --- a/src/main/java/org/qortal/controller/Controller.java +++ b/src/main/java/org/qortal/controller/Controller.java @@ -25,6 +25,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.bouncycastle.jce.provider.BouncyCastleProvider; import org.bouncycastle.jsse.provider.BouncyCastleJsseProvider; +import org.qortal.account.Account; import org.qortal.account.PrivateKeyAccount; import org.qortal.account.PublicKeyAccount; import org.qortal.api.ApiService; @@ -282,7 +283,11 @@ public class Controller extends Thread { Controller.newInstance(args); LOGGER.info("Starting NTP"); - NTP.start(); + Long ntpOffset = Settings.getInstance().getTestNtpOffset(); + if (ntpOffset != null) + NTP.setFixedOffset(ntpOffset); + else + NTP.start(Settings.getInstance().getNtpServers()); LOGGER.info("Starting repository"); try { @@ -804,505 +809,67 @@ public class Controller extends Thread { public void onNetworkMessage(Peer peer, Message message) { LOGGER.trace(() -> String.format("Processing %s message from %s", message.getType().name(), peer)); + // Ordered by message type value switch (message.getType()) { - case BLOCK: { - // From a v1 peer, with no message ID, this is a broadcast of peer's latest block - - // Not version 1? - if (peer.getVersion() == null || peer.getVersion() > 1) - break; - - // Message ID present? - if (message.hasId()) - break; - - BlockMessage blockMessage = (BlockMessage) message; - BlockData blockData = blockMessage.getBlockData(); - - // Update all peers with same ID - - List connectedPeers = Network.getInstance().getHandshakedPeers(); - for (Peer connectedPeer : connectedPeers) { - // Skip connectedPeer if they have no ID or their ID doesn't match sender's ID - if (connectedPeer.getPeerId() == null || !Arrays.equals(connectedPeer.getPeerId(), peer.getPeerId())) - continue; - - // Update peer chain tip data - PeerChainTipData newChainTipData = new PeerChainTipData(blockData.getHeight(), blockData.getSignature(), blockData.getTimestamp(), blockData.getMinterPublicKey()); - connectedPeer.setChainTipData(newChainTipData); - } - - // Potentially synchronize - requestSync = true; - + case HEIGHT: + onNetworkHeightMessage(peer, message); break; - } - - case HEIGHT: { - HeightMessage heightMessage = (HeightMessage) message; - - // Update all peers with same ID - - List connectedPeers = Network.getInstance().getHandshakedPeers(); - for (Peer connectedPeer : connectedPeers) { - if (connectedPeer.getPeerId() == null || !Arrays.equals(connectedPeer.getPeerId(), peer.getPeerId())) - continue; - - // Update peer chain tip data - PeerChainTipData newChainTipData = new PeerChainTipData(heightMessage.getHeight(), null, null, null); - connectedPeer.setChainTipData(newChainTipData); - } - - // Potentially synchronize - requestSync = true; + case GET_SIGNATURES: + onNetworkGetSignaturesMessage(peer, message); break; - } - - case HEIGHT_V2: { - HeightV2Message heightV2Message = (HeightV2Message) message; - - // If peer is inbound and we've not updated their height - // then this is probably their initial HEIGHT_V2 message - // so they need a corresponding HEIGHT_V2 message from us - if (!peer.isOutbound() && (peer.getChainTipData() == null || peer.getChainTipData().getLastHeight() == null)) - peer.sendMessage(Network.getInstance().buildHeightMessage(peer, getChainTip())); - - // Update all peers with same ID - - List connectedPeers = Network.getInstance().getHandshakedPeers(); - for (Peer connectedPeer : connectedPeers) { - // Skip connectedPeer if they have no ID or their ID doesn't match sender's ID - if (connectedPeer.getPeerId() == null || !Arrays.equals(connectedPeer.getPeerId(), peer.getPeerId())) - continue; - - // Update peer chain tip data - PeerChainTipData newChainTipData = new PeerChainTipData(heightV2Message.getHeight(), heightV2Message.getSignature(), heightV2Message.getTimestamp(), heightV2Message.getMinterPublicKey()); - connectedPeer.setChainTipData(newChainTipData); - } - - // Potentially synchronize - requestSync = true; + case GET_BLOCK: + onNetworkGetBlockMessage(peer, message); break; - } - - case GET_SIGNATURES: { - GetSignaturesMessage getSignaturesMessage = (GetSignaturesMessage) message; - byte[] parentSignature = getSignaturesMessage.getParentSignature(); - - try (final Repository repository = RepositoryManager.getRepository()) { - List signatures = new ArrayList<>(); - - do { - BlockData blockData = repository.getBlockRepository().fromReference(parentSignature); - - if (blockData == null) - break; - - parentSignature = blockData.getSignature(); - signatures.add(parentSignature); - } while (signatures.size() < Network.MAX_SIGNATURES_PER_REPLY); - - Message signaturesMessage = new SignaturesMessage(signatures); - signaturesMessage.setId(message.getId()); - if (!peer.sendMessage(signaturesMessage)) - peer.disconnect("failed to send signatures"); - } catch (DataException e) { - LOGGER.error(String.format("Repository issue while sending signatures after %s to peer %s", Base58.encode(parentSignature), peer), e); - } + case BLOCK: + onNetworkBlockMessage(peer, message); break; - } - - case GET_SIGNATURES_V2: { - GetSignaturesV2Message getSignaturesMessage = (GetSignaturesV2Message) message; - byte[] parentSignature = getSignaturesMessage.getParentSignature(); - - try (final Repository repository = RepositoryManager.getRepository()) { - List signatures = new ArrayList<>(); - - do { - BlockData blockData = repository.getBlockRepository().fromReference(parentSignature); - - if (blockData == null) - break; - - parentSignature = blockData.getSignature(); - signatures.add(parentSignature); - } while (signatures.size() < getSignaturesMessage.getNumberRequested()); - - Message signaturesMessage = new SignaturesMessage(signatures); - signaturesMessage.setId(message.getId()); - if (!peer.sendMessage(signaturesMessage)) - peer.disconnect("failed to send signatures (v2)"); - } catch (DataException e) { - LOGGER.error(String.format("Repository issue while sending V2 signatures after %s to peer %s", Base58.encode(parentSignature), peer), e); - } + case TRANSACTION: + onNetworkTransactionMessage(peer, message); break; - } - - case GET_BLOCK: { - GetBlockMessage getBlockMessage = (GetBlockMessage) message; - byte[] signature = getBlockMessage.getSignature(); - - try (final Repository repository = RepositoryManager.getRepository()) { - BlockData blockData = repository.getBlockRepository().fromSignature(signature); - if (blockData == null) { - LOGGER.debug(() -> String.format("Ignoring GET_BLOCK request from peer %s for unknown block %s", peer, Base58.encode(signature))); - // Send no response at all??? - break; - } - - Block block = new Block(repository, blockData); - - Message blockMessage = new BlockMessage(block); - blockMessage.setId(message.getId()); - if (!peer.sendMessage(blockMessage)) - peer.disconnect("failed to send block"); - } catch (DataException e) { - LOGGER.error(String.format("Repository issue while send block %s to peer %s", Base58.encode(signature), peer), e); - } + case GET_BLOCK_SUMMARIES: + onNetworkGetBlockSummariesMessage(peer, message); break; - } - - case GET_TRANSACTION: { - GetTransactionMessage getTransactionMessage = (GetTransactionMessage) message; - byte[] signature = getTransactionMessage.getSignature(); - - try (final Repository repository = RepositoryManager.getRepository()) { - TransactionData transactionData = repository.getTransactionRepository().fromSignature(signature); - if (transactionData == null) { - LOGGER.debug(() -> String.format("Ignoring GET_TRANSACTION request from peer %s for unknown transaction %s", peer, Base58.encode(signature))); - // Send no response at all??? - break; - } - - Message transactionMessage = new TransactionMessage(transactionData); - transactionMessage.setId(message.getId()); - if (!peer.sendMessage(transactionMessage)) - peer.disconnect("failed to send transaction"); - } catch (DataException e) { - LOGGER.error(String.format("Repository issue while send transaction %s to peer %s", Base58.encode(signature), peer), e); - } + case GET_SIGNATURES_V2: + onNetworkGetSignaturesV2Message(peer, message); break; - } - - case TRANSACTION: { - TransactionMessage transactionMessage = (TransactionMessage) message; - TransactionData transactionData = transactionMessage.getTransactionData(); - - try (final Repository repository = RepositoryManager.getRepository()) { - Transaction transaction = Transaction.fromData(repository, transactionData); - - // Check signature - if (!transaction.isSignatureValid()) { - LOGGER.trace(() -> String.format("Ignoring %s transaction %s with invalid signature from peer %s", transactionData.getType().name(), Base58.encode(transactionData.getSignature()), peer)); - break; - } - - ValidationResult validationResult = transaction.importAsUnconfirmed(); - - if (validationResult == ValidationResult.TRANSACTION_ALREADY_EXISTS) { - LOGGER.trace(() -> String.format("Ignoring existing transaction %s from peer %s", Base58.encode(transactionData.getSignature()), peer)); - break; - } - - if (validationResult == ValidationResult.NO_BLOCKCHAIN_LOCK) { - LOGGER.trace(() -> String.format("Couldn't lock blockchain to import unconfirmed transaction %s from peer %s", Base58.encode(transactionData.getSignature()), peer)); - break; - } - - if (validationResult != ValidationResult.OK) { - LOGGER.trace(() -> String.format("Ignoring invalid (%s) %s transaction %s from peer %s", validationResult.name(), transactionData.getType().name(), Base58.encode(transactionData.getSignature()), peer)); - break; - } - - LOGGER.debug(() -> String.format("Imported %s transaction %s from peer %s", transactionData.getType().name(), Base58.encode(transactionData.getSignature()), peer)); - } catch (DataException e) { - LOGGER.error(String.format("Repository issue while processing transaction %s from peer %s", Base58.encode(transactionData.getSignature()), peer), e); - } + case HEIGHT_V2: + onNetworkHeightV2Message(peer, message); break; - } - case GET_UNCONFIRMED_TRANSACTIONS: { - try (final Repository repository = RepositoryManager.getRepository()) { - List signatures = repository.getTransactionRepository().getUnconfirmedTransactionSignatures(); - - Message transactionSignaturesMessage = new TransactionSignaturesMessage(signatures); - if (!peer.sendMessage(transactionSignaturesMessage)) - peer.disconnect("failed to send unconfirmed transaction signatures"); - } catch (DataException e) { - LOGGER.error(String.format("Repository issue while sending unconfirmed transaction signatures to peer %s", peer), e); - } + case GET_TRANSACTION: + onNetworkGetTransactionMessage(peer, message); break; - } - - case TRANSACTION_SIGNATURES: { - TransactionSignaturesMessage transactionSignaturesMessage = (TransactionSignaturesMessage) message; - List signatures = transactionSignaturesMessage.getSignatures(); - List newSignatures = new ArrayList<>(); - - try (final Repository repository = RepositoryManager.getRepository()) { - for (byte[] signature : signatures) { - // Do we have it already? (Before requesting transaction data itself) - if (repository.getTransactionRepository().exists(signature)) { - LOGGER.trace(() -> String.format("Ignoring existing transaction %s from peer %s", Base58.encode(signature), peer)); - continue; - } - - // Check isInterrupted() here and exit fast - if (Thread.currentThread().isInterrupted()) - return; - - // Fetch actual transaction data from peer - Message getTransactionMessage = new GetTransactionMessage(signature); - Message responseMessage = peer.getResponse(getTransactionMessage); - if (!(responseMessage instanceof TransactionMessage)) { - // Maybe peer no longer has this transaction - LOGGER.trace(() -> String.format("Peer %s didn't send transaction %s", peer, Base58.encode(signature))); - continue; - } - - // Check isInterrupted() here and exit fast - if (Thread.currentThread().isInterrupted()) - return; - - TransactionMessage transactionMessage = (TransactionMessage) responseMessage; - TransactionData transactionData = transactionMessage.getTransactionData(); - Transaction transaction = Transaction.fromData(repository, transactionData); - - // Check signature - if (!transaction.isSignatureValid()) { - LOGGER.trace(() -> String.format("Ignoring %s transaction %s with invalid signature from peer %s", transactionData.getType().name(), Base58.encode(transactionData.getSignature()), peer)); - continue; - } - - ValidationResult validationResult = transaction.importAsUnconfirmed(); - - if (validationResult == ValidationResult.TRANSACTION_ALREADY_EXISTS) { - LOGGER.trace(() -> String.format("Ignoring existing transaction %s from peer %s", Base58.encode(transactionData.getSignature()), peer)); - continue; - } - - if (validationResult == ValidationResult.NO_BLOCKCHAIN_LOCK) { - LOGGER.trace(() -> String.format("Couldn't lock blockchain to import unconfirmed transaction %s from peer %s", Base58.encode(transactionData.getSignature()), peer)); - // Some other thread (e.g. Synchronizer) might have blockchain lock for a while so might as well give up for now - break; - } - - if (validationResult != ValidationResult.OK) { - LOGGER.trace(() -> String.format("Ignoring invalid (%s) %s transaction %s from peer %s", validationResult.name(), transactionData.getType().name(), Base58.encode(transactionData.getSignature()), peer)); - continue; - } - - LOGGER.debug(() -> String.format("Imported %s transaction %s from peer %s", transactionData.getType().name(), Base58.encode(transactionData.getSignature()), peer)); - - // We could collate signatures that are new to us and broadcast them to our peers too - newSignatures.add(signature); - } - } catch (DataException e) { - LOGGER.error(String.format("Repository issue while processing unconfirmed transactions from peer %s", peer), e); - } catch (InterruptedException e) { - // Shutdown - return; - } - - if (newSignatures.isEmpty()) - break; - - // Broadcast signatures that are new to us - Network.getInstance().broadcast(broadcastPeer -> broadcastPeer == peer ? null : new TransactionSignaturesMessage(newSignatures)); + case GET_UNCONFIRMED_TRANSACTIONS: + onNetworkGetUnconfirmedTransactionsMessage(peer, message); break; - } - - case GET_BLOCK_SUMMARIES: { - GetBlockSummariesMessage getBlockSummariesMessage = (GetBlockSummariesMessage) message; - byte[] parentSignature = getBlockSummariesMessage.getParentSignature(); - - try (final Repository repository = RepositoryManager.getRepository()) { - List blockSummaries = new ArrayList<>(); - - int numberRequested = Math.min(Network.MAX_BLOCK_SUMMARIES_PER_REPLY, getBlockSummariesMessage.getNumberRequested()); - - do { - BlockData blockData = repository.getBlockRepository().fromReference(parentSignature); - - if (blockData == null) - break; - - BlockSummaryData blockSummary = new BlockSummaryData(blockData); - blockSummaries.add(blockSummary); - parentSignature = blockData.getSignature(); - } while (blockSummaries.size() < numberRequested); - - Message blockSummariesMessage = new BlockSummariesMessage(blockSummaries); - blockSummariesMessage.setId(message.getId()); - if (!peer.sendMessage(blockSummariesMessage)) - peer.disconnect("failed to send block summaries"); - } catch (DataException e) { - LOGGER.error(String.format("Repository issue while sending block summaries after %s to peer %s", Base58.encode(parentSignature), peer), e); - } + case TRANSACTION_SIGNATURES: + onNetworkTransactionSignaturesMessage(peer, message); break; - } - - case GET_ARBITRARY_DATA: { - GetArbitraryDataMessage getArbitraryDataMessage = (GetArbitraryDataMessage) message; - - byte[] signature = getArbitraryDataMessage.getSignature(); - String signature58 = Base58.encode(signature); - Long timestamp = NTP.getTime(); - Triple newEntry = new Triple<>(signature58, peer, timestamp); - - // If we've seen this request recently, then ignore - if (arbitraryDataRequests.putIfAbsent(message.getId(), newEntry) != null) - break; - - // Do we even have this transaction? - try (final Repository repository = RepositoryManager.getRepository()) { - TransactionData transactionData = repository.getTransactionRepository().fromSignature(signature); - if (transactionData == null || transactionData.getType() != TransactionType.ARBITRARY) - break; - - ArbitraryTransaction transaction = new ArbitraryTransaction(repository, transactionData); - - // If we have the data then send it - if (transaction.isDataLocal()) { - byte[] data = transaction.fetchData(); - if (data == null) - break; - - // Update requests map to reflect that we've sent it - newEntry = new Triple<>(signature58, null, timestamp); - arbitraryDataRequests.put(message.getId(), newEntry); - - Message arbitraryDataMessage = new ArbitraryDataMessage(signature, data); - arbitraryDataMessage.setId(message.getId()); - if (!peer.sendMessage(arbitraryDataMessage)) - peer.disconnect("failed to send arbitrary data"); - - break; - } - - // Ask our other peers if they have it - Network.getInstance().broadcast(broadcastPeer -> broadcastPeer == peer ? null : message); - } catch (DataException e) { - LOGGER.error(String.format("Repository issue while finding arbitrary transaction data for peer %s", peer), e); - } + case GET_ARBITRARY_DATA: + onNetworkGetArbitraryDataMessage(peer, message); break; - } - - case ARBITRARY_DATA: { - ArbitraryDataMessage arbitraryDataMessage = (ArbitraryDataMessage) message; - - // Do we have a pending request for this data? - Triple request = arbitraryDataRequests.get(message.getId()); - if (request == null || request.getA() == null) - break; - - // Does this message's signature match what we're expecting? - byte[] signature = arbitraryDataMessage.getSignature(); - String signature58 = Base58.encode(signature); - if (!request.getA().equals(signature58)) - break; - - byte[] data = arbitraryDataMessage.getData(); - - // Check transaction exists and payload hash is correct - try (final Repository repository = RepositoryManager.getRepository()) { - TransactionData transactionData = repository.getTransactionRepository().fromSignature(signature); - if (!(transactionData instanceof ArbitraryTransactionData)) - break; - - ArbitraryTransactionData arbitraryTransactionData = (ArbitraryTransactionData) transactionData; - - byte[] actualHash = Crypto.digest(data); - - // "data" from repository will always be hash of actual raw data - if (!Arrays.equals(arbitraryTransactionData.getData(), actualHash)) - break; - - // Update requests map to reflect that we've received it - Triple newEntry = new Triple<>(null, null, request.getC()); - arbitraryDataRequests.put(message.getId(), newEntry); - - // Save payload locally - // TODO: storage policy - arbitraryTransactionData.setDataType(DataType.RAW_DATA); - arbitraryTransactionData.setData(data); - repository.getArbitraryRepository().save(arbitraryTransactionData); - repository.saveChanges(); - } catch (DataException e) { - LOGGER.error(String.format("Repository issue while finding arbitrary transaction data for peer %s", peer), e); - } - - Peer requestingPeer = request.getB(); - if (requestingPeer != null) { - // Forward to requesting peer; - if (!requestingPeer.sendMessage(arbitraryDataMessage)) - requestingPeer.disconnect("failed to forward arbitrary data"); - } + case ARBITRARY_DATA: + onNetworkArbitraryDataMessage(peer, message); break; - } - - case GET_ONLINE_ACCOUNTS: { - GetOnlineAccountsMessage getOnlineAccountsMessage = (GetOnlineAccountsMessage) message; - - List excludeAccounts = getOnlineAccountsMessage.getOnlineAccounts(); - - // Send online accounts info, excluding entries with matching timestamp & public key from excludeAccounts - List accountsToSend; - synchronized (this.onlineAccounts) { - accountsToSend = new ArrayList<>(this.onlineAccounts); - } - - Iterator iterator = accountsToSend.iterator(); - - SEND_ITERATOR: - while (iterator.hasNext()) { - OnlineAccountData onlineAccountData = iterator.next(); - - for (int i = 0; i < excludeAccounts.size(); ++i) { - OnlineAccountData excludeAccountData = excludeAccounts.get(i); - - if (onlineAccountData.getTimestamp() == excludeAccountData.getTimestamp() && Arrays.equals(onlineAccountData.getPublicKey(), excludeAccountData.getPublicKey())) { - iterator.remove(); - continue SEND_ITERATOR; - } - } - } - - Message onlineAccountsMessage = new OnlineAccountsMessage(accountsToSend); - peer.sendMessage(onlineAccountsMessage); - - LOGGER.trace(() -> String.format("Sent %d of our %d online accounts to %s", accountsToSend.size(), this.onlineAccounts.size(), peer)); + case GET_ONLINE_ACCOUNTS: + onNetworkGetOnlineAccountsMessage(peer, message); break; - } - - case ONLINE_ACCOUNTS: { - OnlineAccountsMessage onlineAccountsMessage = (OnlineAccountsMessage) message; - - List peersOnlineAccounts = onlineAccountsMessage.getOnlineAccounts(); - LOGGER.trace(() -> String.format("Received %d online accounts from %s", peersOnlineAccounts.size(), peer)); - - try (final Repository repository = RepositoryManager.getRepository()) { - for (OnlineAccountData onlineAccountData : peersOnlineAccounts) - this.verifyAndAddAccount(repository, onlineAccountData); - } catch (DataException e) { - LOGGER.error(String.format("Repository issue while verifying online accounts from peer %s", peer), e); - } + case ONLINE_ACCOUNTS: + onNetworkOnlineAccountsMessage(peer, message); break; - } default: LOGGER.debug(String.format("Unhandled %s message [ID %d] from peer %s", message.getType().name(), message.getId(), peer)); @@ -1310,6 +877,481 @@ public class Controller extends Thread { } } + private void onNetworkHeightMessage(Peer peer, Message message) { + HeightMessage heightMessage = (HeightMessage) message; + + // Update all peers with same ID + + List connectedPeers = Network.getInstance().getHandshakedPeers(); + for (Peer connectedPeer : connectedPeers) { + if (connectedPeer.getPeerId() == null || !Arrays.equals(connectedPeer.getPeerId(), peer.getPeerId())) + continue; + + // Update peer chain tip data + PeerChainTipData newChainTipData = new PeerChainTipData(heightMessage.getHeight(), null, null, null); + connectedPeer.setChainTipData(newChainTipData); + } + + // Potentially synchronize + requestSync = true; + } + + private void onNetworkGetSignaturesMessage(Peer peer, Message message) { + GetSignaturesMessage getSignaturesMessage = (GetSignaturesMessage) message; + byte[] parentSignature = getSignaturesMessage.getParentSignature(); + + try (final Repository repository = RepositoryManager.getRepository()) { + List signatures = new ArrayList<>(); + + do { + BlockData blockData = repository.getBlockRepository().fromReference(parentSignature); + + if (blockData == null) + break; + + parentSignature = blockData.getSignature(); + signatures.add(parentSignature); + } while (signatures.size() < Network.MAX_SIGNATURES_PER_REPLY); + + Message signaturesMessage = new SignaturesMessage(signatures); + signaturesMessage.setId(message.getId()); + if (!peer.sendMessage(signaturesMessage)) + peer.disconnect("failed to send signatures"); + } catch (DataException e) { + LOGGER.error(String.format("Repository issue while sending signatures after %s to peer %s", Base58.encode(parentSignature), peer), e); + } + } + + private void onNetworkGetBlockMessage(Peer peer, Message message) { + GetBlockMessage getBlockMessage = (GetBlockMessage) message; + byte[] signature = getBlockMessage.getSignature(); + + try (final Repository repository = RepositoryManager.getRepository()) { + BlockData blockData = repository.getBlockRepository().fromSignature(signature); + if (blockData == null) { + LOGGER.debug(() -> String.format("Ignoring GET_BLOCK request from peer %s for unknown block %s", peer, Base58.encode(signature))); + // Send no response at all??? + return; + } + + Block block = new Block(repository, blockData); + + Message blockMessage = new BlockMessage(block); + blockMessage.setId(message.getId()); + if (!peer.sendMessage(blockMessage)) + peer.disconnect("failed to send block"); + } catch (DataException e) { + LOGGER.error(String.format("Repository issue while send block %s to peer %s", Base58.encode(signature), peer), e); + } + } + + private void onNetworkBlockMessage(Peer peer, Message message) { + // From a v1 peer, with no message ID, this is a broadcast of peer's latest block + // v2 peers announce new blocks using HEIGHT_V2 + + // Not version 1? + if (peer.getVersion() == null || peer.getVersion() > 1) + return; + + // Message ID present? + // XXX Why is this test here? If BLOCK had an ID then surely it would be a response to GET_BLOCK + // and hence captured by Peer's reply queue? + if (message.hasId()) + return; + + BlockMessage blockMessage = (BlockMessage) message; + BlockData blockData = blockMessage.getBlockData(); + + // Update all peers with same ID + + List connectedPeers = Network.getInstance().getHandshakedPeers(); + for (Peer connectedPeer : connectedPeers) { + // Skip connectedPeer if they have no ID or their ID doesn't match sender's ID + if (connectedPeer.getPeerId() == null || !Arrays.equals(connectedPeer.getPeerId(), peer.getPeerId())) + continue; + + // Update peer chain tip data + PeerChainTipData newChainTipData = new PeerChainTipData(blockData.getHeight(), blockData.getSignature(), blockData.getTimestamp(), blockData.getMinterPublicKey()); + connectedPeer.setChainTipData(newChainTipData); + } + + // Potentially synchronize + requestSync = true; + } + + private void onNetworkTransactionMessage(Peer peer, Message message) { + TransactionMessage transactionMessage = (TransactionMessage) message; + TransactionData transactionData = transactionMessage.getTransactionData(); + + try (final Repository repository = RepositoryManager.getRepository()) { + Transaction transaction = Transaction.fromData(repository, transactionData); + + // Check signature + if (!transaction.isSignatureValid()) { + LOGGER.trace(() -> String.format("Ignoring %s transaction %s with invalid signature from peer %s", transactionData.getType().name(), Base58.encode(transactionData.getSignature()), peer)); + return; + } + + ValidationResult validationResult = transaction.importAsUnconfirmed(); + + if (validationResult == ValidationResult.TRANSACTION_ALREADY_EXISTS) { + LOGGER.trace(() -> String.format("Ignoring existing transaction %s from peer %s", Base58.encode(transactionData.getSignature()), peer)); + return; + } + + if (validationResult == ValidationResult.NO_BLOCKCHAIN_LOCK) { + LOGGER.trace(() -> String.format("Couldn't lock blockchain to import unconfirmed transaction %s from peer %s", Base58.encode(transactionData.getSignature()), peer)); + return; + } + + if (validationResult != ValidationResult.OK) { + LOGGER.trace(() -> String.format("Ignoring invalid (%s) %s transaction %s from peer %s", validationResult.name(), transactionData.getType().name(), Base58.encode(transactionData.getSignature()), peer)); + return; + } + + LOGGER.debug(() -> String.format("Imported %s transaction %s from peer %s", transactionData.getType().name(), Base58.encode(transactionData.getSignature()), peer)); + } catch (DataException e) { + LOGGER.error(String.format("Repository issue while processing transaction %s from peer %s", Base58.encode(transactionData.getSignature()), peer), e); + } + } + + private void onNetworkGetBlockSummariesMessage(Peer peer, Message message) { + GetBlockSummariesMessage getBlockSummariesMessage = (GetBlockSummariesMessage) message; + byte[] parentSignature = getBlockSummariesMessage.getParentSignature(); + + try (final Repository repository = RepositoryManager.getRepository()) { + List blockSummaries = new ArrayList<>(); + + int numberRequested = Math.min(Network.MAX_BLOCK_SUMMARIES_PER_REPLY, getBlockSummariesMessage.getNumberRequested()); + + do { + BlockData blockData = repository.getBlockRepository().fromReference(parentSignature); + + if (blockData == null) + // No more blocks to send to peer + break; + + BlockSummaryData blockSummary = new BlockSummaryData(blockData); + blockSummaries.add(blockSummary); + parentSignature = blockData.getSignature(); + } while (blockSummaries.size() < numberRequested); + + Message blockSummariesMessage = new BlockSummariesMessage(blockSummaries); + blockSummariesMessage.setId(message.getId()); + if (!peer.sendMessage(blockSummariesMessage)) + peer.disconnect("failed to send block summaries"); + } catch (DataException e) { + LOGGER.error(String.format("Repository issue while sending block summaries after %s to peer %s", Base58.encode(parentSignature), peer), e); + } + } + + private void onNetworkGetSignaturesV2Message(Peer peer, Message message) { + GetSignaturesV2Message getSignaturesMessage = (GetSignaturesV2Message) message; + byte[] parentSignature = getSignaturesMessage.getParentSignature(); + + try (final Repository repository = RepositoryManager.getRepository()) { + List signatures = new ArrayList<>(); + + do { + BlockData blockData = repository.getBlockRepository().fromReference(parentSignature); + + if (blockData == null) + // No more signatures to send to peer + break; + + parentSignature = blockData.getSignature(); + signatures.add(parentSignature); + } while (signatures.size() < getSignaturesMessage.getNumberRequested()); + + Message signaturesMessage = new SignaturesMessage(signatures); + signaturesMessage.setId(message.getId()); + if (!peer.sendMessage(signaturesMessage)) + peer.disconnect("failed to send signatures (v2)"); + } catch (DataException e) { + LOGGER.error(String.format("Repository issue while sending V2 signatures after %s to peer %s", Base58.encode(parentSignature), peer), e); + } + } + + private void onNetworkHeightV2Message(Peer peer, Message message) { + HeightV2Message heightV2Message = (HeightV2Message) message; + + // If peer is inbound and we've not updated their height + // then this is probably their initial HEIGHT_V2 message + // so they need a corresponding HEIGHT_V2 message from us + if (!peer.isOutbound() && (peer.getChainTipData() == null || peer.getChainTipData().getLastHeight() == null)) + peer.sendMessage(Network.getInstance().buildHeightMessage(peer, getChainTip())); + + // Update all peers with same ID + + List connectedPeers = Network.getInstance().getHandshakedPeers(); + for (Peer connectedPeer : connectedPeers) { + // Skip connectedPeer if they have no ID or their ID doesn't match sender's ID + if (connectedPeer.getPeerId() == null || !Arrays.equals(connectedPeer.getPeerId(), peer.getPeerId())) + continue; + + // Update peer chain tip data + PeerChainTipData newChainTipData = new PeerChainTipData(heightV2Message.getHeight(), heightV2Message.getSignature(), heightV2Message.getTimestamp(), heightV2Message.getMinterPublicKey()); + connectedPeer.setChainTipData(newChainTipData); + } + + // Potentially synchronize + requestSync = true; + } + + private void onNetworkGetTransactionMessage(Peer peer, Message message) { + GetTransactionMessage getTransactionMessage = (GetTransactionMessage) message; + byte[] signature = getTransactionMessage.getSignature(); + + try (final Repository repository = RepositoryManager.getRepository()) { + TransactionData transactionData = repository.getTransactionRepository().fromSignature(signature); + if (transactionData == null) { + LOGGER.debug(() -> String.format("Ignoring GET_TRANSACTION request from peer %s for unknown transaction %s", peer, Base58.encode(signature))); + // Send no response at all??? + return; + } + + Message transactionMessage = new TransactionMessage(transactionData); + transactionMessage.setId(message.getId()); + if (!peer.sendMessage(transactionMessage)) + peer.disconnect("failed to send transaction"); + } catch (DataException e) { + LOGGER.error(String.format("Repository issue while send transaction %s to peer %s", Base58.encode(signature), peer), e); + } + } + + private void onNetworkGetUnconfirmedTransactionsMessage(Peer peer, Message message) { + try (final Repository repository = RepositoryManager.getRepository()) { + List signatures = repository.getTransactionRepository().getUnconfirmedTransactionSignatures(); + + Message transactionSignaturesMessage = new TransactionSignaturesMessage(signatures); + if (!peer.sendMessage(transactionSignaturesMessage)) + peer.disconnect("failed to send unconfirmed transaction signatures"); + } catch (DataException e) { + LOGGER.error(String.format("Repository issue while sending unconfirmed transaction signatures to peer %s", peer), e); + } + } + + private void onNetworkTransactionSignaturesMessage(Peer peer, Message message) { + TransactionSignaturesMessage transactionSignaturesMessage = (TransactionSignaturesMessage) message; + List signatures = transactionSignaturesMessage.getSignatures(); + List newSignatures = new ArrayList<>(); + + try (final Repository repository = RepositoryManager.getRepository()) { + for (byte[] signature : signatures) { + // Do we have it already? (Before requesting transaction data itself) + if (repository.getTransactionRepository().exists(signature)) { + LOGGER.trace(() -> String.format("Ignoring existing transaction %s from peer %s", Base58.encode(signature), peer)); + continue; + } + + // Check isInterrupted() here and exit fast + if (Thread.currentThread().isInterrupted()) + return; + + // Fetch actual transaction data from peer + Message getTransactionMessage = new GetTransactionMessage(signature); + Message responseMessage = peer.getResponse(getTransactionMessage); + if (!(responseMessage instanceof TransactionMessage)) { + // Maybe peer no longer has this transaction + LOGGER.trace(() -> String.format("Peer %s didn't send transaction %s", peer, Base58.encode(signature))); + continue; + } + + // Check isInterrupted() here and exit fast + if (Thread.currentThread().isInterrupted()) + return; + + TransactionMessage transactionMessage = (TransactionMessage) responseMessage; + TransactionData transactionData = transactionMessage.getTransactionData(); + Transaction transaction = Transaction.fromData(repository, transactionData); + + // Check signature + if (!transaction.isSignatureValid()) { + LOGGER.trace(() -> String.format("Ignoring %s transaction %s with invalid signature from peer %s", transactionData.getType().name(), Base58.encode(transactionData.getSignature()), peer)); + continue; + } + + ValidationResult validationResult = transaction.importAsUnconfirmed(); + + if (validationResult == ValidationResult.TRANSACTION_ALREADY_EXISTS) { + LOGGER.trace(() -> String.format("Ignoring existing transaction %s from peer %s", Base58.encode(transactionData.getSignature()), peer)); + continue; + } + + if (validationResult == ValidationResult.NO_BLOCKCHAIN_LOCK) { + LOGGER.trace(() -> String.format("Couldn't lock blockchain to import unconfirmed transaction %s from peer %s", Base58.encode(transactionData.getSignature()), peer)); + // Some other thread (e.g. Synchronizer) might have blockchain lock for a while so might as well give up for now + break; + } + + if (validationResult != ValidationResult.OK) { + LOGGER.trace(() -> String.format("Ignoring invalid (%s) %s transaction %s from peer %s", validationResult.name(), transactionData.getType().name(), Base58.encode(transactionData.getSignature()), peer)); + continue; + } + + LOGGER.debug(() -> String.format("Imported %s transaction %s from peer %s", transactionData.getType().name(), Base58.encode(transactionData.getSignature()), peer)); + + // We could collate signatures that are new to us and broadcast them to our peers too + newSignatures.add(signature); + } + } catch (DataException e) { + LOGGER.error(String.format("Repository issue while processing unconfirmed transactions from peer %s", peer), e); + } catch (InterruptedException e) { + // Shutdown + return; + } + + if (newSignatures.isEmpty()) + return; + + // Broadcast signatures that are new to us + Network.getInstance().broadcast(broadcastPeer -> broadcastPeer == peer ? null : new TransactionSignaturesMessage(newSignatures)); + } + + private void onNetworkGetArbitraryDataMessage(Peer peer, Message message) { + GetArbitraryDataMessage getArbitraryDataMessage = (GetArbitraryDataMessage) message; + + byte[] signature = getArbitraryDataMessage.getSignature(); + String signature58 = Base58.encode(signature); + Long timestamp = NTP.getTime(); + Triple newEntry = new Triple<>(signature58, peer, timestamp); + + // If we've seen this request recently, then ignore + if (arbitraryDataRequests.putIfAbsent(message.getId(), newEntry) != null) + return; + + // Do we even have this transaction? + try (final Repository repository = RepositoryManager.getRepository()) { + TransactionData transactionData = repository.getTransactionRepository().fromSignature(signature); + if (transactionData == null || transactionData.getType() != TransactionType.ARBITRARY) + return; + + ArbitraryTransaction transaction = new ArbitraryTransaction(repository, transactionData); + + // If we have the data then send it + if (transaction.isDataLocal()) { + byte[] data = transaction.fetchData(); + if (data == null) + return; + + // Update requests map to reflect that we've sent it + newEntry = new Triple<>(signature58, null, timestamp); + arbitraryDataRequests.put(message.getId(), newEntry); + + Message arbitraryDataMessage = new ArbitraryDataMessage(signature, data); + arbitraryDataMessage.setId(message.getId()); + if (!peer.sendMessage(arbitraryDataMessage)) + peer.disconnect("failed to send arbitrary data"); + + return; + } + + // Ask our other peers if they have it + Network.getInstance().broadcast(broadcastPeer -> broadcastPeer == peer ? null : message); + } catch (DataException e) { + LOGGER.error(String.format("Repository issue while finding arbitrary transaction data for peer %s", peer), e); + } + } + + private void onNetworkArbitraryDataMessage(Peer peer, Message message) { + ArbitraryDataMessage arbitraryDataMessage = (ArbitraryDataMessage) message; + + // Do we have a pending request for this data? + Triple request = arbitraryDataRequests.get(message.getId()); + if (request == null || request.getA() == null) + return; + + // Does this message's signature match what we're expecting? + byte[] signature = arbitraryDataMessage.getSignature(); + String signature58 = Base58.encode(signature); + if (!request.getA().equals(signature58)) + return; + + byte[] data = arbitraryDataMessage.getData(); + + // Check transaction exists and payload hash is correct + try (final Repository repository = RepositoryManager.getRepository()) { + TransactionData transactionData = repository.getTransactionRepository().fromSignature(signature); + if (!(transactionData instanceof ArbitraryTransactionData)) + return; + + ArbitraryTransactionData arbitraryTransactionData = (ArbitraryTransactionData) transactionData; + + byte[] actualHash = Crypto.digest(data); + + // "data" from repository will always be hash of actual raw data + if (!Arrays.equals(arbitraryTransactionData.getData(), actualHash)) + return; + + // Update requests map to reflect that we've received it + Triple newEntry = new Triple<>(null, null, request.getC()); + arbitraryDataRequests.put(message.getId(), newEntry); + + // Save payload locally + // TODO: storage policy + arbitraryTransactionData.setDataType(DataType.RAW_DATA); + arbitraryTransactionData.setData(data); + repository.getArbitraryRepository().save(arbitraryTransactionData); + repository.saveChanges(); + } catch (DataException e) { + LOGGER.error(String.format("Repository issue while finding arbitrary transaction data for peer %s", peer), e); + } + + Peer requestingPeer = request.getB(); + if (requestingPeer != null) { + // Forward to requesting peer; + if (!requestingPeer.sendMessage(arbitraryDataMessage)) + requestingPeer.disconnect("failed to forward arbitrary data"); + } + } + + private void onNetworkGetOnlineAccountsMessage(Peer peer, Message message) { + GetOnlineAccountsMessage getOnlineAccountsMessage = (GetOnlineAccountsMessage) message; + + List excludeAccounts = getOnlineAccountsMessage.getOnlineAccounts(); + + // Send online accounts info, excluding entries with matching timestamp & public key from excludeAccounts + List accountsToSend; + synchronized (this.onlineAccounts) { + accountsToSend = new ArrayList<>(this.onlineAccounts); + } + + Iterator iterator = accountsToSend.iterator(); + + SEND_ITERATOR: + while (iterator.hasNext()) { + OnlineAccountData onlineAccountData = iterator.next(); + + for (int i = 0; i < excludeAccounts.size(); ++i) { + OnlineAccountData excludeAccountData = excludeAccounts.get(i); + + if (onlineAccountData.getTimestamp() == excludeAccountData.getTimestamp() && Arrays.equals(onlineAccountData.getPublicKey(), excludeAccountData.getPublicKey())) { + iterator.remove(); + continue SEND_ITERATOR; + } + } + } + + Message onlineAccountsMessage = new OnlineAccountsMessage(accountsToSend); + peer.sendMessage(onlineAccountsMessage); + + LOGGER.trace(() -> String.format("Sent %d of our %d online accounts to %s", accountsToSend.size(), this.onlineAccounts.size(), peer)); + } + + private void onNetworkOnlineAccountsMessage(Peer peer, Message message) { + OnlineAccountsMessage onlineAccountsMessage = (OnlineAccountsMessage) message; + + List peersOnlineAccounts = onlineAccountsMessage.getOnlineAccounts(); + LOGGER.trace(() -> String.format("Received %d online accounts from %s", peersOnlineAccounts.size(), peer)); + + try (final Repository repository = RepositoryManager.getRepository()) { + for (OnlineAccountData onlineAccountData : peersOnlineAccounts) + this.verifyAndAddAccount(repository, onlineAccountData); + } catch (DataException e) { + LOGGER.error(String.format("Repository issue while verifying online accounts from peer %s", peer), e); + } + } + // Utilities private void verifyAndAddAccount(Repository repository, OnlineAccountData onlineAccountData) throws DataException { @@ -1340,7 +1382,7 @@ public class Controller extends Thread { return; } - PublicKeyAccount mintingAccount = new PublicKeyAccount(repository, rewardShareData.getMinterPublicKey()); + Account mintingAccount = new Account(repository, rewardShareData.getMinter()); if (!mintingAccount.canMint()) { // Minting-account component of reward-share can no longer mint - disregard LOGGER.trace(() -> String.format("Rejecting online reward-share with non-minting account %s", mintingAccount.getAddress())); @@ -1460,7 +1502,7 @@ public class Controller extends Thread { continue; } - PublicKeyAccount mintingAccount = new PublicKeyAccount(repository, rewardShareData.getMinterPublicKey()); + Account mintingAccount = new Account(repository, rewardShareData.getMinter()); if (!mintingAccount.canMint()) { // Minting-account component of reward-share can no longer mint - disregard iterator.remove(); diff --git a/src/main/java/org/qortal/data/account/MintingAccountData.java b/src/main/java/org/qortal/data/account/MintingAccountData.java index a47ca5a1..02b4c0f8 100644 --- a/src/main/java/org/qortal/data/account/MintingAccountData.java +++ b/src/main/java/org/qortal/data/account/MintingAccountData.java @@ -2,10 +2,8 @@ package org.qortal.data.account; import javax.xml.bind.annotation.XmlAccessType; import javax.xml.bind.annotation.XmlAccessorType; -import javax.xml.bind.annotation.XmlElement; import javax.xml.bind.annotation.XmlTransient; -import org.qortal.account.PrivateKeyAccount; import org.qortal.crypto.Crypto; import io.swagger.v3.oas.annotations.media.Schema; @@ -16,14 +14,17 @@ import io.swagger.v3.oas.annotations.media.Schema.AccessMode; public class MintingAccountData { // Properties + + // Never actually displayed by API @Schema(hidden = true) @XmlTransient protected byte[] privateKey; - // Not always present - used by API if not null - @XmlTransient - @Schema(hidden = true) + // Read-only by API, we never ask for it as input + @Schema(accessMode = AccessMode.READ_ONLY) protected byte[] publicKey; + + // Not always present - used by API if not null protected String mintingAccount; protected String recipientAccount; protected String address; @@ -34,17 +35,17 @@ public class MintingAccountData { protected MintingAccountData() { } - public MintingAccountData(byte[] privateKey) { + public MintingAccountData(byte[] privateKey, byte[] publicKey) { this.privateKey = privateKey; - this.publicKey = PrivateKeyAccount.toPublicKey(privateKey); + this.publicKey = publicKey; } - public MintingAccountData(byte[] privateKey, RewardShareData rewardShareData) { - this(privateKey); + public MintingAccountData(MintingAccountData srcMintingAccountData, RewardShareData rewardShareData) { + this(srcMintingAccountData.privateKey, srcMintingAccountData.publicKey); if (rewardShareData != null) { this.recipientAccount = rewardShareData.getRecipient(); - this.mintingAccount = Crypto.toAddress(rewardShareData.getMinterPublicKey()); + this.mintingAccount = rewardShareData.getMinter(); } else { this.address = Crypto.toAddress(this.publicKey); } @@ -56,8 +57,6 @@ public class MintingAccountData { return this.privateKey; } - @XmlElement(name = "publicKey") - @Schema(accessMode = AccessMode.READ_ONLY) public byte[] getPublicKey() { return this.publicKey; } diff --git a/src/main/java/org/qortal/data/account/RewardShareData.java b/src/main/java/org/qortal/data/account/RewardShareData.java index 6c3d7078..2ffaf8d6 100644 --- a/src/main/java/org/qortal/data/account/RewardShareData.java +++ b/src/main/java/org/qortal/data/account/RewardShareData.java @@ -5,8 +5,9 @@ import java.math.BigDecimal; import javax.xml.bind.annotation.XmlAccessType; import javax.xml.bind.annotation.XmlAccessorType; import javax.xml.bind.annotation.XmlElement; +import javax.xml.bind.annotation.XmlTransient; -import org.qortal.crypto.Crypto; +import io.swagger.v3.oas.annotations.media.Schema; // All properties to be converted to JSON via JAXB @XmlAccessorType(XmlAccessType.FIELD) @@ -14,6 +15,12 @@ public class RewardShareData { // Properties private byte[] minterPublicKey; + + // "minter" is called "mintingAccount" instead + @XmlTransient + @Schema(hidden = true) + private String minter; + private String recipient; private byte[] rewardSharePublicKey; private BigDecimal sharePercent; @@ -25,8 +32,9 @@ public class RewardShareData { } // Used when fetching from repository - public RewardShareData(byte[] minterPublicKey, String recipient, byte[] rewardSharePublicKey, BigDecimal sharePercent) { + public RewardShareData(byte[] minterPublicKey, String minter, String recipient, byte[] rewardSharePublicKey, BigDecimal sharePercent) { this.minterPublicKey = minterPublicKey; + this.minter = minter; this.recipient = recipient; this.rewardSharePublicKey = rewardSharePublicKey; this.sharePercent = sharePercent; @@ -38,6 +46,10 @@ public class RewardShareData { return this.minterPublicKey; } + public String getMinter() { + return this.minter; + } + public String getRecipient() { return this.recipient; } @@ -52,7 +64,7 @@ public class RewardShareData { @XmlElement(name = "mintingAccount") public String getMintingAccount() { - return Crypto.toAddress(this.minterPublicKey); + return this.minter; } } diff --git a/src/main/java/org/qortal/network/Network.java b/src/main/java/org/qortal/network/Network.java index 92f27a73..119e66d6 100644 --- a/src/main/java/org/qortal/network/Network.java +++ b/src/main/java/org/qortal/network/Network.java @@ -95,6 +95,8 @@ public class Network { "node10.qortal.org" }; + private static final long NETWORK_EPC_KEEPALIVE = 10L; // seconds + public static final int MAX_SIGNATURES_PER_REPLY = 500; public static final int MAX_BLOCK_SUMMARIES_PER_REPLY = 500; public static final int PEER_ID_LENGTH = 128; @@ -142,9 +144,10 @@ public class Network { mergePeersLock = new ReentrantLock(); - // We'll use a cached thread pool, max 10 threads, but with more aggressive 10 second timeout. - ExecutorService networkExecutor = new ThreadPoolExecutor(1, 10, - 10L, TimeUnit.SECONDS, + // We'll use a cached thread pool but with more aggressive timeout. + ExecutorService networkExecutor = new ThreadPoolExecutor(1, + Settings.getInstance().getMaxNetworkThreadPoolSize(), + NETWORK_EPC_KEEPALIVE, TimeUnit.SECONDS, new SynchronousQueue()); networkEPC = new NetworkProcessor(networkExecutor); } @@ -302,15 +305,17 @@ public class Network { if (task != null) return task; - task = maybeProducePeerPingTask(); + final Long now = NTP.getTime(); + + task = maybeProducePeerPingTask(now); if (task != null) return task; - task = maybeProduceConnectPeerTask(); + task = maybeProduceConnectPeerTask(now); if (task != null) return task; - task = maybeProduceBroadcastTask(); + task = maybeProduceBroadcastTask(now); if (task != null) return task; @@ -323,6 +328,65 @@ public class Network { return null; } + private Task maybeProducePeerMessageTask() { + for (Peer peer : getConnectedPeers()) { + Task peerTask = peer.getMessageTask(); + if (peerTask != null) + return peerTask; + } + + return null; + } + + private Task maybeProducePeerPingTask(Long now) { + // Ask connected peers whether they need a ping + for (Peer peer : getConnectedPeers()) { + Task peerTask = peer.getPingTask(now); + if (peerTask != null) + return peerTask; + } + + return null; + } + + class PeerConnectTask implements ExecuteProduceConsume.Task { + private final Peer peer; + + public PeerConnectTask(Peer peer) { + this.peer = peer; + } + + @Override + public void perform() throws InterruptedException { + connectPeer(peer); + } + } + + private Task maybeProduceConnectPeerTask(Long now) throws InterruptedException { + if (now == null || now < nextConnectTaskTimestamp) + return null; + + if (getOutboundHandshakedPeers().size() >= minOutboundPeers) + return null; + + nextConnectTaskTimestamp = now + 1000L; + + Peer targetPeer = getConnectablePeer(now); + if (targetPeer == null) + return null; + + // Create connection task + return new PeerConnectTask(targetPeer); + } + + private Task maybeProduceBroadcastTask(Long now) { + if (now == null || now < nextBroadcastTimestamp) + return null; + + nextBroadcastTimestamp = now + BROADCAST_INTERVAL; + return () -> Controller.getInstance().doNetworkBroadcast(); + } + class ChannelTask implements ExecuteProduceConsume.Task { private final SelectionKey selectionKey; @@ -405,67 +469,6 @@ public class Network { return new ChannelTask(nextSelectionKey); } - - private Task maybeProducePeerMessageTask() { - for (Peer peer : getConnectedPeers()) { - Task peerTask = peer.getMessageTask(); - if (peerTask != null) - return peerTask; - } - - return null; - } - - private Task maybeProducePeerPingTask() { - // Ask connected peers whether they need a ping - for (Peer peer : getConnectedPeers()) { - Task peerTask = peer.getPingTask(); - if (peerTask != null) - return peerTask; - } - - return null; - } - - class PeerConnectTask implements ExecuteProduceConsume.Task { - private final Peer peer; - - public PeerConnectTask(Peer peer) { - this.peer = peer; - } - - @Override - public void perform() throws InterruptedException { - connectPeer(peer); - } - } - - private Task maybeProduceConnectPeerTask() throws InterruptedException { - if (getOutboundHandshakedPeers().size() >= minOutboundPeers) - return null; - - final Long now = NTP.getTime(); - if (now == null || now < nextConnectTaskTimestamp) - return null; - - nextConnectTaskTimestamp = now + 1000L; - - Peer targetPeer = getConnectablePeer(); - if (targetPeer == null) - return null; - - // Create connection task - return new PeerConnectTask(targetPeer); - } - - private Task maybeProduceBroadcastTask() { - final Long now = NTP.getTime(); - if (now == null || now < nextBroadcastTimestamp) - return null; - - nextBroadcastTimestamp = now + BROADCAST_INTERVAL; - return () -> Controller.getInstance().doNetworkBroadcast(); - } } private void acceptConnection(ServerSocketChannel serverSocketChannel) throws InterruptedException { @@ -588,9 +591,27 @@ public class Network { } } - private Peer getConnectablePeer() throws InterruptedException { - final long now = NTP.getTime(); + private final Predicate isSelfPeer = peerData -> { + PeerAddress peerAddress = peerData.getAddress(); + return this.selfPeers.stream().anyMatch(selfPeer -> selfPeer.equals(peerAddress)); + }; + private final Predicate isConnectedPeer = peerData -> { + PeerAddress peerAddress = peerData.getAddress(); + return this.connectedPeers.stream().anyMatch(peer -> peer.getPeerData().getAddress().equals(peerAddress)); + }; + + private final Predicate isResolvedAsConnectedPeer = peerData -> { + try { + InetSocketAddress resolvedSocketAddress = peerData.getAddress().toSocketAddress(); + return this.connectedPeers.stream().anyMatch(peer -> peer.getResolvedAddress().equals(resolvedSocketAddress)); + } catch (UnknownHostException e) { + // Can't resolve - no point even trying to connect + return true; + } + }; + + private Peer getConnectablePeer(final Long now) throws InterruptedException { // We can't block here so use tryRepository(). We don't NEED to connect a new peer. try (final Repository repository = RepositoryManager.tryRepository()) { if (repository == null) @@ -606,36 +627,17 @@ public class Network { peerData.getLastAttempted() > lastAttemptedThreshold); // Don't consider peers that we know loop back to ourself - Predicate isSelfPeer = peerData -> { - PeerAddress peerAddress = peerData.getAddress(); - return this.selfPeers.stream().anyMatch(selfPeer -> selfPeer.equals(peerAddress)); - }; - synchronized (this.selfPeers) { peers.removeIf(isSelfPeer); } // Don't consider already connected peers (simple address match) - Predicate isConnectedPeer = peerData -> { - PeerAddress peerAddress = peerData.getAddress(); - return this.connectedPeers.stream().anyMatch(peer -> peer.getPeerData().getAddress().equals(peerAddress)); - }; - synchronized (this.connectedPeers) { peers.removeIf(isConnectedPeer); } // Don't consider already connected peers (resolved address match) - Predicate isResolvedAsConnectedPeer = peerData -> { - try { - InetSocketAddress resolvedSocketAddress = peerData.getAddress().toSocketAddress(); - return this.connectedPeers.stream().anyMatch(peer -> peer.getResolvedAddress().equals(resolvedSocketAddress)); - } catch (UnknownHostException e) { - // Can't resolve - no point even trying to connect - return true; - } - }; - + // XXX This might be too slow if we end up waiting a long time for hostnames to resolve via DNS synchronized (this.connectedPeers) { peers.removeIf(isResolvedAsConnectedPeer); } @@ -735,126 +737,43 @@ public class Network { Handshake handshakeStatus = peer.getHandshakeStatus(); if (handshakeStatus != Handshake.COMPLETED) { - try { - // Still handshaking - LOGGER.trace(() -> String.format("Handshake status %s, message %s from peer %s", handshakeStatus.name(), (message != null ? message.getType().name() : "null"), peer)); - - // v1 nodes are keen on sending PINGs early. Send to back of queue so we'll process right after handshake - if (message != null && message.getType() == MessageType.PING) { - peer.queueMessage(message); - return; - } - - // Check message type is as expected - if (handshakeStatus.expectedMessageType != null && message.getType() != handshakeStatus.expectedMessageType) { - LOGGER.debug(String.format("Unexpected %s message from %s, expected %s", message.getType().name(), peer, handshakeStatus.expectedMessageType)); - peer.disconnect("unexpected message"); - return; - } - - Handshake newHandshakeStatus = handshakeStatus.onMessage(peer, message); - - if (newHandshakeStatus == null) { - // Handshake failure - LOGGER.debug(String.format("Handshake failure with peer %s message %s", peer, message.getType().name())); - peer.disconnect("handshake failure"); - return; - } - - if (peer.isOutbound()) - // If we made outbound connection then we need to act first - newHandshakeStatus.action(peer); - else - // We have inbound connection so we need to respond in kind with what we just received - handshakeStatus.action(peer); - - peer.setHandshakeStatus(newHandshakeStatus); - - if (newHandshakeStatus == Handshake.COMPLETED) - this.onHandshakeCompleted(peer); - - return; - } finally { - peer.resetHandshakeMessagePending(); - } + onHandshakingMessage(peer, message, handshakeStatus); + return; } // Should be non-handshaking messages from now on + // Ordered by message type value switch (message.getType()) { - case PEER_VERIFY: - // Remote peer wants extra verification - possibleVerificationResponse(peer); + case GET_PEERS: + onGetPeersMessage(peer, message); break; - case VERIFICATION_CODES: - VerificationCodesMessage verificationCodesMessage = (VerificationCodesMessage) message; + case PEERS: + onPeersMessage(peer, message); + break; - // Remote peer is sending the code it wants to receive back via our outbound connection to it - Peer ourUnverifiedPeer = Network.getInstance().getInboundPeerWithId(Network.getInstance().getOurPeerId()); - ourUnverifiedPeer.setVerificationCodes(verificationCodesMessage.getVerificationCodeSent(), verificationCodesMessage.getVerificationCodeExpected()); - - possibleVerificationResponse(ourUnverifiedPeer); + case PING: + onPingMessage(peer, message); break; case VERSION: case PEER_ID: case PROOF: - LOGGER.debug(String.format("Unexpected handshaking message %s from peer %s", message.getType().name(), peer)); + LOGGER.debug(() -> String.format("Unexpected handshaking message %s from peer %s", message.getType().name(), peer)); peer.disconnect("unexpected handshaking message"); return; - case PING: - PingMessage pingMessage = (PingMessage) message; - - // Generate 'pong' using same ID - PingMessage pongMessage = new PingMessage(); - pongMessage.setId(pingMessage.getId()); - - if (!peer.sendMessage(pongMessage)) - peer.disconnect("failed to send ping reply"); - - break; - - case PEERS: - PeersMessage peersMessage = (PeersMessage) message; - - List peerAddresses = new ArrayList<>(); - - // v1 PEERS message doesn't support port numbers so we have to add default port - for (InetAddress peerAddress : peersMessage.getPeerAddresses()) - // This is always IPv4 so we don't have to worry about bracketing IPv6. - peerAddresses.add(PeerAddress.fromString(peerAddress.getHostAddress())); - - // Also add peer's details - peerAddresses.add(PeerAddress.fromString(peer.getPeerData().getAddress().getHost())); - - mergePeers(peer.toString(), peerAddresses); - break; - case PEERS_V2: - PeersV2Message peersV2Message = (PeersV2Message) message; - - List peerV2Addresses = peersV2Message.getPeerAddresses(); - - // First entry contains remote peer's listen port but empty address. - int peerPort = peerV2Addresses.get(0).getPort(); - peerV2Addresses.remove(0); - - // If inbound peer, use listen port and socket address to recreate first entry - if (!peer.isOutbound()) { - PeerAddress sendingPeerAddress = PeerAddress.fromString(peer.getPeerData().getAddress().getHost() + ":" + peerPort); - LOGGER.trace(() -> String.format("PEERS_V2 sending peer's listen address: %s", sendingPeerAddress.toString())); - peerV2Addresses.add(0, sendingPeerAddress); - } - - mergePeers(peer.toString(), peerV2Addresses); + onPeersV2Message(peer, message); break; - case GET_PEERS: - // Send our known peers - if (!peer.sendMessage(buildPeersMessage(peer))) - peer.disconnect("failed to send peers list"); + case PEER_VERIFY: + onPeerVerifyMessage(peer, message); + break; + + case VERIFICATION_CODES: + onVerificationCodesMessage(peer, message); break; default: @@ -864,6 +783,116 @@ public class Network { } } + private void onHandshakingMessage(Peer peer, Message message, Handshake handshakeStatus) { + try { + // Still handshaking + LOGGER.trace(() -> String.format("Handshake status %s, message %s from peer %s", handshakeStatus.name(), (message != null ? message.getType().name() : "null"), peer)); + + // v1 nodes are keen on sending PINGs early. Send to back of queue so we'll process right after handshake + if (message != null && message.getType() == MessageType.PING) { + peer.queueMessage(message); + return; + } + + // Check message type is as expected + if (handshakeStatus.expectedMessageType != null && message.getType() != handshakeStatus.expectedMessageType) { + LOGGER.debug(() -> String.format("Unexpected %s message from %s, expected %s", message.getType().name(), peer, handshakeStatus.expectedMessageType)); + peer.disconnect("unexpected message"); + return; + } + + Handshake newHandshakeStatus = handshakeStatus.onMessage(peer, message); + + if (newHandshakeStatus == null) { + // Handshake failure + LOGGER.debug(() -> String.format("Handshake failure with peer %s message %s", peer, message.getType().name())); + peer.disconnect("handshake failure"); + return; + } + + if (peer.isOutbound()) + // If we made outbound connection then we need to act first + newHandshakeStatus.action(peer); + else + // We have inbound connection so we need to respond in kind with what we just received + handshakeStatus.action(peer); + + peer.setHandshakeStatus(newHandshakeStatus); + + if (newHandshakeStatus == Handshake.COMPLETED) + this.onHandshakeCompleted(peer); + } finally { + peer.resetHandshakeMessagePending(); + } + } + + private void onGetPeersMessage(Peer peer, Message message) { + // Send our known peers + if (!peer.sendMessage(buildPeersMessage(peer))) + peer.disconnect("failed to send peers list"); + } + + private void onPeersMessage(Peer peer, Message message) { + PeersMessage peersMessage = (PeersMessage) message; + + List peerAddresses = new ArrayList<>(); + + // v1 PEERS message doesn't support port numbers so we have to add default port + for (InetAddress peerAddress : peersMessage.getPeerAddresses()) + // This is always IPv4 so we don't have to worry about bracketing IPv6. + peerAddresses.add(PeerAddress.fromString(peerAddress.getHostAddress())); + + // Also add peer's details + peerAddresses.add(PeerAddress.fromString(peer.getPeerData().getAddress().getHost())); + + mergePeers(peer.toString(), peerAddresses); + } + + private void onPingMessage(Peer peer, Message message) { + PingMessage pingMessage = (PingMessage) message; + + // Generate 'pong' using same ID + PingMessage pongMessage = new PingMessage(); + pongMessage.setId(pingMessage.getId()); + + if (!peer.sendMessage(pongMessage)) + peer.disconnect("failed to send ping reply"); + } + + private void onPeersV2Message(Peer peer, Message message) { + PeersV2Message peersV2Message = (PeersV2Message) message; + + List peerV2Addresses = peersV2Message.getPeerAddresses(); + + // First entry contains remote peer's listen port but empty address. + int peerPort = peerV2Addresses.get(0).getPort(); + peerV2Addresses.remove(0); + + // If inbound peer, use listen port and socket address to recreate first entry + if (!peer.isOutbound()) { + PeerAddress sendingPeerAddress = PeerAddress.fromString(peer.getPeerData().getAddress().getHost() + ":" + peerPort); + LOGGER.trace(() -> String.format("PEERS_V2 sending peer's listen address: %s", sendingPeerAddress.toString())); + peerV2Addresses.add(0, sendingPeerAddress); + } + + mergePeers(peer.toString(), peerV2Addresses); + } + + private void onPeerVerifyMessage(Peer peer, Message message) { + // Remote peer wants extra verification + possibleVerificationResponse(peer); + } + + private void onVerificationCodesMessage(Peer peer, Message message) { + VerificationCodesMessage verificationCodesMessage = (VerificationCodesMessage) message; + + // Remote peer is sending the code it wants to receive back via our outbound connection to it + Peer ourUnverifiedPeer = Network.getInstance().getInboundPeerWithId(Network.getInstance().getOurPeerId()); + ourUnverifiedPeer.setVerificationCodes(verificationCodesMessage.getVerificationCodeSent(), verificationCodesMessage.getVerificationCodeExpected()); + + possibleVerificationResponse(ourUnverifiedPeer); + } + private void possibleVerificationResponse(Peer peer) { // Can't respond if we don't have the codes (yet?) if (peer.getVerificationCodeExpected() == null) diff --git a/src/main/java/org/qortal/network/Peer.java b/src/main/java/org/qortal/network/Peer.java index b0861b67..34738e69 100644 --- a/src/main/java/org/qortal/network/Peer.java +++ b/src/main/java/org/qortal/network/Peer.java @@ -466,16 +466,14 @@ public class Peer { /* package */ void startPings() { // Replacing initial null value allows getPingTask() to start sending pings. LOGGER.trace(() -> String.format("Enabling pings for peer %s", this)); - this.lastPingSent = System.currentTimeMillis(); + this.lastPingSent = NTP.getTime(); } - /* package */ ExecuteProduceConsume.Task getPingTask() { + /* package */ ExecuteProduceConsume.Task getPingTask(Long now) { // Pings not enabled yet? - if (this.lastPingSent == null) + if (now == null || this.lastPingSent == null) return null; - final long now = System.currentTimeMillis(); - // Time to send another ping? if (now < this.lastPingSent + PING_INTERVAL) return null; // Not yet @@ -486,7 +484,6 @@ public class Peer { return () -> { PingMessage pingMessage = new PingMessage(); Message message = this.getResponse(pingMessage); - final long after = System.currentTimeMillis(); if (message == null || message.getType() != MessageType.PING) { LOGGER.debug(() -> String.format("Didn't receive reply from %s for PING ID %d", this, pingMessage.getId())); @@ -494,7 +491,7 @@ public class Peer { return; } - this.setLastPing(after - now); + this.setLastPing(NTP.getTime() - now); }; } diff --git a/src/main/java/org/qortal/repository/hsqldb/HSQLDBAccountRepository.java b/src/main/java/org/qortal/repository/hsqldb/HSQLDBAccountRepository.java index a80ad8e9..0d406854 100644 --- a/src/main/java/org/qortal/repository/hsqldb/HSQLDBAccountRepository.java +++ b/src/main/java/org/qortal/repository/hsqldb/HSQLDBAccountRepository.java @@ -620,16 +620,17 @@ public class HSQLDBAccountRepository implements AccountRepository { @Override public RewardShareData getRewardShare(byte[] minterPublicKey, String recipient) throws DataException { - String sql = "SELECT reward_share_public_key, share_percent FROM RewardShares WHERE minter_public_key = ? AND recipient = ?"; + String sql = "SELECT minter, reward_share_public_key, share_percent FROM RewardShares WHERE minter_public_key = ? AND recipient = ?"; try (ResultSet resultSet = this.repository.checkedExecute(sql, minterPublicKey, recipient)) { if (resultSet == null) return null; - byte[] rewardSharePublicKey = resultSet.getBytes(1); - BigDecimal sharePercent = resultSet.getBigDecimal(2); + String minter = resultSet.getString(1); + byte[] rewardSharePublicKey = resultSet.getBytes(2); + BigDecimal sharePercent = resultSet.getBigDecimal(3); - return new RewardShareData(minterPublicKey, recipient, rewardSharePublicKey, sharePercent); + return new RewardShareData(minterPublicKey, minter, recipient, rewardSharePublicKey, sharePercent); } catch (SQLException e) { throw new DataException("Unable to fetch reward-share info from repository", e); } @@ -637,17 +638,18 @@ public class HSQLDBAccountRepository implements AccountRepository { @Override public RewardShareData getRewardShare(byte[] rewardSharePublicKey) throws DataException { - String sql = "SELECT minter_public_key, recipient, share_percent FROM RewardShares WHERE reward_share_public_key = ?"; + String sql = "SELECT minter_public_key, minter, recipient, share_percent FROM RewardShares WHERE reward_share_public_key = ?"; try (ResultSet resultSet = this.repository.checkedExecute(sql, rewardSharePublicKey)) { if (resultSet == null) return null; byte[] minterPublicKey = resultSet.getBytes(1); - String recipient = resultSet.getString(2); - BigDecimal sharePercent = resultSet.getBigDecimal(3); + String minter = resultSet.getString(2); + String recipient = resultSet.getString(3); + BigDecimal sharePercent = resultSet.getBigDecimal(4); - return new RewardShareData(minterPublicKey, recipient, rewardSharePublicKey, sharePercent); + return new RewardShareData(minterPublicKey, minter, recipient, rewardSharePublicKey, sharePercent); } catch (SQLException e) { throw new DataException("Unable to fetch reward-share info from repository", e); } @@ -675,7 +677,7 @@ public class HSQLDBAccountRepository implements AccountRepository { @Override public List getRewardShares() throws DataException { - String sql = "SELECT minter_public_key, recipient, share_percent, reward_share_public_key FROM RewardShares"; + String sql = "SELECT minter_public_key, minter, recipient, share_percent, reward_share_public_key FROM RewardShares"; List rewardShares = new ArrayList<>(); @@ -685,11 +687,12 @@ public class HSQLDBAccountRepository implements AccountRepository { do { byte[] minterPublicKey = resultSet.getBytes(1); - String recipient = resultSet.getString(2); - BigDecimal sharePercent = resultSet.getBigDecimal(3); - byte[] rewardSharePublicKey = resultSet.getBytes(4); + String minter = resultSet.getString(2); + String recipient = resultSet.getString(3); + BigDecimal sharePercent = resultSet.getBigDecimal(4); + byte[] rewardSharePublicKey = resultSet.getBytes(5); - rewardShares.add(new RewardShareData(minterPublicKey, recipient, rewardSharePublicKey, sharePercent)); + rewardShares.add(new RewardShareData(minterPublicKey, minter, recipient, rewardSharePublicKey, sharePercent)); } while (resultSet.next()); return rewardShares; @@ -702,7 +705,7 @@ public class HSQLDBAccountRepository implements AccountRepository { public List findRewardShares(List minters, List recipients, List involvedAddresses, Integer limit, Integer offset, Boolean reverse) throws DataException { StringBuilder sql = new StringBuilder(1024); - sql.append("SELECT DISTINCT minter_public_key, recipient, share_percent, reward_share_public_key FROM RewardShares "); + sql.append("SELECT DISTINCT minter_public_key, minter, recipient, share_percent, reward_share_public_key FROM RewardShares "); List args = new ArrayList<>(); @@ -772,11 +775,12 @@ public class HSQLDBAccountRepository implements AccountRepository { do { byte[] minterPublicKey = resultSet.getBytes(1); - String recipient = resultSet.getString(2); - BigDecimal sharePercent = resultSet.getBigDecimal(3); - byte[] rewardSharePublicKey = resultSet.getBytes(4); + String minter = resultSet.getString(2); + String recipient = resultSet.getString(3); + BigDecimal sharePercent = resultSet.getBigDecimal(4); + byte[] rewardSharePublicKey = resultSet.getBytes(5); - rewardShares.add(new RewardShareData(minterPublicKey, recipient, rewardSharePublicKey, sharePercent)); + rewardShares.add(new RewardShareData(minterPublicKey, minter, recipient, rewardSharePublicKey, sharePercent)); } while (resultSet.next()); return rewardShares; @@ -801,7 +805,7 @@ public class HSQLDBAccountRepository implements AccountRepository { @Override public RewardShareData getRewardShareByIndex(int index) throws DataException { - String sql = "SELECT minter_public_key, recipient, share_percent, reward_share_public_key FROM RewardShares " + String sql = "SELECT minter_public_key, minter, recipient, share_percent, reward_share_public_key FROM RewardShares " + "ORDER BY reward_share_public_key ASC " + "OFFSET ? LIMIT 1"; @@ -810,11 +814,12 @@ public class HSQLDBAccountRepository implements AccountRepository { return null; byte[] minterPublicKey = resultSet.getBytes(1); - String recipient = resultSet.getString(2); - BigDecimal sharePercent = resultSet.getBigDecimal(3); - byte[] rewardSharePublicKey = resultSet.getBytes(4); + String minter = resultSet.getString(2); + String recipient = resultSet.getString(3); + BigDecimal sharePercent = resultSet.getBigDecimal(4); + byte[] rewardSharePublicKey = resultSet.getBytes(5); - return new RewardShareData(minterPublicKey, recipient, rewardSharePublicKey, sharePercent); + return new RewardShareData(minterPublicKey, minter, recipient, rewardSharePublicKey, sharePercent); } catch (SQLException e) { throw new DataException("Unable to fetch reward-share info from repository", e); } @@ -824,8 +829,9 @@ public class HSQLDBAccountRepository implements AccountRepository { public void save(RewardShareData rewardShareData) throws DataException { HSQLDBSaver saveHelper = new HSQLDBSaver("RewardShares"); - saveHelper.bind("minter_public_key", rewardShareData.getMinterPublicKey()).bind("recipient", rewardShareData.getRecipient()) - .bind("reward_share_public_key", rewardShareData.getRewardSharePublicKey()).bind("share_percent", rewardShareData.getSharePercent()); + saveHelper.bind("minter_public_key", rewardShareData.getMinterPublicKey()).bind("minter", rewardShareData.getMinter()) + .bind("recipient", rewardShareData.getRecipient()).bind("reward_share_public_key", rewardShareData.getRewardSharePublicKey()) + .bind("share_percent", rewardShareData.getSharePercent()); try { saveHelper.execute(this.repository); @@ -849,14 +855,15 @@ public class HSQLDBAccountRepository implements AccountRepository { public List getMintingAccounts() throws DataException { List mintingAccounts = new ArrayList<>(); - try (ResultSet resultSet = this.repository.checkedExecute("SELECT minter_private_key FROM MintingAccounts")) { + try (ResultSet resultSet = this.repository.checkedExecute("SELECT minter_private_key, minter_public_key FROM MintingAccounts")) { if (resultSet == null) return mintingAccounts; do { byte[] minterPrivateKey = resultSet.getBytes(1); + byte[] minterPublicKey = resultSet.getBytes(2); - mintingAccounts.add(new MintingAccountData(minterPrivateKey)); + mintingAccounts.add(new MintingAccountData(minterPrivateKey, minterPublicKey)); } while (resultSet.next()); return mintingAccounts; @@ -869,7 +876,8 @@ public class HSQLDBAccountRepository implements AccountRepository { public void save(MintingAccountData mintingAccountData) throws DataException { HSQLDBSaver saveHelper = new HSQLDBSaver("MintingAccounts"); - saveHelper.bind("minter_private_key", mintingAccountData.getPrivateKey()); + saveHelper.bind("minter_private_key", mintingAccountData.getPrivateKey()) + .bind("minter_public_key", mintingAccountData.getPublicKey()); try { saveHelper.execute(this.repository); diff --git a/src/main/java/org/qortal/repository/hsqldb/HSQLDBBlockRepository.java b/src/main/java/org/qortal/repository/hsqldb/HSQLDBBlockRepository.java index 8063d417..a99af8be 100644 --- a/src/main/java/org/qortal/repository/hsqldb/HSQLDBBlockRepository.java +++ b/src/main/java/org/qortal/repository/hsqldb/HSQLDBBlockRepository.java @@ -201,7 +201,7 @@ public class HSQLDBBlockRepository implements BlockRepository { String subquerySql = "SELECT minter, COUNT(signature) FROM Blocks GROUP BY minter"; StringBuilder sql = new StringBuilder(1024); - sql.append("SELECT DISTINCT block_minter, n_blocks, minter_public_key, recipient FROM ("); + sql.append("SELECT DISTINCT block_minter, n_blocks, minter_public_key, minter, recipient FROM ("); sql.append(subquerySql); sql.append(") AS Minters (block_minter, n_blocks) LEFT OUTER JOIN RewardShares ON reward_share_public_key = block_minter "); @@ -239,14 +239,17 @@ public class HSQLDBBlockRepository implements BlockRepository { do { byte[] blockMinterPublicKey = resultSet.getBytes(1); int nBlocks = resultSet.getInt(2); + + // May not be present if no reward-share: byte[] mintingAccountPublicKey = resultSet.getBytes(3); - String recipientAccount = resultSet.getString(4); + String minterAccount = resultSet.getString(4); + String recipientAccount = resultSet.getString(5); BlockMinterSummary blockMinterSummary; if (recipientAccount == null) blockMinterSummary = new BlockMinterSummary(blockMinterPublicKey, nBlocks); else - blockMinterSummary = new BlockMinterSummary(blockMinterPublicKey, nBlocks, mintingAccountPublicKey, recipientAccount); + blockMinterSummary = new BlockMinterSummary(blockMinterPublicKey, nBlocks, mintingAccountPublicKey, minterAccount, recipientAccount); summaries.add(blockMinterSummary); } while (resultSet.next()); @@ -260,13 +263,13 @@ public class HSQLDBBlockRepository implements BlockRepository { @Override public List getBlockSummariesByMinter(byte[] minterPublicKey, Integer limit, Integer offset, Boolean reverse) throws DataException { StringBuilder sql = new StringBuilder(512); - sql.append("SELECT signature, height, minter, online_accounts_count FROM "); + sql.append("SELECT signature, height, Blocks.minter, online_accounts_count FROM "); // List of minter account's public key and reward-share public keys with minter's public key sql.append("(SELECT * FROM (VALUES (CAST(? AS QortalPublicKey))) UNION (SELECT reward_share_public_key FROM RewardShares WHERE minter_public_key = ?)) AS PublicKeys (public_key) "); // Match Blocks signed with public key from above list - sql.append("JOIN Blocks ON minter = public_key "); + sql.append("JOIN Blocks ON Blocks.minter = public_key "); sql.append("ORDER BY Blocks.height "); if (reverse != null && reverse) diff --git a/src/main/java/org/qortal/repository/hsqldb/HSQLDBDatabaseUpdates.java b/src/main/java/org/qortal/repository/hsqldb/HSQLDBDatabaseUpdates.java index f83ea2d8..7f17b772 100644 --- a/src/main/java/org/qortal/repository/hsqldb/HSQLDBDatabaseUpdates.java +++ b/src/main/java/org/qortal/repository/hsqldb/HSQLDBDatabaseUpdates.java @@ -933,6 +933,24 @@ public class HSQLDBDatabaseUpdates { stmt.execute("ALTER TABLE AccountBalances ADD CONSTRAINT CheckBalanceNotNegative CHECK (balance >= 0)"); break; + case 67: + // Provide external function to convert private keys to public keys + stmt.execute("CREATE FUNCTION Ed25519_private_to_public_key (IN privateKey VARBINARY(32)) RETURNS VARBINARY(32) LANGUAGE JAVA DETERMINISTIC NO SQL EXTERNAL NAME 'CLASSPATH:org.qortal.repository.hsqldb.HSQLDBRepository.ed25519PrivateToPublicKey'"); + + // Cache minting account public keys to save us recalculating them + stmt.execute("ALTER TABLE MintingAccounts ADD minter_public_key QortalPublicKey"); + stmt.execute("UPDATE MintingAccounts SET minter_public_key = Ed25519_private_to_public_key(minter_private_key)"); + stmt.execute("ALTER TABLE MintingAccounts ALTER COLUMN minter_public_key SET NOT NULL"); + + // Provide external function to convert public keys to addresses + stmt.execute("CREATE FUNCTION Ed25519_public_key_to_address (IN privateKey VARBINARY(32)) RETURNS VARCHAR(36) LANGUAGE JAVA DETERMINISTIC NO SQL EXTERNAL NAME 'CLASSPATH:org.qortal.repository.hsqldb.HSQLDBRepository.ed25519PublicKeyToAddress'"); + + // Cache reward-share minting account's address + stmt.execute("ALTER TABLE RewardShares ADD minter QortalAddress BEFORE recipient"); + stmt.execute("UPDATE RewardShares SET minter = Ed25519_public_key_to_address(minter_public_key)"); + stmt.execute("ALTER TABLE RewardShares ALTER COLUMN minter SET NOT NULL"); + break; + default: // nothing to do return false; diff --git a/src/main/java/org/qortal/repository/hsqldb/HSQLDBRepository.java b/src/main/java/org/qortal/repository/hsqldb/HSQLDBRepository.java index da3bc5fb..98de2f8b 100644 --- a/src/main/java/org/qortal/repository/hsqldb/HSQLDBRepository.java +++ b/src/main/java/org/qortal/repository/hsqldb/HSQLDBRepository.java @@ -28,6 +28,8 @@ import java.util.regex.Pattern; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.qortal.account.PrivateKeyAccount; +import org.qortal.crypto.Crypto; import org.qortal.repository.ATRepository; import org.qortal.repository.AccountRepository; import org.qortal.repository.ArbitraryRepository; @@ -57,6 +59,8 @@ public class HSQLDBRepository implements Repository { protected List sqlStatements; protected long sessionId; + // Constructors + // NB: no visibility modifier so only callable from within same package /* package */ HSQLDBRepository(Connection connection) throws DataException { this.connection = connection; @@ -84,6 +88,8 @@ public class HSQLDBRepository implements Repository { assertEmptyTransaction("connection creation"); } + // Getters / setters + @Override public ATRepository getATRepository() { return new HSQLDBATRepository(this); @@ -134,6 +140,18 @@ public class HSQLDBRepository implements Repository { return new HSQLDBVotingRepository(this); } + @Override + public boolean getDebug() { + return this.debugState; + } + + @Override + public void setDebug(boolean debugState) { + this.debugState = debugState; + } + + // Transaction COMMIT / ROLLBACK / savepoints + @Override public void saveChanges() throws DataException { try { @@ -203,6 +221,8 @@ public class HSQLDBRepository implements Repository { } } + // Close / backup / rebuild / restore + @Override public void close() throws DataException { // Already closed? No need to do anything but maybe report double-call @@ -257,16 +277,6 @@ public class HSQLDBRepository implements Repository { } } - @Override - public boolean getDebug() { - return this.debugState; - } - - @Override - public void setDebug(boolean debugState) { - this.debugState = debugState; - } - @Override public void backup(boolean quick) throws DataException { if (!quick) @@ -386,6 +396,8 @@ public class HSQLDBRepository implements Repository { } } + // SQL statements, etc. + /** * Returns prepared statement using passed SQL, logging query if necessary. */ @@ -399,19 +411,6 @@ public class HSQLDBRepository implements Repository { return this.connection.prepareStatement(sql); } - /** - * Logs this transaction's SQL statements, if enabled. - */ - public void logStatements() { - if (this.sqlStatements == null) - return; - - LOGGER.info(String.format("HSQLDB SQL statements (session %d) leading up to this were:", this.sessionId)); - - for (String sql : this.sqlStatements) - LOGGER.info(sql); - } - /** * Execute SQL and return ResultSet with but added checking. *

@@ -429,15 +428,18 @@ public class HSQLDBRepository implements Repository { // We can't use try-with-resources here as closing the PreparedStatement on return would also prematurely close the ResultSet. preparedStatement.closeOnCompletion(); - long beforeQuery = System.currentTimeMillis(); + long beforeQuery = this.slowQueryThreshold == null ? 0 : System.currentTimeMillis(); ResultSet resultSet = this.checkedExecuteResultSet(preparedStatement, objects); - long queryTime = System.currentTimeMillis() - beforeQuery; - if (this.slowQueryThreshold != null && queryTime > this.slowQueryThreshold) { - LOGGER.info(String.format("HSQLDB query took %d ms: %s", queryTime, sql), new SQLException("slow query")); + if (this.slowQueryThreshold != null) { + long queryTime = System.currentTimeMillis() - beforeQuery; - logStatements(); + if (queryTime > this.slowQueryThreshold) { + LOGGER.info(String.format("HSQLDB query took %d ms: %s", queryTime, sql), new SQLException("slow query")); + + logStatements(); + } } return resultSet; @@ -500,16 +502,19 @@ public class HSQLDBRepository implements Repository { try (PreparedStatement preparedStatement = this.prepareStatement(sql)) { prepareExecute(preparedStatement, objects); - long beforeQuery = System.currentTimeMillis(); + long beforeQuery = this.slowQueryThreshold == null ? 0 : System.currentTimeMillis(); if (preparedStatement.execute()) throw new SQLException("Database produced results, not row count"); - long queryTime = System.currentTimeMillis() - beforeQuery; - if (this.slowQueryThreshold != null && queryTime > this.slowQueryThreshold) { - LOGGER.info(String.format("HSQLDB query took %d ms: %s", queryTime, sql), new SQLException("slow query")); + if (this.slowQueryThreshold != null) { + long queryTime = System.currentTimeMillis() - beforeQuery; - logStatements(); + if (queryTime > this.slowQueryThreshold) { + LOGGER.info(String.format("HSQLDB query took %d ms: %s", queryTime, sql), new SQLException("slow query")); + + logStatements(); + } } int rowCount = preparedStatement.getUpdateCount(); @@ -670,6 +675,21 @@ public class HSQLDBRepository implements Repository { stringBuilder.append(") "); } + // Debugging + + /** + * Logs this transaction's SQL statements, if enabled. + */ + public void logStatements() { + if (this.sqlStatements == null) + return; + + LOGGER.info(String.format("HSQLDB SQL statements (session %d) leading up to this were:", this.sessionId)); + + for (String sql : this.sqlStatements) + LOGGER.info(sql); + } + /** Logs other HSQLDB sessions then re-throws passed exception */ public SQLException examineException(SQLException e) throws SQLException { LOGGER.error(String.format("HSQLDB error (session %d): %s", this.sessionId, e.getMessage()), e); @@ -726,6 +746,22 @@ public class HSQLDBRepository implements Repository { } } + // Utility methods + + public static byte[] ed25519PrivateToPublicKey(byte[] privateKey) { + if (privateKey == null) + return null; + + return PrivateKeyAccount.toPublicKey(privateKey); + } + + public static String ed25519PublicKeyToAddress(byte[] publicKey) { + if (publicKey == null) + return null; + + return Crypto.toAddress(publicKey); + } + /** Converts milliseconds from epoch to OffsetDateTime needed for TIMESTAMP WITH TIME ZONE columns. */ /* package */ static OffsetDateTime toOffsetDateTime(Long timestamp) { if (timestamp == null) diff --git a/src/main/java/org/qortal/settings/Settings.java b/src/main/java/org/qortal/settings/Settings.java index c231186a..c2ded70b 100644 --- a/src/main/java/org/qortal/settings/Settings.java +++ b/src/main/java/org/qortal/settings/Settings.java @@ -82,6 +82,8 @@ public class Settings { private int minOutboundPeers = 40; /** Maximum number of peer connections we allow. */ private int maxPeers = 80; + /** Maximum number of threads for network engine. */ + private int maxNetworkThreadPoolSize = 10; // Which blockchains this node is running private String blockchainConfig = null; // use default from resources @@ -113,7 +115,7 @@ public class Settings { "3.cn.pool.ntp.org" }; /** Additional offset added to values returned by NTP.getTime() */ - private long testNtpOffset = 0; + private Long testNtpOffset = null; // Constructors @@ -331,6 +333,10 @@ public class Settings { return this.maxPeers; } + public int getMaxNetworkThreadPoolSize() { + return this.maxNetworkThreadPoolSize; + } + public String getBlockchainConfig() { return this.blockchainConfig; } @@ -359,7 +365,7 @@ public class Settings { return this.ntpServers; } - public long getTestNtpOffset() { + public Long getTestNtpOffset() { return this.testNtpOffset; } diff --git a/src/main/java/org/qortal/transaction/RewardShareTransaction.java b/src/main/java/org/qortal/transaction/RewardShareTransaction.java index cefb1e4a..8a478299 100644 --- a/src/main/java/org/qortal/transaction/RewardShareTransaction.java +++ b/src/main/java/org/qortal/transaction/RewardShareTransaction.java @@ -195,7 +195,9 @@ public class RewardShareTransaction extends Transaction { this.repository.getAccountRepository().delete(mintingAccount.getPublicKey(), rewardShareTransactionData.getRecipient()); } else { // Save reward-share info - rewardShareData = new RewardShareData(mintingAccount.getPublicKey(), rewardShareTransactionData.getRecipient(), rewardShareTransactionData.getRewardSharePublicKey(), rewardShareTransactionData.getSharePercent()); + rewardShareData = new RewardShareData(mintingAccount.getPublicKey(), mintingAccount.getAddress(), + rewardShareTransactionData.getRecipient(), rewardShareTransactionData.getRewardSharePublicKey(), + rewardShareTransactionData.getSharePercent()); this.repository.getAccountRepository().save(rewardShareData); } } @@ -217,8 +219,9 @@ public class RewardShareTransaction extends Transaction { if (rewardShareTransactionData.getPreviousSharePercent() != null) { // Revert previous sharing arrangement - RewardShareData rewardShareData = new RewardShareData(mintingAccount.getPublicKey(), rewardShareTransactionData.getRecipient(), - rewardShareTransactionData.getRewardSharePublicKey(), rewardShareTransactionData.getPreviousSharePercent()); + RewardShareData rewardShareData = new RewardShareData(mintingAccount.getPublicKey(), mintingAccount.getAddress(), + rewardShareTransactionData.getRecipient(), rewardShareTransactionData.getRewardSharePublicKey(), + rewardShareTransactionData.getPreviousSharePercent()); this.repository.getAccountRepository().save(rewardShareData); } else { diff --git a/src/main/java/org/qortal/utils/NTP.java b/src/main/java/org/qortal/utils/NTP.java index 36eae1e1..cc6dec56 100644 --- a/src/main/java/org/qortal/utils/NTP.java +++ b/src/main/java/org/qortal/utils/NTP.java @@ -11,6 +11,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.stream.Collectors; import org.apache.commons.net.ntp.NTPUDPClient; import org.apache.commons.net.ntp.NtpV3Packet; @@ -18,7 +19,6 @@ import org.apache.commons.net.ntp.TimeInfo; import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.qortal.settings.Settings; public class NTP implements Runnable { @@ -53,15 +53,10 @@ public class NTP implements Runnable { this.remote = remote; } - public boolean doPoll(NTPUDPClient client) { + public boolean doPoll(NTPUDPClient client, final long now) { Thread.currentThread().setName(String.format("NTP: %s", this.remote)); try { - final long now = System.currentTimeMillis(); - - if (now < this.nextPoll) - return false; - boolean isUpdated = false; try { TimeInfo timeInfo = client.getTime(InetAddress.getByName(remote)); @@ -110,26 +105,26 @@ public class NTP implements Runnable { } private final NTPUDPClient client; - private List ntpServers = new ArrayList<>(); + private final List ntpServers = new ArrayList<>(); private final ExecutorService serverExecutor; - private NTP() { + private NTP(String[] serverNames) { client = new NTPUDPClient(); client.setDefaultTimeout(2000); - for (String serverName : Settings.getInstance().getNtpServers()) + for (String serverName : serverNames) ntpServers.add(new NTPServer(serverName)); serverExecutor = Executors.newCachedThreadPool(); } - public static synchronized void start() { + public static synchronized void start(String[] serverNames) { if (isStarted) return; isStarted = true; instanceExecutor = Executors.newSingleThreadExecutor(); - instance = new NTP(); + instance = new NTP(serverNames); instanceExecutor.execute(instance); } @@ -137,9 +132,9 @@ public class NTP implements Runnable { instanceExecutor.shutdownNow(); } - public static synchronized void testMode() { - // Fix offset to match system time - NTP.offset = 0L; + public static synchronized void setFixedOffset(Long offset) { + // Fix offset, e.g. for testing + NTP.offset = offset; } /** @@ -151,7 +146,7 @@ public class NTP implements Runnable { if (NTP.offset == null) return null; - return System.currentTimeMillis() + NTP.offset + Settings.getInstance().getTestNtpOffset(); + return System.currentTimeMillis() + NTP.offset; } public void run() { @@ -161,103 +156,120 @@ public class NTP implements Runnable { while (!isStopping) { Thread.sleep(1000); - CompletionService ecs = new ExecutorCompletionService<>(serverExecutor); - for (NTPServer server : ntpServers) - ecs.submit(() -> server.doPoll(client)); + boolean haveUpdates = pollServers(); + if (!haveUpdates) + continue; - boolean hasUpdate = false; - for (int i = 0; i < ntpServers.size(); ++i) { - if (isStopping) - return; - - try { - hasUpdate = ecs.take().get() || hasUpdate; - } catch (ExecutionException e) { - // skip - } - } - - if (hasUpdate) { - double s0 = 0; - double s1 = 0; - double s2 = 0; - - for (NTPServer server : ntpServers) { - if (server.offset == null) { - server.usage = ' '; - continue; - } - - server.usage = '+'; - double value = server.offset * (double) server.stratum; - - s0 += 1; - s1 += value; - s2 += value * value; - } - - if (s0 < ntpServers.size() / 3 + 1) { - LOGGER.debug(String.format("Not enough replies (%d) to calculate network time", s0)); - } else { - double thresholdStddev = Math.sqrt(((s0 * s2) - (s1 * s1)) / (s0 * (s0 - 1))); - double mean = s1 / s0; - - // Now only consider offsets within 1 stddev? - s0 = 0; - s1 = 0; - s2 = 0; - - for (NTPServer server : ntpServers) { - if (server.offset == null || server.reach == 0) - continue; - - if (Math.abs(server.offset * (double)server.stratum - mean) > thresholdStddev) - continue; - - server.usage = '*'; - s0 += 1; - s1 += server.offset; - s2 += server.offset * server.offset; - } - - if (s0 <= 1) { - LOGGER.debug(String.format("Not enough useful values (%d) to calculate network time. (stddev: %7.4f)", s0, thresholdStddev)); - } else { - double filteredMean = s1 / s0; - double filteredStddev = Math.sqrt(((s0 * s2) - (s1 * s1)) / (s0 * (s0 - 1))); - - LOGGER.trace(String.format("Threshold stddev: %7.3f, mean: %7.3f, stddev: %7.3f, nValues: %.0f / %d", - thresholdStddev, filteredMean, filteredStddev, s0, ntpServers.size())); - - NTP.offset = (long) filteredMean; - LOGGER.debug(String.format("New NTP offset: %d", NTP.offset)); - } - } - - if (LOGGER.getLevel().isMoreSpecificThan(Level.TRACE)) { - LOGGER.trace(String.format("%c%16s %16s %2s %c %4s %4s %3s %7s %7s %7s", - ' ', "remote", "refid", "st", 't', "when", "poll", "reach", "delay", "offset", "jitter" - )); - - for (NTPServer server : ntpServers) - LOGGER.trace(String.format("%c%16.16s %16.16s %2s %c %4s %4d %3o %7s %7s %7s", - server.usage, - server.remote, - formatNull("%s", server.refId, ""), - formatNull("%2d", server.stratum, ""), - server.type, - formatNull("%4d", server.getWhen(), "-"), - server.poll, - server.reach, - formatNull("%5dms", server.delay, ""), - formatNull("% 5.0fms", server.offset, ""), - formatNull("%5.2fms", server.jitter, "") - )); - } - } + calculateOffset(); } } catch (InterruptedException e) { - // Exit + // Interrupted - time to exit + return; + } + } + + private boolean pollServers() throws InterruptedException { + final long now = System.currentTimeMillis(); + + List pendingServers = ntpServers.stream().filter(ntpServer -> now >= ntpServer.nextPoll).collect(Collectors.toList()); + + CompletionService ecs = new ExecutorCompletionService<>(serverExecutor); + for (NTPServer server : pendingServers) + ecs.submit(() -> server.doPoll(client, now)); + + boolean haveUpdate = false; + for (int i = 0; i < pendingServers.size(); ++i) { + if (isStopping) + return false; + + try { + haveUpdate = ecs.take().get() || haveUpdate; + } catch (ExecutionException e) { + // skip + } + } + + return haveUpdate; + } + + private void calculateOffset() { + double s0 = 0; + double s1 = 0; + double s2 = 0; + + for (NTPServer server : ntpServers) { + if (server.offset == null) { + server.usage = ' '; + continue; + } + + server.usage = '+'; + double value = server.offset * (double) server.stratum; + + s0 += 1; + s1 += value; + s2 += value * value; + } + + if (s0 < ntpServers.size() / 3 + 1) { + final double numberReplies = s0; + LOGGER.debug(() -> String.format("Not enough replies (%d) to calculate network time", numberReplies)); + } else { + double thresholdStddev = Math.sqrt(((s0 * s2) - (s1 * s1)) / (s0 * (s0 - 1))); + double mean = s1 / s0; + + // Now only consider offsets within 1 stddev? + s0 = 0; + s1 = 0; + s2 = 0; + + for (NTPServer server : ntpServers) { + if (server.offset == null || server.reach == 0) + continue; + + if (Math.abs(server.offset * (double)server.stratum - mean) > thresholdStddev) + continue; + + server.usage = '*'; + s0 += 1; + s1 += server.offset; + s2 += server.offset * server.offset; + } + + final double numberValues = s0; + if (s0 <= 1) { + LOGGER.debug(() -> String.format("Not enough useful values (%d) to calculate network time. (stddev: %7.4f)", numberValues, thresholdStddev)); + } else { + double filteredMean = s1 / s0; + double filteredStddev = Math.sqrt(((s0 * s2) - (s1 * s1)) / (s0 * (s0 - 1))); + + LOGGER.trace(() -> String.format("Threshold stddev: %7.3f, mean: %7.3f, stddev: %7.3f, nValues: %.0f / %d", + thresholdStddev, filteredMean, filteredStddev, numberValues, ntpServers.size())); + + NTP.offset = (long) filteredMean; + LOGGER.debug(() -> String.format("New NTP offset: %d", NTP.offset)); + } + } + + if (LOGGER.getLevel().isLessSpecificThan(Level.TRACE)) { + LOGGER.trace(() -> String.format("%c%16s %16s %2s %c %4s %4s %3s %7s %7s %7s", + ' ', "remote", "refid", "st", 't', "when", "poll", "reach", "delay", "offset", "jitter" + )); + + for (NTPServer server : ntpServers) + LOGGER.trace(() -> String.format("%c%16.16s %16.16s %2s %c %4s %4d %3o %7s %7s %7s", + server.usage, + server.remote, + formatNull("%s", server.refId, ""), + formatNull("%2d", server.stratum, ""), + server.type, + formatNull("%4d", server.getWhen(), "-"), + server.poll, + server.reach, + formatNull("%5dms", server.delay, ""), + formatNull("% 5.0fms", server.offset, ""), + formatNull("%5.2fms", server.jitter, "") + )); } } diff --git a/src/test/java/org/qortal/test/CompatibilityTests.java b/src/test/java/org/qortal/test/CompatibilityTests.java index 4418be3d..e3de07b1 100644 --- a/src/test/java/org/qortal/test/CompatibilityTests.java +++ b/src/test/java/org/qortal/test/CompatibilityTests.java @@ -3,6 +3,7 @@ package org.qortal.test; import org.junit.Test; import org.qortal.data.transaction.TransactionData; import org.qortal.repository.DataException; +import org.qortal.settings.Settings; import org.qortal.test.common.Common; import org.qortal.transaction.CreateAssetOrderTransaction; import org.qortal.transaction.CreatePollTransaction; @@ -22,7 +23,7 @@ public class CompatibilityTests extends Common { @Before public void beforeTest() throws DataException { Common.useSettings("test-settings-v1.json"); - NTP.testMode(); + NTP.setFixedOffset(Settings.getInstance().getTestNtpOffset()); } @Test diff --git a/src/test/java/org/qortal/test/apps/NTPTests.java b/src/test/java/org/qortal/test/apps/NTPTests.java index 6d7e89fd..1dc25b00 100644 --- a/src/test/java/org/qortal/test/apps/NTPTests.java +++ b/src/test/java/org/qortal/test/apps/NTPTests.java @@ -1,200 +1,44 @@ package org.qortal.test.apps; -import java.io.IOException; -import java.net.InetAddress; -import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Arrays; -import java.util.Deque; -import java.util.LinkedList; import java.util.List; -import java.util.concurrent.CompletionService; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorCompletionService; -import java.util.concurrent.Executors; -import org.apache.commons.net.ntp.NTPUDPClient; -import org.apache.commons.net.ntp.NtpV3Packet; -import org.apache.commons.net.ntp.TimeInfo; +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.core.config.Configuration; +import org.apache.logging.log4j.core.config.LoggerConfig; +import org.apache.logging.log4j.core.LoggerContext; +import org.qortal.utils.NTP; public class NTPTests { private static final List CC_TLDS = Arrays.asList("oceania", "europe", "cn", "asia", "africa"); - public static void main(String[] args) throws UnknownHostException, IOException, InterruptedException { - NTPUDPClient client = new NTPUDPClient(); - client.setDefaultTimeout(2000); - - class NTPServer { - private static final int MIN_POLL = 8; - - public char usage = ' '; - public String remote; - public String refId; - public Integer stratum; - public char type = 'u'; // unicast - public int poll = MIN_POLL; - public byte reach = 0; - public Long delay; - public Double offset; - public Double jitter; - - private Deque offsets = new LinkedList<>(); - private double totalSquareOffsets = 0.0; - private long nextPoll; - private Long lastGood; - - public NTPServer(String remote) { - this.remote = remote; - } - - public boolean poll(NTPUDPClient client) { - final long now = System.currentTimeMillis(); - - if (now < this.nextPoll) - return false; - - boolean isUpdated = false; - try { - TimeInfo timeInfo = client.getTime(InetAddress.getByName(remote)); - - timeInfo.computeDetails(); - NtpV3Packet ntpMessage = timeInfo.getMessage(); - - this.refId = ntpMessage.getReferenceIdString(); - this.stratum = ntpMessage.getStratum(); - this.poll = Math.max(MIN_POLL, 1 << ntpMessage.getPoll()); - - this.delay = timeInfo.getDelay(); - this.offset = (double) timeInfo.getOffset(); - - if (this.offsets.size() == 8) { - double oldOffset = this.offsets.removeFirst(); - this.totalSquareOffsets -= oldOffset * oldOffset; - } - - this.offsets.addLast(this.offset); - this.totalSquareOffsets += this.offset * this.offset; - - this.jitter = Math.sqrt(this.totalSquareOffsets / this.offsets.size()); - - this.reach = (byte) ((this.reach << 1) | 1); - this.lastGood = now; - - isUpdated = true; - } catch (IOException e) { - this.reach <<= 1; - } - - this.nextPoll = now + this.poll * 1000; - return isUpdated; - } - - public Integer getWhen() { - if (this.lastGood == null) - return null; - - return (int) ((System.currentTimeMillis() - this.lastGood) / 1000); - } - } - - List ntpServers = new ArrayList<>(); + public static void main(String[] args) throws InterruptedException { + List ntpServers = new ArrayList<>(); for (String ccTld : CC_TLDS) - for (int subpool = 0; subpool <=3; ++subpool) - ntpServers.add(new NTPServer(subpool + "." + ccTld + ".pool.ntp.org")); + for (int subpool = 0; subpool <= 3; ++subpool) + ntpServers.add(new String(subpool + "." + ccTld + ".pool.ntp.org")); - while (true) { - Thread.sleep(1000); + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + NTP.shutdownNow(); + })); - CompletionService ecs = new ExecutorCompletionService(Executors.newCachedThreadPool()); - for (NTPServer server : ntpServers) - ecs.submit(() -> server.poll(client)); + Logger ntpLogger = LogManager.getLogger(NTP.class); + LoggerContext loggerContext = (LoggerContext) LogManager.getContext(false); + Configuration config = loggerContext.getConfiguration(); + LoggerConfig loggerConfig = config.getLoggerConfig(ntpLogger.getName()); - boolean showReport = false; - for (int i = 0; i < ntpServers.size(); ++i) - try { - showReport = ecs.take().get() || showReport; - } catch (ExecutionException e) { - // skip - } + loggerConfig.setLevel(Level.TRACE); + loggerContext.updateLoggers(config); - if (showReport) { - double s0 = 0; - double s1 = 0; - double s2 = 0; + NTP.start(ntpServers.toArray(new String[0])); - for (NTPServer server : ntpServers) { - if (server.offset == null) { - server.usage = ' '; - continue; - } - - server.usage = '+'; - double value = server.offset * (double) server.stratum; - - s0 += 1; - s1 += value; - s2 += value * value; - } - - if (s0 < ntpServers.size() / 3 + 1) { - System.out.println("Not enough replies to calculate network time"); - } else { - double filterStddev = Math.sqrt(((s0 * s2) - (s1 * s1)) / (s0 * (s0 - 1))); - double filterMean = s1 / s0; - - // Now only consider offsets within 1 stddev? - s0 = 0; - s1 = 0; - s2 = 0; - - for (NTPServer server : ntpServers) { - if (server.offset == null || server.reach == 0) - continue; - - if (Math.abs(server.offset * (double)server.stratum - filterMean) > filterStddev) - continue; - - server.usage = '*'; - s0 += 1; - s1 += server.offset; - s2 += server.offset * server.offset; - } - - if (s0 <= 1) { - System.out.println(String.format("Not enough values to calculate network time. stddev: %7.4f", filterStddev)); - } else { - double mean = s1 / s0; - double newStddev = Math.sqrt(((s0 * s2) - (s1 * s1)) / (s0 * (s0 - 1))); - System.out.println(String.format("filtering stddev: %7.3f, mean: %7.3f, new stddev: %7.3f, nValues: %.0f / %d", filterStddev, mean, newStddev, s0, ntpServers.size())); - } - } - - System.out.println(String.format("%c%16s %16s %2s %c %4s %4s %3s %7s %7s %7s", - ' ', "remote", "refid", "st", 't', "when", "poll", "reach", "delay", "offset", "jitter" - )); - - for (NTPServer server : ntpServers) - System.out.println(String.format("%c%16.16s %16.16s %2s %c %4s %4d %3o %7s %7s %7s", - server.usage, - server.remote, - formatNull("%s", server.refId, ""), - formatNull("%2d", server.stratum, ""), - server.type, - formatNull("%4d", server.getWhen(), "-"), - server.poll, - server.reach, - formatNull("%5dms", server.delay, ""), - formatNull("% 5.0fms", server.offset, ""), - formatNull("%5.2fms", server.jitter, "") - )); - } - } - } - - private static String formatNull(String format, Object arg, String nullOutput) { - return arg != null ? String.format(format, arg) : nullOutput; + // Endless sleep + Thread.sleep(1000000000L); } } diff --git a/src/test/java/org/qortal/test/assets/OldTradingTests.java b/src/test/java/org/qortal/test/assets/OldTradingTests.java index 52cfc097..6a4752cf 100644 --- a/src/test/java/org/qortal/test/assets/OldTradingTests.java +++ b/src/test/java/org/qortal/test/assets/OldTradingTests.java @@ -6,6 +6,7 @@ import org.junit.Test; import org.qortal.repository.DataException; import org.qortal.repository.Repository; import org.qortal.repository.RepositoryManager; +import org.qortal.settings.Settings; import org.qortal.test.common.AccountUtils; import org.qortal.test.common.AssetUtils; import org.qortal.test.common.Common; @@ -19,7 +20,7 @@ public class OldTradingTests extends Common { @Before public void beforeTest() throws DataException { Common.useSettings("test-settings-old-asset.json"); - NTP.testMode(); + NTP.setFixedOffset(Settings.getInstance().getTestNtpOffset()); } @After diff --git a/src/test/java/org/qortal/test/common/Common.java b/src/test/java/org/qortal/test/common/Common.java index bd3c3d9b..0972986b 100644 --- a/src/test/java/org/qortal/test/common/Common.java +++ b/src/test/java/org/qortal/test/common/Common.java @@ -116,7 +116,7 @@ public class Common { public static void useDefaultSettings() throws DataException { useSettings(testSettingsFilename); - NTP.testMode(); + NTP.setFixedOffset(Settings.getInstance().getTestNtpOffset()); } public static void resetBlockchain() throws DataException {