Synchronization improvements (again!)

Bumped version

Controller no longer uses block height to determine whether to sync
but now uses peer's latest block's timestamp and signature.

Also BlockGenerator checks whether it's generating in isolation using
the same peer info (latest block timestamp and signature).

Added API call POST /admin/forcesync peer-address to help get wayward
nodes back on track.

Unified code around, and calling, Transaction.importAsUnconfirmed().

Tidied code around somelock.tryLock() to be more readable.

Controller (post-sync) now broadcasts new chaintip info if
our latest block's signature has changed, not simply the height.

Network.broadcast() only sends out via outbound peer if node has
more than one connection from the same peer. So Controller would
only update one of the peer records with chaintip info.
Controller now updates all connected peers with the ID when it
receives a HEIGHT or HEIGHT_V2 message.

Added node1 thru node7.mcfamily.io to default peers in Network.

Network ignores first "listen port" entry when receiving peers
list from an outbound-connection peer as it already knows
by virtue of having connected to it!

More network message debug logging (hopefully never to be seen).

[some old code left in, but commented out, for a while]
This commit is contained in:
catbref 2019-06-06 14:30:52 +01:00
parent d910cce807
commit c2e8392f05
11 changed files with 525 additions and 399 deletions

View File

@ -11,6 +11,8 @@ import io.swagger.v3.oas.annotations.responses.ApiResponse;
import io.swagger.v3.oas.annotations.tags.Tag;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.time.LocalDate;
@ -43,11 +45,16 @@ import org.qora.api.model.ActivitySummary;
import org.qora.api.model.NodeInfo;
import org.qora.block.BlockChain;
import org.qora.controller.Controller;
import org.qora.controller.Synchronizer;
import org.qora.controller.Synchronizer.SynchronizationResult;
import org.qora.repository.DataException;
import org.qora.repository.Repository;
import org.qora.repository.RepositoryManager;
import org.qora.data.account.ForgingAccountData;
import org.qora.data.account.ProxyForgerData;
import org.qora.network.Network;
import org.qora.network.Peer;
import org.qora.network.PeerAddress;
import org.qora.utils.Base58;
import com.google.common.collect.Lists;
@ -401,4 +408,51 @@ public class AdminResource {
}
}
@POST
@Path("/forcesync")
@Operation(
summary = "Forcibly synchronize to given peer.",
requestBody = @RequestBody(
required = true,
content = @Content(
mediaType = MediaType.TEXT_PLAIN,
schema = @Schema(
type = "string", example = "node7.mcfamily.io"
)
)
),
responses = {
@ApiResponse(
description = "\"true\"",
content = @Content(mediaType = MediaType.TEXT_PLAIN, schema = @Schema(type = "string"))
)
}
)
@ApiErrors({ApiError.INVALID_DATA, ApiError.REPOSITORY_ISSUE})
public String forceSync(String targetPeerAddress) {
Security.checkApiCallAllowed(request);
try {
// Try to resolve passed address to make things easier
PeerAddress peerAddress = PeerAddress.fromString(targetPeerAddress);
InetSocketAddress resolvedAddress = peerAddress.toSocketAddress();
List<Peer> peers = Network.getInstance().getHandshakedPeers();
Peer targetPeer = peers.stream().filter(peer -> peer.getResolvedAddress().equals(resolvedAddress)).findFirst().orElse(null);
if (targetPeer == null)
throw ApiExceptionFactory.INSTANCE.createException(request, ApiError.INVALID_DATA);
SynchronizationResult syncResult = Synchronizer.getInstance().synchronize(targetPeer, true);
return syncResult.name();
} catch (IllegalArgumentException e) {
throw ApiExceptionFactory.INSTANCE.createException(request, ApiError.INVALID_DATA);
} catch (ApiException e) {
throw e;
} catch (UnknownHostException e) {
throw ApiExceptionFactory.INSTANCE.createException(request, ApiError.INVALID_DATA);
}
}
}

View File

