forked from Qortal/qortal
Relocated all synchronizer code from the controller to the synchronizer, and also moved synchonization onto its own thread.
This commit is contained in:
parent
fa2bd40d5f
commit
5b788dad2f
@ -4,6 +4,7 @@ import javax.xml.bind.annotation.XmlAccessType;
|
||||
import javax.xml.bind.annotation.XmlAccessorType;
|
||||
|
||||
import org.qortal.controller.Controller;
|
||||
import org.qortal.controller.Synchronizer;
|
||||
import org.qortal.network.Network;
|
||||
|
||||
@XmlAccessorType(XmlAccessType.FIELD)
|
||||
@ -22,7 +23,7 @@ public class NodeStatus {
|
||||
public NodeStatus() {
|
||||
this.isMintingPossible = Controller.getInstance().isMintingPossible();
|
||||
|
||||
this.syncPercent = Controller.getInstance().getSyncPercent();
|
||||
this.syncPercent = Synchronizer.getInstance().getSyncPercent();
|
||||
this.isSynchronizing = this.syncPercent != null;
|
||||
|
||||
this.numberOfConnections = Network.getInstance().getHandshakedPeers().size();
|
||||
|
@ -44,6 +44,7 @@ import org.qortal.api.model.NodeInfo;
|
||||
import org.qortal.api.model.NodeStatus;
|
||||
import org.qortal.block.BlockChain;
|
||||
import org.qortal.controller.Controller;
|
||||
import org.qortal.controller.Synchronizer;
|
||||
import org.qortal.controller.Synchronizer.SynchronizationResult;
|
||||
import org.qortal.data.account.MintingAccountData;
|
||||
import org.qortal.data.account.RewardShareData;
|
||||
@ -525,7 +526,7 @@ public class AdminResource {
|
||||
SynchronizationResult syncResult;
|
||||
try {
|
||||
do {
|
||||
syncResult = Controller.getInstance().actuallySynchronize(targetPeer, true);
|
||||
syncResult = Synchronizer.getInstance().actuallySynchronize(targetPeer, true);
|
||||
} while (syncResult == SynchronizationResult.OK);
|
||||
} finally {
|
||||
blockchainLock.unlock();
|
||||
|
@ -149,7 +149,7 @@ public class BlockMinter extends Thread {
|
||||
|
||||
// Disregard peers that don't have a recent block, but only if we're not in recovery mode.
|
||||
// In that mode, we want to allow minting on top of older blocks, to recover stalled networks.
|
||||
if (Controller.getInstance().getRecoveryMode() == false)
|
||||
if (Synchronizer.getInstance().getRecoveryMode() == false)
|
||||
peers.removeIf(Controller.hasNoRecentBlock);
|
||||
|
||||
// Don't mint if we don't have enough up-to-date peers as where would the transactions/consensus come from?
|
||||
@ -174,7 +174,7 @@ public class BlockMinter extends Thread {
|
||||
|
||||
// If our latest block isn't recent then we need to synchronize instead of minting, unless we're in recovery mode.
|
||||
if (!peers.isEmpty() && lastBlockData.getTimestamp() < minLatestBlockTimestamp)
|
||||
if (Controller.getInstance().getRecoveryMode() == false && recoverInvalidBlock == false)
|
||||
if (Synchronizer.getInstance().getRecoveryMode() == false && recoverInvalidBlock == false)
|
||||
continue;
|
||||
|
||||
// There are enough peers with a recent block and our latest block is recent
|
||||
|
@ -7,7 +7,6 @@ import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.Paths;
|
||||
import java.security.SecureRandom;
|
||||
import java.security.Security;
|
||||
import java.time.LocalDateTime;
|
||||
import java.time.ZoneOffset;
|
||||
@ -52,7 +51,6 @@ import org.qortal.block.Block;
|
||||
import org.qortal.block.BlockChain;
|
||||
import org.qortal.block.BlockChain.BlockTimingByHeight;
|
||||
import org.qortal.controller.arbitrary.*;
|
||||
import org.qortal.controller.Synchronizer.SynchronizationResult;
|
||||
import org.qortal.controller.repository.PruneManager;
|
||||
import org.qortal.controller.repository.NamesDatabaseIntegrityCheck;
|
||||
import org.qortal.controller.tradebot.TradeBot;
|
||||
@ -94,14 +92,13 @@ public class Controller extends Thread {
|
||||
public static final String VERSION_PREFIX = "qortal-";
|
||||
|
||||
private static final Logger LOGGER = LogManager.getLogger(Controller.class);
|
||||
private static final long MISBEHAVIOUR_COOLOFF = 10 * 60 * 1000L; // ms
|
||||
public static final long MISBEHAVIOUR_COOLOFF = 10 * 60 * 1000L; // ms
|
||||
private static final int MAX_BLOCKCHAIN_TIP_AGE = 5; // blocks
|
||||
private static final Object shutdownLock = new Object();
|
||||
private static final String repositoryUrlTemplate = "jdbc:hsqldb:file:%s" + File.separator + "blockchain;create=true;hsqldb.full_log_replay=true";
|
||||
private static final long NTP_PRE_SYNC_CHECK_PERIOD = 5 * 1000L; // ms
|
||||
private static final long NTP_POST_SYNC_CHECK_PERIOD = 5 * 60 * 1000L; // ms
|
||||
private static final long DELETE_EXPIRED_INTERVAL = 5 * 60 * 1000L; // ms
|
||||
private static final long RECOVERY_MODE_TIMEOUT = 10 * 60 * 1000L; // ms
|
||||
private static final int MAX_INCOMING_TRANSACTIONS = 5000;
|
||||
|
||||
// To do with online accounts list
|
||||
@ -114,7 +111,6 @@ public class Controller extends Thread {
|
||||
|
||||
private static volatile boolean isStopping = false;
|
||||
private static BlockMinter blockMinter = null;
|
||||
private static volatile boolean requestSync = false;
|
||||
private static volatile boolean requestSysTrayUpdate = true;
|
||||
private static Controller instance;
|
||||
|
||||
@ -148,24 +144,9 @@ public class Controller extends Thread {
|
||||
/** Whether we can mint new blocks, as reported by BlockMinter. */
|
||||
private volatile boolean isMintingPossible = false;
|
||||
|
||||
/** Synchronization object for sync variables below */
|
||||
private final Object syncLock = new Object();
|
||||
/** Whether we are attempting to synchronize. */
|
||||
private volatile boolean isSynchronizing = false;
|
||||
/** Temporary estimate of synchronization progress for SysTray use. */
|
||||
private volatile int syncPercent = 0;
|
||||
|
||||
/** List of incoming transaction that are in the import queue */
|
||||
private List<TransactionData> incomingTransactions = Collections.synchronizedList(new ArrayList<>());
|
||||
|
||||
/** Latest block signatures from other peers that we know are on inferior chains. */
|
||||
List<ByteArray> inferiorChainSignatures = new ArrayList<>();
|
||||
|
||||
/** Recovery mode, which is used to bring back a stalled network */
|
||||
private boolean recoveryMode = false;
|
||||
private boolean peersAvailable = true; // peersAvailable must default to true
|
||||
private long timePeersLastAvailable = 0;
|
||||
|
||||
/** Lock for only allowing one blockchain-modifying codepath at a time. e.g. synchronization or newly minted block. */
|
||||
private final ReentrantLock blockchainLock = new ReentrantLock();
|
||||
|
||||
@ -354,20 +335,6 @@ public class Controller extends Thread {
|
||||
return this.isMintingPossible;
|
||||
}
|
||||
|
||||
public boolean isSynchronizing() {
|
||||
return this.isSynchronizing;
|
||||
}
|
||||
|
||||
public Integer getSyncPercent() {
|
||||
synchronized (this.syncLock) {
|
||||
return this.isSynchronizing ? this.syncPercent : null;
|
||||
}
|
||||
}
|
||||
|
||||
public boolean getRecoveryMode() {
|
||||
return this.recoveryMode;
|
||||
}
|
||||
|
||||
// Entry point
|
||||
|
||||
public static void main(String[] args) {
|
||||
@ -472,6 +439,9 @@ public class Controller extends Thread {
|
||||
}
|
||||
});
|
||||
|
||||
LOGGER.info("Starting synchronizer");
|
||||
Synchronizer.getInstance().start();
|
||||
|
||||
LOGGER.info("Starting block minter");
|
||||
blockMinter = new BlockMinter();
|
||||
blockMinter.start();
|
||||
@ -584,11 +554,6 @@ public class Controller extends Thread {
|
||||
}
|
||||
}
|
||||
|
||||
if (requestSync) {
|
||||
requestSync = false;
|
||||
potentiallySynchronize();
|
||||
}
|
||||
|
||||
// Process incoming transactions queue
|
||||
processIncomingTransactionsQueue();
|
||||
|
||||
@ -716,27 +681,6 @@ public class Controller extends Thread {
|
||||
}
|
||||
}
|
||||
|
||||
private long getRandomRepositoryMaintenanceInterval() {
|
||||
final long minInterval = Settings.getInstance().getRepositoryMaintenanceMinInterval();
|
||||
final long maxInterval = Settings.getInstance().getRepositoryMaintenanceMaxInterval();
|
||||
if (maxInterval == 0) {
|
||||
return 0;
|
||||
}
|
||||
return (new Random().nextLong() % (maxInterval - minInterval)) + minInterval;
|
||||
}
|
||||
|
||||
/**
|
||||
* Export current trade bot states and minting accounts.
|
||||
*/
|
||||
public void exportRepositoryData() {
|
||||
try (final Repository repository = RepositoryManager.getRepository()) {
|
||||
repository.exportNodeLocalData();
|
||||
|
||||
} catch (DataException e) {
|
||||
// Fail silently as this is an optional step
|
||||
}
|
||||
}
|
||||
|
||||
public static final Predicate<Peer> hasMisbehaved = peer -> {
|
||||
final Long lastMisbehaved = peer.getPeerData().getLastMisbehaved();
|
||||
return lastMisbehaved != null && lastMisbehaved > NTP.getTime() - MISBEHAVIOUR_COOLOFF;
|
||||
@ -761,7 +705,7 @@ public class Controller extends Thread {
|
||||
|
||||
public static final Predicate<Peer> hasInferiorChainTip = peer -> {
|
||||
final PeerChainTipData peerChainTipData = peer.getChainTipData();
|
||||
final List<ByteArray> inferiorChainTips = getInstance().inferiorChainSignatures;
|
||||
final List<ByteArray> inferiorChainTips = Synchronizer.getInstance().inferiorChainSignatures;
|
||||
return peerChainTipData == null || peerChainTipData.getLastBlockSignature() == null || inferiorChainTips.contains(new ByteArray(peerChainTipData.getLastBlockSignature()));
|
||||
};
|
||||
|
||||
@ -770,218 +714,34 @@ public class Controller extends Thread {
|
||||
return peer.isAtLeastVersion(minPeerVersion) == false;
|
||||
};
|
||||
|
||||
private void potentiallySynchronize() throws InterruptedException {
|
||||
// Already synchronizing via another thread?
|
||||
if (this.isSynchronizing)
|
||||
return;
|
||||
|
||||
List<Peer> peers = Network.getInstance().getHandshakedPeers();
|
||||
|
||||
// Disregard peers that have "misbehaved" recently
|
||||
peers.removeIf(hasMisbehaved);
|
||||
|
||||
// Disregard peers that only have genesis block
|
||||
peers.removeIf(hasOnlyGenesisBlock);
|
||||
|
||||
// Disregard peers that don't have a recent block
|
||||
peers.removeIf(hasNoRecentBlock);
|
||||
|
||||
// Disregard peers that are on an old version
|
||||
peers.removeIf(hasOldVersion);
|
||||
|
||||
checkRecoveryModeForPeers(peers);
|
||||
if (recoveryMode) {
|
||||
peers = Network.getInstance().getHandshakedPeers();
|
||||
peers.removeIf(hasOnlyGenesisBlock);
|
||||
peers.removeIf(hasMisbehaved);
|
||||
peers.removeIf(hasOldVersion);
|
||||
private long getRandomRepositoryMaintenanceInterval() {
|
||||
final long minInterval = Settings.getInstance().getRepositoryMaintenanceMinInterval();
|
||||
final long maxInterval = Settings.getInstance().getRepositoryMaintenanceMaxInterval();
|
||||
if (maxInterval == 0) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
// Check we have enough peers to potentially synchronize
|
||||
if (peers.size() < Settings.getInstance().getMinBlockchainPeers())
|
||||
return;
|
||||
|
||||
// Disregard peers that have no block signature or the same block signature as us
|
||||
peers.removeIf(hasNoOrSameBlock);
|
||||
|
||||
// Disregard peers that are on the same block as last sync attempt and we didn't like their chain
|
||||
peers.removeIf(hasInferiorChainTip);
|
||||
|
||||
final int peersBeforeComparison = peers.size();
|
||||
|
||||
// Request recent block summaries from the remaining peers, and locate our common block with each
|
||||
Synchronizer.getInstance().findCommonBlocksWithPeers(peers);
|
||||
|
||||
// Compare the peers against each other, and against our chain, which will return an updated list excluding those without common blocks
|
||||
peers = Synchronizer.getInstance().comparePeers(peers);
|
||||
|
||||
// We may have added more inferior chain tips when comparing peers, so remove any peers that are currently on those chains
|
||||
peers.removeIf(hasInferiorChainTip);
|
||||
|
||||
final int peersRemoved = peersBeforeComparison - peers.size();
|
||||
if (peersRemoved > 0 && peers.size() > 0)
|
||||
LOGGER.debug(String.format("Ignoring %d peers on inferior chains. Peers remaining: %d", peersRemoved, peers.size()));
|
||||
|
||||
if (peers.isEmpty())
|
||||
return;
|
||||
|
||||
if (peers.size() > 1) {
|
||||
StringBuilder finalPeersString = new StringBuilder();
|
||||
for (Peer peer : peers)
|
||||
finalPeersString = finalPeersString.length() > 0 ? finalPeersString.append(", ").append(peer) : finalPeersString.append(peer);
|
||||
LOGGER.debug(String.format("Choosing random peer from: [%s]", finalPeersString.toString()));
|
||||
}
|
||||
|
||||
// Pick random peer to sync with
|
||||
int index = new SecureRandom().nextInt(peers.size());
|
||||
Peer peer = peers.get(index);
|
||||
|
||||
actuallySynchronize(peer, false);
|
||||
return (new Random().nextLong() % (maxInterval - minInterval)) + minInterval;
|
||||
}
|
||||
|
||||
public SynchronizationResult actuallySynchronize(Peer peer, boolean force) throws InterruptedException {
|
||||
boolean hasStatusChanged = false;
|
||||
BlockData priorChainTip = this.getChainTip();
|
||||
/**
|
||||
* Export current trade bot states and minting accounts.
|
||||
*/
|
||||
public void exportRepositoryData() {
|
||||
try (final Repository repository = RepositoryManager.getRepository()) {
|
||||
repository.exportNodeLocalData();
|
||||
|
||||
synchronized (this.syncLock) {
|
||||
this.syncPercent = (priorChainTip.getHeight() * 100) / peer.getChainTipData().getLastHeight();
|
||||
|
||||
// Only update SysTray if we're potentially changing height
|
||||
if (this.syncPercent < 100) {
|
||||
this.isSynchronizing = true;
|
||||
hasStatusChanged = true;
|
||||
}
|
||||
}
|
||||
peer.setSyncInProgress(true);
|
||||
|
||||
if (hasStatusChanged)
|
||||
updateSysTray();
|
||||
|
||||
try {
|
||||
SynchronizationResult syncResult = Synchronizer.getInstance().synchronize(peer, force);
|
||||
switch (syncResult) {
|
||||
case GENESIS_ONLY:
|
||||
case NO_COMMON_BLOCK:
|
||||
case TOO_DIVERGENT:
|
||||
case INVALID_DATA: {
|
||||
// These are more serious results that warrant a cool-off
|
||||
LOGGER.info(String.format("Failed to synchronize with peer %s (%s) - cooling off", peer, syncResult.name()));
|
||||
|
||||
// Don't use this peer again for a while
|
||||
Network.getInstance().peerMisbehaved(peer);
|
||||
break;
|
||||
}
|
||||
|
||||
case INFERIOR_CHAIN: {
|
||||
// Update our list of inferior chain tips
|
||||
ByteArray inferiorChainSignature = new ByteArray(peer.getChainTipData().getLastBlockSignature());
|
||||
if (!inferiorChainSignatures.contains(inferiorChainSignature))
|
||||
inferiorChainSignatures.add(inferiorChainSignature);
|
||||
|
||||
// These are minor failure results so fine to try again
|
||||
LOGGER.debug(() -> String.format("Refused to synchronize with peer %s (%s)", peer, syncResult.name()));
|
||||
|
||||
// Notify peer of our superior chain
|
||||
if (!peer.sendMessage(Network.getInstance().buildHeightMessage(peer, priorChainTip)))
|
||||
peer.disconnect("failed to notify peer of our superior chain");
|
||||
break;
|
||||
}
|
||||
|
||||
case NO_REPLY:
|
||||
case NO_BLOCKCHAIN_LOCK:
|
||||
case REPOSITORY_ISSUE:
|
||||
// These are minor failure results so fine to try again
|
||||
LOGGER.debug(() -> String.format("Failed to synchronize with peer %s (%s)", peer, syncResult.name()));
|
||||
break;
|
||||
|
||||
case SHUTTING_DOWN:
|
||||
// Just quietly exit
|
||||
break;
|
||||
|
||||
case OK:
|
||||
// fall-through...
|
||||
case NOTHING_TO_DO: {
|
||||
// Update our list of inferior chain tips
|
||||
ByteArray inferiorChainSignature = new ByteArray(peer.getChainTipData().getLastBlockSignature());
|
||||
if (!inferiorChainSignatures.contains(inferiorChainSignature))
|
||||
inferiorChainSignatures.add(inferiorChainSignature);
|
||||
|
||||
LOGGER.debug(() -> String.format("Synchronized with peer %s (%s)", peer, syncResult.name()));
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// Has our chain tip changed?
|
||||
BlockData newChainTip;
|
||||
|
||||
try (final Repository repository = RepositoryManager.getRepository()) {
|
||||
newChainTip = repository.getBlockRepository().getLastBlock();
|
||||
} catch (DataException e) {
|
||||
LOGGER.warn(String.format("Repository issue when trying to fetch post-synchronization chain tip: %s", e.getMessage()));
|
||||
return syncResult;
|
||||
}
|
||||
|
||||
if (!Arrays.equals(newChainTip.getSignature(), priorChainTip.getSignature())) {
|
||||
// Reset our cache of inferior chains
|
||||
inferiorChainSignatures.clear();
|
||||
|
||||
Network network = Network.getInstance();
|
||||
network.broadcast(broadcastPeer -> network.buildHeightMessage(broadcastPeer, newChainTip));
|
||||
}
|
||||
|
||||
return syncResult;
|
||||
} finally {
|
||||
isSynchronizing = false;
|
||||
peer.setSyncInProgress(false);
|
||||
} catch (DataException e) {
|
||||
// Fail silently as this is an optional step
|
||||
}
|
||||
}
|
||||
|
||||
private boolean checkRecoveryModeForPeers(List<Peer> qualifiedPeers) {
|
||||
List<Peer> handshakedPeers = Network.getInstance().getHandshakedPeers();
|
||||
|
||||
if (handshakedPeers.size() > 0) {
|
||||
// There is at least one handshaked peer
|
||||
if (qualifiedPeers.isEmpty()) {
|
||||
// There are no 'qualified' peers - i.e. peers that have a recent block we can sync to
|
||||
boolean werePeersAvailable = peersAvailable;
|
||||
peersAvailable = false;
|
||||
|
||||
// If peers only just became unavailable, update our record of the time they were last available
|
||||
if (werePeersAvailable)
|
||||
timePeersLastAvailable = NTP.getTime();
|
||||
|
||||
// If enough time has passed, enter recovery mode, which lifts some restrictions on who we can sync with and when we can mint
|
||||
if (NTP.getTime() - timePeersLastAvailable > RECOVERY_MODE_TIMEOUT) {
|
||||
if (recoveryMode == false) {
|
||||
LOGGER.info(String.format("Peers have been unavailable for %d minutes. Entering recovery mode...", RECOVERY_MODE_TIMEOUT/60/1000));
|
||||
recoveryMode = true;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// We now have at least one peer with a recent block, so we can exit recovery mode and sync normally
|
||||
peersAvailable = true;
|
||||
if (recoveryMode) {
|
||||
LOGGER.info("Peers have become available again. Exiting recovery mode...");
|
||||
recoveryMode = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
return recoveryMode;
|
||||
}
|
||||
|
||||
public void addInferiorChainSignature(byte[] inferiorSignature) {
|
||||
// Update our list of inferior chain tips
|
||||
ByteArray inferiorChainSignature = new ByteArray(inferiorSignature);
|
||||
if (!inferiorChainSignatures.contains(inferiorChainSignature))
|
||||
inferiorChainSignatures.add(inferiorChainSignature);
|
||||
}
|
||||
|
||||
public static class StatusChangeEvent implements Event {
|
||||
public StatusChangeEvent() {
|
||||
}
|
||||
}
|
||||
|
||||
private void updateSysTray() {
|
||||
public void updateSysTray() {
|
||||
if (NTP.getTime() == null) {
|
||||
SysTray.getInstance().setToolTipText(Translator.INSTANCE.translate("SysTray", "SYNCHRONIZING_CLOCK"));
|
||||
SysTray.getInstance().setTrayIcon(1);
|
||||
@ -997,13 +757,13 @@ public class Controller extends Thread {
|
||||
|
||||
String actionText;
|
||||
|
||||
synchronized (this.syncLock) {
|
||||
synchronized (Synchronizer.getInstance().syncLock) {
|
||||
if (this.isMintingPossible) {
|
||||
actionText = Translator.INSTANCE.translate("SysTray", "MINTING_ENABLED");
|
||||
SysTray.getInstance().setTrayIcon(2);
|
||||
}
|
||||
else if (this.isSynchronizing) {
|
||||
actionText = String.format("%s - %d%%", Translator.INSTANCE.translate("SysTray", "SYNCHRONIZING_BLOCKCHAIN"), this.syncPercent);
|
||||
else if (Synchronizer.getInstance().isSynchronizing()) {
|
||||
actionText = String.format("%s - %d%%", Translator.INSTANCE.translate("SysTray", "SYNCHRONIZING_BLOCKCHAIN"), Synchronizer.getInstance().getSyncPercent());
|
||||
SysTray.getInstance().setTrayIcon(3);
|
||||
}
|
||||
else if (numberOfPeers < Settings.getInstance().getMinBlockchainPeers()) {
|
||||
@ -1092,6 +852,9 @@ public class Controller extends Thread {
|
||||
}
|
||||
}
|
||||
|
||||
LOGGER.info("Shutting down synchronizer");
|
||||
Synchronizer.getInstance().shutdown();
|
||||
|
||||
// Export local data
|
||||
LOGGER.info("Backing up local data");
|
||||
this.exportRepositoryData();
|
||||
@ -1726,7 +1489,7 @@ public class Controller extends Thread {
|
||||
peer.setChainTipData(newChainTipData);
|
||||
|
||||
// Potentially synchronize
|
||||
requestSync = true;
|
||||
Synchronizer.getInstance().requestSync();
|
||||
}
|
||||
|
||||
private void onNetworkGetTransactionMessage(Peer peer, Message message) {
|
||||
|
@ -1,6 +1,7 @@
|
||||
package org.qortal.controller;
|
||||
|
||||
import java.math.BigInteger;
|
||||
import java.security.SecureRandom;
|
||||
import java.text.DecimalFormat;
|
||||
import java.text.NumberFormat;
|
||||
import java.util.*;
|
||||
@ -20,6 +21,7 @@ import org.qortal.data.block.CommonBlockData;
|
||||
import org.qortal.data.network.PeerChainTipData;
|
||||
import org.qortal.data.transaction.RewardShareTransactionData;
|
||||
import org.qortal.data.transaction.TransactionData;
|
||||
import org.qortal.network.Network;
|
||||
import org.qortal.network.Peer;
|
||||
import org.qortal.network.message.BlockMessage;
|
||||
import org.qortal.network.message.BlockSummariesMessage;
|
||||
@ -35,11 +37,10 @@ import org.qortal.repository.RepositoryManager;
|
||||
import org.qortal.settings.Settings;
|
||||
import org.qortal.transaction.Transaction;
|
||||
import org.qortal.utils.Base58;
|
||||
import org.qortal.utils.ByteArray;
|
||||
import org.qortal.utils.NTP;
|
||||
|
||||
import static org.qortal.network.Peer.FETCH_BLOCKS_TIMEOUT;
|
||||
|
||||
public class Synchronizer {
|
||||
public class Synchronizer extends Thread {
|
||||
|
||||
private static final Logger LOGGER = LogManager.getLogger(Synchronizer.class);
|
||||
|
||||
@ -57,12 +58,31 @@ public class Synchronizer {
|
||||
/** Maximum number of block signatures we ask from peer in one go */
|
||||
private static final int MAXIMUM_REQUEST_SIZE = 200; // XXX move to Settings?
|
||||
|
||||
private static final long RECOVERY_MODE_TIMEOUT = 10 * 60 * 1000L; // ms
|
||||
|
||||
|
||||
private boolean running;
|
||||
|
||||
/** Latest block signatures from other peers that we know are on inferior chains. */
|
||||
List<ByteArray> inferiorChainSignatures = new ArrayList<>();
|
||||
|
||||
/** Recovery mode, which is used to bring back a stalled network */
|
||||
private boolean recoveryMode = false;
|
||||
private boolean peersAvailable = true; // peersAvailable must default to true
|
||||
private long timePeersLastAvailable = 0;
|
||||
|
||||
// Keep track of the size of the last re-org, so it can be logged
|
||||
private int lastReorgSize;
|
||||
|
||||
/** Synchronization object for sync variables below */
|
||||
public final Object syncLock = new Object();
|
||||
/** Whether we are attempting to synchronize. */
|
||||
private volatile boolean isSynchronizing = false;
|
||||
/** Temporary estimate of synchronization progress for SysTray use. */
|
||||
private volatile int syncPercent = 0;
|
||||
|
||||
private static volatile boolean requestSync = false;
|
||||
|
||||
// Keep track of invalid blocks so that we don't keep trying to sync them
|
||||
private Map<String, Long> invalidBlockSignatures = Collections.synchronizedMap(new HashMap<>());
|
||||
public Long timeValidBlockLastReceived = null;
|
||||
@ -77,6 +97,7 @@ public class Synchronizer {
|
||||
// Constructors
|
||||
|
||||
private Synchronizer() {
|
||||
this.running = true;
|
||||
}
|
||||
|
||||
public static Synchronizer getInstance() {
|
||||
@ -87,6 +108,261 @@ public class Synchronizer {
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
while (running) {
|
||||
Thread.sleep(1000);
|
||||
|
||||
if (requestSync) {
|
||||
requestSync = false;
|
||||
Synchronizer.getInstance().potentiallySynchronize();
|
||||
}
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
// Clear interrupted flag so we can shutdown trim threads
|
||||
Thread.interrupted();
|
||||
// Fall-through to exit
|
||||
}
|
||||
}
|
||||
|
||||
public void shutdown() {
|
||||
this.running = false;
|
||||
this.interrupt();
|
||||
}
|
||||
|
||||
|
||||
|
||||
public boolean isSynchronizing() {
|
||||
return this.isSynchronizing;
|
||||
}
|
||||
|
||||
public Integer getSyncPercent() {
|
||||
synchronized (this.syncLock) {
|
||||
return this.isSynchronizing ? this.syncPercent : null;
|
||||
}
|
||||
}
|
||||
|
||||
public void requestSync() {
|
||||
requestSync = true;
|
||||
}
|
||||
|
||||
public boolean isSyncRequested() {
|
||||
return requestSync;
|
||||
}
|
||||
|
||||
public boolean getRecoveryMode() {
|
||||
return this.recoveryMode;
|
||||
}
|
||||
|
||||
|
||||
public void potentiallySynchronize() throws InterruptedException {
|
||||
// Already synchronizing via another thread?
|
||||
if (this.isSynchronizing)
|
||||
return;
|
||||
|
||||
List<Peer> peers = Network.getInstance().getHandshakedPeers();
|
||||
|
||||
// Disregard peers that have "misbehaved" recently
|
||||
peers.removeIf(Controller.hasMisbehaved);
|
||||
|
||||
// Disregard peers that only have genesis block
|
||||
peers.removeIf(Controller.hasOnlyGenesisBlock);
|
||||
|
||||
// Disregard peers that don't have a recent block
|
||||
peers.removeIf(Controller.hasNoRecentBlock);
|
||||
|
||||
// Disregard peers that are on an old version
|
||||
peers.removeIf(Controller.hasOldVersion);
|
||||
|
||||
checkRecoveryModeForPeers(peers);
|
||||
if (recoveryMode) {
|
||||
peers = Network.getInstance().getHandshakedPeers();
|
||||
peers.removeIf(Controller.hasOnlyGenesisBlock);
|
||||
peers.removeIf(Controller.hasMisbehaved);
|
||||
peers.removeIf(Controller.hasOldVersion);
|
||||
}
|
||||
|
||||
// Check we have enough peers to potentially synchronize
|
||||
if (peers.size() < Settings.getInstance().getMinBlockchainPeers())
|
||||
return;
|
||||
|
||||
// Disregard peers that have no block signature or the same block signature as us
|
||||
peers.removeIf(Controller.hasNoOrSameBlock);
|
||||
|
||||
// Disregard peers that are on the same block as last sync attempt and we didn't like their chain
|
||||
peers.removeIf(Controller.hasInferiorChainTip);
|
||||
|
||||
final int peersBeforeComparison = peers.size();
|
||||
|
||||
// Request recent block summaries from the remaining peers, and locate our common block with each
|
||||
Synchronizer.getInstance().findCommonBlocksWithPeers(peers);
|
||||
|
||||
// Compare the peers against each other, and against our chain, which will return an updated list excluding those without common blocks
|
||||
peers = Synchronizer.getInstance().comparePeers(peers);
|
||||
|
||||
// We may have added more inferior chain tips when comparing peers, so remove any peers that are currently on those chains
|
||||
peers.removeIf(Controller.hasInferiorChainTip);
|
||||
|
||||
final int peersRemoved = peersBeforeComparison - peers.size();
|
||||
if (peersRemoved > 0 && peers.size() > 0)
|
||||
LOGGER.debug(String.format("Ignoring %d peers on inferior chains. Peers remaining: %d", peersRemoved, peers.size()));
|
||||
|
||||
if (peers.isEmpty())
|
||||
return;
|
||||
|
||||
if (peers.size() > 1) {
|
||||
StringBuilder finalPeersString = new StringBuilder();
|
||||
for (Peer peer : peers)
|
||||
finalPeersString = finalPeersString.length() > 0 ? finalPeersString.append(", ").append(peer) : finalPeersString.append(peer);
|
||||
LOGGER.debug(String.format("Choosing random peer from: [%s]", finalPeersString.toString()));
|
||||
}
|
||||
|
||||
// Pick random peer to sync with
|
||||
int index = new SecureRandom().nextInt(peers.size());
|
||||
Peer peer = peers.get(index);
|
||||
|
||||
actuallySynchronize(peer, false);
|
||||
}
|
||||
|
||||
public SynchronizationResult actuallySynchronize(Peer peer, boolean force) throws InterruptedException {
|
||||
boolean hasStatusChanged = false;
|
||||
BlockData priorChainTip = Controller.getInstance().getChainTip();
|
||||
|
||||
synchronized (this.syncLock) {
|
||||
this.syncPercent = (priorChainTip.getHeight() * 100) / peer.getChainTipData().getLastHeight();
|
||||
|
||||
// Only update SysTray if we're potentially changing height
|
||||
if (this.syncPercent < 100) {
|
||||
this.isSynchronizing = true;
|
||||
hasStatusChanged = true;
|
||||
}
|
||||
}
|
||||
peer.setSyncInProgress(true);
|
||||
|
||||
if (hasStatusChanged)
|
||||
Controller.getInstance().updateSysTray();
|
||||
|
||||
try {
|
||||
SynchronizationResult syncResult = Synchronizer.getInstance().synchronize(peer, force);
|
||||
switch (syncResult) {
|
||||
case GENESIS_ONLY:
|
||||
case NO_COMMON_BLOCK:
|
||||
case TOO_DIVERGENT:
|
||||
case INVALID_DATA: {
|
||||
// These are more serious results that warrant a cool-off
|
||||
LOGGER.info(String.format("Failed to synchronize with peer %s (%s) - cooling off", peer, syncResult.name()));
|
||||
|
||||
// Don't use this peer again for a while
|
||||
Network.getInstance().peerMisbehaved(peer);
|
||||
break;
|
||||
}
|
||||
|
||||
case INFERIOR_CHAIN: {
|
||||
// Update our list of inferior chain tips
|
||||
ByteArray inferiorChainSignature = new ByteArray(peer.getChainTipData().getLastBlockSignature());
|
||||
if (!inferiorChainSignatures.contains(inferiorChainSignature))
|
||||
inferiorChainSignatures.add(inferiorChainSignature);
|
||||
|
||||
// These are minor failure results so fine to try again
|
||||
LOGGER.debug(() -> String.format("Refused to synchronize with peer %s (%s)", peer, syncResult.name()));
|
||||
|
||||
// Notify peer of our superior chain
|
||||
if (!peer.sendMessage(Network.getInstance().buildHeightMessage(peer, priorChainTip)))
|
||||
peer.disconnect("failed to notify peer of our superior chain");
|
||||
break;
|
||||
}
|
||||
|
||||
case NO_REPLY:
|
||||
case NO_BLOCKCHAIN_LOCK:
|
||||
case REPOSITORY_ISSUE:
|
||||
// These are minor failure results so fine to try again
|
||||
LOGGER.debug(() -> String.format("Failed to synchronize with peer %s (%s)", peer, syncResult.name()));
|
||||
break;
|
||||
|
||||
case SHUTTING_DOWN:
|
||||
// Just quietly exit
|
||||
break;
|
||||
|
||||
case OK:
|
||||
// fall-through...
|
||||
case NOTHING_TO_DO: {
|
||||
// Update our list of inferior chain tips
|
||||
ByteArray inferiorChainSignature = new ByteArray(peer.getChainTipData().getLastBlockSignature());
|
||||
if (!inferiorChainSignatures.contains(inferiorChainSignature))
|
||||
inferiorChainSignatures.add(inferiorChainSignature);
|
||||
|
||||
LOGGER.debug(() -> String.format("Synchronized with peer %s (%s)", peer, syncResult.name()));
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// Has our chain tip changed?
|
||||
BlockData newChainTip;
|
||||
|
||||
try (final Repository repository = RepositoryManager.getRepository()) {
|
||||
newChainTip = repository.getBlockRepository().getLastBlock();
|
||||
} catch (DataException e) {
|
||||
LOGGER.warn(String.format("Repository issue when trying to fetch post-synchronization chain tip: %s", e.getMessage()));
|
||||
return syncResult;
|
||||
}
|
||||
|
||||
if (!Arrays.equals(newChainTip.getSignature(), priorChainTip.getSignature())) {
|
||||
// Reset our cache of inferior chains
|
||||
inferiorChainSignatures.clear();
|
||||
|
||||
Network network = Network.getInstance();
|
||||
network.broadcast(broadcastPeer -> network.buildHeightMessage(broadcastPeer, newChainTip));
|
||||
}
|
||||
|
||||
return syncResult;
|
||||
} finally {
|
||||
this.isSynchronizing = false;
|
||||
peer.setSyncInProgress(false);
|
||||
}
|
||||
}
|
||||
|
||||
private boolean checkRecoveryModeForPeers(List<Peer> qualifiedPeers) {
|
||||
List<Peer> handshakedPeers = Network.getInstance().getHandshakedPeers();
|
||||
|
||||
if (handshakedPeers.size() > 0) {
|
||||
// There is at least one handshaked peer
|
||||
if (qualifiedPeers.isEmpty()) {
|
||||
// There are no 'qualified' peers - i.e. peers that have a recent block we can sync to
|
||||
boolean werePeersAvailable = peersAvailable;
|
||||
peersAvailable = false;
|
||||
|
||||
// If peers only just became unavailable, update our record of the time they were last available
|
||||
if (werePeersAvailable)
|
||||
timePeersLastAvailable = NTP.getTime();
|
||||
|
||||
// If enough time has passed, enter recovery mode, which lifts some restrictions on who we can sync with and when we can mint
|
||||
if (NTP.getTime() - timePeersLastAvailable > RECOVERY_MODE_TIMEOUT) {
|
||||
if (recoveryMode == false) {
|
||||
LOGGER.info(String.format("Peers have been unavailable for %d minutes. Entering recovery mode...", RECOVERY_MODE_TIMEOUT/60/1000));
|
||||
recoveryMode = true;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// We now have at least one peer with a recent block, so we can exit recovery mode and sync normally
|
||||
peersAvailable = true;
|
||||
if (recoveryMode) {
|
||||
LOGGER.info("Peers have become available again. Exiting recovery mode...");
|
||||
recoveryMode = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
return recoveryMode;
|
||||
}
|
||||
|
||||
public void addInferiorChainSignature(byte[] inferiorSignature) {
|
||||
// Update our list of inferior chain tips
|
||||
ByteArray inferiorChainSignature = new ByteArray(inferiorSignature);
|
||||
if (!inferiorChainSignatures.contains(inferiorChainSignature))
|
||||
inferiorChainSignatures.add(inferiorChainSignature);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Iterate through a list of supplied peers, and attempt to find our common block with each.
|
||||
* If a common block is found, its summary will be retained in the peer's commonBlockSummary property, for processing later.
|
||||
@ -279,7 +555,7 @@ public class Synchronizer {
|
||||
// We have already determined that the correct chain diverged from a lower height. We are safe to skip these peers.
|
||||
for (Peer peer : peersSharingCommonBlock) {
|
||||
LOGGER.debug(String.format("Peer %s has common block at height %d but the superior chain is at height %d. Removing it from this round.", peer, commonBlockSummary.getHeight(), dropPeersAfterCommonBlockHeight));
|
||||
Controller.getInstance().addInferiorChainSignature(peer.getChainTipData().getLastBlockSignature());
|
||||
this.addInferiorChainSignature(peer.getChainTipData().getLastBlockSignature());
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
@ -3,6 +3,7 @@ package org.qortal.controller.repository;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.qortal.controller.Controller;
|
||||
import org.qortal.controller.Synchronizer;
|
||||
import org.qortal.data.block.BlockData;
|
||||
import org.qortal.repository.DataException;
|
||||
import org.qortal.repository.Repository;
|
||||
@ -47,7 +48,7 @@ public class AtStatesPruner implements Runnable {
|
||||
continue;
|
||||
|
||||
// Don't even attempt if we're mid-sync as our repository requests will be delayed for ages
|
||||
if (Controller.getInstance().isSynchronizing())
|
||||
if (Synchronizer.getInstance().isSynchronizing())
|
||||
continue;
|
||||
|
||||
// Prune AT states for all blocks up until our latest minus pruneBlockLimit
|
||||
|
@ -3,6 +3,7 @@ package org.qortal.controller.repository;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.qortal.controller.Controller;
|
||||
import org.qortal.controller.Synchronizer;
|
||||
import org.qortal.data.block.BlockData;
|
||||
import org.qortal.repository.DataException;
|
||||
import org.qortal.repository.Repository;
|
||||
@ -34,7 +35,7 @@ public class AtStatesTrimmer implements Runnable {
|
||||
continue;
|
||||
|
||||
// Don't even attempt if we're mid-sync as our repository requests will be delayed for ages
|
||||
if (Controller.getInstance().isSynchronizing())
|
||||
if (Synchronizer.getInstance().isSynchronizing())
|
||||
continue;
|
||||
|
||||
long currentTrimmableTimestamp = NTP.getTime() - Settings.getInstance().getAtStatesMaxLifetime();
|
||||
|
@ -3,6 +3,7 @@ package org.qortal.controller.repository;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.qortal.controller.Controller;
|
||||
import org.qortal.controller.Synchronizer;
|
||||
import org.qortal.data.block.BlockData;
|
||||
import org.qortal.repository.*;
|
||||
import org.qortal.settings.Settings;
|
||||
@ -51,7 +52,7 @@ public class BlockArchiver implements Runnable {
|
||||
}
|
||||
|
||||
// Don't even attempt if we're mid-sync as our repository requests will be delayed for ages
|
||||
if (Controller.getInstance().isSynchronizing()) {
|
||||
if (Synchronizer.getInstance().isSynchronizing()) {
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -3,6 +3,7 @@ package org.qortal.controller.repository;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.qortal.controller.Controller;
|
||||
import org.qortal.controller.Synchronizer;
|
||||
import org.qortal.data.block.BlockData;
|
||||
import org.qortal.repository.DataException;
|
||||
import org.qortal.repository.Repository;
|
||||
@ -51,7 +52,7 @@ public class BlockPruner implements Runnable {
|
||||
continue;
|
||||
|
||||
// Don't even attempt if we're mid-sync as our repository requests will be delayed for ages
|
||||
if (Controller.getInstance().isSynchronizing()) {
|
||||
if (Synchronizer.getInstance().isSynchronizing()) {
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -4,6 +4,7 @@ import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.qortal.block.BlockChain;
|
||||
import org.qortal.controller.Controller;
|
||||
import org.qortal.controller.Synchronizer;
|
||||
import org.qortal.data.block.BlockData;
|
||||
import org.qortal.repository.DataException;
|
||||
import org.qortal.repository.Repository;
|
||||
@ -36,7 +37,7 @@ public class OnlineAccountsSignaturesTrimmer implements Runnable {
|
||||
continue;
|
||||
|
||||
// Don't even attempt if we're mid-sync as our repository requests will be delayed for ages
|
||||
if (Controller.getInstance().isSynchronizing())
|
||||
if (Synchronizer.getInstance().isSynchronizing())
|
||||
continue;
|
||||
|
||||
// Trim blockchain by removing 'old' online accounts signatures
|
||||
|
@ -5,6 +5,7 @@ import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.qortal.block.Block;
|
||||
import org.qortal.controller.Controller;
|
||||
import org.qortal.controller.Synchronizer;
|
||||
import org.qortal.data.block.BlockArchiveData;
|
||||
import org.qortal.data.block.BlockData;
|
||||
import org.qortal.settings.Settings;
|
||||
@ -100,7 +101,7 @@ public class BlockArchiveWriter {
|
||||
if (Controller.isStopping()) {
|
||||
return BlockArchiveWriteResult.STOPPING;
|
||||
}
|
||||
if (Controller.getInstance().isSynchronizing()) {
|
||||
if (Synchronizer.getInstance().isSynchronizing()) {
|
||||
continue;
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user