From 06e6802d97ecda5cd9438da8ab3e00ca6d2b302e Mon Sep 17 00:00:00 2001 From: catbref Date: Mon, 6 May 2019 09:17:54 +0100 Subject: [PATCH] More work on synchronization Various fixes to synchronization Added missing code for processing incoming block summaries in Network. Fixed block summaries serialization and removed references to BlockData. Fixed bug in transaction transformation where base transaction length didn't include reference or fee lengths. Original commit was ebbab7b --- .../java/org/qora/controller/Controller.java | 46 ++++++++++++++++--- .../org/qora/controller/Synchronizer.java | 31 +++++++++---- src/main/java/org/qora/network/Network.java | 2 + .../message/BlockSummariesMessage.java | 9 ++-- .../message/GetBlockSummariesMessage.java | 4 +- .../transaction/TransactionTransformer.java | 4 +- 6 files changed, 72 insertions(+), 24 deletions(-) diff --git a/src/main/java/org/qora/controller/Controller.java b/src/main/java/org/qora/controller/Controller.java index c414cc73..9d2811c7 100644 --- a/src/main/java/org/qora/controller/Controller.java +++ b/src/main/java/org/qora/controller/Controller.java @@ -21,12 +21,15 @@ import org.qora.block.Block; import org.qora.block.BlockChain; import org.qora.block.BlockGenerator; import org.qora.data.block.BlockData; +import org.qora.data.network.BlockSummaryData; import org.qora.data.network.PeerData; import org.qora.data.transaction.TransactionData; import org.qora.network.Network; import org.qora.network.Peer; import org.qora.network.message.BlockMessage; +import org.qora.network.message.BlockSummariesMessage; import org.qora.network.message.GetBlockMessage; +import org.qora.network.message.GetBlockSummariesMessage; import org.qora.network.message.GetPeersMessage; import org.qora.network.message.GetSignaturesMessage; import org.qora.network.message.HeightMessage; @@ -204,7 +207,7 @@ public class Controller extends Thread { potentiallySynchronize(); - // Query random connections for unconfirmed transactions + // Query random connections for unconfirmed transactions? } } catch (InterruptedException e) { // time to exit @@ -225,10 +228,10 @@ public class Controller extends Thread { for(Peer peer : peers) LOGGER.trace(String.format("Peer %s is at height %d", peer, peer.getPeerData().getLastHeight())); - // Remove peers with lower, or unknown, height + // Remove peers with unknown, or same, height peers.removeIf(peer -> { Integer peerHeight = peer.getPeerData().getLastHeight(); - return peerHeight == null || peerHeight <= ourHeight; + return peerHeight == null; }); // Remove peers that have "misbehaved" recently @@ -260,8 +263,10 @@ public class Controller extends Thread { LOGGER.debug(String.format("Synchronized with peer %s", peer)); - // Broadcast our new height - Network.getInstance().broadcast(recipientPeer -> new HeightMessage(getChainHeight())); + // Broadcast our new height (if changed) + int updatedHeight = getChainHeight(); + if (updatedHeight != ourHeight) + Network.getInstance().broadcast(recipientPeer -> new HeightMessage(updatedHeight)); } } @@ -363,7 +368,7 @@ public class Controller extends Thread { parentSignature = blockData.getSignature(); signatures.add(parentSignature); - } while (signatures.size() < 500); + } while (signatures.size() < Network.MAX_SIGNATURES_PER_REPLY); Message signaturesMessage = new SignaturesMessage(signatures); signaturesMessage.setId(message.getId()); @@ -432,6 +437,35 @@ public class Controller extends Thread { } break; + case GET_BLOCK_SUMMARIES: + try (final Repository repository = RepositoryManager.getRepository()) { + GetBlockSummariesMessage getBlockSummariesMessage = (GetBlockSummariesMessage) message; + byte[] parentSignature = getBlockSummariesMessage.getParentSignature(); + + List blockSummaries = new ArrayList<>(); + + int numberRequested = Math.min(Network.MAX_BLOCK_SUMMARIES_PER_REPLY, getBlockSummariesMessage.getNumberRequested()); + + do { + BlockData blockData = repository.getBlockRepository().fromReference(parentSignature); + + if (blockData == null) + break; + + BlockSummaryData blockSummary = new BlockSummaryData(blockData); + blockSummaries.add(blockSummary); + parentSignature = blockData.getSignature(); + } while (blockSummaries.size() < numberRequested); + + Message blockSummariesMessage = new BlockSummariesMessage(blockSummaries); + blockSummariesMessage.setId(message.getId()); + if (!peer.sendMessage(blockSummariesMessage)) + peer.disconnect(); + } catch (DataException e) { + LOGGER.error(String.format("Repository issue while responding to %s from peer %s", message.getType().name(), peer), e); + } + break; + default: break; } diff --git a/src/main/java/org/qora/controller/Synchronizer.java b/src/main/java/org/qora/controller/Synchronizer.java index c6cb2b29..298a9624 100644 --- a/src/main/java/org/qora/controller/Synchronizer.java +++ b/src/main/java/org/qora/controller/Synchronizer.java @@ -48,6 +48,15 @@ public class Synchronizer { return instance; } + /** + * Attempt to synchronize blockchain with peer. + *

