From 7f4511cb7bd8275323d7111c2aa6e6f809c29e12 Mon Sep 17 00:00:00 2001 From: catbref Date: Fri, 1 Feb 2019 14:03:06 +0000 Subject: [PATCH] Networking and repository Some pom.xml changes to reduce maven-shade-plugin conflicting classes warnings. Repository now supports SAVEPOINT and ROLLBACK TO SAVEPOINT. HSQLDB concurrency/transaction model changed from LOCKS to MVCC to help with transaction deadlocks/rollbacks. More XXXs and TODOs added to Block.java for investigation/fix/improvements. Also used new repository SAVEPOINT feature when validating transactions instead of rolling back entire transaction. This fixes problem during synchronization where the rollback would undo previously synchronized, but not yet committed, blocks! Transactions orphaned by Block.orphan ARE now added to unconfirmed pile, unlike before. Concurrent lock now prevents simultaneous block generation and synchronization, including synchronization via multiple peers. Lots of new networking code: peer lists, block signatures, blocks, blockchain synchronization. PEERS_V2 message now supports hostnames, IPv6 and port numbers. Fixed bug with block serialization for transport over network. --- log4j2.properties | 4 + pom.xml | 12 + src/main/java/org/qora/at/BlockchainAPI.java | 4 +- src/main/java/org/qora/block/Block.java | 33 ++- .../java/org/qora/block/BlockGenerator.java | 60 +++-- src/main/java/org/qora/blockgenerator.java | 2 - .../java/org/qora/controller/Controller.java | 190 +++++++++++--- .../org/qora/controller/Synchronizer.java | 235 ++++++++++++++++-- .../java/org/qora/data/network/PeerData.java | 14 +- src/main/java/org/qora/network/Handshake.java | 6 +- src/main/java/org/qora/network/Network.java | 182 +++++++++----- src/main/java/org/qora/network/Peer.java | 26 +- .../qora/network/message/BlockMessage.java | 91 +++++++ .../qora/network/message/GetBlockMessage.java | 54 ++++ .../network/message/GetSignaturesMessage.java | 54 ++++ .../org/qora/network/message/Message.java | 3 +- .../qora/network/message/PeersMessage.java | 11 +- .../qora/network/message/PeersV2Message.java | 134 ++++++++++ .../network/message/SignaturesMessage.java | 66 +++++ .../java/org/qora/repository/Repository.java | 4 + .../repository/TransactionRepository.java | 16 +- .../hsqldb/HSQLDBDatabaseUpdates.java | 2 +- .../hsqldb/HSQLDBNetworkRepository.java | 12 +- .../repository/hsqldb/HSQLDBRepository.java | 34 +++ .../HSQLDBTransactionRepository.java | 9 + .../transform/block/BlockTransformer.java | 17 +- src/main/java/org/qora/utils/NTP.java | 1 - 27 files changed, 1081 insertions(+), 195 deletions(-) create mode 100644 src/main/java/org/qora/network/message/BlockMessage.java create mode 100644 src/main/java/org/qora/network/message/GetBlockMessage.java create mode 100644 src/main/java/org/qora/network/message/GetSignaturesMessage.java create mode 100644 src/main/java/org/qora/network/message/PeersV2Message.java create mode 100644 src/main/java/org/qora/network/message/SignaturesMessage.java diff --git a/log4j2.properties b/log4j2.properties index 9e9af414..3188c2ff 100644 --- a/log4j2.properties +++ b/log4j2.properties @@ -26,6 +26,10 @@ logger.txSearch.level = trace logger.blockgen.name = org.qora.block.BlockGenerator logger.blockgen.level = trace +# Debug synchronization +logger.sync.name = org.qora.controller.Synchronizer +logger.sync.level = trace + # Debug networking logger.network.name = org.qora.network.Network logger.network.level = trace diff --git a/pom.xml b/pom.xml index ef052c39..8dd67a2f 100644 --- a/pom.xml +++ b/pom.xml @@ -385,6 +385,12 @@ org.glassfish.jersey.inject jersey-hk2 ${jersey.version} + + + javax.inject + javax.inject + + org.glassfish.jersey.media @@ -406,6 +412,12 @@ io.swagger.core.v3 swagger-jaxrs2-servlet-initializer ${swagger-api.version} + + + io.swagger.core.v3 + swagger-integration + + org.webjars diff --git a/src/main/java/org/qora/at/BlockchainAPI.java b/src/main/java/org/qora/at/BlockchainAPI.java index ec58b900..f1c2a0dc 100644 --- a/src/main/java/org/qora/at/BlockchainAPI.java +++ b/src/main/java/org/qora/at/BlockchainAPI.java @@ -102,12 +102,12 @@ public enum BlockchainAPI { BTC(1) { @Override public void putTransactionFromRecipientAfterTimestampInA(String recipient, Timestamp timestamp, MachineState state) { - // TODO + // TODO BTC transaction support for ATv2 } @Override public long getAmountFromTransactionInA(Timestamp timestamp, MachineState state) { - // TODO + // TODO BTC transaction support for ATv2 return 0; } }; diff --git a/src/main/java/org/qora/block/Block.java b/src/main/java/org/qora/block/Block.java index f08d67d6..b5f8ecc4 100644 --- a/src/main/java/org/qora/block/Block.java +++ b/src/main/java/org/qora/block/Block.java @@ -82,6 +82,7 @@ public class Block { TRANSACTION_TIMESTAMP_INVALID(51), TRANSACTION_INVALID(52), TRANSACTION_PROCESSING_FAILED(53), + TRANSACTION_ALREADY_PROCESSED(54), AT_STATES_MISMATCH(61); public final int value; @@ -123,6 +124,7 @@ public class Block { // Other useful constants /** Maximum size of block in bytes */ + // TODO push this out to blockchain config file public static final int MAX_BLOCK_BYTES = 1048576; // Constructors @@ -737,7 +739,7 @@ public class Block { return ValidationResult.TIMESTAMP_MS_INCORRECT; // Too early to forge block? - // XXX DISABLED + // XXX DISABLED as it doesn't work - but why? // if (this.blockData.getTimestamp() < parentBlock.getBlockData().getTimestamp() + BlockChain.getInstance().getMinBlockTime()) // return ValidationResult.TIMESTAMP_TOO_SOON; @@ -751,6 +753,7 @@ public class Block { if (this.blockData.getGeneratingBalance().compareTo(parentBlock.calcNextBlockGeneratingBalance()) != 0) return ValidationResult.GENERATING_BALANCE_INCORRECT; + // XXX Block.isValid generator check relaxation?? blockchain config option? // After maximum block period, then generator checks are relaxed if (this.blockData.getTimestamp() < parentBlock.getBlockData().getTimestamp() + BlockChain.getInstance().getMaxBlockTime()) { // Check generator is allowed to forge this block @@ -814,6 +817,9 @@ public class Block { // Check transactions try { + // Create repository savepoint here so we can rollback to it after testing transactions + repository.setSavepoint(); + for (Transaction transaction : this.getTransactions()) { // GenesisTransactions are not allowed (GenesisBlock overrides isValid() to allow them) if (transaction instanceof GenesisTransaction) @@ -824,6 +830,10 @@ public class Block { || transaction.getDeadline() <= this.blockData.getTimestamp()) return ValidationResult.TRANSACTION_TIMESTAMP_INVALID; + // Check transaction isn't already included in a block + if (this.repository.getTransactionRepository().isConfirmed(transaction.getTransactionData().getSignature())) + return ValidationResult.TRANSACTION_ALREADY_PROCESSED; + // Check transaction is even valid // NOTE: in Gen1 there was an extra block height passed to DeployATTransaction.isValid Transaction.ValidationResult validationResult = transaction.isValid(); @@ -843,15 +853,15 @@ public class Block { } } } catch (DataException e) { - return ValidationResult.TRANSACTION_TIMESTAMP_INVALID; + // XXX why was this TRANSACTION_TIMESTAMP_INVALID? + return ValidationResult.TRANSACTION_INVALID; } finally { - // Discard changes to repository made by test-processing transactions above + // Rollback repository changes made by test-processing transactions above try { - this.repository.discardChanges(); + this.repository.rollbackToSavepoint(); } catch (DataException e) { /* - * discardChanges failure most likely due to prior DataException, so catch discardChanges' DataException and ignore. Prior DataException - * propagates to caller. + * Rollback failure most likely due to prior DataException, so discard this DataException. Prior DataException propagates to caller. */ } } @@ -916,7 +926,8 @@ public class Block { this.blockData.setTransactionCount(this.blockData.getTransactionCount() + 1); // We've added transactions, so recalculate transactions signature - calcTransactionsSignature(); + // XXX surely this breaks Block.isSignatureValid which is called before we are? + // calcTransactionsSignature(); } /** @@ -976,9 +987,7 @@ public class Block { } /** - * Removes block from blockchain undoing transactions. - *