@ -427,12 +427,10 @@ public class TransactionsResource {
if (!transaction.isSignatureValid())
throw ApiExceptionFactory.INSTANCE.createException(request, ApiError.INVALID_SIGNATURE);
ValidationResult result = transaction.isValidUnconfirmed();
ValidationResult result = transaction.importAsUnconfirmed();
if (result != ValidationResult.OK)
throw createTransactionInvalidException(request, result);
transaction.importAsUnconfirmed();
// Notify controller of new transaction
Controller.getInstance().onNewTransaction(transactionData);

View File

@ -386,25 +386,25 @@ public class BlockChain {
public static boolean orphan(int targetHeight) throws DataException {
ReentrantLock blockchainLock = Controller.getInstance().getBlockchainLock();
if (blockchainLock.tryLock())
try {
try (final Repository repository = RepositoryManager.getRepository()) {
for (int height = repository.getBlockRepository().getBlockchainHeight(); height > targetHeight; --height) {
LOGGER.info(String.format("Forcably orphaning block %d", height));
if (!blockchainLock.tryLock())
return false;
BlockData blockData = repository.getBlockRepository().fromHeight(height);
Block block = new Block(repository, blockData);
block.orphan();
repository.saveChanges();
}
try {
try (final Repository repository = RepositoryManager.getRepository()) {
for (int height = repository.getBlockRepository().getBlockchainHeight(); height > targetHeight; --height) {
LOGGER.info(String.format("Forcably orphaning block %d", height));
return true;
BlockData blockData = repository.getBlockRepository().fromHeight(height);
Block block = new Block(repository, blockData);
block.orphan();
repository.saveChanges();
}
} finally {
blockchainLock.unlock();
}
return false;
return true;
}
} finally {
blockchainLock.unlock();
}
}
}

View File

@ -10,9 +10,11 @@ import java.util.stream.Collectors;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.qora.account.PrivateKeyAccount;
import org.qora.account.PublicKeyAccount;
import org.qora.block.Block.ValidationResult;
import org.qora.controller.Controller;
import org.qora.data.account.ForgingAccountData;
import org.qora.data.account.ProxyForgerData;
import org.qora.data.block.BlockData;
import org.qora.data.transaction.TransactionData;
import org.qora.network.Network;
@ -112,73 +114,89 @@ public class BlockGenerator extends Thread {
// Make sure we're the only thread modifying the blockchain
ReentrantLock blockchainLock = Controller.getInstance().getBlockchainLock();
if (blockchainLock.tryLock()) {
boolean newBlockGenerated = false;
if (!blockchainLock.tryLock())
continue;
generation: try {
// Clear repository's "in transaction" state so we don't cause a repository deadlock
repository.discardChanges();
boolean newBlockGenerated = false;
List<Block> goodBlocks = new ArrayList<>();
generation: try {
// Clear repository's "in transaction" state so we don't cause a repository deadlock
repository.discardChanges();
for (Block testBlock : newBlocks) {
// Is new block's timestamp valid yet?
// We do a separate check as some timestamp checks are skipped for testnet
if (testBlock.isTimestampValid() != ValidationResult.OK)
continue;
List<Block> goodBlocks = new ArrayList<>();
// Is new block valid yet? (Before adding unconfirmed transactions)
if (testBlock.isValid() != ValidationResult.OK)
continue;
for (Block testBlock : newBlocks) {
// Is new block's timestamp valid yet?
// We do a separate check as some timestamp checks are skipped for testnet
if (testBlock.isTimestampValid() != ValidationResult.OK)
continue;
goodBlocks.add(testBlock);
}
// Is new block valid yet? (Before adding unconfirmed transactions)
if (testBlock.isValid() != ValidationResult.OK)
continue;
if (goodBlocks.isEmpty())
break generation;
// Pick random generator
int winningIndex = new Random().nextInt(goodBlocks.size());
Block newBlock = goodBlocks.get(winningIndex);
// Delete invalid transactions. NOTE: discards repository changes on entry, saves changes on exit.
deleteInvalidTransactions(repository);
// Add unconfirmed transactions
addUnconfirmedTransactions(repository, newBlock);
// Sign to create block's signature
newBlock.sign();
// Is newBlock still valid?
ValidationResult validationResult = newBlock.isValid();
if (validationResult != ValidationResult.OK) {
// No longer valid? Report and discard
LOGGER.error("Valid, generated block now invalid '" + validationResult.name() + "' after adding unconfirmed transactions?");
newBlocks.clear();
break generation;
}
// Add to blockchain - something else will notice and broadcast new block to network
try {
newBlock.process();
LOGGER.info("Generated new block: " + newBlock.getBlockData().getHeight());
repository.saveChanges();
// Notify controller
newBlockGenerated = true;
} catch (DataException e) {
// Unable to process block - report and discard
LOGGER.error("Unable to process newly generated block?", e);
newBlocks.clear();
}
} finally {
blockchainLock.unlock();
goodBlocks.add(testBlock);
}
if (newBlockGenerated)
Controller.getInstance().onGeneratedBlock();
if (goodBlocks.isEmpty())
break generation;
// Pick random generator
int winningIndex = new Random().nextInt(goodBlocks.size());
Block newBlock = goodBlocks.get(winningIndex);
// Delete invalid transactions. NOTE: discards repository changes on entry, saves changes on exit.
deleteInvalidTransactions(repository);
// Add unconfirmed transactions
addUnconfirmedTransactions(repository, newBlock);
// Sign to create block's signature
newBlock.sign();
// Is newBlock still valid?
ValidationResult validationResult = newBlock.isValid();
if (validationResult != ValidationResult.OK) {
// No longer valid? Report and discard
LOGGER.error("Valid, generated block now invalid '" + validationResult.name() + "' after adding unconfirmed transactions?");
newBlocks.clear();
break generation;
}
// Add to blockchain - something else will notice and broadcast new block to network
try {
newBlock.process();
LOGGER.info("Generated new block: " + newBlock.getBlockData().getHeight());
repository.saveChanges();
ProxyForgerData proxyForgerData = repository.getAccountRepository().getProxyForgeData(newBlock.getBlockData().getGeneratorPublicKey());
if (proxyForgerData != null) {
PublicKeyAccount forger = new PublicKeyAccount(repository, proxyForgerData.getForgerPublicKey());
LOGGER.info(String.format("Generated block %d by %s on behalf of %s",
newBlock.getBlockData().getHeight(),
forger.getAddress(),
proxyForgerData.getRecipient()));
} else {
LOGGER.info(String.format("Generated block %d by %s", newBlock.getBlockData().getHeight(), newBlock.getGenerator().getAddress()));
}
repository.saveChanges();
// Notify controller
newBlockGenerated = true;
} catch (DataException e) {
// Unable to process block - report and discard
LOGGER.error("Unable to process newly generated block?", e);
newBlocks.clear();
}
} finally {
blockchainLock.unlock();
}
if (newBlockGenerated)
Controller.getInstance().onGeneratedBlock();
}
} catch (DataException e) {
LOGGER.warn("Repository issue while running block generator", e);

View File

@ -69,6 +69,7 @@ public class Controller extends Thread {
private static final Logger LOGGER = LogManager.getLogger(Controller.class);
private static final long MISBEHAVIOUR_COOLOFF = 60 * 60 * 1000; // ms
private static final int MAX_BLOCKCHAIN_TIP_AGE = 5; // blocks
private static final Object shutdownLock = new Object();
private static final String repositoryUrlTemplate = "jdbc:hsqldb:file:%s/blockchain;create=true";
@ -272,23 +273,27 @@ public class Controller extends Thread {
if (peers.size() < Settings.getInstance().getMinBlockchainPeers())
return;
for(Peer peer : peers)
LOGGER.trace(String.format("Peer %s is at height %d", peer, peer.getPeerData().getLastHeight()));
// Remove peers with unknown height, lower height or same height and same block signature (unless we don't have their block signature)
peers.removeIf(hasShorterBlockchain());
// Remove peers that have "misbehaved" recently
// Disregard peers that have "misbehaved" recently
peers.removeIf(hasPeerMisbehaved);
if (!peers.isEmpty()) {
int ourHeight = getChainHeight();
// Remove peers with unknown height, lower height or same height and same block signature (unless we don't have their block signature)
// peers.removeIf(hasShorterBlockchain());
// Disregard peers that don't have a recent block
final long minLatestBlockTimestamp = getMinimumLatestBlockTimestamp();
peers.removeIf(peer -> peer.getPeerData().getLastBlockTimestamp() == null || peer.getPeerData().getLastBlockTimestamp() < minLatestBlockTimestamp);
BlockData latestBlockData = getChainTip();
// Disregard peers that have no block signature or the same block signature as us
peers.removeIf(peer -> peer.getPeerData().getLastBlockSignature() == null || Arrays.equals(latestBlockData.getSignature(), peer.getPeerData().getLastBlockSignature()));
if (!peers.isEmpty()) {
// Pick random peer to sync with
int index = new SecureRandom().nextInt(peers.size());
Peer peer = peers.get(index);
SynchronizationResult syncResult = Synchronizer.getInstance().synchronize(peer);
SynchronizationResult syncResult = Synchronizer.getInstance().synchronize(peer, false);
switch (syncResult) {
case GENESIS_ONLY:
case NO_COMMON_BLOCK:
@ -326,10 +331,10 @@ public class Controller extends Thread {
break;
}
// Broadcast our new height (if changed)
BlockData latestBlockData = getChainTip();
if (latestBlockData.getHeight() != ourHeight)
Network.getInstance().broadcast(recipientPeer -> Network.getInstance().buildHeightMessage(recipientPeer, latestBlockData));
// Broadcast our new chain tip (if changed)
BlockData newLatestBlockData = getChainTip();
if (!Arrays.equals(newLatestBlockData.getSignature(), latestBlockData.getSignature()))
Network.getInstance().broadcast(recipientPeer -> Network.getInstance().buildHeightMessage(recipientPeer, newLatestBlockData));
}
}
@ -457,18 +462,25 @@ public class Controller extends Thread {
case HEIGHT: {
HeightMessage heightMessage = (HeightMessage) message;
// Update our record of peer's height
PeerData peerData = peer.getPeerData();
peer.getPeerData().setLastHeight(heightMessage.getHeight());
// Update all peers with same ID
// Only save to repository if outbound peer
if (peer.isOutbound())
try (final Repository repository = RepositoryManager.getRepository()) {
repository.getNetworkRepository().save(peerData);
repository.saveChanges();
} catch (DataException e) {
LOGGER.error(String.format("Repository issue while updating height of peer %s", peer), e);
}
List<Peer> connectedPeers = Network.getInstance().getHandshakedPeers();
for (Peer connectedPeer : connectedPeers) {
if (connectedPeer.getPeerId() == null || !Arrays.equals(connectedPeer.getPeerId(), peer.getPeerId()))
continue;
PeerData peerData = connectedPeer.getPeerData();
peerData.setLastHeight(heightMessage.getHeight());
// Only save to repository if outbound peer
if (connectedPeer.isOutbound())
try (final Repository repository = RepositoryManager.getRepository()) {
repository.getNetworkRepository().save(peerData);
repository.saveChanges();
} catch (DataException e) {
LOGGER.error(String.format("Repository issue while updating height of peer %s", connectedPeer), e);
}
}
// Potentially synchronize
requestSync = true;
@ -479,30 +491,37 @@ public class Controller extends Thread {
case HEIGHT_V2: {
HeightV2Message heightV2Message = (HeightV2Message) message;
// Update our record for peer's blockchain info
PeerData peerData = peer.getPeerData();
// Update all peers with same ID
// We want to update atomically so use lock
ReentrantLock peerDataLock = peer.getPeerDataLock();
peerDataLock.lock();
try {
peerData.setLastHeight(heightV2Message.getHeight());
peerData.setLastBlockSignature(heightV2Message.getSignature());
peerData.setLastBlockTimestamp(heightV2Message.getTimestamp());
peerData.setLastBlockGenerator(heightV2Message.getGenerator());
} finally {
peerDataLock.unlock();
}
List<Peer> connectedPeers = Network.getInstance().getHandshakedPeers();
for (Peer connectedPeer : connectedPeers) {
if (connectedPeer.getPeerId() == null || !Arrays.equals(connectedPeer.getPeerId(), peer.getPeerId()))
continue;
// Only save to repository if outbound peer
if (peer.isOutbound())
try (final Repository repository = RepositoryManager.getRepository()) {
repository.getNetworkRepository().save(peerData);
repository.saveChanges();
} catch (DataException e) {
LOGGER.error(String.format("Repository issue while updating info of peer %s", peer), e);
PeerData peerData = connectedPeer.getPeerData();
// We want to update atomically so use lock
ReentrantLock peerDataLock = connectedPeer.getPeerDataLock();
peerDataLock.lock();
try {
peerData.setLastHeight(heightV2Message.getHeight());
peerData.setLastBlockSignature(heightV2Message.getSignature());
peerData.setLastBlockTimestamp(heightV2Message.getTimestamp());
peerData.setLastBlockGenerator(heightV2Message.getGenerator());
} finally {
peerDataLock.unlock();
}
// Only save to repository if outbound peer
if (connectedPeer.isOutbound())
try (final Repository repository = RepositoryManager.getRepository()) {
repository.getNetworkRepository().save(peerData);
repository.saveChanges();
} catch (DataException e) {
LOGGER.error(String.format("Repository issue while updating info of peer %s", connectedPeer), e);
}
}
// Potentially synchronize
requestSync = true;
@ -622,34 +641,28 @@ public class Controller extends Thread {
// Check signature
if (!transaction.isSignatureValid()) {
LOGGER.trace(String.format("Ignoring TRANSACTION %s with invalid signature from peer %s", Base58.encode(transactionData.getSignature()), peer));
LOGGER.trace(String.format("Ignoring %s transaction %s with invalid signature from peer %s", transactionData.getType().name(), Base58.encode(transactionData.getSignature()), peer));
break;
}
// Blockchain lock required to prevent multiple threads trying to save the same transaction simultaneously
ReentrantLock blockchainLock = Controller.getInstance().getBlockchainLock();
if (blockchainLock.tryLock())
try {
// Do we have it already?
if (repository.getTransactionRepository().exists(transactionData.getSignature())) {
LOGGER.trace(String.format("Ignoring existing TRANSACTION %s from peer %s", Base58.encode(transactionData.getSignature()), peer));
break;
}
ValidationResult validationResult = transaction.importAsUnconfirmed();
// Is it valid?
ValidationResult validationResult = transaction.isValidUnconfirmed();
if (validationResult != ValidationResult.OK) {
LOGGER.trace(String.format("Ignoring invalid (%s) TRANSACTION %s from peer %s", validationResult.name(), Base58.encode(transactionData.getSignature()), peer));
break;
}
if (validationResult == ValidationResult.TRANSACTION_ALREADY_EXISTS) {
LOGGER.trace(String.format("Ignoring existing transaction %s from peer %s", Base58.encode(transactionData.getSignature()), peer));
break;
}
// Seems ok - add to unconfirmed pile
transaction.importAsUnconfirmed();
if (validationResult == ValidationResult.NO_BLOCKCHAIN_LOCK) {
LOGGER.trace(String.format("Couldn't lock blockchain to import unconfirmed transaction %s from peer %s", Base58.encode(transactionData.getSignature()), peer));
break;
}
LOGGER.debug(String.format("Imported %s transaction %s from peer %s", transactionData.getType().name(), Base58.encode(transactionData.getSignature()), peer));
} finally {
blockchainLock.unlock();
}
if (validationResult != ValidationResult.OK) {
LOGGER.trace(String.format("Ignoring invalid (%s) %s transaction %s from peer %s", validationResult.name(), transactionData.getType().name(), Base58.encode(transactionData.getSignature()), peer));
break;
}
LOGGER.debug(String.format("Imported %s transaction %s from peer %s", transactionData.getType().name(), Base58.encode(transactionData.getSignature()), peer));
} catch (DataException e) {
LOGGER.error(String.format("Repository issue while processing transaction %s from peer %s", Base58.encode(transactionData.getSignature()), peer), e);
}
@ -677,9 +690,9 @@ public class Controller extends Thread {
try (final Repository repository = RepositoryManager.getRepository()) {
for (byte[] signature : signatures) {
// Do we have it already?
// Do we have it already? (Before requesting transaction data itself)
if (repository.getTransactionRepository().exists(signature)) {
LOGGER.trace(String.format("Ignoring unconfirmed transaction %s from peer %s", Base58.encode(signature), peer));
LOGGER.trace(String.format("Ignoring existing transaction %s from peer %s", Base58.encode(signature), peer));
break;
}
@ -697,40 +710,31 @@ public class Controller extends Thread {
// Check signature
if (!transaction.isSignatureValid()) {
LOGGER.trace(String.format("Ignoring unconfirmed transaction %s with invalid signature from peer %s", Base58.encode(transactionData.getSignature()), peer));
LOGGER.trace(String.format("Ignoring %s transaction %s with invalid signature from peer %s", transactionData.getType().name(), Base58.encode(transactionData.getSignature()), peer));
break;
}
// Blockchain lock required to prevent multiple threads trying to save the same transaction simultaneously
ReentrantLock blockchainLock = Controller.getInstance().getBlockchainLock();
if (blockchainLock.tryLock())
try {
// Do we have it already? Rechecking in case it has appeared since previous check above
if (repository.getTransactionRepository().exists(transactionData.getSignature())) {
LOGGER.trace(String.format("Ignoring existing unconfirmed transaction %s from peer %s", Base58.encode(transactionData.getSignature()), peer));
break;
}
ValidationResult validationResult = transaction.importAsUnconfirmed();
// Is it valid?
ValidationResult validationResult = transaction.isValidUnconfirmed();
if (validationResult != ValidationResult.OK) {
LOGGER.trace(String.format("Ignoring invalid (%s) unconfirmed transaction %s from peer %s", validationResult.name(), Base58.encode(transactionData.getSignature()), peer));
break;
}
if (validationResult == ValidationResult.TRANSACTION_ALREADY_EXISTS) {
LOGGER.trace(String.format("Ignoring existing transaction %s from peer %s", Base58.encode(transactionData.getSignature()), peer));
break;
}
// Clean repository state before import
repository.discardChanges();
if (validationResult == ValidationResult.NO_BLOCKCHAIN_LOCK) {
LOGGER.trace(String.format("Couldn't lock blockchain to import unconfirmed transaction %s from peer %s", Base58.encode(transactionData.getSignature()), peer));
break;
}
// Seems ok - add to unconfirmed pile
transaction.importAsUnconfirmed();
if (validationResult != ValidationResult.OK) {
LOGGER.trace(String.format("Ignoring invalid (%s) %s transaction %s from peer %s", validationResult.name(), transactionData.getType().name(), Base58.encode(transactionData.getSignature()), peer));
break;
}
LOGGER.debug(String.format("Imported %s transaction %s from peer %s", transactionData.getType().name(), Base58.encode(transactionData.getSignature()), peer));
LOGGER.debug(String.format("Imported %s transaction %s from peer %s", transactionData.getType().name(), Base58.encode(transactionData.getSignature()), peer));
// We could collate signatures that are new to us and broadcast them to our peers too
newSignatures.add(signature);
} finally {
blockchainLock.unlock();
}
// We could collate signatures that are new to us and broadcast them to our peers too
newSignatures.add(signature);
}
} catch (DataException e) {
LOGGER.error(String.format("Repository issue while processing unconfirmed transactions from peer %s", peer), e);
@ -809,26 +813,42 @@ public class Controller extends Thread {
};
}
/** Returns whether we think our node has up-to-date blockchain based on our height info about other peers. */
/** Returns whether we think our node has up-to-date blockchain based on our info about other peers. */
public boolean isUpToDate() {
// Is our blockchain too old?
final long minLatestBlockTimestamp = getMinimumLatestBlockTimestamp();
BlockData latestBlockData = getChainTip();
if (latestBlockData.getTimestamp() < minLatestBlockTimestamp)
return false;
List<Peer> peers = Network.getInstance().getUniqueHandshakedPeers();
// Check we have enough peers to potentially synchronize/generator
if (peers.size() < Settings.getInstance().getMinBlockchainPeers())
return false;
// Remove peers with unknown height, lower height or same height and same block signature (unless we don't have their block signature)
peers.removeIf(hasShorterBlockchain());
// Remove peers that within 1 block of our height (actually ourHeight + 1)
final int maxHeight = getChainHeight() + 1;
peers.removeIf(peer -> peer.getPeerData().getLastHeight() <= maxHeight );
// Remove peers that have "misbehaved" recently
// Disregard peers that have "misbehaved" recently
peers.removeIf(hasPeerMisbehaved);
// Disregard peers with unknown height, lower height or same height and same block signature (unless we don't have their block signature)
// peers.removeIf(hasShorterBlockchain());
// Disregard peers that within 1 block of our height (actually ourHeight + 1)
// final int maxHeight = getChainHeight() + 1;
// peers.removeIf(peer -> peer.getPeerData().getLastHeight() <= maxHeight );
// Disregard peers that don't have a recent block
peers.removeIf(peer -> peer.getPeerData().getLastBlockTimestamp() == null || peer.getPeerData().getLastBlockTimestamp() < minLatestBlockTimestamp);
// If we have any peers left, then they would be candidates for synchronization therefore we're not up to date.
return peers.isEmpty();
// return peers.isEmpty();
// If we don't have any peers left then can't synchronize, therefore consider ourself not up to date
return !peers.isEmpty();
}
public long getMinimumLatestBlockTimestamp() {
return NTP.getTime() - BlockChain.getInstance().getMaxBlockTime() * 1000L * MAX_BLOCKCHAIN_TIP_AGE;
}
}

View File

@ -1,6 +1,5 @@
package org.qora.controller;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@ -72,222 +71,223 @@ public class Synchronizer {
* @param peer
* @return false if something went wrong, true otherwise.
*/
public SynchronizationResult synchronize(Peer peer) {
public SynchronizationResult synchronize(Peer peer, boolean force) {
// Make sure we're the only thread modifying the blockchain
// If we're already synchronizing with another peer then this will also return fast
ReentrantLock blockchainLock = Controller.getInstance().getBlockchainLock();
if (blockchainLock.tryLock())
try {
try (final Repository repository = RepositoryManager.getRepository()) {
try {
this.repository = repository;
final BlockData ourLatestBlockData = this.repository.getBlockRepository().getLastBlock();
final int ourInitialHeight = ourLatestBlockData.getHeight();
int ourHeight = ourInitialHeight;
int peerHeight = peer.getPeerData().getLastHeight();
if (!blockchainLock.tryLock())
// Wasn't peer's fault we couldn't sync
return SynchronizationResult.NO_BLOCKCHAIN_LOCK;
// If peer is at genesis block then peer has no blocks so ignore them for a while
if (peerHeight == 1)
return SynchronizationResult.GENESIS_ONLY;
try {
try (final Repository repository = RepositoryManager.getRepository()) {
try {
this.repository = repository;
final BlockData ourLatestBlockData = this.repository.getBlockRepository().getLastBlock();
final int ourInitialHeight = ourLatestBlockData.getHeight();
int ourHeight = ourInitialHeight;
int peerHeight = peer.getPeerData().getLastHeight();
// XXX this may well be obsolete now
// If peer is too far behind us then don't them.
int minHeight = ourHeight - MAXIMUM_HEIGHT_DELTA;
if (peerHeight < minHeight) {
LOGGER.info(String.format("Peer %s height %d is too far behind our height %d", peer, peerHeight, ourHeight));
return SynchronizationResult.TOO_FAR_BEHIND;
}
// If peer is at genesis block then peer has no blocks so ignore them for a while
if (peerHeight == 1)
return SynchronizationResult.GENESIS_ONLY;
byte[] peersLastBlockSignature = peer.getPeerData().getLastBlockSignature();
byte[] ourLastBlockSignature = ourLatestBlockData.getSignature();
if (peerHeight == ourHeight && (peersLastBlockSignature == null || !Arrays.equals(peersLastBlockSignature, ourLastBlockSignature)))
LOGGER.info(String.format("Synchronizing with peer %s at height %d, our height %d, signatures differ", peer, peerHeight, ourHeight));
// XXX this may well be obsolete now
// If peer is too far behind us then don't them.
int minHeight = ourHeight - MAXIMUM_HEIGHT_DELTA;
if (!force && peerHeight < minHeight) {
LOGGER.info(String.format("Peer %s height %d is too far behind our height %d", peer, peerHeight, ourHeight));
return SynchronizationResult.TOO_FAR_BEHIND;
}
byte[] peersLastBlockSignature = peer.getPeerData().getLastBlockSignature();
byte[] ourLastBlockSignature = ourLatestBlockData.getSignature();
if (peerHeight == ourHeight && (peersLastBlockSignature == null || !Arrays.equals(peersLastBlockSignature, ourLastBlockSignature)))
LOGGER.info(String.format("Synchronizing with peer %s at height %d, our height %d, signatures differ", peer, peerHeight, ourHeight));
else
LOGGER.info(String.format("Synchronizing with peer %s at height %d, our height %d", peer, peerHeight, ourHeight));
List<byte[]> signatures = findSignaturesFromCommonBlock(peer, ourHeight);
if (signatures == null) {
LOGGER.info(String.format("Error while trying to find common block with peer %s", peer));
return SynchronizationResult.NO_REPLY;
}
if (signatures.isEmpty()) {
LOGGER.info(String.format("Failure to find common block with peer %s", peer));
return SynchronizationResult.NO_COMMON_BLOCK;
}
// First signature is common block
BlockData commonBlockData = this.repository.getBlockRepository().fromSignature(signatures.get(0));
final int commonBlockHeight = commonBlockData.getHeight();
LOGGER.debug(String.format("Common block with peer %s is at height %d", peer, commonBlockHeight));
signatures.remove(0);
// If common block height is higher than peer's last reported height
// then peer must have a very recent sync. Update our idea of peer's height.
if (commonBlockHeight > peerHeight) {
LOGGER.debug(String.format("Peer height %d was lower than common block height %d - using higher value", peerHeight, commonBlockHeight));
peerHeight = commonBlockHeight;
}
// XXX This may well be obsolete now
// If common block is peer's latest block then we simply have the same, or longer, chain to peer, so exit now
if (commonBlockHeight == peerHeight) {
if (peerHeight == ourHeight)
LOGGER.info(String.format("We have the same blockchain as peer %s", peer));
else
LOGGER.info(String.format("Synchronizing with peer %s at height %d, our height %d", peer, peerHeight, ourHeight));
LOGGER.info(String.format("We have the same blockchain as peer %s, but longer", peer));
List<byte[]> signatures = findSignaturesFromCommonBlock(peer, ourHeight);
if (signatures == null) {
LOGGER.info(String.format("Error while trying to find common block with peer %s", peer));
return SynchronizationResult.NO_REPLY;
}
if (signatures.isEmpty()) {
LOGGER.info(String.format("Failure to find common block with peer %s", peer));
return SynchronizationResult.NO_COMMON_BLOCK;
}
return SynchronizationResult.NOTHING_TO_DO;
}
// First signature is common block
BlockData commonBlockData = this.repository.getBlockRepository().fromSignature(signatures.get(0));
final int commonBlockHeight = commonBlockData.getHeight();
LOGGER.debug(String.format("Common block with peer %s is at height %d", peer, commonBlockHeight));
signatures.remove(0);
// If common block is too far behind us then we're on massively different forks so give up.
int minCommonHeight = ourHeight - MAXIMUM_COMMON_DELTA;
if (!force && commonBlockHeight < minCommonHeight) {
LOGGER.info(String.format("Blockchain too divergent with peer %s", peer));
return SynchronizationResult.TOO_DIVERGENT;
}
// If common block height is higher than peer's last reported height
// then peer must have a very recent sync. Update our idea of peer's height.
if (commonBlockHeight > peerHeight) {
LOGGER.debug(String.format("Peer height %d was lower than common block height %d - using higher value", peerHeight, commonBlockHeight));
peerHeight = commonBlockHeight;
}
// If we have blocks after common block then decide whether we want to sync (lowest block signature wins)
int highestMutualHeight = Math.min(peerHeight, ourHeight);
// XXX This may well be obsolete now
// If common block is peer's latest block then we simply have the same, or longer, chain to peer, so exit now
if (commonBlockHeight == peerHeight) {
if (peerHeight == ourHeight)
LOGGER.info(String.format("We have the same blockchain as peer %s", peer));
else
LOGGER.info(String.format("We have the same blockchain as peer %s, but longer", peer));
// XXX This might be obsolete now
// If our latest block is very old, we're very behind and should ditch our fork.
if (ourInitialHeight > commonBlockHeight && ourLatestBlockData.getTimestamp() < NTP.getTime() - MAXIMUM_TIP_AGE) {
LOGGER.info(String.format("Ditching our chain after height %d as our latest block is very old", commonBlockHeight));
highestMutualHeight = commonBlockHeight;
}
return SynchronizationResult.NOTHING_TO_DO;
}
for (int height = commonBlockHeight + 1; height <= highestMutualHeight; ++height) {
int sigIndex = height - commonBlockHeight - 1;
// If common block is too far behind us then we're on massively different forks so give up.
int minCommonHeight = ourHeight - MAXIMUM_COMMON_DELTA;
if (commonBlockHeight < minCommonHeight) {
LOGGER.info(String.format("Blockchain too divergent with peer %s", peer));
return SynchronizationResult.TOO_DIVERGENT;
}
// If we have blocks after common block then decide whether we want to sync (lowest block signature wins)
int highestMutualHeight = Math.min(peerHeight, ourHeight);
// If our latest block is very old, we're very behind and should ditch our fork.
if (ourInitialHeight > commonBlockHeight && ourLatestBlockData.getTimestamp() < NTP.getTime() - MAXIMUM_TIP_AGE) {
LOGGER.info(String.format("Ditching our chain after height %d as our latest block is very old", commonBlockHeight));
highestMutualHeight = commonBlockHeight;
}
for (int height = commonBlockHeight + 1; height <= highestMutualHeight; ++height) {
int sigIndex = height - commonBlockHeight - 1;
// Do we need more signatures?
if (signatures.size() - 1 < sigIndex) {
// Grab more signatures
byte[] previousSignature = sigIndex == 0 ? commonBlockData.getSignature() : signatures.get(sigIndex - 1);
List<byte[]> moreSignatures = this.getBlockSignatures(peer, previousSignature, MAXIMUM_BLOCK_STEP);
if (moreSignatures == null || moreSignatures.isEmpty()) {
LOGGER.info(String.format("Peer %s failed to respond with more block signatures after height %d", peer, height - 1));
return SynchronizationResult.NO_REPLY;
}
signatures.addAll(moreSignatures);
}
byte[] ourSignature = this.repository.getBlockRepository().fromHeight(height).getSignature();
byte[] peerSignature = signatures.get(sigIndex);
for (int i = 0; i < ourSignature.length; ++i) {
/*
* If our byte is lower, we don't synchronize with this peer,
* if their byte is lower, check next height,
* (if bytes are equal, try next byte).
*/
if (ourSignature[i] < peerSignature[i]) {
LOGGER.info(String.format("Not synchronizing with peer %s as we have better block at height %d", peer, height));
return SynchronizationResult.INFERIOR_CHAIN;
}
if (peerSignature[i] < ourSignature[i])
break;
}
}
if (ourHeight > commonBlockHeight) {
// Unwind to common block (unless common block is our latest block)
LOGGER.debug(String.format("Orphaning blocks back to height %d", commonBlockHeight));
while (ourHeight > commonBlockHeight) {
BlockData blockData = repository.getBlockRepository().fromHeight(ourHeight);
Block block = new Block(repository, blockData);
block.orphan();
--ourHeight;
}
LOGGER.debug(String.format("Orphaned blocks back to height %d - fetching blocks from peer", commonBlockHeight, peer));
} else {
LOGGER.debug(String.format("Fetching new blocks from peer %s", peer));
}
// Fetch, and apply, blocks from peer
byte[] signature = commonBlockData.getSignature();
int maxBatchHeight = commonBlockHeight + SYNC_BATCH_SIZE;
while (ourHeight < peerHeight && ourHeight < maxBatchHeight) {
// Do we need more signatures?
if (signatures.isEmpty()) {
int numberRequested = maxBatchHeight - ourHeight;
LOGGER.trace(String.format("Requesting %d signature%s after height %d", numberRequested, (numberRequested != 1 ? "s": ""), ourHeight));
signatures = this.getBlockSignatures(peer, signature, numberRequested);
if (signatures == null || signatures.isEmpty()) {
LOGGER.info(String.format("Peer %s failed to respond with more block signatures after height %d", peer, ourHeight));
return SynchronizationResult.NO_REPLY;
}
LOGGER.trace(String.format("Received %s signature%s", signatures.size(), (signatures.size() != 1 ? "s" : "")));
}
signature = signatures.get(0);
signatures.remove(0);
++ourHeight;
Block newBlock = this.fetchBlock(repository, peer, signature);
if (newBlock == null) {
LOGGER.info(String.format("Peer %s failed to respond with block for height %d", peer, ourHeight));
// Do we need more signatures?
if (signatures.size() - 1 < sigIndex) {
// Grab more signatures
byte[] previousSignature = sigIndex == 0 ? commonBlockData.getSignature() : signatures.get(sigIndex - 1);
List<byte[]> moreSignatures = this.getBlockSignatures(peer, previousSignature, MAXIMUM_BLOCK_STEP);
if (moreSignatures == null || moreSignatures.isEmpty()) {
LOGGER.info(String.format("Peer %s failed to respond with more block signatures after height %d", peer, height - 1));
return SynchronizationResult.NO_REPLY;
}
if (!newBlock.isSignatureValid()) {
LOGGER.info(String.format("Peer %s sent block with invalid signature for height %d", peer, ourHeight));
return SynchronizationResult.INVALID_DATA;
}
ValidationResult blockResult = newBlock.isValid();
if (blockResult != ValidationResult.OK) {
LOGGER.info(String.format("Peer %s sent invalid block for height %d: %s", peer, ourHeight, blockResult.name()));
return SynchronizationResult.INVALID_DATA;
}
// Save transactions attached to this block
for (Transaction transaction : newBlock.getTransactions()) {
TransactionData transactionData = transaction.getTransactionData();
// Fix up approval status
if (transaction.needsGroupApproval()) {
transactionData.setApprovalStatus(ApprovalStatus.PENDING);
} else {
transactionData.setApprovalStatus(ApprovalStatus.NOT_REQUIRED);
}
repository.getTransactionRepository().save(transactionData);
}
newBlock.process();
// If we've grown our blockchain then at least save progress so far
if (ourHeight > ourInitialHeight)
repository.saveChanges();
signatures.addAll(moreSignatures);
}
// Commit
repository.saveChanges();
LOGGER.info(String.format("Synchronized with peer %s to height %d", peer, ourHeight));
byte[] ourSignature = this.repository.getBlockRepository().fromHeight(height).getSignature();
byte[] peerSignature = signatures.get(sigIndex);
return SynchronizationResult.OK;
} finally {
repository.discardChanges(); // Free repository locks, if any, also in case anything went wrong
this.repository = null;
for (int i = 0; i < ourSignature.length; ++i) {
/*
* If our byte is lower, we don't synchronize with this peer,
* if their byte is lower, check next height,
* (if bytes are equal, try next byte).
*/
if (ourSignature[i] < peerSignature[i]) {
LOGGER.info(String.format("Not synchronizing with peer %s as we have better block at height %d", peer, height));
return SynchronizationResult.INFERIOR_CHAIN;
}
if (peerSignature[i] < ourSignature[i])
break;
}
}
}
} catch (DataException e) {
LOGGER.error("Repository issue during synchronization with peer", e);
return SynchronizationResult.REPOSITORY_ISSUE;
} finally {
blockchainLock.unlock();
}
// Wasn't peer's fault we couldn't sync
return SynchronizationResult.NO_BLOCKCHAIN_LOCK;
if (ourHeight > commonBlockHeight) {
// Unwind to common block (unless common block is our latest block)
LOGGER.debug(String.format("Orphaning blocks back to height %d", commonBlockHeight));
while (ourHeight > commonBlockHeight) {
BlockData blockData = repository.getBlockRepository().fromHeight(ourHeight);
Block block = new Block(repository, blockData);
block.orphan();
--ourHeight;
}
LOGGER.debug(String.format("Orphaned blocks back to height %d - fetching blocks from peer", commonBlockHeight, peer));
} else {
LOGGER.debug(String.format("Fetching new blocks from peer %s", peer));
}
// Fetch, and apply, blocks from peer
byte[] signature = commonBlockData.getSignature();
int maxBatchHeight = commonBlockHeight + SYNC_BATCH_SIZE;
while (ourHeight < peerHeight && ourHeight < maxBatchHeight) {
// Do we need more signatures?
if (signatures.isEmpty()) {
int numberRequested = maxBatchHeight - ourHeight;
LOGGER.trace(String.format("Requesting %d signature%s after height %d", numberRequested, (numberRequested != 1 ? "s": ""), ourHeight));
signatures = this.getBlockSignatures(peer, signature, numberRequested);
if (signatures == null || signatures.isEmpty()) {
LOGGER.info(String.format("Peer %s failed to respond with more block signatures after height %d", peer, ourHeight));
return SynchronizationResult.NO_REPLY;
}
LOGGER.trace(String.format("Received %s signature%s", signatures.size(), (signatures.size() != 1 ? "s" : "")));
}
signature = signatures.get(0);
signatures.remove(0);
++ourHeight;
Block newBlock = this.fetchBlock(repository, peer, signature);
if (newBlock == null) {
LOGGER.info(String.format("Peer %s failed to respond with block for height %d", peer, ourHeight));
return SynchronizationResult.NO_REPLY;
}
if (!newBlock.isSignatureValid()) {
LOGGER.info(String.format("Peer %s sent block with invalid signature for height %d", peer, ourHeight));
return SynchronizationResult.INVALID_DATA;
}
ValidationResult blockResult = newBlock.isValid();
if (blockResult != ValidationResult.OK) {
LOGGER.info(String.format("Peer %s sent invalid block for height %d: %s", peer, ourHeight, blockResult.name()));
return SynchronizationResult.INVALID_DATA;
}
// Save transactions attached to this block
for (Transaction transaction : newBlock.getTransactions()) {
TransactionData transactionData = transaction.getTransactionData();
// Fix up approval status
if (transaction.needsGroupApproval()) {
transactionData.setApprovalStatus(ApprovalStatus.PENDING);
} else {
transactionData.setApprovalStatus(ApprovalStatus.NOT_REQUIRED);
}
repository.getTransactionRepository().save(transactionData);
}
newBlock.process();
// If we've grown our blockchain then at least save progress so far
if (ourHeight > ourInitialHeight)
repository.saveChanges();
}
// Commit
repository.saveChanges();
LOGGER.info(String.format("Synchronized with peer %s to height %d", peer, ourHeight));
return SynchronizationResult.OK;
} finally {
repository.discardChanges(); // Free repository locks, if any, also in case anything went wrong
this.repository = null;
}
}
} catch (DataException e) {
LOGGER.error("Repository issue during synchronization with peer", e);
return SynchronizationResult.REPOSITORY_ISSUE;
} finally {
blockchainLock.unlock();
}
}
/**

View File

@ -65,7 +65,15 @@ public class Network extends Thread {
/** Maximum time since last successful connection before a peer is potentially considered "old", in milliseconds. */
private static final long OLD_PEER_CONNECTION_PERIOD = 7 * 24 * 60 * 60 * 1000; // ms
private static final String[] INITIAL_PEERS = new String[] { "node1.qora.org", "node2.qora.org" };
private static final String[] INITIAL_PEERS = new String[] {
"node1.qora.org",
"node2.qora.org",
"node3.qora.org",
"node4.qora.org",
"node5.qora.org",
"node6.qora.org",
"node7.qora.org"
};
public static final int MAX_SIGNATURES_PER_REPLY = 500;
public static final int MAX_BLOCK_SUMMARIES_PER_REPLY = 500;
@ -479,12 +487,15 @@ public class Network extends Thread {
List<PeerAddress> peerV2Addresses = peersV2Message.getPeerAddresses();
// First entry contains remote peer's listen port but empty address.
// Overwrite address with one obtained from socket.
int peerPort = peerV2Addresses.get(0).getPort();
peerV2Addresses.remove(0);
PeerAddress sendingPeerAddress = PeerAddress.fromString(peer.getPeerData().getAddress().getHost() + ":" + peerPort);
LOGGER.trace("PEERS_V2 sending peer's listen address: " + sendingPeerAddress.toString());
peerV2Addresses.add(0, sendingPeerAddress);
// If inbound peer, use listen port and socket address to recreate first entry
if (!peer.isOutbound()) {
PeerAddress sendingPeerAddress = PeerAddress.fromString(peer.getPeerData().getAddress().getHost() + ":" + peerPort);
LOGGER.trace("PEERS_V2 sending peer's listen address: " + sendingPeerAddress.toString());
peerV2Addresses.add(0, sendingPeerAddress);
}
mergePeers(peerV2Addresses);
break;

View File

@ -275,8 +275,10 @@ public class Peer implements Runnable {
while (true) {
// Wait (up to INACTIVITY_TIMEOUT) for, and parse, incoming message
Message message = Message.fromStream(in);
if (message == null)
if (message == null) {
this.disconnect("null message");
return;
}
LOGGER.trace(String.format("Received %s message with ID %d from peer %s", message.getType().name(), message.getId(), this));

View File

@ -16,6 +16,7 @@ import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.SocketTimeoutException;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.util.Arrays;
@ -101,14 +102,17 @@ public abstract class Message {
return map.get(value);
}
public Message fromBytes(int id, byte[] data) {
public Message fromBytes(int id, byte[] data) throws MessageException {
if (this.fromByteBuffer == null)
return null;
throw new MessageException("Unsupported message type [" + value + "] during conversion from bytes");
try {
return (Message) this.fromByteBuffer.invoke(null, id, data == null ? null : ByteBuffer.wrap(data));
} catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
return null;
if (e.getCause() instanceof BufferUnderflowException)
throw new MessageException("Byte data too short for " + name() + " message");
throw new MessageException("Internal error with " + name() + " message during conversion from bytes");
}
}
}

View File

@ -233,6 +233,8 @@ public abstract class Transaction {
GROUP_APPROVAL_NOT_REQUIRED(82),
GROUP_APPROVAL_DECIDED(83),
MAXIMUM_PROXY_RELATIONSHIPS(84),
TRANSACTION_ALREADY_EXISTS(85),
NO_BLOCKCHAIN_LOCK(86),
NOT_YET_RELEASED(1000);
public final int value;
@ -827,17 +829,36 @@ public abstract class Transaction {
*
* @throws DataException
*/
public void importAsUnconfirmed() throws DataException {
// Fix up approval status
if (this.needsGroupApproval()) {
transactionData.setApprovalStatus(ApprovalStatus.PENDING);
} else {
transactionData.setApprovalStatus(ApprovalStatus.NOT_REQUIRED);
}
public ValidationResult importAsUnconfirmed() throws DataException {
// Attempt to acquire blockchain lock
ReentrantLock blockchainLock = Controller.getInstance().getBlockchainLock();
if (!blockchainLock.tryLock())
return ValidationResult.NO_BLOCKCHAIN_LOCK;
repository.getTransactionRepository().save(transactionData);
repository.getTransactionRepository().unconfirmTransaction(transactionData);
repository.saveChanges();
try {
// Check transaction doesn't already exist
if (repository.getTransactionRepository().exists(transactionData.getSignature()))
return ValidationResult.TRANSACTION_ALREADY_EXISTS;
ValidationResult validationResult = this.isValidUnconfirmed();
if (validationResult != ValidationResult.OK)
return validationResult;
// Fix up approval status
if (this.needsGroupApproval()) {
transactionData.setApprovalStatus(ApprovalStatus.PENDING);
} else {
transactionData.setApprovalStatus(ApprovalStatus.NOT_REQUIRED);
}
repository.getTransactionRepository().save(transactionData);
repository.getTransactionRepository().unconfirmTransaction(transactionData);
repository.saveChanges();
return ValidationResult.OK;
} finally {
blockchainLock.unlock();
}
}
/**

View File

@ -31,10 +31,8 @@ public class TransactionUtils {
} catch (InterruptedException e) {
}
ValidationResult result = transaction.isValidUnconfirmed();
ValidationResult result = transaction.importAsUnconfirmed();
assertEquals("Transaction invalid", ValidationResult.OK, result);
transaction.importAsUnconfirmed();
}
/** Signs transaction using given account and forges a new block, using "alice" account. */