Increased NTP check interval from 5 minutes to 10 minutes.

Controller batches SysTray updates into one per second.

Block generation only allowed by Controller if clock
is known to be accurate. ('not sure' stops generation).

NTP MAX_STDDEV increased from 25ms to 125ms to cater
for poorly connected nodes.

Controller sends peers list over outbound peer connections,
requests peers list from inbound peer connections.

When peer handshake completes, Network & Controller only send
initial messages over outbound peer connections.
This is to fix HEIGHT_V2 messages being processed out-of-order
breaking handshaking, as indicated by log entries like:

2019-07-26 16:16:35 DEBUG Network:702 - Unexpected HEIGHT_V2 message from xx.xxx.xx.xx:pppp, expected PROOF
2019-07-26 16:16:35 DEBUG Network:840 - Handshake completed with peer xx.xxx.xx.xx:pppp

Increased connection failure backoff from 1 minute to 5 minutes,
as the handshake timeout is also 1 minute, so nodes would otherwise reconnect immediately.

Changed default NTP servers from asia to cn.
This commit is contained in:
catbref 2019-07-26 17:04:47 +01:00
parent 33f3d35784
commit 63036f3592
5 changed files with 99 additions and 79 deletions

View File

@ -91,12 +91,13 @@ public class Controller extends Thread {
private static final String repositoryUrlTemplate = "jdbc:hsqldb:file:%s/blockchain;create=true;hsqldb.full_log_replay=true"; private static final String repositoryUrlTemplate = "jdbc:hsqldb:file:%s/blockchain;create=true;hsqldb.full_log_replay=true";
private static final long ARBITRARY_REQUEST_TIMEOUT = 5 * 1000; // ms private static final long ARBITRARY_REQUEST_TIMEOUT = 5 * 1000; // ms
private static final long REPOSITORY_BACKUP_PERIOD = 123 * 60 * 1000; // ms private static final long REPOSITORY_BACKUP_PERIOD = 123 * 60 * 1000; // ms
private static final long NTP_CHECK_PERIOD = 5 * 60 * 1000; // ms private static final long NTP_CHECK_PERIOD = 10 * 60 * 1000; // ms
private static final long MAX_NTP_OFFSET = 500; // ms private static final long MAX_NTP_OFFSET = 500; // ms
private static volatile boolean isStopping = false; private static volatile boolean isStopping = false;
private static BlockGenerator blockGenerator = null; private static BlockGenerator blockGenerator = null;
private static volatile boolean requestSync = false; private static volatile boolean requestSync = false;
private static volatile boolean requestSysTrayUpdate = false;
private static Controller instance; private static Controller instance;
private final String buildVersion; private final String buildVersion;
@ -105,7 +106,7 @@ public class Controller extends Thread {
private long repositoryBackupTimestamp = startTime + REPOSITORY_BACKUP_PERIOD; private long repositoryBackupTimestamp = startTime + REPOSITORY_BACKUP_PERIOD;
private long ntpCheckTimestamp = startTime; // ms private long ntpCheckTimestamp = startTime; // ms
/** Whether BlockGenerator is allowed to generate blocks. Mostly determined by system clock accuracy. */ /** Whether BlockGenerator is allowed to generate blocks. Mostly determined by system clock accuracy. */
private boolean isGenerationAllowed = false; private volatile boolean isGenerationAllowed = false;
/** Signature of peer's latest block when we tried to sync but peer had inferior chain. */ /** Signature of peer's latest block when we tried to sync but peer had inferior chain. */
private byte[] inferiorChainPeerBlockSignature = null; private byte[] inferiorChainPeerBlockSignature = null;
@ -329,11 +330,8 @@ public class Controller extends Thread {
// Potentially nag end-user about NTP // Potentially nag end-user about NTP
if (System.currentTimeMillis() >= ntpCheckTimestamp) { if (System.currentTimeMillis() >= ntpCheckTimestamp) {
ntpCheckTimestamp += NTP_CHECK_PERIOD; ntpCheckTimestamp += NTP_CHECK_PERIOD;
Boolean isClockAccurate = ntpCheck(); isGenerationAllowed = ntpCheck();
if (isClockAccurate != null) { requestSysTrayUpdate = true;
isGenerationAllowed = isClockAccurate;
updateSysTray();
}
} }
// Prune stuck/slow/old peers // Prune stuck/slow/old peers
@ -342,6 +340,12 @@ public class Controller extends Thread {
} catch (DataException e) { } catch (DataException e) {
LOGGER.warn(String.format("Repository issue when trying to prune peers: %s", e.getMessage())); LOGGER.warn(String.format("Repository issue when trying to prune peers: %s", e.getMessage()));
} }
// Maybe update SysTray
if (requestSysTrayUpdate) {
requestSysTrayUpdate = false;
updateSysTray();
}
} }
} catch (InterruptedException e) { } catch (InterruptedException e) {
// Fall-through to exit // Fall-through to exit
@ -418,7 +422,7 @@ public class Controller extends Thread {
break; break;
case OK: case OK:
updateSysTray(); requestSysTrayUpdate = true;
// fall-through... // fall-through...
case NOTHING_TO_DO: case NOTHING_TO_DO:
LOGGER.debug(() -> String.format("Synchronized with peer %s (%s)", peer, syncResult.name())); LOGGER.debug(() -> String.format("Synchronized with peer %s (%s)", peer, syncResult.name()));
@ -435,14 +439,14 @@ public class Controller extends Thread {
/** /**
* Nag if we detect system clock is too far from internet time. * Nag if we detect system clock is too far from internet time.
* *
* @return <tt>true</tt> if clock is accurate, <tt>false</tt> if inaccurate, <tt>null</tt> if we don't know. * @return <tt>true</tt> if clock is accurate, <tt>false</tt> if inaccurate or we don't know.
*/ */
private Boolean ntpCheck() { private boolean ntpCheck() {
// Fetch mean offset from internet time (ms). // Fetch mean offset from internet time (ms).
Long meanOffset = NTP.getOffset(); Long meanOffset = NTP.getOffset();
final boolean isWindows = System.getProperty("os.name").toLowerCase().contains("win"); final boolean isWindows = System.getProperty("os.name").toLowerCase().contains("win");
Boolean isNtpActive = null; boolean isNtpActive = false;
if (isWindows) { if (isWindows) {
// Detecting Windows Time service // Detecting Windows Time service
@ -472,24 +476,22 @@ public class Controller extends Thread {
} }
} }
// If offset is good and ntp is active then we're good LOGGER.info(String.format("NTP mean offset %s, NTP service active: %s", meanOffset, isNtpActive));
if (meanOffset != null && Math.abs(meanOffset) < MAX_NTP_OFFSET && isNtpActive == true)
return true;
// Time to nag final boolean isOffsetGood = meanOffset != null && Math.abs(meanOffset) < MAX_NTP_OFFSET;
String caption = Translator.INSTANCE.translate("SysTray", "NTP_NAG_CAPTION");
String text = Translator.INSTANCE.translate("SysTray", isWindows ? "NTP_NAG_TEXT_WINDOWS" : "NTP_NAG_TEXT_UNIX");
SysTray.getInstance().showMessage(caption, text, MessageType.WARNING);
if (meanOffset == null) // If offset bad or NTP not active then nag
// We don't know if we're inaccurate if (!isOffsetGood || !isNtpActive) {
return null; String caption = Translator.INSTANCE.translate("SysTray", "NTP_NAG_CAPTION");
String text = Translator.INSTANCE.translate("SysTray", isWindows ? "NTP_NAG_TEXT_WINDOWS" : "NTP_NAG_TEXT_UNIX");
SysTray.getInstance().showMessage(caption, text, MessageType.WARNING);
}
// Return whether we're accurate (disregarding whether NTP service is active) // Return whether we're accurate (disregarding whether NTP service is active)
return Math.abs(meanOffset) < MAX_NTP_OFFSET; return isOffsetGood;
} }
public void updateSysTray() { private void updateSysTray() {
final int numberOfPeers = Network.getInstance().getUniqueHandshakedPeers().size(); final int numberOfPeers = Network.getInstance().getUniqueHandshakedPeers().size();
final int height = getChainHeight(); final int height = getChainHeight();
@ -565,17 +567,14 @@ public class Controller extends Thread {
public void doNetworkBroadcast() { public void doNetworkBroadcast() {
Network network = Network.getInstance(); Network network = Network.getInstance();
// Send our known peers // Send (if outbound) / Request peer lists
network.broadcast(peer -> network.buildPeersMessage(peer)); network.broadcast(peer -> peer.isOutbound() ? network.buildPeersMessage(peer) : new GetPeersMessage());
// Send our current height // Send our current height
BlockData latestBlockData = getChainTip(); BlockData latestBlockData = getChainTip();
network.broadcast(peer -> network.buildHeightMessage(peer, latestBlockData)); network.broadcast(peer -> network.buildHeightMessage(peer, latestBlockData));
// Request peers lists // Send (if outbound) / Request unconfirmed transaction signatures
network.broadcast(peer -> new GetPeersMessage());
// Request unconfirmed transaction signatures
network.broadcast(peer -> network.buildGetUnconfirmedTransactionsMessage(peer)); network.broadcast(peer -> network.buildGetUnconfirmedTransactionsMessage(peer));
} }
@ -594,39 +593,42 @@ public class Controller extends Thread {
} }
public void onPeerHandshakeCompleted(Peer peer) { public void onPeerHandshakeCompleted(Peer peer) {
if (peer.getVersion() < 2) { // Only send if outbound
// Legacy mode if (peer.isOutbound()) {
if (peer.getVersion() < 2) {
// Legacy mode
// Send our unconfirmed transactions // Send our unconfirmed transactions
try (final Repository repository = RepositoryManager.getRepository()) { try (final Repository repository = RepositoryManager.getRepository()) {
List<TransactionData> transactions = repository.getTransactionRepository().getUnconfirmedTransactions(); List<TransactionData> transactions = repository.getTransactionRepository().getUnconfirmedTransactions();
for (TransactionData transactionData : transactions) { for (TransactionData transactionData : transactions) {
Message transactionMessage = new TransactionMessage(transactionData); Message transactionMessage = new TransactionMessage(transactionData);
if (!peer.sendMessage(transactionMessage)) { if (!peer.sendMessage(transactionMessage)) {
peer.disconnect("failed to send unconfirmed transaction"); peer.disconnect("failed to send unconfirmed transaction");
return; return;
}
} }
} catch (DataException e) {
LOGGER.error("Repository issue while sending unconfirmed transactions", e);
} }
} catch (DataException e) { } else {
LOGGER.error("Repository issue while sending unconfirmed transactions", e); // V2 protocol
}
} else {
// V2 protocol
// Request peer's unconfirmed transactions // Request peer's unconfirmed transactions
Message message = new GetUnconfirmedTransactionsMessage(); Message message = new GetUnconfirmedTransactionsMessage();
if (!peer.sendMessage(message)) { if (!peer.sendMessage(message)) {
peer.disconnect("failed to send request for unconfirmed transactions"); peer.disconnect("failed to send request for unconfirmed transactions");
return; return;
}
} }
} }
updateSysTray(); requestSysTrayUpdate = true;
} }
public void onPeerDisconnect(Peer peer) { public void onPeerDisconnect(Peer peer) {
updateSysTray(); requestSysTrayUpdate = true;
} }
public void onNetworkMessage(Peer peer, Message message) { public void onNetworkMessage(Peer peer, Message message) {
@ -655,10 +657,17 @@ public class Controller extends Thread {
case HEIGHT_V2: { case HEIGHT_V2: {
HeightV2Message heightV2Message = (HeightV2Message) message; HeightV2Message heightV2Message = (HeightV2Message) message;
// If peer is inbound and we've not updated their height
// then this is probably their initial HEIGHT_V2 message
// so they need a corresponding HEIGHT_V2 message from us
if (!peer.isOutbound() && peer.getLastHeight() == null)
peer.sendMessage(Network.getInstance().buildHeightMessage(peer, getChainTip()));
// Update all peers with same ID // Update all peers with same ID
List<Peer> connectedPeers = Network.getInstance().getHandshakedPeers(); List<Peer> connectedPeers = Network.getInstance().getHandshakedPeers();
for (Peer connectedPeer : connectedPeers) { for (Peer connectedPeer : connectedPeers) {
// Skip connectedPeer if they have no ID or their ID doesn't match sender's ID
if (connectedPeer.getPeerId() == null || !Arrays.equals(connectedPeer.getPeerId(), peer.getPeerId())) if (connectedPeer.getPeerId() == null || !Arrays.equals(connectedPeer.getPeerId(), peer.getPeerId()))
continue; continue;

View File

@ -63,7 +63,7 @@ public class Network extends Thread {
private static final int LISTEN_BACKLOG = 10; private static final int LISTEN_BACKLOG = 10;
/** How long before retrying after a connection failure, in milliseconds. */ /** How long before retrying after a connection failure, in milliseconds. */
private static final int CONNECT_FAILURE_BACKOFF = 60 * 1000; // ms private static final int CONNECT_FAILURE_BACKOFF = 5 * 60 * 1000; // ms
/** How long between informational broadcasts to all connected peers, in milliseconds. */ /** How long between informational broadcasts to all connected peers, in milliseconds. */
private static final int BROADCAST_INTERVAL = 60 * 1000; // ms private static final int BROADCAST_INTERVAL = 60 * 1000; // ms
/** Maximum time since last successful connection for peer info to be propagated, in milliseconds. */ /** Maximum time since last successful connection for peer info to be propagated, in milliseconds. */
@ -508,7 +508,7 @@ public class Network extends Thread {
handshakePeers.removeIf(peer -> peer.getHandshakeStatus() == Handshake.COMPLETED || peer.getConnectionTimestamp() == null || peer.getConnectionTimestamp() > now - HANDSHAKE_TIMEOUT); handshakePeers.removeIf(peer -> peer.getHandshakeStatus() == Handshake.COMPLETED || peer.getConnectionTimestamp() == null || peer.getConnectionTimestamp() > now - HANDSHAKE_TIMEOUT);
for (Peer peer : handshakePeers) for (Peer peer : handshakePeers)
peer.disconnect("handshake timeout"); peer.disconnect(String.format("handshake timeout at %s", peer.getHandshakeStatus().name()));
// Prune 'old' peers from repository... // Prune 'old' peers from repository...
try (final Repository repository = RepositoryManager.getRepository()) { try (final Repository repository = RepositoryManager.getRepository()) {
@ -848,24 +848,29 @@ public class Network extends Thread {
// Start regular pings // Start regular pings
peer.startPings(); peer.startPings();
// Send our height // Only the outbound side needs to send anything (after we've received handshake-completing response).
Message heightMessage = buildHeightMessage(peer, Controller.getInstance().getChainTip()); // (If inbound sent anything here, it's possible it could be processed out-of-order with handshake message).
if (!peer.sendMessage(heightMessage)) {
peer.disconnect("failed to send height/info"); if (peer.isOutbound()) {
return; // Send our height
Message heightMessage = buildHeightMessage(peer, Controller.getInstance().getChainTip());
if (!peer.sendMessage(heightMessage)) {
peer.disconnect("failed to send height/info");
return;
}
// Send our peers list
Message peersMessage = this.buildPeersMessage(peer);
if (!peer.sendMessage(peersMessage))
peer.disconnect("failed to send peers list");
// Request their peers list
Message getPeersMessage = new GetPeersMessage();
if (!peer.sendMessage(getPeersMessage))
peer.disconnect("failed to request peers list");
} }
// Send our peers list // Ask Controller if they want to do anything
Message peersMessage = this.buildPeersMessage(peer);
if (!peer.sendMessage(peersMessage))
peer.disconnect("failed to send peers list");
// Request their peers list
Message getPeersMessage = new GetPeersMessage();
if (!peer.sendMessage(getPeersMessage))
peer.disconnect("failed to request peers list");
// Ask Controller if they want to send anything
Controller.getInstance().onPeerHandshakeCompleted(peer); Controller.getInstance().onPeerHandshakeCompleted(peer);
} }

View File

@ -483,7 +483,8 @@ public class Peer {
} }
public void disconnect(String reason) { public void disconnect(String reason) {
LOGGER.debug(String.format("Disconnecting peer %s: %s", this, reason)); if (!isStopping)
LOGGER.debug(() -> String.format("Disconnecting peer %s: %s", this, reason));
this.shutdown(); this.shutdown();
@ -491,7 +492,10 @@ public class Peer {
} }
public void shutdown() { public void shutdown() {
LOGGER.debug(() -> String.format("Shutting down peer %s", this)); if (!isStopping)
LOGGER.debug(() -> String.format("Shutting down peer %s", this));
isStopping = true;
if (this.socketChannel.isOpen()) { if (this.socketChannel.isOpen()) {
try { try {

View File

@ -102,11 +102,11 @@ public class Settings {
"1.pool.ntp.org", "1.pool.ntp.org",
"2.pool.ntp.org", "2.pool.ntp.org",
"3.pool.ntp.org", "3.pool.ntp.org",
"asia.pool.ntp.org", "cn.pool.ntp.org",
"0.asia.pool.ntp.org", "0.cn.pool.ntp.org",
"1.asia.pool.ntp.org", "1.cn.pool.ntp.org",
"2.asia.pool.ntp.org", "2.cn.pool.ntp.org",
"3.asia.pool.ntp.org" "3.cn.pool.ntp.org"
}; };
// Constructors // Constructors

View File

@ -14,7 +14,7 @@ import org.qora.settings.Settings;
public class NTP { public class NTP {
private static final Logger LOGGER = LogManager.getLogger(NTP.class); private static final Logger LOGGER = LogManager.getLogger(NTP.class);
private static final double MAX_STDDEV = 25; // ms private static final double MAX_STDDEV = 125; // ms
/** /**
* Returns aggregated internet time. * Returns aggregated internet time.
@ -71,7 +71,7 @@ public class NTP {
} }
if (offsets.size() < ntpServers.length / 2) { if (offsets.size() < ntpServers.length / 2) {
LOGGER.debug("Not enough replies"); LOGGER.info(String.format("Not enough replies: %d, minimum is %d", offsets.size(), ntpServers.length / 2));
return null; return null;
} }
@ -90,8 +90,10 @@ public class NTP {
double stddev = Math.sqrt(((s0 * s2) - (s1 * s1)) / (s0 * (s0 - 1))); double stddev = Math.sqrt(((s0 * s2) - (s1 * s1)) / (s0 * (s0 - 1)));
// If stddev is excessive then we're not very sure so give up // If stddev is excessive then we're not very sure so give up
if (stddev > MAX_STDDEV) if (stddev > MAX_STDDEV) {
LOGGER.info(String.format("Excessive standard deviation %.1f, maximum is %.1f", stddev, MAX_STDDEV));
return null; return null;
}
return (long) mean; return (long) mean;
} }