Improved peer addresses.

Added short sleep() to GET /admin/stop to allow time for HTTP response body to be sent back.

Improved documentation for /peers API resources. Added examples, tidied API output models.
Fixed issue where IPv6 literals with port couldn't be parsed. Now uses RFC5952/RFC3986 style
literal IPv6 addresses with ports, e.g. [::1]:9084

Fixed NPE in Controller.potentiallySynchronize() where peer might not have sent height yet.

Improved Handshake to discard inbound connections if we already have an outbound connection
to a peer with that ID. This prevents us from having two connections to the same peer, one
in each direction.

Network.mergePeers() now runs in a separate thread as acquiring the lock might block.

Network.creationConnection() exits fast based on number of outbound connections, instead of
number of total connections.

Network no longer sends 'local' peer addresses to non-local peers.
e.g. it won't send localhost:9084 to node4.qora.org:9084

Added try-catch to Network.broadcast for when we try to broadcast while shutting down.

Added PeerAddress class to deal with the whole hostname/IPv4/IPv6 address situation.

Reworked PEERS_V2 message type to only send sized-strings instead of separate port,
and potentially IPv6 byte arrays.

Change to HSQLDB database shape.
Corresponding changes to HSQLDBNetworkRepository.
This commit is contained in:
catbref 2019-02-12 18:50:49 +00:00
parent 174a1a5909
commit 82e9e1e7dc
15 changed files with 456 additions and 230 deletions

View File

