Synchronization improvements

Added a finer-grained result (enum) to synchronization to replace
stark boolean result. This allows Controller to decide whether
peer can be retried in the next round (e.g. network issue) or
cooled off for an hour (e.g. peer way too far behind)

Fixed bug with wrong UPDATE_GROUP serialized transaction length.

Added logging to BlockMessage when deserialization to help catch
future bugs.
This commit is contained in:
catbref 2019-05-17 07:55:11 +01:00
parent 1f81784bd6
commit 6a8a0f25c1
5 changed files with 96 additions and 52 deletions

View File

@ -20,6 +20,7 @@ import org.qora.api.ApiService;
import org.qora.block.Block;
import org.qora.block.BlockChain;
import org.qora.block.BlockGenerator;
import org.qora.controller.Synchronizer.SynchronizationResult;
import org.qora.data.block.BlockData;
import org.qora.data.network.BlockSummaryData;
import org.qora.data.network.PeerData;
@ -59,7 +60,7 @@ public class Controller extends Thread {
public static final String VERSION_PREFIX = "qora-core-";
private static final Logger LOGGER = LogManager.getLogger(Controller.class);
private static final long MISBEHAVIOUR_COOLOFF = 24 * 60 * 60 * 1000; // ms
private static final long MISBEHAVIOUR_COOLOFF = 60 * 60 * 1000; // ms
private static final Object shutdownLock = new Object();
private static final String repositoryUrlTemplate = "jdbc:hsqldb:file:%s/blockchain;create=true";
@ -261,22 +262,38 @@ public class Controller extends Thread {
int index = new SecureRandom().nextInt(peers.size());
Peer peer = peers.get(index);
if (!Synchronizer.getInstance().synchronize(peer)) {
LOGGER.info(String.format("Failed to synchronize with peer %s", peer));
SynchronizationResult syncResult = Synchronizer.getInstance().synchronize(peer);
switch (syncResult) {
case GENESIS_ONLY:
case NO_COMMON_BLOCK:
case TOO_FAR_BEHIND:
case TOO_DIVERGENT:
case INVALID_DATA:
// These are more serious results that warrant a cool-off
LOGGER.info(String.format("Failed to synchronize with peer %s (%s) - cooling off", peer, syncResult.name()));
// Failure so don't use this peer again for a while
try (final Repository repository = RepositoryManager.getRepository()) {
PeerData peerData = peer.getPeerData();
peerData.setLastMisbehaved(NTP.getTime());
repository.getNetworkRepository().save(peerData);
repository.saveChanges();
} catch (DataException e) {
LOGGER.warn("Repository issue while updating peer synchronization info", e);
}
// Don't use this peer again for a while
try (final Repository repository = RepositoryManager.getRepository()) {
PeerData peerData = peer.getPeerData();
peerData.setLastMisbehaved(NTP.getTime());
repository.getNetworkRepository().save(peerData);
repository.saveChanges();
} catch (DataException e) {
LOGGER.warn("Repository issue while updating peer synchronization info", e);
}
break;
return;
} else {
LOGGER.debug(String.format("Synchronized with peer %s", peer));
case NO_REPLY:
case INFERIOR_CHAIN:
case NO_BLOCKCHAIN_LOCK:
case REPOSITORY_ISSUE:
// These are minor failure results so fine to try again
LOGGER.info(String.format("Failed to synchronize with peer %s (%s)", peer, syncResult.name()));
break;
case OK:
LOGGER.debug(String.format("Synchronized with peer %s", peer));
break;
}
// Broadcast our new height (if changed)
@ -405,7 +422,7 @@ public class Controller extends Thread {
BlockData blockData = repository.getBlockRepository().fromSignature(signature);
if (blockData == null) {
LOGGER.trace(String.format("Ignoring GET_BLOCK request from peer %s for unknown block %s", peer, Base58.encode(signature)));
LOGGER.debug(String.format("Ignoring GET_BLOCK request from peer %s for unknown block %s", peer, Base58.encode(signature)));
// Send no response at all???
break;
}

View File

@ -31,13 +31,19 @@ public class Synchronizer {
private static final int INITIAL_BLOCK_STEP = 8;
private static final int MAXIMUM_BLOCK_STEP = 500;
private static final int MAXIMUM_HEIGHT_DELTA = 60; // XXX move to blockchain config?
private static final int MAXIMUM_HEIGHT_DELTA = 300; // XXX move to blockchain config?
private static final int MAXIMUM_COMMON_DELTA = 60; // XXX move to blockchain config?
private static final int SYNC_BATCH_SIZE = 200;
private static Synchronizer instance;
private Repository repository;
private int ourHeight;
public enum SynchronizationResult {
OK, GENESIS_ONLY, NO_COMMON_BLOCK, TOO_FAR_BEHIND, TOO_DIVERGENT, NO_REPLY, INFERIOR_CHAIN, INVALID_DATA, NO_BLOCKCHAIN_LOCK, REPOSITORY_ISSUE;
}
// Constructors
private Synchronizer() {
}
@ -58,7 +64,7 @@ public class Synchronizer {
* @param peer
* @return false if something went wrong, true otherwise.
*/
public boolean synchronize(Peer peer) {
public SynchronizationResult synchronize(Peer peer) {
// Make sure we're the only thread modifying the blockchain
// If we're already synchronizing with another peer then this will also return fast
ReentrantLock blockchainLock = Controller.getInstance().getBlockchainLock();
@ -67,19 +73,27 @@ public class Synchronizer {
try (final Repository repository = RepositoryManager.getRepository()) {
try {
this.repository = repository;
this.ourHeight = this.repository.getBlockRepository().getBlockchainHeight();
final int ourInitialHeight = this.repository.getBlockRepository().getBlockchainHeight();
int ourHeight = ourInitialHeight;
final int peerHeight = peer.getPeerData().getLastHeight();
// If peer is at genesis block then peer has no blocks so ignore them for a while
if (peerHeight == 1)
return false;
return SynchronizationResult.GENESIS_ONLY;
LOGGER.info(String.format("Synchronizing with peer %s at height %d, our height %d", peer, peerHeight, this.ourHeight));
// If peer is too far behind us then don't them.
int minHeight = ourHeight - MAXIMUM_HEIGHT_DELTA;
if (peerHeight < minHeight) {
LOGGER.info(String.format("Peer %s height %d is too far behind our height %d", peer, peerHeight, ourHeight));
return SynchronizationResult.TOO_DIVERGENT;
}
List<byte[]> signatures = findSignaturesFromCommonBlock(peer);
LOGGER.info(String.format("Synchronizing with peer %s at height %d, our height %d", peer, peerHeight, ourHeight));
List<byte[]> signatures = findSignaturesFromCommonBlock(peer, ourHeight);
if (signatures == null) {
LOGGER.info(String.format("Failure to find common block with peer %s", peer));
return false;
return SynchronizationResult.NO_COMMON_BLOCK;
}
// First signature is common block
@ -90,17 +104,17 @@ public class Synchronizer {
// If common block is peer's latest block then we simply have a longer chain to peer, so exit now
if (commonBlockHeight == peerHeight)
return true;
return SynchronizationResult.OK;
// If common block is too far behind us then we're on massively different forks so give up.
int minHeight = ourHeight - MAXIMUM_HEIGHT_DELTA;
if (commonBlockHeight < minHeight) {
int minCommonHeight = ourHeight - MAXIMUM_COMMON_DELTA;
if (commonBlockHeight < minCommonHeight) {
LOGGER.info(String.format("Blockchain too divergent with peer %s", peer));
return false;
return SynchronizationResult.TOO_DIVERGENT;
}
// If we have blocks after common block then decide whether we want to sync (lowest block signature wins)
for (int height = commonBlockHeight + 1; height <= peerHeight && height <= this.ourHeight; ++height) {
for (int height = commonBlockHeight + 1; height <= peerHeight && height <= ourHeight; ++height) {
int sigIndex = height - commonBlockHeight - 1;
// Do we need more signatures?
@ -110,7 +124,7 @@ public class Synchronizer {
List<byte[]> moreSignatures = this.getBlockSignatures(peer, previousSignature, MAXIMUM_BLOCK_STEP);
if (moreSignatures == null || moreSignatures.isEmpty()) {
LOGGER.info(String.format("Peer %s failed to respond with more block signatures after height %d", peer, height - 1));
return false;
return SynchronizationResult.NO_REPLY;
}
signatures.addAll(moreSignatures);
@ -127,7 +141,7 @@ public class Synchronizer {
*/
if (ourSignature[i] < peerSignature[i]) {
LOGGER.info(String.format("Not synchronizing with peer %s as we have better block at height %d", peer, height));
return false;
return SynchronizationResult.INFERIOR_CHAIN;
}
if (peerSignature[i] < ourSignature[i])
@ -135,16 +149,16 @@ public class Synchronizer {
}
}
if (this.ourHeight > commonBlockHeight) {
if (ourHeight > commonBlockHeight) {
// Unwind to common block (unless common block is our latest block)
LOGGER.debug(String.format("Orphaning blocks back to height %d", commonBlockHeight));
while (this.ourHeight > commonBlockHeight) {
BlockData blockData = repository.getBlockRepository().fromHeight(this.ourHeight);
while (ourHeight > commonBlockHeight) {
BlockData blockData = repository.getBlockRepository().fromHeight(ourHeight);
Block block = new Block(repository, blockData);
block.orphan();
--this.ourHeight;
--ourHeight;
}
LOGGER.debug(String.format("Orphaned blocks back to height %d - fetching blocks from peer", commonBlockHeight, peer));
@ -154,46 +168,50 @@ public class Synchronizer {
// Fetch, and apply, blocks from peer
byte[] signature = commonBlockData.getSignature();
while (this.ourHeight < peerHeight && this.ourHeight < commonBlockHeight + SYNC_BATCH_SIZE) {
while (ourHeight < peerHeight && ourHeight < commonBlockHeight + SYNC_BATCH_SIZE) {
// Do we need more signatures?
if (signatures.isEmpty()) {
signatures = this.getBlockSignatures(peer, signature, MAXIMUM_BLOCK_STEP);
if (signatures == null || signatures.isEmpty()) {
LOGGER.info(String.format("Peer %s failed to respond with more block signatures after height %d", peer, this.ourHeight));
return false;
LOGGER.info(String.format("Peer %s failed to respond with more block signatures after height %d", peer, ourHeight));
return SynchronizationResult.NO_REPLY;
}
}
signature = signatures.get(0);
signatures.remove(0);
++this.ourHeight;
++ourHeight;
Block newBlock = this.fetchBlock(repository, peer, signature);
if (newBlock == null) {
LOGGER.info(String.format("Peer %s failed to respond with block for height %d", peer, this.ourHeight));
return false;
LOGGER.info(String.format("Peer %s failed to respond with block for height %d", peer, ourHeight));
return SynchronizationResult.NO_REPLY;
}
if (!newBlock.isSignatureValid()) {
LOGGER.info(String.format("Peer %s sent block with invalid signature for height %d", peer, this.ourHeight));
return false;
LOGGER.info(String.format("Peer %s sent block with invalid signature for height %d", peer, ourHeight));
return SynchronizationResult.INVALID_DATA;
}
ValidationResult blockResult = newBlock.isValid();
if (blockResult != ValidationResult.OK) {
LOGGER.info(String.format("Peer %s sent invalid block for height %d: %s", peer, this.ourHeight, blockResult.name()));
return false;
LOGGER.info(String.format("Peer %s sent invalid block for height %d: %s", peer, ourHeight, blockResult.name()));
return SynchronizationResult.INVALID_DATA;
}
newBlock.process();
// If we've grown our blockchain then at least save progress so far
if (ourHeight > ourInitialHeight)
repository.saveChanges();
}
// Commit
repository.saveChanges();
LOGGER.info(String.format("Synchronized with peer %s to height %d", peer, this.ourHeight));
LOGGER.info(String.format("Synchronized with peer %s to height %d", peer, ourHeight));
return true;
return SynchronizationResult.OK;
} finally {
repository.discardChanges(); // Free repository locks, if any, also in case anything went wrong
this.repository = null;
@ -201,13 +219,13 @@ public class Synchronizer {
}
} catch (DataException e) {
LOGGER.error("Repository issue during synchronization with peer", e);
return false;
return SynchronizationResult.REPOSITORY_ISSUE;
} finally {
blockchainLock.unlock();
}
// Wasn't peer's fault we couldn't sync
return true;
return SynchronizationResult.NO_BLOCKCHAIN_LOCK;
}
/**
@ -217,7 +235,7 @@ public class Synchronizer {
* @return block signatures
* @throws DataException
*/
private List<byte[]> findSignaturesFromCommonBlock(Peer peer) throws DataException {
private List<byte[]> findSignaturesFromCommonBlock(Peer peer, int ourHeight) throws DataException {
// Start by asking for a few recent block hashes as this will cover a majority of reorgs
// Failing that, back off exponentially
int step = INITIAL_BLOCK_STEP;

View File

@ -6,6 +6,8 @@ import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.List;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.qora.block.Block;
import org.qora.data.at.ATStateData;
import org.qora.data.block.BlockData;
@ -18,6 +20,8 @@ import com.google.common.primitives.Ints;
public class BlockMessage extends Message {
private static final Logger LOGGER = LogManager.getLogger(BlockMessage.class);
private Block block = null;
private BlockData blockData = null;
@ -66,6 +70,7 @@ public class BlockMessage extends Message {
return new BlockMessage(id, blockData, blockInfo.getB(), blockInfo.getC());
} catch (TransformationException e) {
LOGGER.info(String.format("Received garbled BLOCK message: %s", e.getMessage()));
return null;
}
}

View File

@ -210,7 +210,7 @@ public abstract class TransactionTransformer extends Transformer {
return (TransactionData) method.invoke(null, byteBuffer);
} catch (InvocationTargetException e) {
if (e.getCause() instanceof BufferUnderflowException)
throw new TransformationException("Byte data too short for transaction type [" + type.value + "]");
throw new TransformationException("Byte data too short for " + type.name() + " transaction (type [" + type.value + "])");
if (e.getCause() instanceof TransformationException)
throw (TransformationException) e.getCause();

View File

@ -25,9 +25,11 @@ public class UpdateGroupTransactionTransformer extends TransactionTransformer {
private static final int NEW_DESCRIPTION_SIZE_LENGTH = INT_LENGTH;
private static final int NEW_IS_OPEN_LENGTH = BOOLEAN_LENGTH;
private static final int NEW_APPROVAL_THRESHOLD_LENGTH = BYTE_LENGTH;
private static final int NEW_MINIMUM_BLOCK_DELAY_LENGTH = INT_LENGTH;
private static final int NEW_MAXIMUM_BLOCK_DELAY_LENGTH = INT_LENGTH;
private static final int EXTRAS_LENGTH = GROUPID_LENGTH + NEW_OWNER_LENGTH + NEW_DESCRIPTION_SIZE_LENGTH + NEW_IS_OPEN_LENGTH
+ NEW_APPROVAL_THRESHOLD_LENGTH;
+ NEW_APPROVAL_THRESHOLD_LENGTH + NEW_MINIMUM_BLOCK_DELAY_LENGTH + NEW_MAXIMUM_BLOCK_DELAY_LENGTH;
protected static final TransactionLayout layout;
@ -44,6 +46,8 @@ public class UpdateGroupTransactionTransformer extends TransactionTransformer {
layout.add("group's new description", TransformationType.STRING);
layout.add("is group \"open\"?", TransformationType.BOOLEAN);
layout.add("new group transaction approval threshold", TransformationType.BYTE);
layout.add("new group approval minimum block delay", TransformationType.INT);
layout.add("new group approval maximum block delay", TransformationType.INT);
layout.add("fee", TransformationType.AMOUNT);
layout.add("signature", TransformationType.SIGNATURE);
}