Interim networking code commit in case of dev catastrophe!

DB shape change from v29 to add peer info.
New NetworkRepository to handle above.

Peer handshaking with v2 anti-DoS PoW code.
Handshaking refactored into a state-machine-like enum.
Some peer-related API calls added.

Peers exchange pings, heights, peers.

No actual peer sync yet.

Other changes:

Peer version info taken from Maven build properties/resource file.
AnnotationPostProcessor more resilient when fetching PathItems.
Per-repository session debugging flag that can be toggled at will.
HSQLDBRepository.delete() now returns int so callers can detect
 whether anything was actually deleted.
Some renaming to settings.
This commit is contained in:
catbref 2019-01-30 18:24:10 +00:00
parent 79b3074d01
commit 0db43451d4
32 changed files with 2445 additions and 53 deletions

View File

@ -1,11 +1,5 @@
<?xml version="1.0" encoding="UTF-8"?>
<classpath>
<classpathentry kind="src" output="target/classes" path="target/generated-sources/package-info">
<attributes>
<attribute name="optional" value="true"/>
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="src" output="target/classes" path="src/main/java">
<attributes>
<attribute name="optional" value="true"/>
@ -41,6 +35,12 @@
<attribute name="m2e-apt" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="src" output="target/classes" path="target/generated-sources/package-info">
<attributes>
<attribute name="optional" value="true"/>
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="src" output="target/test-classes" path="target/generated-test-sources/test-annotations">
<attributes>
<attribute name="optional" value="true"/>

View File

@ -10,6 +10,10 @@ rootLogger.appenderRef.rolling.ref = FILE
logger.hsqldb.name = hsqldb.db
logger.hsqldb.level = warn
# Support optional, per-session HSQLDB debugging
logger.hsqldbDebug.name = org.qora.repository.hsqldb.HSQLDBRepository
logger.hsqldbDebug.level = debug
# Suppress extraneous Jersey warning
logger.jerseyInject.name = org.glassfish.jersey.internal.inject.Providers
logger.jerseyInject.level = error
@ -18,6 +22,16 @@ logger.jerseyInject.level = error
logger.txSearch.name = org.qora.repository.hsqldb.transaction.HSQLDBTransactionRepository
logger.txSearch.level = trace
# Debug block generator
logger.blockgen.name = org.qora.block.BlockGenerator
logger.blockgen.level = trace
# Debug networking
logger.network.name = org.qora.network.Network
logger.network.level = trace
logger.peer.name = org.qora.network.Peer
logger.peer.level = trace
appender.console.type = Console
appender.console.name = stdout
appender.console.layout.type = PatternLayout

18
pom.xml
View File

