Compare commits

...

38 Commits

Author SHA1 Message Date
CalDescent
a48a9592d0 Merge branch 'master' into sync-multiple-blocks
# Conflicts:
#	src/main/java/org/qortal/settings/Settings.java
2021-06-20 08:16:29 +01:00
CalDescent
904be3005f Enable fast sync by default. 2021-06-12 13:07:25 +01:00
CalDescent
95eaf4c887 Merge branch 'master' into sync-multiple-blocks 2021-06-12 11:15:28 +01:00
CalDescent
bc6b3fb5f4 Include timestamps in block-timings.sh 2021-06-09 13:04:49 +01:00
CalDescent
df47f5d47b Merge branch 'master' into sync-multiple-blocks 2021-06-06 10:35:47 +01:00
CalDescent
c63a7884cb Limit to 10 untrimmed blocks per response, as they are larger than the trimmed ones. 2021-06-02 09:10:25 +01:00
CalDescent
cffbd41f26 Reduce memory allocations in onNetworkGetBlocksMessage 2021-06-02 09:09:06 +01:00
CalDescent
2ceba45782 Fast sync default blocks per request increased to 100. 2021-05-30 14:57:58 +01:00
CalDescent
ed423ed041 Increased MAX_DATA_SIZE and SYNC_BATCH_SIZE, to increase the effectiveness of the batch sync. 2021-05-30 14:54:13 +01:00
CalDescent
f58a52eaa4 Further work to increase the response timeout when requesting multiple blocks. 2021-05-30 13:10:38 +01:00
CalDescent
688404011b Relocate FETCH_BLOCKS_TIMEOUT to Peer.java and use a static import. 2021-05-30 10:08:50 +01:00
CalDescent
8881e0fb75 Merge branch 'master' into sync-multiple-blocks 2021-05-30 09:57:56 +01:00
CalDescent
61de7e144e Merge branch 'networking' into sync-multiple-blocks
# Conflicts:
#	src/main/java/org/qortal/network/Peer.java
2021-05-30 09:57:35 +01:00
CalDescent
c3ff9e49e8 Merge pull request #40 from szisti/fixedNetwork
Support for configuration based fixed network
2021-05-29 09:51:56 +01:00
Istvan Szabo
d52875aa8f Added logs to intentional disconnects 2021-05-28 16:06:27 +01:00
Istvan Szabo
9027cd290c Filter out on demand connections when using fixed network 2021-05-28 14:47:30 +01:00
Istvan Szabo
58a7203ede Support for configuration based fixed network 2021-05-28 14:47:30 +01:00
CalDescent
7f5486dade Merge branch 'master' into sync-multiple-blocks 2021-05-25 07:37:01 +01:00
CalDescent
27aeb4f05f Merge branch 'master' into sync-multiple-blocks
# Conflicts:
#	src/main/java/org/qortal/controller/Synchronizer.java
2021-05-16 11:21:34 +01:00
CalDescent
255233fe38 Reduced log spam. 2021-05-10 09:10:51 +01:00
CalDescent
4ac3984b7c Merge branch 'master' into sync-multiple-blocks
# Conflicts:
#	src/main/java/org/qortal/settings/Settings.java
2021-05-10 09:09:41 +01:00
CalDescent
428af3c0e8 Only use fast sync on trimmed blocks, as others are too large.
This could probably be improved to make sure that all blocks in the next request are within the trimmed time range, but it's enough for now.
2021-05-09 13:57:47 +01:00
CalDescent
68544715bf Skip Block.logDebugInfo() altogether if the log level is more specific than DEBUG, to avoid wasting resources. 2021-05-09 09:05:51 +01:00
CalDescent
d2ea5633fb Fixed divide by zero exception.
Block.calcKeyDistance() cannot be called on some trimmed blocks, because the minter level is unable to be inferred in some cases. This generally hasn't been an issue, but the new Block.logDebugInfo() method is invoking it for all blocks. For now I am adding defensiveness to the debug method, but longer term we might want to add defensiveness to Block.calcKeyDistance() itself, if we ever encounter this issue again. I will leave it alone for now, to reduce risk.
2021-05-09 09:05:42 +01:00
CalDescent
3aa9b5f0b6 Increased timeout when syncing multiple blocks from 4s to 10s. 2021-05-08 22:36:41 +01:00
CalDescent
6c5dbf7bd0 Preliminary version for fast sync set to 1.6.0, because 1.5.x is already released. 2021-05-08 16:36:42 +01:00
CalDescent
3b3dc5032b Merge branch 'master' into sync-multiple-blocks
# Conflicts:
#	pom.xml
#	src/main/java/org/qortal/controller/Synchronizer.java

Removed all fast sync code from Controller.syncToPeerChain(), so it is now the same as `master`.
2021-05-08 16:35:09 +01:00
CalDescent
08f3d653cc Added new settings "maxBlocksPerRequest" and "maxBlocksPerResponse", to control the number of blocks requested and returned by nodes when using GetBlocksMessage and BlocksMessage. 2021-03-28 17:27:04 +01:00
CalDescent
f2bbafe6c2 Added missing break statement if a peer fails to respond with blocks when resolving a fork via fast sync. 2021-03-28 16:52:15 +01:00
CalDescent
cb80280eaf Bump Peer response timeout from 3s to 4s 2021-03-28 14:47:57 +01:00
CalDescent
f22f954ae3 Use MAXIMUM_BLOCKS_REQUEST_SIZE for GetBlocksMessage and BlockMessage, instead of MAXIMUM_REQUEST_SIZE.
Currently set to 1, as serialization of the BlocksMessage data on mainnet is too slow to use this for any significant number of blocks right now. Hopefully we can find a way to optimise this process, which will allow us to use this for multiple block syncing.

