Tidying up peer info.

lastHeight/blockSig/blockTimestamp, etc. moved from PeerData/repository
to Peer as it's transient so no need to store in repository.

Repository now keeps track of when/who added peer, e.g. API/INIT/another peer.

API calls DELETE /peers and DELETE /peers/known also disconnect peer
as well as deleting from repository.

Connection timestamp now reported by API call GET /peers

Some repository-updating code removed from Network/Controller as no
longer needed.

Removed obsolete Controller.hasShorterBlockchain predicate.
This commit is contained in:
catbref 2019-06-19 16:55:02 +01:00
parent 8b135eb447
commit fc82bcaf49
11 changed files with 304 additions and 249 deletions

View File

@ -19,6 +19,7 @@ public class ConnectedPeer {
public Direction direction; public Direction direction;
public Handshake handshakeStatus; public Handshake handshakeStatus;
public Long lastPing; public Long lastPing;
public Long connectedWhen;
public String address; public String address;
public String version; public String version;
@ -38,6 +39,7 @@ public class ConnectedPeer {
this.lastPing = peer.getLastPing(); this.lastPing = peer.getLastPing();
PeerData peerData = peer.getPeerData(); PeerData peerData = peer.getPeerData();
this.connectedWhen = peerData.getLastConnected();
this.address = peerData.getAddress().toString(); this.address = peerData.getAddress().toString();
if (peer.getVersionMessage() != null) { if (peer.getVersionMessage() != null) {
@ -45,9 +47,9 @@ public class ConnectedPeer {
this.buildTimestamp = peer.getVersionMessage().getBuildTimestamp(); this.buildTimestamp = peer.getVersionMessage().getBuildTimestamp();
} }
this.lastHeight = peerData.getLastHeight(); this.lastHeight = peer.getLastHeight();
this.lastBlockSignature = peerData.getLastBlockSignature(); this.lastBlockSignature = peer.getLastBlockSignature();
this.lastBlockTimestamp = peerData.getLastBlockTimestamp(); this.lastBlockTimestamp = peer.getLastBlockTimestamp();
} }
} }

View File

