diff --git a/src/main/java/org/qora/controller/AutoUpdate.java b/src/main/java/org/qora/controller/AutoUpdate.java index c398c453..ea077d7b 100644 --- a/src/main/java/org/qora/controller/AutoUpdate.java +++ b/src/main/java/org/qora/controller/AutoUpdate.java @@ -64,6 +64,8 @@ public class AutoUpdate extends Thread { } public void run() { + Thread.currentThread().setName("Auto-update"); + long buildTimestamp = Controller.getInstance().getBuildTimestamp() * 1000L; boolean attemptedUpdate = false; diff --git a/src/main/java/org/qora/controller/Controller.java b/src/main/java/org/qora/controller/Controller.java index f5a9d8a7..1b000f44 100644 --- a/src/main/java/org/qora/controller/Controller.java +++ b/src/main/java/org/qora/controller/Controller.java @@ -11,9 +11,6 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Properties; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Predicate; @@ -75,8 +72,9 @@ public class Controller extends Thread { private static final Object shutdownLock = new Object(); private static final String repositoryUrlTemplate = "jdbc:hsqldb:file:%s/blockchain;create=true"; - private static boolean isStopping = false; + private static volatile boolean isStopping = false; private static BlockGenerator blockGenerator = null; + private static volatile boolean requestSync = false; private static Controller instance; private final String buildVersion; private final long buildTimestamp; // seconds @@ -84,9 +82,6 @@ public class Controller extends Thread { /** Lock for only allowing one blockchain-modifying codepath at a time. e.g. synchronization or newly generated block. */ private final ReentrantLock blockchainLock = new ReentrantLock(); - /** Executor for processing network messages. */ - private final ExecutorService networkMessageExecutor = Executors.newCachedThreadPool(); - private Controller() { Properties properties = new Properties(); try (InputStream in = this.getClass().getResourceAsStream("/build.properties")) { @@ -256,17 +251,17 @@ public class Controller extends Thread { public void run() { Thread.currentThread().setName("Controller"); - try { - while (!isStopping) { - Thread.sleep(14 * 1000); - - potentiallySynchronize(); - - // Query random connections for unconfirmed transactions? + while (!isStopping) { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + return; + } + + if (requestSync) { + requestSync = false; + potentiallySynchronize(); } - } catch (InterruptedException e) { - // time to exit - return; } } @@ -360,7 +355,6 @@ public class Controller extends Thread { } LOGGER.info("Shutting down networking"); - networkMessageExecutor.shutdown(); Network.getInstance().shutdown(); LOGGER.info("Shutting down API"); @@ -457,29 +451,6 @@ public class Controller extends Thread { } public void onNetworkMessage(Peer peer, Message message) { - class NetworkMessageProcessor implements Runnable { - private Peer peer; - private Message message; - - public NetworkMessageProcessor(Peer peer, Message message) { - this.peer = peer; - this.message = message; - } - - @Override - public void run() { - Controller.getInstance().processNetworkMessage(peer, message); - } - } - - try { - networkMessageExecutor.execute(new NetworkMessageProcessor(peer, message)); - } catch (RejectedExecutionException e) { - // Can't execute - probably because we're shutting down, so ignore - } - } - - private void processNetworkMessage(Peer peer, Message message) { LOGGER.trace(String.format("Processing %s message from %s", message.getType().name(), peer)); switch (message.getType()) { @@ -499,6 +470,9 @@ public class Controller extends Thread { LOGGER.error(String.format("Repository issue while updating height of peer %s", peer), e); } + // Potentially synchronize + requestSync = true; + break; } @@ -529,6 +503,9 @@ public class Controller extends Thread { LOGGER.error(String.format("Repository issue while updating info of peer %s", peer), e); } + // Potentially synchronize + requestSync = true; + break; } @@ -556,6 +533,7 @@ public class Controller extends Thread { } catch (DataException e) { LOGGER.error(String.format("Repository issue while sending signatures after %s to peer %s", Base58.encode(parentSignature), peer), e); } + break; } @@ -583,6 +561,7 @@ public class Controller extends Thread { } catch (DataException e) { LOGGER.error(String.format("Repository issue while sending V2 signatures after %s to peer %s", Base58.encode(parentSignature), peer), e); } + break; } @@ -607,6 +586,7 @@ public class Controller extends Thread { } catch (DataException e) { LOGGER.error(String.format("Repository issue while send block %s to peer %s", Base58.encode(signature), peer), e); } + break; } @@ -629,6 +609,7 @@ public class Controller extends Thread { } catch (DataException e) { LOGGER.error(String.format("Repository issue while send transaction %s to peer %s", Base58.encode(signature), peer), e); } + break; } @@ -664,12 +645,15 @@ public class Controller extends Thread { // Seems ok - add to unconfirmed pile transaction.importAsUnconfirmed(); + + LOGGER.debug(String.format("Imported %s transaction %s from peer %s", transactionData.getType().name(), Base58.encode(transactionData.getSignature()), peer)); } finally { blockchainLock.unlock(); } } catch (DataException e) { LOGGER.error(String.format("Repository issue while processing transaction %s from peer %s", Base58.encode(transactionData.getSignature()), peer), e); } + break; } @@ -699,29 +683,29 @@ public class Controller extends Thread { break; } + // Fetch actual transaction data from peer + Message getTransactionMessage = new GetTransactionMessage(signature); + Message responseMessage = peer.getResponse(getTransactionMessage); + if (responseMessage == null || !(responseMessage instanceof TransactionMessage)) { + peer.disconnect("failed to fetch unconfirmed transaction"); + break; + } + + TransactionMessage transactionMessage = (TransactionMessage) responseMessage; + TransactionData transactionData = transactionMessage.getTransactionData(); + Transaction transaction = Transaction.fromData(repository, transactionData); + + // Check signature + if (!transaction.isSignatureValid()) { + LOGGER.trace(String.format("Ignoring unconfirmed transaction %s with invalid signature from peer %s", Base58.encode(transactionData.getSignature()), peer)); + break; + } + // Blockchain lock required to prevent multiple threads trying to save the same transaction simultaneously ReentrantLock blockchainLock = Controller.getInstance().getBlockchainLock(); if (blockchainLock.tryLock()) try { - // Fetch actual transaction data from peer - Message getTransactionMessage = new GetTransactionMessage(signature); - Message responseMessage = peer.getResponse(getTransactionMessage); - if (responseMessage == null || !(responseMessage instanceof TransactionMessage)) { - peer.disconnect("failed to fetch unconfirmed transaction"); - break; - } - - TransactionMessage transactionMessage = (TransactionMessage) responseMessage; - TransactionData transactionData = transactionMessage.getTransactionData(); - Transaction transaction = Transaction.fromData(repository, transactionData); - - // Check signature - if (!transaction.isSignatureValid()) { - LOGGER.trace(String.format("Ignoring unconfirmed transaction %s with invalid signature from peer %s", Base58.encode(transactionData.getSignature()), peer)); - break; - } - - // Do we have it already? + // Do we have it already? Rechecking in case it has appeared since previous check above if (repository.getTransactionRepository().exists(transactionData.getSignature())) { LOGGER.trace(String.format("Ignoring existing unconfirmed transaction %s from peer %s", Base58.encode(transactionData.getSignature()), peer)); break; @@ -739,12 +723,14 @@ public class Controller extends Thread { // Seems ok - add to unconfirmed pile transaction.importAsUnconfirmed(); + + LOGGER.debug(String.format("Imported %s transaction %s from peer %s", transactionData.getType().name(), Base58.encode(transactionData.getSignature()), peer)); + + // We could collate signatures that are new to us and broadcast them to our peers too + newSignatures.add(signature); } finally { blockchainLock.unlock(); } - - // We could collate signatures that are new to us and broadcast them to our peers too - newSignatures.add(signature); } } catch (DataException e) { LOGGER.error(String.format("Repository issue while processing unconfirmed transactions from peer %s", peer), e); @@ -786,6 +772,7 @@ public class Controller extends Thread { } catch (DataException e) { LOGGER.error(String.format("Repository issue while sending block summaries after %s to peer %s", Base58.encode(parentSignature), peer), e); } + break; } diff --git a/src/main/java/org/qora/controller/Synchronizer.java b/src/main/java/org/qora/controller/Synchronizer.java index 732f4382..29d844c5 100644 --- a/src/main/java/org/qora/controller/Synchronizer.java +++ b/src/main/java/org/qora/controller/Synchronizer.java @@ -2,6 +2,7 @@ package org.qora.controller; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.concurrent.locks.ReentrantLock; @@ -10,7 +11,6 @@ import org.apache.logging.log4j.Logger; import org.qora.block.Block; import org.qora.block.Block.ValidationResult; import org.qora.block.BlockChain; -import org.qora.block.GenesisBlock; import org.qora.data.block.BlockData; import org.qora.data.network.BlockSummaryData; import org.qora.data.transaction.TransactionData; @@ -107,6 +107,10 @@ public class Synchronizer { List signatures = findSignaturesFromCommonBlock(peer, ourHeight); if (signatures == null) { + LOGGER.info(String.format("Error while trying to find common block with peer %s", peer)); + return SynchronizationResult.NO_REPLY; + } + if (signatures.isEmpty()) { LOGGER.info(String.format("Failure to find common block with peer %s", peer)); return SynchronizationResult.NO_COMMON_BLOCK; } @@ -290,7 +294,7 @@ public class Synchronizer { * Returns list of peer's block signatures starting with common block with peer. * * @param peer - * @return block signatures + * @return block signatures, or empty list if no common block, or null if there was an issue * @throws DataException */ private List findSignaturesFromCommonBlock(Peer peer, int ourHeight) throws DataException { @@ -299,10 +303,10 @@ public class Synchronizer { int step = INITIAL_BLOCK_STEP; List blockSignatures = null; - int testHeight = ourHeight - step; + int testHeight = Math.max(ourHeight - step, 1); byte[] testSignature = null; - while (testHeight > 1) { + while (testHeight >= 1) { // Fetch our block signature at this height BlockData testBlockData = this.repository.getBlockRepository().fromHeight(testHeight); if (testBlockData == null) { @@ -328,6 +332,11 @@ public class Synchronizer { // We have entries so we have found a common block break; + // No blocks after genesis block? + // We don't get called for a peer at genesis height so this means NO blocks in common + if (testHeight == 1) + return Collections.emptyList(); + if (peer.getVersion() >= 2) { step <<= 1; } else { @@ -336,13 +345,9 @@ public class Synchronizer { } step = Math.min(step, MAXIMUM_BLOCK_STEP); - testHeight -= step; + testHeight = Math.max(testHeight - step, 1); } - if (testHeight <= 1) - // Can't go back any further - return Genesis block - return new ArrayList(Arrays.asList(GenesisBlock.getInstance(this.repository).getBlockData().getSignature())); - // Prepend common block's signature as first block sig blockSignatures.add(0, testSignature); diff --git a/src/main/java/org/qora/network/Network.java b/src/main/java/org/qora/network/Network.java index e73c5d6f..76fb5c21 100644 --- a/src/main/java/org/qora/network/Network.java +++ b/src/main/java/org/qora/network/Network.java @@ -300,7 +300,7 @@ public class Network extends Thread { peers.removeIf(isSelfPeer); } - // Don't consider already connected peers + // Don't consider already connected peers (simple address match) Predicate isConnectedPeer = peerData -> { PeerAddress peerAddress = peerData.getAddress(); return this.connectedPeers.stream().anyMatch(peer -> peer.getPeerData().getAddress().equals(peerAddress)); @@ -310,6 +310,21 @@ public class Network extends Thread { peers.removeIf(isConnectedPeer); } + // Don't consider already connected peers (resolved address match) + Predicate isResolvedAsConnectedPeer = peerData -> { + try { + InetSocketAddress resolvedSocketAddress = peerData.getAddress().toSocketAddress(); + return this.connectedPeers.stream().anyMatch(peer -> peer.getResolvedAddress().equals(resolvedSocketAddress)); + } catch (UnknownHostException e) { + // Can't resolve - no point even trying to connect + return true; + } + }; + + synchronized (this.connectedPeers) { + peers.removeIf(isResolvedAsConnectedPeer); + } + // Any left? if (peers.isEmpty()) return; @@ -642,7 +657,7 @@ public class Network extends Thread { return peers; } - /** Returns list of connected peers that have completed handshaking, with unbound duplicates removed. */ + /** Returns list of connected peers that have completed handshaking, with inbound duplicates removed. */ public List getUniqueHandshakedPeers() { final List peers; @@ -702,6 +717,8 @@ public class Network extends Thread { @Override public void run() { + Thread.currentThread().setName("Merging peers"); + // Serialize using lock to prevent repository deadlocks mergePeersLock.lock(); @@ -751,6 +768,8 @@ public class Network extends Thread { @Override public void run() { + Thread.currentThread().setName("Network Broadcast"); + for (Peer peer : targetPeers) { Message message = peerMessageBuilder.apply(peer); @@ -764,7 +783,7 @@ public class Network extends Thread { } try { - peerExecutor.execute(new Broadcaster(this.getHandshakedPeers(), peerMessageBuilder)); + peerExecutor.execute(new Broadcaster(this.getUniqueHandshakedPeers(), peerMessageBuilder)); } catch (RejectedExecutionException e) { // Can't execute - probably because we're shutting down, so ignore } diff --git a/src/main/java/org/qora/network/Peer.java b/src/main/java/org/qora/network/Peer.java index 7d0eee4b..763f3e77 100644 --- a/src/main/java/org/qora/network/Peer.java +++ b/src/main/java/org/qora/network/Peer.java @@ -15,6 +15,7 @@ import java.util.Map; import java.util.Random; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -44,6 +45,7 @@ public class Peer implements Runnable { private static final int RESPONSE_TIMEOUT = 5000; // ms private static final int PING_INTERVAL = 20000; // ms - just under every 30s is usually ideal to keep NAT mappings refreshed private static final int INACTIVITY_TIMEOUT = 30000; // ms + private static final int UNSOLICITED_MESSAGE_QUEUE_CAPACITY = 10; private final boolean isOutbound; private Socket socket = null; @@ -52,11 +54,14 @@ public class Peer implements Runnable { private Long connectionTimestamp = null; private OutputStream out; private Handshake handshakeStatus = Handshake.STARTED; - private Map> messages; + private Map> replyQueues; + private BlockingQueue unsolicitedQueue; + private ExecutorService messageExecutor; private VersionMessage versionMessage = null; private Integer version; - private ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); + private ScheduledExecutorService pingExecutor; private Long lastPing = null; + private InetSocketAddress resolvedAddress = null; private boolean isLocal; private byte[] peerId; @@ -75,7 +80,8 @@ public class Peer implements Runnable { this.isOutbound = false; this.socket = socket; - this.isLocal = isAddressLocal(((InetSocketAddress) socket.getRemoteSocketAddress()).getAddress()); + this.resolvedAddress = ((InetSocketAddress) socket.getRemoteSocketAddress()); + this.isLocal = isAddressLocal(this.resolvedAddress.getAddress()); PeerAddress peerAddress = PeerAddress.fromSocket(socket); this.peerData = new PeerData(peerAddress); @@ -135,6 +141,10 @@ public class Peer implements Runnable { this.lastPing = lastPing; } + public InetSocketAddress getResolvedAddress() { + return this.resolvedAddress; + } + public boolean getIsLocal() { return this.isLocal; } @@ -190,11 +200,41 @@ public class Peer implements Runnable { new SecureRandom().nextBytes(verificationCodeExpected); } + class MessageProcessor implements Runnable { + private Peer peer; + private BlockingQueue blockingQueue; + + public MessageProcessor(Peer peer, BlockingQueue blockingQueue) { + this.peer = peer; + this.blockingQueue = blockingQueue; + } + + @Override + public void run() { + Thread.currentThread().setName("Peer UMP " + this.peer); + + while (true) { + try { + Message message = blockingQueue.poll(1000L, TimeUnit.MILLISECONDS); + if (message != null) + Network.getInstance().onMessage(peer, message); + } catch (InterruptedException e) { + // Shutdown + return; + } + } + } + } + private void setup() throws IOException { this.socket.setSoTimeout(INACTIVITY_TIMEOUT); this.out = this.socket.getOutputStream(); this.connectionTimestamp = NTP.getTime(); - this.messages = Collections.synchronizedMap(new HashMap>()); + this.replyQueues = Collections.synchronizedMap(new HashMap>()); + + this.unsolicitedQueue = new ArrayBlockingQueue<>(UNSOLICITED_MESSAGE_QUEUE_CAPACITY); + this.messageExecutor = Executors.newSingleThreadExecutor(); + this.messageExecutor.execute(new MessageProcessor(this, this.unsolicitedQueue)); } public boolean connect() { @@ -202,11 +242,10 @@ public class Peer implements Runnable { this.socket = new Socket(); try { - InetSocketAddress resolvedSocketAddress = this.peerData.getAddress().toSocketAddress(); + this.resolvedAddress = this.peerData.getAddress().toSocketAddress(); + this.isLocal = isAddressLocal(this.resolvedAddress.getAddress()); - this.isLocal = isAddressLocal(resolvedSocketAddress.getAddress()); - - this.socket.connect(resolvedSocketAddress, CONNECT_TIMEOUT); + this.socket.connect(resolvedAddress, CONNECT_TIMEOUT); LOGGER.debug(String.format("Connected to peer %s", this)); } catch (SocketTimeoutException e) { LOGGER.trace(String.format("Connection timed out to peer %s", this)); @@ -242,13 +281,20 @@ public class Peer implements Runnable { LOGGER.trace(String.format("Received %s message with ID %d from peer %s", message.getType().name(), message.getId(), this)); // Find potential blocking queue for this id (expect null if id is -1) - BlockingQueue queue = this.messages.get(message.getId()); + BlockingQueue queue = this.replyQueues.get(message.getId()); if (queue != null) { // Adding message to queue will unblock thread waiting for response - this.messages.get(message.getId()).add(message); + this.replyQueues.get(message.getId()).add(message); } else { - // Nothing waiting for this message - pass up to network - Network.getInstance().onMessage(this, message); + // Nothing waiting for this message (unsolicited) - queue up for network + + // Queue full? + if (unsolicitedQueue.remainingCapacity() == 0) { + LOGGER.debug(String.format("No room for %s message with ID %s from peer %s", message.getType().name(), message.getId(), this)); + continue; + } + + unsolicitedQueue.add(message); } } } catch (MessageException e) { @@ -313,12 +359,12 @@ public class Peer implements Runnable { message.setId(id); // Put queue into map (keyed by message ID) so we can poll for a response - // If putIfAbsent() doesn't return null, then this id is already taken - } while (this.messages.putIfAbsent(id, blockingQueue) != null); + // If putIfAbsent() doesn't return null, then this ID is already taken + } while (this.replyQueues.putIfAbsent(id, blockingQueue) != null); // Try to send message if (!this.sendMessage(message)) { - this.messages.remove(id); + this.replyQueues.remove(id); return null; } @@ -329,7 +375,7 @@ public class Peer implements Runnable { // Our thread was interrupted. Probably in shutdown scenario. return null; } finally { - this.messages.remove(id); + this.replyQueues.remove(id); } } @@ -343,6 +389,8 @@ public class Peer implements Runnable { @Override public void run() { + Thread.currentThread().setName("Pinger " + this.peer); + PingMessage pingMessage = new PingMessage(); long before = System.currentTimeMillis(); @@ -358,16 +406,28 @@ public class Peer implements Runnable { Random random = new Random(); long initialDelay = random.nextInt(PING_INTERVAL); - this.executor.scheduleWithFixedDelay(new Pinger(this), initialDelay, PING_INTERVAL, TimeUnit.MILLISECONDS); + this.pingExecutor = Executors.newSingleThreadScheduledExecutor(); + this.pingExecutor.scheduleWithFixedDelay(new Pinger(this), initialDelay, PING_INTERVAL, TimeUnit.MILLISECONDS); } public void disconnect(String reason) { + LOGGER.trace(String.format("Disconnecting peer %s: %s", this, reason)); + // Shut down pinger - this.executor.shutdownNow(); + if (this.pingExecutor != null) { + this.pingExecutor.shutdownNow(); + this.pingExecutor = null; + } + + // Shut down unsolicited message processor + if (this.messageExecutor != null) { + this.messageExecutor.shutdownNow(); + this.messageExecutor = null; + } // Close socket if (!this.socket.isClosed()) { - LOGGER.debug(String.format("Disconnecting peer %s: %s", this, reason)); + LOGGER.debug(String.format("Closing socket with peer %s: %s", this, reason)); try { this.socket.close();