From 0db43451d49b3f8583869c9654521ce521eab6de Mon Sep 17 00:00:00 2001 From: catbref <misc-github@talk2dom.com> Date: Wed, 30 Jan 2019 18:24:10 +0000 Subject: [PATCH] Interim networking code commit in case of dev catastrophe! DB shape change from v29 to add peer info. New NetworkRepository to handle above. Peer handshaking with v2 anti-DoS PoW code. Handshaking refactored into a state-machine-like enum. Some peer-related API calls added. Peers exchange pings, heights, peers. No actual peer sync yet. Other changes: Peer version info taken from Maven build properties/resource file. AnnotationPostProcessor more resilient when fetching PathItems. Per-repository session debugging flag that can be toggled at will. HSQLDBRepository.delete() now returns int so callers can detect whether anything was actually deleted. Some renaming to settings. --- .classpath | 12 +- log4j2.properties | 14 + pom.xml | 18 + src/main/java/org/qora/api/ApiService.java | 4 +- .../org/qora/api/model/ConnectedPeer.java | 34 ++ .../org/qora/api/resource/AdminResource.java | 2 +- .../api/resource/AnnotationPostProcessor.java | 12 +- .../org/qora/api/resource/ApiDefinition.java | 1 + .../org/qora/api/resource/PeersResource.java | 229 +++++++++ .../java/org/qora/block/BlockGenerator.java | 4 + .../java/org/qora/controller/Controller.java | 194 +++++++- .../org/qora/controller/Synchronizer.java | 54 +++ .../java/org/qora/crypto/Credentials.java | 71 +++ .../java/org/qora/data/network/PeerData.java | 65 +++ src/main/java/org/qora/network/Handshake.java | 151 ++++++ src/main/java/org/qora/network/Network.java | 445 ++++++++++++++++++ src/main/java/org/qora/network/Peer.java | 302 ++++++++++++ src/main/java/org/qora/network/Proof.java | 112 +++++ .../qora/network/message/HeightMessage.java | 47 ++ .../org/qora/network/message/Message.java | 212 +++++++++ .../qora/network/message/PeerIdMessage.java | 52 ++ .../qora/network/message/PeersMessage.java | 93 ++++ .../org/qora/network/message/PingMessage.java | 25 + .../qora/network/message/ProofMessage.java | 63 +++ .../qora/network/message/VersionMessage.java | 67 +++ .../qora/repository/NetworkRepository.java | 15 + .../java/org/qora/repository/Repository.java | 4 + .../hsqldb/HSQLDBDatabaseUpdates.java | 6 + .../hsqldb/HSQLDBNetworkRepository.java | 83 ++++ .../repository/hsqldb/HSQLDBRepository.java | 23 +- src/main/java/org/qora/settings/Settings.java | 82 +++- src/main/resources/build.properties | 2 + 32 files changed, 2445 insertions(+), 53 deletions(-) create mode 100644 src/main/java/org/qora/api/model/ConnectedPeer.java create mode 100644 src/main/java/org/qora/api/resource/PeersResource.java create mode 100644 src/main/java/org/qora/controller/Synchronizer.java create mode 100644 src/main/java/org/qora/crypto/Credentials.java create mode 100644 src/main/java/org/qora/data/network/PeerData.java create mode 100644 src/main/java/org/qora/network/Handshake.java create mode 100644 src/main/java/org/qora/network/Network.java create mode 100644 src/main/java/org/qora/network/Peer.java create mode 100644 src/main/java/org/qora/network/Proof.java create mode 100644 src/main/java/org/qora/network/message/HeightMessage.java create mode 100644 src/main/java/org/qora/network/message/Message.java create mode 100644 src/main/java/org/qora/network/message/PeerIdMessage.java create mode 100644 src/main/java/org/qora/network/message/PeersMessage.java create mode 100644 src/main/java/org/qora/network/message/PingMessage.java create mode 100644 src/main/java/org/qora/network/message/ProofMessage.java create mode 100644 src/main/java/org/qora/network/message/VersionMessage.java create mode 100644 src/main/java/org/qora/repository/NetworkRepository.java create mode 100644 src/main/java/org/qora/repository/hsqldb/HSQLDBNetworkRepository.java create mode 100644 src/main/resources/build.properties diff --git a/.classpath b/.classpath index c9350e4d..793e3dcc 100644 --- a/.classpath +++ b/.classpath @@ -1,11 +1,5 @@ <?xml version="1.0" encoding="UTF-8"?> <classpath> - <classpathentry kind="src" output="target/classes" path="target/generated-sources/package-info"> - <attributes> - <attribute name="optional" value="true"/> - <attribute name="maven.pomderived" value="true"/> - </attributes> - </classpathentry> <classpathentry kind="src" output="target/classes" path="src/main/java"> <attributes> <attribute name="optional" value="true"/> @@ -41,6 +35,12 @@ <attribute name="m2e-apt" value="true"/> </attributes> </classpathentry> + <classpathentry kind="src" output="target/classes" path="target/generated-sources/package-info"> + <attributes> + <attribute name="optional" value="true"/> + <attribute name="maven.pomderived" value="true"/> + </attributes> + </classpathentry> <classpathentry kind="src" output="target/test-classes" path="target/generated-test-sources/test-annotations"> <attributes> <attribute name="optional" value="true"/> diff --git a/log4j2.properties b/log4j2.properties index b5c3718f..9e9af414 100644 --- a/log4j2.properties +++ b/log4j2.properties @@ -10,6 +10,10 @@ rootLogger.appenderRef.rolling.ref = FILE logger.hsqldb.name = hsqldb.db logger.hsqldb.level = warn +# Support optional, per-session HSQLDB debugging +logger.hsqldbDebug.name = org.qora.repository.hsqldb.HSQLDBRepository +logger.hsqldbDebug.level = debug + # Suppress extraneous Jersey warning logger.jerseyInject.name = org.glassfish.jersey.internal.inject.Providers logger.jerseyInject.level = error @@ -18,6 +22,16 @@ logger.jerseyInject.level = error logger.txSearch.name = org.qora.repository.hsqldb.transaction.HSQLDBTransactionRepository logger.txSearch.level = trace +# Debug block generator +logger.blockgen.name = org.qora.block.BlockGenerator +logger.blockgen.level = trace + +# Debug networking +logger.network.name = org.qora.network.Network +logger.network.level = trace +logger.peer.name = org.qora.network.Peer +logger.peer.level = trace + appender.console.type = Console appender.console.name = stdout appender.console.layout.type = PatternLayout diff --git a/pom.xml b/pom.xml index 08de2ea7..ef052c39 100644 --- a/pom.xml +++ b/pom.xml @@ -8,6 +8,7 @@ <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <bouncycastle.version>1.60</bouncycastle.version> + <dagger.version>1.2.2</dagger.version> <hsqldb.version>r5836</hsqldb.version> <jetty.version>9.4.11.v20180605</jetty.version> <jersey.version>2.27</jersey.version> @@ -16,10 +17,17 @@ <swagger-api.version>2.0.6</swagger-api.version> <swagger-ui.version>3.19.0</swagger-ui.version> <felix-bundle-plugin.version>3.5.0</felix-bundle-plugin.version> + <build.timestamp>${maven.build.timestamp}</build.timestamp> </properties> <build> <sourceDirectory>src/main/java</sourceDirectory> <testSourceDirectory>src/test/java</testSourceDirectory> + <resources> + <resource> + <directory>${basedir}/src/main/resources</directory> + <filtering>true</filtering> + </resource> + </resources> <plugins> <plugin> <artifactId>maven-compiler-plugin</artifactId> @@ -357,6 +365,11 @@ <artifactId>jetty-rewrite</artifactId> <version>${jetty.version}</version> </dependency> + <dependency> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-client</artifactId> + <version>${jetty.version}</version> + </dependency> <!-- Jersey --> <dependency> <groupId>org.glassfish.jersey.core</groupId> @@ -416,5 +429,10 @@ <artifactId>bcprov-jdk15on</artifactId> <version>${bouncycastle.version}</version> </dependency> + <dependency> + <groupId>org.bouncycastle</groupId> + <artifactId>bctls-jdk15on</artifactId> + <version>${bouncycastle.version}</version> + </dependency> </dependencies> </project> diff --git a/src/main/java/org/qora/api/ApiService.java b/src/main/java/org/qora/api/ApiService.java index ea562911..05b12213 100644 --- a/src/main/java/org/qora/api/ApiService.java +++ b/src/main/java/org/qora/api/ApiService.java @@ -31,11 +31,11 @@ public class ApiService { config.register(AnnotationPostProcessor.class); // Create RPC server - this.server = new Server(Settings.getInstance().getRpcPort()); + this.server = new Server(Settings.getInstance().getApiPort()); // IP address based access control InetAccessHandler accessHandler = new InetAccessHandler(); - for (String pattern : Settings.getInstance().getRpcAllowed()) { + for (String pattern : Settings.getInstance().getApiAllowed()) { accessHandler.include(pattern); } this.server.setHandler(accessHandler); diff --git a/src/main/java/org/qora/api/model/ConnectedPeer.java b/src/main/java/org/qora/api/model/ConnectedPeer.java new file mode 100644 index 00000000..1ac08433 --- /dev/null +++ b/src/main/java/org/qora/api/model/ConnectedPeer.java @@ -0,0 +1,34 @@ +package org.qora.api.model; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; + +import org.qora.network.Peer; + +@XmlAccessorType(XmlAccessType.FIELD) +public class ConnectedPeer { + + public String hostname; + public int port; + public Long lastPing; + public Integer lastHeight; + + public enum Direction { + INBOUND, + OUTBOUND; + } + + public Direction direction; + + protected ConnectedPeer() { + } + + public ConnectedPeer(Peer peer) { + this.hostname = peer.getRemoteSocketAddress().getHostString(); + this.port = peer.getRemoteSocketAddress().getPort(); + this.lastPing = peer.getLastPing(); + this.direction = peer.isOutbound() ? Direction.OUTBOUND : Direction.INBOUND; + this.lastHeight = peer.getPeerData() == null ? null : peer.getPeerData().getLastHeight(); + } + +} diff --git a/src/main/java/org/qora/api/resource/AdminResource.java b/src/main/java/org/qora/api/resource/AdminResource.java index 51bf2fac..372a33e4 100644 --- a/src/main/java/org/qora/api/resource/AdminResource.java +++ b/src/main/java/org/qora/api/resource/AdminResource.java @@ -73,7 +73,7 @@ public class AdminResource { new Thread(new Runnable() { @Override public void run() { - Controller.shutdown(); + Controller.getInstance().shutdownAndExit(); } }).start(); diff --git a/src/main/java/org/qora/api/resource/AnnotationPostProcessor.java b/src/main/java/org/qora/api/resource/AnnotationPostProcessor.java index ee5d6a13..9593c2a9 100644 --- a/src/main/java/org/qora/api/resource/AnnotationPostProcessor.java +++ b/src/main/java/org/qora/api/resource/AnnotationPostProcessor.java @@ -69,6 +69,12 @@ public class AnnotationPostProcessor implements ReaderListener { LOGGER.trace("Found @ApiErrors annotation on " + clazz.getSimpleName() + "." + method.getName()); PathItem pathItem = getPathItemFromMethod(openAPI, classPathString, method); + if (pathItem == null) { + LOGGER.error(String.format("Couldn't get PathItem for %s", clazz.getSimpleName() + "." + method.getName())); + LOGGER.error(String.format("Known paths: %s", String.join(", ", openAPI.getPaths().keySet()))); + continue; + } + for (Operation operation : pathItem.readOperations()) for (ApiError apiError : apiErrors.value()) addApiErrorResponse(operation, apiError); @@ -82,6 +88,10 @@ public class AnnotationPostProcessor implements ReaderListener { return openAPI.getPaths().get(classPathString); String pathString = path.value(); + + if (pathString.equals("/")) + pathString = ""; + return openAPI.getPaths().get(classPathString + pathString); } @@ -107,7 +117,7 @@ public class AnnotationPostProcessor implements ReaderListener { // XXX: addExamples(..) is not working in Swagger 2.0.4. This bug is referenced in https://github.com/swagger-api/swagger-ui/issues/2651 // Replace the call to .setExample(..) by .addExamples(..) when the bug is fixed. apiResponse.getContent().get(javax.ws.rs.core.MediaType.APPLICATION_JSON).setExample(example); - //apiResponse.getContent().get(javax.ws.rs.core.MediaType.APPLICATION_JSON).addExamples(Integer.toString(apiErrorCode), example); + // apiResponse.getContent().get(javax.ws.rs.core.MediaType.APPLICATION_JSON).addExamples(Integer.toString(apiErrorCode), example); } } diff --git a/src/main/java/org/qora/api/resource/ApiDefinition.java b/src/main/java/org/qora/api/resource/ApiDefinition.java index 6ff1153f..549a88d7 100644 --- a/src/main/java/org/qora/api/resource/ApiDefinition.java +++ b/src/main/java/org/qora/api/resource/ApiDefinition.java @@ -16,6 +16,7 @@ import io.swagger.v3.oas.annotations.tags.Tag; @Tag(name = "Groups"), @Tag(name = "Names"), @Tag(name = "Payments"), + @Tag(name = "Peers"), @Tag(name = "Transactions"), @Tag(name = "Utilities") }, diff --git a/src/main/java/org/qora/api/resource/PeersResource.java b/src/main/java/org/qora/api/resource/PeersResource.java new file mode 100644 index 00000000..a802bd2f --- /dev/null +++ b/src/main/java/org/qora/api/resource/PeersResource.java @@ -0,0 +1,229 @@ +package org.qora.api.resource; + +import io.swagger.v3.oas.annotations.Operation; +import io.swagger.v3.oas.annotations.media.ArraySchema; +import io.swagger.v3.oas.annotations.media.Content; +import io.swagger.v3.oas.annotations.media.Schema; +import io.swagger.v3.oas.annotations.parameters.RequestBody; +import io.swagger.v3.oas.annotations.responses.ApiResponse; +import io.swagger.v3.oas.annotations.tags.Tag; + +import java.net.InetSocketAddress; +import java.util.List; +import java.util.stream.Collectors; + +import javax.servlet.http.HttpServletRequest; +import javax.ws.rs.DELETE; +import javax.ws.rs.GET; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.MediaType; + +import org.qora.api.ApiError; +import org.qora.api.ApiErrors; +import org.qora.api.ApiException; +import org.qora.api.ApiExceptionFactory; +import org.qora.api.Security; +import org.qora.api.model.ConnectedPeer; +import org.qora.data.network.PeerData; +import org.qora.network.Network; +import org.qora.repository.DataException; +import org.qora.repository.Repository; +import org.qora.repository.RepositoryManager; +import org.qora.settings.Settings; + +@Path("/peers") +@Produces({ + MediaType.APPLICATION_JSON, MediaType.TEXT_PLAIN +}) +@Tag( + name = "Peers" +) +public class PeersResource { + + @Context + HttpServletRequest request; + + @GET + @Operation( + summary = "Fetch list of connected peers", + responses = { + @ApiResponse( + content = @Content( + mediaType = MediaType.APPLICATION_JSON, + array = @ArraySchema( + schema = @Schema( + implementation = ConnectedPeer.class + ) + ) + ) + ) + } + ) + public List<ConnectedPeer> getPeers() { + return Network.getInstance().getConnectedPeers().stream().map(peer -> new ConnectedPeer(peer)).collect(Collectors.toList()); + } + + @GET + @Path("/known") + @Operation( + summary = "Fetch list of all known peers", + responses = { + @ApiResponse( + content = @Content( + mediaType = MediaType.APPLICATION_JSON, + array = @ArraySchema( + schema = @Schema( + implementation = PeerData.class + ) + ) + ) + ) + } + ) + @ApiErrors({ + ApiError.REPOSITORY_ISSUE + }) + public List<PeerData> getKnownPeers() { + try (final Repository repository = RepositoryManager.getRepository()) { + return repository.getNetworkRepository().getAllPeers(); + } catch (DataException e) { + throw ApiExceptionFactory.INSTANCE.createException(request, ApiError.REPOSITORY_ISSUE, e); + } + } + + @GET + @Path("/self") + @Operation( + summary = "Fetch list of peers that connect to self", + responses = { + @ApiResponse( + content = @Content( + mediaType = MediaType.APPLICATION_JSON, + array = @ArraySchema( + schema = @Schema( + implementation = PeerData.class + ) + ) + ) + ) + } + ) + public List<PeerData> getSelfPeers() { + return Network.getInstance().getSelfPeers(); + } + + @POST + @Operation( + summary = "Add new peer address", + requestBody = @RequestBody( + required = true, + content = @Content( + mediaType = MediaType.TEXT_PLAIN, + schema = @Schema( + type = "string" + ) + ) + ), + responses = { + @ApiResponse( + description = "true if accepted", + content = @Content( + schema = @Schema( + type = "string" + ) + ) + ) + } + ) + @ApiErrors({ + ApiError.INVALID_DATA, ApiError.REPOSITORY_ISSUE + }) + public String addPeer(String peerAddress) { + Security.checkApiCallAllowed(request); + + try (final Repository repository = RepositoryManager.getRepository()) { + String[] peerParts = peerAddress.split(":"); + + // Expecting one or two parts + if (peerParts.length < 1 || peerParts.length > 2) + throw ApiExceptionFactory.INSTANCE.createException(request, ApiError.INVALID_DATA); + + String hostname = peerParts[0]; + int port = peerParts.length == 2 ? Integer.parseInt(peerParts[1]) : Settings.DEFAULT_LISTEN_PORT; + + InetSocketAddress socketAddress = new InetSocketAddress(hostname, port); + + PeerData peerData = new PeerData(socketAddress); + repository.getNetworkRepository().save(peerData); + repository.saveChanges(); + + return "true"; + } catch (IllegalArgumentException e) { + throw ApiExceptionFactory.INSTANCE.createException(request, ApiError.INVALID_DATA); + } catch (ApiException e) { + throw e; + } catch (DataException e) { + throw ApiExceptionFactory.INSTANCE.createException(request, ApiError.REPOSITORY_ISSUE, e); + } + } + + @DELETE + @Operation( + summary = "Remove peer address from database", + requestBody = @RequestBody( + required = true, + content = @Content( + mediaType = MediaType.TEXT_PLAIN, + schema = @Schema( + type = "string" + ) + ) + ), + responses = { + @ApiResponse( + description = "true if removed, false if not found", + content = @Content( + schema = @Schema( + type = "string" + ) + ) + ) + } + ) + @ApiErrors({ + ApiError.INVALID_DATA, ApiError.REPOSITORY_ISSUE + }) + public String removePeer(String peerAddress) { + Security.checkApiCallAllowed(request); + + try (final Repository repository = RepositoryManager.getRepository()) { + String[] peerParts = peerAddress.split(":"); + + // Expecting one or two parts + if (peerParts.length < 1 || peerParts.length > 2) + throw ApiExceptionFactory.INSTANCE.createException(request, ApiError.INVALID_DATA); + + String hostname = peerParts[0]; + int port = peerParts.length == 2 ? Integer.parseInt(peerParts[1]) : Settings.DEFAULT_LISTEN_PORT; + + InetSocketAddress socketAddress = new InetSocketAddress(hostname, port); + + PeerData peerData = new PeerData(socketAddress); + + int numDeleted = repository.getNetworkRepository().delete(peerData); + repository.saveChanges(); + + return numDeleted != 0 ? "true" : "false"; + } catch (IllegalArgumentException e) { + throw ApiExceptionFactory.INSTANCE.createException(request, ApiError.INVALID_DATA); + } catch (ApiException e) { + throw e; + } catch (DataException e) { + throw ApiExceptionFactory.INSTANCE.createException(request, ApiError.REPOSITORY_ISSUE, e); + } + } + +} diff --git a/src/main/java/org/qora/block/BlockGenerator.java b/src/main/java/org/qora/block/BlockGenerator.java index ee8c3ddd..88dac306 100644 --- a/src/main/java/org/qora/block/BlockGenerator.java +++ b/src/main/java/org/qora/block/BlockGenerator.java @@ -7,6 +7,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.qora.account.PrivateKeyAccount; import org.qora.block.Block.ValidationResult; +import org.qora.controller.Controller; import org.qora.data.block.BlockData; import org.qora.data.transaction.TransactionData; import org.qora.repository.BlockRepository; @@ -93,6 +94,9 @@ public class BlockGenerator extends Thread { newBlock.process(); LOGGER.info("Generated new block: " + newBlock.getBlockData().getHeight()); repository.saveChanges(); + + // Notify controller + Controller.getInstance().onGeneratedBlock(newBlock.getBlockData()); } catch (DataException e) { // Unable to process block - report and discard LOGGER.error("Unable to process newly generated block?", e); diff --git a/src/main/java/org/qora/controller/Controller.java b/src/main/java/org/qora/controller/Controller.java index 1198b7d2..61801cf2 100644 --- a/src/main/java/org/qora/controller/Controller.java +++ b/src/main/java/org/qora/controller/Controller.java @@ -1,41 +1,85 @@ package org.qora.controller; +import java.io.IOException; +import java.io.InputStream; import java.security.Security; +import java.time.LocalDateTime; +import java.time.ZoneOffset; +import java.time.format.DateTimeFormatter; +import java.util.Properties; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.bouncycastle.jce.provider.BouncyCastleProvider; +import org.bouncycastle.jsse.provider.BouncyCastleJsseProvider; import org.qora.api.ApiService; import org.qora.block.BlockChain; import org.qora.block.BlockGenerator; +import org.qora.data.block.BlockData; +import org.qora.network.Network; +import org.qora.network.Peer; +import org.qora.network.message.HeightMessage; +import org.qora.network.message.Message; import org.qora.repository.DataException; +import org.qora.repository.Repository; import org.qora.repository.RepositoryFactory; import org.qora.repository.RepositoryManager; import org.qora.repository.hsqldb.HSQLDBRepositoryFactory; import org.qora.settings.Settings; import org.qora.utils.Base58; -public class Controller { +public class Controller extends Thread { static { // This must go before any calls to LogManager/Logger System.setProperty("java.util.logging.manager", "org.apache.logging.log4j.jul.LogManager"); } - private static final Logger LOGGER = LogManager.getLogger(Controller.class); - public static final String connectionUrl = "jdbc:hsqldb:file:db/blockchain;create=true"; - public static final long startTime = System.currentTimeMillis(); + public static final String VERSION_PREFIX = "qora-core-"; + + private static final Logger LOGGER = LogManager.getLogger(Controller.class); private static final Object shutdownLock = new Object(); private static boolean isStopping = false; + private static BlockGenerator blockGenerator = null; + private static Controller instance; + private final String buildVersion; + private final long buildTimestamp; - private static BlockGenerator blockGenerator; + private Controller() { + Properties properties = new Properties(); + try (InputStream in = ClassLoader.getSystemResourceAsStream("build.properties")) { + properties.load(in); + } catch (IOException e) { + throw new RuntimeException("Can't read build.properties resource", e); + } + + String buildTimestamp = properties.getProperty("build.timestamp"); + if (buildTimestamp == null) + throw new RuntimeException("Can't read build.timestamp from build.properties resource"); + + this.buildTimestamp = LocalDateTime.parse(buildTimestamp, DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ssX")).toEpochSecond(ZoneOffset.UTC); + + String buildVersion = properties.getProperty("build.version"); + if (buildVersion == null) + throw new RuntimeException("Can't read build.version from build.properties resource"); + + this.buildVersion = VERSION_PREFIX + buildVersion; + } + + public static Controller getInstance() { + if (instance == null) + instance = new Controller(); + + return instance; + } public static void main(String args[]) { LOGGER.info("Starting up..."); Security.insertProviderAt(new BouncyCastleProvider(), 0); + Security.insertProviderAt(new BouncyCastleJsseProvider(), 1); // Load/check settings, which potentially sets up blockchain config, etc. Settings.getInstance(); @@ -57,12 +101,15 @@ public class Controller { System.exit(2); } - LOGGER.info("Starting block generator"); - byte[] privateKey = Base58.decode("A9MNsATgQgruBUjxy2rjWY36Yf19uRioKZbiLFT2P7c6"); - blockGenerator = new BlockGenerator(privateKey); - blockGenerator.start(); + // XXX work to be done here! + if (args.length == 0) { + LOGGER.info("Starting block generator"); + byte[] privateKey = Base58.decode("A9MNsATgQgruBUjxy2rjWY36Yf19uRioKZbiLFT2P7c6"); + blockGenerator = new BlockGenerator(privateKey); + blockGenerator.start(); + } - LOGGER.info("Starting API"); + LOGGER.info("Starting API on port " + Settings.getInstance().getApiPort()); try { ApiService apiService = ApiService.getInstance(); apiService.start(); @@ -71,35 +118,74 @@ public class Controller { System.exit(1); } + LOGGER.info("Starting networking"); + try { + Network network = Network.getInstance(); + network.start(); + } catch (Exception e) { + LOGGER.error("Unable to start networking", e); + System.exit(1); + } + Runtime.getRuntime().addShutdownHook(new Thread() { @Override public void run() { - Controller.shutdown(); + Controller.getInstance().shutdown(); } }); + + LOGGER.info("Starting controller"); + Controller.getInstance().start(); } - public static void shutdown() { + @Override + public void run() { + Thread.currentThread().setName("Controller"); + + try { + while (true) { + Thread.sleep(1000); + + // Query random connections for their blockchain status + // If height > ours then potentially synchronize + + // Query random connections for unconfirmed transactions + } + } catch (InterruptedException e) { + // time to exit + return; + } + } + + public void shutdown() { synchronized (shutdownLock) { if (!isStopping) { isStopping = true; + LOGGER.info("Shutting down controller"); + this.interrupt(); + + LOGGER.info("Shutting down networking"); + Network.getInstance().shutdown(); + LOGGER.info("Shutting down API"); ApiService.getInstance().stop(); - LOGGER.info("Shutting down block generator"); - blockGenerator.shutdown(); - try { - blockGenerator.join(); - } catch (InterruptedException e) { - e.printStackTrace(); + if (blockGenerator != null) { + LOGGER.info("Shutting down block generator"); + blockGenerator.shutdown(); + try { + blockGenerator.join(); + } catch (InterruptedException e) { + // We were interrupted while waiting for thread to 'join' + } } try { LOGGER.info("Shutting down repository"); RepositoryManager.closeRepositoryFactory(); } catch (DataException e) { - e.printStackTrace(); + LOGGER.error("Error occurred while shutting down repository", e); } LOGGER.info("Shutdown complete!"); @@ -107,4 +193,74 @@ public class Controller { } } + public void shutdownAndExit() { + this.shutdown(); + System.exit(0); + } + + public byte[] getMessageMagic() { + return new byte[] { + 0x12, 0x34, 0x56, 0x78 + }; + } + + public long getBuildTimestamp() { + return this.buildTimestamp; + } + + public String getVersionString() { + return this.buildVersion; + } + + public int getChainHeight() { + try (final Repository repository = RepositoryManager.getRepository()) { + return repository.getBlockRepository().getBlockchainHeight(); + } catch (DataException e) { + LOGGER.error("Repository issue when fetching blockchain height", e); + return 0; + } + } + + // Callbacks for/from network + + public void doNetworkBroadcast() { + Network network = Network.getInstance(); + + // Send our known peers + network.broadcast(network.buildPeersMessage()); + + // Send our current height + network.broadcast(new HeightMessage(this.getChainHeight())); + } + + public void onGeneratedBlock(BlockData newBlockData) { + // XXX we should really be broadcasting the new block sig, not height + // Could even broadcast top two block sigs so that remote peers can see new block references current network-wide last block + + // Broadcast our new height + Network.getInstance().broadcast(new HeightMessage(newBlockData.getHeight())); + } + + public void onNetworkMessage(Peer peer, Message message) { + LOGGER.trace(String.format("Processing %s message from %s", message.getType().name(), peer.getRemoteSocketAddress())); + + switch (message.getType()) { + case HEIGHT: + HeightMessage heightMessage = (HeightMessage) message; + + // If we connected to peer, then update our record of peer's height + if (peer.isOutbound()) + peer.getPeerData().setLastHeight(heightMessage.getHeight()); + + // XXX we should instead test incoming block sigs to see if we have them, and if not do sync + // Is peer's blockchain longer than ours? + if (heightMessage.getHeight() > getChainHeight()) + Synchronizer.getInstance().synchronize(peer); + break; + + default: + break; + } + } + } diff --git a/src/main/java/org/qora/controller/Synchronizer.java b/src/main/java/org/qora/controller/Synchronizer.java new file mode 100644 index 00000000..b42e0f56 --- /dev/null +++ b/src/main/java/org/qora/controller/Synchronizer.java @@ -0,0 +1,54 @@ +package org.qora.controller; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.qora.network.Peer; + +public class Synchronizer { + + private static final Logger LOGGER = LogManager.getLogger(Synchronizer.class); + + private static Synchronizer instance; + + private Synchronizer() { + } + + public static Synchronizer getInstance() { + if (instance == null) + instance = new Synchronizer(); + + return instance; + } + + public void synchronize(Peer peer) { + // If we're already synchronizing with another peer then return + + LOGGER.info(String.format("Synchronizing with peer %s", peer.getRemoteSocketAddress())); + + // Peer has different latest block sig to us + + // find common block? + + // if common block is too far behind us then we're on massively different forks so give up, maybe human invention required to download desired fork + + // unwind to common block (unless common block is our latest block) + + // apply some newer blocks from peer + + // commit + + // If our block gen creates a block while we do this - what happens? + // does repository serialization prevent issues? + + // blockgen: block 123: pay X from A to B, commit + // sync: block 122 orphaned, replacement blocks 122 through 129 applied, commit + + // and vice versa? + + // sync: block 122 orphaned, replacement blocks 122 through 129 applied, commit + // blockgen: block 123: pay X from A to B, commit + + // simply block syncing when generating and vice versa by grabbing a Controller-owned non-blocking mutex? + } + +} diff --git a/src/main/java/org/qora/crypto/Credentials.java b/src/main/java/org/qora/crypto/Credentials.java new file mode 100644 index 00000000..2892e803 --- /dev/null +++ b/src/main/java/org/qora/crypto/Credentials.java @@ -0,0 +1,71 @@ +package org.qora.crypto; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.security.KeyFactory; +import java.security.NoSuchAlgorithmException; +import java.security.NoSuchProviderException; +import java.security.PrivateKey; +import java.security.cert.Certificate; +import java.security.cert.CertificateException; +import java.security.cert.CertificateFactory; +import java.security.spec.InvalidKeySpecException; +import java.security.spec.KeySpec; +import java.security.spec.PKCS8EncodedKeySpec; + +public class Credentials { + + public static Certificate readCertFile(String certPath) throws CertificateException { + if (certPath == null || !new File(certPath).exists()) + return null; + + CertificateFactory certFactory; + try { + certFactory = CertificateFactory.getInstance("X509", "BC"); + File certFile = new File(certPath); + try (InputStream certStream = new FileInputStream(certFile)) { + return certFactory.generateCertificate(certStream); + } + } catch (FileNotFoundException e) { + return null; + } catch (CertificateException | IOException | NoSuchProviderException e) { + throw new CertificateException(e); + } + } + + public static Certificate readCertResource(String certResource) throws CertificateException { + ClassLoader loader = Credentials.class.getClassLoader(); + try (InputStream inputStream = loader.getResourceAsStream(certResource)) { + if (inputStream == null) + return null; + + CertificateFactory certFactory; + try { + certFactory = CertificateFactory.getInstance("X509", "BC"); + return certFactory.generateCertificate(inputStream); + } catch (CertificateException | NoSuchProviderException e) { + throw new CertificateException(e); + } + } catch (IOException e) { + return null; + } + } + + public static PrivateKey loadPrivateKey(String privateKeyPath, String keyAlgorithm) throws IOException, NoSuchAlgorithmException, InvalidKeySpecException { + if (privateKeyPath == null || !new File(privateKeyPath).exists()) + throw new FileNotFoundException("Private key file not found at " + privateKeyPath); + + File file = new File(privateKeyPath); + byte[] privKeyBytes = new byte[(int) file.length()]; + try (InputStream in = new FileInputStream(file)) { + in.read(privKeyBytes); + } + KeyFactory keyFactory = KeyFactory.getInstance(keyAlgorithm); + KeySpec ks = new PKCS8EncodedKeySpec(privKeyBytes); + return keyFactory.generatePrivate(ks); + } + +} diff --git a/src/main/java/org/qora/data/network/PeerData.java b/src/main/java/org/qora/data/network/PeerData.java new file mode 100644 index 00000000..48b5288b --- /dev/null +++ b/src/main/java/org/qora/data/network/PeerData.java @@ -0,0 +1,65 @@ +package org.qora.data.network; + +import java.net.InetSocketAddress; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; + +// All properties to be converted to JSON via JAX-RS +@XmlAccessorType(XmlAccessType.FIELD) +public class PeerData { + + // Properties + private InetSocketAddress socketAddress; + private Long lastAttempted; + private Long lastConnected; + private Integer lastHeight; + + // Constructors + + // necessary for JAX-RS serialization + protected PeerData() { + } + + public PeerData(InetSocketAddress socketAddress, Long lastAttempted, Long lastConnected, Integer lastHeight) { + this.socketAddress = socketAddress; + this.lastAttempted = lastAttempted; + this.lastConnected = lastConnected; + this.lastHeight = lastHeight; + } + + public PeerData(InetSocketAddress socketAddress) { + this(socketAddress, null, null, null); + } + + // Getters / setters + + public InetSocketAddress getSocketAddress() { + return this.socketAddress; + } + + public Long getLastAttempted() { + return this.lastAttempted; + } + + public void setLastAttempted(Long lastAttempted) { + this.lastAttempted = lastAttempted; + } + + public Long getLastConnected() { + return this.lastConnected; + } + + public void setLastConnected(Long lastConnected) { + this.lastConnected = lastConnected; + } + + public Integer getLastHeight() { + return this.lastHeight; + } + + public void setLastHeight(Integer lastHeight) { + this.lastHeight = lastHeight; + } + +} diff --git a/src/main/java/org/qora/network/Handshake.java b/src/main/java/org/qora/network/Handshake.java new file mode 100644 index 00000000..87e7ae61 --- /dev/null +++ b/src/main/java/org/qora/network/Handshake.java @@ -0,0 +1,151 @@ +package org.qora.network; + +import java.util.Arrays; + +import org.qora.controller.Controller; +import org.qora.network.message.Message; +import org.qora.network.message.Message.MessageType; +import org.qora.utils.NTP; +import org.qora.network.message.PeerIdMessage; +import org.qora.network.message.ProofMessage; +import org.qora.network.message.VersionMessage; + +public enum Handshake { + STARTED(null) { + @Override + public Handshake onMessage(Peer peer, Message message) { + return VERSION; + } + + @Override + public void action(Peer peer) { + } + }, + VERSION(MessageType.VERSION) { + @Override + public Handshake onMessage(Peer peer, Message message) { + peer.setVersionMessage((VersionMessage) message); + return SELF_CHECK; + } + + @Override + public void action(Peer peer) { + sendVersion(peer); + } + }, + SELF_CHECK(MessageType.PEER_ID) { + @Override + public Handshake onMessage(Peer peer, Message message) { + PeerIdMessage peerIdMessage = (PeerIdMessage) message; + + if (Arrays.equals(peerIdMessage.getPeerId(), Network.getInstance().getOurPeerId())) { + // Connected to self! + // If outgoing connection then record destination as self so we don't try again + if (peer.isOutbound()) + Network.getInstance().noteToSelf(peer); + else + // We still need to send our ID so our outbound connection can mark their address as 'self' + sendMyId(peer); + + // Handshake failure - caller will deal with disconnect + return null; + } + + // If we're both version 2 peers then next stage is proof + if (peer.getVersion() >= 2) + return PROOF; + + // Fall-back for older clients (for now) + return COMPLETED; + } + + @Override + public void action(Peer peer) { + sendMyId(peer); + } + }, + PROOF(MessageType.PROOF) { + @Override + public Handshake onMessage(Peer peer, Message message) { + ProofMessage proofMessage = (ProofMessage) message; + + // Check peer's timestamp is within acceptable bounds + if (Math.abs(proofMessage.getTimestamp() - peer.getConnectionTimestamp()) > MAX_TIMESTAMP_DELTA) + return null; + + // If we connected outbound to peer, then this is a faked confirmation response, so we're good + if (peer.isOutbound()) + return COMPLETED; + + // Check salt hasn't been seen before - this stops multiple peers reusing salt nonce in a Sybil-like attack + if (Proof.seenSalt(proofMessage.getSalt())) + return null; + + if (!Proof.check(proofMessage.getTimestamp(), proofMessage.getSalt(), proofMessage.getNonce())) + return null; + + // Proof valid + return COMPLETED; + } + + @Override + public void action(Peer peer) { + sendProof(peer); + } + }, + COMPLETED(null) { + @Override + public Handshake onMessage(Peer peer, Message message) { + // Handshake completed + return null; + } + + @Override + public void action(Peer peer) { + // Note: this is only called when we've made outbound connection + + // Make a note that we've successfully completed handshake (and when) + peer.getPeerData().setLastConnected(NTP.getTime()); + } + }; + + private static final long MAX_TIMESTAMP_DELTA = 1000; // ms + + public final MessageType expectedMessageType; + + private Handshake(MessageType expectedMessageType) { + this.expectedMessageType = expectedMessageType; + } + + public abstract Handshake onMessage(Peer peer, Message message); + + public abstract void action(Peer peer); + + private static void sendVersion(Peer peer) { + long buildTimestamp = Controller.getInstance().getBuildTimestamp(); + String versionString = Controller.getInstance().getVersionString(); + + Message versionMessage = new VersionMessage(buildTimestamp, versionString); + if (!peer.sendMessage(versionMessage)) + peer.disconnect(); + } + + private static void sendMyId(Peer peer) { + Message peerIdMessage = new PeerIdMessage(Network.getInstance().getOurPeerId()); + if (!peer.sendMessage(peerIdMessage)) + peer.disconnect(); + } + + private static void sendProof(Peer peer) { + if (peer.isOutbound()) { + // For outbound connections we need to generate real proof + new Proof(peer).start(); + } else { + // For incoming connections we only need to send a fake proof message as confirmation + Message proofMessage = new ProofMessage(peer.getConnectionTimestamp(), 0, 0); + if (!peer.sendMessage(proofMessage)) + peer.disconnect(); + } + } + +} diff --git a/src/main/java/org/qora/network/Network.java b/src/main/java/org/qora/network/Network.java new file mode 100644 index 00000000..cf034f3c --- /dev/null +++ b/src/main/java/org/qora/network/Network.java @@ -0,0 +1,445 @@ +package org.qora.network; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.ServerSocket; +import java.net.Socket; +import java.net.SocketTimeoutException; +import java.net.UnknownHostException; +import java.security.SecureRandom; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.function.Function; +import java.util.function.Predicate; +import java.util.stream.Collectors; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.qora.controller.Controller; +import org.qora.data.network.PeerData; +import org.qora.network.message.HeightMessage; +import org.qora.network.message.Message; +import org.qora.network.message.PeersMessage; +import org.qora.network.message.PingMessage; +import org.qora.repository.DataException; +import org.qora.repository.Repository; +import org.qora.repository.RepositoryManager; +import org.qora.settings.Settings; +import org.qora.utils.NTP; + +// For managing peers +public class Network extends Thread { + + private static final Logger LOGGER = LogManager.getLogger(Network.class); + private static final int LISTEN_BACKLOG = 10; + private static final int CONNECT_FAILURE_BACKOFF = 60 * 1000; // ms + private static final int BROADCAST_INTERVAL = 60 * 1000; // ms + private static Network instance; + + public static final int PEER_ID_LENGTH = 128; + + private final byte[] ourPeerId; + private List<Peer> connectedPeers; + private List<PeerData> selfPeers; + private ServerSocket listenSocket; + private int minPeers; + private int maxPeers; + private ExecutorService peerExecutor; + private long nextBroadcast; + + // Constructors + + private Network() { + // Grab P2P port from settings + int listenPort = Settings.getInstance().getListenPort(); + + // Grab P2P bind address from settings + try { + InetAddress bindAddr = InetAddress.getByName(Settings.getInstance().getBindAddress()); + InetSocketAddress endpoint = new InetSocketAddress(bindAddr, listenPort); + + // Set up listen socket + listenSocket = new ServerSocket(); + listenSocket.setReuseAddress(true); + listenSocket.setSoTimeout(1); // accept() calls block for at most 1ms + listenSocket.bind(endpoint, LISTEN_BACKLOG); + } catch (UnknownHostException e) { + LOGGER.error("Can't bind listen socket to address " + Settings.getInstance().getBindAddress()); + throw new RuntimeException("Can't bind listen socket to address"); + } catch (IOException e) { + LOGGER.error("Can't create listen socket"); + throw new RuntimeException("Can't create listen socket"); + } + + connectedPeers = new ArrayList<>(); + selfPeers = new ArrayList<>(); + + ourPeerId = new byte[PEER_ID_LENGTH]; + new SecureRandom().nextBytes(ourPeerId); + + minPeers = Settings.getInstance().getMinPeers(); + maxPeers = Settings.getInstance().getMaxPeers(); + + peerExecutor = Executors.newCachedThreadPool(); + nextBroadcast = System.currentTimeMillis(); + } + + // Getters / setters + + public static Network getInstance() { + if (instance == null) + instance = new Network(); + + return instance; + } + + public byte[] getOurPeerId() { + return this.ourPeerId; + } + + public List<Peer> getConnectedPeers() { + synchronized (this.connectedPeers) { + return new ArrayList<>(this.connectedPeers); + } + } + + public List<PeerData> getSelfPeers() { + synchronized (this.selfPeers) { + return new ArrayList<>(this.selfPeers); + } + } + + public void noteToSelf(Peer peer) { + LOGGER.info(String.format("No longer considering peer address %s as it connects to self", peer.getRemoteSocketAddress())); + + synchronized (this.selfPeers) { + this.selfPeers.add(peer.getPeerData()); + } + } + + // Main thread + + @Override + public void run() { + Thread.currentThread().setName("Network"); + + // Maintain long-term connections to various peers' API applications + try { + while (true) { + acceptConnection(); + + createConnection(); + + if (System.currentTimeMillis() >= this.nextBroadcast) { + this.nextBroadcast = System.currentTimeMillis() + BROADCAST_INTERVAL; + + // Controller can decide what to broadcast + Controller.getInstance().doNetworkBroadcast(); + } + + // Sleep for a while + Thread.sleep(1000); + } + } catch (InterruptedException e) { + // Fall-through to shutdown + } catch (DataException e) { + LOGGER.warn("Repository issue while running network", e); + // Fall-through to shutdown + } + + // Shutdown + if (!this.listenSocket.isClosed()) + try { + this.listenSocket.close(); + } catch (IOException e) { + // Not important + } + } + + @SuppressWarnings("resource") + private void acceptConnection() throws InterruptedException { + Socket socket; + + try { + socket = this.listenSocket.accept(); + } catch (SocketTimeoutException e) { + // No connections to accept + return; + } catch (IOException e) { + // Something went wrong or listen socket was closed due to shutdown + return; + } + + synchronized (this.connectedPeers) { + if (connectedPeers.size() >= maxPeers) { + // We have enough peers + LOGGER.trace(String.format("Connection discarded from peer %s", socket.getRemoteSocketAddress())); + + try { + socket.close(); + } catch (IOException e) { + // Not important + } + + return; + } + + LOGGER.debug(String.format("Connection accepted from peer %s", socket.getRemoteSocketAddress())); + Peer newPeer = new Peer(socket); + this.connectedPeers.add(newPeer); + peerExecutor.execute(newPeer); + } + } + + private void createConnection() throws InterruptedException, DataException { + synchronized (this.connectedPeers) { + if (connectedPeers.size() >= minPeers) + return; + } + + Peer newPeer; + + try (final Repository repository = RepositoryManager.getRepository()) { + // Find an address to connect to + List<PeerData> peers = repository.getNetworkRepository().getAllPeers(); + + // Don't consider peers with recent connection failures + final long lastAttemptedThreshold = NTP.getTime() - CONNECT_FAILURE_BACKOFF; + peers.removeIf(peerData -> peerData.getLastAttempted() != null && peerData.getLastAttempted() > lastAttemptedThreshold); + + // Don't consider peers that we know loop back to ourself + Predicate<PeerData> hasSamePeerSocketAddress = peerData -> this.selfPeers.stream() + .anyMatch(selfPeerData -> selfPeerData.getSocketAddress().equals(peerData.getSocketAddress())); + + synchronized (this.selfPeers) { + peers.removeIf(hasSamePeerSocketAddress); + } + + // Don't consider already connected peers + Predicate<PeerData> isConnectedPeer = peerData -> this.connectedPeers.stream() + .anyMatch(peer -> peer.getPeerData() != null && peer.getPeerData().getSocketAddress().equals(peerData.getSocketAddress())); + + synchronized (this.connectedPeers) { + peers.removeIf(isConnectedPeer); + } + + // Any left? + if (peers.isEmpty()) + return; + + // Pick random peer + int peerIndex = new SecureRandom().nextInt(peers.size()); + + // Pick candidate + PeerData peerData = peers.get(peerIndex); + newPeer = new Peer(peerData); + + // Update connection attempt info + peerData.setLastAttempted(NTP.getTime()); + repository.getNetworkRepository().save(peerData); + repository.saveChanges(); + } + + if (!newPeer.connect()) + return; + + synchronized (this.connectedPeers) { + this.connectedPeers.add(newPeer); + } + + peerExecutor.execute(newPeer); + } + + // Peer callbacks + + /** Called when Peer's thread has setup and is ready to process messages */ + public void onPeerReady(Peer peer) { + this.onMessage(peer, null); + } + + public void onDisconnect(Peer peer) { + synchronized (this.connectedPeers) { + this.connectedPeers.remove(peer); + } + } + + /** Called when a new message arrives for a peer. message can be null if called after connection */ + public void onMessage(Peer peer, Message message) { + if (message != null) + LOGGER.trace(String.format("Received %s message from %s", message.getType().name(), peer.getRemoteSocketAddress())); + + Handshake handshakeStatus = peer.getHandshakeStatus(); + if (handshakeStatus != Handshake.COMPLETED) { + // Still handshaking + + // Check message type is as expected + if (handshakeStatus.expectedMessageType != null && message.getType() != handshakeStatus.expectedMessageType) { + LOGGER.debug(String.format("Unexpected %s message from %s, expected %s", message.getType().name(), peer.getRemoteSocketAddress(), + handshakeStatus.expectedMessageType)); + peer.disconnect(); + return; + } + + Handshake newHandshakeStatus = handshakeStatus.onMessage(peer, message); + + if (newHandshakeStatus == null) { + // Handshake failure + LOGGER.debug(String.format("Handshake failure with peer %s message %s", peer.getRemoteSocketAddress(), message.getType().name())); + peer.disconnect(); + return; + } + + if (peer.isOutbound()) + // If we made outbound connection then we need to act first + newHandshakeStatus.action(peer); + else + // We have inbound connection so we need to respond inline with what we just received + handshakeStatus.action(peer); + + peer.setHandshakeStatus(newHandshakeStatus); + + if (newHandshakeStatus == Handshake.COMPLETED) + this.onHandshakeCompleted(peer); + + return; + } + + // Should be non-handshaking messages from now on + + switch (message.getType()) { + case VERSION: + case PEER_ID: + case PROOF: + LOGGER.debug(String.format("Unexpected handshaking message %s from peer %s", message.getType().name(), peer.getRemoteSocketAddress())); + peer.disconnect(); + return; + + case PING: + PingMessage pingMessage = (PingMessage) message; + + // Generate 'pong' using same ID + PingMessage pongMessage = new PingMessage(); + pongMessage.setId(pingMessage.getId()); + + if (!peer.sendMessage(pongMessage)) + peer.disconnect(); + + break; + + case PEERS: + PeersMessage peersMessage = (PeersMessage) message; + + List<InetSocketAddress> peerAddresses = new ArrayList<>(); + + for (InetAddress peerAddress : peersMessage.getPeerAddresses()) + peerAddresses.add(new InetSocketAddress(peerAddress, Settings.DEFAULT_LISTEN_PORT)); + + try { + mergePeers(peerAddresses); + } catch (DataException e) { + // Not good + peer.disconnect(); + return; + } + break; + + default: + // Bump up to controller for possible action + Controller.getInstance().onNetworkMessage(peer, message); + break; + } + } + + private void onHandshakeCompleted(Peer peer) { + peer.startPings(); + + Message heightMessage = new HeightMessage(Controller.getInstance().getChainHeight()); + + if (!peer.sendMessage(heightMessage)) { + peer.disconnect(); + return; + } + + Message peersMessage = this.buildPeersMessage(); + if (!peer.sendMessage(peersMessage)) + peer.disconnect(); + } + + public Message buildPeersMessage() { + List<Peer> peers = new ArrayList<>(); + + synchronized (this.connectedPeers) { + // Only outbound peer connections that have completed handshake + peers = this.connectedPeers.stream().filter(peer -> peer.isOutbound() && peer.getHandshakeStatus() == Handshake.COMPLETED) + .collect(Collectors.toList()); + } + + return new PeersMessage(peers); + } + + // Network-wide calls + + private List<Peer> getCompletedPeers() { + List<Peer> completedPeers = new ArrayList<>(); + + synchronized (this.connectedPeers) { + completedPeers = this.connectedPeers.stream().filter(peer -> peer.getHandshakeStatus() == Handshake.COMPLETED).collect(Collectors.toList()); + } + + return completedPeers; + } + + private void mergePeers(List<InetSocketAddress> peerAddresses) throws DataException { + try (final Repository repository = RepositoryManager.getRepository()) { + List<PeerData> knownPeers = repository.getNetworkRepository().getAllPeers(); + + // Resolve known peer hostnames + Function<PeerData, InetSocketAddress> peerDataToSocketAddress = peerData -> new InetSocketAddress(peerData.getSocketAddress().getHostString(), + peerData.getSocketAddress().getPort()); + List<InetSocketAddress> knownPeerAddresses = knownPeers.stream().map(peerDataToSocketAddress).collect(Collectors.toList()); + + // Filter out duplicates + Predicate<InetSocketAddress> addressKnown = peerAddress -> knownPeerAddresses.stream().anyMatch(knownAddress -> knownAddress.equals(peerAddress)); + peerAddresses.removeIf(addressKnown); + + // Save the rest into database + for (InetSocketAddress peerAddress : peerAddresses) { + PeerData peerData = new PeerData(peerAddress); + repository.getNetworkRepository().save(peerData); + } + + repository.saveChanges(); + } + } + + public void broadcast(Message message) { + class Broadcaster implements Runnable { + private List<Peer> targetPeers; + private Message message; + + public Broadcaster(List<Peer> targetPeers, Message message) { + this.targetPeers = targetPeers; + this.message = message; + } + + @Override + public void run() { + for (Peer peer : targetPeers) + if (!peer.sendMessage(message)) + peer.disconnect(); + } + } + + peerExecutor.execute(new Broadcaster(this.getCompletedPeers(), message)); + } + + public void shutdown() { + peerExecutor.shutdownNow(); + + this.interrupt(); + } + +} diff --git a/src/main/java/org/qora/network/Peer.java b/src/main/java/org/qora/network/Peer.java new file mode 100644 index 00000000..b004fcd8 --- /dev/null +++ b/src/main/java/org/qora/network/Peer.java @@ -0,0 +1,302 @@ +package org.qora.network; + +import java.io.DataInputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.net.SocketTimeoutException; +import java.net.UnknownHostException; +import java.security.SecureRandom; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.qora.controller.Controller; +import org.qora.data.network.PeerData; +import org.qora.network.message.Message; +import org.qora.network.message.Message.MessageType; +import org.qora.network.message.PingMessage; +import org.qora.network.message.VersionMessage; +import org.qora.utils.NTP; + +// For managing one peer +public class Peer implements Runnable { + + private static final Logger LOGGER = LogManager.getLogger(Peer.class); + + private static final int CONNECT_TIMEOUT = 1000; // ms + private static final int RESPONSE_TIMEOUT = 5000; // ms + private static final int PING_INTERVAL = 20000; // ms - just under every 30s is usually ideal to keep NAT mappings refreshed + private static final int INACTIVITY_TIMEOUT = 30000; // ms + + private final boolean isOutbound; + private Socket socket = null; + private PeerData peerData = null; + private InetSocketAddress remoteSocketAddress = null; + private Long connectionTimestamp = null; + private OutputStream out; + private Handshake handshakeStatus = Handshake.STARTED; + private Map<Integer, BlockingQueue<Message>> messages; + private VersionMessage versionMessage = null; + private Integer version; + private ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); + private Long lastPing = null; + + /** Construct unconnected outbound Peer using socket address in peer data */ + public Peer(PeerData peerData) { + this.isOutbound = true; + this.peerData = peerData; + this.remoteSocketAddress = peerData.getSocketAddress(); + } + + /** Construct Peer using existing, connected socket */ + public Peer(Socket socket) { + this.isOutbound = false; + this.socket = socket; + this.remoteSocketAddress = (InetSocketAddress) this.socket.getRemoteSocketAddress(); + } + + // Getters / setters + + public PeerData getPeerData() { + return this.peerData; + } + + public boolean isOutbound() { + return this.isOutbound; + } + + public Handshake getHandshakeStatus() { + return this.handshakeStatus; + } + + public void setHandshakeStatus(Handshake handshakeStatus) { + this.handshakeStatus = handshakeStatus; + } + + public InetSocketAddress getRemoteSocketAddress() { + return this.remoteSocketAddress; + } + + public VersionMessage getVersionMessage() { + return this.versionMessage; + } + + public void setVersionMessage(VersionMessage versionMessage) { + this.versionMessage = versionMessage; + + if (this.versionMessage.getVersionString().startsWith(Controller.VERSION_PREFIX)) { + int index = Controller.VERSION_PREFIX.length(); + try { + this.version = Integer.parseInt(this.versionMessage.getVersionString().substring(index, index + 1)); + } catch (NumberFormatException e) { + this.version = 1; + } + } else { + this.version = 1; + } + } + + public Integer getVersion() { + return this.version; + } + + public Long getConnectionTimestamp() { + return this.connectionTimestamp; + } + + public Long getLastPing() { + return this.lastPing; + } + + public void setLastPing(long lastPing) { + this.lastPing = lastPing; + } + + // Processing + + private void setup() throws IOException { + this.socket.setSoTimeout(INACTIVITY_TIMEOUT); + this.out = this.socket.getOutputStream(); + this.connectionTimestamp = NTP.getTime(); + this.messages = Collections.synchronizedMap(new HashMap<Integer, BlockingQueue<Message>>()); + } + + public boolean connect() { + LOGGER.trace(String.format("Connecting to peer %s", this.remoteSocketAddress)); + this.socket = new Socket(); + + try { + InetSocketAddress resolvedSocketAddress = new InetSocketAddress(this.remoteSocketAddress.getHostString(), this.remoteSocketAddress.getPort()); + + this.socket.connect(resolvedSocketAddress, CONNECT_TIMEOUT); + LOGGER.debug(String.format("Connected to peer %s", this.remoteSocketAddress)); + } catch (SocketTimeoutException e) { + LOGGER.trace(String.format("Connection timed out to peer %s", this.remoteSocketAddress)); + return false; + } catch (UnknownHostException e) { + LOGGER.trace(String.format("Connection failed to unresolved peer %s", this.remoteSocketAddress)); + return false; + } catch (IOException e) { + LOGGER.trace(String.format("Connection failed to peer %s", this.remoteSocketAddress)); + return false; + } + + return true; + } + + // Main thread + + @Override + public void run() { + Thread.currentThread().setName("Peer " + this.socket.getRemoteSocketAddress()); + + try (DataInputStream in = new DataInputStream(socket.getInputStream())) { + setup(); + + Network.getInstance().onPeerReady(this); + + while (true) { + // Wait (up to INACTIVITY_TIMEOUT) for, and parse, incoming message + Message message = Message.fromStream(in); + if (message == null) + return; + + // Find potential blocking queue for this id (expect null if id is -1) + BlockingQueue<Message> queue = this.messages.get(message.getId()); + if (queue != null) { + // Adding message to queue will unblock thread waiting for response + this.messages.get(message.getId()).add(message); + } else { + // Nothing waiting for this message - pass up to network + Network.getInstance().onMessage(this, message); + } + } + } catch (IOException e) { + // Fall-through + } finally { + this.disconnect(); + } + } + + /** + * Attempt to send Message to peer + * + * @param message + * @return <code>true</code> if message successfully sent; <code>false</code> otherwise + */ + public boolean sendMessage(Message message) { + if (this.socket.isClosed()) + return false; + + try { + // Send message + LOGGER.trace(String.format("Sending %s message to peer %s", message.getType().name(), this.getRemoteSocketAddress())); + + synchronized (this.out) { + this.out.write(message.toBytes()); + this.out.flush(); + } + } catch (IOException e) { + // Send failure + return false; + } + + // Sent OK + return true; + } + + /** + * Send message to peer and await response. + * <p> + * Message is assigned a random ID and sent. If a response with matching ID is received then it is returned to caller. + * <p> + * If no response with matching ID within timeout, or some other error/exception occurs, then return <code>null</code>. (Assume peer will be rapidly + * disconnected after this). + * + * @param message + * @return <code>Message</code> if valid response received; <code>null</code> if not or error/exception occurs + */ + public Message getResponse(Message message) { + BlockingQueue<Message> blockingQueue = new ArrayBlockingQueue<Message>(1); + + // Assign random ID to this message + int id; + do { + id = new SecureRandom().nextInt(Integer.MAX_VALUE - 1) + 1; + message.setId(id); + + // Put queue into map (keyed by message ID) so we can poll for a response + // If putIfAbsent() doesn't return null, then this id is already taken + } while (this.messages.putIfAbsent(id, blockingQueue) != null); + + // Try to send message + if (!this.sendMessage(message)) { + this.messages.remove(id); + return null; + } + + try { + Message response = blockingQueue.poll(RESPONSE_TIMEOUT, TimeUnit.MILLISECONDS); + return response; + } catch (InterruptedException e) { + // Our thread was interrupted. Probably in shutdown scenario. + return null; + } finally { + this.messages.remove(id); + } + } + + public void startPings() { + class Pinger implements Runnable { + private Peer peer; + + public Pinger(Peer peer) { + this.peer = peer; + } + + @Override + public void run() { + PingMessage pingMessage = new PingMessage(); + + long before = System.currentTimeMillis(); + Message message = peer.getResponse(pingMessage); + long after = System.currentTimeMillis(); + + if (message == null || message.getType() != MessageType.PING) + peer.disconnect(); + + peer.setLastPing(after - before); + } + } + ; + + this.executor.scheduleWithFixedDelay(new Pinger(this), 0, PING_INTERVAL, TimeUnit.MILLISECONDS); + } + + public void disconnect() { + // Shut down pinger + this.executor.shutdownNow(); + + // Close socket + if (!this.socket.isClosed()) { + LOGGER.debug(String.format("Disconnected peer %s", this.getRemoteSocketAddress())); + + try { + this.socket.close(); + } catch (IOException e) { + } + } + + Network.getInstance().onDisconnect(this); + } + +} diff --git a/src/main/java/org/qora/network/Proof.java b/src/main/java/org/qora/network/Proof.java new file mode 100644 index 00000000..d32e7f05 --- /dev/null +++ b/src/main/java/org/qora/network/Proof.java @@ -0,0 +1,112 @@ +package org.qora.network; + +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.security.SecureRandom; +import java.util.HashSet; + +import org.qora.network.message.ProofMessage; + +import com.google.common.primitives.Longs; + +public class Proof extends Thread { + + private static final int MIN_PROOF_ZEROS = 2; + private static final MessageDigest sha256; + static { + try { + sha256 = MessageDigest.getInstance("SHA256"); + } catch (NoSuchAlgorithmException e) { + // Can't progress + throw new RuntimeException("Message digest SHA256 not available"); + } + } + + private static final HashSet<Long> seenSalts = new HashSet<>(); + + private Peer peer; + + public Proof(Peer peer) { + this.peer = peer; + setDaemon(true); + } + + public static boolean seenSalt(long salt) { + synchronized (seenSalts) { + return seenSalts.contains(salt); + } + } + + public static void addSalt(long salt) { + synchronized (seenSalts) { + seenSalts.add(salt); + } + } + + @Override + public void run() { + setName("Proof for peer " + this.peer.getRemoteSocketAddress()); + + // Do proof-of-work calculation to gain acceptance with remote end + + // Remote end knows this (approximately) + long timestamp = this.peer.getConnectionTimestamp(); + + // Needs to be unique on the remote end + long salt = new SecureRandom().nextLong(); + + byte[] message = new byte[8 + 8 + 8]; // nonce + salt + timestamp + + byte[] saltBytes = Longs.toByteArray(salt); + System.arraycopy(saltBytes, 0, message, 8, saltBytes.length); + + byte[] timestampBytes = Longs.toByteArray(timestamp); + System.arraycopy(timestampBytes, 0, message, 8 + 8, timestampBytes.length); + + long nonce; + for (nonce = 0; nonce < Long.MAX_VALUE; ++nonce) { + // Check whether we're shutting down every so often + if ((nonce & 0xff) == 0 && Thread.currentThread().isInterrupted()) + // throw new InterruptedException("Interrupted during peer proof calculation"); + return; + + byte[] nonceBytes = Longs.toByteArray(nonce); + System.arraycopy(nonceBytes, 0, message, 0, nonceBytes.length); + + byte[] digest = sha256.digest(message); + + if (check(digest)) + break; + } + + ProofMessage proofMessage = new ProofMessage(timestamp, salt, nonce); + peer.sendMessage(proofMessage); + } + + private static boolean check(byte[] digest) { + int idx; + for (idx = 0; idx < MIN_PROOF_ZEROS; ++idx) + if (digest[idx] != 0) + break; + + return idx == MIN_PROOF_ZEROS; + } + + public static boolean check(long timestamp, long salt, long nonce) { + byte[] message = new byte[8 + 8 + 8]; + + byte[] saltBytes = Longs.toByteArray(salt); + System.arraycopy(saltBytes, 0, message, 8, saltBytes.length); + + byte[] timestampBytes = Longs.toByteArray(timestamp); + System.arraycopy(timestampBytes, 0, message, 8 + 8, timestampBytes.length); + + byte[] nonceBytes = Longs.toByteArray(nonce); + System.arraycopy(nonceBytes, 0, message, 0, nonceBytes.length); + + byte[] digest = sha256.digest(message); + + return check(digest); + } + +} diff --git a/src/main/java/org/qora/network/message/HeightMessage.java b/src/main/java/org/qora/network/message/HeightMessage.java new file mode 100644 index 00000000..8d599f05 --- /dev/null +++ b/src/main/java/org/qora/network/message/HeightMessage.java @@ -0,0 +1,47 @@ +package org.qora.network.message; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.nio.ByteBuffer; + +import com.google.common.primitives.Ints; + +public class HeightMessage extends Message { + + private int height; + + public HeightMessage(int height) { + this(-1, height); + } + + private HeightMessage(int id, int height) { + super(id, MessageType.HEIGHT); + + this.height = height; + } + + public int getHeight() { + return this.height; + } + + public static Message fromByteBuffer(int id, ByteBuffer bytes) throws UnsupportedEncodingException { + int height = bytes.getInt(); + + return new HeightMessage(id, height); + } + + @Override + protected byte[] toData() { + try { + ByteArrayOutputStream bytes = new ByteArrayOutputStream(); + + bytes.write(Ints.toByteArray(this.height)); + + return bytes.toByteArray(); + } catch (IOException e) { + return null; + } + } + +} diff --git a/src/main/java/org/qora/network/message/Message.java b/src/main/java/org/qora/network/message/Message.java new file mode 100644 index 00000000..03755e8a --- /dev/null +++ b/src/main/java/org/qora/network/message/Message.java @@ -0,0 +1,212 @@ +package org.qora.network.message; + +import java.util.Map; + +import org.qora.controller.Controller; +import org.qora.crypto.Crypto; + +import com.google.common.primitives.Ints; + +import static java.util.Arrays.stream; +import static java.util.stream.Collectors.toMap; + +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.net.SocketTimeoutException; +import java.nio.ByteBuffer; +import java.util.Arrays; + +public abstract class Message { + + // MAGIC(4) + TYPE(4) + HAS-ID(1) + ID?(4) + DATA-SIZE(4) + CHECKSUM?(4) + DATA?(*) + private static final int MAGIC_LENGTH = 4; + private static final int CHECKSUM_LENGTH = 4; + + private static final int MAX_DATA_SIZE = 1024 * 1024; // 1MB + + public enum MessageType { + GET_PEERS(1), + PEERS(2), + HEIGHT(3), + GET_SIGNATURES(4), + SIGNATURES(5), + GET_BLOCK(6), + BLOCK(7), + TRANSACTION(8), + PING(9), + VERSION(10), + PEER_ID(11), + PROOF(12); + + public final int value; + public final Method fromByteBuffer; + + private static final Map<Integer, MessageType> map = stream(MessageType.values()) + .collect(toMap(messageType -> messageType.value, messageType -> messageType)); + + private MessageType(int value) { + this.value = value; + + String[] classNameParts = this.name().toLowerCase().split("_"); + + for (int i = 0; i < classNameParts.length; ++i) + classNameParts[i] = classNameParts[i].substring(0, 1).toUpperCase().concat(classNameParts[i].substring(1)); + + String className = String.join("", classNameParts); + + Method fromByteBuffer; + try { + Class<?> subclass = Class.forName(String.join("", Message.class.getPackage().getName(), ".", className, "Message")); + + fromByteBuffer = subclass.getDeclaredMethod("fromByteBuffer", int.class, ByteBuffer.class); + } catch (ClassNotFoundException | NoSuchMethodException | SecurityException e) { + fromByteBuffer = null; + } + + this.fromByteBuffer = fromByteBuffer; + } + + public static MessageType valueOf(int value) { + return map.get(value); + } + + public Message fromBytes(int id, byte[] data) { + if (this.fromByteBuffer == null) + return null; + + try { + return (Message) this.fromByteBuffer.invoke(null, id, data == null ? null : ByteBuffer.wrap(data)); + } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) { + return null; + } + } + } + + private int id; + private MessageType type; + + protected Message(int id, MessageType type) { + this.id = id; + this.type = type; + } + + protected Message(MessageType type) { + this(-1, type); + } + + public boolean hasId() { + return this.id != -1; + } + + public int getId() { + return this.id; + } + + public void setId(int id) { + this.id = id; + } + + public MessageType getType() { + return this.type; + } + + public static Message fromStream(DataInputStream in) throws SocketTimeoutException { + try { + // Read only enough bytes to cover Message "magic" preamble + byte[] messageMagic = new byte[MAGIC_LENGTH]; + in.readFully(messageMagic); + + if (!Arrays.equals(messageMagic, Controller.getInstance().getMessageMagic())) + // Didn't receive correct Message "magic" + return null; + + int typeValue = in.readInt(); + MessageType messageType = MessageType.valueOf(typeValue); + if (messageType == null) + // Unrecognised message type + return null; + + // Find supporting object + + int hasId = in.read(); + int id = -1; + if (hasId != 0) { + id = in.readInt(); + + if (id <= 0) + // Invalid ID + return null; + } + + int dataSize = in.readInt(); + + if (dataSize > MAX_DATA_SIZE) + // Too large + return null; + + byte[] data = null; + if (dataSize > 0) { + byte[] expectedChecksum = new byte[CHECKSUM_LENGTH]; + in.readFully(expectedChecksum); + + data = new byte[dataSize]; + in.readFully(data); + + // Test checksum + byte[] actualChecksum = generateChecksum(data); + if (!Arrays.equals(expectedChecksum, actualChecksum)) + return null; + } + + return messageType.fromBytes(id, data); + } catch (SocketTimeoutException e) { + throw e; + } catch (IOException e) { + return null; + } + } + + protected static byte[] generateChecksum(byte[] data) { + return Arrays.copyOfRange(Crypto.digest(data), 0, CHECKSUM_LENGTH); + } + + public byte[] toBytes() { + try { + ByteArrayOutputStream bytes = new ByteArrayOutputStream(); + + // Magic + bytes.write(Controller.getInstance().getMessageMagic()); + + bytes.write(Ints.toByteArray(this.type.value)); + + if (this.hasId()) { + bytes.write(1); + + bytes.write(Ints.toByteArray(this.id)); + } else { + bytes.write(0); + } + + byte[] data = this.toData(); + if (data == null) + return null; + + bytes.write(Ints.toByteArray(data.length)); + + if (data.length > 0) { + bytes.write(generateChecksum(data)); + bytes.write(data); + } + + return bytes.toByteArray(); + } catch (IOException e) { + return null; + } + } + + protected abstract byte[] toData(); + +} diff --git a/src/main/java/org/qora/network/message/PeerIdMessage.java b/src/main/java/org/qora/network/message/PeerIdMessage.java new file mode 100644 index 00000000..75ce0392 --- /dev/null +++ b/src/main/java/org/qora/network/message/PeerIdMessage.java @@ -0,0 +1,52 @@ +package org.qora.network.message; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.nio.ByteBuffer; + +import org.qora.network.Network; + +public class PeerIdMessage extends Message { + + private byte[] peerId; + + public PeerIdMessage(byte[] peerId) { + this(-1, peerId); + } + + private PeerIdMessage(int id, byte[] peerId) { + super(id, MessageType.PEER_ID); + + this.peerId = peerId; + } + + public byte[] getPeerId() { + return this.peerId; + } + + public static Message fromByteBuffer(int id, ByteBuffer bytes) throws UnsupportedEncodingException { + if (bytes.remaining() != Network.PEER_ID_LENGTH) + return null; + + byte[] peerId = new byte[Network.PEER_ID_LENGTH]; + + bytes.get(peerId); + + return new PeerIdMessage(id, peerId); + } + + @Override + protected byte[] toData() { + try { + ByteArrayOutputStream bytes = new ByteArrayOutputStream(); + + bytes.write(this.peerId); + + return bytes.toByteArray(); + } catch (IOException e) { + return null; + } + } + +} diff --git a/src/main/java/org/qora/network/message/PeersMessage.java b/src/main/java/org/qora/network/message/PeersMessage.java new file mode 100644 index 00000000..9c6aa448 --- /dev/null +++ b/src/main/java/org/qora/network/message/PeersMessage.java @@ -0,0 +1,93 @@ +package org.qora.network.message; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + +import org.qora.network.Peer; + +import com.google.common.primitives.Ints; + +// NOTE: this legacy message only supports 4-byte IPv4 addresses and doesn't send port number either +public class PeersMessage extends Message { + + private static final int ADDRESS_LENGTH = 4; + + private List<InetAddress> peerAddresses; + + public PeersMessage(List<Peer> peers) { + super(-1, MessageType.PEERS); + + // We have to forcibly resolve into IP addresses as we can't send hostnames + this.peerAddresses = new ArrayList<>(); + + for (Peer peer : peers) { + try { + InetAddress resolvedAddress = InetAddress.getByName(peer.getRemoteSocketAddress().getHostString()); + + // Filter out unsupported address types + if (resolvedAddress.getAddress().length != ADDRESS_LENGTH) + continue; + + this.peerAddresses.add(resolvedAddress); + } catch (UnknownHostException e) { + // Couldn't resolve + continue; + } + } + } + + private PeersMessage(int id, List<InetAddress> peerAddresses) { + super(id, MessageType.PEERS); + + this.peerAddresses = peerAddresses; + } + + public List<InetAddress> getPeerAddresses() { + return this.peerAddresses; + } + + public static Message fromByteBuffer(int id, ByteBuffer bytes) throws UnsupportedEncodingException { + int count = bytes.getInt(); + + if (bytes.remaining() != count * ADDRESS_LENGTH) + return null; + + List<InetAddress> peerAddresses = new ArrayList<>(); + + byte[] addressBytes = new byte[ADDRESS_LENGTH]; + + try { + for (int i = 0; i < count; ++i) { + bytes.get(addressBytes); + peerAddresses.add(InetAddress.getByAddress(addressBytes)); + } + } catch (UnknownHostException e) { + return null; + } + + return new PeersMessage(id, peerAddresses); + } + + @Override + protected byte[] toData() { + try { + ByteArrayOutputStream bytes = new ByteArrayOutputStream(); + + bytes.write(Ints.toByteArray(this.peerAddresses.size())); + + for (InetAddress peerAddress : this.peerAddresses) + bytes.write(peerAddress.getAddress()); + + return bytes.toByteArray(); + } catch (IOException e) { + return null; + } + } + +} diff --git a/src/main/java/org/qora/network/message/PingMessage.java b/src/main/java/org/qora/network/message/PingMessage.java new file mode 100644 index 00000000..4a9203fa --- /dev/null +++ b/src/main/java/org/qora/network/message/PingMessage.java @@ -0,0 +1,25 @@ +package org.qora.network.message; + +import java.io.UnsupportedEncodingException; +import java.nio.ByteBuffer; + +public class PingMessage extends Message { + + public PingMessage() { + this(-1); + } + + private PingMessage(int id) { + super(id, MessageType.PING); + } + + public static Message fromByteBuffer(int id, ByteBuffer bytes) throws UnsupportedEncodingException { + return new PingMessage(id); + } + + @Override + protected byte[] toData() { + return new byte[0]; + } + +} diff --git a/src/main/java/org/qora/network/message/ProofMessage.java b/src/main/java/org/qora/network/message/ProofMessage.java new file mode 100644 index 00000000..a0135113 --- /dev/null +++ b/src/main/java/org/qora/network/message/ProofMessage.java @@ -0,0 +1,63 @@ +package org.qora.network.message; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.nio.ByteBuffer; + +import com.google.common.primitives.Longs; + +public class ProofMessage extends Message { + + private long timestamp; + private long salt; + private long nonce; + + public ProofMessage(long timestamp, long salt, long nonce) { + this(-1, timestamp, salt, nonce); + } + + private ProofMessage(int id, long timestamp, long salt, long nonce) { + super(id, MessageType.PROOF); + + this.timestamp = timestamp; + this.salt = salt; + this.nonce = nonce; + } + + public long getTimestamp() { + return this.timestamp; + } + + public long getSalt() { + return this.salt; + } + + public long getNonce() { + return this.nonce; + } + + public static Message fromByteBuffer(int id, ByteBuffer bytes) throws UnsupportedEncodingException { + long timestamp = bytes.getLong(); + long salt = bytes.getLong(); + long nonce = bytes.getLong(); + + return new ProofMessage(id, timestamp, salt, nonce); + } + + @Override + protected byte[] toData() { + try { + ByteArrayOutputStream bytes = new ByteArrayOutputStream(); + + bytes.write(Longs.toByteArray(this.timestamp)); + bytes.write(Longs.toByteArray(this.salt)); + bytes.write(Longs.toByteArray(this.nonce)); + + return bytes.toByteArray(); + } catch (IOException e) { + return null; + } + } + +} diff --git a/src/main/java/org/qora/network/message/VersionMessage.java b/src/main/java/org/qora/network/message/VersionMessage.java new file mode 100644 index 00000000..802cee4c --- /dev/null +++ b/src/main/java/org/qora/network/message/VersionMessage.java @@ -0,0 +1,67 @@ +package org.qora.network.message; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.nio.ByteBuffer; + +import org.qora.utils.Serialization; + +import com.google.common.primitives.Longs; + +public class VersionMessage extends Message { + + private long buildTimestamp; + private String versionString; + + public VersionMessage(long buildTimestamp, String versionString) { + this(-1, buildTimestamp, versionString); + } + + private VersionMessage(int id, long buildTimestamp, String versionString) { + super(id, MessageType.VERSION); + + this.buildTimestamp = buildTimestamp; + this.versionString = versionString; + } + + public long getBuildTimestamp() { + return this.buildTimestamp; + } + + public String getVersionString() { + return this.versionString; + } + + public static Message fromByteBuffer(int id, ByteBuffer bytes) throws UnsupportedEncodingException { + long buildTimestamp = bytes.getLong(); + + int versionStringLength = bytes.getInt(); + + if (versionStringLength != bytes.remaining()) + return null; + + byte[] versionBytes = new byte[versionStringLength]; + bytes.get(versionBytes); + + String versionString = new String(versionBytes, "UTF-8"); + + return new VersionMessage(id, buildTimestamp, versionString); + } + + @Override + protected byte[] toData() { + try { + ByteArrayOutputStream bytes = new ByteArrayOutputStream(); + + bytes.write(Longs.toByteArray(this.buildTimestamp)); + + Serialization.serializeSizedString(bytes, this.versionString); + + return bytes.toByteArray(); + } catch (IOException e) { + return null; + } + } + +} diff --git a/src/main/java/org/qora/repository/NetworkRepository.java b/src/main/java/org/qora/repository/NetworkRepository.java new file mode 100644 index 00000000..55abf726 --- /dev/null +++ b/src/main/java/org/qora/repository/NetworkRepository.java @@ -0,0 +1,15 @@ +package org.qora.repository; + +import java.util.List; + +import org.qora.data.network.PeerData; + +public interface NetworkRepository { + + public List<PeerData> getAllPeers() throws DataException; + + public void save(PeerData peerData) throws DataException; + + public int delete(PeerData peerData) throws DataException; + +} diff --git a/src/main/java/org/qora/repository/Repository.java b/src/main/java/org/qora/repository/Repository.java index 5b87717c..bbe1a06a 100644 --- a/src/main/java/org/qora/repository/Repository.java +++ b/src/main/java/org/qora/repository/Repository.java @@ -14,6 +14,8 @@ public interface Repository extends AutoCloseable { public NameRepository getNameRepository(); + public NetworkRepository getNetworkRepository(); + public TransactionRepository getTransactionRepository(); public VotingRepository getVotingRepository(); @@ -27,4 +29,6 @@ public interface Repository extends AutoCloseable { public void rebuild() throws DataException; + public void setDebug(boolean debugState); + } diff --git a/src/main/java/org/qora/repository/hsqldb/HSQLDBDatabaseUpdates.java b/src/main/java/org/qora/repository/hsqldb/HSQLDBDatabaseUpdates.java index 3be6a2da..54ea6175 100644 --- a/src/main/java/org/qora/repository/hsqldb/HSQLDBDatabaseUpdates.java +++ b/src/main/java/org/qora/repository/hsqldb/HSQLDBDatabaseUpdates.java @@ -500,6 +500,12 @@ public class HSQLDBDatabaseUpdates { + "ban_reference Signature, PRIMARY KEY (signature), FOREIGN KEY (signature) REFERENCES Transactions (signature) ON DELETE CASCADE)"); break; + case 30: + // Networking + stmt.execute("CREATE TABLE Peers (hostname VARCHAR(255), port INTEGER, last_connected TIMESTAMP WITH TIME ZONE, last_attempted TIMESTAMP WITH TIME ZONE, " + + "last_height INTEGER, PRIMARY KEY (hostname, port))"); + break; + default: // nothing to do return false; diff --git a/src/main/java/org/qora/repository/hsqldb/HSQLDBNetworkRepository.java b/src/main/java/org/qora/repository/hsqldb/HSQLDBNetworkRepository.java new file mode 100644 index 00000000..4d8b0992 --- /dev/null +++ b/src/main/java/org/qora/repository/hsqldb/HSQLDBNetworkRepository.java @@ -0,0 +1,83 @@ +package org.qora.repository.hsqldb; + +import java.net.InetSocketAddress; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.Calendar; +import java.util.List; + +import org.qora.data.network.PeerData; +import org.qora.repository.DataException; +import org.qora.repository.NetworkRepository; + +public class HSQLDBNetworkRepository implements NetworkRepository { + + protected HSQLDBRepository repository; + + public HSQLDBNetworkRepository(HSQLDBRepository repository) { + this.repository = repository; + } + + @Override + public List<PeerData> getAllPeers() throws DataException { + List<PeerData> peers = new ArrayList<>(); + + try (ResultSet resultSet = this.repository.checkedExecute("SELECT hostname, port, last_connected, last_attempted, last_height FROM Peers")) { + if (resultSet == null) + return peers; + + // NOTE: do-while because checkedExecute() above has already called rs.next() for us + do { + String hostname = resultSet.getString(1); + int port = resultSet.getInt(2); + InetSocketAddress socketAddress = InetSocketAddress.createUnresolved(hostname, port); + + Timestamp lastConnectedTimestamp = resultSet.getTimestamp(3, Calendar.getInstance(HSQLDBRepository.UTC)); + Long lastConnected = resultSet.wasNull() ? null : lastConnectedTimestamp.getTime(); + + Timestamp lastAttemptedTimestamp = resultSet.getTimestamp(4, Calendar.getInstance(HSQLDBRepository.UTC)); + Long lastAttempted = resultSet.wasNull() ? null : lastAttemptedTimestamp.getTime(); + + Integer lastHeight = resultSet.getInt(5); + if (resultSet.wasNull()) + lastHeight = null; + + peers.add(new PeerData(socketAddress, lastConnected, lastAttempted, lastHeight)); + } while (resultSet.next()); + + return peers; + } catch (SQLException e) { + throw new DataException("Unable to fetch poll votes from repository", e); + } + } + + @Override + public void save(PeerData peerData) throws DataException { + HSQLDBSaver saveHelper = new HSQLDBSaver("Peers"); + + Timestamp lastConnected = peerData.getLastConnected() == null ? null : new Timestamp(peerData.getLastConnected()); + Timestamp lastAttempted = peerData.getLastAttempted() == null ? null : new Timestamp(peerData.getLastAttempted()); + + saveHelper.bind("hostname", peerData.getSocketAddress().getHostString()).bind("port", peerData.getSocketAddress().getPort()) + .bind("last_connected", lastConnected).bind("last_attempted", lastAttempted).bind("last_height", peerData.getLastHeight()); + + try { + saveHelper.execute(this.repository); + } catch (SQLException e) { + throw new DataException("Unable to save peer into repository", e); + } + } + + @Override + public int delete(PeerData peerData) throws DataException { + try { + return this.repository.delete("Peers", "hostname = ? AND port = ?", peerData.getSocketAddress().getHostString(), + peerData.getSocketAddress().getPort()); + } catch (SQLException e) { + throw new DataException("Unable to delete peer from repository", e); + } + } + +} diff --git a/src/main/java/org/qora/repository/hsqldb/HSQLDBRepository.java b/src/main/java/org/qora/repository/hsqldb/HSQLDBRepository.java index 3b7ddd7a..16db33ad 100644 --- a/src/main/java/org/qora/repository/hsqldb/HSQLDBRepository.java +++ b/src/main/java/org/qora/repository/hsqldb/HSQLDBRepository.java @@ -17,6 +17,7 @@ import org.qora.repository.BlockRepository; import org.qora.repository.GroupRepository; import org.qora.repository.DataException; import org.qora.repository.NameRepository; +import org.qora.repository.NetworkRepository; import org.qora.repository.Repository; import org.qora.repository.TransactionRepository; import org.qora.repository.VotingRepository; @@ -32,6 +33,7 @@ public class HSQLDBRepository implements Repository { public static final TimeZone UTC = TimeZone.getTimeZone("UTC"); protected Connection connection; + protected boolean debugState = false; // NB: no visibility modifier so only callable from within same package HSQLDBRepository(Connection connection) { @@ -68,6 +70,11 @@ public class HSQLDBRepository implements Repository { return new HSQLDBNameRepository(this); } + @Override + public NetworkRepository getNetworkRepository() { + return new HSQLDBNetworkRepository(this); + } + @Override public TransactionRepository getTransactionRepository() { return new HSQLDBTransactionRepository(this); @@ -131,6 +138,11 @@ public class HSQLDBRepository implements Repository { public void rebuild() throws DataException { } + @Override + public void setDebug(boolean debugState) { + this.debugState = debugState; + } + /** * Execute SQL and return ResultSet with but added checking. * <p> @@ -143,6 +155,9 @@ public class HSQLDBRepository implements Repository { */ @SuppressWarnings("resource") public ResultSet checkedExecute(String sql, Object... objects) throws SQLException { + if (this.debugState) + LOGGER.debug(sql); + PreparedStatement preparedStatement = this.connection.prepareStatement(sql); // Close the PreparedStatement when the ResultSet is closed otherwise there's a potential resource leak. @@ -153,9 +168,9 @@ public class HSQLDBRepository implements Repository { ResultSet resultSet = this.checkedExecuteResultSet(preparedStatement, objects); - long queryTime = System.currentTimeMillis() - beforeQuery; + long queryTime = System.currentTimeMillis() - beforeQuery; if (queryTime > MAX_QUERY_TIME) - LOGGER.info(String.format("HSQLDB query took %d ms: %s", queryTime, sql)); + LOGGER.info(String.format("HSQLDB query took %d ms: %s", queryTime, sql)); return resultSet; } @@ -281,9 +296,9 @@ public class HSQLDBRepository implements Repository { * @param objects * @throws SQLException */ - public void delete(String tableName, String whereClause, Object... objects) throws SQLException { + public int delete(String tableName, String whereClause, Object... objects) throws SQLException { try (PreparedStatement preparedStatement = this.connection.prepareStatement("DELETE FROM " + tableName + " WHERE " + whereClause)) { - this.checkedExecuteUpdateCount(preparedStatement, objects); + return this.checkedExecuteUpdateCount(preparedStatement, objects); } } diff --git a/src/main/java/org/qora/settings/Settings.java b/src/main/java/org/qora/settings/Settings.java index cefc19f7..463c4790 100644 --- a/src/main/java/org/qora/settings/Settings.java +++ b/src/main/java/org/qora/settings/Settings.java @@ -33,10 +33,17 @@ public class Settings { /** Max milliseconds into future for accepting new, unconfirmed transactions */ private long maxTransactionTimestampFuture = 24 * 60 * 60 * 1000; // milliseconds - // RPC - private int rpcPort = 9085; - private List<String> rpcAllowed = new ArrayList<String>(Arrays.asList("127.0.0.1", "::1")); // ipv4, ipv6 - private boolean rpcEnabled = true; + // API + private int apiPort = 9085; + private List<String> apiAllowed = new ArrayList<String>(Arrays.asList("127.0.0.1", "::1")); // ipv4, ipv6 + private boolean apiEnabled = true; + + // Peer-to-peer networking + public static final int DEFAULT_LISTEN_PORT = 9084; + private int listenPort = DEFAULT_LISTEN_PORT; + private String bindAddress = null; // listen on all local addresses + private int minPeers = 3; + private int maxPeers = 10; // Constants private static final String SETTINGS_FILENAME = "settings.json"; @@ -60,6 +67,7 @@ public class Settings { break; } + LOGGER.info("Using settings file: " + path + filename); List<String> lines = Files.readLines(file, Charsets.UTF_8); // Concatenate lines for JSON parsing @@ -115,25 +123,39 @@ public class Settings { } private void process(JSONObject json) { - // RPC - if (json.containsKey("rpcport")) - this.rpcPort = ((Long) json.get("rpcport")).intValue(); + // API + if (json.containsKey("apiPort")) + this.apiPort = ((Long) json.get("apiPort")).intValue(); - if (json.containsKey("rpcallowed")) { - JSONArray allowedArray = (JSONArray) json.get("rpcallowed"); + if (json.containsKey("apiAllowed")) { + JSONArray allowedArray = (JSONArray) json.get("apiAllowed"); - this.rpcAllowed = new ArrayList<String>(); + this.apiAllowed = new ArrayList<String>(); for (Object entry : allowedArray) { if (!(entry instanceof String)) - throw new RuntimeException("Entry inside 'rpcallowed' is not string"); + throw new RuntimeException("Entry inside 'apiAllowed' is not string"); - this.rpcAllowed.add((String) entry); + this.apiAllowed.add((String) entry); } } - if (json.containsKey("rpcenabled")) - this.rpcEnabled = ((Boolean) json.get("rpcenabled")).booleanValue(); + if (json.containsKey("apiEnabled")) + this.apiEnabled = ((Boolean) json.get("apiEnabled")).booleanValue(); + + // Peer-to-peer networking + + if (json.containsKey("listenPort")) + this.listenPort = ((Long) getTypedJson(json, "listenPort", Long.class)).intValue(); + + if (json.containsKey("bindAddress")) + this.bindAddress = (String) getTypedJson(json, "bindAddress", String.class); + + if (json.containsKey("minPeers")) + this.minPeers = ((Long) getTypedJson(json, "minPeers", Long.class)).intValue(); + + if (json.containsKey("maxPeers")) + this.maxPeers = ((Long) getTypedJson(json, "maxPeers", Long.class)).intValue(); // Node-specific behaviour @@ -174,16 +196,36 @@ public class Settings { return this.userpath; } - public int getRpcPort() { - return this.rpcPort; + public int getApiPort() { + return this.apiPort; } - public List<String> getRpcAllowed() { - return this.rpcAllowed; + public List<String> getApiAllowed() { + return this.apiAllowed; } - public boolean isRpcEnabled() { - return this.rpcEnabled; + public boolean isApiEnabled() { + return this.apiEnabled; + } + + public int getListenPort() { + return this.listenPort; + } + + public int getDefaultListenPort() { + return DEFAULT_LISTEN_PORT; + } + + public String getBindAddress() { + return this.bindAddress; + } + + public int getMinPeers() { + return this.minPeers; + } + + public int getMaxPeers() { + return this.maxPeers; } public boolean useBitcoinTestNet() { diff --git a/src/main/resources/build.properties b/src/main/resources/build.properties new file mode 100644 index 00000000..7d9b532d --- /dev/null +++ b/src/main/resources/build.properties @@ -0,0 +1,2 @@ +build.timestamp=${build.timestamp} +build.version=${project.version}