forked from Qortal/qortal
Removed all code relating to "fast sync" as it is not complete - we can reintroduce this later.
This commit is contained in:
parent
be3069e0e5
commit
411279b3eb
@ -187,18 +187,6 @@ public class Controller extends Thread {
|
||||
}
|
||||
public GetBlockMessageStats getBlockMessageStats = new GetBlockMessageStats();
|
||||
|
||||
public static class GetBlocksMessageStats {
|
||||
public AtomicLong requests = new AtomicLong();
|
||||
public AtomicLong cacheHits = new AtomicLong();
|
||||
public AtomicLong unknownBlocks = new AtomicLong();
|
||||
public AtomicLong cacheFills = new AtomicLong();
|
||||
public AtomicLong fullyFromCache = new AtomicLong();
|
||||
|
||||
public GetBlocksMessageStats() {
|
||||
}
|
||||
}
|
||||
public GetBlocksMessageStats getBlocksMessageStats = new GetBlocksMessageStats();
|
||||
|
||||
public static class GetBlockSummariesStats {
|
||||
public AtomicLong requests = new AtomicLong();
|
||||
public AtomicLong cacheHits = new AtomicLong();
|
||||
@ -1350,10 +1338,6 @@ public class Controller extends Thread {
|
||||
onNetworkGetBlockMessage(peer, message);
|
||||
break;
|
||||
|
||||
case GET_BLOCKS:
|
||||
onNetworkGetBlocksMessage(peer, message);
|
||||
break;
|
||||
|
||||
case TRANSACTION:
|
||||
onNetworkTransactionMessage(peer, message);
|
||||
break;
|
||||
@ -1508,54 +1492,6 @@ public class Controller extends Thread {
|
||||
}
|
||||
}
|
||||
|
||||
private void onNetworkGetBlocksMessage(Peer peer, Message message) {
|
||||
GetBlocksMessage getBlocksMessage = (GetBlocksMessage) message;
|
||||
byte[] parentSignature = getBlocksMessage.getParentSignature();
|
||||
this.stats.getBlocksMessageStats.requests.incrementAndGet();
|
||||
|
||||
try (final Repository repository = RepositoryManager.getRepository()) {
|
||||
|
||||
// If peer's parent signature matches our latest block signature
|
||||
// then we can short-circuit with an empty response
|
||||
BlockData chainTip = getChainTip();
|
||||
if (chainTip != null && Arrays.equals(parentSignature, chainTip.getSignature())) {
|
||||
Message blocksMessage = new BlocksMessage(Collections.emptyList());
|
||||
blocksMessage.setId(message.getId());
|
||||
if (!peer.sendMessage(blocksMessage))
|
||||
peer.disconnect("failed to send blocks");
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
// Ensure that we don't serve more blocks than the amount specified in the settings
|
||||
// Serializing multiple blocks is very slow, so by default we are using a low limit
|
||||
int blockLimitPerRequest = Settings.getInstance().getMaxBlocksPerResponse();
|
||||
int untrimmedBlockLimitPerRequest = Settings.getInstance().getMaxBlocksPerResponse();
|
||||
int numberRequested = Math.min(blockLimitPerRequest, getBlocksMessage.getNumberRequested());
|
||||
|
||||
List<Block> blocks = new ArrayList<>();
|
||||
BlockData blockData = repository.getBlockRepository().fromReference(parentSignature);
|
||||
|
||||
while (blockData != null && blocks.size() < numberRequested) {
|
||||
// If we're dealing with untrimmed blocks, ensure we don't go above the untrimmedBlockLimitPerRequest
|
||||
if (blockData.isTrimmed() == false && blocks.size() >= untrimmedBlockLimitPerRequest) {
|
||||
break;
|
||||
}
|
||||
Block block = new Block(repository, blockData);
|
||||
blocks.add(block);
|
||||
blockData = repository.getBlockRepository().fromReference(blockData.getSignature());
|
||||
}
|
||||
|
||||
Message blocksMessage = new BlocksMessage(blocks);
|
||||
blocksMessage.setId(message.getId());
|
||||
if (!peer.sendMessageWithTimeout(blocksMessage, FETCH_BLOCKS_TIMEOUT))
|
||||
peer.disconnect("failed to send blocks");
|
||||
|
||||
} catch (DataException e) {
|
||||
LOGGER.error(String.format("Repository issue while sending blocks after %s to peer %s", Base58.encode(parentSignature), peer), e);
|
||||
}
|
||||
}
|
||||
|
||||
private void onNetworkTransactionMessage(Peer peer, Message message) {
|
||||
TransactionMessage transactionMessage = (TransactionMessage) message;
|
||||
TransactionData transactionData = transactionMessage.getTransactionData();
|
||||
|
@ -22,10 +22,8 @@ import org.qortal.data.transaction.RewardShareTransactionData;
|
||||
import org.qortal.data.transaction.TransactionData;
|
||||
import org.qortal.network.Peer;
|
||||
import org.qortal.network.message.BlockMessage;
|
||||
import org.qortal.network.message.BlocksMessage;
|
||||
import org.qortal.network.message.BlockSummariesMessage;
|
||||
import org.qortal.network.message.GetBlockMessage;
|
||||
import org.qortal.network.message.GetBlocksMessage;
|
||||
import org.qortal.network.message.GetBlockSummariesMessage;
|
||||
import org.qortal.network.message.GetSignaturesV2Message;
|
||||
import org.qortal.network.message.Message;
|
||||
@ -59,9 +57,6 @@ public class Synchronizer {
|
||||
/** Maximum number of block signatures we ask from peer in one go */
|
||||
private static final int MAXIMUM_REQUEST_SIZE = 200; // XXX move to Settings?
|
||||
|
||||
/* Minimum peer version that supports syncing multiple blocks at once via GetBlocksMessage */
|
||||
private static final long PEER_VERSION_160 = 0x0100060000L;
|
||||
|
||||
|
||||
|
||||
|
||||
@ -1108,107 +1103,7 @@ public class Synchronizer {
|
||||
|
||||
private SynchronizationResult applyNewBlocks(Repository repository, BlockData commonBlockData, int ourInitialHeight,
|
||||
Peer peer, int peerHeight, List<BlockSummaryData> peerBlockSummaries) throws InterruptedException, DataException {
|
||||
|
||||
final BlockData ourLatestBlockData = repository.getBlockRepository().getLastBlock();
|
||||
if (Settings.getInstance().isFastSyncEnabled() && peer.getPeersVersion() >= PEER_VERSION_160 && ourLatestBlockData.isTrimmed())
|
||||
// This peer supports syncing multiple blocks at once via GetBlocksMessage, and it is enabled in the settings
|
||||
return this.applyNewBlocksUsingFastSync(repository, commonBlockData, ourInitialHeight, peer, peerHeight, peerBlockSummaries);
|
||||
else
|
||||
// Older peer version, or fast sync is disabled in the settings - use slow sync
|
||||
return this.applyNewBlocksUsingSlowSync(repository, commonBlockData, ourInitialHeight, peer, peerHeight, peerBlockSummaries);
|
||||
|
||||
}
|
||||
|
||||
private SynchronizationResult applyNewBlocksUsingFastSync(Repository repository, BlockData commonBlockData, int ourInitialHeight,
|
||||
Peer peer, int peerHeight, List<BlockSummaryData> peerBlockSummaries) throws InterruptedException, DataException {
|
||||
LOGGER.debug(String.format("Fetching new blocks from peer %s using fast sync", peer));
|
||||
|
||||
final int commonBlockHeight = commonBlockData.getHeight();
|
||||
final byte[] commonBlockSig = commonBlockData.getSignature();
|
||||
byte[] latestPeerSignature = commonBlockSig;
|
||||
|
||||
int ourHeight = ourInitialHeight;
|
||||
|
||||
// Fetch, and apply, blocks from peer
|
||||
int maxBatchHeight = commonBlockHeight + SYNC_BATCH_SIZE;
|
||||
|
||||
// Ensure that we don't request more blocks than specified in the settings
|
||||
int maxBlocksPerRequest = Settings.getInstance().getMaxBlocksPerRequest();
|
||||
|
||||
while (ourHeight < peerHeight && ourHeight < maxBatchHeight) {
|
||||
if (Controller.isStopping())
|
||||
return SynchronizationResult.SHUTTING_DOWN;
|
||||
|
||||
int numberRequested = Math.min(maxBatchHeight - ourHeight, maxBlocksPerRequest);
|
||||
|
||||
LOGGER.trace(String.format("Fetching %d blocks after height %d, sig %.8s from %s", numberRequested, ourHeight, Base58.encode(latestPeerSignature), peer));
|
||||
List<Block> blocks = this.fetchBlocks(repository, peer, latestPeerSignature, numberRequested);
|
||||
if (blocks == null || blocks.isEmpty()) {
|
||||
LOGGER.info(String.format("Peer %s failed to respond with more blocks after height %d, sig %.8s", peer,
|
||||
ourHeight, Base58.encode(latestPeerSignature)));
|
||||
return SynchronizationResult.NO_REPLY;
|
||||
}
|
||||
LOGGER.trace(String.format("Received %d blocks after height %d, sig %.8s from %s", blocks.size(), ourHeight, Base58.encode(latestPeerSignature), peer));
|
||||
|
||||
for (Block newBlock : blocks) {
|
||||
++ourHeight;
|
||||
|
||||
if (Controller.isStopping())
|
||||
return SynchronizationResult.SHUTTING_DOWN;
|
||||
|
||||
if (newBlock == null) {
|
||||
LOGGER.info(String.format("Peer %s failed to respond with block for height %d, sig %.8s", peer,
|
||||
ourHeight, Base58.encode(latestPeerSignature)));
|
||||
return SynchronizationResult.NO_REPLY;
|
||||
}
|
||||
|
||||
if (!newBlock.isSignatureValid()) {
|
||||
LOGGER.info(String.format("Peer %s sent block with invalid signature for height %d, sig %.8s", peer,
|
||||
ourHeight, Base58.encode(latestPeerSignature)));
|
||||
return SynchronizationResult.INVALID_DATA;
|
||||
}
|
||||
|
||||
// Set the repository, because we couldn't do that when originally constructing the Block
|
||||
newBlock.setRepository(repository);
|
||||
|
||||
// Transactions are transmitted without approval status so determine that now
|
||||
for (Transaction transaction : newBlock.getTransactions()) {
|
||||
transaction.setInitialApprovalStatus();
|
||||
}
|
||||
|
||||
ValidationResult blockResult = newBlock.isValid();
|
||||
if (blockResult != ValidationResult.OK) {
|
||||
LOGGER.info(String.format("Peer %s sent invalid block for height %d, sig %.8s: %s", peer,
|
||||
ourHeight, Base58.encode(latestPeerSignature), blockResult.name()));
|
||||
return SynchronizationResult.INVALID_DATA;
|
||||
}
|
||||
|
||||
// Save transactions attached to this block
|
||||
for (Transaction transaction : newBlock.getTransactions()) {
|
||||
TransactionData transactionData = transaction.getTransactionData();
|
||||
repository.getTransactionRepository().save(transactionData);
|
||||
}
|
||||
|
||||
newBlock.process();
|
||||
|
||||
LOGGER.trace(String.format("Processed block height %d, sig %.8s", newBlock.getBlockData().getHeight(), Base58.encode(newBlock.getBlockData().getSignature())));
|
||||
|
||||
repository.saveChanges();
|
||||
|
||||
Controller.getInstance().onNewBlock(newBlock.getBlockData());
|
||||
|
||||
// Update latestPeerSignature so that subsequent batches start requesting from the correct block
|
||||
latestPeerSignature = newBlock.getSignature();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
return SynchronizationResult.OK;
|
||||
}
|
||||
|
||||
private SynchronizationResult applyNewBlocksUsingSlowSync(Repository repository, BlockData commonBlockData, int ourInitialHeight,
|
||||
Peer peer, int peerHeight, List<BlockSummaryData> peerBlockSummaries) throws InterruptedException, DataException {
|
||||
LOGGER.debug(String.format("Fetching new blocks from peer %s using slow sync", peer));
|
||||
LOGGER.debug(String.format("Fetching new blocks from peer %s", peer));
|
||||
|
||||
final int commonBlockHeight = commonBlockData.getHeight();
|
||||
final byte[] commonBlockSig = commonBlockData.getSignature();
|
||||
@ -1336,22 +1231,6 @@ public class Synchronizer {
|
||||
return new Block(repository, blockMessage.getBlockData(), blockMessage.getTransactions(), blockMessage.getAtStates());
|
||||
}
|
||||
|
||||
private List<Block> fetchBlocks(Repository repository, Peer peer, byte[] parentSignature, int numberRequested) throws InterruptedException {
|
||||
Message getBlocksMessage = new GetBlocksMessage(parentSignature, numberRequested);
|
||||
|
||||
Message message = peer.getResponseWithTimeout(getBlocksMessage, FETCH_BLOCKS_TIMEOUT);
|
||||
if (message == null || message.getType() != MessageType.BLOCKS) {
|
||||
return null;
|
||||
}
|
||||
|
||||
BlocksMessage blocksMessage = (BlocksMessage) message;
|
||||
if (blocksMessage == null || blocksMessage.getBlocks() == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return blocksMessage.getBlocks();
|
||||
}
|
||||
|
||||
private void populateBlockSummariesMinterLevels(Repository repository, List<BlockSummaryData> blockSummaries) throws DataException {
|
||||
final int firstBlockHeight = blockSummaries.get(0).getHeight();
|
||||
|
||||
|
@ -1,91 +0,0 @@
|
||||
package org.qortal.network.message;
|
||||
|
||||
import com.google.common.primitives.Ints;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.qortal.block.Block;
|
||||
import org.qortal.data.at.ATStateData;
|
||||
import org.qortal.data.block.BlockData;
|
||||
import org.qortal.data.transaction.TransactionData;
|
||||
import org.qortal.transform.TransformationException;
|
||||
import org.qortal.transform.block.BlockTransformer;
|
||||
import org.qortal.utils.Triple;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.UnsupportedEncodingException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
public class BlocksMessage extends Message {
|
||||
|
||||
private static final Logger LOGGER = LogManager.getLogger(BlocksMessage.class);
|
||||
|
||||
private List<Block> blocks;
|
||||
|
||||
public BlocksMessage(List<Block> blocks) {
|
||||
this(-1, blocks);
|
||||
}
|
||||
|
||||
private BlocksMessage(int id, List<Block> blocks) {
|
||||
super(id, MessageType.BLOCKS);
|
||||
|
||||
this.blocks = blocks;
|
||||
}
|
||||
|
||||
public List<Block> getBlocks() {
|
||||
return this.blocks;
|
||||
}
|
||||
|
||||
public static Message fromByteBuffer(int id, ByteBuffer bytes) throws UnsupportedEncodingException {
|
||||
|
||||
int count = bytes.getInt();
|
||||
List<Block> blocks = new ArrayList<>();
|
||||
|
||||
for (int i = 0; i < count; ++i) {
|
||||
int height = bytes.getInt();
|
||||
|
||||
try {
|
||||
boolean finalBlockInBuffer = (i == count-1);
|
||||
|
||||
Triple<BlockData, List<TransactionData>, List<ATStateData>> blockInfo = null;
|
||||
blockInfo = BlockTransformer.fromByteBuffer(bytes, finalBlockInBuffer);
|
||||
BlockData blockData = blockInfo.getA();
|
||||
blockData.setHeight(height);
|
||||
|
||||
// We are unable to obtain a valid Repository instance here, so set it to null and we will attach it later
|
||||
Block block = new Block(null, blockData, blockInfo.getB(), blockInfo.getC());
|
||||
blocks.add(block);
|
||||
|
||||
} catch (TransformationException e) {
|
||||
return null;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
return new BlocksMessage(id, blocks);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected byte[] toData() {
|
||||
try {
|
||||
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
|
||||
|
||||
bytes.write(Ints.toByteArray(this.blocks.size()));
|
||||
|
||||
for (Block block : this.blocks) {
|
||||
bytes.write(Ints.toByteArray(block.getBlockData().getHeight()));
|
||||
bytes.write(BlockTransformer.toBytes(block));
|
||||
}
|
||||
LOGGER.trace(String.format("Total length of %d blocks is %d bytes", this.blocks.size(), bytes.size()));
|
||||
|
||||
return bytes.toByteArray();
|
||||
} catch (IOException e) {
|
||||
return null;
|
||||
} catch (TransformationException e) {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
@ -1,65 +0,0 @@
|
||||
package org.qortal.network.message;
|
||||
|
||||
import com.google.common.primitives.Ints;
|
||||
import org.qortal.transform.Transformer;
|
||||
import org.qortal.transform.block.BlockTransformer;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.UnsupportedEncodingException;
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
public class GetBlocksMessage extends Message {
|
||||
|
||||
private static final int BLOCK_SIGNATURE_LENGTH = BlockTransformer.BLOCK_SIGNATURE_LENGTH;
|
||||
|
||||
private byte[] parentSignature;
|
||||
private int numberRequested;
|
||||
|
||||
public GetBlocksMessage(byte[] parentSignature, int numberRequested) {
|
||||
this(-1, parentSignature, numberRequested);
|
||||
}
|
||||
|
||||
private GetBlocksMessage(int id, byte[] parentSignature, int numberRequested) {
|
||||
super(id, MessageType.GET_BLOCKS);
|
||||
|
||||
this.parentSignature = parentSignature;
|
||||
this.numberRequested = numberRequested;
|
||||
}
|
||||
|
||||
public byte[] getParentSignature() {
|
||||
return this.parentSignature;
|
||||
}
|
||||
|
||||
public int getNumberRequested() {
|
||||
return this.numberRequested;
|
||||
}
|
||||
|
||||
public static Message fromByteBuffer(int id, ByteBuffer bytes) throws UnsupportedEncodingException {
|
||||
if (bytes.remaining() != BLOCK_SIGNATURE_LENGTH + Transformer.INT_LENGTH)
|
||||
return null;
|
||||
|
||||
byte[] parentSignature = new byte[BLOCK_SIGNATURE_LENGTH];
|
||||
bytes.get(parentSignature);
|
||||
|
||||
int numberRequested = bytes.getInt();
|
||||
|
||||
return new GetBlocksMessage(id, parentSignature, numberRequested);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected byte[] toData() {
|
||||
try {
|
||||
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
|
||||
|
||||
bytes.write(this.parentSignature);
|
||||
|
||||
bytes.write(Ints.toByteArray(this.numberRequested));
|
||||
|
||||
return bytes.toByteArray();
|
||||
} catch (IOException e) {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue
Block a user