@ -31,6 +31,7 @@ import org.qora.network.PeerAddress;
import org.qora.repository.DataException; import org.qora.repository.DataException;
import org.qora.repository.Repository; import org.qora.repository.Repository;
import org.qora.repository.RepositoryManager; import org.qora.repository.RepositoryManager;
import org.qora.utils.NTP;
@Path("/peers") @Path("/peers")
@Tag(name = "Peers") @Tag(name = "Peers")
@ -145,7 +146,7 @@ public class PeersResource {
try (final Repository repository = RepositoryManager.getRepository()) { try (final Repository repository = RepositoryManager.getRepository()) {
PeerAddress peerAddress = PeerAddress.fromString(address); PeerAddress peerAddress = PeerAddress.fromString(address);
PeerData peerData = new PeerData(peerAddress); PeerData peerData = new PeerData(peerAddress, NTP.getTime(), "API");
repository.getNetworkRepository().save(peerData); repository.getNetworkRepository().save(peerData);
repository.saveChanges(); repository.saveChanges();
@ -193,15 +194,11 @@ public class PeersResource {
public String removePeer(String address) { public String removePeer(String address) {
Security.checkApiCallAllowed(request); Security.checkApiCallAllowed(request);
try (final Repository repository = RepositoryManager.getRepository()) { try {
PeerAddress peerAddress = PeerAddress.fromString(address); PeerAddress peerAddress = PeerAddress.fromString(address);
PeerData peerData = new PeerData(peerAddress); boolean wasKnown = Network.getInstance().forgetPeer(peerAddress);
return wasKnown ? "true" : "false";
int numDeleted = repository.getNetworkRepository().delete(peerData);
repository.saveChanges();
return numDeleted != 0 ? "true" : "false";
} catch (IllegalArgumentException e) { } catch (IllegalArgumentException e) {
throw ApiExceptionFactory.INSTANCE.createException(request, ApiError.INVALID_DATA); throw ApiExceptionFactory.INSTANCE.createException(request, ApiError.INVALID_DATA);
} catch (ApiException e) { } catch (ApiException e) {
@ -214,7 +211,7 @@ public class PeersResource {
@DELETE @DELETE
@Path("/known") @Path("/known")
@Operation( @Operation(
summary = "Remove any known peers from database", summary = "Remove all known peers from database",
responses = { responses = {
@ApiResponse( @ApiResponse(
description = "true if any peers were removed, false if there were no peers to delete", description = "true if any peers were removed, false if there were no peers to delete",
@ -232,9 +229,8 @@ public class PeersResource {
public String removeKnownPeers(String address) { public String removeKnownPeers(String address) {
Security.checkApiCallAllowed(request); Security.checkApiCallAllowed(request);
try (final Repository repository = RepositoryManager.getRepository()) { try {
int numDeleted = repository.getNetworkRepository().deleteAllPeers(); int numDeleted = Network.getInstance().forgetAllPeers();
repository.saveChanges();
return numDeleted != 0 ? "true" : "false"; return numDeleted != 0 ? "true" : "false";
} catch (ApiException e) { } catch (ApiException e) {

View File

@ -92,7 +92,7 @@ public class BlockGenerator extends Thread {
final long minLatestBlockTimestamp = Controller.getMinimumLatestBlockTimestamp(); final long minLatestBlockTimestamp = Controller.getMinimumLatestBlockTimestamp();
// Disregard peers that don't have a recent block // Disregard peers that don't have a recent block
peers.removeIf(peer -> peer.getPeerData().getLastBlockTimestamp() == null || peer.getPeerData().getLastBlockTimestamp() < minLatestBlockTimestamp); peers.removeIf(peer -> peer.getLastBlockTimestamp() == null || peer.getLastBlockTimestamp() < minLatestBlockTimestamp);
// If we have any peers with a recent block, but our latest block isn't recent // If we have any peers with a recent block, but our latest block isn't recent
// then we need to synchronize instead of generating. // then we need to synchronize instead of generating.

View File

@ -314,12 +314,12 @@ public class Controller extends Thread {
// Disregard peers that don't have a recent block // Disregard peers that don't have a recent block
final long minLatestBlockTimestamp = getMinimumLatestBlockTimestamp(); final long minLatestBlockTimestamp = getMinimumLatestBlockTimestamp();
peers.removeIf(peer -> peer.getPeerData().getLastBlockTimestamp() == null || peer.getPeerData().getLastBlockTimestamp() < minLatestBlockTimestamp); peers.removeIf(peer -> peer.getLastBlockTimestamp() == null || peer.getLastBlockTimestamp() < minLatestBlockTimestamp);
BlockData latestBlockData = getChainTip(); BlockData latestBlockData = getChainTip();
// Disregard peers that have no block signature or the same block signature as us // Disregard peers that have no block signature or the same block signature as us
peers.removeIf(peer -> peer.getPeerData().getLastBlockSignature() == null || Arrays.equals(latestBlockData.getSignature(), peer.getPeerData().getLastBlockSignature())); peers.removeIf(peer -> peer.getLastBlockSignature() == null || Arrays.equals(latestBlockData.getSignature(), peer.getLastBlockSignature()));
if (!peers.isEmpty()) { if (!peers.isEmpty()) {
// Pick random peer to sync with // Pick random peer to sync with
@ -522,17 +522,7 @@ public class Controller extends Thread {
if (connectedPeer.getPeerId() == null || !Arrays.equals(connectedPeer.getPeerId(), peer.getPeerId())) if (connectedPeer.getPeerId() == null || !Arrays.equals(connectedPeer.getPeerId(), peer.getPeerId()))
continue; continue;
PeerData peerData = connectedPeer.getPeerData(); connectedPeer.setLastHeight(heightMessage.getHeight());
peerData.setLastHeight(heightMessage.getHeight());
// Only save to repository if outbound peer
if (connectedPeer.isOutbound())
try (final Repository repository = RepositoryManager.getRepository()) {
repository.getNetworkRepository().save(peerData);
repository.saveChanges();
} catch (DataException e) {
LOGGER.error(String.format("Repository issue while updating height of peer %s", connectedPeer), e);
}
} }
// Potentially synchronize // Potentially synchronize
@ -551,28 +541,17 @@ public class Controller extends Thread {
if (connectedPeer.getPeerId() == null || !Arrays.equals(connectedPeer.getPeerId(), peer.getPeerId())) if (connectedPeer.getPeerId() == null || !Arrays.equals(connectedPeer.getPeerId(), peer.getPeerId()))
continue; continue;
PeerData peerData = connectedPeer.getPeerData();
// We want to update atomically so use lock // We want to update atomically so use lock
ReentrantLock peerDataLock = connectedPeer.getPeerDataLock(); ReentrantLock peerLock = connectedPeer.getPeerLock();
peerDataLock.lock(); peerLock.lock();
try { try {
peerData.setLastHeight(heightV2Message.getHeight()); peer.setLastHeight(heightV2Message.getHeight());
peerData.setLastBlockSignature(heightV2Message.getSignature()); peer.setLastBlockSignature(heightV2Message.getSignature());
peerData.setLastBlockTimestamp(heightV2Message.getTimestamp()); peer.setLastBlockTimestamp(heightV2Message.getTimestamp());
peerData.setLastBlockGenerator(heightV2Message.getGenerator()); peer.setLastBlockGenerator(heightV2Message.getGenerator());
} finally { } finally {
peerDataLock.unlock(); peerLock.unlock();
} }
// Only save to repository if outbound peer
if (connectedPeer.isOutbound())
try (final Repository repository = RepositoryManager.getRepository()) {
repository.getNetworkRepository().save(peerData);
repository.saveChanges();
} catch (DataException e) {
LOGGER.error(String.format("Repository issue while updating info of peer %s", connectedPeer), e);
}
} }
// Potentially synchronize // Potentially synchronize
@ -997,26 +976,6 @@ public class Controller extends Thread {
return lastMisbehaved != null && lastMisbehaved > NTP.getTime() - MISBEHAVIOUR_COOLOFF; return lastMisbehaved != null && lastMisbehaved > NTP.getTime() - MISBEHAVIOUR_COOLOFF;
}; };
/** True if peer has unknown height, lower height or same height and same block signature (unless we don't have their block signature). */
public static Predicate<Peer> hasShorterBlockchain() {
BlockData highestBlockData = getInstance().getChainTip();
int ourHeight = highestBlockData.getHeight();
return peer -> {
PeerData peerData = peer.getPeerData();
Integer peerHeight = peerData.getLastHeight();
if (peerHeight == null || peerHeight < ourHeight)
return true;
if (peerHeight > ourHeight || peerData.getLastBlockSignature() == null)
return false;
// Remove if signatures match
return Arrays.equals(peerData.getLastBlockSignature(), highestBlockData.getSignature());
};
}
/** Returns whether we think our node has up-to-date blockchain based on our info about other peers. */ /** Returns whether we think our node has up-to-date blockchain based on our info about other peers. */
public boolean isUpToDate() { public boolean isUpToDate() {
final long minLatestBlockTimestamp = getMinimumLatestBlockTimestamp(); final long minLatestBlockTimestamp = getMinimumLatestBlockTimestamp();
@ -1036,7 +995,7 @@ public class Controller extends Thread {
return false; return false;
// Disregard peers that don't have a recent block // Disregard peers that don't have a recent block
peers.removeIf(peer -> peer.getPeerData().getLastBlockTimestamp() == null || peer.getPeerData().getLastBlockTimestamp() < minLatestBlockTimestamp); peers.removeIf(peer -> peer.getLastBlockTimestamp() == null || peer.getLastBlockTimestamp() < minLatestBlockTimestamp);
// If we don't have any peers left then can't synchronize, therefore consider ourself not up to date // If we don't have any peers left then can't synchronize, therefore consider ourself not up to date
return !peers.isEmpty(); return !peers.isEmpty();

View File

@ -83,7 +83,18 @@ public class Synchronizer {
final BlockData ourLatestBlockData = this.repository.getBlockRepository().getLastBlock(); final BlockData ourLatestBlockData = this.repository.getBlockRepository().getLastBlock();
final int ourInitialHeight = ourLatestBlockData.getHeight(); final int ourInitialHeight = ourLatestBlockData.getHeight();
int ourHeight = ourInitialHeight; int ourHeight = ourInitialHeight;
int peerHeight = peer.getPeerData().getLastHeight();
int peerHeight;
byte[] peersLastBlockSignature;
ReentrantLock peerLock = peer.getPeerLock();
peerLock.lockInterruptibly();
try {
peerHeight = peer.getLastHeight();
peersLastBlockSignature = peer.getLastBlockSignature();
} finally {
peerLock.unlock();
}
// If peer is at genesis block then peer has no blocks so ignore them for a while // If peer is at genesis block then peer has no blocks so ignore them for a while
if (peerHeight == 1) if (peerHeight == 1)
@ -97,7 +108,6 @@ public class Synchronizer {
return SynchronizationResult.TOO_FAR_BEHIND; return SynchronizationResult.TOO_FAR_BEHIND;
} }
byte[] peersLastBlockSignature = peer.getPeerData().getLastBlockSignature();
byte[] ourLastBlockSignature = ourLatestBlockData.getSignature(); byte[] ourLastBlockSignature = ourLatestBlockData.getSignature();
if (peerHeight == ourHeight && (peersLastBlockSignature == null || !Arrays.equals(peersLastBlockSignature, ourLastBlockSignature))) if (peerHeight == ourHeight && (peersLastBlockSignature == null || !Arrays.equals(peersLastBlockSignature, ourLastBlockSignature)))
LOGGER.debug(String.format("Synchronizing with peer %s at height %d, our height %d, signatures differ", peer, peerHeight, ourHeight)); LOGGER.debug(String.format("Synchronizing with peer %s at height %d, our height %d, signatures differ", peer, peerHeight, ourHeight));

View File

@ -19,13 +19,12 @@ public class PeerData {
@XmlTransient @XmlTransient
@Schema(hidden = true) @Schema(hidden = true)
private PeerAddress peerAddress; private PeerAddress peerAddress;
private Long lastAttempted; private Long lastAttempted;
private Long lastConnected; private Long lastConnected;
private Integer lastHeight;
private byte[] lastBlockSignature;
private Long lastBlockTimestamp;
private byte[] lastBlockGenerator;
private Long lastMisbehaved; private Long lastMisbehaved;
private Long addedWhen;
private String addedBy;
// Constructors // Constructors
@ -33,19 +32,21 @@ public class PeerData {
protected PeerData() { protected PeerData() {
} }
public PeerData(PeerAddress peerAddress, Long lastAttempted, Long lastConnected, Integer lastHeight, byte[] lastBlockSignature, Long lastBlockTimestamp, byte[] lastBlockGenerator, Long lastMisbehaved) { public PeerData(PeerAddress peerAddress, Long lastAttempted, Long lastConnected, Long lastMisbehaved, Long addedWhen, String addedBy) {
this.peerAddress = peerAddress; this.peerAddress = peerAddress;
this.lastAttempted = lastAttempted; this.lastAttempted = lastAttempted;
this.lastConnected = lastConnected; this.lastConnected = lastConnected;
this.lastHeight = lastHeight;
this.lastBlockSignature = lastBlockSignature;
this.lastBlockTimestamp = lastBlockTimestamp;
this.lastBlockGenerator = lastBlockGenerator;
this.lastMisbehaved = lastMisbehaved; this.lastMisbehaved = lastMisbehaved;
this.addedWhen = addedWhen;
this.addedBy = addedBy;
}
public PeerData(PeerAddress peerAddress, Long addedWhen, String addedBy) {
this(peerAddress, null, null, null, addedWhen, addedBy);
} }
public PeerData(PeerAddress peerAddress) { public PeerData(PeerAddress peerAddress) {
this(peerAddress, null, null, null, null, null, null, null); this(peerAddress, null, null, null, null, null);
} }
// Getters / setters // Getters / setters
@ -73,38 +74,6 @@ public class PeerData {
this.lastConnected = lastConnected; this.lastConnected = lastConnected;
} }
public Integer getLastHeight() {
return this.lastHeight;
}
public void setLastHeight(Integer lastHeight) {
this.lastHeight = lastHeight;
}
public byte[] getLastBlockSignature() {
return lastBlockSignature;
}
public void setLastBlockSignature(byte[] lastBlockSignature) {
this.lastBlockSignature = lastBlockSignature;
}
public Long getLastBlockTimestamp() {
return lastBlockTimestamp;
}
public void setLastBlockTimestamp(Long lastBlockTimestamp) {
this.lastBlockTimestamp = lastBlockTimestamp;
}
public byte[] getLastBlockGenerator() {
return lastBlockGenerator;
}
public void setLastBlockGenerator(byte[] lastBlockGenerator) {
this.lastBlockGenerator = lastBlockGenerator;
}
public Long getLastMisbehaved() { public Long getLastMisbehaved() {
return this.lastMisbehaved; return this.lastMisbehaved;
} }
@ -113,6 +82,14 @@ public class PeerData {
this.lastMisbehaved = lastMisbehaved; this.lastMisbehaved = lastMisbehaved;
} }
public Long getAddedWhen() {
return this.addedWhen;
}
public String getAddedBy() {
return this.addedBy;
}
// Pretty peerAddress getter for JAXB // Pretty peerAddress getter for JAXB
@XmlElement(name = "address") @XmlElement(name = "address")
protected String getPrettyAddress() { protected String getPrettyAddress() {

View File

@ -148,10 +148,16 @@ public class Network extends Thread {
return instance; return instance;
} }
public byte[] getMessageMagic() {
return Settings.getInstance().isTestNet() ? TESTNET_MESSAGE_MAGIC : MAINNET_MESSAGE_MAGIC;
}
public byte[] getOurPeerId() { public byte[] getOurPeerId() {
return this.ourPeerId; return this.ourPeerId;
} }
// Peer lists
public List<Peer> getConnectedPeers() { public List<Peer> getConnectedPeers() {
synchronized (this.connectedPeers) { synchronized (this.connectedPeers) {
return new ArrayList<>(this.connectedPeers); return new ArrayList<>(this.connectedPeers);
@ -164,16 +170,64 @@ public class Network extends Thread {
} }
} }
public void noteToSelf(Peer peer) { /** Returns list of connected peers that have completed handshaking. */
LOGGER.info(String.format("No longer considering peer address %s as it connects to self", peer)); public List<Peer> getHandshakedPeers() {
List<Peer> peers = new ArrayList<>();
synchronized (this.selfPeers) { synchronized (this.connectedPeers) {
this.selfPeers.add(peer.getPeerData().getAddress()); peers = this.connectedPeers.stream().filter(peer -> peer.getHandshakeStatus() == Handshake.COMPLETED).collect(Collectors.toList());
}
return peers;
}
/** Returns list of connected peers that have completed handshaking, with inbound duplicates removed. */
public List<Peer> getUniqueHandshakedPeers() {
final List<Peer> peers;
synchronized (this.connectedPeers) {
peers = this.connectedPeers.stream().filter(peer -> peer.getHandshakeStatus() == Handshake.COMPLETED).collect(Collectors.toList());
}
// Returns true if this [inbound] peer has corresponding outbound peer with same ID
Predicate<Peer> hasOutboundWithSameId = peer -> {
// Peer is outbound so return fast
if (peer.isOutbound())
return false;
return peers.stream().anyMatch(otherPeer -> otherPeer.isOutbound() && Arrays.equals(otherPeer.getPeerId(), peer.getPeerId()));
};
// Filter out [inbound] peers that have corresponding outbound peer with the same ID
peers.removeIf(hasOutboundWithSameId);
return peers;
}
/** Returns list of peers we connected to that have completed handshaking. */
public List<Peer> getOutboundHandshakedPeers() {
List<Peer> peers = new ArrayList<>();
synchronized (this.connectedPeers) {
peers = this.connectedPeers.stream().filter(peer -> peer.isOutbound() && peer.getHandshakeStatus() == Handshake.COMPLETED)
.collect(Collectors.toList());
}
return peers;
}
/** Returns Peer with inbound connection and matching ID, or null if none found. */
public Peer getInboundPeerWithId(byte[] peerId) {
synchronized (this.connectedPeers) {
return this.connectedPeers.stream().filter(peer -> !peer.isOutbound() && peer.getPeerId() != null && Arrays.equals(peer.getPeerId(), peerId)).findAny().orElse(null);
} }
} }
public byte[] getMessageMagic() { /** Returns handshake-completed Peer with outbound connection and matching ID, or null if none found. */
return Settings.getInstance().isTestNet() ? TESTNET_MESSAGE_MAGIC : MAINNET_MESSAGE_MAGIC; public Peer getOutboundHandshakedPeerWithId(byte[] peerId) {
synchronized (this.connectedPeers) {
return this.connectedPeers.stream().filter(peer -> peer.isOutbound() && peer.getHandshakeStatus() == Handshake.COMPLETED && peer.getPeerId() != null && Arrays.equals(peer.getPeerId(), peerId)).findAny().orElse(null);
}
} }
// Initial setup // Initial setup
@ -182,7 +236,7 @@ public class Network extends Thread {
for (String address : INITIAL_PEERS) { for (String address : INITIAL_PEERS) {
PeerAddress peerAddress = PeerAddress.fromString(address); PeerAddress peerAddress = PeerAddress.fromString(address);
PeerData peerData = new PeerData(peerAddress); PeerData peerData = new PeerData(peerAddress, NTP.getTime(), "INIT");
repository.getNetworkRepository().save(peerData); repository.getNetworkRepository().save(peerData);
} }
@ -288,7 +342,7 @@ public class Network extends Thread {
for (PeerData peerData : peers) { for (PeerData peerData : peers) {
LOGGER.debug(String.format("Deleting old peer %s from repository", peerData.getAddress().toString())); LOGGER.debug(String.format("Deleting old peer %s from repository", peerData.getAddress().toString()));
repository.getNetworkRepository().delete(peerData); repository.getNetworkRepository().delete(peerData.getAddress());
} }
repository.saveChanges(); repository.saveChanges();
@ -398,7 +452,7 @@ public class Network extends Thread {
// as remote port is not likely to be remote peer's listen port // as remote port is not likely to be remote peer's listen port
if (!peer.isOutbound()) if (!peer.isOutbound())
try (final Repository repository = RepositoryManager.getRepository()) { try (final Repository repository = RepositoryManager.getRepository()) {
repository.getNetworkRepository().delete(peer.getPeerData()); repository.getNetworkRepository().delete(peer.getPeerData().getAddress());
repository.saveChanges(); repository.saveChanges();
} catch (DataException e) { } catch (DataException e) {
LOGGER.warn(String.format("Repository issue while trying to delete inbound peer %s", peer)); LOGGER.warn(String.format("Repository issue while trying to delete inbound peer %s", peer));
@ -499,7 +553,7 @@ public class Network extends Thread {
// Also add peer's details // Also add peer's details
peerAddresses.add(PeerAddress.fromString(peer.getPeerData().getAddress().getHost())); peerAddresses.add(PeerAddress.fromString(peer.getPeerData().getAddress().getHost()));
mergePeers(peerAddresses); mergePeers(peer.toString(), peerAddresses);
break; break;
case PEERS_V2: case PEERS_V2:
@ -518,7 +572,7 @@ public class Network extends Thread {
peerV2Addresses.add(0, sendingPeerAddress); peerV2Addresses.add(0, sendingPeerAddress);
} }
mergePeers(peerV2Addresses); mergePeers(peer.toString(), peerV2Addresses);
break; break;
case GET_PEERS: case GET_PEERS:
@ -587,6 +641,8 @@ public class Network extends Thread {
Controller.getInstance().onPeerHandshakeCompleted(peer); Controller.getInstance().onPeerHandshakeCompleted(peer);
} }
// Message-building calls
/** Returns PEERS message made from peers we've connected to recently, and this node's details */ /** Returns PEERS message made from peers we've connected to recently, and this node's details */
public Message buildPeersMessage(Peer peer) { public Message buildPeersMessage(Peer peer) {
try (final Repository repository = RepositoryManager.getRepository()) { try (final Repository repository = RepositoryManager.getRepository()) {
@ -691,117 +747,115 @@ public class Network extends Thread {
return new GetUnconfirmedTransactionsMessage(); return new GetUnconfirmedTransactionsMessage();
} }
// Peer-management calls
public void noteToSelf(Peer peer) {
LOGGER.info(String.format("No longer considering peer address %s as it connects to self", peer));
synchronized (this.selfPeers) {
this.selfPeers.add(peer.getPeerData().getAddress());
}
}
public boolean forgetPeer(PeerAddress peerAddress) throws DataException {
try (final Repository repository = RepositoryManager.getRepository()) {
int numDeleted = repository.getNetworkRepository().delete(peerAddress);
repository.saveChanges();
disconnectPeer(peerAddress);
return numDeleted != 0;
}
}
public int forgetAllPeers() throws DataException {
try (final Repository repository = RepositoryManager.getRepository()) {
int numDeleted = repository.getNetworkRepository().deleteAllPeers();
repository.saveChanges();
for (Peer peer : this.getConnectedPeers())
peer.disconnect("to be forgotten");
return numDeleted;
}
}
private void disconnectPeer(PeerAddress peerAddress) {
// Disconnect peer
try {
InetSocketAddress knownAddress = peerAddress.toSocketAddress();
List<Peer> peers = this.getConnectedPeers();
peers.removeIf(peer -> !Peer.addressEquals(knownAddress, peer.getResolvedAddress()));
for (Peer peer : peers)
peer.disconnect("to be forgotten");
} catch (UnknownHostException e) {
// Unknown host isn't going to match any of our connected peers so ignore
}
}
// Network-wide calls // Network-wide calls
/** Returns list of connected peers that have completed handshaking. */ private void mergePeers(String addedBy, List<PeerAddress> peerAddresses) {
public List<Peer> getHandshakedPeers() {
List<Peer> peers = new ArrayList<>();
synchronized (this.connectedPeers) {
peers = this.connectedPeers.stream().filter(peer -> peer.getHandshakeStatus() == Handshake.COMPLETED).collect(Collectors.toList());
}
return peers;
}
/** Returns list of connected peers that have completed handshaking, with inbound duplicates removed. */
public List<Peer> getUniqueHandshakedPeers() {
final List<Peer> peers;
synchronized (this.connectedPeers) {
peers = this.connectedPeers.stream().filter(peer -> peer.getHandshakeStatus() == Handshake.COMPLETED).collect(Collectors.toList());
}
// Returns true if this [inbound] peer has corresponding outbound peer with same ID
Predicate<Peer> hasOutboundWithSameId = peer -> {
// Peer is outbound so return fast
if (peer.isOutbound())
return false;
return peers.stream().anyMatch(otherPeer -> otherPeer.isOutbound() && Arrays.equals(otherPeer.getPeerId(), peer.getPeerId()));
};
// Filter out [inbound] peers that have corresponding outbound peer with the same ID
peers.removeIf(hasOutboundWithSameId);
return peers;
}
/** Returns list of peers we connected to that have completed handshaking. */
public List<Peer> getOutboundHandshakedPeers() {
List<Peer> peers = new ArrayList<>();
synchronized (this.connectedPeers) {
peers = this.connectedPeers.stream().filter(peer -> peer.isOutbound() && peer.getHandshakeStatus() == Handshake.COMPLETED)
.collect(Collectors.toList());
}
return peers;
}
/** Returns Peer with inbound connection and matching ID, or null if none found. */
public Peer getInboundPeerWithId(byte[] peerId) {
synchronized (this.connectedPeers) {
return this.connectedPeers.stream().filter(peer -> !peer.isOutbound() && peer.getPeerId() != null && Arrays.equals(peer.getPeerId(), peerId)).findAny().orElse(null);
}
}
/** Returns handshake-completed Peer with outbound connection and matching ID, or null if none found. */
public Peer getOutboundHandshakedPeerWithId(byte[] peerId) {
synchronized (this.connectedPeers) {
return this.connectedPeers.stream().filter(peer -> peer.isOutbound() && peer.getHandshakeStatus() == Handshake.COMPLETED && peer.getPeerId() != null && Arrays.equals(peer.getPeerId(), peerId)).findAny().orElse(null);
}
}
private void mergePeers(List<PeerAddress> peerAddresses) {
// This can block (due to lock) so fire off in separate thread // This can block (due to lock) so fire off in separate thread
class PeersMerger implements Runnable { class PeersMerger implements Runnable {
private String addedBy;
private List<PeerAddress> peerAddresses; private List<PeerAddress> peerAddresses;
public PeersMerger(List<PeerAddress> peerAddresses) { public PeersMerger(String addedBy, List<PeerAddress> peerAddresses) {
this.addedBy = addedBy;
this.peerAddresses = peerAddresses; this.peerAddresses = peerAddresses;
} }
@Override @Override
public void run() { public void run() {
Thread.currentThread().setName("Merging peers"); Thread.currentThread().setName(String.format("Merging peers from %s", this.addedBy));
// Serialize using lock to prevent repository deadlocks // Serialize using lock to prevent repository deadlocks
mergePeersLock.lock();
try { try {
try (final Repository repository = RepositoryManager.getRepository()) { mergePeersLock.lockInterruptibly();
List<PeerData> knownPeers = repository.getNetworkRepository().getAllPeers();
for (PeerData peerData : knownPeers) final long addedWhen = NTP.getTime();
LOGGER.trace(String.format("Known peer %s", peerData.getAddress()));
// Filter out duplicates try {
Predicate<PeerAddress> isKnownAddress = peerAddress -> { try (final Repository repository = RepositoryManager.getRepository()) {
return knownPeers.stream().anyMatch(knownPeerData -> knownPeerData.getAddress().equals(peerAddress)); List<PeerData> knownPeers = repository.getNetworkRepository().getAllPeers();
};
peerAddresses.removeIf(isKnownAddress); for (PeerData peerData : knownPeers)
LOGGER.trace(String.format("Known peer %s", peerData.getAddress()));
// Save the rest into database // Filter out duplicates
for (PeerAddress peerAddress : peerAddresses) { Predicate<PeerAddress> isKnownAddress = peerAddress -> {
PeerData peerData = new PeerData(peerAddress); return knownPeers.stream().anyMatch(knownPeerData -> knownPeerData.getAddress().equals(peerAddress));
LOGGER.info(String.format("Adding new peer %s to repository", peerAddress)); };
repository.getNetworkRepository().save(peerData);
peerAddresses.removeIf(isKnownAddress);
// Save the rest into database
for (PeerAddress peerAddress : peerAddresses) {
PeerData peerData = new PeerData(peerAddress, addedWhen, addedBy);
LOGGER.info(String.format("Adding new peer %s to repository", peerAddress));
repository.getNetworkRepository().save(peerData);
}
repository.saveChanges();
} catch (DataException e) {
LOGGER.error("Repository issue while merging peers list from remote node", e);
} }
} finally {
repository.saveChanges(); mergePeersLock.unlock();
} catch (DataException e) {
LOGGER.error("Repository issue while merging peers list from remote node", e);
} }
} finally { } catch (InterruptedException e1) {
mergePeersLock.unlock(); // We're exiting anyway...
} }
Thread.currentThread().setName("Merging peers (dormant)");
} }
} }
try { try {
mergePeersExecutor.execute(new PeersMerger(peerAddresses)); mergePeersExecutor.execute(new PeersMerger(addedBy, peerAddresses));
} catch (RejectedExecutionException e) { } catch (RejectedExecutionException e) {
// Can't execute - probably because we're shutting down, so ignore // Can't execute - probably because we're shutting down, so ignore
} }
@ -849,6 +903,8 @@ public class Network extends Thread {
} }
} }
// Shutdown
public void shutdown() { public void shutdown() {
this.isStopping = true; this.isStopping = true;
@ -891,6 +947,6 @@ public class Network extends Thread {
for (Peer peer : this.connectedPeers) for (Peer peer : this.connectedPeers)
peer.shutdown(); peer.shutdown();
} }
} }
} }

View File

@ -47,30 +47,54 @@ public class Peer extends Thread {
private static final int SOCKET_TIMEOUT = 10000; // ms private static final int SOCKET_TIMEOUT = 10000; // ms
private static final int UNSOLICITED_MESSAGE_QUEUE_CAPACITY = 10; private static final int UNSOLICITED_MESSAGE_QUEUE_CAPACITY = 10;
private final boolean isOutbound;
private volatile boolean isStopping = false; private volatile boolean isStopping = false;
private Socket socket = null; private Socket socket = null;
private PeerData peerData = null; private InetSocketAddress resolvedAddress = null;
private final ReentrantLock peerDataLock = new ReentrantLock(); /** True if remote address is loopback/link-local/site-local, false otherwise. */
private Long connectionTimestamp = null; private boolean isLocal;
private OutputStream out; private OutputStream out;
private Handshake handshakeStatus = Handshake.STARTED;
private Map<Integer, BlockingQueue<Message>> replyQueues; private Map<Integer, BlockingQueue<Message>> replyQueues;
private BlockingQueue<Message> unsolicitedQueue; private BlockingQueue<Message> unsolicitedQueue;
private ExecutorService messageExecutor; private ExecutorService messageExecutor;
private VersionMessage versionMessage = null;
private Integer version;
private ScheduledExecutorService pingExecutor; private ScheduledExecutorService pingExecutor;
private Long lastPing = null;
private InetSocketAddress resolvedAddress = null; /** True if we created connection to peer, false if we accepted incoming connection from peer. */
private boolean isLocal; private final boolean isOutbound;
/** Numeric protocol version, typically 1 or 2. */
private Integer version;
private byte[] peerId; private byte[] peerId;
private Handshake handshakeStatus = Handshake.STARTED;
private byte[] pendingPeerId; private byte[] pendingPeerId;
private byte[] verificationCodeSent; private byte[] verificationCodeSent;
private byte[] verificationCodeExpected; private byte[] verificationCodeExpected;
/** Construct unconnected outbound Peer using socket address in peer data */ private PeerData peerData = null;
private final ReentrantLock peerLock = new ReentrantLock();
/** Timestamp of when socket was accepted, or connected. */
private Long connectionTimestamp = null;
/** Version info as reported by peer. */
private VersionMessage versionMessage = null;
/** Last PING message round-trip time (ms). */
private Long lastPing = null;
/** Latest block height as reported by peer. */
private Integer lastHeight;
/** Latest block signature as reported by peer. */
private byte[] lastBlockSignature;
/** Latest block timestamp as reported by peer. */
private Long lastBlockTimestamp;
/** Latest block generator public key as reported by peer. */
private byte[] lastBlockGenerator;
// Constructors
/** Construct unconnected, outbound Peer using socket address in peer data */
public Peer(PeerData peerData) { public Peer(PeerData peerData) {
this.isOutbound = true; this.isOutbound = true;
this.peerData = peerData; this.peerData = peerData;
@ -91,13 +115,7 @@ public class Peer extends Thread {
// Getters / setters // Getters / setters
public PeerData getPeerData() { public PeerData getPeerData() {
this.peerDataLock.lock(); return this.peerData;
try {
return this.peerData;
} finally {
this.peerDataLock.unlock();
}
} }
public boolean isOutbound() { public boolean isOutbound() {
@ -179,9 +197,41 @@ public class Peer extends Thread {
this.verificationCodeExpected = expected; this.verificationCodeExpected = expected;
} }
/** Returns the lock used for synchronizing access to peer's PeerData. */ public Integer getLastHeight() {
public ReentrantLock getPeerDataLock() { return this.lastHeight;
return this.peerDataLock; }
public void setLastHeight(Integer lastHeight) {
this.lastHeight = lastHeight;
}
public byte[] getLastBlockSignature() {
return lastBlockSignature;
}
public void setLastBlockSignature(byte[] lastBlockSignature) {
this.lastBlockSignature = lastBlockSignature;
}
public Long getLastBlockTimestamp() {
return lastBlockTimestamp;
}
public void setLastBlockTimestamp(Long lastBlockTimestamp) {
this.lastBlockTimestamp = lastBlockTimestamp;
}
public byte[] getLastBlockGenerator() {
return lastBlockGenerator;
}
public void setLastBlockGenerator(byte[] lastBlockGenerator) {
this.lastBlockGenerator = lastBlockGenerator;
}
/** Returns the lock used for synchronizing access to peer info. */
public ReentrantLock getPeerLock() {
return this.peerLock;
} }
// Easier, and nicer output, than peer.getRemoteSocketAddress() // Easier, and nicer output, than peer.getRemoteSocketAddress()
@ -488,6 +538,7 @@ public class Peer extends Thread {
return new InetSocketAddress(address, hostAndPort.getPortOrDefault(Settings.DEFAULT_LISTEN_PORT)); return new InetSocketAddress(address, hostAndPort.getPortOrDefault(Settings.DEFAULT_LISTEN_PORT));
} }
/** Returns true if address is loopback/link-local/site-local, false otherwise. */
public static boolean isAddressLocal(InetAddress address) { public static boolean isAddressLocal(InetAddress address) {
return address.isLoopbackAddress() || address.isLinkLocalAddress() || address.isSiteLocalAddress(); return address.isLoopbackAddress() || address.isLinkLocalAddress() || address.isSiteLocalAddress();
} }

View File

@ -3,6 +3,7 @@ package org.qora.repository;
import java.util.List; import java.util.List;
import org.qora.data.network.PeerData; import org.qora.data.network.PeerData;
import org.qora.network.PeerAddress;
public interface NetworkRepository { public interface NetworkRepository {
@ -10,7 +11,7 @@ public interface NetworkRepository {
public void save(PeerData peerData) throws DataException; public void save(PeerData peerData) throws DataException;
public int delete(PeerData peerData) throws DataException; public int delete(PeerAddress peerAddress) throws DataException;
public int deleteAllPeers() throws DataException; public int deleteAllPeers() throws DataException;

View File

@ -766,6 +766,16 @@ public class HSQLDBDatabaseUpdates {
stmt.execute("ALTER TABLE ArbitraryTransactions ALTER COLUMN data_hash RENAME TO data"); stmt.execute("ALTER TABLE ArbitraryTransactions ALTER COLUMN data_hash RENAME TO data");
break; break;
case 53:
// Change what we store about peers (again)
stmt.execute("ALTER TABLE Peers DROP COLUMN last_block_signature");
stmt.execute("ALTER TABLE Peers DROP COLUMN last_block_timestamp");
stmt.execute("ALTER TABLE Peers DROP COLUMN last_block_generator");
stmt.execute("ALTER TABLE Peers DROP COLUMN last_height");
stmt.execute("ALTER TABLE Peers ADD COLUMN added_when TIMESTAMP WITH TIME ZONE");
stmt.execute("ALTER TABLE Peers ADD COLUMN added_by VARCHAR(255)");
break;
default: default:
// nothing to do // nothing to do
return false; return false;

View File

@ -20,7 +20,7 @@ public class HSQLDBNetworkRepository implements NetworkRepository {
@Override @Override
public List<PeerData> getAllPeers() throws DataException { public List<PeerData> getAllPeers() throws DataException {
String sql = "SELECT address, last_connected, last_attempted, last_height, last_block_signature, last_block_timestamp, last_block_generator, last_misbehaved FROM Peers"; String sql = "SELECT address, last_connected, last_attempted, last_misbehaved, added_when, added_by FROM Peers";
List<PeerData> peers = new ArrayList<>(); List<PeerData> peers = new ArrayList<>();
@ -37,19 +37,13 @@ public class HSQLDBNetworkRepository implements NetworkRepository {
Long lastAttempted = HSQLDBRepository.getZonedTimestampMilli(resultSet, 3); Long lastAttempted = HSQLDBRepository.getZonedTimestampMilli(resultSet, 3);
Integer lastHeight = resultSet.getInt(4); Long lastMisbehaved = HSQLDBRepository.getZonedTimestampMilli(resultSet, 4);
if (lastHeight == 0 && resultSet.wasNull())
lastHeight = null;
byte[] lastBlockSignature = resultSet.getBytes(5); Long addedWhen = HSQLDBRepository.getZonedTimestampMilli(resultSet, 5);
Long lastBlockTimestamp = HSQLDBRepository.getZonedTimestampMilli(resultSet, 6); String addedBy = resultSet.getString(6);
byte[] lastBlockGenerator = resultSet.getBytes(7); peers.add(new PeerData(peerAddress, lastAttempted, lastConnected, lastMisbehaved, addedWhen, addedBy));
Long lastMisbehaved = HSQLDBRepository.getZonedTimestampMilli(resultSet, 8);
peers.add(new PeerData(peerAddress, lastAttempted, lastConnected, lastHeight, lastBlockSignature, lastBlockTimestamp, lastBlockGenerator, lastMisbehaved));
} while (resultSet.next()); } while (resultSet.next());
return peers; return peers;
@ -66,10 +60,9 @@ public class HSQLDBNetworkRepository implements NetworkRepository {
saveHelper.bind("address", peerData.getAddress().toString()).bind("last_connected", HSQLDBRepository.toOffsetDateTime(peerData.getLastConnected())) saveHelper.bind("address", peerData.getAddress().toString()).bind("last_connected", HSQLDBRepository.toOffsetDateTime(peerData.getLastConnected()))
.bind("last_attempted", HSQLDBRepository.toOffsetDateTime(peerData.getLastAttempted())) .bind("last_attempted", HSQLDBRepository.toOffsetDateTime(peerData.getLastAttempted()))
.bind("last_height", peerData.getLastHeight()).bind("last_block_signature", peerData.getLastBlockSignature()) .bind("last_misbehaved", HSQLDBRepository.toOffsetDateTime(peerData.getLastMisbehaved()))
.bind("last_block_timestamp", HSQLDBRepository.toOffsetDateTime(peerData.getLastBlockTimestamp())) .bind("added_when", HSQLDBRepository.toOffsetDateTime(peerData.getAddedWhen()))
.bind("last_block_generator", peerData.getLastBlockGenerator()) .bind("added_by", peerData.getAddedBy());
.bind("last_misbehaved", HSQLDBRepository.toOffsetDateTime(peerData.getLastMisbehaved()));
try { try {
saveHelper.execute(this.repository); saveHelper.execute(this.repository);
@ -79,9 +72,9 @@ public class HSQLDBNetworkRepository implements NetworkRepository {
} }
@Override @Override
public int delete(PeerData peerData) throws DataException { public int delete(PeerAddress peerAddress) throws DataException {
try { try {
return this.repository.delete("Peers", "address = ?", peerData.getAddress().toString()); return this.repository.delete("Peers", "address = ?", peerAddress.toString());
} catch (SQLException e) { } catch (SQLException e) {
throw new DataException("Unable to delete peer from repository", e); throw new DataException("Unable to delete peer from repository", e);
} }