Until then, sticking with single blocks should still be enough to help solve the network congestion and re-orgs we are seeing, because it gives us the ability to request the next block based on the previous block's signature, which was unavailable using GET_BLOCK. This removes the requirement to fetch all block signatures upfront, and therefore it shouldn't matter if the peer does a partial re-org whilst a node is syncing to it.
2021-03-28 14:35:47 +01:00
CalDescent
2556855bd7 Added missing return statement if a peer fails to respond with blocks when fast syncing. 2021-03-28 14:19:53 +01:00
CalDescent
365662a2af MAXIMUM_RETRIES reduced from 3 to 1. It will now only retry once, which should save around 6 seconds of wasted synchronization time if a node is unable to respond with the requested block (due to a re-org, etc). 2021-03-27 19:25:27 +00:00
CalDescent
3e0ff7f43f Split Synchronizer.applyNewBlocks() into two methods.
If fast syncing is enabled in the settings (by default it's disabled) AND the peer is running at least v1.5.0, then it will route through to a new method which fetches multiple blocks at a time, in a very similar way to Synchronizer.syncToPeerChain().
If fast syncing is disabled in the settings, or we are communicating with a peer on an older version, it will continue to sync blocks individually.
2021-03-27 18:04:47 +00:00
CalDescent
8c3753326f Check "isFastSyncEnabledWhenResolvingFork" setting before requesting multiple blocks from a peer. This allows users to opt out of this functionality, by setting it to false in their settings. 2021-03-27 18:00:39 +00:00
CalDescent
dbcf6de2d5 Added new settings "fastSyncEnabled" (default: false) and "fastSyncEnabledWhenResolvingFork" (default: true). 2021-03-27 17:59:23 +00:00
CalDescent
a5308995b7 Bump version to 1.5.0, to allow nodes to start using the new syncing method. 2021-03-27 15:46:30 +00:00
CalDescent
270ac88b51 Added GetBlocksMessage and BlocksMessage, which allow multiple blocks to be transferred between peers in a single request.
When communicating with a peer that is running at least version 1.5.0, it will now sync multiple blocks at once in Synchronizer.syncToPeerChain(). This allows us to bypass the single block syncing (and retry mechanism), which has proven to be unviable when there are multiple active forks with several blocks in each chain.

For peers below v1.5.0, the logic should remain unaffected and it will continue to sync blocks individually.
2021-03-27 15:45:15 +00:00
13 changed files with 529 additions and 35 deletions

View File

@@ -476,6 +476,16 @@ public class Block {
return this.minter;
}
public void setRepository(Repository repository) throws DataException {
this.repository = repository;
for (Transaction transaction : this.getTransactions()) {
transaction.setRepository(repository);
}
}
// More information
/**
@@ -524,8 +534,10 @@ public class Block {
long nonAtTransactionCount = transactionsData.stream().filter(transactionData -> transactionData.getType() != TransactionType.AT).count();
// The number of non-AT transactions fetched from repository should correspond with Block's transactionCount
if (nonAtTransactionCount != this.blockData.getTransactionCount())
if (nonAtTransactionCount != this.blockData.getTransactionCount()) {
LOGGER.error(() -> String.format("Block's transactions from repository (%d) do not match block's transaction count (%d)", nonAtTransactionCount, this.blockData.getTransactionCount()));
throw new IllegalStateException("Block's transactions from repository do not match block's transaction count");
}
this.transactions = new ArrayList<>();

View File

@@ -68,9 +68,11 @@ import org.qortal.network.Network;
import org.qortal.network.Peer;
import org.qortal.network.message.ArbitraryDataMessage;
import org.qortal.network.message.BlockSummariesMessage;
import org.qortal.network.message.BlocksMessage;
import org.qortal.network.message.CachedBlockMessage;
import org.qortal.network.message.GetArbitraryDataMessage;
import org.qortal.network.message.GetBlockMessage;
import org.qortal.network.message.GetBlocksMessage;
import org.qortal.network.message.GetBlockSummariesMessage;
import org.qortal.network.message.GetOnlineAccountsMessage;
import org.qortal.network.message.GetPeersMessage;
@@ -101,6 +103,8 @@ import org.qortal.utils.Triple;
import com.google.common.primitives.Longs;
import static org.qortal.network.Peer.FETCH_BLOCKS_TIMEOUT;
public class Controller extends Thread {
static {
@@ -222,6 +226,18 @@ public class Controller extends Thread {
}
public GetBlockMessageStats getBlockMessageStats = new GetBlockMessageStats();
public static class GetBlocksMessageStats {
public AtomicLong requests = new AtomicLong();
public AtomicLong cacheHits = new AtomicLong();
public AtomicLong unknownBlocks = new AtomicLong();
public AtomicLong cacheFills = new AtomicLong();
public AtomicLong fullyFromCache = new AtomicLong();
public GetBlocksMessageStats() {
}
}
public GetBlocksMessageStats getBlocksMessageStats = new GetBlocksMessageStats();
public static class GetBlockSummariesStats {
public AtomicLong requests = new AtomicLong();
public AtomicLong cacheHits = new AtomicLong();
@@ -1201,6 +1217,10 @@ public class Controller extends Thread {
onNetworkGetBlockMessage(peer, message);
break;
case GET_BLOCKS:
onNetworkGetBlocksMessage(peer, message);
break;
case TRANSACTION:
onNetworkTransactionMessage(peer, message);
break;
@@ -1315,6 +1335,54 @@ public class Controller extends Thread {
}
}
private void onNetworkGetBlocksMessage(Peer peer, Message message) {
GetBlocksMessage getBlocksMessage = (GetBlocksMessage) message;
byte[] parentSignature = getBlocksMessage.getParentSignature();
this.stats.getBlocksMessageStats.requests.incrementAndGet();
try (final Repository repository = RepositoryManager.getRepository()) {
// If peer's parent signature matches our latest block signature
// then we can short-circuit with an empty response
BlockData chainTip = getChainTip();
if (chainTip != null && Arrays.equals(parentSignature, chainTip.getSignature())) {
Message blocksMessage = new BlocksMessage(Collections.emptyList());
blocksMessage.setId(message.getId());
if (!peer.sendMessage(blocksMessage))
peer.disconnect("failed to send blocks");
return;
}
// Ensure that we don't serve more blocks than the amount specified in the settings
// Serializing multiple blocks is very slow, so by default we are using a low limit
int blockLimitPerRequest = Settings.getInstance().getMaxBlocksPerResponse();
int untrimmedBlockLimitPerRequest = Settings.getInstance().getMaxBlocksPerResponse();
int numberRequested = Math.min(blockLimitPerRequest, getBlocksMessage.getNumberRequested());
List<Block> blocks = new ArrayList<>();
BlockData blockData = repository.getBlockRepository().fromReference(parentSignature);
while (blockData != null && blocks.size() < numberRequested) {
// If we're dealing with untrimmed blocks, ensure we don't go above the untrimmedBlockLimitPerRequest
if (blockData.isTrimmed() == false && blocks.size() >= untrimmedBlockLimitPerRequest) {
break;
}
Block block = new Block(repository, blockData);
blocks.add(block);
blockData = repository.getBlockRepository().fromReference(blockData.getSignature());
}
Message blocksMessage = new BlocksMessage(blocks);
blocksMessage.setId(message.getId());
if (!peer.sendMessageWithTimeout(blocksMessage, FETCH_BLOCKS_TIMEOUT))
peer.disconnect("failed to send blocks");
} catch (DataException e) {
LOGGER.error(String.format("Repository issue while sending blocks after %s to peer %s", Base58.encode(parentSignature), peer), e);
}
}
private void onNetworkTransactionMessage(Peer peer, Message message) {
TransactionMessage transactionMessage = (TransactionMessage) message;
TransactionData transactionData = transactionMessage.getTransactionData();

View File

@@ -25,8 +25,10 @@ import org.qortal.data.transaction.RewardShareTransactionData;
import org.qortal.data.transaction.TransactionData;
import org.qortal.network.Peer;
import org.qortal.network.message.BlockMessage;
import org.qortal.network.message.BlocksMessage;
import org.qortal.network.message.BlockSummariesMessage;
import org.qortal.network.message.GetBlockMessage;
import org.qortal.network.message.GetBlocksMessage;
import org.qortal.network.message.GetBlockSummariesMessage;
import org.qortal.network.message.GetSignaturesV2Message;
import org.qortal.network.message.Message;
@@ -40,12 +42,14 @@ import org.qortal.transaction.Transaction;
import org.qortal.utils.Base58;
import org.qortal.utils.NTP;
import static org.qortal.network.Peer.FETCH_BLOCKS_TIMEOUT;
public class Synchronizer {
private static final Logger LOGGER = LogManager.getLogger(Synchronizer.class);
/** Max number of new blocks we aim to add to chain tip in each sync round */
private static final int SYNC_BATCH_SIZE = 200; // XXX move to Settings?
private static final int SYNC_BATCH_SIZE = 1000; // XXX move to Settings?
/** Initial jump back of block height when searching for common block with peer */
private static final int INITIAL_BLOCK_STEP = 8;
@@ -58,6 +62,11 @@ public class Synchronizer {
/** Maximum number of block signatures we ask from peer in one go */
private static final int MAXIMUM_REQUEST_SIZE = 200; // XXX move to Settings?
/* Minimum peer version that supports syncing multiple blocks at once via GetBlocksMessage */
private static final long PEER_VERSION_160 = 0x0100060000L;
// Keep track of the size of the last re-org, so it can be logged
private int lastReorgSize;
@@ -790,7 +799,7 @@ public class Synchronizer {
}
private SynchronizationResult syncToPeerChain(Repository repository, BlockData commonBlockData, int ourInitialHeight,
Peer peer, final int peerHeight, List<BlockSummaryData> peerBlockSummaries) throws DataException, InterruptedException {
Peer peer, final int peerHeight, List<BlockSummaryData> peerBlockSummaries) throws DataException, InterruptedException {
final int commonBlockHeight = commonBlockData.getHeight();
final byte[] commonBlockSig = commonBlockData.getSignature();
String commonBlockSig58 = Base58.encode(commonBlockSig);
@@ -820,19 +829,19 @@ public class Synchronizer {
if (Controller.isStopping())
return SynchronizationResult.SHUTTING_DOWN;
// Ensure we don't request more than MAXIMUM_REQUEST_SIZE
int numberRequested = Math.min(numberSignaturesRequired, MAXIMUM_REQUEST_SIZE);
// Ensure we don't request more than MAXIMUM_REQUEST_SIZE
int numberRequested = Math.min(numberSignaturesRequired, MAXIMUM_REQUEST_SIZE);
// Do we need more signatures?
// Do we need more signatures?
if (peerBlockSignatures.isEmpty() && numberRequested > 0) {
LOGGER.trace(String.format("Requesting %d signature%s after height %d, sig %.8s",
numberRequested, (numberRequested != 1 ? "s" : ""), height, Base58.encode(latestPeerSignature)));
LOGGER.trace(String.format("Requesting %d signature%s after height %d, sig %.8s",
numberRequested, (numberRequested != 1 ? "s" : ""), height, Base58.encode(latestPeerSignature)));
peerBlockSignatures = this.getBlockSignatures(peer, latestPeerSignature, numberRequested);
peerBlockSignatures = this.getBlockSignatures(peer, latestPeerSignature, numberRequested);
if (peerBlockSignatures == null || peerBlockSignatures.isEmpty()) {
LOGGER.info(String.format("Peer %s failed to respond with more block signatures after height %d, sig %.8s", peer,
height, Base58.encode(latestPeerSignature)));
if (peerBlockSignatures == null || peerBlockSignatures.isEmpty()) {
LOGGER.info(String.format("Peer %s failed to respond with more block signatures after height %d, sig %.8s", peer,
height, Base58.encode(latestPeerSignature)));
// Clear our cache of common block summaries for this peer, as they are likely to be invalid
CommonBlockData cachedCommonBlockData = peer.getCommonBlockData();
@@ -842,7 +851,7 @@ public class Synchronizer {
// If we have already received newer blocks from this peer that what we have already, go ahead and apply them
if (peerBlocks.size() > 0) {
final BlockData ourLatestBlockData = repository.getBlockRepository().getLastBlock();
final Block peerLatestBlock = peerBlocks.get(peerBlocks.size() - 1);
final Block peerLatestBlock = peerBlocks.get(peerBlocks.size() - 1);
final Long minLatestBlockTimestamp = Controller.getMinimumLatestBlockTimestamp();
if (ourLatestBlockData != null && peerLatestBlock != null && minLatestBlockTimestamp != null) {
@@ -865,8 +874,8 @@ public class Synchronizer {
return SynchronizationResult.NO_REPLY;
}
numberSignaturesRequired = peerHeight - height - peerBlockSignatures.size();
LOGGER.trace(String.format("Received %s signature%s", peerBlockSignatures.size(), (peerBlockSignatures.size() != 1 ? "s" : "")));
numberSignaturesRequired = peerHeight - height - peerBlockSignatures.size();
LOGGER.trace(String.format("Received %s signature%s", peerBlockSignatures.size(), (peerBlockSignatures.size() != 1 ? "s" : "")));
}
if (peerBlockSignatures.isEmpty()) {
@@ -1003,8 +1012,108 @@ public class Synchronizer {
}
private SynchronizationResult applyNewBlocks(Repository repository, BlockData commonBlockData, int ourInitialHeight,
Peer peer, int peerHeight, List<BlockSummaryData> peerBlockSummaries) throws InterruptedException, DataException {
final BlockData ourLatestBlockData = repository.getBlockRepository().getLastBlock();
if (Settings.getInstance().isFastSyncEnabled() && peer.getPeersVersion() >= PEER_VERSION_160 && ourLatestBlockData.isTrimmed())
// This peer supports syncing multiple blocks at once via GetBlocksMessage, and it is enabled in the settings
return this.applyNewBlocksUsingFastSync(repository, commonBlockData, ourInitialHeight, peer, peerHeight, peerBlockSummaries);
else
// Older peer version, or fast sync is disabled in the settings - use slow sync
return this.applyNewBlocksUsingSlowSync(repository, commonBlockData, ourInitialHeight, peer, peerHeight, peerBlockSummaries);
}
private SynchronizationResult applyNewBlocksUsingFastSync(Repository repository, BlockData commonBlockData, int ourInitialHeight,
Peer peer, int peerHeight, List<BlockSummaryData> peerBlockSummaries) throws InterruptedException, DataException {
LOGGER.debug(String.format("Fetching new blocks from peer %s using fast sync", peer));
final int commonBlockHeight = commonBlockData.getHeight();
final byte[] commonBlockSig = commonBlockData.getSignature();
byte[] latestPeerSignature = commonBlockSig;
int ourHeight = ourInitialHeight;
// Fetch, and apply, blocks from peer
int maxBatchHeight = commonBlockHeight + SYNC_BATCH_SIZE;
// Ensure that we don't request more blocks than specified in the settings
int maxBlocksPerRequest = Settings.getInstance().getMaxBlocksPerRequest();
while (ourHeight < peerHeight && ourHeight < maxBatchHeight) {
if (Controller.isStopping())
return SynchronizationResult.SHUTTING_DOWN;
int numberRequested = Math.min(maxBatchHeight - ourHeight, maxBlocksPerRequest);
LOGGER.trace(String.format("Fetching %d blocks after height %d, sig %.8s from %s", numberRequested, ourHeight, Base58.encode(latestPeerSignature), peer));
List<Block> blocks = this.fetchBlocks(repository, peer, latestPeerSignature, numberRequested);
if (blocks == null || blocks.isEmpty()) {
LOGGER.info(String.format("Peer %s failed to respond with more blocks after height %d, sig %.8s", peer,
ourHeight, Base58.encode(latestPeerSignature)));
return SynchronizationResult.NO_REPLY;
}
LOGGER.trace(String.format("Received %d blocks after height %d, sig %.8s from %s", blocks.size(), ourHeight, Base58.encode(latestPeerSignature), peer));
for (Block newBlock : blocks) {
++ourHeight;
if (Controller.isStopping())
return SynchronizationResult.SHUTTING_DOWN;
if (newBlock == null) {
LOGGER.info(String.format("Peer %s failed to respond with block for height %d, sig %.8s", peer,
ourHeight, Base58.encode(latestPeerSignature)));
return SynchronizationResult.NO_REPLY;
}
if (!newBlock.isSignatureValid()) {
LOGGER.info(String.format("Peer %s sent block with invalid signature for height %d, sig %.8s", peer,
ourHeight, Base58.encode(latestPeerSignature)));
return SynchronizationResult.INVALID_DATA;
}
// Set the repository, because we couldn't do that when originally constructing the Block
newBlock.setRepository(repository);
// Transactions are transmitted without approval status so determine that now
for (Transaction transaction : newBlock.getTransactions()) {
transaction.setInitialApprovalStatus();
}
ValidationResult blockResult = newBlock.isValid();
if (blockResult != ValidationResult.OK) {
LOGGER.info(String.format("Peer %s sent invalid block for height %d, sig %.8s: %s", peer,
ourHeight, Base58.encode(latestPeerSignature), blockResult.name()));
return SynchronizationResult.INVALID_DATA;
}
// Save transactions attached to this block
for (Transaction transaction : newBlock.getTransactions()) {
TransactionData transactionData = transaction.getTransactionData();
repository.getTransactionRepository().save(transactionData);
}
newBlock.process();
LOGGER.trace(String.format("Processed block height %d, sig %.8s", newBlock.getBlockData().getHeight(), Base58.encode(newBlock.getBlockData().getSignature())));
repository.saveChanges();
Controller.getInstance().onNewBlock(newBlock.getBlockData());
// Update latestPeerSignature so that subsequent batches start requesting from the correct block
latestPeerSignature = newBlock.getSignature();
}
}
return SynchronizationResult.OK;
}
private SynchronizationResult applyNewBlocksUsingSlowSync(Repository repository, BlockData commonBlockData, int ourInitialHeight,
Peer peer, int peerHeight, List<BlockSummaryData> peerBlockSummaries) throws InterruptedException, DataException {
LOGGER.debug(String.format("Fetching new blocks from peer %s", peer));
LOGGER.debug(String.format("Fetching new blocks from peer %s using slow sync", peer));
final int commonBlockHeight = commonBlockData.getHeight();
final byte[] commonBlockSig = commonBlockData.getSignature();
@@ -1125,6 +1234,22 @@ public class Synchronizer {
return new Block(repository, blockMessage.getBlockData(), blockMessage.getTransactions(), blockMessage.getAtStates());
}
private List<Block> fetchBlocks(Repository repository, Peer peer, byte[] parentSignature, int numberRequested) throws InterruptedException {
Message getBlocksMessage = new GetBlocksMessage(parentSignature, numberRequested);
Message message = peer.getResponseWithTimeout(getBlocksMessage, FETCH_BLOCKS_TIMEOUT);
if (message == null || message.getType() != MessageType.BLOCKS) {
return null;
}
BlocksMessage blocksMessage = (BlocksMessage) message;
if (blocksMessage == null || blocksMessage.getBlocks() == null) {
return null;
}
return blocksMessage.getBlocks();
}
private void populateBlockSummariesMinterLevels(Repository repository, List<BlockSummaryData> blockSummaries) throws DataException {
final int firstBlockHeight = blockSummaries.get(0).getHeight();

View File

@@ -9,7 +9,10 @@ import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
import org.qortal.block.BlockChain;
import org.qortal.settings.Settings;
import org.qortal.crypto.Crypto;
import org.qortal.utils.NTP;
// All properties to be converted to JSON via JAX-RS
@XmlAccessorType(XmlAccessType.FIELD)
@@ -204,6 +207,13 @@ public class BlockData implements Serializable {
return this.onlineAccountsSignatures;
}
public boolean isTrimmed() {
long onlineAccountSignaturesTrimmedTimestamp = NTP.getTime() - BlockChain.getInstance().getOnlineAccountSignaturesMaxLifetime();
long currentTrimmableTimestamp = NTP.getTime() - Settings.getInstance().getAtStatesMaxLifetime();
long blockTimestamp = this.getTimestamp();
return blockTimestamp < onlineAccountSignaturesTrimmedTimestamp && blockTimestamp < currentTrimmableTimestamp;
}
// JAXB special
@XmlElement(name = "minterAddress")

View File

@@ -155,9 +155,23 @@ public class Network {
}
// Load all known peers from repository
try (Repository repository = RepositoryManager.getRepository()) {
synchronized (this.allKnownPeers) {
this.allKnownPeers.addAll(repository.getNetworkRepository().getAllPeers());
synchronized (this.allKnownPeers) { List<String> fixedNetwork = Settings.getInstance().getFixedNetwork();
if (fixedNetwork != null && !fixedNetwork.isEmpty()) {
Long addedWhen = NTP.getTime();
String addedBy = "fixedNetwork";
List<PeerAddress> peerAddresses = new ArrayList<>();
for (String address : fixedNetwork) {
PeerAddress peerAddress = PeerAddress.fromString(address);
peerAddresses.add(peerAddress);
}
List<PeerData> peers = peerAddresses.stream()
.map(peerAddress -> new PeerData(peerAddress, addedWhen, addedBy))
.collect(Collectors.toList());
this.allKnownPeers.addAll(peers);
} else {
try (Repository repository = RepositoryManager.getRepository()) {
this.allKnownPeers.addAll(repository.getNetworkRepository().getAllPeers());
}
}
}
@@ -513,14 +527,24 @@ public class Network {
if (socketChannel == null) {
return;
}
PeerAddress address = PeerAddress.fromSocket(socketChannel.socket());
List<String> fixedNetwork = Settings.getInstance().getFixedNetwork();
if (fixedNetwork != null && !fixedNetwork.isEmpty() && ipNotInFixedList(address, fixedNetwork)) {
try {
LOGGER.debug("Connection discarded from peer {} as not in the fixed network list", address);
socketChannel.close();
} catch (IOException e) {
// IGNORE
}
return;
}
final Long now = NTP.getTime();
Peer newPeer;
try {
if (now == null) {
LOGGER.debug("Connection discarded from peer {} due to lack of NTP sync",
PeerAddress.fromSocket(socketChannel.socket()));
LOGGER.debug("Connection discarded from peer {} due to lack of NTP sync", address);
socketChannel.close();
return;
}
@@ -528,12 +552,12 @@ public class Network {
synchronized (this.connectedPeers) {
if (connectedPeers.size() >= maxPeers) {
// We have enough peers
LOGGER.debug("Connection discarded from peer {}", PeerAddress.fromSocket(socketChannel.socket()));
LOGGER.debug("Connection discarded from peer {} because the server is full", address);
socketChannel.close();
return;
}
LOGGER.debug("Connection accepted from peer {}", PeerAddress.fromSocket(socketChannel.socket()));
LOGGER.debug("Connection accepted from peer {}", address);
newPeer = new Peer(socketChannel, channelSelector);
this.connectedPeers.add(newPeer);
@@ -541,6 +565,7 @@ public class Network {
} catch (IOException e) {
if (socketChannel.isOpen()) {
try {
LOGGER.debug("Connection failed from peer {} while connecting/closing", address);
socketChannel.close();
} catch (IOException ce) {
// Couldn't close?
@@ -552,6 +577,16 @@ public class Network {
this.onPeerReady(newPeer);
}
private boolean ipNotInFixedList(PeerAddress address, List<String> fixedNetwork) {
for (String ipAddress : fixedNetwork) {
String[] bits = ipAddress.split(":");
if (bits.length >= 1 && bits.length <= 2 && address.getHost().equals(bits[0])) {
return false;
}
}
return true;
}
private Peer getConnectablePeer(final Long now) throws InterruptedException {
// We can't block here so use tryRepository(). We don't NEED to connect a new peer.
try (Repository repository = RepositoryManager.tryRepository()) {
@@ -1145,6 +1180,10 @@ public class Network {
private boolean mergePeers(Repository repository, String addedBy, long addedWhen, List<PeerAddress> peerAddresses)
throws DataException {
List<String> fixedNetwork = Settings.getInstance().getFixedNetwork();
if (fixedNetwork != null && !fixedNetwork.isEmpty()) {
return false;
}
List<PeerData> newPeers;
synchronized (this.allKnownPeers) {
for (PeerData knownPeerData : this.allKnownPeers) {

View File

@@ -47,6 +47,11 @@ public class Peer {
*/
private static final int RESPONSE_TIMEOUT = 3000; // ms
/**
* Maximum time to wait for a peer to respond with blocks (ms)
*/
public static final int FETCH_BLOCKS_TIMEOUT = 10000;
/**
* Interval between PING messages to a peer. (ms)
* <p>
@@ -544,12 +549,22 @@ public class Peer {
}
/**
* Attempt to send Message to peer.
* Attempt to send Message to peer, using default RESPONSE_TIMEOUT.
*
* @param message message to be sent
* @return <code>true</code> if message successfully sent; <code>false</code> otherwise
*/
public boolean sendMessage(Message message) {
return this.sendMessageWithTimeout(message, RESPONSE_TIMEOUT);
}
/**
* Attempt to send Message to peer, using custom timeout.
*
* @param message message to be sent
* @return <code>true</code> if message successfully sent; <code>false</code> otherwise
*/
public boolean sendMessageWithTimeout(Message message, int timeout) {
if (!this.socketChannel.isOpen()) {
return false;
}
@@ -583,7 +598,7 @@ public class Peer {
*/
Thread.sleep(1L); //NOSONAR squid:S2276
if (System.currentTimeMillis() - sendStart > RESPONSE_TIMEOUT) {
if (System.currentTimeMillis() - sendStart > timeout) {
// We've taken too long to send this message
return false;
}
@@ -604,7 +619,7 @@ public class Peer {
}
/**
* Send message to peer and await response.
* Send message to peer and await response, using default RESPONSE_TIMEOUT.
* <p>
* Message is assigned a random ID and sent.
* If a response with matching ID is received then it is returned to caller.
@@ -618,6 +633,24 @@ public class Peer {
* @throws InterruptedException if interrupted while waiting
*/
public Message getResponse(Message message) throws InterruptedException {
return getResponseWithTimeout(message, RESPONSE_TIMEOUT);
}
/**
* Send message to peer and await response.
* <p>
* Message is assigned a random ID and sent.
* If a response with matching ID is received then it is returned to caller.
* <p>
* If no response with matching ID within timeout, or some other error/exception occurs,
* then return <code>null</code>.<br>
* (Assume peer will be rapidly disconnected after this).
*
* @param message message to send
* @return <code>Message</code> if valid response received; <code>null</code> if not or error/exception occurs
* @throws InterruptedException if interrupted while waiting
*/
public Message getResponseWithTimeout(Message message, int timeout) throws InterruptedException {
BlockingQueue<Message> blockingQueue = new ArrayBlockingQueue<>(1);
// Assign random ID to this message
@@ -632,13 +665,13 @@ public class Peer {
message.setId(id);
// Try to send message
if (!this.sendMessage(message)) {
if (!this.sendMessageWithTimeout(message, timeout)) {
this.replyQueues.remove(id);
return null;
}
try {
return blockingQueue.poll(RESPONSE_TIMEOUT, TimeUnit.MILLISECONDS);
return blockingQueue.poll(timeout, TimeUnit.MILLISECONDS);
} finally {
this.replyQueues.remove(id);
}

View File

@@ -0,0 +1,91 @@
package org.qortal.network.message;
import com.google.common.primitives.Ints;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.qortal.block.Block;
import org.qortal.data.at.ATStateData;
import org.qortal.data.block.BlockData;
import org.qortal.data.transaction.TransactionData;
import org.qortal.transform.TransformationException;
import org.qortal.transform.block.BlockTransformer;
import org.qortal.utils.Triple;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
public class BlocksMessage extends Message {
private static final Logger LOGGER = LogManager.getLogger(BlocksMessage.class);
private List<Block> blocks;
public BlocksMessage(List<Block> blocks) {
this(-1, blocks);
}
private BlocksMessage(int id, List<Block> blocks) {
super(id, MessageType.BLOCKS);
this.blocks = blocks;
}
public List<Block> getBlocks() {
return this.blocks;
}
public static Message fromByteBuffer(int id, ByteBuffer bytes) throws UnsupportedEncodingException {
int count = bytes.getInt();
List<Block> blocks = new ArrayList<>();
for (int i = 0; i < count; ++i) {
int height = bytes.getInt();
try {
boolean finalBlockInBuffer = (i == count-1);
Triple<BlockData, List<TransactionData>, List<ATStateData>> blockInfo = null;
blockInfo = BlockTransformer.fromByteBuffer(bytes, finalBlockInBuffer);
BlockData blockData = blockInfo.getA();
blockData.setHeight(height);
// We are unable to obtain a valid Repository instance here, so set it to null and we will attach it later
Block block = new Block(null, blockData, blockInfo.getB(), blockInfo.getC());
blocks.add(block);
} catch (TransformationException e) {
return null;
}
}
return new BlocksMessage(id, blocks);
}
@Override
protected byte[] toData() {
try {
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
bytes.write(Ints.toByteArray(this.blocks.size()));
for (Block block : this.blocks) {
bytes.write(Ints.toByteArray(block.getBlockData().getHeight()));
bytes.write(BlockTransformer.toBytes(block));
}
LOGGER.trace(String.format("Total length of %d blocks is %d bytes", this.blocks.size(), bytes.size()));
return bytes.toByteArray();
} catch (IOException e) {
return null;
} catch (TransformationException e) {
return null;
}
}
}

View File

@@ -0,0 +1,65 @@
package org.qortal.network.message;
import com.google.common.primitives.Ints;
import org.qortal.transform.Transformer;
import org.qortal.transform.block.BlockTransformer;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
public class GetBlocksMessage extends Message {
private static final int BLOCK_SIGNATURE_LENGTH = BlockTransformer.BLOCK_SIGNATURE_LENGTH;
private byte[] parentSignature;
private int numberRequested;
public GetBlocksMessage(byte[] parentSignature, int numberRequested) {
this(-1, parentSignature, numberRequested);
}
private GetBlocksMessage(int id, byte[] parentSignature, int numberRequested) {
super(id, MessageType.GET_BLOCKS);
this.parentSignature = parentSignature;
this.numberRequested = numberRequested;
}
public byte[] getParentSignature() {
return this.parentSignature;
}
public int getNumberRequested() {
return this.numberRequested;
}
public static Message fromByteBuffer(int id, ByteBuffer bytes) throws UnsupportedEncodingException {
if (bytes.remaining() != BLOCK_SIGNATURE_LENGTH + Transformer.INT_LENGTH)
return null;
byte[] parentSignature = new byte[BLOCK_SIGNATURE_LENGTH];
bytes.get(parentSignature);
int numberRequested = bytes.getInt();
return new GetBlocksMessage(id, parentSignature, numberRequested);
}
@Override
protected byte[] toData() {
try {
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
bytes.write(this.parentSignature);
bytes.write(Ints.toByteArray(this.numberRequested));
return bytes.toByteArray();
} catch (IOException e) {
return null;
}
}
}

View File

@@ -25,7 +25,7 @@ public abstract class Message {
private static final int MAGIC_LENGTH = 4;
private static final int CHECKSUM_LENGTH = 4;
private static final int MAX_DATA_SIZE = 1024 * 1024; // 1MB
private static final int MAX_DATA_SIZE = 10 * 1024 * 1024; // 10MB
@SuppressWarnings("serial")
public static class MessageException extends Exception {
@@ -80,7 +80,10 @@ public abstract class Message {
GET_ONLINE_ACCOUNTS(81),
ARBITRARY_DATA(90),
GET_ARBITRARY_DATA(91);
GET_ARBITRARY_DATA(91),
BLOCKS(100),
GET_BLOCKS(101);
public final int value;
public final Method fromByteBufferMethod;

View File

@@ -5,6 +5,7 @@ import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.io.Reader;
import java.util.List;
import java.util.Locale;
import javax.xml.bind.JAXBContext;
@@ -137,6 +138,17 @@ public class Settings {
/** Maximum time (in seconds) that we should attempt to remain connected to a peer for */
private int maxPeerConnectionTime = 20 * 60; // seconds
/** Whether to sync multiple blocks at once in normal operation */
private boolean fastSyncEnabled = true;
/** Whether to sync multiple blocks at once when the peer has a different chain */
private boolean fastSyncEnabledWhenResolvingFork = true;
/** Maximum number of blocks to request at once */
private int maxBlocksPerRequest = 100;
/** Maximum number of blocks this node will serve in a single response */
private int maxBlocksPerResponse = 200;
/** Maximum number of untrimmed blocks this node will serve in a single response */
private int maxUntrimmedBlocksPerResponse = 10;
// Which blockchains this node is running
private String blockchainConfig = null; // use default from resources
private BitcoinNet bitcoinNet = BitcoinNet.MAIN;
@@ -152,6 +164,7 @@ public class Settings {
private String repositoryPath = "db";
/** Repository connection pool size. Needs to be a bit bigger than maxNetworkThreadPoolSize */
private int repositoryConnectionPoolSize = 100;
private List<String> fixedNetwork;
// Auto-update sources
private String[] autoUpdateRepos = new String[] {
@@ -460,6 +473,20 @@ public class Settings {
return this.repositoryConnectionPoolSize;
}
public boolean isFastSyncEnabled() {
return this.fastSyncEnabled;
}
public boolean isFastSyncEnabledWhenResolvingFork() {
return this.fastSyncEnabledWhenResolvingFork;
}
public int getMaxBlocksPerRequest() { return this.maxBlocksPerRequest; }
public int getMaxBlocksPerResponse() { return this.maxBlocksPerResponse; }
public int getMaxUntrimmedBlocksPerResponse() { return this.maxUntrimmedBlocksPerResponse; }
public boolean isAutoUpdateEnabled() {
return this.autoUpdateEnabled;
}
@@ -516,4 +543,7 @@ public class Settings {
return this.onlineSignaturesTrimBatchSize;
}
public List<String> getFixedNetwork() {
return fixedNetwork;
}
}

View File

@@ -315,6 +315,10 @@ public abstract class Transaction {
return this.transactionData;
}
public void setRepository(Repository repository) {
this.repository = repository;
}
// More information
public static long getDeadline(TransactionData transactionData) {

View File

@@ -74,19 +74,30 @@ public class BlockTransformer extends Transformer {
}
/**
* Extract block data and transaction data from serialized bytes.
*
* Extract block data and transaction data from serialized bytes containing a single block.
*
* @param bytes
* @return BlockData and a List of transactions.
* @throws TransformationException
*/
public static Triple<BlockData, List<TransactionData>, List<ATStateData>> fromByteBuffer(ByteBuffer byteBuffer) throws TransformationException {
return BlockTransformer.fromByteBuffer(byteBuffer, true);
}
/**
* Extract block data and transaction data from serialized bytes containing one or more blocks.
*
* @param bytes
* @return the next block's BlockData and a List of transactions.
* @throws TransformationException
*/
public static Triple<BlockData, List<TransactionData>, List<ATStateData>> fromByteBuffer(ByteBuffer byteBuffer, boolean finalBlockInBuffer) throws TransformationException {
int version = byteBuffer.getInt();
if (byteBuffer.remaining() < BASE_LENGTH + AT_BYTES_LENGTH - VERSION_LENGTH)
if (finalBlockInBuffer && byteBuffer.remaining() < BASE_LENGTH + AT_BYTES_LENGTH - VERSION_LENGTH)
throw new TransformationException("Byte data too short for Block");
if (byteBuffer.remaining() > BlockChain.getInstance().getMaxBlockSize())
if (finalBlockInBuffer && byteBuffer.remaining() > BlockChain.getInstance().getMaxBlockSize())
throw new TransformationException("Byte data too long for Block");
long timestamp = byteBuffer.getLong();
@@ -210,7 +221,8 @@ public class BlockTransformer extends Transformer {
byteBuffer.get(onlineAccountsSignatures);
}
if (byteBuffer.hasRemaining())
// We should only complain about excess byte data if we aren't expecting more blocks in this ByteBuffer
if (finalBlockInBuffer && byteBuffer.hasRemaining())
throw new TransformationException("Excess byte data found after parsing Block");
// We don't have a height!

View File

@@ -69,11 +69,13 @@ function fetch_and_process_blocks {
online_accounts_count=$(echo "${block_minting_info}" | jq -r .onlineAccountsCount)
key_distance_ratio=$(echo "${block_minting_info}" | jq -r .keyDistanceRatio)
time_delta=$(echo "${block_minting_info}" | jq -r .timeDelta)
timestamp=$(echo "${block_minting_info}" | jq -r .timestamp)
time_offset=$(calculate_time_offset "${key_distance_ratio}")
block_time=$((target-deviation+time_offset))
echo "=== BLOCK ${height} ==="
echo "Timestamp: ${timestamp}"
echo "Minter level: ${minter_level}"
echo "Online accounts: ${online_accounts_count}"
echo "Key distance ratio: ${key_distance_ratio}"