+ * Will return true if synchronization succeeded, + * even if no changes were made to our blockchain. + *

+ * @param peer + * @return false if something went wrong, true otherwise. + */ public boolean synchronize(Peer peer) { // Make sure we're the only thread modifying the blockchain // If we're already synchronizing with another peer then this will also return fast @@ -58,9 +67,9 @@ public class Synchronizer { try { this.repository = repository; this.ourHeight = this.repository.getBlockRepository().getBlockchainHeight(); - int peerHeight = peer.getPeerData().getLastHeight(); + final int peerHeight = peer.getPeerData().getLastHeight(); - LOGGER.info(String.format("Synchronizing with peer %s from height %d to height %d", peer, this.ourHeight, peerHeight)); + LOGGER.info(String.format("Synchronizing with peer %s at height %d, our height %d", peer, peerHeight, this.ourHeight)); List signatures = findSignaturesFromCommonBlock(peer); if (signatures == null) { @@ -70,13 +79,17 @@ public class Synchronizer { // First signature is common block BlockData commonBlockData = this.repository.getBlockRepository().fromSignature(signatures.get(0)); - int commonBlockHeight = commonBlockData.getHeight(); + final int commonBlockHeight = commonBlockData.getHeight(); LOGGER.info(String.format("Common block with peer %s is at height %d", peer, commonBlockHeight)); signatures.remove(0); + // If common block is peer's latest block then we simply have a longer chain to peer, so exit now + if (commonBlockHeight == peerHeight) + return true; + // If common block is too far behind us then we're on massively different forks so give up. int minHeight = ourHeight - MAXIMUM_HEIGHT_DELTA; - if (commonBlockData.getHeight() < minHeight) { + if (commonBlockHeight < minHeight) { LOGGER.info(String.format("Blockchain too divergent with peer %s", peer)); return false; } @@ -117,11 +130,11 @@ public class Synchronizer { } } - if (this.ourHeight > commonBlockData.getHeight()) { + if (this.ourHeight > commonBlockHeight) { // Unwind to common block (unless common block is our latest block) - LOGGER.debug(String.format("Orphaning blocks back to height %d", commonBlockData.getHeight())); + LOGGER.debug(String.format("Orphaning blocks back to height %d", commonBlockHeight)); - while (this.ourHeight > commonBlockData.getHeight()) { + while (this.ourHeight > commonBlockHeight) { BlockData blockData = repository.getBlockRepository().fromHeight(this.ourHeight); Block block = new Block(repository, blockData); block.orphan(); @@ -129,7 +142,7 @@ public class Synchronizer { --this.ourHeight; } - LOGGER.debug(String.format("Orphaned blocks back to height %d - fetching blocks from peer", commonBlockData.getHeight(), peer)); + LOGGER.debug(String.format("Orphaned blocks back to height %d - fetching blocks from peer", commonBlockHeight, peer)); } else { LOGGER.debug(String.format("Fetching new blocks from peer %s", peer)); } @@ -193,7 +206,7 @@ public class Synchronizer { } /** - * Returns list of block signatures start with common block with peer. + * Returns list of peer's block signatures starting with common block with peer. * * @param peer * @return block signatures diff --git a/src/main/java/org/qora/network/Network.java b/src/main/java/org/qora/network/Network.java index 506b0f13..116bd1fb 100644 --- a/src/main/java/org/qora/network/Network.java +++ b/src/main/java/org/qora/network/Network.java @@ -55,6 +55,8 @@ public class Network extends Thread { /** Maximum time since last successful connection for peer info to be propagated, in milliseconds. */ private static final long RECENT_CONNECTION_THRESHOLD = 24 * 60 * 60 * 1000; // ms + public static final int MAX_SIGNATURES_PER_REPLY = 500; + public static final int MAX_BLOCK_SUMMARIES_PER_REPLY = 500; public static final int PEER_ID_LENGTH = 128; private final byte[] ourPeerId; diff --git a/src/main/java/org/qora/network/message/BlockSummariesMessage.java b/src/main/java/org/qora/network/message/BlockSummariesMessage.java index ed79d2ab..74b53552 100644 --- a/src/main/java/org/qora/network/message/BlockSummariesMessage.java +++ b/src/main/java/org/qora/network/message/BlockSummariesMessage.java @@ -6,9 +6,7 @@ import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; -import java.util.stream.Collectors; -import org.qora.data.block.BlockData; import org.qora.data.network.BlockSummaryData; import org.qora.transform.Transformer; import org.qora.transform.block.BlockTransformer; @@ -21,9 +19,8 @@ public class BlockSummariesMessage extends Message { private List blockSummaries; - public BlockSummariesMessage(List blocksData) { - // Extract what we need from block data - this(-1, blocksData.stream().map(blockData -> new BlockSummaryData(blockData)).collect(Collectors.toList())); + public BlockSummariesMessage(List blockSummaries) { + this(-1, blockSummaries); } private BlockSummariesMessage(int id, List blockSummaries) { @@ -67,7 +64,7 @@ public class BlockSummariesMessage extends Message { bytes.write(Ints.toByteArray(this.blockSummaries.size())); for (BlockSummaryData blockSummary : this.blockSummaries) { - bytes.write(blockSummary.getHeight()); + bytes.write(Ints.toByteArray(blockSummary.getHeight())); bytes.write(blockSummary.getSignature()); bytes.write(blockSummary.getGeneratorPublicKey()); } diff --git a/src/main/java/org/qora/network/message/GetBlockSummariesMessage.java b/src/main/java/org/qora/network/message/GetBlockSummariesMessage.java index 057b9c83..c5d0d730 100644 --- a/src/main/java/org/qora/network/message/GetBlockSummariesMessage.java +++ b/src/main/java/org/qora/network/message/GetBlockSummariesMessage.java @@ -8,6 +8,8 @@ import java.nio.ByteBuffer; import org.qora.transform.Transformer; import org.qora.transform.block.BlockTransformer; +import com.google.common.primitives.Ints; + public class GetBlockSummariesMessage extends Message { private static final int BLOCK_SIGNATURE_LENGTH = BlockTransformer.BLOCK_SIGNATURE_LENGTH; @@ -53,7 +55,7 @@ public class GetBlockSummariesMessage extends Message { bytes.write(this.parentSignature); - bytes.write(this.numberRequested); + bytes.write(Ints.toByteArray(this.numberRequested)); return bytes.toByteArray(); } catch (IOException e) { diff --git a/src/main/java/org/qora/transform/transaction/TransactionTransformer.java b/src/main/java/org/qora/transform/transaction/TransactionTransformer.java index d8147a4e..43d0d95d 100644 --- a/src/main/java/org/qora/transform/transaction/TransactionTransformer.java +++ b/src/main/java/org/qora/transform/transaction/TransactionTransformer.java @@ -222,8 +222,8 @@ public abstract class TransactionTransformer extends Transformer { } protected static int getBaseLength(TransactionData transactionData) { - // All transactions have at least txType, timestamp, maybe txGroupId, tx creator's public key and finally signature (on the end) - int baseLength = TYPE_LENGTH + TIMESTAMP_LENGTH + PUBLIC_KEY_LENGTH + SIGNATURE_LENGTH; + // All transactions have at least txType, timestamp, reference, tx creator's public key and also fee and signature (on the end) + int baseLength = TYPE_LENGTH + TIMESTAMP_LENGTH + REFERENCE_LENGTH + PUBLIC_KEY_LENGTH + FEE_LENGTH + SIGNATURE_LENGTH; if (transactionData.getTimestamp() >= BlockChain.getInstance().getQoraV2Timestamp()) baseLength += GROUPID_LENGTH;