@ -8,6 +8,7 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<bouncycastle.version>1.60</bouncycastle.version>
<dagger.version>1.2.2</dagger.version>
<hsqldb.version>r5836</hsqldb.version>
<jetty.version>9.4.11.v20180605</jetty.version>
<jersey.version>2.27</jersey.version>
@ -16,10 +17,17 @@
<swagger-api.version>2.0.6</swagger-api.version>
<swagger-ui.version>3.19.0</swagger-ui.version>
<felix-bundle-plugin.version>3.5.0</felix-bundle-plugin.version>
<build.timestamp>${maven.build.timestamp}</build.timestamp>
</properties>
<build>
<sourceDirectory>src/main/java</sourceDirectory>
<testSourceDirectory>src/test/java</testSourceDirectory>
<resources>
<resource>
<directory>${basedir}/src/main/resources</directory>
<filtering>true</filtering>
</resource>
</resources>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
@ -357,6 +365,11 @@
<artifactId>jetty-rewrite</artifactId>
<version>${jetty.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-client</artifactId>
<version>${jetty.version}</version>
</dependency>
<!-- Jersey -->
<dependency>
<groupId>org.glassfish.jersey.core</groupId>
@ -416,5 +429,10 @@
<artifactId>bcprov-jdk15on</artifactId>
<version>${bouncycastle.version}</version>
</dependency>
<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bctls-jdk15on</artifactId>
<version>${bouncycastle.version}</version>
</dependency>
</dependencies>
</project>

View File

@ -31,11 +31,11 @@ public class ApiService {
config.register(AnnotationPostProcessor.class);
// Create RPC server
this.server = new Server(Settings.getInstance().getRpcPort());
this.server = new Server(Settings.getInstance().getApiPort());
// IP address based access control
InetAccessHandler accessHandler = new InetAccessHandler();
for (String pattern : Settings.getInstance().getRpcAllowed()) {
for (String pattern : Settings.getInstance().getApiAllowed()) {
accessHandler.include(pattern);
}
this.server.setHandler(accessHandler);

View File

@ -0,0 +1,34 @@
package org.qora.api.model;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import org.qora.network.Peer;
@XmlAccessorType(XmlAccessType.FIELD)
public class ConnectedPeer {
public String hostname;
public int port;
public Long lastPing;
public Integer lastHeight;
public enum Direction {
INBOUND,
OUTBOUND;
}
public Direction direction;
protected ConnectedPeer() {
}
public ConnectedPeer(Peer peer) {
this.hostname = peer.getRemoteSocketAddress().getHostString();
this.port = peer.getRemoteSocketAddress().getPort();
this.lastPing = peer.getLastPing();
this.direction = peer.isOutbound() ? Direction.OUTBOUND : Direction.INBOUND;
this.lastHeight = peer.getPeerData() == null ? null : peer.getPeerData().getLastHeight();
}
}

View File

@ -73,7 +73,7 @@ public class AdminResource {
new Thread(new Runnable() {
@Override
public void run() {
Controller.shutdown();
Controller.getInstance().shutdownAndExit();
}
}).start();

View File

@ -69,6 +69,12 @@ public class AnnotationPostProcessor implements ReaderListener {
LOGGER.trace("Found @ApiErrors annotation on " + clazz.getSimpleName() + "." + method.getName());
PathItem pathItem = getPathItemFromMethod(openAPI, classPathString, method);
if (pathItem == null) {
LOGGER.error(String.format("Couldn't get PathItem for %s", clazz.getSimpleName() + "." + method.getName()));
LOGGER.error(String.format("Known paths: %s", String.join(", ", openAPI.getPaths().keySet())));
continue;
}
for (Operation operation : pathItem.readOperations())
for (ApiError apiError : apiErrors.value())
addApiErrorResponse(operation, apiError);
@ -82,6 +88,10 @@ public class AnnotationPostProcessor implements ReaderListener {
return openAPI.getPaths().get(classPathString);
String pathString = path.value();
if (pathString.equals("/"))
pathString = "";
return openAPI.getPaths().get(classPathString + pathString);
}
@ -107,7 +117,7 @@ public class AnnotationPostProcessor implements ReaderListener {
// XXX: addExamples(..) is not working in Swagger 2.0.4. This bug is referenced in https://github.com/swagger-api/swagger-ui/issues/2651
// Replace the call to .setExample(..) by .addExamples(..) when the bug is fixed.
apiResponse.getContent().get(javax.ws.rs.core.MediaType.APPLICATION_JSON).setExample(example);
//apiResponse.getContent().get(javax.ws.rs.core.MediaType.APPLICATION_JSON).addExamples(Integer.toString(apiErrorCode), example);
// apiResponse.getContent().get(javax.ws.rs.core.MediaType.APPLICATION_JSON).addExamples(Integer.toString(apiErrorCode), example);
}
}

View File

@ -16,6 +16,7 @@ import io.swagger.v3.oas.annotations.tags.Tag;
@Tag(name = "Groups"),
@Tag(name = "Names"),
@Tag(name = "Payments"),
@Tag(name = "Peers"),
@Tag(name = "Transactions"),
@Tag(name = "Utilities")
},

View File

@ -0,0 +1,229 @@
package org.qora.api.resource;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.media.ArraySchema;
import io.swagger.v3.oas.annotations.media.Content;
import io.swagger.v3.oas.annotations.media.Schema;
import io.swagger.v3.oas.annotations.parameters.RequestBody;
import io.swagger.v3.oas.annotations.responses.ApiResponse;
import io.swagger.v3.oas.annotations.tags.Tag;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.stream.Collectors;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.DELETE;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import org.qora.api.ApiError;
import org.qora.api.ApiErrors;
import org.qora.api.ApiException;
import org.qora.api.ApiExceptionFactory;
import org.qora.api.Security;
import org.qora.api.model.ConnectedPeer;
import org.qora.data.network.PeerData;
import org.qora.network.Network;
import org.qora.repository.DataException;
import org.qora.repository.Repository;
import org.qora.repository.RepositoryManager;
import org.qora.settings.Settings;
@Path("/peers")
@Produces({
MediaType.APPLICATION_JSON, MediaType.TEXT_PLAIN
})
@Tag(
name = "Peers"
)
public class PeersResource {
@Context
HttpServletRequest request;
@GET
@Operation(
summary = "Fetch list of connected peers",
responses = {
@ApiResponse(
content = @Content(
mediaType = MediaType.APPLICATION_JSON,
array = @ArraySchema(
schema = @Schema(
implementation = ConnectedPeer.class
)
)
)
)
}
)
public List<ConnectedPeer> getPeers() {
return Network.getInstance().getConnectedPeers().stream().map(peer -> new ConnectedPeer(peer)).collect(Collectors.toList());
}
@GET
@Path("/known")
@Operation(
summary = "Fetch list of all known peers",
responses = {
@ApiResponse(
content = @Content(
mediaType = MediaType.APPLICATION_JSON,
array = @ArraySchema(
schema = @Schema(
implementation = PeerData.class
)
)
)
)
}
)
@ApiErrors({
ApiError.REPOSITORY_ISSUE
})
public List<PeerData> getKnownPeers() {
try (final Repository repository = RepositoryManager.getRepository()) {
return repository.getNetworkRepository().getAllPeers();
} catch (DataException e) {
throw ApiExceptionFactory.INSTANCE.createException(request, ApiError.REPOSITORY_ISSUE, e);
}
}
@GET
@Path("/self")
@Operation(
summary = "Fetch list of peers that connect to self",
responses = {
@ApiResponse(
content = @Content(
mediaType = MediaType.APPLICATION_JSON,
array = @ArraySchema(
schema = @Schema(
implementation = PeerData.class
)
)
)
)
}
)
public List<PeerData> getSelfPeers() {
return Network.getInstance().getSelfPeers();
}
@POST
@Operation(
summary = "Add new peer address",
requestBody = @RequestBody(
required = true,
content = @Content(
mediaType = MediaType.TEXT_PLAIN,
schema = @Schema(
type = "string"
)
)
),
responses = {
@ApiResponse(
description = "true if accepted",
content = @Content(
schema = @Schema(
type = "string"
)
)
)
}
)
@ApiErrors({
ApiError.INVALID_DATA, ApiError.REPOSITORY_ISSUE
})
public String addPeer(String peerAddress) {
Security.checkApiCallAllowed(request);
try (final Repository repository = RepositoryManager.getRepository()) {
String[] peerParts = peerAddress.split(":");
// Expecting one or two parts
if (peerParts.length < 1 || peerParts.length > 2)
throw ApiExceptionFactory.INSTANCE.createException(request, ApiError.INVALID_DATA);
String hostname = peerParts[0];
int port = peerParts.length == 2 ? Integer.parseInt(peerParts[1]) : Settings.DEFAULT_LISTEN_PORT;
InetSocketAddress socketAddress = new InetSocketAddress(hostname, port);
PeerData peerData = new PeerData(socketAddress);
repository.getNetworkRepository().save(peerData);
repository.saveChanges();
return "true";
} catch (IllegalArgumentException e) {
throw ApiExceptionFactory.INSTANCE.createException(request, ApiError.INVALID_DATA);
} catch (ApiException e) {
throw e;
} catch (DataException e) {
throw ApiExceptionFactory.INSTANCE.createException(request, ApiError.REPOSITORY_ISSUE, e);
}
}
@DELETE
@Operation(
summary = "Remove peer address from database",
requestBody = @RequestBody(
required = true,
content = @Content(
mediaType = MediaType.TEXT_PLAIN,
schema = @Schema(
type = "string"
)
)
),
responses = {
@ApiResponse(
description = "true if removed, false if not found",
content = @Content(
schema = @Schema(
type = "string"
)
)
)
}
)
@ApiErrors({
ApiError.INVALID_DATA, ApiError.REPOSITORY_ISSUE
})
public String removePeer(String peerAddress) {
Security.checkApiCallAllowed(request);
try (final Repository repository = RepositoryManager.getRepository()) {
String[] peerParts = peerAddress.split(":");
// Expecting one or two parts
if (peerParts.length < 1 || peerParts.length > 2)
throw ApiExceptionFactory.INSTANCE.createException(request, ApiError.INVALID_DATA);
String hostname = peerParts[0];
int port = peerParts.length == 2 ? Integer.parseInt(peerParts[1]) : Settings.DEFAULT_LISTEN_PORT;
InetSocketAddress socketAddress = new InetSocketAddress(hostname, port);
PeerData peerData = new PeerData(socketAddress);
int numDeleted = repository.getNetworkRepository().delete(peerData);
repository.saveChanges();
return numDeleted != 0 ? "true" : "false";
} catch (IllegalArgumentException e) {
throw ApiExceptionFactory.INSTANCE.createException(request, ApiError.INVALID_DATA);
} catch (ApiException e) {
throw e;
} catch (DataException e) {
throw ApiExceptionFactory.INSTANCE.createException(request, ApiError.REPOSITORY_ISSUE, e);
}
}
}

View File

@ -7,6 +7,7 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.qora.account.PrivateKeyAccount;
import org.qora.block.Block.ValidationResult;
import org.qora.controller.Controller;
import org.qora.data.block.BlockData;
import org.qora.data.transaction.TransactionData;
import org.qora.repository.BlockRepository;
@ -93,6 +94,9 @@ public class BlockGenerator extends Thread {
newBlock.process();
LOGGER.info("Generated new block: " + newBlock.getBlockData().getHeight());
repository.saveChanges();
// Notify controller
Controller.getInstance().onGeneratedBlock(newBlock.getBlockData());
} catch (DataException e) {
// Unable to process block - report and discard
LOGGER.error("Unable to process newly generated block?", e);

View File

@ -1,41 +1,85 @@
package org.qora.controller;
import java.io.IOException;
import java.io.InputStream;
import java.security.Security;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Properties;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.bouncycastle.jce.provider.BouncyCastleProvider;
import org.bouncycastle.jsse.provider.BouncyCastleJsseProvider;
import org.qora.api.ApiService;
import org.qora.block.BlockChain;
import org.qora.block.BlockGenerator;
import org.qora.data.block.BlockData;
import org.qora.network.Network;
import org.qora.network.Peer;
import org.qora.network.message.HeightMessage;
import org.qora.network.message.Message;
import org.qora.repository.DataException;
import org.qora.repository.Repository;
import org.qora.repository.RepositoryFactory;
import org.qora.repository.RepositoryManager;
import org.qora.repository.hsqldb.HSQLDBRepositoryFactory;
import org.qora.settings.Settings;
import org.qora.utils.Base58;
public class Controller {
public class Controller extends Thread {
static {
// This must go before any calls to LogManager/Logger
System.setProperty("java.util.logging.manager", "org.apache.logging.log4j.jul.LogManager");
}
private static final Logger LOGGER = LogManager.getLogger(Controller.class);
public static final String connectionUrl = "jdbc:hsqldb:file:db/blockchain;create=true";
public static final long startTime = System.currentTimeMillis();
public static final String VERSION_PREFIX = "qora-core-";
private static final Logger LOGGER = LogManager.getLogger(Controller.class);
private static final Object shutdownLock = new Object();
private static boolean isStopping = false;
private static BlockGenerator blockGenerator = null;
private static Controller instance;
private final String buildVersion;
private final long buildTimestamp;
private static BlockGenerator blockGenerator;
private Controller() {
Properties properties = new Properties();
try (InputStream in = ClassLoader.getSystemResourceAsStream("build.properties")) {
properties.load(in);
} catch (IOException e) {
throw new RuntimeException("Can't read build.properties resource", e);
}
String buildTimestamp = properties.getProperty("build.timestamp");
if (buildTimestamp == null)
throw new RuntimeException("Can't read build.timestamp from build.properties resource");
this.buildTimestamp = LocalDateTime.parse(buildTimestamp, DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ssX")).toEpochSecond(ZoneOffset.UTC);
String buildVersion = properties.getProperty("build.version");
if (buildVersion == null)
throw new RuntimeException("Can't read build.version from build.properties resource");
this.buildVersion = VERSION_PREFIX + buildVersion;
}
public static Controller getInstance() {
if (instance == null)
instance = new Controller();
return instance;
}
public static void main(String args[]) {
LOGGER.info("Starting up...");
Security.insertProviderAt(new BouncyCastleProvider(), 0);
Security.insertProviderAt(new BouncyCastleJsseProvider(), 1);
// Load/check settings, which potentially sets up blockchain config, etc.
Settings.getInstance();
@ -57,12 +101,15 @@ public class Controller {
System.exit(2);
}
LOGGER.info("Starting block generator");
byte[] privateKey = Base58.decode("A9MNsATgQgruBUjxy2rjWY36Yf19uRioKZbiLFT2P7c6");
blockGenerator = new BlockGenerator(privateKey);
blockGenerator.start();
// XXX work to be done here!
if (args.length == 0) {
LOGGER.info("Starting block generator");
byte[] privateKey = Base58.decode("A9MNsATgQgruBUjxy2rjWY36Yf19uRioKZbiLFT2P7c6");
blockGenerator = new BlockGenerator(privateKey);
blockGenerator.start();
}
LOGGER.info("Starting API");
LOGGER.info("Starting API on port " + Settings.getInstance().getApiPort());
try {
ApiService apiService = ApiService.getInstance();
apiService.start();
@ -71,35 +118,74 @@ public class Controller {
System.exit(1);
}
LOGGER.info("Starting networking");
try {
Network network = Network.getInstance();
network.start();
} catch (Exception e) {
LOGGER.error("Unable to start networking", e);
System.exit(1);
}
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
Controller.shutdown();
Controller.getInstance().shutdown();
}
});
LOGGER.info("Starting controller");
Controller.getInstance().start();
}
public static void shutdown() {
@Override
public void run() {
Thread.currentThread().setName("Controller");
try {
while (true) {
Thread.sleep(1000);
// Query random connections for their blockchain status
// If height > ours then potentially synchronize
// Query random connections for unconfirmed transactions
}
} catch (InterruptedException e) {
// time to exit
return;
}
}
public void shutdown() {
synchronized (shutdownLock) {
if (!isStopping) {
isStopping = true;
LOGGER.info("Shutting down controller");
this.interrupt();
LOGGER.info("Shutting down networking");
Network.getInstance().shutdown();
LOGGER.info("Shutting down API");
ApiService.getInstance().stop();
LOGGER.info("Shutting down block generator");
blockGenerator.shutdown();
try {
blockGenerator.join();
} catch (InterruptedException e) {
e.printStackTrace();
if (blockGenerator != null) {
LOGGER.info("Shutting down block generator");
blockGenerator.shutdown();
try {
blockGenerator.join();
} catch (InterruptedException e) {
// We were interrupted while waiting for thread to 'join'
}
}
try {
LOGGER.info("Shutting down repository");
RepositoryManager.closeRepositoryFactory();
} catch (DataException e) {
e.printStackTrace();
LOGGER.error("Error occurred while shutting down repository", e);
}
LOGGER.info("Shutdown complete!");
@ -107,4 +193,74 @@ public class Controller {
}
}
public void shutdownAndExit() {
this.shutdown();
System.exit(0);
}
public byte[] getMessageMagic() {
return new byte[] {
0x12, 0x34, 0x56, 0x78
};
}
public long getBuildTimestamp() {
return this.buildTimestamp;
}
public String getVersionString() {
return this.buildVersion;
}
public int getChainHeight() {
try (final Repository repository = RepositoryManager.getRepository()) {
return repository.getBlockRepository().getBlockchainHeight();
} catch (DataException e) {
LOGGER.error("Repository issue when fetching blockchain height", e);
return 0;
}
}
// Callbacks for/from network
public void doNetworkBroadcast() {
Network network = Network.getInstance();
// Send our known peers
network.broadcast(network.buildPeersMessage());
// Send our current height
network.broadcast(new HeightMessage(this.getChainHeight()));
}
public void onGeneratedBlock(BlockData newBlockData) {
// XXX we should really be broadcasting the new block sig, not height
// Could even broadcast top two block sigs so that remote peers can see new block references current network-wide last block
// Broadcast our new height
Network.getInstance().broadcast(new HeightMessage(newBlockData.getHeight()));
}
public void onNetworkMessage(Peer peer, Message message) {
LOGGER.trace(String.format("Processing %s message from %s", message.getType().name(), peer.getRemoteSocketAddress()));
switch (message.getType()) {
case HEIGHT:
HeightMessage heightMessage = (HeightMessage) message;
// If we connected to peer, then update our record of peer's height
if (peer.isOutbound())
peer.getPeerData().setLastHeight(heightMessage.getHeight());
// XXX we should instead test incoming block sigs to see if we have them, and if not do sync
// Is peer's blockchain longer than ours?
if (heightMessage.getHeight() > getChainHeight())
Synchronizer.getInstance().synchronize(peer);
break;
default:
break;
}
}
}

View File

@ -0,0 +1,54 @@
package org.qora.controller;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.qora.network.Peer;
public class Synchronizer {
private static final Logger LOGGER = LogManager.getLogger(Synchronizer.class);
private static Synchronizer instance;
private Synchronizer() {
}
public static Synchronizer getInstance() {
if (instance == null)
instance = new Synchronizer();
return instance;
}
public void synchronize(Peer peer) {
// If we're already synchronizing with another peer then return
LOGGER.info(String.format("Synchronizing with peer %s", peer.getRemoteSocketAddress()));
// Peer has different latest block sig to us
// find common block?
// if common block is too far behind us then we're on massively different forks so give up, maybe human invention required to download desired fork
// unwind to common block (unless common block is our latest block)
// apply some newer blocks from peer
// commit
// If our block gen creates a block while we do this - what happens?
// does repository serialization prevent issues?
// blockgen: block 123: pay X from A to B, commit
// sync: block 122 orphaned, replacement blocks 122 through 129 applied, commit
// and vice versa?
// sync: block 122 orphaned, replacement blocks 122 through 129 applied, commit
// blockgen: block 123: pay X from A to B, commit
// simply block syncing when generating and vice versa by grabbing a Controller-owned non-blocking mutex?
}
}

View File

@ -0,0 +1,71 @@
package org.qora.crypto;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.security.KeyFactory;
import java.security.NoSuchAlgorithmException;
import java.security.NoSuchProviderException;
import java.security.PrivateKey;
import java.security.cert.Certificate;
import java.security.cert.CertificateException;
import java.security.cert.CertificateFactory;
import java.security.spec.InvalidKeySpecException;
import java.security.spec.KeySpec;
import java.security.spec.PKCS8EncodedKeySpec;
public class Credentials {
public static Certificate readCertFile(String certPath) throws CertificateException {
if (certPath == null || !new File(certPath).exists())
return null;
CertificateFactory certFactory;
try {
certFactory = CertificateFactory.getInstance("X509", "BC");
File certFile = new File(certPath);
try (InputStream certStream = new FileInputStream(certFile)) {
return certFactory.generateCertificate(certStream);
}
} catch (FileNotFoundException e) {
return null;
} catch (CertificateException | IOException | NoSuchProviderException e) {
throw new CertificateException(e);
}
}
public static Certificate readCertResource(String certResource) throws CertificateException {
ClassLoader loader = Credentials.class.getClassLoader();
try (InputStream inputStream = loader.getResourceAsStream(certResource)) {
if (inputStream == null)
return null;
CertificateFactory certFactory;
try {
certFactory = CertificateFactory.getInstance("X509", "BC");
return certFactory.generateCertificate(inputStream);
} catch (CertificateException | NoSuchProviderException e) {
throw new CertificateException(e);
}
} catch (IOException e) {
return null;
}
}
public static PrivateKey loadPrivateKey(String privateKeyPath, String keyAlgorithm) throws IOException, NoSuchAlgorithmException, InvalidKeySpecException {
if (privateKeyPath == null || !new File(privateKeyPath).exists())
throw new FileNotFoundException("Private key file not found at " + privateKeyPath);
File file = new File(privateKeyPath);
byte[] privKeyBytes = new byte[(int) file.length()];
try (InputStream in = new FileInputStream(file)) {
in.read(privKeyBytes);
}
KeyFactory keyFactory = KeyFactory.getInstance(keyAlgorithm);
KeySpec ks = new PKCS8EncodedKeySpec(privKeyBytes);
return keyFactory.generatePrivate(ks);
}
}

View File

@ -0,0 +1,65 @@
package org.qora.data.network;
import java.net.InetSocketAddress;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
// All properties to be converted to JSON via JAX-RS
@XmlAccessorType(XmlAccessType.FIELD)
public class PeerData {
// Properties
private InetSocketAddress socketAddress;
private Long lastAttempted;
private Long lastConnected;
private Integer lastHeight;
// Constructors
// necessary for JAX-RS serialization
protected PeerData() {
}
public PeerData(InetSocketAddress socketAddress, Long lastAttempted, Long lastConnected, Integer lastHeight) {
this.socketAddress = socketAddress;
this.lastAttempted = lastAttempted;
this.lastConnected = lastConnected;
this.lastHeight = lastHeight;
}
public PeerData(InetSocketAddress socketAddress) {
this(socketAddress, null, null, null);
}
// Getters / setters
public InetSocketAddress getSocketAddress() {
return this.socketAddress;
}
public Long getLastAttempted() {
return this.lastAttempted;
}
public void setLastAttempted(Long lastAttempted) {
this.lastAttempted = lastAttempted;
}
public Long getLastConnected() {
return this.lastConnected;
}
public void setLastConnected(Long lastConnected) {
this.lastConnected = lastConnected;
}
public Integer getLastHeight() {
return this.lastHeight;
}
public void setLastHeight(Integer lastHeight) {
this.lastHeight = lastHeight;
}
}

View File

@ -0,0 +1,151 @@
package org.qora.network;
import java.util.Arrays;
import org.qora.controller.Controller;
import org.qora.network.message.Message;
import org.qora.network.message.Message.MessageType;
import org.qora.utils.NTP;
import org.qora.network.message.PeerIdMessage;
import org.qora.network.message.ProofMessage;
import org.qora.network.message.VersionMessage;
public enum Handshake {
STARTED(null) {
@Override
public Handshake onMessage(Peer peer, Message message) {
return VERSION;
}
@Override
public void action(Peer peer) {
}
},
VERSION(MessageType.VERSION) {
@Override
public Handshake onMessage(Peer peer, Message message) {
peer.setVersionMessage((VersionMessage) message);
return SELF_CHECK;
}
@Override
public void action(Peer peer) {
sendVersion(peer);
}
},
SELF_CHECK(MessageType.PEER_ID) {
@Override
public Handshake onMessage(Peer peer, Message message) {
PeerIdMessage peerIdMessage = (PeerIdMessage) message;
if (Arrays.equals(peerIdMessage.getPeerId(), Network.getInstance().getOurPeerId())) {
// Connected to self!
// If outgoing connection then record destination as self so we don't try again
if (peer.isOutbound())
Network.getInstance().noteToSelf(peer);
else
// We still need to send our ID so our outbound connection can mark their address as 'self'
sendMyId(peer);
// Handshake failure - caller will deal with disconnect
return null;
}
// If we're both version 2 peers then next stage is proof
if (peer.getVersion() >= 2)
return PROOF;
// Fall-back for older clients (for now)
return COMPLETED;
}
@Override
public void action(Peer peer) {
sendMyId(peer);
}
},
PROOF(MessageType.PROOF) {
@Override
public Handshake onMessage(Peer peer, Message message) {
ProofMessage proofMessage = (ProofMessage) message;
// Check peer's timestamp is within acceptable bounds
if (Math.abs(proofMessage.getTimestamp() - peer.getConnectionTimestamp()) > MAX_TIMESTAMP_DELTA)
return null;
// If we connected outbound to peer, then this is a faked confirmation response, so we're good
if (peer.isOutbound())
return COMPLETED;
// Check salt hasn't been seen before - this stops multiple peers reusing salt nonce in a Sybil-like attack
if (Proof.seenSalt(proofMessage.getSalt()))
return null;
if (!Proof.check(proofMessage.getTimestamp(), proofMessage.getSalt(), proofMessage.getNonce()))
return null;
// Proof valid
return COMPLETED;
}
@Override
public void action(Peer peer) {
sendProof(peer);
}
},
COMPLETED(null) {
@Override
public Handshake onMessage(Peer peer, Message message) {
// Handshake completed
return null;
}
@Override
public void action(Peer peer) {
// Note: this is only called when we've made outbound connection
// Make a note that we've successfully completed handshake (and when)
peer.getPeerData().setLastConnected(NTP.getTime());
}
};
private static final long MAX_TIMESTAMP_DELTA = 1000; // ms
public final MessageType expectedMessageType;
private Handshake(MessageType expectedMessageType) {
this.expectedMessageType = expectedMessageType;
}
public abstract Handshake onMessage(Peer peer, Message message);
public abstract void action(Peer peer);
private static void sendVersion(Peer peer) {
long buildTimestamp = Controller.getInstance().getBuildTimestamp();
String versionString = Controller.getInstance().getVersionString();
Message versionMessage = new VersionMessage(buildTimestamp, versionString);
if (!peer.sendMessage(versionMessage))
peer.disconnect();
}
private static void sendMyId(Peer peer) {
Message peerIdMessage = new PeerIdMessage(Network.getInstance().getOurPeerId());
if (!peer.sendMessage(peerIdMessage))
peer.disconnect();
}
private static void sendProof(Peer peer) {
if (peer.isOutbound()) {
// For outbound connections we need to generate real proof
new Proof(peer).start();
} else {
// For incoming connections we only need to send a fake proof message as confirmation
Message proofMessage = new ProofMessage(peer.getConnectionTimestamp(), 0, 0);
if (!peer.sendMessage(proofMessage))
peer.disconnect();
}
}
}

View File

@ -0,0 +1,445 @@
package org.qora.network;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.net.UnknownHostException;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.qora.controller.Controller;
import org.qora.data.network.PeerData;
import org.qora.network.message.HeightMessage;
import org.qora.network.message.Message;
import org.qora.network.message.PeersMessage;
import org.qora.network.message.PingMessage;
import org.qora.repository.DataException;
import org.qora.repository.Repository;
import org.qora.repository.RepositoryManager;
import org.qora.settings.Settings;
import org.qora.utils.NTP;
// For managing peers
public class Network extends Thread {
private static final Logger LOGGER = LogManager.getLogger(Network.class);
private static final int LISTEN_BACKLOG = 10;
private static final int CONNECT_FAILURE_BACKOFF = 60 * 1000; // ms
private static final int BROADCAST_INTERVAL = 60 * 1000; // ms
private static Network instance;
public static final int PEER_ID_LENGTH = 128;
private final byte[] ourPeerId;
private List<Peer> connectedPeers;
private List<PeerData> selfPeers;
private ServerSocket listenSocket;
private int minPeers;
private int maxPeers;
private ExecutorService peerExecutor;
private long nextBroadcast;
// Constructors
private Network() {
// Grab P2P port from settings
int listenPort = Settings.getInstance().getListenPort();
// Grab P2P bind address from settings
try {
InetAddress bindAddr = InetAddress.getByName(Settings.getInstance().getBindAddress());
InetSocketAddress endpoint = new InetSocketAddress(bindAddr, listenPort);
// Set up listen socket
listenSocket = new ServerSocket();
listenSocket.setReuseAddress(true);
listenSocket.setSoTimeout(1); // accept() calls block for at most 1ms
listenSocket.bind(endpoint, LISTEN_BACKLOG);
} catch (UnknownHostException e) {
LOGGER.error("Can't bind listen socket to address " + Settings.getInstance().getBindAddress());
throw new RuntimeException("Can't bind listen socket to address");
} catch (IOException e) {
LOGGER.error("Can't create listen socket");
throw new RuntimeException("Can't create listen socket");
}
connectedPeers = new ArrayList<>();
selfPeers = new ArrayList<>();
ourPeerId = new byte[PEER_ID_LENGTH];
new SecureRandom().nextBytes(ourPeerId);
minPeers = Settings.getInstance().getMinPeers();
maxPeers = Settings.getInstance().getMaxPeers();
peerExecutor = Executors.newCachedThreadPool();
nextBroadcast = System.currentTimeMillis();
}
// Getters / setters
public static Network getInstance() {
if (instance == null)
instance = new Network();
return instance;
}
public byte[] getOurPeerId() {
return this.ourPeerId;
}
public List<Peer> getConnectedPeers() {
synchronized (this.connectedPeers) {
return new ArrayList<>(this.connectedPeers);
}
}
public List<PeerData> getSelfPeers() {
synchronized (this.selfPeers) {
return new ArrayList<>(this.selfPeers);
}
}
public void noteToSelf(Peer peer) {
LOGGER.info(String.format("No longer considering peer address %s as it connects to self", peer.getRemoteSocketAddress()));
synchronized (this.selfPeers) {
this.selfPeers.add(peer.getPeerData());
}
}
// Main thread
@Override
public void run() {
Thread.currentThread().setName("Network");
// Maintain long-term connections to various peers' API applications
try {
while (true) {
acceptConnection();
createConnection();
if (System.currentTimeMillis() >= this.nextBroadcast) {
this.nextBroadcast = System.currentTimeMillis() + BROADCAST_INTERVAL;
// Controller can decide what to broadcast
Controller.getInstance().doNetworkBroadcast();
}
// Sleep for a while
Thread.sleep(1000);
}
} catch (InterruptedException e) {
// Fall-through to shutdown
} catch (DataException e) {
LOGGER.warn("Repository issue while running network", e);
// Fall-through to shutdown
}
// Shutdown
if (!this.listenSocket.isClosed())
try {
this.listenSocket.close();
} catch (IOException e) {
// Not important
}
}
@SuppressWarnings("resource")
private void acceptConnection() throws InterruptedException {
Socket socket;
try {
socket = this.listenSocket.accept();
} catch (SocketTimeoutException e) {
// No connections to accept
return;
} catch (IOException e) {
// Something went wrong or listen socket was closed due to shutdown
return;
}
synchronized (this.connectedPeers) {
if (connectedPeers.size() >= maxPeers) {
// We have enough peers
LOGGER.trace(String.format("Connection discarded from peer %s", socket.getRemoteSocketAddress()));
try {
socket.close();
} catch (IOException e) {
// Not important
}
return;
}
LOGGER.debug(String.format("Connection accepted from peer %s", socket.getRemoteSocketAddress()));
Peer newPeer = new Peer(socket);
this.connectedPeers.add(newPeer);
peerExecutor.execute(newPeer);
}
}
private void createConnection() throws InterruptedException, DataException {
synchronized (this.connectedPeers) {
if (connectedPeers.size() >= minPeers)
return;
}
Peer newPeer;
try (final Repository repository = RepositoryManager.getRepository()) {
// Find an address to connect to
List<PeerData> peers = repository.getNetworkRepository().getAllPeers();
// Don't consider peers with recent connection failures
final long lastAttemptedThreshold = NTP.getTime() - CONNECT_FAILURE_BACKOFF;
peers.removeIf(peerData -> peerData.getLastAttempted() != null && peerData.getLastAttempted() > lastAttemptedThreshold);
// Don't consider peers that we know loop back to ourself
Predicate<PeerData> hasSamePeerSocketAddress = peerData -> this.selfPeers.stream()
.anyMatch(selfPeerData -> selfPeerData.getSocketAddress().equals(peerData.getSocketAddress()));
synchronized (this.selfPeers) {
peers.removeIf(hasSamePeerSocketAddress);
}
// Don't consider already connected peers
Predicate<PeerData> isConnectedPeer = peerData -> this.connectedPeers.stream()
.anyMatch(peer -> peer.getPeerData() != null && peer.getPeerData().getSocketAddress().equals(peerData.getSocketAddress()));
synchronized (this.connectedPeers) {
peers.removeIf(isConnectedPeer);
}
// Any left?
if (peers.isEmpty())
return;
// Pick random peer
int peerIndex = new SecureRandom().nextInt(peers.size());
// Pick candidate
PeerData peerData = peers.get(peerIndex);
newPeer = new Peer(peerData);
// Update connection attempt info
peerData.setLastAttempted(NTP.getTime());
repository.getNetworkRepository().save(peerData);
repository.saveChanges();
}
if (!newPeer.connect())
return;
synchronized (this.connectedPeers) {
this.connectedPeers.add(newPeer);
}
peerExecutor.execute(newPeer);
}
// Peer callbacks
/** Called when Peer's thread has setup and is ready to process messages */
public void onPeerReady(Peer peer) {
this.onMessage(peer, null);
}
public void onDisconnect(Peer peer) {
synchronized (this.connectedPeers) {
this.connectedPeers.remove(peer);
}
}
/** Called when a new message arrives for a peer. message can be null if called after connection */
public void onMessage(Peer peer, Message message) {
if (message != null)
LOGGER.trace(String.format("Received %s message from %s", message.getType().name(), peer.getRemoteSocketAddress()));
Handshake handshakeStatus = peer.getHandshakeStatus();
if (handshakeStatus != Handshake.COMPLETED) {
// Still handshaking
// Check message type is as expected
if (handshakeStatus.expectedMessageType != null && message.getType() != handshakeStatus.expectedMessageType) {
LOGGER.debug(String.format("Unexpected %s message from %s, expected %s", message.getType().name(), peer.getRemoteSocketAddress(),
handshakeStatus.expectedMessageType));
peer.disconnect();
return;
}
Handshake newHandshakeStatus = handshakeStatus.onMessage(peer, message);
if (newHandshakeStatus == null) {
// Handshake failure
LOGGER.debug(String.format("Handshake failure with peer %s message %s", peer.getRemoteSocketAddress(), message.getType().name()));
peer.disconnect();
return;
}
if (peer.isOutbound())
// If we made outbound connection then we need to act first
newHandshakeStatus.action(peer);
else
// We have inbound connection so we need to respond inline with what we just received
handshakeStatus.action(peer);
peer.setHandshakeStatus(newHandshakeStatus);
if (newHandshakeStatus == Handshake.COMPLETED)
this.onHandshakeCompleted(peer);
return;
}
// Should be non-handshaking messages from now on
switch (message.getType()) {
case VERSION:
case PEER_ID:
case PROOF:
LOGGER.debug(String.format("Unexpected handshaking message %s from peer %s", message.getType().name(), peer.getRemoteSocketAddress()));
peer.disconnect();
return;
case PING:
PingMessage pingMessage = (PingMessage) message;
// Generate 'pong' using same ID
PingMessage pongMessage = new PingMessage();
pongMessage.setId(pingMessage.getId());
if (!peer.sendMessage(pongMessage))
peer.disconnect();
break;
case PEERS:
PeersMessage peersMessage = (PeersMessage) message;
List<InetSocketAddress> peerAddresses = new ArrayList<>();
for (InetAddress peerAddress : peersMessage.getPeerAddresses())
peerAddresses.add(new InetSocketAddress(peerAddress, Settings.DEFAULT_LISTEN_PORT));
try {
mergePeers(peerAddresses);
} catch (DataException e) {
// Not good
peer.disconnect();
return;
}
break;
default:
// Bump up to controller for possible action
Controller.getInstance().onNetworkMessage(peer, message);
break;
}
}
private void onHandshakeCompleted(Peer peer) {
peer.startPings();
Message heightMessage = new HeightMessage(Controller.getInstance().getChainHeight());
if (!peer.sendMessage(heightMessage)) {
peer.disconnect();
return;
}
Message peersMessage = this.buildPeersMessage();
if (!peer.sendMessage(peersMessage))
peer.disconnect();
}
public Message buildPeersMessage() {
List<Peer> peers = new ArrayList<>();
synchronized (this.connectedPeers) {
// Only outbound peer connections that have completed handshake
peers = this.connectedPeers.stream().filter(peer -> peer.isOutbound() && peer.getHandshakeStatus() == Handshake.COMPLETED)
.collect(Collectors.toList());
}
return new PeersMessage(peers);
}
// Network-wide calls
private List<Peer> getCompletedPeers() {
List<Peer> completedPeers = new ArrayList<>();
synchronized (this.connectedPeers) {
completedPeers = this.connectedPeers.stream().filter(peer -> peer.getHandshakeStatus() == Handshake.COMPLETED).collect(Collectors.toList());
}
return completedPeers;
}
private void mergePeers(List<InetSocketAddress> peerAddresses) throws DataException {
try (final Repository repository = RepositoryManager.getRepository()) {
List<PeerData> knownPeers = repository.getNetworkRepository().getAllPeers();
// Resolve known peer hostnames
Function<PeerData, InetSocketAddress> peerDataToSocketAddress = peerData -> new InetSocketAddress(peerData.getSocketAddress().getHostString(),
peerData.getSocketAddress().getPort());
List<InetSocketAddress> knownPeerAddresses = knownPeers.stream().map(peerDataToSocketAddress).collect(Collectors.toList());
// Filter out duplicates
Predicate<InetSocketAddress> addressKnown = peerAddress -> knownPeerAddresses.stream().anyMatch(knownAddress -> knownAddress.equals(peerAddress));
peerAddresses.removeIf(addressKnown);
// Save the rest into database
for (InetSocketAddress peerAddress : peerAddresses) {
PeerData peerData = new PeerData(peerAddress);
repository.getNetworkRepository().save(peerData);
}
repository.saveChanges();
}
}
public void broadcast(Message message) {
class Broadcaster implements Runnable {
private List<Peer> targetPeers;
private Message message;
public Broadcaster(List<Peer> targetPeers, Message message) {
this.targetPeers = targetPeers;
this.message = message;
}
@Override
public void run() {
for (Peer peer : targetPeers)
if (!peer.sendMessage(message))
peer.disconnect();
}
}
peerExecutor.execute(new Broadcaster(this.getCompletedPeers(), message));
}
public void shutdown() {
peerExecutor.shutdownNow();
this.interrupt();
}
}

View File

@ -0,0 +1,302 @@
package org.qora.network;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.net.UnknownHostException;
import java.security.SecureRandom;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.qora.controller.Controller;
import org.qora.data.network.PeerData;
import org.qora.network.message.Message;
import org.qora.network.message.Message.MessageType;
import org.qora.network.message.PingMessage;
import org.qora.network.message.VersionMessage;
import org.qora.utils.NTP;
// For managing one peer
public class Peer implements Runnable {
private static final Logger LOGGER = LogManager.getLogger(Peer.class);
private static final int CONNECT_TIMEOUT = 1000; // ms
private static final int RESPONSE_TIMEOUT = 5000; // ms
private static final int PING_INTERVAL = 20000; // ms - just under every 30s is usually ideal to keep NAT mappings refreshed
private static final int INACTIVITY_TIMEOUT = 30000; // ms
private final boolean isOutbound;
private Socket socket = null;
private PeerData peerData = null;
private InetSocketAddress remoteSocketAddress = null;
private Long connectionTimestamp = null;
private OutputStream out;
private Handshake handshakeStatus = Handshake.STARTED;
private Map<Integer, BlockingQueue<Message>> messages;
private VersionMessage versionMessage = null;
private Integer version;
private ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
private Long lastPing = null;
/** Construct unconnected outbound Peer using socket address in peer data */
public Peer(PeerData peerData) {
this.isOutbound = true;
this.peerData = peerData;
this.remoteSocketAddress = peerData.getSocketAddress();
}
/** Construct Peer using existing, connected socket */
public Peer(Socket socket) {
this.isOutbound = false;
this.socket = socket;
this.remoteSocketAddress = (InetSocketAddress) this.socket.getRemoteSocketAddress();
}
// Getters / setters
public PeerData getPeerData() {
return this.peerData;
}
public boolean isOutbound() {
return this.isOutbound;
}
public Handshake getHandshakeStatus() {
return this.handshakeStatus;
}
public void setHandshakeStatus(Handshake handshakeStatus) {
this.handshakeStatus = handshakeStatus;
}
public InetSocketAddress getRemoteSocketAddress() {
return this.remoteSocketAddress;
}
public VersionMessage getVersionMessage() {
return this.versionMessage;
}
public void setVersionMessage(VersionMessage versionMessage) {
this.versionMessage = versionMessage;
if (this.versionMessage.getVersionString().startsWith(Controller.VERSION_PREFIX)) {
int index = Controller.VERSION_PREFIX.length();
try {
this.version = Integer.parseInt(this.versionMessage.getVersionString().substring(index, index + 1));
} catch (NumberFormatException e) {
this.version = 1;
}
} else {
this.version = 1;
}
}
public Integer getVersion() {
return this.version;
}
public Long getConnectionTimestamp() {
return this.connectionTimestamp;
}
public Long getLastPing() {
return this.lastPing;
}
public void setLastPing(long lastPing) {
this.lastPing = lastPing;
}
// Processing
private void setup() throws IOException {
this.socket.setSoTimeout(INACTIVITY_TIMEOUT);
this.out = this.socket.getOutputStream();
this.connectionTimestamp = NTP.getTime();
this.messages = Collections.synchronizedMap(new HashMap<Integer, BlockingQueue<Message>>());
}
public boolean connect() {
LOGGER.trace(String.format("Connecting to peer %s", this.remoteSocketAddress));
this.socket = new Socket();
try {
InetSocketAddress resolvedSocketAddress = new InetSocketAddress(this.remoteSocketAddress.getHostString(), this.remoteSocketAddress.getPort());
this.socket.connect(resolvedSocketAddress, CONNECT_TIMEOUT);
LOGGER.debug(String.format("Connected to peer %s", this.remoteSocketAddress));
} catch (SocketTimeoutException e) {
LOGGER.trace(String.format("Connection timed out to peer %s", this.remoteSocketAddress));
return false;
} catch (UnknownHostException e) {
LOGGER.trace(String.format("Connection failed to unresolved peer %s", this.remoteSocketAddress));
return false;
} catch (IOException e) {
LOGGER.trace(String.format("Connection failed to peer %s", this.remoteSocketAddress));
return false;
}
return true;
}
// Main thread
@Override
public void run() {
Thread.currentThread().setName("Peer " + this.socket.getRemoteSocketAddress());
try (DataInputStream in = new DataInputStream(socket.getInputStream())) {
setup();
Network.getInstance().onPeerReady(this);
while (true) {
// Wait (up to INACTIVITY_TIMEOUT) for, and parse, incoming message
Message message = Message.fromStream(in);
if (message == null)
return;
// Find potential blocking queue for this id (expect null if id is -1)
BlockingQueue<Message> queue = this.messages.get(message.getId());
if (queue != null) {
// Adding message to queue will unblock thread waiting for response
this.messages.get(message.getId()).add(message);
} else {
// Nothing waiting for this message - pass up to network
Network.getInstance().onMessage(this, message);
}
}
} catch (IOException e) {
// Fall-through
} finally {
this.disconnect();
}
}
/**
* Attempt to send Message to peer
*
* @param message
* @return <code>true</code> if message successfully sent; <code>false</code> otherwise
*/
public boolean sendMessage(Message message) {
if (this.socket.isClosed())
return false;
try {
// Send message
LOGGER.trace(String.format("Sending %s message to peer %s", message.getType().name(), this.getRemoteSocketAddress()));
synchronized (this.out) {
this.out.write(message.toBytes());
this.out.flush();
}
} catch (IOException e) {
// Send failure
return false;
}
// Sent OK
return true;
}
/**
* Send message to peer and await response.
* <p>
* Message is assigned a random ID and sent. If a response with matching ID is received then it is returned to caller.
* <p>
* If no response with matching ID within timeout, or some other error/exception occurs, then return <code>null</code>. (Assume peer will be rapidly
* disconnected after this).
*
* @param message
* @return <code>Message</code> if valid response received; <code>null</code> if not or error/exception occurs
*/
public Message getResponse(Message message) {
BlockingQueue<Message> blockingQueue = new ArrayBlockingQueue<Message>(1);
// Assign random ID to this message
int id;
do {
id = new SecureRandom().nextInt(Integer.MAX_VALUE - 1) + 1;
message.setId(id);
// Put queue into map (keyed by message ID) so we can poll for a response
// If putIfAbsent() doesn't return null, then this id is already taken
} while (this.messages.putIfAbsent(id, blockingQueue) != null);
// Try to send message
if (!this.sendMessage(message)) {
this.messages.remove(id);
return null;
}
try {
Message response = blockingQueue.poll(RESPONSE_TIMEOUT, TimeUnit.MILLISECONDS);
return response;
} catch (InterruptedException e) {
// Our thread was interrupted. Probably in shutdown scenario.
return null;
} finally {
this.messages.remove(id);
}
}
public void startPings() {
class Pinger implements Runnable {
private Peer peer;
public Pinger(Peer peer) {
this.peer = peer;
}
@Override
public void run() {
PingMessage pingMessage = new PingMessage();
long before = System.currentTimeMillis();
Message message = peer.getResponse(pingMessage);
long after = System.currentTimeMillis();
if (message == null || message.getType() != MessageType.PING)
peer.disconnect();
peer.setLastPing(after - before);
}
}
;
this.executor.scheduleWithFixedDelay(new Pinger(this), 0, PING_INTERVAL, TimeUnit.MILLISECONDS);
}
public void disconnect() {
// Shut down pinger
this.executor.shutdownNow();
// Close socket
if (!this.socket.isClosed()) {
LOGGER.debug(String.format("Disconnected peer %s", this.getRemoteSocketAddress()));
try {
this.socket.close();
} catch (IOException e) {
}
}
Network.getInstance().onDisconnect(this);
}
}

View File

@ -0,0 +1,112 @@
package org.qora.network;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.util.HashSet;
import org.qora.network.message.ProofMessage;
import com.google.common.primitives.Longs;
public class Proof extends Thread {
private static final int MIN_PROOF_ZEROS = 2;
private static final MessageDigest sha256;
static {
try {
sha256 = MessageDigest.getInstance("SHA256");
} catch (NoSuchAlgorithmException e) {
// Can't progress
throw new RuntimeException("Message digest SHA256 not available");
}
}
private static final HashSet<Long> seenSalts = new HashSet<>();
private Peer peer;
public Proof(Peer peer) {
this.peer = peer;
setDaemon(true);
}
public static boolean seenSalt(long salt) {
synchronized (seenSalts) {
return seenSalts.contains(salt);
}
}
public static void addSalt(long salt) {
synchronized (seenSalts) {
seenSalts.add(salt);
}
}
@Override
public void run() {
setName("Proof for peer " + this.peer.getRemoteSocketAddress());
// Do proof-of-work calculation to gain acceptance with remote end
// Remote end knows this (approximately)
long timestamp = this.peer.getConnectionTimestamp();
// Needs to be unique on the remote end
long salt = new SecureRandom().nextLong();
byte[] message = new byte[8 + 8 + 8]; // nonce + salt + timestamp
byte[] saltBytes = Longs.toByteArray(salt);
System.arraycopy(saltBytes, 0, message, 8, saltBytes.length);
byte[] timestampBytes = Longs.toByteArray(timestamp);
System.arraycopy(timestampBytes, 0, message, 8 + 8, timestampBytes.length);
long nonce;
for (nonce = 0; nonce < Long.MAX_VALUE; ++nonce) {
// Check whether we're shutting down every so often
if ((nonce & 0xff) == 0 && Thread.currentThread().isInterrupted())
// throw new InterruptedException("Interrupted during peer proof calculation");
return;
byte[] nonceBytes = Longs.toByteArray(nonce);
System.arraycopy(nonceBytes, 0, message, 0, nonceBytes.length);
byte[] digest = sha256.digest(message);
if (check(digest))
break;
}
ProofMessage proofMessage = new ProofMessage(timestamp, salt, nonce);
peer.sendMessage(proofMessage);
}
private static boolean check(byte[] digest) {
int idx;
for (idx = 0; idx < MIN_PROOF_ZEROS; ++idx)
if (digest[idx] != 0)
break;
return idx == MIN_PROOF_ZEROS;
}
public static boolean check(long timestamp, long salt, long nonce) {
byte[] message = new byte[8 + 8 + 8];
byte[] saltBytes = Longs.toByteArray(salt);
System.arraycopy(saltBytes, 0, message, 8, saltBytes.length);
byte[] timestampBytes = Longs.toByteArray(timestamp);
System.arraycopy(timestampBytes, 0, message, 8 + 8, timestampBytes.length);
byte[] nonceBytes = Longs.toByteArray(nonce);
System.arraycopy(nonceBytes, 0, message, 0, nonceBytes.length);
byte[] digest = sha256.digest(message);
return check(digest);
}
}

View File

@ -0,0 +1,47 @@
package org.qora.network.message;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import com.google.common.primitives.Ints;
public class HeightMessage extends Message {
private int height;
public HeightMessage(int height) {
this(-1, height);
}
private HeightMessage(int id, int height) {
super(id, MessageType.HEIGHT);
this.height = height;
}
public int getHeight() {
return this.height;
}
public static Message fromByteBuffer(int id, ByteBuffer bytes) throws UnsupportedEncodingException {
int height = bytes.getInt();
return new HeightMessage(id, height);
}
@Override
protected byte[] toData() {
try {
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
bytes.write(Ints.toByteArray(this.height));
return bytes.toByteArray();
} catch (IOException e) {
return null;
}
}
}

View File

@ -0,0 +1,212 @@
package org.qora.network.message;
import java.util.Map;
import org.qora.controller.Controller;
import org.qora.crypto.Crypto;
import com.google.common.primitives.Ints;
import static java.util.Arrays.stream;
import static java.util.stream.Collectors.toMap;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.util.Arrays;
public abstract class Message {
// MAGIC(4) + TYPE(4) + HAS-ID(1) + ID?(4) + DATA-SIZE(4) + CHECKSUM?(4) + DATA?(*)
private static final int MAGIC_LENGTH = 4;
private static final int CHECKSUM_LENGTH = 4;
private static final int MAX_DATA_SIZE = 1024 * 1024; // 1MB
public enum MessageType {
GET_PEERS(1),
PEERS(2),
HEIGHT(3),
GET_SIGNATURES(4),
SIGNATURES(5),
GET_BLOCK(6),
BLOCK(7),
TRANSACTION(8),
PING(9),
VERSION(10),
PEER_ID(11),
PROOF(12);
public final int value;
public final Method fromByteBuffer;
private static final Map<Integer, MessageType> map = stream(MessageType.values())
.collect(toMap(messageType -> messageType.value, messageType -> messageType));
private MessageType(int value) {
this.value = value;
String[] classNameParts = this.name().toLowerCase().split("_");
for (int i = 0; i < classNameParts.length; ++i)
classNameParts[i] = classNameParts[i].substring(0, 1).toUpperCase().concat(classNameParts[i].substring(1));
String className = String.join("", classNameParts);
Method fromByteBuffer;
try {
Class<?> subclass = Class.forName(String.join("", Message.class.getPackage().getName(), ".", className, "Message"));
fromByteBuffer = subclass.getDeclaredMethod("fromByteBuffer", int.class, ByteBuffer.class);
} catch (ClassNotFoundException | NoSuchMethodException | SecurityException e) {
fromByteBuffer = null;
}
this.fromByteBuffer = fromByteBuffer;
}
public static MessageType valueOf(int value) {
return map.get(value);
}
public Message fromBytes(int id, byte[] data) {
if (this.fromByteBuffer == null)
return null;
try {
return (Message) this.fromByteBuffer.invoke(null, id, data == null ? null : ByteBuffer.wrap(data));
} catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
return null;
}
}
}
private int id;
private MessageType type;
protected Message(int id, MessageType type) {
this.id = id;
this.type = type;
}
protected Message(MessageType type) {
this(-1, type);
}
public boolean hasId() {
return this.id != -1;
}
public int getId() {
return this.id;
}
public void setId(int id) {
this.id = id;
}
public MessageType getType() {
return this.type;
}
public static Message fromStream(DataInputStream in) throws SocketTimeoutException {
try {
// Read only enough bytes to cover Message "magic" preamble
byte[] messageMagic = new byte[MAGIC_LENGTH];
in.readFully(messageMagic);
if (!Arrays.equals(messageMagic, Controller.getInstance().getMessageMagic()))
// Didn't receive correct Message "magic"
return null;
int typeValue = in.readInt();
MessageType messageType = MessageType.valueOf(typeValue);
if (messageType == null)
// Unrecognised message type
return null;
// Find supporting object
int hasId = in.read();
int id = -1;
if (hasId != 0) {
id = in.readInt();
if (id <= 0)
// Invalid ID
return null;
}
int dataSize = in.readInt();
if (dataSize > MAX_DATA_SIZE)
// Too large
return null;
byte[] data = null;
if (dataSize > 0) {
byte[] expectedChecksum = new byte[CHECKSUM_LENGTH];
in.readFully(expectedChecksum);
data = new byte[dataSize];
in.readFully(data);
// Test checksum
byte[] actualChecksum = generateChecksum(data);
if (!Arrays.equals(expectedChecksum, actualChecksum))
return null;
}
return messageType.fromBytes(id, data);
} catch (SocketTimeoutException e) {
throw e;
} catch (IOException e) {
return null;
}
}
protected static byte[] generateChecksum(byte[] data) {
return Arrays.copyOfRange(Crypto.digest(data), 0, CHECKSUM_LENGTH);
}
public byte[] toBytes() {
try {
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
// Magic
bytes.write(Controller.getInstance().getMessageMagic());
bytes.write(Ints.toByteArray(this.type.value));
if (this.hasId()) {
bytes.write(1);
bytes.write(Ints.toByteArray(this.id));
} else {
bytes.write(0);
}
byte[] data = this.toData();
if (data == null)
return null;
bytes.write(Ints.toByteArray(data.length));
if (data.length > 0) {
bytes.write(generateChecksum(data));
bytes.write(data);
}
return bytes.toByteArray();
} catch (IOException e) {
return null;
}
}
protected abstract byte[] toData();
}

View File

@ -0,0 +1,52 @@
package org.qora.network.message;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import org.qora.network.Network;
public class PeerIdMessage extends Message {
private byte[] peerId;
public PeerIdMessage(byte[] peerId) {
this(-1, peerId);
}
private PeerIdMessage(int id, byte[] peerId) {
super(id, MessageType.PEER_ID);
this.peerId = peerId;
}
public byte[] getPeerId() {
return this.peerId;
}
public static Message fromByteBuffer(int id, ByteBuffer bytes) throws UnsupportedEncodingException {
if (bytes.remaining() != Network.PEER_ID_LENGTH)
return null;
byte[] peerId = new byte[Network.PEER_ID_LENGTH];
bytes.get(peerId);
return new PeerIdMessage(id, peerId);
}
@Override
protected byte[] toData() {
try {
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
bytes.write(this.peerId);
return bytes.toByteArray();
} catch (IOException e) {
return null;
}
}
}

View File

@ -0,0 +1,93 @@
package org.qora.network.message;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import org.qora.network.Peer;
import com.google.common.primitives.Ints;
// NOTE: this legacy message only supports 4-byte IPv4 addresses and doesn't send port number either
public class PeersMessage extends Message {
private static final int ADDRESS_LENGTH = 4;
private List<InetAddress> peerAddresses;
public PeersMessage(List<Peer> peers) {
super(-1, MessageType.PEERS);
// We have to forcibly resolve into IP addresses as we can't send hostnames
this.peerAddresses = new ArrayList<>();
for (Peer peer : peers) {
try {
InetAddress resolvedAddress = InetAddress.getByName(peer.getRemoteSocketAddress().getHostString());
// Filter out unsupported address types
if (resolvedAddress.getAddress().length != ADDRESS_LENGTH)
continue;
this.peerAddresses.add(resolvedAddress);
} catch (UnknownHostException e) {
// Couldn't resolve
continue;
}
}
}
private PeersMessage(int id, List<InetAddress> peerAddresses) {
super(id, MessageType.PEERS);
this.peerAddresses = peerAddresses;
}
public List<InetAddress> getPeerAddresses() {
return this.peerAddresses;
}
public static Message fromByteBuffer(int id, ByteBuffer bytes) throws UnsupportedEncodingException {
int count = bytes.getInt();
if (bytes.remaining() != count * ADDRESS_LENGTH)
return null;
List<InetAddress> peerAddresses = new ArrayList<>();
byte[] addressBytes = new byte[ADDRESS_LENGTH];
try {
for (int i = 0; i < count; ++i) {
bytes.get(addressBytes);
peerAddresses.add(InetAddress.getByAddress(addressBytes));
}
} catch (UnknownHostException e) {
return null;
}
return new PeersMessage(id, peerAddresses);
}
@Override
protected byte[] toData() {
try {
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
bytes.write(Ints.toByteArray(this.peerAddresses.size()));
for (InetAddress peerAddress : this.peerAddresses)
bytes.write(peerAddress.getAddress());
return bytes.toByteArray();
} catch (IOException e) {
return null;
}
}
}

View File

@ -0,0 +1,25 @@
package org.qora.network.message;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
public class PingMessage extends Message {
public PingMessage() {
this(-1);
}
private PingMessage(int id) {
super(id, MessageType.PING);
}
public static Message fromByteBuffer(int id, ByteBuffer bytes) throws UnsupportedEncodingException {
return new PingMessage(id);
}
@Override
protected byte[] toData() {
return new byte[0];
}
}

View File

@ -0,0 +1,63 @@
package org.qora.network.message;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import com.google.common.primitives.Longs;
public class ProofMessage extends Message {
private long timestamp;
private long salt;
private long nonce;
public ProofMessage(long timestamp, long salt, long nonce) {
this(-1, timestamp, salt, nonce);
}
private ProofMessage(int id, long timestamp, long salt, long nonce) {
super(id, MessageType.PROOF);
this.timestamp = timestamp;
this.salt = salt;
this.nonce = nonce;
}
public long getTimestamp() {
return this.timestamp;
}
public long getSalt() {
return this.salt;
}
public long getNonce() {
return this.nonce;
}
public static Message fromByteBuffer(int id, ByteBuffer bytes) throws UnsupportedEncodingException {
long timestamp = bytes.getLong();
long salt = bytes.getLong();
long nonce = bytes.getLong();
return new ProofMessage(id, timestamp, salt, nonce);
}
@Override
protected byte[] toData() {
try {
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
bytes.write(Longs.toByteArray(this.timestamp));
bytes.write(Longs.toByteArray(this.salt));
bytes.write(Longs.toByteArray(this.nonce));
return bytes.toByteArray();
} catch (IOException e) {
return null;
}
}
}

View File

@ -0,0 +1,67 @@
package org.qora.network.message;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import org.qora.utils.Serialization;
import com.google.common.primitives.Longs;
public class VersionMessage extends Message {
private long buildTimestamp;
private String versionString;
public VersionMessage(long buildTimestamp, String versionString) {
this(-1, buildTimestamp, versionString);
}
private VersionMessage(int id, long buildTimestamp, String versionString) {
super(id, MessageType.VERSION);
this.buildTimestamp = buildTimestamp;
this.versionString = versionString;
}
public long getBuildTimestamp() {
return this.buildTimestamp;
}
public String getVersionString() {
return this.versionString;
}
public static Message fromByteBuffer(int id, ByteBuffer bytes) throws UnsupportedEncodingException {
long buildTimestamp = bytes.getLong();
int versionStringLength = bytes.getInt();
if (versionStringLength != bytes.remaining())
return null;
byte[] versionBytes = new byte[versionStringLength];
bytes.get(versionBytes);
String versionString = new String(versionBytes, "UTF-8");
return new VersionMessage(id, buildTimestamp, versionString);
}
@Override
protected byte[] toData() {
try {
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
bytes.write(Longs.toByteArray(this.buildTimestamp));
Serialization.serializeSizedString(bytes, this.versionString);
return bytes.toByteArray();
} catch (IOException e) {
return null;
}
}
}

View File

@ -0,0 +1,15 @@
package org.qora.repository;
import java.util.List;
import org.qora.data.network.PeerData;
public interface NetworkRepository {
public List<PeerData> getAllPeers() throws DataException;
public void save(PeerData peerData) throws DataException;
public int delete(PeerData peerData) throws DataException;
}

View File

@ -14,6 +14,8 @@ public interface Repository extends AutoCloseable {
public NameRepository getNameRepository();
public NetworkRepository getNetworkRepository();
public TransactionRepository getTransactionRepository();
public VotingRepository getVotingRepository();
@ -27,4 +29,6 @@ public interface Repository extends AutoCloseable {
public void rebuild() throws DataException;
public void setDebug(boolean debugState);
}

View File

@ -500,6 +500,12 @@ public class HSQLDBDatabaseUpdates {
+ "ban_reference Signature, PRIMARY KEY (signature), FOREIGN KEY (signature) REFERENCES Transactions (signature) ON DELETE CASCADE)");
break;
case 30:
// Networking
stmt.execute("CREATE TABLE Peers (hostname VARCHAR(255), port INTEGER, last_connected TIMESTAMP WITH TIME ZONE, last_attempted TIMESTAMP WITH TIME ZONE, "
+ "last_height INTEGER, PRIMARY KEY (hostname, port))");
break;
default:
// nothing to do
return false;

View File

@ -0,0 +1,83 @@
package org.qora.repository.hsqldb;
import java.net.InetSocketAddress;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.List;
import org.qora.data.network.PeerData;
import org.qora.repository.DataException;
import org.qora.repository.NetworkRepository;
public class HSQLDBNetworkRepository implements NetworkRepository {
protected HSQLDBRepository repository;
public HSQLDBNetworkRepository(HSQLDBRepository repository) {
this.repository = repository;
}
@Override
public List<PeerData> getAllPeers() throws DataException {
List<PeerData> peers = new ArrayList<>();
try (ResultSet resultSet = this.repository.checkedExecute("SELECT hostname, port, last_connected, last_attempted, last_height FROM Peers")) {
if (resultSet == null)
return peers;
// NOTE: do-while because checkedExecute() above has already called rs.next() for us
do {
String hostname = resultSet.getString(1);
int port = resultSet.getInt(2);
InetSocketAddress socketAddress = InetSocketAddress.createUnresolved(hostname, port);
Timestamp lastConnectedTimestamp = resultSet.getTimestamp(3, Calendar.getInstance(HSQLDBRepository.UTC));
Long lastConnected = resultSet.wasNull() ? null : lastConnectedTimestamp.getTime();
Timestamp lastAttemptedTimestamp = resultSet.getTimestamp(4, Calendar.getInstance(HSQLDBRepository.UTC));
Long lastAttempted = resultSet.wasNull() ? null : lastAttemptedTimestamp.getTime();
Integer lastHeight = resultSet.getInt(5);
if (resultSet.wasNull())
lastHeight = null;
peers.add(new PeerData(socketAddress, lastConnected, lastAttempted, lastHeight));
} while (resultSet.next());
return peers;
} catch (SQLException e) {
throw new DataException("Unable to fetch poll votes from repository", e);
}
}
@Override
public void save(PeerData peerData) throws DataException {
HSQLDBSaver saveHelper = new HSQLDBSaver("Peers");
Timestamp lastConnected = peerData.getLastConnected() == null ? null : new Timestamp(peerData.getLastConnected());
Timestamp lastAttempted = peerData.getLastAttempted() == null ? null : new Timestamp(peerData.getLastAttempted());
saveHelper.bind("hostname", peerData.getSocketAddress().getHostString()).bind("port", peerData.getSocketAddress().getPort())
.bind("last_connected", lastConnected).bind("last_attempted", lastAttempted).bind("last_height", peerData.getLastHeight());
try {
saveHelper.execute(this.repository);
} catch (SQLException e) {
throw new DataException("Unable to save peer into repository", e);
}
}
@Override
public int delete(PeerData peerData) throws DataException {
try {
return this.repository.delete("Peers", "hostname = ? AND port = ?", peerData.getSocketAddress().getHostString(),
peerData.getSocketAddress().getPort());
} catch (SQLException e) {
throw new DataException("Unable to delete peer from repository", e);
}
}
}

View File

@ -17,6 +17,7 @@ import org.qora.repository.BlockRepository;
import org.qora.repository.GroupRepository;
import org.qora.repository.DataException;
import org.qora.repository.NameRepository;
import org.qora.repository.NetworkRepository;
import org.qora.repository.Repository;
import org.qora.repository.TransactionRepository;
import org.qora.repository.VotingRepository;
@ -32,6 +33,7 @@ public class HSQLDBRepository implements Repository {
public static final TimeZone UTC = TimeZone.getTimeZone("UTC");
protected Connection connection;
protected boolean debugState = false;
// NB: no visibility modifier so only callable from within same package
HSQLDBRepository(Connection connection) {
@ -68,6 +70,11 @@ public class HSQLDBRepository implements Repository {
return new HSQLDBNameRepository(this);
}
@Override
public NetworkRepository getNetworkRepository() {
return new HSQLDBNetworkRepository(this);
}
@Override
public TransactionRepository getTransactionRepository() {
return new HSQLDBTransactionRepository(this);
@ -131,6 +138,11 @@ public class HSQLDBRepository implements Repository {
public void rebuild() throws DataException {
}
@Override
public void setDebug(boolean debugState) {
this.debugState = debugState;
}
/**
* Execute SQL and return ResultSet with but added checking.
* <p>
@ -143,6 +155,9 @@ public class HSQLDBRepository implements Repository {
*/
@SuppressWarnings("resource")
public ResultSet checkedExecute(String sql, Object... objects) throws SQLException {
if (this.debugState)
LOGGER.debug(sql);
PreparedStatement preparedStatement = this.connection.prepareStatement(sql);
// Close the PreparedStatement when the ResultSet is closed otherwise there's a potential resource leak.
@ -153,9 +168,9 @@ public class HSQLDBRepository implements Repository {
ResultSet resultSet = this.checkedExecuteResultSet(preparedStatement, objects);
long queryTime = System.currentTimeMillis() - beforeQuery;
long queryTime = System.currentTimeMillis() - beforeQuery;
if (queryTime > MAX_QUERY_TIME)
LOGGER.info(String.format("HSQLDB query took %d ms: %s", queryTime, sql));
LOGGER.info(String.format("HSQLDB query took %d ms: %s", queryTime, sql));
return resultSet;
}
@ -281,9 +296,9 @@ public class HSQLDBRepository implements Repository {
* @param objects
* @throws SQLException
*/
public void delete(String tableName, String whereClause, Object... objects) throws SQLException {
public int delete(String tableName, String whereClause, Object... objects) throws SQLException {
try (PreparedStatement preparedStatement = this.connection.prepareStatement("DELETE FROM " + tableName + " WHERE " + whereClause)) {
this.checkedExecuteUpdateCount(preparedStatement, objects);
return this.checkedExecuteUpdateCount(preparedStatement, objects);
}
}

View File

@ -33,10 +33,17 @@ public class Settings {
/** Max milliseconds into future for accepting new, unconfirmed transactions */
private long maxTransactionTimestampFuture = 24 * 60 * 60 * 1000; // milliseconds
// RPC
private int rpcPort = 9085;
private List<String> rpcAllowed = new ArrayList<String>(Arrays.asList("127.0.0.1", "::1")); // ipv4, ipv6
private boolean rpcEnabled = true;
// API
private int apiPort = 9085;
private List<String> apiAllowed = new ArrayList<String>(Arrays.asList("127.0.0.1", "::1")); // ipv4, ipv6
private boolean apiEnabled = true;
// Peer-to-peer networking
public static final int DEFAULT_LISTEN_PORT = 9084;
private int listenPort = DEFAULT_LISTEN_PORT;
private String bindAddress = null; // listen on all local addresses
private int minPeers = 3;
private int maxPeers = 10;
// Constants
private static final String SETTINGS_FILENAME = "settings.json";
@ -60,6 +67,7 @@ public class Settings {
break;
}
LOGGER.info("Using settings file: " + path + filename);
List<String> lines = Files.readLines(file, Charsets.UTF_8);
// Concatenate lines for JSON parsing
@ -115,25 +123,39 @@ public class Settings {
}
private void process(JSONObject json) {
// RPC
if (json.containsKey("rpcport"))
this.rpcPort = ((Long) json.get("rpcport")).intValue();
// API
if (json.containsKey("apiPort"))
this.apiPort = ((Long) json.get("apiPort")).intValue();
if (json.containsKey("rpcallowed")) {
JSONArray allowedArray = (JSONArray) json.get("rpcallowed");
if (json.containsKey("apiAllowed")) {
JSONArray allowedArray = (JSONArray) json.get("apiAllowed");
this.rpcAllowed = new ArrayList<String>();
this.apiAllowed = new ArrayList<String>();
for (Object entry : allowedArray) {
if (!(entry instanceof String))
throw new RuntimeException("Entry inside 'rpcallowed' is not string");
throw new RuntimeException("Entry inside 'apiAllowed' is not string");
this.rpcAllowed.add((String) entry);
this.apiAllowed.add((String) entry);
}
}
if (json.containsKey("rpcenabled"))
this.rpcEnabled = ((Boolean) json.get("rpcenabled")).booleanValue();
if (json.containsKey("apiEnabled"))
this.apiEnabled = ((Boolean) json.get("apiEnabled")).booleanValue();
// Peer-to-peer networking
if (json.containsKey("listenPort"))
this.listenPort = ((Long) getTypedJson(json, "listenPort", Long.class)).intValue();
if (json.containsKey("bindAddress"))
this.bindAddress = (String) getTypedJson(json, "bindAddress", String.class);
if (json.containsKey("minPeers"))
this.minPeers = ((Long) getTypedJson(json, "minPeers", Long.class)).intValue();
if (json.containsKey("maxPeers"))
this.maxPeers = ((Long) getTypedJson(json, "maxPeers", Long.class)).intValue();
// Node-specific behaviour
@ -174,16 +196,36 @@ public class Settings {
return this.userpath;
}
public int getRpcPort() {
return this.rpcPort;
public int getApiPort() {
return this.apiPort;
}
public List<String> getRpcAllowed() {
return this.rpcAllowed;
public List<String> getApiAllowed() {
return this.apiAllowed;
}
public boolean isRpcEnabled() {
return this.rpcEnabled;
public boolean isApiEnabled() {
return this.apiEnabled;
}
public int getListenPort() {
return this.listenPort;
}
public int getDefaultListenPort() {
return DEFAULT_LISTEN_PORT;
}
public String getBindAddress() {
return this.bindAddress;
}
public int getMinPeers() {
return this.minPeers;
}
public int getMaxPeers() {
return this.maxPeers;
}
public boolean useBitcoinTestNet() {

View File

@ -0,0 +1,2 @@
build.timestamp=${build.timestamp}
build.version=${project.version}