forked from Qortal/qortal
Rewrite of Synchronizer.syncToPeerChain(), this time borrowing ideas from Synchronizer.applyNewBlocks().
Main differences / improvements: - Only request a single batch of signatures upfront, instead of the entire peer's chain. There is no point in requesting them all, as the later ones may not be valid by the time we have finished requesting all the blocks before them. - If we fail to fetch a block, clear any queued signatures that are in memory and re-fetch signatures after the last block received. This allows us to cope with peers that re-org whilst we are syncing with them. - If we can't find any more block signatures, or the peer fails to respond to a block, apply our progress anyway. This should reduce wasted work and network congestion, and helps cope with larger peer re-orgs. - The retry mechanism remains in place, but instead of fetching the same incorrect block over and over, it will attempt to locate a new block signature each time, as described above. To help reduce code complexity, block signature requests are no longer retried.
This commit is contained in:
parent
30160e2843
commit
e89d31eb5a
@ -354,41 +354,85 @@ public class Synchronizer {
|
||||
final byte[] commonBlockSig = commonBlockData.getSignature();
|
||||
String commonBlockSig58 = Base58.encode(commonBlockSig);
|
||||
|
||||
byte[] latestPeerSignature = commonBlockSig;
|
||||
int height = commonBlockHeight;
|
||||
|
||||
LOGGER.debug(() -> String.format("Fetching peer %s chain from height %d, sig %.8s", peer, commonBlockHeight, commonBlockSig58));
|
||||
|
||||
int ourHeight = ourInitialHeight;
|
||||
|
||||
// Overall plan: fetch peer's blocks first, then orphan, then apply
|
||||
|
||||
// Convert any leftover (post-common) block summaries into signatures to request from peer
|
||||
List<byte[]> peerBlockSignatures = peerBlockSummaries.stream().map(BlockSummaryData::getSignature).collect(Collectors.toList());
|
||||
|
||||
// Keep a list of blocks received so far
|
||||
List<Block> peerBlocks = new ArrayList<>();
|
||||
|
||||
// Calculate the total number of additional blocks this peer has beyond the common block
|
||||
int additionalPeerBlocksAfterCommonBlock = peerHeight - commonBlockHeight;
|
||||
// Subtract the number of signatures that we already have, as we don't need to request them again
|
||||
int numberSignaturesRequired = additionalPeerBlocksAfterCommonBlock - peerBlockSignatures.size();
|
||||
|
||||
// Fetch remaining block signatures, if needed
|
||||
int retryCount = 0;
|
||||
while (numberSignaturesRequired > 0) {
|
||||
byte[] latestPeerSignature = peerBlockSignatures.isEmpty() ? commonBlockSig : peerBlockSignatures.get(peerBlockSignatures.size() - 1);
|
||||
int lastPeerHeight = commonBlockHeight + peerBlockSignatures.size();
|
||||
int numberOfSignaturesToRequest = Math.min(numberSignaturesRequired, MAXIMUM_REQUEST_SIZE);
|
||||
while (height < peerHeight) {
|
||||
if (Controller.isStopping())
|
||||
return SynchronizationResult.SHUTTING_DOWN;
|
||||
|
||||
// Ensure we don't request more than MAXIMUM_REQUEST_SIZE
|
||||
int numberRequested = Math.min(numberSignaturesRequired, MAXIMUM_REQUEST_SIZE);
|
||||
|
||||
// Do we need more signatures?
|
||||
if (peerBlockSignatures.isEmpty() && numberRequested > 0) {
|
||||
LOGGER.trace(String.format("Requesting %d signature%s after height %d, sig %.8s",
|
||||
numberOfSignaturesToRequest, (numberOfSignaturesToRequest != 1 ? "s": ""), lastPeerHeight, Base58.encode(latestPeerSignature)));
|
||||
numberRequested, (numberRequested != 1 ? "s" : ""), height, Base58.encode(latestPeerSignature)));
|
||||
|
||||
List<byte[]> moreBlockSignatures = this.getBlockSignatures(peer, latestPeerSignature, numberOfSignaturesToRequest);
|
||||
peerBlockSignatures = this.getBlockSignatures(peer, latestPeerSignature, numberRequested);
|
||||
|
||||
if (moreBlockSignatures == null || moreBlockSignatures.isEmpty()) {
|
||||
if (peerBlockSignatures == null || peerBlockSignatures.isEmpty()) {
|
||||
LOGGER.info(String.format("Peer %s failed to respond with more block signatures after height %d, sig %.8s", peer,
|
||||
lastPeerHeight, Base58.encode(latestPeerSignature)));
|
||||
height, Base58.encode(latestPeerSignature)));
|
||||
|
||||
if (retryCount >= MAXIMUM_RETRIES) {
|
||||
// Give up with this peer
|
||||
// If we have already received blocks from this peer, go ahead and apply them
|
||||
if (peerBlocks.size() > 0) {
|
||||
break;
|
||||
}
|
||||
// Otherwise, give up and move on to the next peer
|
||||
return SynchronizationResult.NO_REPLY;
|
||||
}
|
||||
else {
|
||||
|
||||
numberSignaturesRequired = peerHeight - height - peerBlockSignatures.size();
|
||||
LOGGER.trace(String.format("Received %s signature%s", peerBlockSignatures.size(), (peerBlockSignatures.size() != 1 ? "s" : "")));
|
||||
}
|
||||
|
||||
if (peerBlockSignatures.isEmpty()) {
|
||||
LOGGER.trace(String.format("No more signatures or blocks to request from peer %s", peer));
|
||||
break;
|
||||
}
|
||||
|
||||
byte[] nextPeerSignature = peerBlockSignatures.get(0);
|
||||
int nextHeight = height + 1;
|
||||
|
||||
LOGGER.trace(String.format("Fetching block %d, sig %.8s from %s", nextHeight, Base58.encode(nextPeerSignature), peer));
|
||||
Block newBlock = this.fetchBlock(repository, peer, nextPeerSignature);
|
||||
|
||||
if (newBlock == null) {
|
||||
LOGGER.info(String.format("Peer %s failed to respond with block for height %d, sig %.8s", peer,
|
||||
nextHeight, Base58.encode(nextPeerSignature)));
|
||||
|
||||
if (retryCount >= MAXIMUM_RETRIES) {
|
||||
|
||||
// If we have already received blocks from this peer, go ahead and apply them
|
||||
if (peerBlocks.size() > 0) {
|
||||
break;
|
||||
}
|
||||
// Otherwise, give up and move on to the next peer
|
||||
return SynchronizationResult.NO_REPLY;
|
||||
|
||||
} else {
|
||||
// Re-fetch signatures, in case the peer is now on a different fork
|
||||
peerBlockSignatures.clear();
|
||||
numberSignaturesRequired = peerHeight - height;
|
||||
|
||||
// Retry until retryCount reaches MAXIMUM_RETRIES
|
||||
retryCount++;
|
||||
int triesRemaining = MAXIMUM_RETRIES - retryCount;
|
||||
@ -400,62 +444,31 @@ public class Synchronizer {
|
||||
// Reset retryCount because the last request succeeded
|
||||
retryCount = 0;
|
||||
|
||||
LOGGER.trace(String.format("Received %s signature%s", peerBlockSignatures.size(), (peerBlockSignatures.size() != 1 ? "s" : "")));
|
||||
|
||||
peerBlockSignatures.addAll(moreBlockSignatures);
|
||||
numberSignaturesRequired = additionalPeerBlocksAfterCommonBlock - peerBlockSignatures.size();
|
||||
}
|
||||
|
||||
// Fetch blocks using signatures
|
||||
LOGGER.debug(String.format("Fetching new blocks from peer %s after height %d", peer, commonBlockHeight));
|
||||
List<Block> peerBlocks = new ArrayList<>();
|
||||
|
||||
retryCount = 0;
|
||||
while (peerBlocks.size() < peerBlockSignatures.size()) {
|
||||
byte[] blockSignature = peerBlockSignatures.get(peerBlocks.size());
|
||||
|
||||
LOGGER.debug(String.format("Fetching block with signature %.8s", Base58.encode(blockSignature)));
|
||||
int blockHeightToRequest = commonBlockHeight + peerBlocks.size() + 1; // +1 because we are requesting the next block, beyond what we already have in the peerBlocks array
|
||||
Block newBlock = this.fetchBlock(repository, peer, blockSignature);
|
||||
|
||||
if (newBlock == null) {
|
||||
LOGGER.info(String.format("Peer %s failed to respond with block for height %d, sig %.8s", peer, blockHeightToRequest, Base58.encode(blockSignature)));
|
||||
|
||||
if (retryCount >= MAXIMUM_RETRIES) {
|
||||
// Give up with this peer
|
||||
return SynchronizationResult.NO_REPLY;
|
||||
}
|
||||
else {
|
||||
// Retry until retryCount reaches MAXIMUM_RETRIES
|
||||
retryCount++;
|
||||
int triesRemaining = MAXIMUM_RETRIES - retryCount;
|
||||
LOGGER.info(String.format("Re-issuing request to peer %s (%d attempt%s remaining)", peer, triesRemaining, (triesRemaining != 1 ? "s": "")));
|
||||
continue;
|
||||
}
|
||||
}
|
||||
LOGGER.trace(String.format("Fetched block %d, sig %.8s from %s", nextHeight, Base58.encode(latestPeerSignature), peer));
|
||||
|
||||
if (!newBlock.isSignatureValid()) {
|
||||
LOGGER.info(String.format("Peer %s sent block with invalid signature for height %d, sig %.8s", peer,
|
||||
blockHeightToRequest, Base58.encode(blockSignature)));
|
||||
nextHeight, Base58.encode(latestPeerSignature)));
|
||||
return SynchronizationResult.INVALID_DATA;
|
||||
}
|
||||
|
||||
// Reset retryCount because the last request succeeded
|
||||
retryCount = 0;
|
||||
|
||||
LOGGER.debug(String.format("Received block with height %d, sig: %.8s", newBlock.getBlockData().getHeight(), Base58.encode(blockSignature)));
|
||||
|
||||
// Transactions are transmitted without approval status so determine that now
|
||||
for (Transaction transaction : newBlock.getTransactions())
|
||||
transaction.setInitialApprovalStatus();
|
||||
|
||||
peerBlocks.add(newBlock);
|
||||
|
||||
// Now that we've received this block, we can increase our height and move on to the next one
|
||||
latestPeerSignature = nextPeerSignature;
|
||||
peerBlockSignatures.remove(0);
|
||||
++height;
|
||||
}
|
||||
|
||||
// Unwind to common block (unless common block is our latest block)
|
||||
LOGGER.debug(String.format("Orphaning blocks back to common block height %d, sig %.8s", commonBlockHeight, commonBlockSig58));
|
||||
int ourHeight = ourInitialHeight;
|
||||
LOGGER.debug(String.format("Orphaning blocks back to common block height %d, sig %.8s. Our height: %d", commonBlockHeight, commonBlockSig58, ourHeight));
|
||||
|
||||
BlockData orphanBlockData = repository.getBlockRepository().fromHeight(ourHeight);
|
||||
BlockData orphanBlockData = repository.getBlockRepository().fromHeight(ourInitialHeight);
|
||||
while (ourHeight > commonBlockHeight) {
|
||||
if (Controller.isStopping())
|
||||
return SynchronizationResult.SHUTTING_DOWN;
|
||||
@ -477,6 +490,9 @@ public class Synchronizer {
|
||||
LOGGER.debug(String.format("Orphaned blocks back to height %d, sig %.8s - applying new blocks from peer %s", commonBlockHeight, commonBlockSig58, peer));
|
||||
|
||||
for (Block newBlock : peerBlocks) {
|
||||
if (Controller.isStopping())
|
||||
return SynchronizationResult.SHUTTING_DOWN;
|
||||
|
||||
ValidationResult blockResult = newBlock.isValid();
|
||||
if (blockResult != ValidationResult.OK) {
|
||||
LOGGER.info(String.format("Peer %s sent invalid block for height %d, sig %.8s: %s", peer,
|
||||
|
Loading…
Reference in New Issue
Block a user