@ -8,8 +8,7 @@ import org.qora.network.Peer;
@XmlAccessorType(XmlAccessType.FIELD)
public class ConnectedPeer {
public String hostname;
public int port;
public String address;
public Long lastPing;
public Integer lastHeight;
@ -24,8 +23,7 @@ public class ConnectedPeer {
}
public ConnectedPeer(Peer peer) {
this.hostname = peer.getRemoteSocketAddress().getHostString();
this.port = peer.getRemoteSocketAddress().getPort();
this.address = peer.getPeerData().getAddress().toString();
this.lastPing = peer.getLastPing();
this.direction = peer.isOutbound() ? Direction.OUTBOUND : Direction.INBOUND;
this.lastHeight = peer.getPeerData() == null ? null : peer.getPeerData().getLastHeight();

View File

@ -73,6 +73,13 @@ public class AdminResource {
new Thread(new Runnable() {
@Override
public void run() {
// Short sleep to allow HTTP response body to be emitted
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// Not important
}
Controller.getInstance().shutdownAndExit();
}
}).start();

View File

@ -8,7 +8,6 @@ import io.swagger.v3.oas.annotations.parameters.RequestBody;
import io.swagger.v3.oas.annotations.responses.ApiResponse;
import io.swagger.v3.oas.annotations.tags.Tag;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.stream.Collectors;
@ -29,10 +28,10 @@ import org.qora.api.Security;
import org.qora.api.model.ConnectedPeer;
import org.qora.data.network.PeerData;
import org.qora.network.Network;
import org.qora.network.PeerAddress;
import org.qora.repository.DataException;
import org.qora.repository.Repository;
import org.qora.repository.RepositoryManager;
import org.qora.settings.Settings;
@Path("/peers")
@Produces({
@ -104,26 +103,31 @@ public class PeersResource {
mediaType = MediaType.APPLICATION_JSON,
array = @ArraySchema(
schema = @Schema(
implementation = PeerData.class
implementation = PeerAddress.class
)
)
)
)
}
)
public List<PeerData> getSelfPeers() {
public List<PeerAddress> getSelfPeers() {
return Network.getInstance().getSelfPeers();
}
@POST
@Operation(
summary = "Add new peer address",
description = "Specify a new peer using hostname, IPv4 address, IPv6 address and optional port number preceeded with colon (e.g. :9084)<br>"
+ "Note that IPv6 literal addresses must be surrounded with brackets.<br>" + "Examples:<br><ul>" + "<li>some-peer.example.com</li>"
+ "<li>some-peer.example.com:9084</li>" + "<li>10.1.2.3</li>" + "<li>10.1.2.3:9084</li>" + "<li>[2001:d8b::1]</li>"
+ "<li>[2001:d8b::1]:9084</li>" + "</ul>",
requestBody = @RequestBody(
required = true,
content = @Content(
mediaType = MediaType.TEXT_PLAIN,
schema = @Schema(
type = "string"
type = "string",
example = "some-peer.example.com"
)
)
),
@ -141,22 +145,13 @@ public class PeersResource {
@ApiErrors({
ApiError.INVALID_DATA, ApiError.REPOSITORY_ISSUE
})
public String addPeer(String peerAddress) {
public String addPeer(String address) {
Security.checkApiCallAllowed(request);
try (final Repository repository = RepositoryManager.getRepository()) {
String[] peerParts = peerAddress.split(":");
PeerAddress peerAddress = PeerAddress.fromString(address);
// Expecting one or two parts
if (peerParts.length < 1 || peerParts.length > 2)
throw ApiExceptionFactory.INSTANCE.createException(request, ApiError.INVALID_DATA);
String hostname = peerParts[0];
int port = peerParts.length == 2 ? Integer.parseInt(peerParts[1]) : Settings.DEFAULT_LISTEN_PORT;
InetSocketAddress socketAddress = new InetSocketAddress(hostname, port);
PeerData peerData = new PeerData(socketAddress);
PeerData peerData = new PeerData(peerAddress);
repository.getNetworkRepository().save(peerData);
repository.saveChanges();
@ -173,12 +168,17 @@ public class PeersResource {
@DELETE
@Operation(
summary = "Remove peer address from database",
description = "Specify peer to be removed using hostname, IPv4 address, IPv6 address and optional port number preceeded with colon (e.g. :9084)<br>"
+ "Note that IPv6 literal addresses must be surrounded with brackets.<br>" + "Examples:<br><ul>" + "<li>some-peer.example.com</li>"
+ "<li>some-peer.example.com:9084</li>" + "<li>10.1.2.3</li>" + "<li>10.1.2.3:9084</li>" + "<li>[2001:d8b::1]</li>"
+ "<li>[2001:d8b::1]:9084</li>" + "</ul>",
requestBody = @RequestBody(
required = true,
content = @Content(
mediaType = MediaType.TEXT_PLAIN,
schema = @Schema(
type = "string"
type = "string",
example = "some-peer.example.com"
)
)
),
@ -196,22 +196,13 @@ public class PeersResource {
@ApiErrors({
ApiError.INVALID_DATA, ApiError.REPOSITORY_ISSUE
})
public String removePeer(String peerAddress) {
public String removePeer(String address) {
Security.checkApiCallAllowed(request);
try (final Repository repository = RepositoryManager.getRepository()) {
String[] peerParts = peerAddress.split(":");
PeerAddress peerAddress = PeerAddress.fromString(address);
// Expecting one or two parts
if (peerParts.length < 1 || peerParts.length > 2)
throw ApiExceptionFactory.INSTANCE.createException(request, ApiError.INVALID_DATA);
String hostname = peerParts[0];
int port = peerParts.length == 2 ? Integer.parseInt(peerParts[1]) : Settings.DEFAULT_LISTEN_PORT;
InetSocketAddress socketAddress = new InetSocketAddress(hostname, port);
PeerData peerData = new PeerData(socketAddress);
PeerData peerData = new PeerData(peerAddress);
int numDeleted = repository.getNetworkRepository().delete(peerData);
repository.saveChanges();

View File

@ -226,7 +226,10 @@ public class Controller extends Thread {
for(Peer peer : peers)
LOGGER.trace(String.format("Peer %s is at height %d", peer, peer.getPeerData().getLastHeight()));
peers.removeIf(peer -> peer.getPeerData().getLastHeight() <= ourHeight);
peers.removeIf(peer -> {
Integer peerHeight = peer.getPeerData().getLastHeight();
return peerHeight == null || peerHeight <= ourHeight;
});
if (!peers.isEmpty()) {
// Pick random peer to sync with

View File

@ -1,16 +1,24 @@
package org.qora.data.network;
import java.net.InetSocketAddress;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlTransient;
// All properties to be converted to JSON via JAX-RS
import org.qora.network.PeerAddress;
import io.swagger.v3.oas.annotations.media.Schema;
// All properties to be converted to JSON via JAXB
@XmlAccessorType(XmlAccessType.FIELD)
public class PeerData {
// Properties
private InetSocketAddress socketAddress;
// Don't expose this via JAXB - use pretty getter instead
@XmlTransient
@Schema(hidden = true)
private PeerAddress peerAddress;
private Long lastAttempted;
private Long lastConnected;
private Integer lastHeight;
@ -18,26 +26,29 @@ public class PeerData {
// Constructors
// necessary for JAX-RS serialization
// necessary for JAXB serialization
protected PeerData() {
}
public PeerData(InetSocketAddress socketAddress, Long lastAttempted, Long lastConnected, Integer lastHeight, Long lastMisbehaved) {
this.socketAddress = socketAddress;
public PeerData(PeerAddress peerAddress, Long lastAttempted, Long lastConnected, Integer lastHeight, Long lastMisbehaved) {
this.peerAddress = peerAddress;
this.lastAttempted = lastAttempted;
this.lastConnected = lastConnected;
this.lastHeight = lastHeight;
this.lastMisbehaved = lastMisbehaved;
}
public PeerData(InetSocketAddress socketAddress) {
this(socketAddress, null, null, null, null);
public PeerData(PeerAddress peerAddress) {
this(peerAddress, null, null, null, null);
}
// Getters / setters
public InetSocketAddress getSocketAddress() {
return this.socketAddress;
// Don't let JAXB use this getter
@XmlTransient
@Schema(hidden = true)
public PeerAddress getAddress() {
return this.peerAddress;
}
public Long getLastAttempted() {
@ -72,4 +83,10 @@ public class PeerData {
this.lastMisbehaved = lastMisbehaved;
}
// Pretty peerAddress getter for JAXB
@XmlElement(name = "address")
protected String getPrettyAddress() {
return this.peerAddress.toString();
}
}

View File

@ -2,6 +2,8 @@ package org.qora.network;
import java.util.Arrays;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.qora.controller.Controller;
import org.qora.network.message.Message;
import org.qora.network.message.Message.MessageType;
@ -36,8 +38,9 @@ public enum Handshake {
@Override
public Handshake onMessage(Peer peer, Message message) {
PeerIdMessage peerIdMessage = (PeerIdMessage) message;
byte[] peerId = peerIdMessage.getPeerId();
if (Arrays.equals(peerIdMessage.getPeerId(), Network.getInstance().getOurPeerId())) {
if (Arrays.equals(peerId, Network.getInstance().getOurPeerId())) {
// Connected to self!
// If outgoing connection then record destination as self so we don't try again
if (peer.isOutbound())
@ -50,6 +53,16 @@ public enum Handshake {
return null;
}
// Set peer's ID
peer.setPeerId(peerId);
// Is this ID already connected? We don't want both inbound and outbound so discard inbound if possible
Peer similarPeer = Network.getInstance().getOutboundPeerWithId(peerId);
if (similarPeer != null && similarPeer != peer) {
LOGGER.trace(String.format("Discarding inbound peer %s with existing ID", peer));
return null;
}
// If we're both version 2 peers then next stage is proof
if (peer.getVersion() >= 2)
return PROOF;
@ -105,6 +118,8 @@ public enum Handshake {
}
};
private static final Logger LOGGER = LogManager.getLogger(Handshake.class);
private static final long MAX_TIMESTAMP_DELTA = 2000; // ms
public final MessageType expectedMessageType;

View File

@ -1,6 +1,7 @@
package org.qora.network;
import java.io.IOException;
import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
@ -9,10 +10,12 @@ import java.net.SocketTimeoutException;
import java.net.UnknownHostException;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
@ -54,11 +57,12 @@ public class Network extends Thread {
private final byte[] ourPeerId;
private List<Peer> connectedPeers;
private List<PeerData> selfPeers;
private List<PeerAddress> selfPeers;
private ServerSocket listenSocket;
private int minPeers;
private int maxPeers;
private ExecutorService peerExecutor;
private ExecutorService mergePeersExecutor;
private long nextBroadcast;
private Lock mergePeersLock;
@ -99,6 +103,7 @@ public class Network extends Thread {
nextBroadcast = System.currentTimeMillis();
mergePeersLock = new ReentrantLock();
mergePeersExecutor = Executors.newCachedThreadPool();
}
// Getters / setters
@ -120,7 +125,7 @@ public class Network extends Thread {
}
}
public List<PeerData> getSelfPeers() {
public List<PeerAddress> getSelfPeers() {
synchronized (this.selfPeers) {
return new ArrayList<>(this.selfPeers);
}
@ -130,7 +135,7 @@ public class Network extends Thread {
LOGGER.info(String.format("No longer considering peer address %s as it connects to self", peer));
synchronized (this.selfPeers) {
this.selfPeers.add(peer.getPeerData());
this.selfPeers.add(peer.getPeerData().getAddress());
}
}
@ -211,10 +216,14 @@ public class Network extends Thread {
}
private void createConnection() throws InterruptedException, DataException {
/*
synchronized (this.connectedPeers) {
if (connectedPeers.size() >= minPeers)
return;
}
*/
if (this.getOutboundHandshakeCompletedPeers().size() >= minPeers)
return;
Peer newPeer;
@ -227,16 +236,20 @@ public class Network extends Thread {
peers.removeIf(peerData -> peerData.getLastAttempted() != null && peerData.getLastAttempted() > lastAttemptedThreshold);
// Don't consider peers that we know loop back to ourself
Predicate<PeerData> hasSamePeerSocketAddress = peerData -> this.selfPeers.stream()
.anyMatch(selfPeerData -> selfPeerData.getSocketAddress().equals(peerData.getSocketAddress()));
Predicate<PeerData> isSelfPeer = peerData -> {
PeerAddress peerAddress = peerData.getAddress();
return this.selfPeers.stream().anyMatch(selfPeer -> selfPeer.equals(peerAddress));
};
synchronized (this.selfPeers) {
peers.removeIf(hasSamePeerSocketAddress);
peers.removeIf(isSelfPeer);
}
// Don't consider already connected peers
Predicate<PeerData> isConnectedPeer = peerData -> this.connectedPeers.stream()
.anyMatch(peer -> peer.getPeerData().getSocketAddress().equals(peerData.getSocketAddress()));
Predicate<PeerData> isConnectedPeer = peerData -> {
PeerAddress peerAddress = peerData.getAddress();
return this.connectedPeers.stream().anyMatch(peer -> peer.getPeerData().getAddress().equals(peerAddress));
};
synchronized (this.connectedPeers) {
peers.removeIf(isConnectedPeer);
@ -347,14 +360,15 @@ public class Network extends Thread {
case PEERS:
PeersMessage peersMessage = (PeersMessage) message;
List<InetSocketAddress> peerAddresses = new ArrayList<>();
List<PeerAddress> peerAddresses = new ArrayList<>();
// v1 PEERS message doesn't support port numbers so we have to add default port
for (InetAddress peerAddress : peersMessage.getPeerAddresses())
peerAddresses.add(new InetSocketAddress(peerAddress, Settings.DEFAULT_LISTEN_PORT));
// This is always IPv4 so we don't have to worry about bracketing IPv6.
peerAddresses.add(PeerAddress.fromString(peerAddress.getHostAddress()));
// Also add peer's details
peerAddresses.add(new InetSocketAddress(peer.getRemoteSocketAddress().getHostString(), Settings.DEFAULT_LISTEN_PORT));
peerAddresses.add(PeerAddress.fromString(peer.getPeerData().getAddress().getHost()));
mergePeers(peerAddresses);
break;
@ -362,13 +376,15 @@ public class Network extends Thread {
case PEERS_V2:
PeersV2Message peersV2Message = (PeersV2Message) message;
List<InetSocketAddress> peerV2Addresses = peersV2Message.getPeerAddresses();
List<PeerAddress> peerV2Addresses = peersV2Message.getPeerAddresses();
// First entry contains remote peer's listen port but empty address.
// Overwrite address with one obtained from socket.
int peerPort = peerV2Addresses.get(0).getPort();
peerV2Addresses.remove(0);
peerV2Addresses.add(0, InetSocketAddress.createUnresolved(peer.getRemoteSocketAddress().getHostString(), peerPort));
PeerAddress sendingPeerAddress = PeerAddress.fromString(peer.getPeerData().getAddress().getHost() + ":" + peerPort);
LOGGER.trace("PEERS_V2 sending peer's listen address: " + sendingPeerAddress.toString());
peerV2Addresses.add(0, sendingPeerAddress);
mergePeers(peerV2Addresses);
break;
@ -424,15 +440,52 @@ public class Network extends Thread {
long connectionThreshold = NTP.getTime() - RECENT_CONNECTION_THRESHOLD;
knownPeers.removeIf(peerData -> peerData.getLastConnected() == null || peerData.getLastConnected() < connectionThreshold);
// Map to socket addresses
List<InetSocketAddress> peerSocketAddresses = knownPeers.stream().map(peerData -> peerData.getSocketAddress()).collect(Collectors.toList());
if (peer.getVersion() >= 2) {
List<PeerAddress> peerAddresses = new ArrayList<>();
for (PeerData peerData : knownPeers) {
try {
InetAddress address = InetAddress.getByName(peerData.getAddress().getHost());
// Don't send 'local' addresses if peer is not 'local'. e.g. don't send localhost:9084 to node4.qora.org
if (!peer.getIsLocal() && !Peer.isAddressLocal(address))
continue;
peerAddresses.add(peerData.getAddress());
} catch (UnknownHostException e) {
// Couldn't resolve hostname to IP address so discard
}
}
if (peer.getVersion() >= 2)
// New format PEERS_V2 message that supports hostnames, IPv6 and ports
return new PeersV2Message(peerSocketAddresses);
else
return new PeersV2Message(peerAddresses);
} else {
// Map to socket addresses
List<InetAddress> peerAddresses = new ArrayList<>();
for (PeerData peerData : knownPeers) {
try {
// We have to resolve to literal IP address to check for IPv4-ness.
// This isn't great if hostnames have both IPv6 and IPv4 DNS entries.
InetAddress address = InetAddress.getByName(peerData.getAddress().getHost());
// Legacy PEERS message doesn't support IPv6
if (address instanceof Inet6Address)
continue;
// Don't send 'local' addresses if peer is not 'local'. e.g. don't send localhost:9084 to node4.qora.org
if (!peer.getIsLocal() && !Peer.isAddressLocal(address))
continue;
peerAddresses.add(address);
} catch (UnknownHostException e) {
// Couldn't resolve hostname to IP address so discard
}
}
// Legacy PEERS message that only sends IPv4 addresses
return new PeersMessage(peerSocketAddresses);
return new PeersMessage(peerAddresses);
}
} catch (DataException e) {
LOGGER.error("Repository issue while building PEERS message", e);
return new PeersMessage(Collections.emptyList());
@ -464,46 +517,61 @@ public class Network extends Thread {
return peers;
}
private void mergePeers(List<InetSocketAddress> peerAddresses) {
mergePeersLock.lock();
try {
try (final Repository repository = RepositoryManager.getRepository()) {
List<PeerData> knownPeers = repository.getNetworkRepository().getAllPeers();
for (PeerData peerData : knownPeers)
LOGGER.trace(String.format("Known peer %s", peerData.getSocketAddress()));
// Resolve known peer hostnames
Function<PeerData, InetSocketAddress> peerDataToSocketAddress = peerData -> new InetSocketAddress(peerData.getSocketAddress().getHostString(),
peerData.getSocketAddress().getPort());
List<InetSocketAddress> knownPeerAddresses = knownPeers.stream().map(peerDataToSocketAddress).collect(Collectors.toList());
for (InetSocketAddress address : knownPeerAddresses)
LOGGER.trace(String.format("Resolved known peer %s", address));
// Filter out duplicates
// We have to use our own Peer.addressEquals as InetSocketAddress.equals isn't quite right for us
Predicate<InetSocketAddress> addressKnown = peerAddress -> knownPeerAddresses.stream()
.anyMatch(knownAddress -> Peer.addressEquals(knownAddress, peerAddress));
peerAddresses.removeIf(addressKnown);
// Save the rest into database
for (InetSocketAddress peerAddress : peerAddresses) {
PeerData peerData = new PeerData(peerAddress);
LOGGER.trace(String.format("Adding new peer %s to repository", peerAddress));
repository.getNetworkRepository().save(peerData);
}
repository.saveChanges();
} catch (DataException e) {
LOGGER.error("Repository issue while merging peers list from remote node", e);
}
} finally {
mergePeersLock.unlock();
/** Returns Peer with outbound connection and passed ID, or null if none found. */
public Peer getOutboundPeerWithId(byte[] peerId) {
synchronized (this.connectedPeers) {
return this.connectedPeers.stream().filter(peer -> peer.isOutbound() && peer.getPeerId() != null && Arrays.equals(peer.getPeerId(), peerId)).findAny().orElse(null);
}
}
private void mergePeers(List<PeerAddress> peerAddresses) {
// This can block (due to lock) so fire off in separate thread
class PeersMerger implements Runnable {
private List<PeerAddress> peerAddresses;
public PeersMerger(List<PeerAddress> peerAddresses) {
this.peerAddresses = peerAddresses;
}
@Override
public void run() {
// Serialize using lock to prevent repository deadlocks
mergePeersLock.lock();
try {
try (final Repository repository = RepositoryManager.getRepository()) {
List<PeerData> knownPeers = repository.getNetworkRepository().getAllPeers();
for (PeerData peerData : knownPeers)
LOGGER.trace(String.format("Known peer %s", peerData.getAddress()));
// Filter out duplicates
Predicate<PeerAddress> isKnownAddress = peerAddress -> {
return knownPeers.stream().anyMatch(knownPeerData -> knownPeerData.getAddress().equals(peerAddress));
};
peerAddresses.removeIf(isKnownAddress);
// Save the rest into database
for (PeerAddress peerAddress : peerAddresses) {
PeerData peerData = new PeerData(peerAddress);
LOGGER.trace(String.format("Adding new peer %s to repository", peerAddress));
repository.getNetworkRepository().save(peerData);
}
repository.saveChanges();
} catch (DataException e) {
LOGGER.error("Repository issue while merging peers list from remote node", e);
}
} finally {
mergePeersLock.unlock();
}
}
}
mergePeersExecutor.execute(new PeersMerger(peerAddresses));
}
public void broadcast(Function<Peer, Message> peerMessage) {
class Broadcaster implements Runnable {
private List<Peer> targetPeers;
@ -522,7 +590,11 @@ public class Network extends Thread {
}
}
peerExecutor.execute(new Broadcaster(this.getHandshakeCompletedPeers(), peerMessage));
try {
peerExecutor.execute(new Broadcaster(this.getHandshakeCompletedPeers(), peerMessage));
} catch (RejectedExecutionException e) {
// Can't execute - probably because we're shutting down, so ignore
}
}
public void shutdown() {

View File

@ -3,6 +3,7 @@ package org.qora.network;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketTimeoutException;
@ -23,10 +24,14 @@ import org.qora.controller.Controller;
import org.qora.data.network.PeerData;
import org.qora.network.message.Message;
import org.qora.network.message.Message.MessageType;
import org.qora.settings.Settings;
import org.qora.network.message.PingMessage;
import org.qora.network.message.VersionMessage;
import org.qora.utils.NTP;
import com.google.common.net.HostAndPort;
import com.google.common.net.InetAddresses;
// For managing one peer
public class Peer implements Runnable {
@ -40,7 +45,6 @@ public class Peer implements Runnable {
private final boolean isOutbound;
private Socket socket = null;
private PeerData peerData = null;
private InetSocketAddress remoteSocketAddress = null;
private Long connectionTimestamp = null;
private OutputStream out;
private Handshake handshakeStatus = Handshake.STARTED;
@ -49,20 +53,24 @@ public class Peer implements Runnable {
private Integer version;
private ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
private Long lastPing = null;
private boolean isLocal;
private byte[] peerId;
/** Construct unconnected outbound Peer using socket address in peer data */
public Peer(PeerData peerData) {
this.isOutbound = true;
this.peerData = peerData;
this.remoteSocketAddress = peerData.getSocketAddress();
}
/** Construct Peer using existing, connected socket */
public Peer(Socket socket) {
this.isOutbound = false;
this.socket = socket;
this.remoteSocketAddress = (InetSocketAddress) this.socket.getRemoteSocketAddress();
this.peerData = new PeerData(this.remoteSocketAddress);
this.isLocal = isAddressLocal(((InetSocketAddress) socket.getRemoteSocketAddress()).getAddress());
PeerAddress peerAddress = PeerAddress.fromSocket(socket);
this.peerData = new PeerData(peerAddress);
}
// Getters / setters
@ -83,10 +91,6 @@ public class Peer implements Runnable {
this.handshakeStatus = handshakeStatus;
}
public InetSocketAddress getRemoteSocketAddress() {
return this.remoteSocketAddress;
}
public VersionMessage getVersionMessage() {
return this.versionMessage;
}
@ -122,13 +126,23 @@ public class Peer implements Runnable {
this.lastPing = lastPing;
}
public boolean getIsLocal() {
return this.isLocal;
}
public byte[] getPeerId() {
return this.peerId;
}
public void setPeerId(byte[] peerId) {
this.peerId = peerId;
}
// Easier, and nicer output, than peer.getRemoteSocketAddress()
@Override
public String toString() {
InetSocketAddress socketAddress = this.getRemoteSocketAddress();
return socketAddress.getHostString() + ":" + socketAddress.getPort();
return this.peerData.getAddress().toString();
}
// Processing
@ -145,7 +159,9 @@ public class Peer implements Runnable {
this.socket = new Socket();
try {
InetSocketAddress resolvedSocketAddress = new InetSocketAddress(this.remoteSocketAddress.getHostString(), this.remoteSocketAddress.getPort());
InetSocketAddress resolvedSocketAddress = this.peerData.getAddress().toSocketAddress();
this.isLocal = isAddressLocal(resolvedSocketAddress.getAddress());
this.socket.connect(resolvedSocketAddress, CONNECT_TIMEOUT);
LOGGER.debug(String.format("Connected to peer %s", this));
@ -196,6 +212,7 @@ public class Peer implements Runnable {
// Fall-through
} finally {
this.disconnect();
Thread.currentThread().setName("disconnected peer");
}
}
@ -311,6 +328,8 @@ public class Peer implements Runnable {
Network.getInstance().onDisconnect(this);
}
// Utility methods
/** Returns true if ports and addresses (or hostnames) match */
public static boolean addressEquals(InetSocketAddress knownAddress, InetSocketAddress peerAddress) {
if (knownAddress.getPort() != peerAddress.getPort())
@ -319,4 +338,17 @@ public class Peer implements Runnable {
return knownAddress.getHostString().equalsIgnoreCase(peerAddress.getHostString());
}
public static InetSocketAddress parsePeerAddress(String peerAddress) throws IllegalArgumentException {
HostAndPort hostAndPort = HostAndPort.fromString(peerAddress).requireBracketsForIPv6();
// HostAndPort doesn't try to validate host so we do extra checking here
InetAddress address = InetAddresses.forString(hostAndPort.getHost());
return new InetSocketAddress(address, hostAndPort.getPortOrDefault(Settings.DEFAULT_LISTEN_PORT));
}
public static boolean isAddressLocal(InetAddress address) {
return address.isLoopbackAddress() || address.isLinkLocalAddress() || address.isSiteLocalAddress();
}
}

View File

@ -0,0 +1,135 @@
package org.qora.network;
import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.UnknownHostException;
import org.qora.settings.Settings;
import com.google.common.net.HostAndPort;
import com.google.common.net.InetAddresses;
/**
* Convenience class for encapsulating/parsing/rendering/converting peer addresses
* including late-stage resolving before actual use by a socket.
*/
public class PeerAddress {
// Properties
private String host;
private int port;
private PeerAddress(String host, int port) {
this.host = host;
this.port = port;
}
// Constructors
// For JAXB
protected PeerAddress() {
}
/** Constructs new PeerAddress using remote address from passed connected socket. */
public static PeerAddress fromSocket(Socket socket) {
InetSocketAddress socketAddress = (InetSocketAddress) socket.getRemoteSocketAddress();
InetAddress address = socketAddress.getAddress();
String host = InetAddresses.toAddrString(address);
// Make sure we encapsulate IPv6 addresses in brackets
if (address instanceof Inet6Address)
host = "[" + host + "]";
return new PeerAddress(host, socketAddress.getPort());
}
/**
* Constructs new PeerAddress using hostname or literal IP address and optional port.<br>
* Literal IPv6 addresses must be enclosed within square brackets.
* <p>
* Examples:
* <ul>
* <li>peer.example.com
* <li>peer.example.com:9084
* <li>192.0.2.1
* <li>192.0.2.1:9084
* <li>[2001:db8::1]
* <li>[2001:db8::1]:9084
* </ul>
* <p>
* Not allowed:
* <ul>
* <li>2001:db8::1
* <li>2001:db8::1:9084
* </ul>
*/
public static PeerAddress fromString(String addressString) throws IllegalArgumentException {
boolean isBracketed = addressString.startsWith("[");
// Attempt to parse string into host and port
HostAndPort hostAndPort = HostAndPort.fromString(addressString).withDefaultPort(Settings.DEFAULT_LISTEN_PORT).requireBracketsForIPv6();
String host = hostAndPort.getHost();
if (host.isEmpty())
throw new IllegalArgumentException("Empty host part");
// Validate IP literals by attempting to convert to InetAddress, without DNS lookups
if (host.contains(":") || host.matches("[0-9.]+"))
InetAddresses.forString(host);
// If we've reached this far then we have a valid address
// Make sure we encapsulate IPv6 addresses in brackets
if (isBracketed)
host = "[" + host + "]";
return new PeerAddress(host, hostAndPort.getPort());
}
// Getters
/** Returns hostname or literal IP address, bracketed if IPv6 */
public String getHost() {
return this.host;
}
public int getPort() {
return this.port;
}
// Conversions
/** Returns InetSocketAddress for use with Socket.connect(), or throws UnknownHostException if address could not be resolved by DNS lookup. */
public InetSocketAddress toSocketAddress() throws UnknownHostException {
// Attempt to construct new InetSocketAddress with DNS lookups.
// There's no control here over whether IPv6 or IPv4 will be used.
InetSocketAddress socketAddress = new InetSocketAddress(this.host, this.port);
// If we couldn't resolve then return null
if (socketAddress.isUnresolved())
throw new UnknownHostException();
return socketAddress;
}
@Override
public String toString() {
return this.host + ":" + this.port;
}
// Utilities
/** Returns true if other PeerAddress has same port and same case-insensitive host part, without DNS lookups */
public boolean equals(PeerAddress other) {
// Ports must match
if (this.port != other.port)
return false;
// Compare host parts but without DNS lookups
return this.host.equalsIgnoreCase(other.host);
}
}

View File

@ -35,7 +35,7 @@ public class Proof extends Thread {
@Override
public void run() {
setName("Proof for peer " + this.peer.getRemoteSocketAddress());
setName("Proof for peer " + this.peer);
// Do proof-of-work calculation to gain acceptance with remote end

View File

@ -3,8 +3,8 @@ package org.qora.network.message;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@ -19,26 +19,13 @@ public class PeersMessage extends Message {
private List<InetAddress> peerAddresses;
public PeersMessage(List<InetSocketAddress> peerSocketAddresses) {
public PeersMessage(List<InetAddress> peerAddresses) {
super(MessageType.PEERS);
// We have to forcibly resolve into IP addresses as we can't send hostnames
this.peerAddresses = new ArrayList<>();
this.peerAddresses = new ArrayList<>(peerAddresses);
for (InetSocketAddress peerSocketAddress : peerSocketAddresses) {
try {
InetAddress resolvedAddress = InetAddress.getByName(peerSocketAddress.getHostString());
// Filter out unsupported address types
if (resolvedAddress.getAddress().length != ADDRESS_LENGTH)
continue;
this.peerAddresses.add(resolvedAddress);
} catch (UnknownHostException e) {
// Couldn't resolve
continue;
}
}
// Legacy PEERS message doesn't support IPv6
this.peerAddresses.removeIf(address -> address instanceof Inet6Address);
}
private PeersMessage(int id, List<InetAddress> peerAddresses) {

View File

@ -3,78 +3,56 @@ package org.qora.network.message;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import org.qora.network.PeerAddress;
import org.qora.settings.Settings;
import com.google.common.primitives.Bytes;
import com.google.common.primitives.Ints;
// NOTE: this message supports hostnames, IPv6, port numbers and IPv4 addresses (in IPv6 form)
// NOTE: this message supports hostnames, literal IP addresses (IPv4 and IPv6) with port numbers
public class PeersV2Message extends Message {
private static final byte[] IPV6_V4_PREFIX = new byte[] {
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, (byte) 0xff, (byte) 0xff
};
private List<PeerAddress> peerAddresses;
private List<InetSocketAddress> peerSocketAddresses;
public PeersV2Message(List<InetSocketAddress> peerSocketAddresses) {
this(-1, peerSocketAddresses);
public PeersV2Message(List<PeerAddress> peerAddresses) {
this(-1, peerAddresses);
}
private PeersV2Message(int id, List<InetSocketAddress> peerSocketAddresses) {
private PeersV2Message(int id, List<PeerAddress> peerAddresses) {
super(id, MessageType.PEERS_V2);
this.peerSocketAddresses = peerSocketAddresses;
this.peerAddresses = peerAddresses;
}
public List<InetSocketAddress> getPeerAddresses() {
return this.peerSocketAddresses;
public List<PeerAddress> getPeerAddresses() {
return this.peerAddresses;
}
public static Message fromByteBuffer(int id, ByteBuffer byteBuffer) throws UnsupportedEncodingException {
// Read entry count
int count = byteBuffer.getInt();
List<InetSocketAddress> peerSocketAddresses = new ArrayList<>();
byte[] ipAddressBytes = new byte[16];
int port;
List<PeerAddress> peerAddresses = new ArrayList<>();
for (int i = 0; i < count; ++i) {
byte addressSize = byteBuffer.get();
if (addressSize == 0) {
// Address size of 0 indicates IP address (always in IPv6 form)
byteBuffer.get(ipAddressBytes);
byte[] addressBytes = new byte[addressSize & 0xff];
byteBuffer.get(addressBytes);
String addressString = new String(addressBytes, "UTF-8");
port = byteBuffer.getInt();
try {
InetAddress address = InetAddress.getByAddress(ipAddressBytes);
peerSocketAddresses.add(new InetSocketAddress(address, port));
} catch (UnknownHostException e) {
// Ignore and continue
}
} else {
byte[] hostnameBytes = new byte[addressSize & 0xff];
byteBuffer.get(hostnameBytes);
String hostname = new String(hostnameBytes, "UTF-8");
port = byteBuffer.getInt();
peerSocketAddresses.add(InetSocketAddress.createUnresolved(hostname, port));
try {
PeerAddress peerAddress = PeerAddress.fromString(addressString);
peerAddresses.add(peerAddress);
} catch (IllegalArgumentException e) {
// Not valid - ignore
}
}
return new PeersV2Message(id, peerSocketAddresses);
return new PeersV2Message(id, peerAddresses);
}
@Override
@ -82,50 +60,28 @@ public class PeersV2Message extends Message {
try {
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
List<byte[]> addresses = new ArrayList<>();
// First entry represents sending node but contains only port number with empty address.
List<InetSocketAddress> socketAddresses = new ArrayList<>(this.peerSocketAddresses);
socketAddresses.add(0, new InetSocketAddress(Settings.getInstance().getListenPort()));
addresses.add(new String("0.0.0.0:" + Settings.getInstance().getListenPort()).getBytes("UTF-8"));
// Number of entries we are sending.
int count = socketAddresses.size();
for (PeerAddress peerAddress : this.peerAddresses)
addresses.add(peerAddress.toString().getBytes("UTF-8"));
for (InetSocketAddress socketAddress : socketAddresses) {
// Hostname preferred, failing that IP address
if (socketAddress.isUnresolved()) {
String hostname = socketAddress.getHostString();
// We can't send addresses that are longer than 255 bytes as length itself is encoded in one byte.
addresses.removeIf(addressString -> addressString.length > 255);
byte[] hostnameBytes = hostname.getBytes("UTF-8");
// Serialize
// We don't support hostnames that are longer than 256 bytes
if (hostnameBytes.length > 256) {
--count;
continue;
}
// Number of entries
bytes.write(Ints.toByteArray(addresses.size()));
bytes.write(hostnameBytes.length);
bytes.write(hostnameBytes);
} else {
// IP address
byte[] ipAddressBytes = socketAddress.getAddress().getAddress();
// IPv4? Convert to IPv6 form
if (ipAddressBytes.length == 4)
ipAddressBytes = Bytes.concat(IPV6_V4_PREFIX, ipAddressBytes);
// Write zero length to indicate IP address follows
bytes.write(0);
bytes.write(ipAddressBytes);
}
// Port
bytes.write(Ints.toByteArray(socketAddress.getPort()));
for (byte[] address : addresses) {
bytes.write(address.length);
bytes.write(address);
}
// Prepend updated entry count
byte[] countBytes = Ints.toByteArray(count);
return Bytes.concat(countBytes, bytes.toByteArray());
return bytes.toByteArray();
} catch (IOException e) {
return null;
}

View File

@ -92,7 +92,7 @@ public class HSQLDBBlockRepository implements BlockRepository {
@Override
public int getBlockchainHeight() throws DataException {
try (ResultSet resultSet = this.repository.checkedExecute("SELECT MAX(height) FROM Blocks")) {
try (ResultSet resultSet = this.repository.checkedExecute("SELECT MAX(height) FROM Blocks LIMIT 1")) {
if (resultSet == null)
return 0;

View File

@ -511,6 +511,21 @@ public class HSQLDBDatabaseUpdates {
stmt.execute("SET DATABASE TRANSACTION CONTROL MVCC"); // Use MVCC over default two-phase locking, a-k-a "LOCKS"
break;
case 32:
// Unified PeerAddress requires peer hostname & port stored as one string
stmt.execute("ALTER TABLE Peers ALTER COLUMN hostname RENAME TO address");
// Make sure literal IPv6 addresses are enclosed in square brackets.
stmt.execute("UPDATE Peers SET address=CONCAT('[', address, ']') WHERE POSITION(':' IN address) != 0");
stmt.execute("UPDATE Peers SET address=CONCAT(address, ':', port)");
// We didn't name the PRIMARY KEY constraint when creating Peers table, so can't easily drop it
// Workaround is to create a new table with new constraint, drop old table, then rename.
stmt.execute("CREATE TABLE PeersTEMP AS (SELECT * FROM Peers) WITH DATA");
stmt.execute("ALTER TABLE PeersTEMP DROP COLUMN port");
stmt.execute("ALTER TABLE PeersTEMP ADD PRIMARY KEY (address)");
stmt.execute("DROP TABLE Peers");
stmt.execute("ALTER TABLE PeersTEMP RENAME TO Peers");
break;
default:
// nothing to do
return false;

View File

@ -1,6 +1,5 @@
package org.qora.repository.hsqldb;
import java.net.InetSocketAddress;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
@ -9,6 +8,7 @@ import java.util.Calendar;
import java.util.List;
import org.qora.data.network.PeerData;
import org.qora.network.PeerAddress;
import org.qora.repository.DataException;
import org.qora.repository.NetworkRepository;
@ -24,36 +24,36 @@ public class HSQLDBNetworkRepository implements NetworkRepository {
public List<PeerData> getAllPeers() throws DataException {
List<PeerData> peers = new ArrayList<>();
try (ResultSet resultSet = this.repository
.checkedExecute("SELECT hostname, port, last_connected, last_attempted, last_height, last_misbehaved FROM Peers")) {
try (ResultSet resultSet = this.repository.checkedExecute("SELECT address, last_connected, last_attempted, last_height, last_misbehaved FROM Peers")) {
if (resultSet == null)
return peers;
// NOTE: do-while because checkedExecute() above has already called rs.next() for us
do {
String hostname = resultSet.getString(1);
int port = resultSet.getInt(2);
InetSocketAddress socketAddress = InetSocketAddress.createUnresolved(hostname, port);
String address = resultSet.getString(1);
PeerAddress peerAddress = PeerAddress.fromString(address);
Timestamp lastConnectedTimestamp = resultSet.getTimestamp(3, Calendar.getInstance(HSQLDBRepository.UTC));
Timestamp lastConnectedTimestamp = resultSet.getTimestamp(2, Calendar.getInstance(HSQLDBRepository.UTC));
Long lastConnected = resultSet.wasNull() ? null : lastConnectedTimestamp.getTime();
Timestamp lastAttemptedTimestamp = resultSet.getTimestamp(4, Calendar.getInstance(HSQLDBRepository.UTC));
Timestamp lastAttemptedTimestamp = resultSet.getTimestamp(3, Calendar.getInstance(HSQLDBRepository.UTC));
Long lastAttempted = resultSet.wasNull() ? null : lastAttemptedTimestamp.getTime();
Integer lastHeight = resultSet.getInt(5);
Integer lastHeight = resultSet.getInt(4);
if (resultSet.wasNull())
lastHeight = null;
Timestamp lastMisbehavedTimestamp = resultSet.getTimestamp(6, Calendar.getInstance(HSQLDBRepository.UTC));
Timestamp lastMisbehavedTimestamp = resultSet.getTimestamp(5, Calendar.getInstance(HSQLDBRepository.UTC));
Long lastMisbehaved = resultSet.wasNull() ? null : lastMisbehavedTimestamp.getTime();
peers.add(new PeerData(socketAddress, lastConnected, lastAttempted, lastHeight, lastMisbehaved));
peers.add(new PeerData(peerAddress, lastConnected, lastAttempted, lastHeight, lastMisbehaved));
} while (resultSet.next());
return peers;
} catch (IllegalArgumentException e) {
throw new DataException("Refusing to fetch invalid peer from repository", e);
} catch (SQLException e) {
throw new DataException("Unable to fetch poll votes from repository", e);
throw new DataException("Unable to fetch peers from repository", e);
}
}
@ -65,9 +65,8 @@ public class HSQLDBNetworkRepository implements NetworkRepository {
Timestamp lastAttempted = peerData.getLastAttempted() == null ? null : new Timestamp(peerData.getLastAttempted());
Timestamp lastMisbehaved = peerData.getLastMisbehaved() == null ? null : new Timestamp(peerData.getLastMisbehaved());
saveHelper.bind("hostname", peerData.getSocketAddress().getHostString()).bind("port", peerData.getSocketAddress().getPort())
.bind("last_connected", lastConnected).bind("last_attempted", lastAttempted).bind("last_height", peerData.getLastHeight())
.bind("last_misbehaved", lastMisbehaved);
saveHelper.bind("address", peerData.getAddress().toString()).bind("last_connected", lastConnected).bind("last_attempted", lastAttempted)
.bind("last_height", peerData.getLastHeight()).bind("last_misbehaved", lastMisbehaved);
try {
saveHelper.execute(this.repository);
@ -79,8 +78,7 @@ public class HSQLDBNetworkRepository implements NetworkRepository {
@Override
public int delete(PeerData peerData) throws DataException {
try {
return this.repository.delete("Peers", "hostname = ? AND port = ?", peerData.getSocketAddress().getHostString(),
peerData.getSocketAddress().getPort());
return this.repository.delete("Peers", "address = ?", peerData.getAddress().toString());
} catch (SQLException e) {
throw new DataException("Unable to delete peer from repository", e);
}