Synchronization improvements

Don't attempt to sync, or generate blocks, if we think we're not up to date.

Notify Controller of newly generated block AFTER releasing blockchain lock.

Loads of changes to synchronizer.

Added missing GET_PEERS handling to Controller.onNetworkMessage.

More detailed peer information (last block sig, last generator sig, last block timestamp, ...)
New HEIGHT_V2 network message to help support above.

More, and improved, logging.

Fix for HSQLDB serialization failure caused by trying to save the same new transaction
received by more than one peer/thread simultaneously.
This commit is contained in:
catbref 2019-05-28 17:33:50 +01:00
parent 23bcba1650
commit 99024ee2ef
11 changed files with 444 additions and 138 deletions

View File

@ -70,12 +70,20 @@ public class BlockGenerator extends Thread {
// Sleep for a while // Sleep for a while
try { try {
repository.discardChanges(); // Free repository locks, if any repository.discardChanges(); // Free repository locks, if any
Thread.sleep(1000); // No point sleeping less than this as block timestamp millisecond values must be the same Thread.sleep(1000);
} catch (InterruptedException e) { } catch (InterruptedException e) {
// We've been interrupted - time to exit // We've been interrupted - time to exit
return; return;
} }
// Don't generate if we don't have enough connected peers as where would the transactions/consensus come from?
if (Network.getInstance().getUniqueHandshakedPeers().size() < Settings.getInstance().getMinBlockchainPeers())
continue;
// Don't generate if it looks like we're behind
if (!Controller.getInstance().isUpToDate())
continue;
// Check blockchain hasn't changed // Check blockchain hasn't changed
BlockData lastBlockData = blockRepository.getLastBlock(); BlockData lastBlockData = blockRepository.getLastBlock();
if (previousBlock == null || !Arrays.equals(previousBlock.getSignature(), lastBlockData.getSignature())) { if (previousBlock == null || !Arrays.equals(previousBlock.getSignature(), lastBlockData.getSignature())) {
@ -83,10 +91,6 @@ public class BlockGenerator extends Thread {
newBlocks.clear(); newBlocks.clear();
} }
// Don't generate if we don't have enough connected peers as where would the transactions/consensus come from?
if (Network.getInstance().getUniqueHandshakedPeers().size() < Settings.getInstance().getMinBlockchainPeers())
continue;
// Do we need to build any potential new blocks? // Do we need to build any potential new blocks?
List<ForgingAccountData> forgingAccountsData = repository.getAccountRepository().getForgingAccounts(); List<ForgingAccountData> forgingAccountsData = repository.getAccountRepository().getForgingAccounts();
List<PrivateKeyAccount> forgingAccounts = forgingAccountsData.stream().map(accountData -> new PrivateKeyAccount(repository, accountData.getSeed())).collect(Collectors.toList()); List<PrivateKeyAccount> forgingAccounts = forgingAccountsData.stream().map(accountData -> new PrivateKeyAccount(repository, accountData.getSeed())).collect(Collectors.toList());
@ -108,7 +112,9 @@ public class BlockGenerator extends Thread {
// Make sure we're the only thread modifying the blockchain // Make sure we're the only thread modifying the blockchain
ReentrantLock blockchainLock = Controller.getInstance().getBlockchainLock(); ReentrantLock blockchainLock = Controller.getInstance().getBlockchainLock();
if (blockchainLock.tryLock()) if (blockchainLock.tryLock()) {
boolean newBlockGenerated = false;
generation: try { generation: try {
// Clear repository's "in transaction" state so we don't cause a repository deadlock // Clear repository's "in transaction" state so we don't cause a repository deadlock
repository.discardChanges(); repository.discardChanges();
@ -160,7 +166,7 @@ public class BlockGenerator extends Thread {
repository.saveChanges(); repository.saveChanges();
// Notify controller // Notify controller
Controller.getInstance().onGeneratedBlock(newBlock.getBlockData()); newBlockGenerated = true;
} catch (DataException e) { } catch (DataException e) {
// Unable to process block - report and discard // Unable to process block - report and discard
LOGGER.error("Unable to process newly generated block?", e); LOGGER.error("Unable to process newly generated block?", e);
@ -169,6 +175,10 @@ public class BlockGenerator extends Thread {
} finally { } finally {
blockchainLock.unlock(); blockchainLock.unlock();
} }
if (newBlockGenerated)
Controller.getInstance().onGeneratedBlock();
}
} }
} catch (DataException e) { } catch (DataException e) {
LOGGER.warn("Repository issue while running block generator", e); LOGGER.warn("Repository issue while running block generator", e);

View File

@ -8,9 +8,11 @@ import java.time.LocalDateTime;
import java.time.ZoneOffset; import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter; import java.time.format.DateTimeFormatter;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.Properties; import java.util.Properties;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Predicate;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
@ -36,6 +38,7 @@ import org.qora.network.message.GetPeersMessage;
import org.qora.network.message.GetSignaturesMessage; import org.qora.network.message.GetSignaturesMessage;
import org.qora.network.message.GetSignaturesV2Message; import org.qora.network.message.GetSignaturesV2Message;
import org.qora.network.message.HeightMessage; import org.qora.network.message.HeightMessage;
import org.qora.network.message.HeightV2Message;
import org.qora.network.message.Message; import org.qora.network.message.Message;
import org.qora.network.message.SignaturesMessage; import org.qora.network.message.SignaturesMessage;
import org.qora.network.message.TransactionMessage; import org.qora.network.message.TransactionMessage;
@ -136,6 +139,16 @@ public class Controller extends Thread {
} }
} }
/** Returns highest block, or null if there's a repository issue */
public BlockData getChainTip() {
try (final Repository repository = RepositoryManager.getRepository()) {
return repository.getBlockRepository().getLastBlock();
} catch (DataException e) {
LOGGER.error("Repository issue when fetching blockchain tip", e);
return null;
}
}
public ReentrantLock getBlockchainLock() { public ReentrantLock getBlockchainLock() {
return this.blockchainLock; return this.blockchainLock;
} }
@ -222,7 +235,7 @@ public class Controller extends Thread {
try { try {
while (!isStopping) { while (!isStopping) {
Thread.sleep(60 * 1000); Thread.sleep(14 * 1000);
potentiallySynchronize(); potentiallySynchronize();
@ -235,31 +248,24 @@ public class Controller extends Thread {
} }
private void potentiallySynchronize() { private void potentiallySynchronize() {
int ourHeight = getChainHeight();
if (ourHeight == 0)
return;
// If we have enough peers, potentially synchronize
List<Peer> peers = Network.getInstance().getUniqueHandshakedPeers(); List<Peer> peers = Network.getInstance().getUniqueHandshakedPeers();
// Check we have enough peers to potentially synchronize
if (peers.size() < Settings.getInstance().getMinBlockchainPeers()) if (peers.size() < Settings.getInstance().getMinBlockchainPeers())
return; return;
for(Peer peer : peers) for(Peer peer : peers)
LOGGER.trace(String.format("Peer %s is at height %d", peer, peer.getPeerData().getLastHeight())); LOGGER.trace(String.format("Peer %s is at height %d", peer, peer.getPeerData().getLastHeight()));
// Remove peers with unknown, or same, height // Remove peers with unknown height, lower height or same height and same block signature (unless we don't have their block signature)
peers.removeIf(peer -> { peers.removeIf(hasShorterBlockchain());
Integer peerHeight = peer.getPeerData().getLastHeight();
return peerHeight == null;
});
// Remove peers that have "misbehaved" recently // Remove peers that have "misbehaved" recently
peers.removeIf(peer -> { peers.removeIf(hasPeerMisbehaved);
Long lastMisbehaved = peer.getPeerData().getLastMisbehaved();
return lastMisbehaved != null && lastMisbehaved > NTP.getTime() - MISBEHAVIOUR_COOLOFF;
});
if (!peers.isEmpty()) { if (!peers.isEmpty()) {
int ourHeight = getChainHeight();
// Pick random peer to sync with // Pick random peer to sync with
int index = new SecureRandom().nextInt(peers.size()); int index = new SecureRandom().nextInt(peers.size());
Peer peer = peers.get(index); Peer peer = peers.get(index);
@ -294,14 +300,15 @@ public class Controller extends Thread {
break; break;
case OK: case OK:
LOGGER.debug(String.format("Synchronized with peer %s", peer)); case NOTHING_TO_DO:
LOGGER.debug(String.format("Synchronized with peer %s (%s)", peer, syncResult.name()));
break; break;
} }
// Broadcast our new height (if changed) // Broadcast our new height (if changed)
int updatedHeight = getChainHeight(); BlockData latestBlockData = getChainTip();
if (updatedHeight != ourHeight) if (latestBlockData.getHeight() != ourHeight)
Network.getInstance().broadcast(recipientPeer -> new HeightMessage(updatedHeight)); Network.getInstance().broadcast(recipientPeer -> Network.getInstance().buildHeightMessage(recipientPeer, latestBlockData));
} }
} }
@ -365,18 +372,24 @@ public class Controller extends Thread {
network.broadcast(peer -> network.buildPeersMessage(peer)); network.broadcast(peer -> network.buildPeersMessage(peer));
// Send our current height // Send our current height
network.broadcast(peer -> new HeightMessage(this.getChainHeight())); BlockData latestBlockData = getChainTip();
network.broadcast(peer -> network.buildHeightMessage(peer, latestBlockData));
// Request peers lists // Request peers lists
network.broadcast(peer -> new GetPeersMessage()); network.broadcast(peer -> new GetPeersMessage());
} }
public void onGeneratedBlock(BlockData newBlockData) { public void onGeneratedBlock() {
// XXX we should really be broadcasting the new block sig, not height // Broadcast our new height info
// Could even broadcast top two block sigs so that remote peers can see new block references current network-wide last block BlockData latestBlockData = getChainTip();
// Broadcast our new height Network network = Network.getInstance();
Network.getInstance().broadcast(peer -> new HeightMessage(newBlockData.getHeight())); network.broadcast(peer -> network.buildHeightMessage(peer, latestBlockData));
}
public void onNewTransaction(TransactionData transactionData) {
// Send round to all peers
Network.getInstance().broadcast(peer -> new TransactionMessage(transactionData));
} }
public void onNetworkMessage(Peer peer, Message message) { public void onNetworkMessage(Peer peer, Message message) {
@ -387,15 +400,44 @@ public class Controller extends Thread {
HeightMessage heightMessage = (HeightMessage) message; HeightMessage heightMessage = (HeightMessage) message;
// Update our record of peer's height // Update our record of peer's height
peer.getPeerData().setLastHeight(heightMessage.getHeight()); try (final Repository repository = RepositoryManager.getRepository()) {
PeerData peerData = peer.getPeerData();
peer.getPeerData().setLastHeight(heightMessage.getHeight());
repository.getNetworkRepository().save(peerData);
repository.saveChanges();
} catch (DataException e) {
LOGGER.error(String.format("Repository issue while updating height of peer %s", peer), e);
}
break; break;
case GET_SIGNATURES: case HEIGHT_V2:
try (final Repository repository = RepositoryManager.getRepository()) { HeightV2Message heightV2Message = (HeightV2Message) message;
GetSignaturesMessage getSignaturesMessage = (GetSignaturesMessage) message;
byte[] parentSignature = getSignaturesMessage.getParentSignature();
// Update our record for peer's blockchain info
try (final Repository repository = RepositoryManager.getRepository()) {
PeerData peerData = peer.getPeerData();
peerData.setLastHeight(heightV2Message.getHeight());
peerData.setLastBlockSignature(heightV2Message.getSignature());
peerData.setLastBlockTimestamp(heightV2Message.getTimestamp());
peerData.setLastBlockGenerator(heightV2Message.getGenerator());
repository.getNetworkRepository().save(peerData);
repository.saveChanges();
} catch (DataException e) {
LOGGER.error(String.format("Repository issue while updating info of peer %s", peer), e);
}
break;
case GET_SIGNATURES: {
GetSignaturesMessage getSignaturesMessage = (GetSignaturesMessage) message;
byte[] parentSignature = getSignaturesMessage.getParentSignature();
try (final Repository repository = RepositoryManager.getRepository()) {
List<byte[]> signatures = new ArrayList<>(); List<byte[]> signatures = new ArrayList<>();
do { do {
@ -411,17 +453,18 @@ public class Controller extends Thread {
Message signaturesMessage = new SignaturesMessage(signatures); Message signaturesMessage = new SignaturesMessage(signatures);
signaturesMessage.setId(message.getId()); signaturesMessage.setId(message.getId());
if (!peer.sendMessage(signaturesMessage)) if (!peer.sendMessage(signaturesMessage))
peer.disconnect(); peer.disconnect("failed to send signatures");
} catch (DataException e) { } catch (DataException e) {
LOGGER.error(String.format("Repository issue while responding to %s from peer %s", message.getType().name(), peer), e); LOGGER.error(String.format("Repository issue while sending signatures after %s to peer %s", Base58.encode(parentSignature), peer), e);
} }
break; break;
}
case GET_SIGNATURES_V2: {
GetSignaturesV2Message getSignaturesMessage = (GetSignaturesV2Message) message;
byte[] parentSignature = getSignaturesMessage.getParentSignature();
case GET_SIGNATURES_V2:
try (final Repository repository = RepositoryManager.getRepository()) { try (final Repository repository = RepositoryManager.getRepository()) {
GetSignaturesV2Message getSignaturesMessage = (GetSignaturesV2Message) message;
byte[] parentSignature = getSignaturesMessage.getParentSignature();
List<byte[]> signatures = new ArrayList<>(); List<byte[]> signatures = new ArrayList<>();
do { do {
@ -437,17 +480,18 @@ public class Controller extends Thread {
Message signaturesMessage = new SignaturesMessage(signatures); Message signaturesMessage = new SignaturesMessage(signatures);
signaturesMessage.setId(message.getId()); signaturesMessage.setId(message.getId());
if (!peer.sendMessage(signaturesMessage)) if (!peer.sendMessage(signaturesMessage))
peer.disconnect(); peer.disconnect("failed to send signatures (v2)");
} catch (DataException e) { } catch (DataException e) {
LOGGER.error(String.format("Repository issue while responding to %s from peer %s", message.getType().name(), peer), e); LOGGER.error(String.format("Repository issue while sending V2 signatures after %s to peer %s", Base58.encode(parentSignature), peer), e);
} }
break; break;
}
case GET_BLOCK: case GET_BLOCK:
try (final Repository repository = RepositoryManager.getRepository()) { GetBlockMessage getBlockMessage = (GetBlockMessage) message;
GetBlockMessage getBlockMessage = (GetBlockMessage) message; byte[] signature = getBlockMessage.getSignature();
byte[] signature = getBlockMessage.getSignature();
try (final Repository repository = RepositoryManager.getRepository()) {
BlockData blockData = repository.getBlockRepository().fromSignature(signature); BlockData blockData = repository.getBlockRepository().fromSignature(signature);
if (blockData == null) { if (blockData == null) {
LOGGER.debug(String.format("Ignoring GET_BLOCK request from peer %s for unknown block %s", peer, Base58.encode(signature))); LOGGER.debug(String.format("Ignoring GET_BLOCK request from peer %s for unknown block %s", peer, Base58.encode(signature)));
@ -460,17 +504,17 @@ public class Controller extends Thread {
Message blockMessage = new BlockMessage(block); Message blockMessage = new BlockMessage(block);
blockMessage.setId(message.getId()); blockMessage.setId(message.getId());
if (!peer.sendMessage(blockMessage)) if (!peer.sendMessage(blockMessage))
peer.disconnect(); peer.disconnect("failed to send block");
} catch (DataException e) { } catch (DataException e) {
LOGGER.error(String.format("Repository issue while responding to %s from peer %s", message.getType().name(), peer), e); LOGGER.error(String.format("Repository issue while send block %s to peer %s", Base58.encode(signature), peer), e);
} }
break; break;
case TRANSACTION: case TRANSACTION:
try (final Repository repository = RepositoryManager.getRepository()) { TransactionMessage transactionMessage = (TransactionMessage) message;
TransactionMessage transactionMessage = (TransactionMessage) message; TransactionData transactionData = transactionMessage.getTransactionData();
TransactionData transactionData = transactionMessage.getTransactionData(); try (final Repository repository = RepositoryManager.getRepository()) {
Transaction transaction = Transaction.fromData(repository, transactionData); Transaction transaction = Transaction.fromData(repository, transactionData);
// Check signature // Check signature
@ -479,33 +523,40 @@ public class Controller extends Thread {
break; break;
} }
// Do we have it already? // Blockchain lock required to prevent multiple threads trying to save the same transaction simultaneously
if (repository.getTransactionRepository().exists(transactionData.getSignature())) { ReentrantLock blockchainLock = Controller.getInstance().getBlockchainLock();
LOGGER.trace(String.format("Ignoring existing TRANSACTION %s from peer %s", Base58.encode(transactionData.getSignature()), peer)); blockchainLock.lock();
break; try {
} // Do we have it already?
if (repository.getTransactionRepository().exists(transactionData.getSignature())) {
LOGGER.trace(String.format("Ignoring existing TRANSACTION %s from peer %s", Base58.encode(transactionData.getSignature()), peer));
break;
}
// Is it valid? // Is it valid?
ValidationResult validationResult = transaction.isValidUnconfirmed(); ValidationResult validationResult = transaction.isValidUnconfirmed();
if (validationResult != ValidationResult.OK) { if (validationResult != ValidationResult.OK) {
LOGGER.trace(String.format("Ignoring invalid (%s) TRANSACTION %s from peer %s", validationResult.name(), Base58.encode(transactionData.getSignature()), peer)); LOGGER.trace(String.format("Ignoring invalid (%s) TRANSACTION %s from peer %s", validationResult.name(), Base58.encode(transactionData.getSignature()), peer));
break; break;
} }
// Seems ok - add to unconfirmed pile // Seems ok - add to unconfirmed pile
repository.getTransactionRepository().save(transactionData); repository.getTransactionRepository().save(transactionData);
repository.getTransactionRepository().unconfirmTransaction(transactionData); repository.getTransactionRepository().unconfirmTransaction(transactionData);
repository.saveChanges(); repository.saveChanges();
} finally {
blockchainLock.unlock();
}
} catch (DataException e) { } catch (DataException e) {
LOGGER.error(String.format("Repository issue while responding to %s from peer %s", message.getType().name(), peer), e); LOGGER.error(String.format("Repository issue while processing transaction %s from peer %s", Base58.encode(transactionData.getSignature()), peer), e);
} }
break; break;
case GET_BLOCK_SUMMARIES: case GET_BLOCK_SUMMARIES:
try (final Repository repository = RepositoryManager.getRepository()) { GetBlockSummariesMessage getBlockSummariesMessage = (GetBlockSummariesMessage) message;
GetBlockSummariesMessage getBlockSummariesMessage = (GetBlockSummariesMessage) message; byte[] parentSignature = getBlockSummariesMessage.getParentSignature();
byte[] parentSignature = getBlockSummariesMessage.getParentSignature();
try (final Repository repository = RepositoryManager.getRepository()) {
List<BlockSummaryData> blockSummaries = new ArrayList<>(); List<BlockSummaryData> blockSummaries = new ArrayList<>();
int numberRequested = Math.min(Network.MAX_BLOCK_SUMMARIES_PER_REPLY, getBlockSummariesMessage.getNumberRequested()); int numberRequested = Math.min(Network.MAX_BLOCK_SUMMARIES_PER_REPLY, getBlockSummariesMessage.getNumberRequested());
@ -524,20 +575,67 @@ public class Controller extends Thread {
Message blockSummariesMessage = new BlockSummariesMessage(blockSummaries); Message blockSummariesMessage = new BlockSummariesMessage(blockSummaries);
blockSummariesMessage.setId(message.getId()); blockSummariesMessage.setId(message.getId());
if (!peer.sendMessage(blockSummariesMessage)) if (!peer.sendMessage(blockSummariesMessage))
peer.disconnect(); peer.disconnect("failed to send block summaries");
} catch (DataException e) { } catch (DataException e) {
LOGGER.error(String.format("Repository issue while responding to %s from peer %s", message.getType().name(), peer), e); LOGGER.error(String.format("Repository issue while sending block summaries after %s to peer %s", Base58.encode(parentSignature), peer), e);
} }
break; break;
case GET_PEERS:
// Send our known peers
if (!peer.sendMessage(Network.getInstance().buildPeersMessage(peer)))
peer.disconnect("failed to send peers list");
break;
default: default:
LOGGER.debug(String.format("Unhandled %s message [ID %d] from peer %s", message.getType().name(), message.getId(), peer));
break; break;
} }
} }
public void onNewTransaction(TransactionData transactionData) { // Utilities
// Send round to all peers
Network.getInstance().broadcast(peer -> new TransactionMessage(transactionData)); public static final Predicate<Peer> hasPeerMisbehaved = peer -> {
Long lastMisbehaved = peer.getPeerData().getLastMisbehaved();
return lastMisbehaved != null && lastMisbehaved > NTP.getTime() - MISBEHAVIOUR_COOLOFF;
};
/** True if peer has unknown height, lower height or same height and same block signature (unless we don't have their block signature). */
public static Predicate<Peer> hasShorterBlockchain() {
BlockData highestBlockData = getInstance().getChainTip();
int ourHeight = highestBlockData.getHeight();
return peer -> {
PeerData peerData = peer.getPeerData();
Integer peerHeight = peerData.getLastHeight();
if (peerHeight == null || peerHeight < ourHeight)
return true;
if (peerHeight > ourHeight || peerData.getLastBlockSignature() == null)
return false;
// Remove if signatures match
return Arrays.equals(peerData.getLastBlockSignature(), highestBlockData.getSignature());
};
}
/** Returns whether we think our node has up-to-date blockchain based on our height info about other peers. */
public boolean isUpToDate() {
List<Peer> peers = Network.getInstance().getUniqueHandshakedPeers();
// Check we have enough peers to potentially synchronize/generator
if (peers.size() < Settings.getInstance().getMinBlockchainPeers())
return false;
// Remove peers with unknown height, lower height or same height and same block signature (unless we don't have their block signature)
peers.removeIf(hasShorterBlockchain());
// Remove peers that have "misbehaved" recently
peers.removeIf(hasPeerMisbehaved);
// If we have any peers left, then they would be candidates for synchronization therefore we're not up to date.
return peers.isEmpty();
} }
} }

View File

@ -9,6 +9,7 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.qora.block.Block; import org.qora.block.Block;
import org.qora.block.Block.ValidationResult; import org.qora.block.Block.ValidationResult;
import org.qora.block.BlockChain;
import org.qora.block.GenesisBlock; import org.qora.block.GenesisBlock;
import org.qora.data.block.BlockData; import org.qora.data.block.BlockData;
import org.qora.data.network.BlockSummaryData; import org.qora.data.network.BlockSummaryData;
@ -26,6 +27,7 @@ import org.qora.repository.DataException;
import org.qora.repository.Repository; import org.qora.repository.Repository;
import org.qora.repository.RepositoryManager; import org.qora.repository.RepositoryManager;
import org.qora.transaction.Transaction; import org.qora.transaction.Transaction;
import org.qora.utils.NTP;
public class Synchronizer { public class Synchronizer {
@ -35,6 +37,8 @@ public class Synchronizer {
private static final int MAXIMUM_BLOCK_STEP = 500; private static final int MAXIMUM_BLOCK_STEP = 500;
private static final int MAXIMUM_HEIGHT_DELTA = 300; // XXX move to blockchain config? private static final int MAXIMUM_HEIGHT_DELTA = 300; // XXX move to blockchain config?
private static final int MAXIMUM_COMMON_DELTA = 60; // XXX move to blockchain config? private static final int MAXIMUM_COMMON_DELTA = 60; // XXX move to blockchain config?
/** Maximum age for our latest block before we consider ditching our fork. */
private static final long MAXIMUM_TIP_AGE = BlockChain.getInstance().getMaxBlockTime() * 1000L * 10; // XXX move to blockchain config?
private static final int SYNC_BATCH_SIZE = 200; private static final int SYNC_BATCH_SIZE = 200;
private static Synchronizer instance; private static Synchronizer instance;
@ -42,7 +46,7 @@ public class Synchronizer {
private Repository repository; private Repository repository;
public enum SynchronizationResult { public enum SynchronizationResult {
OK, GENESIS_ONLY, NO_COMMON_BLOCK, TOO_FAR_BEHIND, TOO_DIVERGENT, NO_REPLY, INFERIOR_CHAIN, INVALID_DATA, NO_BLOCKCHAIN_LOCK, REPOSITORY_ISSUE; OK, NOTHING_TO_DO, GENESIS_ONLY, NO_COMMON_BLOCK, TOO_FAR_BEHIND, TOO_DIVERGENT, NO_REPLY, INFERIOR_CHAIN, INVALID_DATA, NO_BLOCKCHAIN_LOCK, REPOSITORY_ISSUE;
} }
// Constructors // Constructors
@ -75,14 +79,16 @@ public class Synchronizer {
try (final Repository repository = RepositoryManager.getRepository()) { try (final Repository repository = RepositoryManager.getRepository()) {
try { try {
this.repository = repository; this.repository = repository;
final int ourInitialHeight = this.repository.getBlockRepository().getBlockchainHeight(); final BlockData ourLatestBlockData = this.repository.getBlockRepository().getLastBlock();
final int ourInitialHeight = ourLatestBlockData.getHeight();
int ourHeight = ourInitialHeight; int ourHeight = ourInitialHeight;
final int peerHeight = peer.getPeerData().getLastHeight(); int peerHeight = peer.getPeerData().getLastHeight();
// If peer is at genesis block then peer has no blocks so ignore them for a while // If peer is at genesis block then peer has no blocks so ignore them for a while
if (peerHeight == 1) if (peerHeight == 1)
return SynchronizationResult.GENESIS_ONLY; return SynchronizationResult.GENESIS_ONLY;
// XXX this may well be obsolete now
// If peer is too far behind us then don't them. // If peer is too far behind us then don't them.
int minHeight = ourHeight - MAXIMUM_HEIGHT_DELTA; int minHeight = ourHeight - MAXIMUM_HEIGHT_DELTA;
if (peerHeight < minHeight) { if (peerHeight < minHeight) {
@ -90,7 +96,12 @@ public class Synchronizer {
return SynchronizationResult.TOO_FAR_BEHIND; return SynchronizationResult.TOO_FAR_BEHIND;
} }
LOGGER.info(String.format("Synchronizing with peer %s at height %d, our height %d", peer, peerHeight, ourHeight)); byte[] peersLastBlockSignature = peer.getPeerData().getLastBlockSignature();
byte[] ourLastBlockSignature = ourLatestBlockData.getSignature();
if (peerHeight == ourHeight && (peersLastBlockSignature == null || !Arrays.equals(peersLastBlockSignature, ourLastBlockSignature)))
LOGGER.info(String.format("Synchronizing with peer %s at height %d, our height %d, signatures differ", peer, peerHeight, ourHeight));
else
LOGGER.info(String.format("Synchronizing with peer %s at height %d, our height %d", peer, peerHeight, ourHeight));
List<byte[]> signatures = findSignaturesFromCommonBlock(peer, ourHeight); List<byte[]> signatures = findSignaturesFromCommonBlock(peer, ourHeight);
if (signatures == null) { if (signatures == null) {
@ -104,13 +115,22 @@ public class Synchronizer {
LOGGER.debug(String.format("Common block with peer %s is at height %d", peer, commonBlockHeight)); LOGGER.debug(String.format("Common block with peer %s is at height %d", peer, commonBlockHeight));
signatures.remove(0); signatures.remove(0);
// If common block height is higher than peer's last reported height
// then peer must have a very recent sync. Update our idea of peer's height.
if (commonBlockHeight > peerHeight) {
LOGGER.debug(String.format("Peer height %d was lower than common block height %d - using higher value", peerHeight, commonBlockHeight));
peerHeight = commonBlockHeight;
}
// XXX This may well be obsolete now
// If common block is peer's latest block then we simply have the same, or longer, chain to peer, so exit now // If common block is peer's latest block then we simply have the same, or longer, chain to peer, so exit now
if (commonBlockHeight == peerHeight) { if (commonBlockHeight == peerHeight) {
if (peerHeight == ourHeight) if (peerHeight == ourHeight)
LOGGER.info(String.format("We have the same blockchain as peer %s", peer)); LOGGER.info(String.format("We have the same blockchain as peer %s", peer));
else else
LOGGER.info(String.format("We have the same blockchain as peer %s, but longer", peer)); LOGGER.info(String.format("We have the same blockchain as peer %s, but longer", peer));
return SynchronizationResult.OK;
return SynchronizationResult.NOTHING_TO_DO;
} }
// If common block is too far behind us then we're on massively different forks so give up. // If common block is too far behind us then we're on massively different forks so give up.
@ -121,7 +141,15 @@ public class Synchronizer {
} }
// If we have blocks after common block then decide whether we want to sync (lowest block signature wins) // If we have blocks after common block then decide whether we want to sync (lowest block signature wins)
for (int height = commonBlockHeight + 1; height <= peerHeight && height <= ourHeight; ++height) { int highestMutualHeight = Math.min(peerHeight, ourHeight);
// If our latest block is very old, we're very behind and should ditch our fork.
if (ourLatestBlockData.getTimestamp() < NTP.getTime() - MAXIMUM_TIP_AGE) {
LOGGER.info(String.format("Ditching our chain after height %d as our latest block is very old", commonBlockHeight));
highestMutualHeight = commonBlockHeight;
}
for (int height = commonBlockHeight + 1; height <= highestMutualHeight; ++height) {
int sigIndex = height - commonBlockHeight - 1; int sigIndex = height - commonBlockHeight - 1;
// Do we need more signatures? // Do we need more signatures?
@ -268,7 +296,7 @@ public class Synchronizer {
testSignature = testBlockData.getSignature(); testSignature = testBlockData.getSignature();
// Ask for block signatures since test block's signature // Ask for block signatures since test block's signature
LOGGER.trace(String.format("Requesting %d signature%s after our height %d", step, (step != 1 ? "s": ""), testHeight)); LOGGER.trace(String.format("Requesting %d signature%s after height %d", step, (step != 1 ? "s": ""), testHeight));
blockSignatures = this.getBlockSignatures(peer, testSignature, step); blockSignatures = this.getBlockSignatures(peer, testSignature, step);
if (blockSignatures == null) if (blockSignatures == null)

View File

@ -22,6 +22,9 @@ public class PeerData {
private Long lastAttempted; private Long lastAttempted;
private Long lastConnected; private Long lastConnected;
private Integer lastHeight; private Integer lastHeight;
private byte[] lastBlockSignature;
private Long lastBlockTimestamp;
private byte[] lastBlockGenerator;
private Long lastMisbehaved; private Long lastMisbehaved;
// Constructors // Constructors
@ -30,16 +33,19 @@ public class PeerData {
protected PeerData() { protected PeerData() {
} }
public PeerData(PeerAddress peerAddress, Long lastAttempted, Long lastConnected, Integer lastHeight, Long lastMisbehaved) { public PeerData(PeerAddress peerAddress, Long lastAttempted, Long lastConnected, Integer lastHeight, byte[] lastBlockSignature, Long lastBlockTimestamp, byte[] lastBlockGenerator, Long lastMisbehaved) {
this.peerAddress = peerAddress; this.peerAddress = peerAddress;
this.lastAttempted = lastAttempted; this.lastAttempted = lastAttempted;
this.lastConnected = lastConnected; this.lastConnected = lastConnected;
this.lastHeight = lastHeight; this.lastHeight = lastHeight;
this.lastBlockSignature = lastBlockSignature;
this.lastBlockTimestamp = lastBlockTimestamp;
this.lastBlockGenerator = lastBlockGenerator;
this.lastMisbehaved = lastMisbehaved; this.lastMisbehaved = lastMisbehaved;
} }
public PeerData(PeerAddress peerAddress) { public PeerData(PeerAddress peerAddress) {
this(peerAddress, null, null, null, null); this(peerAddress, null, null, null, null, null, null, null);
} }
// Getters / setters // Getters / setters
@ -75,6 +81,30 @@ public class PeerData {
this.lastHeight = lastHeight; this.lastHeight = lastHeight;
} }
public byte[] getLastBlockSignature() {
return lastBlockSignature;
}
public void setLastBlockSignature(byte[] lastBlockSignature) {
this.lastBlockSignature = lastBlockSignature;
}
public Long getLastBlockTimestamp() {
return lastBlockTimestamp;
}
public void setLastBlockTimestamp(Long lastBlockTimestamp) {
this.lastBlockTimestamp = lastBlockTimestamp;
}
public byte[] getLastBlockGenerator() {
return lastBlockGenerator;
}
public void setLastBlockGenerator(byte[] lastBlockGenerator) {
this.lastBlockGenerator = lastBlockGenerator;
}
public Long getLastMisbehaved() { public Long getLastMisbehaved() {
return this.lastMisbehaved; return this.lastMisbehaved;
} }

View File

@ -152,7 +152,7 @@ public enum Handshake {
// Drop other inbound peers with the same ID // Drop other inbound peers with the same ID
for (Peer otherPeer : Network.getInstance().getConnectedPeers()) for (Peer otherPeer : Network.getInstance().getConnectedPeers())
if (!otherPeer.isOutbound() && otherPeer.getPeerId() != null && Arrays.equals(otherPeer.getPeerId(), peer.getPendingPeerId())) if (!otherPeer.isOutbound() && otherPeer.getPeerId() != null && Arrays.equals(otherPeer.getPeerId(), peer.getPendingPeerId()))
otherPeer.disconnect(); otherPeer.disconnect("doppelganger");
// Tidy up // Tidy up
peer.setVerificationCodes(null, null); peer.setVerificationCodes(null, null);
@ -191,13 +191,13 @@ public enum Handshake {
Message versionMessage = new VersionMessage(buildTimestamp, versionString); Message versionMessage = new VersionMessage(buildTimestamp, versionString);
if (!peer.sendMessage(versionMessage)) if (!peer.sendMessage(versionMessage))
peer.disconnect(); peer.disconnect("failed to send version");
} }
private static void sendMyId(Peer peer) { private static void sendMyId(Peer peer) {
Message peerIdMessage = new PeerIdMessage(Network.getInstance().getOurPeerId()); Message peerIdMessage = new PeerIdMessage(Network.getInstance().getOurPeerId());
if (!peer.sendMessage(peerIdMessage)) if (!peer.sendMessage(peerIdMessage))
peer.disconnect(); peer.disconnect("failed to send peer ID");
} }
private static void sendProof(Peer peer) { private static void sendProof(Peer peer) {
@ -208,7 +208,7 @@ public enum Handshake {
// For incoming connections we only need to send a fake proof message as confirmation // For incoming connections we only need to send a fake proof message as confirmation
Message proofMessage = new ProofMessage(peer.getConnectionTimestamp(), 0, 0); Message proofMessage = new ProofMessage(peer.getConnectionTimestamp(), 0, 0);
if (!peer.sendMessage(proofMessage)) if (!peer.sendMessage(proofMessage))
peer.disconnect(); peer.disconnect("failed to send proof");
} }
} }
@ -218,14 +218,14 @@ public enum Handshake {
// Send VERIFICATION_CODES to peer // Send VERIFICATION_CODES to peer
Message verificationCodesMessage = new VerificationCodesMessage(peer.getVerificationCodeSent(), peer.getVerificationCodeExpected()); Message verificationCodesMessage = new VerificationCodesMessage(peer.getVerificationCodeSent(), peer.getVerificationCodeExpected());
if (!otherOutboundPeer.sendMessage(verificationCodesMessage)) { if (!otherOutboundPeer.sendMessage(verificationCodesMessage)) {
peer.disconnect(); // give up with this peer instead peer.disconnect("failed to send verification codes"); // give up with this peer instead
return; return;
} }
// Send PEER_VERIFY to peer // Send PEER_VERIFY to peer
Message peerVerifyMessage = new PeerVerifyMessage(peer.getVerificationCodeSent()); Message peerVerifyMessage = new PeerVerifyMessage(peer.getVerificationCodeSent());
if (!peer.sendMessage(peerVerifyMessage)) if (!peer.sendMessage(peerVerifyMessage))
peer.disconnect(); peer.disconnect("failed to send verification code");
} }
} }

View File

@ -25,10 +25,12 @@ import java.util.stream.Collectors;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.qora.controller.Controller; import org.qora.controller.Controller;
import org.qora.data.block.BlockData;
import org.qora.data.network.PeerData; import org.qora.data.network.PeerData;
import org.qora.data.transaction.TransactionData; import org.qora.data.transaction.TransactionData;
import org.qora.network.message.GetPeersMessage; import org.qora.network.message.GetPeersMessage;
import org.qora.network.message.HeightMessage; import org.qora.network.message.HeightMessage;
import org.qora.network.message.HeightV2Message;
import org.qora.network.message.Message; import org.qora.network.message.Message;
import org.qora.network.message.Message.MessageType; import org.qora.network.message.Message.MessageType;
import org.qora.network.message.PeerVerifyMessage; import org.qora.network.message.PeerVerifyMessage;
@ -316,6 +318,7 @@ public class Network extends Thread {
newPeer = new Peer(peerData); newPeer = new Peer(peerData);
// Update connection attempt info // Update connection attempt info
repository.discardChanges();
peerData.setLastAttempted(NTP.getTime()); peerData.setLastAttempted(NTP.getTime());
repository.getNetworkRepository().save(peerData); repository.getNetworkRepository().save(peerData);
repository.saveChanges(); repository.saveChanges();
@ -360,7 +363,7 @@ public class Network extends Thread {
return; return;
LOGGER.debug(String.format("Unexpected %s message from %s, expected %s", message.getType().name(), peer, handshakeStatus.expectedMessageType)); LOGGER.debug(String.format("Unexpected %s message from %s, expected %s", message.getType().name(), peer, handshakeStatus.expectedMessageType));
peer.disconnect(); peer.disconnect("unexpected message");
return; return;
} }
@ -369,7 +372,7 @@ public class Network extends Thread {
if (newHandshakeStatus == null) { if (newHandshakeStatus == null) {
// Handshake failure // Handshake failure
LOGGER.debug(String.format("Handshake failure with peer %s message %s", peer, message.getType().name())); LOGGER.debug(String.format("Handshake failure with peer %s message %s", peer, message.getType().name()));
peer.disconnect(); peer.disconnect("handshake failure");
return; return;
} }
@ -410,7 +413,7 @@ public class Network extends Thread {
case PEER_ID: case PEER_ID:
case PROOF: case PROOF:
LOGGER.debug(String.format("Unexpected handshaking message %s from peer %s", message.getType().name(), peer)); LOGGER.debug(String.format("Unexpected handshaking message %s from peer %s", message.getType().name(), peer));
peer.disconnect(); peer.disconnect("unexpected handshaking message");
return; return;
case PING: case PING:
@ -421,7 +424,7 @@ public class Network extends Thread {
pongMessage.setId(pingMessage.getId()); pongMessage.setId(pingMessage.getId());
if (!peer.sendMessage(pongMessage)) if (!peer.sendMessage(pongMessage))
peer.disconnect(); peer.disconnect("failed to send ping reply");
break; break;
@ -471,7 +474,7 @@ public class Network extends Thread {
PeerVerifyMessage peerVerifyMessage = new PeerVerifyMessage(peer.getVerificationCodeExpected()); PeerVerifyMessage peerVerifyMessage = new PeerVerifyMessage(peer.getVerificationCodeExpected());
if (!peer.sendMessage(peerVerifyMessage)) { if (!peer.sendMessage(peerVerifyMessage)) {
peer.disconnect(); peer.disconnect("failed to send verification code");
return; return;
} }
@ -481,13 +484,15 @@ public class Network extends Thread {
} }
private void onHandshakeCompleted(Peer peer) { private void onHandshakeCompleted(Peer peer) {
// Do we need extra handshaking because of peer dopplegangers? // Do we need extra handshaking because of peer doppelgangers?
if (peer.getPendingPeerId() != null) { if (peer.getPendingPeerId() != null) {
peer.setHandshakeStatus(Handshake.PEER_VERIFY); peer.setHandshakeStatus(Handshake.PEER_VERIFY);
peer.getHandshakeStatus().action(peer); peer.getHandshakeStatus().action(peer);
return; return;
} }
LOGGER.debug(String.format("Handshake completed with peer %s", peer));
// Make a note that we've successfully completed handshake (and when) // Make a note that we've successfully completed handshake (and when)
peer.getPeerData().setLastConnected(NTP.getTime()); peer.getPeerData().setLastConnected(NTP.getTime());
@ -495,16 +500,16 @@ public class Network extends Thread {
peer.startPings(); peer.startPings();
// Send our height // Send our height
Message heightMessage = new HeightMessage(Controller.getInstance().getChainHeight()); Message heightMessage = buildHeightMessage(peer, Controller.getInstance().getChainTip());
if (!peer.sendMessage(heightMessage)) { if (!peer.sendMessage(heightMessage)) {
peer.disconnect(); peer.disconnect("failed to send height/info");
return; return;
} }
// Send our peers list // Send our peers list
Message peersMessage = this.buildPeersMessage(peer); Message peersMessage = this.buildPeersMessage(peer);
if (!peer.sendMessage(peersMessage)) if (!peer.sendMessage(peersMessage))
peer.disconnect(); peer.disconnect("failed to send peers list");
// Send our unconfirmed transactions // Send our unconfirmed transactions
try (final Repository repository = RepositoryManager.getRepository()) { try (final Repository repository = RepositoryManager.getRepository()) {
@ -513,7 +518,7 @@ public class Network extends Thread {
for (TransactionData transactionData : transactions) { for (TransactionData transactionData : transactions) {
Message transactionMessage = new TransactionMessage(transactionData); Message transactionMessage = new TransactionMessage(transactionData);
if (!peer.sendMessage(transactionMessage)) { if (!peer.sendMessage(transactionMessage)) {
peer.disconnect(); peer.disconnect("failed to send unconfirmed transaction");
return; return;
} }
} }
@ -524,7 +529,7 @@ public class Network extends Thread {
// Request their peers list // Request their peers list
Message getPeersMessage = new GetPeersMessage(); Message getPeersMessage = new GetPeersMessage();
if (!peer.sendMessage(getPeersMessage)) if (!peer.sendMessage(getPeersMessage))
peer.disconnect(); peer.disconnect("failed to request peers list");
} }
/** Returns PEERS message made from peers we've connected to recently, and this node's details */ /** Returns PEERS message made from peers we've connected to recently, and this node's details */
@ -588,6 +593,16 @@ public class Network extends Thread {
} }
} }
public Message buildHeightMessage(Peer peer, BlockData blockData) {
if (peer.getVersion() < 2) {
// Legacy height message
return new HeightMessage(blockData.getHeight());
}
// HEIGHT_V2 contains way more useful info
return new HeightV2Message(blockData.getHeight(), blockData.getSignature(), blockData.getTimestamp(), blockData.getGeneratorPublicKey());
}
// Network-wide calls // Network-wide calls
/** Returns list of connected peers that have completed handshaking. */ /** Returns list of connected peers that have completed handshaking. */
@ -712,7 +727,7 @@ public class Network extends Thread {
public void run() { public void run() {
for (Peer peer : targetPeers) for (Peer peer : targetPeers)
if (!peer.sendMessage(peerMessage.apply(peer))) if (!peer.sendMessage(peerMessage.apply(peer)))
peer.disconnect(); peer.disconnect("failed to broadcast message");
} }
} }

View File

@ -12,6 +12,7 @@ import java.security.SecureRandom;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
@ -23,6 +24,7 @@ import org.apache.logging.log4j.Logger;
import org.qora.controller.Controller; import org.qora.controller.Controller;
import org.qora.data.network.PeerData; import org.qora.data.network.PeerData;
import org.qora.network.message.Message; import org.qora.network.message.Message;
import org.qora.network.message.Message.MessageException;
import org.qora.network.message.Message.MessageType; import org.qora.network.message.Message.MessageType;
import org.qora.settings.Settings; import org.qora.settings.Settings;
import org.qora.network.message.PingMessage; import org.qora.network.message.PingMessage;
@ -236,10 +238,14 @@ public class Peer implements Runnable {
Network.getInstance().onMessage(this, message); Network.getInstance().onMessage(this, message);
} }
} }
} catch (MessageException e) {
LOGGER.debug(String.format("%s, from peer %s", e.getMessage(), this));
this.disconnect(e.getMessage());
} catch (SocketTimeoutException e) {
this.disconnect("timeout");
} catch (IOException e) { } catch (IOException e) {
// Fall-through this.disconnect("I/O error");
} finally { } finally {
this.disconnect();
Thread.currentThread().setName("disconnected peer"); Thread.currentThread().setName("disconnected peer");
} }
} }
@ -262,6 +268,8 @@ public class Peer implements Runnable {
this.out.write(message.toBytes()); this.out.write(message.toBytes());
this.out.flush(); this.out.flush();
} }
} catch (MessageException e) {
LOGGER.warn(String.format("Failed to send %s message with ID %d to peer %s: %s", message.getType().name(), message.getId(), this, e.getMessage()));
} catch (IOException e) { } catch (IOException e) {
// Send failure // Send failure
return false; return false;
@ -329,23 +337,24 @@ public class Peer implements Runnable {
long after = System.currentTimeMillis(); long after = System.currentTimeMillis();
if (message == null || message.getType() != MessageType.PING) if (message == null || message.getType() != MessageType.PING)
peer.disconnect(); peer.disconnect("no ping received");
peer.setLastPing(after - before); peer.setLastPing(after - before);
} }
} }
;
this.executor.scheduleWithFixedDelay(new Pinger(this), 0, PING_INTERVAL, TimeUnit.MILLISECONDS); Random random = new Random();
long initialDelay = random.nextInt(PING_INTERVAL);
this.executor.scheduleWithFixedDelay(new Pinger(this), initialDelay, PING_INTERVAL, TimeUnit.MILLISECONDS);
} }
public void disconnect() { public void disconnect(String reason) {
// Shut down pinger // Shut down pinger
this.executor.shutdownNow(); this.executor.shutdownNow();
// Close socket // Close socket
if (!this.socket.isClosed()) { if (!this.socket.isClosed()) {
LOGGER.debug(String.format("Disconnecting peer %s", this)); LOGGER.debug(String.format("Disconnecting peer %s: %s", this, reason));
try { try {
this.socket.close(); this.socket.close();

View File

@ -0,0 +1,83 @@
package org.qora.network.message;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import org.qora.transform.Transformer;
import org.qora.transform.block.BlockTransformer;
import com.google.common.primitives.Ints;
import com.google.common.primitives.Longs;
public class HeightV2Message extends Message {
private int height;
private byte[] signature;
private long timestamp;
private byte[] generator;
public HeightV2Message(int height, byte[] signature, long timestamp, byte[] generator) {
this(-1, height, signature, timestamp, generator);
}
private HeightV2Message(int id, int height, byte[] signature, long timestamp, byte[] generator) {
super(id, MessageType.HEIGHT_V2);
this.height = height;
this.signature = signature;
this.timestamp = timestamp;
this.generator = generator;
}
public int getHeight() {
return this.height;
}
public byte[] getSignature() {
return this.signature;
}
public long getTimestamp() {
return this.timestamp;
}
public byte[] getGenerator() {
return this.generator;
}
public static Message fromByteBuffer(int id, ByteBuffer bytes) throws UnsupportedEncodingException {
int height = bytes.getInt();
byte[] signature = new byte[BlockTransformer.BLOCK_SIGNATURE_LENGTH];
bytes.get(signature);
long timestamp = bytes.getLong();
byte[] generator = new byte[Transformer.PUBLIC_KEY_LENGTH];
bytes.get(generator);
return new HeightV2Message(id, height, signature, timestamp, generator);
}
@Override
protected byte[] toData() {
try {
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
bytes.write(Ints.toByteArray(this.height));
bytes.write(this.signature);
bytes.write(Longs.toByteArray(this.timestamp));
bytes.write(this.generator);
return bytes.toByteArray();
} catch (IOException e) {
return null;
}
}
}

View File

@ -27,6 +27,24 @@ public abstract class Message {
private static final int MAX_DATA_SIZE = 1024 * 1024; // 1MB private static final int MAX_DATA_SIZE = 1024 * 1024; // 1MB
@SuppressWarnings("serial")
public static class MessageException extends Exception {
public MessageException() {
}
public MessageException(String message) {
super(message);
}
public MessageException(String message, Throwable cause) {
super(message, cause);
}
public MessageException(Throwable cause) {
super(cause);
}
}
public enum MessageType { public enum MessageType {
GET_PEERS(1), GET_PEERS(1),
PEERS(2), PEERS(2),
@ -45,7 +63,8 @@ public abstract class Message {
BLOCK_SUMMARIES(15), BLOCK_SUMMARIES(15),
GET_SIGNATURES_V2(16), GET_SIGNATURES_V2(16),
PEER_VERIFY(17), PEER_VERIFY(17),
VERIFICATION_CODES(18); VERIFICATION_CODES(18),
HEIGHT_V2(19);
public final int value; public final int value;
public final Method fromByteBuffer; public final Method fromByteBuffer;
@ -119,7 +138,7 @@ public abstract class Message {
return this.type; return this.type;
} }
public static Message fromStream(DataInputStream in) throws SocketTimeoutException { public static Message fromStream(DataInputStream in) throws MessageException, IOException {
try { try {
// Read only enough bytes to cover Message "magic" preamble // Read only enough bytes to cover Message "magic" preamble
byte[] messageMagic = new byte[MAGIC_LENGTH]; byte[] messageMagic = new byte[MAGIC_LENGTH];
@ -127,13 +146,13 @@ public abstract class Message {
if (!Arrays.equals(messageMagic, Controller.getInstance().getMessageMagic())) if (!Arrays.equals(messageMagic, Controller.getInstance().getMessageMagic()))
// Didn't receive correct Message "magic" // Didn't receive correct Message "magic"
return null; throw new MessageException("Received incorrect message 'magic'");
int typeValue = in.readInt(); int typeValue = in.readInt();
MessageType messageType = MessageType.valueOf(typeValue); MessageType messageType = MessageType.valueOf(typeValue);
if (messageType == null) if (messageType == null)
// Unrecognised message type // Unrecognised message type
return null; throw new MessageException(String.format("Received unknown message type [%d]", typeValue));
// Find supporting object // Find supporting object
@ -144,14 +163,14 @@ public abstract class Message {
if (id <= 0) if (id <= 0)
// Invalid ID // Invalid ID
return null; throw new MessageException("Invalid negative ID");
} }
int dataSize = in.readInt(); int dataSize = in.readInt();
if (dataSize > MAX_DATA_SIZE) if (dataSize > MAX_DATA_SIZE)
// Too large // Too large
return null; throw new MessageException(String.format("Declared data length %d larger than max allowed %d", dataSize, MAX_DATA_SIZE));
byte[] data = null; byte[] data = null;
if (dataSize > 0) { if (dataSize > 0) {
@ -164,14 +183,14 @@ public abstract class Message {
// Test checksum // Test checksum
byte[] actualChecksum = generateChecksum(data); byte[] actualChecksum = generateChecksum(data);
if (!Arrays.equals(expectedChecksum, actualChecksum)) if (!Arrays.equals(expectedChecksum, actualChecksum))
return null; throw new MessageException("Message checksum incorrect");
} }
return messageType.fromBytes(id, data); return messageType.fromBytes(id, data);
} catch (SocketTimeoutException e) { } catch (SocketTimeoutException e) {
throw e; throw e;
} catch (IOException e) { } catch (IOException e) {
return null; throw e;
} }
} }
@ -179,7 +198,7 @@ public abstract class Message {
return Arrays.copyOfRange(Crypto.digest(data), 0, CHECKSUM_LENGTH); return Arrays.copyOfRange(Crypto.digest(data), 0, CHECKSUM_LENGTH);
} }
public byte[] toBytes() { public byte[] toBytes() throws MessageException {
try { try {
ByteArrayOutputStream bytes = new ByteArrayOutputStream(); ByteArrayOutputStream bytes = new ByteArrayOutputStream();
@ -198,7 +217,7 @@ public abstract class Message {
byte[] data = this.toData(); byte[] data = this.toData();
if (data == null) if (data == null)
return null; throw new MessageException("Missing data payload");
bytes.write(Ints.toByteArray(data.length)); bytes.write(Ints.toByteArray(data.length));
@ -207,9 +226,12 @@ public abstract class Message {
bytes.write(data); bytes.write(data);
} }
if (bytes.size() > MAX_DATA_SIZE)
throw new MessageException(String.format("About to send message with length %d larger than allowed %d", bytes.size(), MAX_DATA_SIZE));
return bytes.toByteArray(); return bytes.toByteArray();
} catch (IOException e) { } catch (IOException e) {
return null; throw new MessageException("Failed to serialize message", e);
} }
} }

View File

@ -730,6 +730,13 @@ public class HSQLDBDatabaseUpdates {
stmt.execute("CREATE INDEX TransactionParticipantsAddressIndex on TransactionParticipants (participant)"); stmt.execute("CREATE INDEX TransactionParticipantsAddressIndex on TransactionParticipants (participant)");
break; break;
case 49:
// Additional peer information
stmt.execute("ALTER TABLE Peers ADD COLUMN last_block_signature BlockSignature BEFORE last_misbehaved");
stmt.execute("ALTER TABLE Peers ADD COLUMN last_block_timestamp TIMESTAMP WITH TIME ZONE BEFORE last_misbehaved");
stmt.execute("ALTER TABLE Peers ADD COLUMN last_block_generator QoraPublicKey BEFORE last_misbehaved");
break;
default: default:
// nothing to do // nothing to do
return false; return false;

View File

@ -2,9 +2,7 @@ package org.qora.repository.hsqldb;
import java.sql.ResultSet; import java.sql.ResultSet;
import java.sql.SQLException; import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Calendar;
import java.util.List; import java.util.List;
import org.qora.data.network.PeerData; import org.qora.data.network.PeerData;
@ -22,9 +20,11 @@ public class HSQLDBNetworkRepository implements NetworkRepository {
@Override @Override
public List<PeerData> getAllPeers() throws DataException { public List<PeerData> getAllPeers() throws DataException {
String sql = "SELECT address, last_connected, last_attempted, last_height, last_block_signature, last_block_timestamp, last_block_generator, last_misbehaved FROM Peers";
List<PeerData> peers = new ArrayList<>(); List<PeerData> peers = new ArrayList<>();
try (ResultSet resultSet = this.repository.checkedExecute("SELECT address, last_connected, last_attempted, last_height, last_misbehaved FROM Peers")) { try (ResultSet resultSet = this.repository.checkedExecute(sql)) {
if (resultSet == null) if (resultSet == null)
return peers; return peers;
@ -33,20 +33,23 @@ public class HSQLDBNetworkRepository implements NetworkRepository {
String address = resultSet.getString(1); String address = resultSet.getString(1);
PeerAddress peerAddress = PeerAddress.fromString(address); PeerAddress peerAddress = PeerAddress.fromString(address);
Timestamp lastConnectedTimestamp = resultSet.getTimestamp(2, Calendar.getInstance(HSQLDBRepository.UTC)); Long lastConnected = HSQLDBRepository.getZonedTimestampMilli(resultSet, 2);
Long lastConnected = resultSet.wasNull() ? null : lastConnectedTimestamp.getTime();
Timestamp lastAttemptedTimestamp = resultSet.getTimestamp(3, Calendar.getInstance(HSQLDBRepository.UTC)); Long lastAttempted = HSQLDBRepository.getZonedTimestampMilli(resultSet, 3);
Long lastAttempted = resultSet.wasNull() ? null : lastAttemptedTimestamp.getTime();
Integer lastHeight = resultSet.getInt(4); Integer lastHeight = resultSet.getInt(4);
if (resultSet.wasNull()) if (resultSet.wasNull())
lastHeight = null; lastHeight = null;
Timestamp lastMisbehavedTimestamp = resultSet.getTimestamp(5, Calendar.getInstance(HSQLDBRepository.UTC)); byte[] lastBlockSignature = resultSet.getBytes(5);
Long lastMisbehaved = resultSet.wasNull() ? null : lastMisbehavedTimestamp.getTime();
peers.add(new PeerData(peerAddress, lastAttempted, lastConnected, lastHeight, lastMisbehaved)); Long lastBlockTimestamp = HSQLDBRepository.getZonedTimestampMilli(resultSet, 6);
byte[] lastBlockGenerator = resultSet.getBytes(7);
Long lastMisbehaved = HSQLDBRepository.getZonedTimestampMilli(resultSet, 8);
peers.add(new PeerData(peerAddress, lastAttempted, lastConnected, lastHeight, lastBlockSignature, lastBlockTimestamp, lastBlockGenerator, lastMisbehaved));
} while (resultSet.next()); } while (resultSet.next());
return peers; return peers;
@ -61,12 +64,12 @@ public class HSQLDBNetworkRepository implements NetworkRepository {
public void save(PeerData peerData) throws DataException { public void save(PeerData peerData) throws DataException {
HSQLDBSaver saveHelper = new HSQLDBSaver("Peers"); HSQLDBSaver saveHelper = new HSQLDBSaver("Peers");
Timestamp lastConnected = peerData.getLastConnected() == null ? null : new Timestamp(peerData.getLastConnected()); saveHelper.bind("address", peerData.getAddress().toString()).bind("last_connected", HSQLDBRepository.toOffsetDateTime(peerData.getLastConnected()))
Timestamp lastAttempted = peerData.getLastAttempted() == null ? null : new Timestamp(peerData.getLastAttempted()); .bind("last_attempted", HSQLDBRepository.toOffsetDateTime(peerData.getLastAttempted()))
Timestamp lastMisbehaved = peerData.getLastMisbehaved() == null ? null : new Timestamp(peerData.getLastMisbehaved()); .bind("last_height", peerData.getLastHeight()).bind("last_block_signature", peerData.getLastBlockSignature())
.bind("last_block_timestamp", HSQLDBRepository.toOffsetDateTime(peerData.getLastBlockTimestamp()))
saveHelper.bind("address", peerData.getAddress().toString()).bind("last_connected", lastConnected).bind("last_attempted", lastAttempted) .bind("last_block_generator", peerData.getLastBlockGenerator())
.bind("last_height", peerData.getLastHeight()).bind("last_misbehaved", lastMisbehaved); .bind("last_misbehaved", HSQLDBRepository.toOffsetDateTime(peerData.getLastMisbehaved()));
try { try {
saveHelper.execute(this.repository); saveHelper.execute(this.repository);
@ -92,4 +95,5 @@ public class HSQLDBNetworkRepository implements NetworkRepository {
throw new DataException("Unable to delete peers from repository", e); throw new DataException("Unable to delete peers from repository", e);
} }
} }
} }