diff --git a/src/main/java/org/qora/api/model/NodeInfo.java b/src/main/java/org/qora/api/model/NodeInfo.java index dada537b..8daec172 100644 --- a/src/main/java/org/qora/api/model/NodeInfo.java +++ b/src/main/java/org/qora/api/model/NodeInfo.java @@ -6,6 +6,7 @@ import javax.xml.bind.annotation.XmlAccessorType; @XmlAccessorType(XmlAccessType.FIELD) public class NodeInfo { + public Long currentTimestamp; public long uptime; public String buildVersion; public long buildTimestamp; diff --git a/src/main/java/org/qora/api/resource/AdminResource.java b/src/main/java/org/qora/api/resource/AdminResource.java index 4d923e0b..5445c0c2 100644 --- a/src/main/java/org/qora/api/resource/AdminResource.java +++ b/src/main/java/org/qora/api/resource/AdminResource.java @@ -58,6 +58,7 @@ import org.qora.network.Network; import org.qora.network.Peer; import org.qora.network.PeerAddress; import org.qora.utils.Base58; +import org.qora.utils.NTP; import com.google.common.collect.Lists; @@ -112,6 +113,7 @@ public class AdminResource { public NodeInfo info() { NodeInfo nodeInfo = new NodeInfo(); + nodeInfo.currentTimestamp = NTP.getTime(); nodeInfo.uptime = System.currentTimeMillis() - Controller.startTime; nodeInfo.buildVersion = Controller.getInstance().getVersionString(); nodeInfo.buildTimestamp = Controller.getInstance().getBuildTimestamp(); diff --git a/src/main/java/org/qora/block/Block.java b/src/main/java/org/qora/block/Block.java index 2adb5f54..da927b2e 100644 --- a/src/main/java/org/qora/block/Block.java +++ b/src/main/java/org/qora/block/Block.java @@ -40,6 +40,7 @@ import org.qora.transform.TransformationException; import org.qora.transform.block.BlockTransformer; import org.qora.transform.transaction.TransactionTransformer; import org.qora.utils.Base58; +import org.qora.utils.NTP; import com.google.common.primitives.Bytes; @@ -78,6 +79,7 @@ public class Block { TIMESTAMP_IN_FUTURE(21), TIMESTAMP_MS_INCORRECT(22), TIMESTAMP_TOO_SOON(23), + TIMESTAMP_INCORRECT(24), VERSION_INCORRECT(30), FEATURE_NOT_YET_RELEASED(31), GENERATING_BALANCE_INCORRECT(40), @@ -205,6 +207,10 @@ public class Block { byte[] reference = parentBlockData.getSignature(); BigDecimal generatingBalance = parentBlock.calcNextBlockGeneratingBalance(); + // After a certain height, block timestamps are generated using previous block and generator's public key + if (height >= BlockChain.getInstance().getNewBlockTimestampHeight()) + timestamp = calcTimestamp(parentBlockData, generator.getPublicKey()); + byte[] generatorSignature; try { generatorSignature = generator @@ -258,6 +264,7 @@ public class Block { Block parentBlock = new Block(repository, parentBlockData); newBlock.generator = generator; + BlockData parentBlockData = newBlock.getParent(); // Copy AT state data newBlock.ourAtStates = this.ourAtStates; @@ -269,6 +276,10 @@ public class Block { byte[] reference = this.blockData.getReference(); BigDecimal generatingBalance = this.blockData.getGeneratingBalance(); + // After a certain height, block timestamps are generated using previous block and generator's public key + if (height >= BlockChain.getInstance().getNewBlockTimestampHeight()) + timestamp = calcTimestamp(parentBlockData, generator.getPublicKey()); + byte[] generatorSignature; try { generatorSignature = generator @@ -734,13 +745,15 @@ public class Block { *

* For qora-core, we'll using the minimum from BlockChain config. */ - public static long calcMinimumTimestamp(BlockData parentBlockData, byte[] generatorPublicKey) { + public static long calcTimestamp(BlockData parentBlockData, byte[] generatorPublicKey) { long minBlockTime = BlockChain.getInstance().getMinBlockTime(); // seconds return parentBlockData.getTimestamp() + (minBlockTime * 1000L); } - public long calcMinimumTimestamp(BlockData parentBlockData) { - return calcMinimumTimestamp(parentBlockData, this.generator.getPublicKey()); + public static long calcMinimumTimestamp(BlockData parentBlockData) { + final int thisHeight = parentBlockData.getHeight() + 1; + BlockTimingByHeight blockTiming = BlockChain.getInstance().getBlockTimingByHeight(thisHeight); + return parentBlockData.getTimestamp() + blockTiming.target - blockTiming.deviation; } /** @@ -797,8 +810,9 @@ public class Block { if (this.blockData.getTimestamp() <= parentBlockData.getTimestamp()) return ValidationResult.TIMESTAMP_OLDER_THAN_PARENT; - // Check timestamp is not in the future (within configurable ~500ms margin) - if (this.blockData.getTimestamp() - BlockChain.getInstance().getBlockTimestampMargin() > System.currentTimeMillis()) + // Check timestamp is not in the future (within configurable margin) + // We don't need to check NTP.getTime() for null as we shouldn't reach here if that is already the case + if (this.blockData.getTimestamp() - BlockChain.getInstance().getBlockTimestampMargin() > NTP.getTime()) return ValidationResult.TIMESTAMP_IN_FUTURE; // Legacy gen1 test: check timestamp milliseconds is the same as parent timestamp milliseconds? @@ -807,9 +821,15 @@ public class Block { // Too early to forge block? // XXX DISABLED as it doesn't work - but why? - // if (this.blockData.getTimestamp() < parentBlock.getBlockData().getTimestamp() + BlockChain.getInstance().getMinBlockTime()) + // if (this.blockData.getTimestamp() < Block.calcMinimumTimestamp(parentBlockData)) // return ValidationResult.TIMESTAMP_TOO_SOON; + if (this.blockData.getHeight() >= BlockChain.getInstance().getNewBlockTimestampHeight()) { + long expectedTimestamp = calcTimestamp(parentBlockData, this.blockData.getGeneratorPublicKey()); + if (this.blockData.getTimestamp() != expectedTimestamp) + return ValidationResult.TIMESTAMP_INCORRECT; + } + return ValidationResult.OK; } diff --git a/src/main/java/org/qora/block/BlockGenerator.java b/src/main/java/org/qora/block/BlockGenerator.java index 710abe7a..bad4cb85 100644 --- a/src/main/java/org/qora/block/BlockGenerator.java +++ b/src/main/java/org/qora/block/BlockGenerator.java @@ -26,6 +26,7 @@ import org.qora.repository.RepositoryManager; import org.qora.settings.Settings; import org.qora.transaction.Transaction; import org.qora.utils.Base58; +import org.qora.utils.NTP; // Forging new blocks @@ -83,6 +84,14 @@ public class BlockGenerator extends Thread { if (!Controller.getInstance().isGenerationAllowed()) continue; + final Long minLatestBlockTimestamp = Controller.getMinimumLatestBlockTimestamp(); + if (minLatestBlockTimestamp == null) + continue; + + final Long now = NTP.getTime(); + if (now == null) + continue; + List forgingAccountsData = repository.getAccountRepository().getForgingAccounts(); // No forging accounts? if (forgingAccountsData.isEmpty()) @@ -98,8 +107,6 @@ public class BlockGenerator extends Thread { if (peers.size() < Settings.getInstance().getMinBlockchainPeers()) continue; - final long minLatestBlockTimestamp = Controller.getMinimumLatestBlockTimestamp(); - // Disregard peers that don't have a recent block peers.removeIf(peer -> peer.getLastBlockTimestamp() == null || peer.getLastBlockTimestamp() < minLatestBlockTimestamp); @@ -172,7 +179,7 @@ public class BlockGenerator extends Thread { Block newBlock = goodBlocks.get(winningIndex); // Delete invalid transactions. NOTE: discards repository changes on entry, saves changes on exit. - deleteInvalidTransactions(repository); + // deleteInvalidTransactions(repository); // Add unconfirmed transactions addUnconfirmedTransactions(repository, newBlock); @@ -202,12 +209,16 @@ public class BlockGenerator extends Thread { if (proxyForgerData != null) { PublicKeyAccount forger = new PublicKeyAccount(repository, proxyForgerData.getForgerPublicKey()); - LOGGER.info(String.format("Generated block %d by %s on behalf of %s", + LOGGER.info(String.format("Generated block %d, sig %.8s by %s on behalf of %s", newBlock.getBlockData().getHeight(), + Base58.encode(newBlock.getBlockData().getSignature()), forger.getAddress(), proxyForgerData.getRecipient())); } else { - LOGGER.info(String.format("Generated block %d by %s", newBlock.getBlockData().getHeight(), newBlock.getGenerator().getAddress())); + LOGGER.info(String.format("Generated block %d, sig %.8s by %s", + newBlock.getBlockData().getHeight(), + Base58.encode(newBlock.getBlockData().getSignature()), + newBlock.getGenerator().getAddress())); } repository.saveChanges(); @@ -327,7 +338,7 @@ public class BlockGenerator extends Thread { blockchainLock.lock(); try { // Delete invalid transactions - deleteInvalidTransactions(repository); + // deleteInvalidTransactions(repository); // Add unconfirmed transactions addUnconfirmedTransactions(repository, newBlock); diff --git a/src/main/java/org/qora/controller/Controller.java b/src/main/java/org/qora/controller/Controller.java index ada7db35..2ee592a6 100644 --- a/src/main/java/org/qora/controller/Controller.java +++ b/src/main/java/org/qora/controller/Controller.java @@ -1,6 +1,5 @@ package org.qora.controller; -import java.awt.TrayIcon.MessageType; import java.io.IOException; import java.io.InputStream; import java.security.SecureRandom; @@ -16,7 +15,6 @@ import java.util.List; import java.util.Map; import java.util.Properties; import java.util.Random; -import java.util.Scanner; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Predicate; @@ -91,8 +89,9 @@ public class Controller extends Thread { private static final String repositoryUrlTemplate = "jdbc:hsqldb:file:%s/blockchain;create=true;hsqldb.full_log_replay=true"; private static final long ARBITRARY_REQUEST_TIMEOUT = 5 * 1000; // ms private static final long REPOSITORY_BACKUP_PERIOD = 123 * 60 * 1000; // ms - private static final long NTP_CHECK_PERIOD = 10 * 60 * 1000; // ms - private static final long MAX_NTP_OFFSET = 30 * 1000; // ms + private static final long NTP_PRE_SYNC_CHECK_PERIOD = 5 * 1000; // ms + private static final long NTP_POST_SYNC_CHECK_PERIOD = 5 * 60 * 1000; // ms + private static final long DELETE_EXPIRED_INTERVAL = 5 * 60 * 1000; // ms private static volatile boolean isStopping = false; private static BlockGenerator blockGenerator = null; @@ -103,15 +102,16 @@ public class Controller extends Thread { private final String buildVersion; private final long buildTimestamp; // seconds - private long repositoryBackupTimestamp = startTime + REPOSITORY_BACKUP_PERIOD; + private long repositoryBackupTimestamp = startTime + REPOSITORY_BACKUP_PERIOD; // ms private long ntpCheckTimestamp = startTime; // ms + private long deleteExpiredTimestamp = startTime + DELETE_EXPIRED_INTERVAL; // ms /** Whether BlockGenerator is allowed to generate blocks. Mostly determined by system clock accuracy. */ private volatile boolean isGenerationAllowed = false; - /** Signature of peer's latest block when we tried to sync but peer had inferior chain. */ - private byte[] inferiorChainPeerBlockSignature = null; - /** Signature of our latest block when we tried to sync but peer had inferior chain. */ - private byte[] inferiorChainOurBlockSignature = null; + /** Signature of peer's latest block that will result in no sync action needed (e.g. INFERIOR_CHAIN, NOTHING_TO_DO, OK). */ + private byte[] noSyncPeerBlockSignature = null; + /** Signature of our latest block that will result in no sync action needed (e.g. INFERIOR_CHAIN, NOTHING_TO_DO, OK). */ + private byte[] noSyncOurBlockSignature = null; /** * Map of recent requests for ARBITRARY transaction data payloads. @@ -223,6 +223,9 @@ public class Controller extends Thread { // Load/check settings, which potentially sets up blockchain config, etc. Settings.getInstance(); + LOGGER.info("Starting NTP"); + NTP.start(); + LOGGER.info("Starting repository"); try { RepositoryFactory repositoryFactory = new HSQLDBRepositoryFactory(getRepositoryUrl()); @@ -317,20 +320,31 @@ public class Controller extends Thread { potentiallySynchronize(); } + final long now = System.currentTimeMillis(); + // Clean up arbitrary data request cache - final long requestMinimumTimestamp = System.currentTimeMillis() - ARBITRARY_REQUEST_TIMEOUT; + final long requestMinimumTimestamp = now - ARBITRARY_REQUEST_TIMEOUT; arbitraryDataRequests.entrySet().removeIf(entry -> entry.getValue().getC() < requestMinimumTimestamp); // Give repository a chance to backup - if (System.currentTimeMillis() >= repositoryBackupTimestamp) { - repositoryBackupTimestamp += REPOSITORY_BACKUP_PERIOD; + if (now >= repositoryBackupTimestamp) { + repositoryBackupTimestamp = now + REPOSITORY_BACKUP_PERIOD; RepositoryManager.backup(true); } - // Potentially nag end-user about NTP - if (System.currentTimeMillis() >= ntpCheckTimestamp) { - ntpCheckTimestamp += NTP_CHECK_PERIOD; - isGenerationAllowed = ntpCheck(); + // Check NTP status + if (now >= ntpCheckTimestamp) { + Long ntpTime = NTP.getTime(); + + if (ntpTime != null) { + LOGGER.info(String.format("Adjusting system time by NTP offset: %dms", ntpTime - now)); + ntpCheckTimestamp = now + NTP_POST_SYNC_CHECK_PERIOD; + } else { + LOGGER.info(String.format("No NTP offset yet")); + ntpCheckTimestamp = now + NTP_PRE_SYNC_CHECK_PERIOD; + } + + isGenerationAllowed = ntpTime != null; requestSysTrayUpdate = true; } @@ -341,6 +355,12 @@ public class Controller extends Thread { LOGGER.warn(String.format("Repository issue when trying to prune peers: %s", e.getMessage())); } + // Delete expired transactions + if (now >= deleteExpiredTimestamp) { + deleteExpiredTimestamp = now + DELETE_EXPIRED_INTERVAL; + deleteExpiredTransactions(); + } + // Maybe update SysTray if (requestSysTrayUpdate) { requestSysTrayUpdate = false; @@ -353,6 +373,10 @@ public class Controller extends Thread { } private void potentiallySynchronize() throws InterruptedException { + final Long minLatestBlockTimestamp = getMinimumLatestBlockTimestamp(); + if (minLatestBlockTimestamp == null) + return; + List peers = Network.getInstance().getUniqueHandshakedPeers(); // Disregard peers that have "misbehaved" recently @@ -363,7 +387,6 @@ public class Controller extends Thread { return; // Disregard peers that don't have a recent block - final long minLatestBlockTimestamp = getMinimumLatestBlockTimestamp(); peers.removeIf(peer -> peer.getLastBlockTimestamp() == null || peer.getLastBlockTimestamp() < minLatestBlockTimestamp); BlockData latestBlockData = getChainTip(); @@ -371,17 +394,17 @@ public class Controller extends Thread { // Disregard peers that have no block signature or the same block signature as us peers.removeIf(peer -> peer.getLastBlockSignature() == null || Arrays.equals(latestBlockData.getSignature(), peer.getLastBlockSignature())); - // Disregard peer we used last time, if both we and they are still on the same block and we didn't like their chain - if (inferiorChainOurBlockSignature != null && Arrays.equals(inferiorChainOurBlockSignature, latestBlockData.getSignature())) - peers.removeIf(peer -> Arrays.equals(inferiorChainPeerBlockSignature, peer.getLastBlockSignature())); + // Disregard peers that are on the same block as last sync attempt and we didn't like their chain + if (noSyncOurBlockSignature != null && Arrays.equals(noSyncOurBlockSignature, latestBlockData.getSignature())) + peers.removeIf(peer -> Arrays.equals(noSyncPeerBlockSignature, peer.getLastBlockSignature())); if (!peers.isEmpty()) { // Pick random peer to sync with int index = new SecureRandom().nextInt(peers.size()); Peer peer = peers.get(index); - inferiorChainOurBlockSignature = null; - inferiorChainPeerBlockSignature = null; + noSyncOurBlockSignature = null; + noSyncPeerBlockSignature = null; SynchronizationResult syncResult = Synchronizer.getInstance().synchronize(peer, false); switch (syncResult) { @@ -395,7 +418,7 @@ public class Controller extends Thread { // Don't use this peer again for a while PeerData peerData = peer.getPeerData(); - peerData.setLastMisbehaved(System.currentTimeMillis()); + peerData.setLastMisbehaved(NTP.getTime()); // Only save to repository if outbound peer if (peer.isOutbound()) @@ -408,8 +431,8 @@ public class Controller extends Thread { break; case INFERIOR_CHAIN: - inferiorChainOurBlockSignature = latestBlockData.getSignature(); - inferiorChainPeerBlockSignature = peer.getLastBlockSignature(); + noSyncOurBlockSignature = latestBlockData.getSignature(); + noSyncPeerBlockSignature = peer.getLastBlockSignature(); // These are minor failure results so fine to try again LOGGER.debug(() -> String.format("Refused to synchronize with peer %s (%s)", peer, syncResult.name())); break; @@ -425,6 +448,8 @@ public class Controller extends Thread { requestSysTrayUpdate = true; // fall-through... case NOTHING_TO_DO: + noSyncOurBlockSignature = latestBlockData.getSignature(); + noSyncPeerBlockSignature = peer.getLastBlockSignature(); LOGGER.debug(() -> String.format("Synchronized with peer %s (%s)", peer, syncResult.name())); break; } @@ -436,62 +461,12 @@ public class Controller extends Thread { } } - /** - * Nag if we detect system clock is too far from internet time. - * - * @return true if clock is accurate, false if inaccurate or we don't know. - */ - private boolean ntpCheck() { - // Fetch mean offset from internet time (ms). - Long meanOffset = NTP.getOffset(); - - final boolean isWindows = System.getProperty("os.name").toLowerCase().contains("win"); - boolean isNtpActive = false; - if (isWindows) { - // Detecting Windows Time service - - String[] detectCmd = new String[] { "net", "start" }; - try { - Process process = new ProcessBuilder(Arrays.asList(detectCmd)).start(); - try (InputStream in = process.getInputStream(); Scanner scanner = new Scanner(in, "UTF8")) { - scanner.useDelimiter("\\A"); - String output = scanner.hasNext() ? scanner.next() : ""; - isNtpActive = output.contains("Windows Time"); - } - } catch (IOException e) { - // Not important - } - } else { - // Very basic unix-based attempt to check for ntpd - String[] detectCmd = new String[] { "ps", "-agx" }; - try { - Process process = new ProcessBuilder(Arrays.asList(detectCmd)).start(); - try (InputStream in = process.getInputStream(); Scanner scanner = new Scanner(in, "UTF8")) { - scanner.useDelimiter("\\A"); - String output = scanner.hasNext() ? scanner.next() : ""; - isNtpActive = output.contains("ntpd"); - } - } catch (IOException e) { - // Not important - } - } - - LOGGER.info(String.format("NTP mean offset %s, NTP service active: %s", meanOffset, isNtpActive)); - - final boolean isOffsetGood = meanOffset != null && Math.abs(meanOffset) < MAX_NTP_OFFSET; - - // If offset bad or NTP not active then nag - if (!isOffsetGood || !isNtpActive) { - String caption = Translator.INSTANCE.translate("SysTray", "NTP_NAG_CAPTION"); - String text = Translator.INSTANCE.translate("SysTray", isWindows ? "NTP_NAG_TEXT_WINDOWS" : "NTP_NAG_TEXT_UNIX"); - SysTray.getInstance().showMessage(caption, text, MessageType.WARNING); - } - - // Return whether we're accurate (disregarding whether NTP service is active) - return isOffsetGood; - } - private void updateSysTray() { + if (NTP.getTime() == null) { + SysTray.getInstance().setToolTipText(Translator.INSTANCE.translate("SysTray", "SYNCHRONIZING CLOCK")); + return; + } + final int numberOfPeers = Network.getInstance().getUniqueHandshakedPeers().size(); final int height = getChainHeight(); @@ -504,6 +479,22 @@ public class Controller extends Thread { SysTray.getInstance().setToolTipText(tooltip); } + public void deleteExpiredTransactions() { + try (final Repository repository = RepositoryManager.getRepository()) { + List transactions = repository.getTransactionRepository().getUnconfirmedTransactions(); + + for (TransactionData transactionData : transactions) + if (transactionData.getTimestamp() >= Transaction.getDeadline(transactionData)) { + LOGGER.info(String.format("Deleting expired, unconfirmed transaction %s", Base58.encode(transactionData.getSignature()))); + repository.getTransactionRepository().delete(transactionData); + } + + repository.saveChanges(); + } catch (DataException e) { + LOGGER.error("Repository issue while deleting expired unconfirmed transactions", e); + } + } + // Shutdown public void shutdown() { @@ -552,6 +543,9 @@ public class Controller extends Thread { LOGGER.error("Error occurred while shutting down repository", e); } + LOGGER.info("Shutting down NTP"); + NTP.shutdownNow(); + LOGGER.info("Shutdown complete!"); } } @@ -960,7 +954,7 @@ public class Controller extends Thread { byte[] signature = getArbitraryDataMessage.getSignature(); String signature58 = Base58.encode(signature); - Long timestamp = System.currentTimeMillis(); + Long timestamp = NTP.getTime(); Triple newEntry = new Triple<>(signature58, peer, timestamp); // If we've seen this request recently, then ignore @@ -1070,7 +1064,7 @@ public class Controller extends Thread { // Save our request into requests map String signature58 = Base58.encode(signature); - Triple requestEntry = new Triple<>(signature58, null, System.currentTimeMillis()); + Triple requestEntry = new Triple<>(signature58, null, NTP.getTime()); // Assign random ID to this message int id; @@ -1111,12 +1105,15 @@ public class Controller extends Thread { public static final Predicate hasPeerMisbehaved = peer -> { Long lastMisbehaved = peer.getPeerData().getLastMisbehaved(); - return lastMisbehaved != null && lastMisbehaved > System.currentTimeMillis() - MISBEHAVIOUR_COOLOFF; + return lastMisbehaved != null && lastMisbehaved > NTP.getTime() - MISBEHAVIOUR_COOLOFF; }; /** Returns whether we think our node has up-to-date blockchain based on our info about other peers. */ public boolean isUpToDate() { - final long minLatestBlockTimestamp = getMinimumLatestBlockTimestamp(); + final Long minLatestBlockTimestamp = getMinimumLatestBlockTimestamp(); + if (minLatestBlockTimestamp == null) + return false; + BlockData latestBlockData = getChainTip(); // Is our blockchain too old? @@ -1139,8 +1136,12 @@ public class Controller extends Thread { return !peers.isEmpty(); } - public static long getMinimumLatestBlockTimestamp() { - return System.currentTimeMillis() - BlockChain.getInstance().getMaxBlockTime() * 1000L * MAX_BLOCKCHAIN_TIP_AGE; + public static Long getMinimumLatestBlockTimestamp() { + Long now = NTP.getTime(); + if (now == null) + return null; + + return now - BlockChain.getInstance().getMaxBlockTime() * 1000L * MAX_BLOCKCHAIN_TIP_AGE; } } diff --git a/src/main/java/org/qora/controller/Synchronizer.java b/src/main/java/org/qora/controller/Synchronizer.java index e0a479bb..af2f942d 100644 --- a/src/main/java/org/qora/controller/Synchronizer.java +++ b/src/main/java/org/qora/controller/Synchronizer.java @@ -159,7 +159,10 @@ public class Synchronizer { int highestMutualHeight = Math.min(peerHeight, ourHeight); // If our latest block is very old, we're very behind and should ditch our fork. - final long minLatestBlockTimestamp = Controller.getMinimumLatestBlockTimestamp(); + final Long minLatestBlockTimestamp = Controller.getMinimumLatestBlockTimestamp(); + if (minLatestBlockTimestamp == null) + return SynchronizationResult.REPOSITORY_ISSUE; + if (ourInitialHeight > commonBlockHeight && ourLatestBlockData.getTimestamp() < minLatestBlockTimestamp) { LOGGER.info(String.format("Ditching our chain after height %d as our latest block is very old", commonBlockHeight)); highestMutualHeight = commonBlockHeight; diff --git a/src/main/java/org/qora/network/Handshake.java b/src/main/java/org/qora/network/Handshake.java index 10aa4ed7..79acd1a0 100644 --- a/src/main/java/org/qora/network/Handshake.java +++ b/src/main/java/org/qora/network/Handshake.java @@ -30,6 +30,18 @@ public enum Handshake { PeerIdMessage peerIdMessage = (PeerIdMessage) message; byte[] peerId = peerIdMessage.getPeerId(); + if (Arrays.equals(peerId, Network.ZERO_PEER_ID)) { + if (peer.isOutbound()) { + // Peer has indicated they already have an outbound connection to us + LOGGER.trace(String.format("Peer %s already connected to us - discarding this connection", peer)); + } else { + // Not sure this should occur so log it + LOGGER.info(String.format("Inbound peer %s claims we also have outbound connection to them?", peer)); + } + + return null; + } + if (Arrays.equals(peerId, Network.getInstance().getOurPeerId())) { // Connected to self! // If outgoing connection then record destination as self so we don't try again @@ -53,6 +65,11 @@ public enum Handshake { if (otherOutboundPeer == null) { // We already have an inbound peer with this ID, but no outgoing peer with which to request verification LOGGER.trace(String.format("Discarding inbound peer %s with existing ID", peer)); + + // Let peer know by sending special zero peer ID. This avoids peer keeping connection open until timeout. + peerIdMessage = new PeerIdMessage(Network.ZERO_PEER_ID); + peer.sendMessage(peerIdMessage); + return null; } else { // Use corresponding outbound peer to verify inbound diff --git a/src/main/java/org/qora/network/Network.java b/src/main/java/org/qora/network/Network.java index f5d9da25..c7b0b694 100644 --- a/src/main/java/org/qora/network/Network.java +++ b/src/main/java/org/qora/network/Network.java @@ -54,6 +54,7 @@ import org.qora.repository.Repository; import org.qora.repository.RepositoryManager; import org.qora.settings.Settings; import org.qora.utils.ExecuteProduceConsume; +import org.qora.utils.NTP; // For managing peers public class Network extends Thread { @@ -94,6 +95,7 @@ public class Network extends Thread { public static final int MAX_SIGNATURES_PER_REPLY = 500; public static final int MAX_BLOCK_SUMMARIES_PER_REPLY = 500; public static final int PEER_ID_LENGTH = 128; + public static final byte[] ZERO_PEER_ID = new byte[PEER_ID_LENGTH]; private final byte[] ourPeerId; private List connectedPeers; @@ -109,7 +111,6 @@ public class Network extends Thread { private long nextConnectTaskTimestamp; private ExecutorService broadcastExecutor; - /** Timestamp (ms) for next general info broadcast to all connected peers. Based on System.currentTimeMillis(). */ private long nextBroadcastTimestamp; private Lock mergePeersLock; @@ -146,14 +147,16 @@ public class Network extends Thread { ourPeerId = new byte[PEER_ID_LENGTH]; new SecureRandom().nextBytes(ourPeerId); + // Set bit to make sure our peer ID is not 0 + ourPeerId[ourPeerId.length - 1] |= 0x01; minOutboundPeers = Settings.getInstance().getMinOutboundPeers(); maxPeers = Settings.getInstance().getMaxPeers(); - nextConnectTaskTimestamp = System.currentTimeMillis(); + nextConnectTaskTimestamp = 0; // First connect once NTP syncs broadcastExecutor = Executors.newCachedThreadPool(); - nextBroadcastTimestamp = System.currentTimeMillis(); + nextBroadcastTimestamp = 0; // First broadcast once NTP syncs mergePeersLock = new ReentrantLock(); @@ -420,8 +423,8 @@ public class Network extends Thread { if (getOutboundHandshakedPeers().size() >= minOutboundPeers) return null; - final long now = System.currentTimeMillis(); - if (now < nextConnectTaskTimestamp) + final Long now = NTP.getTime(); + if (now == null || now < nextConnectTaskTimestamp) return null; nextConnectTaskTimestamp = now + 1000L; @@ -435,8 +438,8 @@ public class Network extends Thread { } private Task maybeProduceBroadcastTask() { - final long now = System.currentTimeMillis(); - if (now < nextBroadcastTimestamp) + final Long now = NTP.getTime(); + if (now == null || now < nextBroadcastTimestamp) return null; nextBroadcastTimestamp = now + BROADCAST_INTERVAL; @@ -457,9 +460,15 @@ public class Network extends Thread { if (socketChannel == null) return; + final Long now = NTP.getTime(); Peer newPeer; try { + if (now == null) { + LOGGER.trace(String.format("Connection discarded from peer %s due to lack of NTP sync", socketChannel.getRemoteAddress())); + return; + } + synchronized (this.connectedPeers) { if (connectedPeers.size() >= maxPeers) { // We have enough peers @@ -499,7 +508,9 @@ public class Network extends Thread { } public void prunePeers() throws InterruptedException, DataException { - final long now = System.currentTimeMillis(); + final Long now = NTP.getTime(); + if (now == null) + return; // Disconnect peers that are stuck during handshake List handshakePeers = this.getConnectedPeers(); @@ -551,12 +562,14 @@ public class Network extends Thread { } private Peer getConnectablePeer() throws InterruptedException { + final long now = NTP.getTime(); + try (final Repository repository = RepositoryManager.getRepository()) { // Find an address to connect to List peers = repository.getNetworkRepository().getAllPeers(); // Don't consider peers with recent connection failures - final long lastAttemptedThreshold = System.currentTimeMillis() - CONNECT_FAILURE_BACKOFF; + final long lastAttemptedThreshold = now - CONNECT_FAILURE_BACKOFF; peers.removeIf(peerData -> peerData.getLastAttempted() != null && peerData.getLastAttempted() > lastAttemptedThreshold); // Don't consider peers that we know loop back to ourself @@ -607,7 +620,7 @@ public class Network extends Thread { // Update connection attempt info repository.discardChanges(); - peerData.setLastAttempted(System.currentTimeMillis()); + peerData.setLastAttempted(now); repository.getNetworkRepository().save(peerData); repository.saveChanges(); @@ -834,7 +847,7 @@ public class Network extends Thread { LOGGER.debug(String.format("Handshake completed with peer %s", peer)); // Make a note that we've successfully completed handshake (and when) - peer.getPeerData().setLastConnected(System.currentTimeMillis()); + peer.getPeerData().setLastConnected(NTP.getTime()); // Update connection info for outbound peers only if (peer.isOutbound()) @@ -882,7 +895,7 @@ public class Network extends Thread { List knownPeers = repository.getNetworkRepository().getAllPeers(); // Filter out peers that we've not connected to ever or within X milliseconds - final long connectionThreshold = System.currentTimeMillis() - RECENT_CONNECTION_THRESHOLD; + final long connectionThreshold = NTP.getTime() - RECENT_CONNECTION_THRESHOLD; Predicate notRecentlyConnected = peerData -> { final Long lastAttempted = peerData.getLastAttempted(); final Long lastConnected = peerData.getLastConnected(); @@ -1031,12 +1044,14 @@ public class Network extends Thread { // Network-wide calls private void mergePeers(String addedBy, List peerAddresses) { + final Long addedWhen = NTP.getTime(); + if (addedWhen == null) + return; + // Serialize using lock to prevent repository deadlocks if (!mergePeersLock.tryLock()) return; - final long addedWhen = System.currentTimeMillis(); - try { try (final Repository repository = RepositoryManager.getRepository()) { List knownPeers = repository.getNetworkRepository().getAllPeers(); diff --git a/src/main/java/org/qora/network/Peer.java b/src/main/java/org/qora/network/Peer.java index fcd2baaa..8ddce741 100644 --- a/src/main/java/org/qora/network/Peer.java +++ b/src/main/java/org/qora/network/Peer.java @@ -28,6 +28,7 @@ import org.qora.network.message.Message.MessageException; import org.qora.network.message.Message.MessageType; import org.qora.settings.Settings; import org.qora.utils.ExecuteProduceConsume; +import org.qora.utils.NTP; import org.qora.network.message.PingMessage; import org.qora.network.message.VersionMessage; @@ -279,7 +280,7 @@ public class Peer { } private void sharedSetup() throws IOException { - this.connectionTimestamp = System.currentTimeMillis(); + this.connectionTimestamp = NTP.getTime(); this.socketChannel.setOption(StandardSocketOptions.TCP_NODELAY, true); this.socketChannel.configureBlocking(false); this.byteBuffer = ByteBuffer.allocate(Network.MAXIMUM_MESSAGE_SIZE); @@ -510,6 +511,7 @@ public class Peer { if (this.socketChannel.isOpen()) { try { + this.socketChannel.shutdownOutput(); this.socketChannel.close(); } catch (IOException e) { LOGGER.debug(String.format("IOException while trying to close peer %s", this)); diff --git a/src/main/java/org/qora/transaction/Transaction.java b/src/main/java/org/qora/transaction/Transaction.java index a585dbec..2e5594ca 100644 --- a/src/main/java/org/qora/transaction/Transaction.java +++ b/src/main/java/org/qora/transaction/Transaction.java @@ -30,6 +30,7 @@ import org.qora.repository.Repository; import org.qora.settings.Settings; import org.qora.transform.TransformationException; import org.qora.transform.transaction.TransactionTransformer; +import org.qora.utils.NTP; import static java.util.Arrays.stream; import static java.util.stream.Collectors.toMap; @@ -235,6 +236,7 @@ public abstract class Transaction { TRANSACTION_ALREADY_EXISTS(85), NO_BLOCKCHAIN_LOCK(86), ORDER_ALREADY_CLOSED(87), + CLOCK_NOT_SYNCED(88), NOT_YET_RELEASED(1000); public final int value; @@ -301,9 +303,13 @@ public abstract class Transaction { // More information - public long getDeadline() { + public static long getDeadline(TransactionData transactionData) { // 24 hour deadline to include transaction in a block - return this.transactionData.getTimestamp() + (24 * 60 * 60 * 1000); + return transactionData.getTimestamp() + (24 * 60 * 60 * 1000); + } + + public long getDeadline() { + return Transaction.getDeadline(transactionData); } public boolean hasMinimumFee() { @@ -507,7 +513,7 @@ public abstract class Transaction { * NOTE: temporarily updates accounts' lastReference to check validity.
* To do this, blockchain lock is obtained and pending repository changes are discarded. * - * @return true if transaction can be added to unconfirmed transactions, false otherwise + * @return transaction validation result, e.g. OK * @throws DataException */ public ValidationResult isValidUnconfirmed() throws DataException { @@ -517,7 +523,11 @@ public abstract class Transaction { return ValidationResult.TIMESTAMP_TOO_OLD; // Transactions with a timestamp too far into future are too new - long maxTimestamp = System.currentTimeMillis() + Settings.getInstance().getMaxTransactionTimestampFuture(); + final Long now = NTP.getTime(); + if (now == null) + return ValidationResult.CLOCK_NOT_SYNCED; + + long maxTimestamp = now + Settings.getInstance().getMaxTransactionTimestampFuture(); if (this.transactionData.getTimestamp() > maxTimestamp) return ValidationResult.TIMESTAMP_TOO_NEW; @@ -734,10 +744,14 @@ public abstract class Transaction { * @throws DataException */ private static boolean isStillValidUnconfirmed(Repository repository, TransactionData transactionData, long blockTimestamp) throws DataException { + final Long now = NTP.getTime(); + if (now == null) + return false; + Transaction transaction = Transaction.fromData(repository, transactionData); // Check transaction has not expired - if (transaction.getDeadline() <= blockTimestamp || transaction.getDeadline() < System.currentTimeMillis()) + if (transaction.getDeadline() <= blockTimestamp || transaction.getDeadline() < now) return false; // Is transaction is past max approval period? diff --git a/src/main/java/org/qora/utils/NTP.java b/src/main/java/org/qora/utils/NTP.java index 16529393..7707253a 100644 --- a/src/main/java/org/qora/utils/NTP.java +++ b/src/main/java/org/qora/utils/NTP.java @@ -3,99 +3,260 @@ package org.qora.utils; import java.io.IOException; import java.net.InetAddress; import java.util.ArrayList; +import java.util.Deque; +import java.util.LinkedList; import java.util.List; +import java.util.concurrent.CompletionService; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import org.apache.commons.net.ntp.NTPUDPClient; +import org.apache.commons.net.ntp.NtpV3Packet; import org.apache.commons.net.ntp.TimeInfo; +import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.qora.settings.Settings; -public class NTP { +public class NTP implements Runnable { private static final Logger LOGGER = LogManager.getLogger(NTP.class); - private static final double MAX_STDDEV = 125; // ms - /** - * Returns aggregated internet time. - * - * @return internet time (ms), or null if unsuccessful. - */ - public static Long getTime() { - Long meanOffset = getOffset(); - if (meanOffset == null) - return null; + private static boolean isStarted = false; + private static volatile boolean isStopping = false; + private static ExecutorService instanceExecutor; + private static NTP instance; + private static volatile Long offset = null; - return System.currentTimeMillis() + meanOffset; - } + static class NTPServer { + private static final int MIN_POLL = 64; - /** - * Returns mean offset from internet time. - * - * Positive offset means local clock is behind internet time. - * - * @return offset (ms), or null if unsuccessful. - */ - public static Long getOffset() { - String[] ntpServers = Settings.getInstance().getNtpServers(); + public char usage = ' '; + public String remote; + public String refId; + public Integer stratum; + public char type = 'u'; // unicast + public int poll = MIN_POLL; + public byte reach = 0; + public Long delay; + public Double offset; + public Double jitter; - NTPUDPClient client = new NTPUDPClient(); - client.setDefaultTimeout(2000); + private Deque offsets = new LinkedList<>(); + private double totalSquareOffsets = 0.0; + private long nextPoll; + private Long lastGood; - List offsets = new ArrayList<>(); + public NTPServer(String remote) { + this.remote = remote; + } + + public boolean poll(NTPUDPClient client) { + Thread.currentThread().setName(String.format("NTP: %s", this.remote)); - for (String server : ntpServers) { try { - TimeInfo timeInfo = client.getTime(InetAddress.getByName(server)); + final long now = System.currentTimeMillis(); - timeInfo.computeDetails(); + if (now < this.nextPoll) + return false; - LOGGER.debug(() -> String.format("%c%16.16s %16.16s %2d %c %4d %4d %3o %6dms % 5dms % 5dms", - ' ', - server, - timeInfo.getMessage().getReferenceIdString(), - timeInfo.getMessage().getStratum(), - 'u', - 0, - 1 << timeInfo.getMessage().getPoll(), - 1, - timeInfo.getDelay(), - timeInfo.getOffset(), - 0 - )); + boolean isUpdated = false; + try { + TimeInfo timeInfo = client.getTime(InetAddress.getByName(remote)); - offsets.add((double) timeInfo.getOffset()); - } catch (IOException e) { - // Try next server... + timeInfo.computeDetails(); + NtpV3Packet ntpMessage = timeInfo.getMessage(); + + this.refId = ntpMessage.getReferenceIdString(); + this.stratum = ntpMessage.getStratum(); + this.poll = Math.max(MIN_POLL, 1 << ntpMessage.getPoll()); + + this.delay = timeInfo.getDelay(); + this.offset = (double) timeInfo.getOffset(); + + if (this.offsets.size() == 8) { + double oldOffset = this.offsets.removeFirst(); + this.totalSquareOffsets -= oldOffset * oldOffset; + } + + this.offsets.addLast(this.offset); + this.totalSquareOffsets += this.offset * this.offset; + + this.jitter = Math.sqrt(this.totalSquareOffsets / this.offsets.size()); + + this.reach = (byte) ((this.reach << 1) | 1); + this.lastGood = now; + + isUpdated = true; + } catch (IOException e) { + this.reach <<= 1; + } + + this.nextPoll = now + this.poll * 1000; + return isUpdated; + } finally { + Thread.currentThread().setName("NTP (dormant)"); } } - if (offsets.size() < ntpServers.length / 2) { - LOGGER.info(String.format("Not enough replies: %d, minimum is %d", offsets.size(), ntpServers.length / 2)); + public Integer getWhen() { + if (this.lastGood == null) + return null; + + return (int) ((System.currentTimeMillis() - this.lastGood) / 1000); + } + } + + private final NTPUDPClient client; + private List ntpServers = new ArrayList<>(); + private final ExecutorService serverExecutor; + + private NTP() { + client = new NTPUDPClient(); + client.setDefaultTimeout(2000); + + for (String serverName : Settings.getInstance().getNtpServers()) + ntpServers.add(new NTPServer(serverName)); + + serverExecutor = Executors.newCachedThreadPool(); + } + + public static synchronized void start() { + if (isStarted) + return; + + instanceExecutor = Executors.newSingleThreadExecutor(); + instance = new NTP(); + instanceExecutor.execute(instance); + } + + public static void shutdownNow() { + instanceExecutor.shutdownNow(); + } + + /** + * Returns our estimate of internet time. + * + * @return internet time (ms), or null if unsynchronized. + */ + public static Long getTime() { + if (offset == null) return null; + + return System.currentTimeMillis() + offset; + } + + public void run() { + Thread.currentThread().setName("NTP instance"); + + try { + while (!isStopping) { + Thread.sleep(1000); + + CompletionService ecs = new ExecutorCompletionService(serverExecutor); + for (NTPServer server : ntpServers) + ecs.submit(() -> server.poll(client)); + + boolean hasUpdate = false; + for (int i = 0; i < ntpServers.size(); ++i) { + if (isStopping) + return; + + try { + hasUpdate = ecs.take().get() || hasUpdate; + } catch (ExecutionException e) { + // skip + } + } + + if (hasUpdate) { + double s0 = 0; + double s1 = 0; + double s2 = 0; + + for (NTPServer server : ntpServers) { + if (server.offset == null) { + server.usage = ' '; + continue; + } + + server.usage = '+'; + double value = server.offset * (double) server.stratum; + + s0 += 1; + s1 += value; + s2 += value * value; + } + + if (s0 < ntpServers.size() / 3 + 1) { + LOGGER.debug(String.format("Not enough replies (%d) to calculate network time", s0)); + } else { + double thresholdStddev = Math.sqrt(((s0 * s2) - (s1 * s1)) / (s0 * (s0 - 1))); + double mean = s1 / s0; + + // Now only consider offsets within 1 stddev? + s0 = 0; + s1 = 0; + s2 = 0; + + for (NTPServer server : ntpServers) { + if (server.offset == null || server.reach == 0) + continue; + + if (Math.abs(server.offset * (double)server.stratum - mean) > thresholdStddev) + continue; + + server.usage = '*'; + s0 += 1; + s1 += server.offset; + s2 += server.offset * server.offset; + } + + if (s0 <= 1) { + LOGGER.debug(String.format("Not enough useful values (%d) to calculate network time. (stddev: %7.4f)", s0, thresholdStddev)); + } else { + double filteredMean = s1 / s0; + double filteredStddev = Math.sqrt(((s0 * s2) - (s1 * s1)) / (s0 * (s0 - 1))); + + LOGGER.trace(String.format("Threshold stddev: %7.3f, mean: %7.3f, stddev: %7.3f, nValues: %.0f / %d", + thresholdStddev, filteredMean, filteredStddev, s0, ntpServers.size())); + + NTP.offset = (long) filteredMean; + LOGGER.debug(String.format("New NTP offset: %d", NTP.offset)); + } + } + + if (LOGGER.getLevel().isMoreSpecificThan(Level.TRACE)) { + LOGGER.trace(String.format("%c%16s %16s %2s %c %4s %4s %3s %7s %7s %7s", + ' ', "remote", "refid", "st", 't', "when", "poll", "reach", "delay", "offset", "jitter" + )); + + for (NTPServer server : ntpServers) + LOGGER.trace(String.format("%c%16.16s %16.16s %2s %c %4s %4d %3o %7s %7s %7s", + server.usage, + server.remote, + formatNull("%s", server.refId, ""), + formatNull("%2d", server.stratum, ""), + server.type, + formatNull("%4d", server.getWhen(), "-"), + server.poll, + server.reach, + formatNull("%5dms", server.delay, ""), + formatNull("% 5.0fms", server.offset, ""), + formatNull("%5.2fms", server.jitter, "") + )); + } + } + } + } catch (InterruptedException e) { + // Exit } + } - // sₙ represents sum of offsetⁿ - double s0 = 0; - double s1 = 0; - double s2 = 0; - - for (Double offset : offsets) { - s0 += 1; - s1 += offset; - s2 += offset * offset; - } - - double mean = s1 / s0; - double stddev = Math.sqrt(((s0 * s2) - (s1 * s1)) / (s0 * (s0 - 1))); - - // If stddev is excessive then we're not very sure so give up - if (stddev > MAX_STDDEV) { - LOGGER.info(String.format("Excessive standard deviation %.1f, maximum is %.1f", stddev, MAX_STDDEV)); - return null; - } - - return (long) mean; + private static String formatNull(String format, Object arg, String nullOutput) { + return arg != null ? String.format(format, arg) : nullOutput; } } diff --git a/src/main/resources/blockchain.json b/src/main/resources/blockchain.json index 94a250fb..5d50c2f4 100644 --- a/src/main/resources/blockchain.json +++ b/src/main/resources/blockchain.json @@ -3,7 +3,7 @@ "blockDifficultyInterval": 10, "minBlockTime": 60, "maxBlockTime": 300, - "blockTimestampMargin": 500, + "blockTimestampMargin": 2000, "maxBytesPerUnitFee": 1024, "unitFee": "1.0", "useBrokenMD160ForAddresses": true, diff --git a/src/main/resources/i18n/SysTray_en.properties b/src/main/resources/i18n/SysTray_en.properties index 8b79dc85..63fa3420 100644 --- a/src/main/resources/i18n/SysTray_en.properties +++ b/src/main/resources/i18n/SysTray_en.properties @@ -25,3 +25,5 @@ NTP_NAG_TEXT_WINDOWS = Select "Synchronize clock" from menu to fix. OPEN_NODE_UI = Open Node UI SYNCHRONIZE_CLOCK = Synchronize clock + +SYNCHRONIZING_CLOCK = Synchronizing clock diff --git a/src/main/resources/i18n/SysTray_zh.properties b/src/main/resources/i18n/SysTray_zh.properties index 369e434a..4f2946b5 100644 --- a/src/main/resources/i18n/SysTray_zh.properties +++ b/src/main/resources/i18n/SysTray_zh.properties @@ -25,3 +25,5 @@ NTP_NAG_TEXT_WINDOWS = \u4ECE\u83DC\u5355\u4E2D\u9009\u62E9\u201C\u540C\u6B65\u6 OPEN_NODE_UI = \u5F00\u542F\u754C\u9762 SYNCHRONIZE_CLOCK = \u540C\u6B65\u65F6\u949F + +SYNCHRONIZING_CLOCK = \u540C\u6B65\u7740\u65F6\u949F diff --git a/src/test/java/org/qora/test/NTPTests.java b/src/test/java/org/qora/test/NTPTests.java index f5f73236..e5398f6d 100644 --- a/src/test/java/org/qora/test/NTPTests.java +++ b/src/test/java/org/qora/test/NTPTests.java @@ -5,7 +5,13 @@ import java.net.InetAddress; import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Deque; +import java.util.LinkedList; import java.util.List; +import java.util.concurrent.CompletionService; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.Executors; import org.apache.commons.net.ntp.NTPUDPClient; import org.apache.commons.net.ntp.NtpV3Packet; @@ -13,74 +19,182 @@ import org.apache.commons.net.ntp.TimeInfo; public class NTPTests { - private static final List CC_TLDS = Arrays.asList("oceania", "europe", "lat", "asia", "africa"); + private static final List CC_TLDS = Arrays.asList("oceania", "europe", "cn", "asia", "africa"); - public static void main(String[] args) throws UnknownHostException, IOException { + public static void main(String[] args) throws UnknownHostException, IOException, InterruptedException { NTPUDPClient client = new NTPUDPClient(); client.setDefaultTimeout(2000); - System.out.println(String.format("%c%16s %16s %2s %c %4s %4s %3s %7s %7s %7s", - ' ', "remote", "refid", "st", 't', "when", "poll", "reach", "delay", "offset", "jitter" - )); + class NTPServer { + private static final int MIN_POLL = 8; - List offsets = new ArrayList<>(); + public char usage = ' '; + public String remote; + public String refId; + public Integer stratum; + public char type = 'u'; // unicast + public int poll = MIN_POLL; + public byte reach = 0; + public Long delay; + public Double offset; + public Double jitter; - List ntpServers = new ArrayList<>(); - for (String ccTld : CC_TLDS) { - ntpServers.add(ccTld + ".pool.ntp.org"); - for (int subpool = 0; subpool <=3; ++subpool) - ntpServers.add(subpool + "." + ccTld + ".pool.ntp.org"); - } + private Deque offsets = new LinkedList<>(); + private double totalSquareOffsets = 0.0; + private long nextPoll; + private Long lastGood; - for (String server : ntpServers) { - try { - TimeInfo timeInfo = client.getTime(InetAddress.getByName(server)); + public NTPServer(String remote) { + this.remote = remote; + } - timeInfo.computeDetails(); - NtpV3Packet ntpMessage = timeInfo.getMessage(); + public boolean poll(NTPUDPClient client) { + final long now = System.currentTimeMillis(); - System.out.println(String.format("%c%16.16s %16.16s %2d %c %4d %4d %3o %6dms % 5dms % 5dms", - ' ', - server, - ntpMessage.getReferenceIdString(), - ntpMessage.getStratum(), - 'u', - 0, - 1 << ntpMessage.getPoll(), - 1, - timeInfo.getDelay(), - timeInfo.getOffset(), - 0 - )); + if (now < this.nextPoll) + return false; - offsets.add((double) timeInfo.getOffset()); - } catch (IOException e) { - // Try next server... + boolean isUpdated = false; + try { + TimeInfo timeInfo = client.getTime(InetAddress.getByName(remote)); + + timeInfo.computeDetails(); + NtpV3Packet ntpMessage = timeInfo.getMessage(); + + this.refId = ntpMessage.getReferenceIdString(); + this.stratum = ntpMessage.getStratum(); + this.poll = Math.max(MIN_POLL, 1 << ntpMessage.getPoll()); + + this.delay = timeInfo.getDelay(); + this.offset = (double) timeInfo.getOffset(); + + if (this.offsets.size() == 8) { + double oldOffset = this.offsets.removeFirst(); + this.totalSquareOffsets -= oldOffset * oldOffset; + } + + this.offsets.addLast(this.offset); + this.totalSquareOffsets += this.offset * this.offset; + + this.jitter = Math.sqrt(this.totalSquareOffsets / this.offsets.size()); + + this.reach = (byte) ((this.reach << 1) | 1); + this.lastGood = now; + + isUpdated = true; + } catch (IOException e) { + this.reach <<= 1; + } + + this.nextPoll = now + this.poll * 1000; + return isUpdated; + } + + public Integer getWhen() { + if (this.lastGood == null) + return null; + + return (int) ((System.currentTimeMillis() - this.lastGood) / 1000); } } - if (offsets.size() < ntpServers.size() / 2) { - System.err.println("Not enough replies"); - System.exit(1); + List ntpServers = new ArrayList<>(); + + for (String ccTld : CC_TLDS) + for (int subpool = 0; subpool <=3; ++subpool) + ntpServers.add(new NTPServer(subpool + "." + ccTld + ".pool.ntp.org")); + + while (true) { + Thread.sleep(1000); + + CompletionService ecs = new ExecutorCompletionService(Executors.newCachedThreadPool()); + for (NTPServer server : ntpServers) + ecs.submit(() -> server.poll(client)); + + boolean showReport = false; + for (int i = 0; i < ntpServers.size(); ++i) + try { + showReport = ecs.take().get() || showReport; + } catch (ExecutionException e) { + // skip + } + + if (showReport) { + double s0 = 0; + double s1 = 0; + double s2 = 0; + + for (NTPServer server : ntpServers) { + if (server.offset == null) { + server.usage = ' '; + continue; + } + + server.usage = '+'; + double value = server.offset * (double) server.stratum; + + s0 += 1; + s1 += value; + s2 += value * value; + } + + if (s0 < ntpServers.size() / 3 + 1) { + System.out.println("Not enough replies to calculate network time"); + } else { + double filterStddev = Math.sqrt(((s0 * s2) - (s1 * s1)) / (s0 * (s0 - 1))); + double filterMean = s1 / s0; + + // Now only consider offsets within 1 stddev? + s0 = 0; + s1 = 0; + s2 = 0; + + for (NTPServer server : ntpServers) { + if (server.offset == null || server.reach == 0) + continue; + + if (Math.abs(server.offset * (double)server.stratum - filterMean) > filterStddev) + continue; + + server.usage = '*'; + s0 += 1; + s1 += server.offset; + s2 += server.offset * server.offset; + } + + if (s0 <= 1) { + System.out.println(String.format("Not enough values to calculate network time. stddev: %7.4f", filterStddev)); + } else { + double mean = s1 / s0; + double newStddev = Math.sqrt(((s0 * s2) - (s1 * s1)) / (s0 * (s0 - 1))); + System.out.println(String.format("filtering stddev: %7.3f, mean: %7.3f, new stddev: %7.3f, nValues: %.0f / %d", filterStddev, mean, newStddev, s0, ntpServers.size())); + } + } + + System.out.println(String.format("%c%16s %16s %2s %c %4s %4s %3s %7s %7s %7s", + ' ', "remote", "refid", "st", 't', "when", "poll", "reach", "delay", "offset", "jitter" + )); + + for (NTPServer server : ntpServers) + System.out.println(String.format("%c%16.16s %16.16s %2s %c %4s %4d %3o %7s %7s %7s", + server.usage, + server.remote, + formatNull("%s", server.refId, ""), + formatNull("%2d", server.stratum, ""), + server.type, + formatNull("%4d", server.getWhen(), "-"), + server.poll, + server.reach, + formatNull("%5dms", server.delay, ""), + formatNull("% 5.0fms", server.offset, ""), + formatNull("%5.2fms", server.jitter, "") + )); + } } + } - double s0 = 0; - double s1 = 0; - double s2 = 0; - - for (Double offset : offsets) { - // Exclude nearby results for more extreme testing - if (offset < 100.0) - continue; - - s0 += 1; - s1 += offset; - s2 += offset * offset; - } - - double mean = s1 / s0; - double stddev = Math.sqrt(((s0 * s2) - (s1 * s1)) / (s0 * (s0 - 1))); - System.out.println(String.format("mean: %7.3f, stddev: %7.3f", mean, stddev)); + private static String formatNull(String format, Object arg, String nullOutput) { + return arg != null ? String.format(format, arg) : nullOutput; } }