- * Note: it is up to the caller to re-add any of the block's transactions back to the unconfirmed transactions pile. + * Removes block from blockchain undoing transactions and adding them to unconfirmed pile. * * @throws DataException */ @@ -990,10 +999,14 @@ public class Block { Transaction transaction = transactions.get(sequence); transaction.orphan(); + // Unlink transaction from this block BlockTransactionData blockTransactionData = new BlockTransactionData(this.getSignature(), sequence, transaction.getTransactionData().getSignature()); this.repository.getBlockRepository().delete(blockTransactionData); + // Add to unconfirmed pile + this.repository.getTransactionRepository().unconfirmTransaction(transaction.getTransactionData()); + this.repository.getTransactionRepository().deleteParticipants(transaction.getTransactionData()); } diff --git a/src/main/java/org/qora/block/BlockGenerator.java b/src/main/java/org/qora/block/BlockGenerator.java index 88dac306..00871c35 100644 --- a/src/main/java/org/qora/block/BlockGenerator.java +++ b/src/main/java/org/qora/block/BlockGenerator.java @@ -2,6 +2,7 @@ package org.qora.block; import java.util.Arrays; import java.util.List; +import java.util.concurrent.locks.Lock; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -78,40 +79,47 @@ public class BlockGenerator extends Thread { if (newBlock == null) newBlock = new Block(repository, previousBlock.getBlockData(), generator); - // Is new block valid yet? (Before adding unconfirmed transactions) - if (newBlock.isValid() == ValidationResult.OK) { - // Add unconfirmed transactions - addUnconfirmedTransactions(repository, newBlock); + // Make sure we're the only thread modifying the blockchain + Lock blockchainLock = Controller.getInstance().getBlockchainLock(); + if (blockchainLock.tryLock()) + try { + // Is new block valid yet? (Before adding unconfirmed transactions) + if (newBlock.isValid() == ValidationResult.OK) { + // Add unconfirmed transactions + addUnconfirmedTransactions(repository, newBlock); - // Sign to create block's signature - newBlock.sign(); + // Sign to create block's signature + newBlock.sign(); - // If newBlock is still valid then we can use it - ValidationResult validationResult = newBlock.isValid(); - if (validationResult == ValidationResult.OK) { - // Add to blockchain - something else will notice and broadcast new block to network - try { - newBlock.process(); - LOGGER.info("Generated new block: " + newBlock.getBlockData().getHeight()); - repository.saveChanges(); + // If newBlock is still valid then we can use it + ValidationResult validationResult = newBlock.isValid(); + if (validationResult == ValidationResult.OK) { + // Add to blockchain - something else will notice and broadcast new block to network + try { + newBlock.process(); + LOGGER.info("Generated new block: " + newBlock.getBlockData().getHeight()); + repository.saveChanges(); - // Notify controller - Controller.getInstance().onGeneratedBlock(newBlock.getBlockData()); - } catch (DataException e) { - // Unable to process block - report and discard - LOGGER.error("Unable to process newly generated block?", e); - newBlock = null; + // Notify controller + Controller.getInstance().onGeneratedBlock(newBlock.getBlockData()); + } catch (DataException e) { + // Unable to process block - report and discard + LOGGER.error("Unable to process newly generated block?", e); + newBlock = null; + } + } else { + // No longer valid? Report and discard + LOGGER.error("Valid, generated block now invalid '" + validationResult.name() + "' after adding unconfirmed transactions?"); + newBlock = null; + } } - } else { - // No longer valid? Report and discard - LOGGER.error("Valid, generated block now invalid '" + validationResult.name() + "' after adding unconfirmed transactions?"); - newBlock = null; + } finally { + blockchainLock.unlock(); } - } // Sleep for a while try { - repository.discardChanges(); // Free transactional locks, if any + repository.discardChanges(); // Free repository locks, if any Thread.sleep(1000); // No point sleeping less than this as block timestamp millisecond values must be the same } catch (InterruptedException e) { // We've been interrupted - time to exit diff --git a/src/main/java/org/qora/blockgenerator.java b/src/main/java/org/qora/blockgenerator.java index dba5ac3f..0e8d8edb 100644 --- a/src/main/java/org/qora/blockgenerator.java +++ b/src/main/java/org/qora/blockgenerator.java @@ -58,14 +58,12 @@ public class blockgenerator { try { blockGenerator.join(); } catch (InterruptedException e) { - // TODO Auto-generated catch block e.printStackTrace(); } try { RepositoryManager.closeRepositoryFactory(); } catch (DataException e) { - // TODO Auto-generated catch block e.printStackTrace(); } } diff --git a/src/main/java/org/qora/controller/Controller.java b/src/main/java/org/qora/controller/Controller.java index 61801cf2..7e4aa7b8 100644 --- a/src/main/java/org/qora/controller/Controller.java +++ b/src/main/java/org/qora/controller/Controller.java @@ -2,24 +2,35 @@ package org.qora.controller; import java.io.IOException; import java.io.InputStream; +import java.security.SecureRandom; import java.security.Security; import java.time.LocalDateTime; import java.time.ZoneOffset; import java.time.format.DateTimeFormatter; +import java.util.ArrayList; +import java.util.List; import java.util.Properties; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.bouncycastle.jce.provider.BouncyCastleProvider; import org.bouncycastle.jsse.provider.BouncyCastleJsseProvider; import org.qora.api.ApiService; +import org.qora.block.Block; import org.qora.block.BlockChain; import org.qora.block.BlockGenerator; import org.qora.data.block.BlockData; +import org.qora.data.network.PeerData; import org.qora.network.Network; import org.qora.network.Peer; +import org.qora.network.message.BlockMessage; +import org.qora.network.message.GetBlockMessage; +import org.qora.network.message.GetSignaturesMessage; import org.qora.network.message.HeightMessage; import org.qora.network.message.Message; +import org.qora.network.message.SignaturesMessage; import org.qora.repository.DataException; import org.qora.repository.Repository; import org.qora.repository.RepositoryFactory; @@ -27,6 +38,7 @@ import org.qora.repository.RepositoryManager; import org.qora.repository.hsqldb.HSQLDBRepositoryFactory; import org.qora.settings.Settings; import org.qora.utils.Base58; +import org.qora.utils.NTP; public class Controller extends Thread { @@ -47,6 +59,9 @@ public class Controller extends Thread { private final String buildVersion; private final long buildTimestamp; + /** Lock for only allowing one blockchain-modifying codepath at a time. e.g. synchronization or newly generated block. */ + private final Lock blockchainLock; + private Controller() { Properties properties = new Properties(); try (InputStream in = ClassLoader.getSystemResourceAsStream("build.properties")) { @@ -66,6 +81,8 @@ public class Controller extends Thread { throw new RuntimeException("Can't read build.version from build.properties resource"); this.buildVersion = VERSION_PREFIX + buildVersion; + + blockchainLock = new ReentrantLock(); } public static Controller getInstance() { @@ -75,6 +92,38 @@ public class Controller extends Thread { return instance; } + // Getters / setters + + public byte[] getMessageMagic() { + return new byte[] { + 0x12, 0x34, 0x56, 0x78 + }; + } + + public long getBuildTimestamp() { + return this.buildTimestamp; + } + + public String getVersionString() { + return this.buildVersion; + } + + /** Returns current blockchain height, or 0 if there's a repository issue */ + public int getChainHeight() { + try (final Repository repository = RepositoryManager.getRepository()) { + return repository.getBlockRepository().getBlockchainHeight(); + } catch (DataException e) { + LOGGER.error("Repository issue when fetching blockchain height", e); + return 0; + } + } + + public Lock getBlockchainLock() { + return this.blockchainLock; + } + + // Entry point + public static void main(String args[]) { LOGGER.info("Starting up..."); @@ -101,10 +150,10 @@ public class Controller extends Thread { System.exit(2); } - // XXX work to be done here! - if (args.length == 0) { + // XXX extract private key needed for block gen + if (args.length == 0 || !args[0].equals("NO-BLOCK-GEN")) { LOGGER.info("Starting block generator"); - byte[] privateKey = Base58.decode("A9MNsATgQgruBUjxy2rjWY36Yf19uRioKZbiLFT2P7c6"); + byte[] privateKey = Base58.decode(args.length > 0 ? args[0] : "A9MNsATgQgruBUjxy2rjWY36Yf19uRioKZbiLFT2P7c6"); blockGenerator = new BlockGenerator(privateKey); blockGenerator.start(); } @@ -130,6 +179,8 @@ public class Controller extends Thread { Runtime.getRuntime().addShutdownHook(new Thread() { @Override public void run() { + Thread.currentThread().setName("Shutdown hook"); + Controller.getInstance().shutdown(); } }); @@ -138,16 +189,17 @@ public class Controller extends Thread { Controller.getInstance().start(); } + // Main thread + @Override public void run() { Thread.currentThread().setName("Controller"); try { - while (true) { + while (!isStopping) { Thread.sleep(1000); - // Query random connections for their blockchain status - // If height > ours then potentially synchronize + potentiallySynchronize(); // Query random connections for unconfirmed transactions } @@ -157,6 +209,38 @@ public class Controller extends Thread { } } + private void potentiallySynchronize() { + int ourHeight = getChainHeight(); + if (ourHeight == 0) + return; + + // If we have enough peers, potentially synchronize + List peers = Network.getInstance().getHandshakeCompletedPeers(); + if (peers.size() >= Settings.getInstance().getMinPeers()) { + peers.removeIf(peer -> peer.getPeerData().getLastHeight() <= ourHeight); + + if (!peers.isEmpty()) { + // Pick random peer to sync with + int index = new SecureRandom().nextInt(peers.size()); + Peer peer = peers.get(index); + + if (!Synchronizer.getInstance().synchronize(peer)) { + // Failure so don't use this peer again for a while + try (final Repository repository = RepositoryManager.getRepository()) { + PeerData peerData = peer.getPeerData(); + peerData.setLastMisbehaved(NTP.getTime()); + repository.getNetworkRepository().save(peerData); + repository.saveChanges(); + } catch (DataException e) { + LOGGER.warn("Repository issue while updating peer synchronization info", e); + } + } + } + } + } + + // Shutdown + public void shutdown() { synchronized (shutdownLock) { if (!isStopping) { @@ -164,6 +248,11 @@ public class Controller extends Thread { LOGGER.info("Shutting down controller"); this.interrupt(); + try { + this.join(); + } catch (InterruptedException e) { + // We were interrupted while waiting for thread to join + } LOGGER.info("Shutting down networking"); Network.getInstance().shutdown(); @@ -177,7 +266,7 @@ public class Controller extends Thread { try { blockGenerator.join(); } catch (InterruptedException e) { - // We were interrupted while waiting for thread to 'join' + // We were interrupted while waiting for thread to join } } @@ -198,39 +287,16 @@ public class Controller extends Thread { System.exit(0); } - public byte[] getMessageMagic() { - return new byte[] { - 0x12, 0x34, 0x56, 0x78 - }; - } - - public long getBuildTimestamp() { - return this.buildTimestamp; - } - - public String getVersionString() { - return this.buildVersion; - } - - public int getChainHeight() { - try (final Repository repository = RepositoryManager.getRepository()) { - return repository.getBlockRepository().getBlockchainHeight(); - } catch (DataException e) { - LOGGER.error("Repository issue when fetching blockchain height", e); - return 0; - } - } - // Callbacks for/from network public void doNetworkBroadcast() { Network network = Network.getInstance(); // Send our known peers - network.broadcast(network.buildPeersMessage()); + network.broadcast(peer -> network.buildPeersMessage(peer)); // Send our current height - network.broadcast(new HeightMessage(this.getChainHeight())); + network.broadcast(peer -> new HeightMessage(this.getChainHeight())); } public void onGeneratedBlock(BlockData newBlockData) { @@ -238,24 +304,66 @@ public class Controller extends Thread { // Could even broadcast top two block sigs so that remote peers can see new block references current network-wide last block // Broadcast our new height - Network.getInstance().broadcast(new HeightMessage(newBlockData.getHeight())); + Network.getInstance().broadcast(peer -> new HeightMessage(newBlockData.getHeight())); } public void onNetworkMessage(Peer peer, Message message) { - LOGGER.trace(String.format("Processing %s message from %s", message.getType().name(), peer.getRemoteSocketAddress())); + LOGGER.trace(String.format("Processing %s message from %s", message.getType().name(), peer)); switch (message.getType()) { case HEIGHT: HeightMessage heightMessage = (HeightMessage) message; - // If we connected to peer, then update our record of peer's height - if (peer.isOutbound()) - peer.getPeerData().setLastHeight(heightMessage.getHeight()); + // Update our record of peer's height + peer.getPeerData().setLastHeight(heightMessage.getHeight()); - // XXX we should instead test incoming block sigs to see if we have them, and if not do sync - // Is peer's blockchain longer than ours? - if (heightMessage.getHeight() > getChainHeight()) - Synchronizer.getInstance().synchronize(peer); + break; + + case GET_SIGNATURES: + try (final Repository repository = RepositoryManager.getRepository()) { + GetSignaturesMessage getSignaturesMessage = (GetSignaturesMessage) message; + byte[] parentSignature = getSignaturesMessage.getParentSignature(); + + List signatures = new ArrayList<>(); + + do { + BlockData blockData = repository.getBlockRepository().fromReference(parentSignature); + + if (blockData == null) + break; + + parentSignature = blockData.getSignature(); + signatures.add(parentSignature); + } while (signatures.size() < 500); + + Message signaturesMessage = new SignaturesMessage(signatures); + signaturesMessage.setId(message.getId()); + if (!peer.sendMessage(signaturesMessage)) + peer.disconnect(); + } catch (DataException e) { + LOGGER.error(String.format("Repository issue while responding to %s from peer %s", message.getType().name(), peer), e); + } + break; + + case GET_BLOCK: + try (final Repository repository = RepositoryManager.getRepository()) { + GetBlockMessage getBlockMessage = (GetBlockMessage) message; + byte[] signature = getBlockMessage.getSignature(); + + BlockData blockData = repository.getBlockRepository().fromSignature(signature); + if (blockData == null) + // No response at all??? + break; + + Block block = new Block(repository, blockData); + + Message blockMessage = new BlockMessage(block); + blockMessage.setId(message.getId()); + if (!peer.sendMessage(blockMessage)) + peer.disconnect(); + } catch (DataException e) { + LOGGER.error(String.format("Repository issue while responding to %s from peer %s", message.getType().name(), peer), e); + } break; default: diff --git a/src/main/java/org/qora/controller/Synchronizer.java b/src/main/java/org/qora/controller/Synchronizer.java index b42e0f56..b82c48df 100644 --- a/src/main/java/org/qora/controller/Synchronizer.java +++ b/src/main/java/org/qora/controller/Synchronizer.java @@ -1,15 +1,40 @@ package org.qora.controller; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.locks.Lock; + import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.qora.block.Block; +import org.qora.block.Block.ValidationResult; +import org.qora.block.GenesisBlock; +import org.qora.data.block.BlockData; import org.qora.network.Peer; +import org.qora.network.message.BlockMessage; +import org.qora.network.message.GetBlockMessage; +import org.qora.network.message.GetSignaturesMessage; +import org.qora.network.message.Message; +import org.qora.network.message.Message.MessageType; +import org.qora.network.message.SignaturesMessage; +import org.qora.repository.DataException; +import org.qora.repository.Repository; +import org.qora.repository.RepositoryManager; public class Synchronizer { private static final Logger LOGGER = LogManager.getLogger(Synchronizer.class); + private static final int INITIAL_BLOCK_STEP = 8; + private static final int MAXIMUM_BLOCK_STEP = 500; + private static final int MAXIMUM_HEIGHT_DELTA = 2000; // XXX move to blockchain config? + private static Synchronizer instance; + private Repository repository; + private int ourHeight; + private Synchronizer() { } @@ -20,35 +45,211 @@ public class Synchronizer { return instance; } - public void synchronize(Peer peer) { - // If we're already synchronizing with another peer then return + public boolean synchronize(Peer peer) { + // Make sure we're the only thread modifying the blockchain + // If we're already synchronizing with another peer then this will also return fast + Lock blockchainLock = Controller.getInstance().getBlockchainLock(); + if (blockchainLock.tryLock()) + try { + try (final Repository repository = RepositoryManager.getRepository()) { + try { + this.repository = repository; + this.ourHeight = this.repository.getBlockRepository().getBlockchainHeight(); + int peerHeight = peer.getPeerData().getLastHeight(); - LOGGER.info(String.format("Synchronizing with peer %s", peer.getRemoteSocketAddress())); + LOGGER.info(String.format("Synchronizing with peer %s from height %d to height %d", peer, this.ourHeight, peerHeight)); - // Peer has different latest block sig to us + List signatures = findSignaturesFromCommonBlock(peer); + if (signatures == null) { + LOGGER.info(String.format("Failure to find common block with peer %s", peer)); + return false; + } - // find common block? + // First signature is common block + BlockData commonBlockData = this.repository.getBlockRepository().fromSignature(signatures.get(0)); + signatures.remove(0); - // if common block is too far behind us then we're on massively different forks so give up, maybe human invention required to download desired fork + // If common block is too far behind us then we're on massively different forks so give up. + int minHeight = ourHeight - MAXIMUM_HEIGHT_DELTA; + if (commonBlockData.getHeight() < minHeight) { + LOGGER.info(String.format("Blockchain too divergent with peer %s", peer)); + return false; + } - // unwind to common block (unless common block is our latest block) + if (this.ourHeight > commonBlockData.getHeight()) { + // Unwind to common block (unless common block is our latest block) + LOGGER.debug(String.format("Orphaning blocks back to height %d", commonBlockData.getHeight())); - // apply some newer blocks from peer + while (this.ourHeight > commonBlockData.getHeight()) { + BlockData blockData = repository.getBlockRepository().fromHeight(this.ourHeight); + Block block = new Block(repository, blockData); + block.orphan(); - // commit + --this.ourHeight; + } - // If our block gen creates a block while we do this - what happens? - // does repository serialization prevent issues? + LOGGER.debug(String.format("Orphaned blocks back to height %d - fetching blocks from peer", commonBlockData.getHeight(), peer)); + } else { + LOGGER.debug(String.format("Fetching new blocks from peer %s", peer)); + } - // blockgen: block 123: pay X from A to B, commit - // sync: block 122 orphaned, replacement blocks 122 through 129 applied, commit + // Fetch, and apply, blocks from peer + byte[] signature = commonBlockData.getSignature(); + while (this.ourHeight < peerHeight) { + // Do we need more signatures? + if (signatures.isEmpty()) { + signatures = this.getBlockSignatures(peer, signature, MAXIMUM_BLOCK_STEP); + if (signatures == null || signatures.isEmpty()) { + LOGGER.info(String.format("Peer %s failed to respond with more block signatures after height %d", peer, this.ourHeight)); + return false; + } + } - // and vice versa? + signature = signatures.get(0); + signatures.remove(0); + ++this.ourHeight; - // sync: block 122 orphaned, replacement blocks 122 through 129 applied, commit - // blockgen: block 123: pay X from A to B, commit + BlockData newBlockData = this.fetchBlockData(peer, signature); - // simply block syncing when generating and vice versa by grabbing a Controller-owned non-blocking mutex? + if (newBlockData == null) { + LOGGER.info(String.format("Peer %s failed to respond with block for height %d", peer, this.ourHeight)); + return false; + } + + Block newBlock = new Block(repository, newBlockData); + + if (!newBlock.isSignatureValid()) { + LOGGER.info(String.format("Peer %s sent block with invalid signature for height %d", peer, this.ourHeight)); + return false; + } + + ValidationResult blockResult = newBlock.isValid(); + if (blockResult != ValidationResult.OK) { + LOGGER.info(String.format("Peer %s sent invalid block for height %d: %s", peer, this.ourHeight, blockResult.name())); + return false; + } + + newBlock.process(); + } + + // Commit + repository.saveChanges(); + LOGGER.info(String.format("Synchronized with peer %s to height %d", peer, this.ourHeight)); + + return true; + } finally { + repository.discardChanges(); + this.repository = null; + } + } + } catch (DataException e) { + LOGGER.error("Repository issue during synchronization with peer", e); + return false; + } finally { + blockchainLock.unlock(); + } + + // Wasn't peer's fault we couldn't sync + return true; + } + + /** + * Returns list of block signatures start with common block with peer. + * + * @param peer + * @return block signatures + * @throws DataException + */ + private List findSignaturesFromCommonBlock(Peer peer) throws DataException { + // Start by asking for a few recent block hashes as this will cover a majority of reorgs + // Failing that, back off exponentially + int step = INITIAL_BLOCK_STEP; + + List blockSignatures = null; + int testHeight = ourHeight - step; + byte[] testSignature = null; + + while (testHeight > 1) { + // Fetch our block signature at this height + BlockData testBlockData = this.repository.getBlockRepository().fromHeight(testHeight); + if (testBlockData == null) { + // Not found? But we've locked the blockchain and height is below blockchain's tip! + LOGGER.error("Failed to get block at height lower than blockchain tip during synchronization?"); + return null; + } + + testSignature = testBlockData.getSignature(); + + // Ask for block signatures since test block's signature + LOGGER.trace(String.format("Requesting %d signature%s after our height %d", step, (step != 1 ? "s": ""), testHeight)); + blockSignatures = this.getBlockSignatures(peer, testSignature, step); + + if (blockSignatures == null) + // No response - give up this time + return null; + + LOGGER.trace(String.format("Received %s signature%s", blockSignatures.size(), (blockSignatures.size() != 1 ? "s" : ""))); + + // Empty list means remote peer is unaware of test signature OR has no new blocks after test signature + if (!blockSignatures.isEmpty()) + // We have entries so we have found a common block + break; + + if (peer.getVersion() >= 2) { + step <<= 1; + } else { + // Old v1 peers are hard-coded to return 500 signatures so we might as well go backward by 500 too + step = 500; + } + step = Math.min(step, MAXIMUM_BLOCK_STEP); + + testHeight -= step; + } + + if (testHeight <= 1) + // Can't go back any further - return Genesis block + return new ArrayList(Arrays.asList(GenesisBlock.getInstance(this.repository).getBlockData().getSignature())); + + // Prepend common block's signature as first block sig + blockSignatures.add(0, testSignature); + + // Work through returned signatures to get closer common block + // Do this by trimming all-but-one leading known signatures + for (int i = blockSignatures.size() - 1; i > 0; --i) { + BlockData blockData = this.repository.getBlockRepository().fromSignature(blockSignatures.get(i)); + + if (blockData != null) { + blockSignatures.subList(0, i).clear(); + break; + } + } + + return blockSignatures; + } + + private List getBlockSignatures(Peer peer, byte[] parentSignature, int countRequested) { + // TODO countRequested is v2+ feature + Message getSignaturesMessage = new GetSignaturesMessage(parentSignature); + + Message message = peer.getResponse(getSignaturesMessage); + if (message == null || message.getType() != MessageType.SIGNATURES) + return null; + + SignaturesMessage signaturesMessage = (SignaturesMessage) message; + + return signaturesMessage.getSignatures(); + } + + private BlockData fetchBlockData(Peer peer, byte[] signature) { + Message getBlockMessage = new GetBlockMessage(signature); + + Message message = peer.getResponse(getBlockMessage); + if (message == null || message.getType() != MessageType.BLOCK) + return null; + + BlockMessage blockMessage = (BlockMessage) message; + + return blockMessage.getBlockData(); } } diff --git a/src/main/java/org/qora/data/network/PeerData.java b/src/main/java/org/qora/data/network/PeerData.java index 48b5288b..f120eacd 100644 --- a/src/main/java/org/qora/data/network/PeerData.java +++ b/src/main/java/org/qora/data/network/PeerData.java @@ -14,6 +14,7 @@ public class PeerData { private Long lastAttempted; private Long lastConnected; private Integer lastHeight; + private Long lastMisbehaved; // Constructors @@ -21,15 +22,16 @@ public class PeerData { protected PeerData() { } - public PeerData(InetSocketAddress socketAddress, Long lastAttempted, Long lastConnected, Integer lastHeight) { + public PeerData(InetSocketAddress socketAddress, Long lastAttempted, Long lastConnected, Integer lastHeight, Long lastMisbehaved) { this.socketAddress = socketAddress; this.lastAttempted = lastAttempted; this.lastConnected = lastConnected; this.lastHeight = lastHeight; + this.lastMisbehaved = lastMisbehaved; } public PeerData(InetSocketAddress socketAddress) { - this(socketAddress, null, null, null); + this(socketAddress, null, null, null, null); } // Getters / setters @@ -62,4 +64,12 @@ public class PeerData { this.lastHeight = lastHeight; } + public Long getLastMisbehaved() { + return this.lastMisbehaved; + } + + public void setLastMisbehaved(Long lastMisbehaved) { + this.lastMisbehaved = lastMisbehaved; + } + } diff --git a/src/main/java/org/qora/network/Handshake.java b/src/main/java/org/qora/network/Handshake.java index 87e7ae61..1b1368f4 100644 --- a/src/main/java/org/qora/network/Handshake.java +++ b/src/main/java/org/qora/network/Handshake.java @@ -5,7 +5,6 @@ import java.util.Arrays; import org.qora.controller.Controller; import org.qora.network.message.Message; import org.qora.network.message.Message.MessageType; -import org.qora.utils.NTP; import org.qora.network.message.PeerIdMessage; import org.qora.network.message.ProofMessage; import org.qora.network.message.VersionMessage; @@ -77,7 +76,7 @@ public enum Handshake { if (peer.isOutbound()) return COMPLETED; - // Check salt hasn't been seen before - this stops multiple peers reusing salt nonce in a Sybil-like attack + // Check salt hasn't been seen before - this stops multiple peers reusing same nonce in a Sybil-like attack if (Proof.seenSalt(proofMessage.getSalt())) return null; @@ -103,9 +102,6 @@ public enum Handshake { @Override public void action(Peer peer) { // Note: this is only called when we've made outbound connection - - // Make a note that we've successfully completed handshake (and when) - peer.getPeerData().setLastConnected(NTP.getTime()); } }; diff --git a/src/main/java/org/qora/network/Network.java b/src/main/java/org/qora/network/Network.java index cf034f3c..2372cd8c 100644 --- a/src/main/java/org/qora/network/Network.java +++ b/src/main/java/org/qora/network/Network.java @@ -9,6 +9,7 @@ import java.net.SocketTimeoutException; import java.net.UnknownHostException; import java.security.SecureRandom; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -23,6 +24,7 @@ import org.qora.data.network.PeerData; import org.qora.network.message.HeightMessage; import org.qora.network.message.Message; import org.qora.network.message.PeersMessage; +import org.qora.network.message.PeersV2Message; import org.qora.network.message.PingMessage; import org.qora.repository.DataException; import org.qora.repository.Repository; @@ -34,11 +36,16 @@ import org.qora.utils.NTP; public class Network extends Thread { private static final Logger LOGGER = LogManager.getLogger(Network.class); - private static final int LISTEN_BACKLOG = 10; - private static final int CONNECT_FAILURE_BACKOFF = 60 * 1000; // ms - private static final int BROADCAST_INTERVAL = 60 * 1000; // ms private static Network instance; + private static final int LISTEN_BACKLOG = 10; + /** How long before retrying after a connection failure, in milliseconds. */ + private static final int CONNECT_FAILURE_BACKOFF = 60 * 1000; // ms + /** How long between informational broadcasts to all connected peers, in milliseconds. */ + private static final int BROADCAST_INTERVAL = 60 * 1000; // ms + /** Maximum time since last successful connection for peer info to be propagated, in milliseconds. */ + private static final long RECENT_CONNECTION_THRESHOLD = 24 * 60 * 60 * 1000; // ms + public static final int PEER_ID_LENGTH = 128; private final byte[] ourPeerId; @@ -113,7 +120,7 @@ public class Network extends Thread { } public void noteToSelf(Peer peer) { - LOGGER.info(String.format("No longer considering peer address %s as it connects to self", peer.getRemoteSocketAddress())); + LOGGER.info(String.format("No longer considering peer address %s as it connects to self", peer)); synchronized (this.selfPeers) { this.selfPeers.add(peer.getPeerData()); @@ -129,7 +136,7 @@ public class Network extends Thread { // Maintain long-term connections to various peers' API applications try { while (true) { - acceptConnection(); + acceptConnections(); createConnection(); @@ -160,38 +167,40 @@ public class Network extends Thread { } @SuppressWarnings("resource") - private void acceptConnection() throws InterruptedException { + private void acceptConnections() throws InterruptedException { Socket socket; - try { - socket = this.listenSocket.accept(); - } catch (SocketTimeoutException e) { - // No connections to accept - return; - } catch (IOException e) { - // Something went wrong or listen socket was closed due to shutdown - return; - } - - synchronized (this.connectedPeers) { - if (connectedPeers.size() >= maxPeers) { - // We have enough peers - LOGGER.trace(String.format("Connection discarded from peer %s", socket.getRemoteSocketAddress())); - - try { - socket.close(); - } catch (IOException e) { - // Not important - } - + do { + try { + socket = this.listenSocket.accept(); + } catch (SocketTimeoutException e) { + // No connections to accept + return; + } catch (IOException e) { + // Something went wrong or listen socket was closed due to shutdown return; } - LOGGER.debug(String.format("Connection accepted from peer %s", socket.getRemoteSocketAddress())); - Peer newPeer = new Peer(socket); - this.connectedPeers.add(newPeer); - peerExecutor.execute(newPeer); - } + synchronized (this.connectedPeers) { + if (connectedPeers.size() >= maxPeers) { + // We have enough peers + LOGGER.trace(String.format("Connection discarded from peer %s", socket.getRemoteSocketAddress())); + + try { + socket.close(); + } catch (IOException e) { + // Not important + } + + return; + } + + LOGGER.debug(String.format("Connection accepted from peer %s", socket.getRemoteSocketAddress())); + Peer newPeer = new Peer(socket); + this.connectedPeers.add(newPeer); + peerExecutor.execute(newPeer); + } + } while (true); } private void createConnection() throws InterruptedException, DataException { @@ -220,7 +229,7 @@ public class Network extends Thread { // Don't consider already connected peers Predicate isConnectedPeer = peerData -> this.connectedPeers.stream() - .anyMatch(peer -> peer.getPeerData() != null && peer.getPeerData().getSocketAddress().equals(peerData.getSocketAddress())); + .anyMatch(peer -> peer.getPeerData().getSocketAddress().equals(peerData.getSocketAddress())); synchronized (this.connectedPeers) { peers.removeIf(isConnectedPeer); @@ -269,7 +278,7 @@ public class Network extends Thread { /** Called when a new message arrives for a peer. message can be null if called after connection */ public void onMessage(Peer peer, Message message) { if (message != null) - LOGGER.trace(String.format("Received %s message from %s", message.getType().name(), peer.getRemoteSocketAddress())); + LOGGER.trace(String.format("Received %s message from %s", message.getType().name(), peer)); Handshake handshakeStatus = peer.getHandshakeStatus(); if (handshakeStatus != Handshake.COMPLETED) { @@ -277,8 +286,7 @@ public class Network extends Thread { // Check message type is as expected if (handshakeStatus.expectedMessageType != null && message.getType() != handshakeStatus.expectedMessageType) { - LOGGER.debug(String.format("Unexpected %s message from %s, expected %s", message.getType().name(), peer.getRemoteSocketAddress(), - handshakeStatus.expectedMessageType)); + LOGGER.debug(String.format("Unexpected %s message from %s, expected %s", message.getType().name(), peer, handshakeStatus.expectedMessageType)); peer.disconnect(); return; } @@ -287,7 +295,7 @@ public class Network extends Thread { if (newHandshakeStatus == null) { // Handshake failure - LOGGER.debug(String.format("Handshake failure with peer %s message %s", peer.getRemoteSocketAddress(), message.getType().name())); + LOGGER.debug(String.format("Handshake failure with peer %s message %s", peer, message.getType().name())); peer.disconnect(); return; } @@ -296,7 +304,7 @@ public class Network extends Thread { // If we made outbound connection then we need to act first newHandshakeStatus.action(peer); else - // We have inbound connection so we need to respond inline with what we just received + // We have inbound connection so we need to respond in kind with what we just received handshakeStatus.action(peer); peer.setHandshakeStatus(newHandshakeStatus); @@ -313,7 +321,7 @@ public class Network extends Thread { case VERSION: case PEER_ID: case PROOF: - LOGGER.debug(String.format("Unexpected handshaking message %s from peer %s", message.getType().name(), peer.getRemoteSocketAddress())); + LOGGER.debug(String.format("Unexpected handshaking message %s from peer %s", message.getType().name(), peer)); peer.disconnect(); return; @@ -334,16 +342,28 @@ public class Network extends Thread { List peerAddresses = new ArrayList<>(); + // v1 PEERS message doesn't support port numbers so we have to add default port for (InetAddress peerAddress : peersMessage.getPeerAddresses()) peerAddresses.add(new InetSocketAddress(peerAddress, Settings.DEFAULT_LISTEN_PORT)); - try { - mergePeers(peerAddresses); - } catch (DataException e) { - // Not good - peer.disconnect(); - return; - } + // Also add peer's details + peerAddresses.add(new InetSocketAddress(peer.getRemoteSocketAddress().getHostString(), Settings.DEFAULT_LISTEN_PORT)); + + mergePeers(peerAddresses); + break; + + case PEERS_V2: + PeersV2Message peersV2Message = (PeersV2Message) message; + + List peerV2Addresses = peersV2Message.getPeerAddresses(); + + // First entry contains remote peer's listen port but empty address. + // Overwrite address with one obtained from socket. + int peerPort = peerV2Addresses.get(0).getPort(); + peerV2Addresses.remove(0); + peerV2Addresses.add(0, InetSocketAddress.createUnresolved(peer.getRemoteSocketAddress().getHostString(), peerPort)); + + mergePeers(peerV2Addresses); break; default: @@ -354,6 +374,9 @@ public class Network extends Thread { } private void onHandshakeCompleted(Peer peer) { + // Make a note that we've successfully completed handshake (and when) + peer.getPeerData().setLastConnected(NTP.getTime()); + peer.startPings(); Message heightMessage = new HeightMessage(Controller.getInstance().getChainHeight()); @@ -363,36 +386,61 @@ public class Network extends Thread { return; } - Message peersMessage = this.buildPeersMessage(); + Message peersMessage = this.buildPeersMessage(peer); if (!peer.sendMessage(peersMessage)) peer.disconnect(); } - public Message buildPeersMessage() { - List peers = new ArrayList<>(); + /** Returns PEERS message made from peers we've connected to recently, and this node's details */ + public Message buildPeersMessage(Peer peer) { + try (final Repository repository = RepositoryManager.getRepository()) { + List knownPeers = repository.getNetworkRepository().getAllPeers(); - synchronized (this.connectedPeers) { - // Only outbound peer connections that have completed handshake - peers = this.connectedPeers.stream().filter(peer -> peer.isOutbound() && peer.getHandshakeStatus() == Handshake.COMPLETED) - .collect(Collectors.toList()); + // Filter out peers that we've not connected to ever or within X milliseconds + long connectionThreshold = NTP.getTime() - RECENT_CONNECTION_THRESHOLD; + knownPeers.removeIf(peerData -> peerData.getLastConnected() == null || peerData.getLastConnected() < connectionThreshold); + + // Map to socket addresses + List peerSocketAddresses = knownPeers.stream().map(peerData -> peerData.getSocketAddress()).collect(Collectors.toList()); + + if (peer.getVersion() >= 2) + // New format PEERS_V2 message that supports hostnames, IPv6 and ports + return new PeersV2Message(peerSocketAddresses); + else + // Legacy PEERS message that only sends IPv4 addresses + return new PeersMessage(peerSocketAddresses); + } catch (DataException e) { + LOGGER.error("Repository issue while building PEERS message", e); + return new PeersMessage(Collections.emptyList()); } - - return new PeersMessage(peers); } // Network-wide calls - private List getCompletedPeers() { - List completedPeers = new ArrayList<>(); + /** Returns list of connected peers that have completed handshaking. */ + public List getHandshakeCompletedPeers() { + List peers = new ArrayList<>(); synchronized (this.connectedPeers) { - completedPeers = this.connectedPeers.stream().filter(peer -> peer.getHandshakeStatus() == Handshake.COMPLETED).collect(Collectors.toList()); + peers = this.connectedPeers.stream().filter(peer -> peer.getHandshakeStatus() == Handshake.COMPLETED).collect(Collectors.toList()); } - return completedPeers; + return peers; } - private void mergePeers(List peerAddresses) throws DataException { + /** Returns list of peers we connected to that have completed handshaking. */ + public List getOutboundHandshakeCompletedPeers() { + List peers = new ArrayList<>(); + + synchronized (this.connectedPeers) { + peers = this.connectedPeers.stream().filter(peer -> peer.isOutbound() && peer.getHandshakeStatus() == Handshake.COMPLETED) + .collect(Collectors.toList()); + } + + return peers; + } + + private void mergePeers(List peerAddresses) { try (final Repository repository = RepositoryManager.getRepository()) { List knownPeers = repository.getNetworkRepository().getAllPeers(); @@ -412,28 +460,30 @@ public class Network extends Thread { } repository.saveChanges(); + } catch (DataException e) { + LOGGER.error("Repository issue while merging peers list from remote node", e); } } - public void broadcast(Message message) { + public void broadcast(Function peerMessage) { class Broadcaster implements Runnable { private List targetPeers; - private Message message; + private Function peerMessage; - public Broadcaster(List targetPeers, Message message) { + public Broadcaster(List targetPeers, Function peerMessage) { this.targetPeers = targetPeers; - this.message = message; + this.peerMessage = peerMessage; } @Override public void run() { for (Peer peer : targetPeers) - if (!peer.sendMessage(message)) + if (!peer.sendMessage(peerMessage.apply(peer))) peer.disconnect(); } } - peerExecutor.execute(new Broadcaster(this.getCompletedPeers(), message)); + peerExecutor.execute(new Broadcaster(this.getHandshakeCompletedPeers(), peerMessage)); } public void shutdown() { diff --git a/src/main/java/org/qora/network/Peer.java b/src/main/java/org/qora/network/Peer.java index b004fcd8..32acd76c 100644 --- a/src/main/java/org/qora/network/Peer.java +++ b/src/main/java/org/qora/network/Peer.java @@ -62,6 +62,7 @@ public class Peer implements Runnable { this.isOutbound = false; this.socket = socket; this.remoteSocketAddress = (InetSocketAddress) this.socket.getRemoteSocketAddress(); + this.peerData = new PeerData(this.remoteSocketAddress); } // Getters / setters @@ -121,6 +122,15 @@ public class Peer implements Runnable { this.lastPing = lastPing; } + // Easier, and nicer output, than peer.getRemoteSocketAddress() + + @Override + public String toString() { + InetSocketAddress socketAddress = this.getRemoteSocketAddress(); + + return socketAddress.getHostString() + ":" + socketAddress.getPort(); + } + // Processing private void setup() throws IOException { @@ -131,22 +141,22 @@ public class Peer implements Runnable { } public boolean connect() { - LOGGER.trace(String.format("Connecting to peer %s", this.remoteSocketAddress)); + LOGGER.trace(String.format("Connecting to peer %s", this)); this.socket = new Socket(); try { InetSocketAddress resolvedSocketAddress = new InetSocketAddress(this.remoteSocketAddress.getHostString(), this.remoteSocketAddress.getPort()); this.socket.connect(resolvedSocketAddress, CONNECT_TIMEOUT); - LOGGER.debug(String.format("Connected to peer %s", this.remoteSocketAddress)); + LOGGER.debug(String.format("Connected to peer %s", this)); } catch (SocketTimeoutException e) { - LOGGER.trace(String.format("Connection timed out to peer %s", this.remoteSocketAddress)); + LOGGER.trace(String.format("Connection timed out to peer %s", this)); return false; } catch (UnknownHostException e) { - LOGGER.trace(String.format("Connection failed to unresolved peer %s", this.remoteSocketAddress)); + LOGGER.trace(String.format("Connection failed to unresolved peer %s", this)); return false; } catch (IOException e) { - LOGGER.trace(String.format("Connection failed to peer %s", this.remoteSocketAddress)); + LOGGER.trace(String.format("Connection failed to peer %s", this)); return false; } @@ -157,7 +167,7 @@ public class Peer implements Runnable { @Override public void run() { - Thread.currentThread().setName("Peer " + this.socket.getRemoteSocketAddress()); + Thread.currentThread().setName("Peer " + this); try (DataInputStream in = new DataInputStream(socket.getInputStream())) { setup(); @@ -199,7 +209,7 @@ public class Peer implements Runnable { try { // Send message - LOGGER.trace(String.format("Sending %s message to peer %s", message.getType().name(), this.getRemoteSocketAddress())); + LOGGER.trace(String.format("Sending %s message to peer %s", message.getType().name(), this)); synchronized (this.out) { this.out.write(message.toBytes()); @@ -288,7 +298,7 @@ public class Peer implements Runnable { // Close socket if (!this.socket.isClosed()) { - LOGGER.debug(String.format("Disconnected peer %s", this.getRemoteSocketAddress())); + LOGGER.debug(String.format("Disconnected peer %s", this)); try { this.socket.close(); diff --git a/src/main/java/org/qora/network/message/BlockMessage.java b/src/main/java/org/qora/network/message/BlockMessage.java new file mode 100644 index 00000000..649915ab --- /dev/null +++ b/src/main/java/org/qora/network/message/BlockMessage.java @@ -0,0 +1,91 @@ +package org.qora.network.message; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.nio.ByteBuffer; +import java.util.List; + +import org.qora.block.Block; +import org.qora.data.at.ATStateData; +import org.qora.data.block.BlockData; +import org.qora.data.transaction.TransactionData; +import org.qora.transform.TransformationException; +import org.qora.transform.block.BlockTransformer; +import org.qora.utils.Triple; + +import com.google.common.primitives.Ints; + +public class BlockMessage extends Message { + + private Block block = null; + + private BlockData blockData = null; + private List transactions = null; + private List atStates = null; + + private int height; + + public BlockMessage(Block block) { + super(MessageType.BLOCK); + + this.block = block; + this.height = block.getBlockData().getHeight(); + } + + private BlockMessage(int id, BlockData blockData, List transactions, List atStates) { + super(id, MessageType.BLOCK); + + this.blockData = blockData; + this.transactions = transactions; + this.atStates = atStates; + + this.height = blockData.getHeight(); + } + + public BlockData getBlockData() { + return this.blockData; + } + + public List getTransactions() { + return this.transactions; + } + + public List getAtStates() { + return this.atStates; + } + + public static Message fromByteBuffer(int id, ByteBuffer byteBuffer) throws UnsupportedEncodingException { + try { + int height = byteBuffer.getInt(); + + Triple, List> blockInfo = BlockTransformer.fromByteBuffer(byteBuffer); + + BlockData blockData = blockInfo.getA(); + blockData.setHeight(height); + + return new BlockMessage(id, blockData, blockInfo.getB(), blockInfo.getC()); + } catch (TransformationException e) { + return null; + } + } + + @Override + protected byte[] toData() { + if (this.block == null) + return null; + + try { + ByteArrayOutputStream bytes = new ByteArrayOutputStream(); + + bytes.write(Ints.toByteArray(this.height)); + + bytes.write(BlockTransformer.toBytes(this.block)); + + return bytes.toByteArray(); + } catch (TransformationException | IOException e) { + return null; + } + } + +} diff --git a/src/main/java/org/qora/network/message/GetBlockMessage.java b/src/main/java/org/qora/network/message/GetBlockMessage.java new file mode 100644 index 00000000..3813d5ee --- /dev/null +++ b/src/main/java/org/qora/network/message/GetBlockMessage.java @@ -0,0 +1,54 @@ +package org.qora.network.message; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.nio.ByteBuffer; + +import org.qora.transform.block.BlockTransformer; + +public class GetBlockMessage extends Message { + + private static final int BLOCK_SIGNATURE_LENGTH = BlockTransformer.BLOCK_SIGNATURE_LENGTH; + + private byte[] signature; + + public GetBlockMessage(byte[] signature) { + this(-1, signature); + } + + private GetBlockMessage(int id, byte[] signature) { + super(id, MessageType.GET_BLOCK); + + this.signature = signature; + } + + public byte[] getSignature() { + return this.signature; + } + + public static Message fromByteBuffer(int id, ByteBuffer bytes) throws UnsupportedEncodingException { + if (bytes.remaining() != BLOCK_SIGNATURE_LENGTH) + return null; + + byte[] signature = new byte[BLOCK_SIGNATURE_LENGTH]; + + bytes.get(signature); + + return new GetBlockMessage(id, signature); + } + + @Override + protected byte[] toData() { + try { + ByteArrayOutputStream bytes = new ByteArrayOutputStream(); + + bytes.write(this.signature); + + return bytes.toByteArray(); + } catch (IOException e) { + return null; + } + } + +} diff --git a/src/main/java/org/qora/network/message/GetSignaturesMessage.java b/src/main/java/org/qora/network/message/GetSignaturesMessage.java new file mode 100644 index 00000000..5379abcd --- /dev/null +++ b/src/main/java/org/qora/network/message/GetSignaturesMessage.java @@ -0,0 +1,54 @@ +package org.qora.network.message; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.nio.ByteBuffer; + +import org.qora.transform.block.BlockTransformer; + +public class GetSignaturesMessage extends Message { + + private static final int BLOCK_SIGNATURE_LENGTH = BlockTransformer.BLOCK_SIGNATURE_LENGTH; + + private byte[] parentSignature; + + public GetSignaturesMessage(byte[] parentSignature) { + this(-1, parentSignature); + } + + private GetSignaturesMessage(int id, byte[] parentSignature) { + super(id, MessageType.GET_SIGNATURES); + + this.parentSignature = parentSignature; + } + + public byte[] getParentSignature() { + return this.parentSignature; + } + + public static Message fromByteBuffer(int id, ByteBuffer bytes) throws UnsupportedEncodingException { + if (bytes.remaining() != BLOCK_SIGNATURE_LENGTH) + return null; + + byte[] parentSignature = new byte[BLOCK_SIGNATURE_LENGTH]; + + bytes.get(parentSignature); + + return new GetSignaturesMessage(id, parentSignature); + } + + @Override + protected byte[] toData() { + try { + ByteArrayOutputStream bytes = new ByteArrayOutputStream(); + + bytes.write(this.parentSignature); + + return bytes.toByteArray(); + } catch (IOException e) { + return null; + } + } + +} diff --git a/src/main/java/org/qora/network/message/Message.java b/src/main/java/org/qora/network/message/Message.java index 03755e8a..ab8eacf1 100644 --- a/src/main/java/org/qora/network/message/Message.java +++ b/src/main/java/org/qora/network/message/Message.java @@ -39,7 +39,8 @@ public abstract class Message { PING(9), VERSION(10), PEER_ID(11), - PROOF(12); + PROOF(12), + PEERS_V2(13); public final int value; public final Method fromByteBuffer; diff --git a/src/main/java/org/qora/network/message/PeersMessage.java b/src/main/java/org/qora/network/message/PeersMessage.java index 9c6aa448..a576c403 100644 --- a/src/main/java/org/qora/network/message/PeersMessage.java +++ b/src/main/java/org/qora/network/message/PeersMessage.java @@ -4,13 +4,12 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.net.InetAddress; +import java.net.InetSocketAddress; import java.net.UnknownHostException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; -import org.qora.network.Peer; - import com.google.common.primitives.Ints; // NOTE: this legacy message only supports 4-byte IPv4 addresses and doesn't send port number either @@ -20,15 +19,15 @@ public class PeersMessage extends Message { private List peerAddresses; - public PeersMessage(List peers) { - super(-1, MessageType.PEERS); + public PeersMessage(List peerSocketAddresses) { + super(MessageType.PEERS); // We have to forcibly resolve into IP addresses as we can't send hostnames this.peerAddresses = new ArrayList<>(); - for (Peer peer : peers) { + for (InetSocketAddress peerSocketAddress : peerSocketAddresses) { try { - InetAddress resolvedAddress = InetAddress.getByName(peer.getRemoteSocketAddress().getHostString()); + InetAddress resolvedAddress = InetAddress.getByName(peerSocketAddress.getHostString()); // Filter out unsupported address types if (resolvedAddress.getAddress().length != ADDRESS_LENGTH) diff --git a/src/main/java/org/qora/network/message/PeersV2Message.java b/src/main/java/org/qora/network/message/PeersV2Message.java new file mode 100644 index 00000000..528dfcbb --- /dev/null +++ b/src/main/java/org/qora/network/message/PeersV2Message.java @@ -0,0 +1,134 @@ +package org.qora.network.message; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.UnknownHostException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + +import org.qora.settings.Settings; + +import com.google.common.primitives.Bytes; +import com.google.common.primitives.Ints; + +// NOTE: this message supports hostnames, IPv6, port numbers and IPv4 addresses (in IPv6 form) +public class PeersV2Message extends Message { + + private static final byte[] IPV6_V4_PREFIX = new byte[] { + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, (byte) 0xff, (byte) 0xff + }; + + private List peerSocketAddresses; + + public PeersV2Message(List peerSocketAddresses) { + this(-1, peerSocketAddresses); + } + + private PeersV2Message(int id, List peerSocketAddresses) { + super(id, MessageType.PEERS_V2); + + this.peerSocketAddresses = peerSocketAddresses; + } + + public List getPeerAddresses() { + return this.peerSocketAddresses; + } + + public static Message fromByteBuffer(int id, ByteBuffer byteBuffer) throws UnsupportedEncodingException { + // Read entry count + int count = byteBuffer.getInt(); + + List peerSocketAddresses = new ArrayList<>(); + + byte[] ipAddressBytes = new byte[16]; + int port; + + for (int i = 0; i < count; ++i) { + byte addressSize = byteBuffer.get(); + + if (addressSize == 0) { + // Address size of 0 indicates IP address (always in IPv6 form) + byteBuffer.get(ipAddressBytes); + + port = byteBuffer.getInt(); + + try { + InetAddress address = InetAddress.getByAddress(ipAddressBytes); + + peerSocketAddresses.add(new InetSocketAddress(address, port)); + } catch (UnknownHostException e) { + // Ignore and continue + } + } else { + byte[] hostnameBytes = new byte[addressSize & 0xff]; + byteBuffer.get(hostnameBytes); + String hostname = new String(hostnameBytes, "UTF-8"); + + port = byteBuffer.getInt(); + + peerSocketAddresses.add(InetSocketAddress.createUnresolved(hostname, port)); + } + } + + return new PeersV2Message(id, peerSocketAddresses); + } + + @Override + protected byte[] toData() { + try { + ByteArrayOutputStream bytes = new ByteArrayOutputStream(); + + // First entry represents sending node but contains only port number with empty address. + List socketAddresses = new ArrayList<>(this.peerSocketAddresses); + socketAddresses.add(0, new InetSocketAddress(Settings.getInstance().getListenPort())); + + // Number of entries we are sending. + int count = socketAddresses.size(); + + for (InetSocketAddress socketAddress : socketAddresses) { + // Hostname preferred, failing that IP address + if (socketAddress.isUnresolved()) { + String hostname = socketAddress.getHostString(); + + byte[] hostnameBytes = hostname.getBytes("UTF-8"); + + // We don't support hostnames that are longer than 256 bytes + if (hostnameBytes.length > 256) { + --count; + continue; + } + + bytes.write(hostnameBytes.length); + + bytes.write(hostnameBytes); + } else { + // IP address + byte[] ipAddressBytes = socketAddress.getAddress().getAddress(); + + // IPv4? Convert to IPv6 form + if (ipAddressBytes.length == 4) + ipAddressBytes = Bytes.concat(IPV6_V4_PREFIX, ipAddressBytes); + + // Write zero length to indicate IP address follows + bytes.write(0); + + bytes.write(ipAddressBytes); + } + + // Port + bytes.write(Ints.toByteArray(socketAddress.getPort())); + } + + // Prepend updated entry count + byte[] countBytes = Ints.toByteArray(count); + return Bytes.concat(countBytes, bytes.toByteArray()); + } catch (IOException e) { + return null; + } + } + +} diff --git a/src/main/java/org/qora/network/message/SignaturesMessage.java b/src/main/java/org/qora/network/message/SignaturesMessage.java new file mode 100644 index 00000000..efe1cdad --- /dev/null +++ b/src/main/java/org/qora/network/message/SignaturesMessage.java @@ -0,0 +1,66 @@ +package org.qora.network.message; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + +import org.qora.transform.block.BlockTransformer; + +import com.google.common.primitives.Ints; + +public class SignaturesMessage extends Message { + + private static final int BLOCK_SIGNATURE_LENGTH = BlockTransformer.BLOCK_SIGNATURE_LENGTH; + + private List signatures; + + public SignaturesMessage(List signatures) { + this(-1, signatures); + } + + private SignaturesMessage(int id, List signatures) { + super(id, MessageType.SIGNATURES); + + this.signatures = signatures; + } + + public List getSignatures() { + return this.signatures; + } + + public static Message fromByteBuffer(int id, ByteBuffer bytes) throws UnsupportedEncodingException { + int count = bytes.getInt(); + + if (bytes.remaining() != count * BLOCK_SIGNATURE_LENGTH) + return null; + + List signatures = new ArrayList<>(); + for (int i = 0; i < count; ++i) { + byte[] signature = new byte[BLOCK_SIGNATURE_LENGTH]; + bytes.get(signature); + signatures.add(signature); + } + + return new SignaturesMessage(id, signatures); + } + + @Override + protected byte[] toData() { + try { + ByteArrayOutputStream bytes = new ByteArrayOutputStream(); + + bytes.write(Ints.toByteArray(this.signatures.size())); + + for (byte[] signature : this.signatures) + bytes.write(signature); + + return bytes.toByteArray(); + } catch (IOException e) { + return null; + } + } + +} diff --git a/src/main/java/org/qora/repository/Repository.java b/src/main/java/org/qora/repository/Repository.java index bbe1a06a..efab17e5 100644 --- a/src/main/java/org/qora/repository/Repository.java +++ b/src/main/java/org/qora/repository/Repository.java @@ -24,6 +24,10 @@ public interface Repository extends AutoCloseable { public void discardChanges() throws DataException; + void setSavepoint() throws DataException; + + void rollbackToSavepoint() throws DataException; + @Override public void close() throws DataException; diff --git a/src/main/java/org/qora/repository/TransactionRepository.java b/src/main/java/org/qora/repository/TransactionRepository.java index 25dbed5c..97a6aa34 100644 --- a/src/main/java/org/qora/repository/TransactionRepository.java +++ b/src/main/java/org/qora/repository/TransactionRepository.java @@ -44,6 +44,14 @@ public interface TransactionRepository { public List getAssetTransactions(int assetId, ConfirmationStatus confirmationStatus, Integer limit, Integer offset, Boolean reverse) throws DataException; + /** + * Returns whether transaction is confirmed or not. + * + * @param signature + * @return true if confirmed, false if not. + */ + public boolean isConfirmed(byte[] signature) throws DataException; + /** * Returns list of unconfirmed transactions in timestamp-else-signature order. *

@@ -75,7 +83,13 @@ public interface TransactionRepository { */ public void confirmTransaction(byte[] signature) throws DataException; - void unconfirmTransaction(TransactionData transactionData) throws DataException; + /** + * Add transaction to unconfirmed transactions pile. + * + * @param transactionData + * @throws DataException + */ + public void unconfirmTransaction(TransactionData transactionData) throws DataException; public void save(TransactionData transactionData) throws DataException; diff --git a/src/main/java/org/qora/repository/hsqldb/HSQLDBDatabaseUpdates.java b/src/main/java/org/qora/repository/hsqldb/HSQLDBDatabaseUpdates.java index 54ea6175..e41e2c31 100644 --- a/src/main/java/org/qora/repository/hsqldb/HSQLDBDatabaseUpdates.java +++ b/src/main/java/org/qora/repository/hsqldb/HSQLDBDatabaseUpdates.java @@ -503,7 +503,7 @@ public class HSQLDBDatabaseUpdates { case 30: // Networking stmt.execute("CREATE TABLE Peers (hostname VARCHAR(255), port INTEGER, last_connected TIMESTAMP WITH TIME ZONE, last_attempted TIMESTAMP WITH TIME ZONE, " - + "last_height INTEGER, PRIMARY KEY (hostname, port))"); + + "last_height INTEGER, last_misbehaved TIMESTAMP WITH TIME ZONE, PRIMARY KEY (hostname, port))"); break; default: diff --git a/src/main/java/org/qora/repository/hsqldb/HSQLDBNetworkRepository.java b/src/main/java/org/qora/repository/hsqldb/HSQLDBNetworkRepository.java index 4d8b0992..f8292680 100644 --- a/src/main/java/org/qora/repository/hsqldb/HSQLDBNetworkRepository.java +++ b/src/main/java/org/qora/repository/hsqldb/HSQLDBNetworkRepository.java @@ -24,7 +24,8 @@ public class HSQLDBNetworkRepository implements NetworkRepository { public List getAllPeers() throws DataException { List peers = new ArrayList<>(); - try (ResultSet resultSet = this.repository.checkedExecute("SELECT hostname, port, last_connected, last_attempted, last_height FROM Peers")) { + try (ResultSet resultSet = this.repository + .checkedExecute("SELECT hostname, port, last_connected, last_attempted, last_height, last_misbehaved FROM Peers")) { if (resultSet == null) return peers; @@ -44,7 +45,10 @@ public class HSQLDBNetworkRepository implements NetworkRepository { if (resultSet.wasNull()) lastHeight = null; - peers.add(new PeerData(socketAddress, lastConnected, lastAttempted, lastHeight)); + Timestamp lastMisbehavedTimestamp = resultSet.getTimestamp(6, Calendar.getInstance(HSQLDBRepository.UTC)); + Long lastMisbehaved = resultSet.wasNull() ? null : lastMisbehavedTimestamp.getTime(); + + peers.add(new PeerData(socketAddress, lastConnected, lastAttempted, lastHeight, lastMisbehaved)); } while (resultSet.next()); return peers; @@ -59,9 +63,11 @@ public class HSQLDBNetworkRepository implements NetworkRepository { Timestamp lastConnected = peerData.getLastConnected() == null ? null : new Timestamp(peerData.getLastConnected()); Timestamp lastAttempted = peerData.getLastAttempted() == null ? null : new Timestamp(peerData.getLastAttempted()); + Timestamp lastMisbehaved = peerData.getLastMisbehaved() == null ? null : new Timestamp(peerData.getLastMisbehaved()); saveHelper.bind("hostname", peerData.getSocketAddress().getHostString()).bind("port", peerData.getSocketAddress().getPort()) - .bind("last_connected", lastConnected).bind("last_attempted", lastAttempted).bind("last_height", peerData.getLastHeight()); + .bind("last_connected", lastConnected).bind("last_attempted", lastAttempted).bind("last_height", peerData.getLastHeight()) + .bind("last_misbehaved", lastMisbehaved); try { saveHelper.execute(this.repository); diff --git a/src/main/java/org/qora/repository/hsqldb/HSQLDBRepository.java b/src/main/java/org/qora/repository/hsqldb/HSQLDBRepository.java index 16db33ad..09481e09 100644 --- a/src/main/java/org/qora/repository/hsqldb/HSQLDBRepository.java +++ b/src/main/java/org/qora/repository/hsqldb/HSQLDBRepository.java @@ -5,7 +5,10 @@ import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; +import java.sql.Savepoint; import java.sql.Statement; +import java.util.ArrayList; +import java.util.List; import java.util.TimeZone; import org.apache.logging.log4j.LogManager; @@ -33,11 +36,13 @@ public class HSQLDBRepository implements Repository { public static final TimeZone UTC = TimeZone.getTimeZone("UTC"); protected Connection connection; + protected List savepoints; protected boolean debugState = false; // NB: no visibility modifier so only callable from within same package HSQLDBRepository(Connection connection) { this.connection = connection; + this.savepoints = new ArrayList<>(); } @Override @@ -91,6 +96,8 @@ public class HSQLDBRepository implements Repository { this.connection.commit(); } catch (SQLException e) { throw new DataException("commit error", e); + } finally { + this.savepoints.clear(); } } @@ -100,6 +107,33 @@ public class HSQLDBRepository implements Repository { this.connection.rollback(); } catch (SQLException e) { throw new DataException("rollback error", e); + } finally { + this.savepoints.clear(); + } + } + + @Override + public void setSavepoint() throws DataException { + try { + Savepoint savepoint = this.connection.setSavepoint(); + this.savepoints.add(savepoint); + } catch (SQLException e) { + throw new DataException("savepoint error", e); + } + } + + @Override + public void rollbackToSavepoint() throws DataException { + if (this.savepoints.isEmpty()) + throw new DataException("no savepoint to rollback"); + + Savepoint savepoint = this.savepoints.get(0); + this.savepoints.remove(0); + + try { + this.connection.rollback(savepoint); + } catch (SQLException e) { + throw new DataException("savepoint rollback error", e); } } diff --git a/src/main/java/org/qora/repository/hsqldb/transaction/HSQLDBTransactionRepository.java b/src/main/java/org/qora/repository/hsqldb/transaction/HSQLDBTransactionRepository.java index dc4b710d..fb5054cf 100644 --- a/src/main/java/org/qora/repository/hsqldb/transaction/HSQLDBTransactionRepository.java +++ b/src/main/java/org/qora/repository/hsqldb/transaction/HSQLDBTransactionRepository.java @@ -475,6 +475,15 @@ public class HSQLDBTransactionRepository implements TransactionRepository { } } + @Override + public boolean isConfirmed(byte[] signature) throws DataException { + try { + return this.repository.exists("BlockTransactions", "transaction_signature = ?", signature); + } catch (SQLException e) { + throw new DataException("Unable to check whether transaction is confirmed in repository", e); + } + } + @Override public List getUnconfirmedTransactions(Integer limit, Integer offset, Boolean reverse) throws DataException { String sql = "SELECT signature FROM UnconfirmedTransactions ORDER BY creation"; diff --git a/src/main/java/org/qora/transform/block/BlockTransformer.java b/src/main/java/org/qora/transform/block/BlockTransformer.java index b1b39685..748f9368 100644 --- a/src/main/java/org/qora/transform/block/BlockTransformer.java +++ b/src/main/java/org/qora/transform/block/BlockTransformer.java @@ -44,10 +44,10 @@ public class BlockTransformer extends Transformer { private static final int GENERATOR_LENGTH = PUBLIC_KEY_LENGTH; private static final int TRANSACTION_COUNT_LENGTH = INT_LENGTH; - private static final int BASE_LENGTH = VERSION_LENGTH + BLOCK_REFERENCE_LENGTH + TIMESTAMP_LENGTH + GENERATING_BALANCE_LENGTH + GENERATOR_LENGTH + private static final int BASE_LENGTH = VERSION_LENGTH + TIMESTAMP_LENGTH + BLOCK_REFERENCE_LENGTH + GENERATING_BALANCE_LENGTH + GENERATOR_LENGTH + TRANSACTIONS_SIGNATURE_LENGTH + GENERATOR_SIGNATURE_LENGTH + TRANSACTION_COUNT_LENGTH; - protected static final int BLOCK_SIGNATURE_LENGTH = GENERATOR_SIGNATURE_LENGTH + TRANSACTIONS_SIGNATURE_LENGTH; + public static final int BLOCK_SIGNATURE_LENGTH = GENERATOR_SIGNATURE_LENGTH + TRANSACTIONS_SIGNATURE_LENGTH; protected static final int TRANSACTION_SIZE_LENGTH = INT_LENGTH; // per transaction protected static final int AT_BYTES_LENGTH = INT_LENGTH; protected static final int AT_FEES_LENGTH = LONG_LENGTH; @@ -72,9 +72,20 @@ public class BlockTransformer extends Transformer { ByteBuffer byteBuffer = ByteBuffer.wrap(bytes); + return fromByteBuffer(byteBuffer); + } + + /** + * Extract block data and transaction data from serialized bytes. + * + * @param bytes + * @return BlockData and a List of transactions. + * @throws TransformationException + */ + public static Triple, List> fromByteBuffer(ByteBuffer byteBuffer) throws TransformationException { int version = byteBuffer.getInt(); - if (version >= 2 && bytes.length < BASE_LENGTH + AT_LENGTH) + if (version >= 2 && byteBuffer.remaining() < BASE_LENGTH + AT_BYTES_LENGTH - VERSION_LENGTH) throw new TransformationException("Byte data too short for V2+ Block"); long timestamp = byteBuffer.getLong(); diff --git a/src/main/java/org/qora/utils/NTP.java b/src/main/java/org/qora/utils/NTP.java index 9fd6f20e..c0a9f07e 100644 --- a/src/main/java/org/qora/utils/NTP.java +++ b/src/main/java/org/qora/utils/NTP.java @@ -24,7 +24,6 @@ public final class NTP { lastUpdate = System.currentTimeMillis(); // Log new value of offset - // TODO: LOGGER.info(Lang.getInstance().translate("Adjusting time with %offset% milliseconds.").replace("%offset%", String.valueOf(offset))); LOGGER.info("Adjusting time with %offset% milliseconds.".replace("%offset%", String.valueOf(offset))); }