forked from Qortal/qortal
Compare commits
11 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
bd0bc79ea6 | ||
|
|
0cc9cd728e | ||
|
|
e5cf76f3e0 | ||
|
|
44e8b3e6e7 | ||
|
|
1bca152d9c | ||
|
|
136188339d | ||
|
|
48de33fe24 | ||
|
|
df4798e2a1 | ||
|
|
edb842f0d1 | ||
|
|
b563fe567d | ||
|
|
b3dd0d89df |
2
pom.xml
2
pom.xml
@@ -3,7 +3,7 @@
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<groupId>org.qortal</groupId>
|
||||
<artifactId>qortal</artifactId>
|
||||
<version>1.0.7</version>
|
||||
<version>1.0.8</version>
|
||||
<packaging>jar</packaging>
|
||||
<properties>
|
||||
<bitcoin.version>0.15.4</bitcoin.version>
|
||||
|
||||
@@ -77,6 +77,12 @@ public class Account {
|
||||
throw new DataException(message);
|
||||
}
|
||||
|
||||
// Delete account balance record instead of setting balance to zero
|
||||
if (balance.signum() == 0) {
|
||||
this.repository.getAccountRepository().delete(this.address, assetId);
|
||||
return;
|
||||
}
|
||||
|
||||
// Can't have a balance without an account - make sure it exists!
|
||||
this.repository.getAccountRepository().ensureAccount(this.buildAccountData());
|
||||
|
||||
|
||||
@@ -15,6 +15,7 @@ public enum ApiError {
|
||||
REPOSITORY_ISSUE(5, 500),
|
||||
NON_PRODUCTION(6, 403),
|
||||
BLOCKCHAIN_NEEDS_SYNC(7, 503),
|
||||
NO_TIME_SYNC(8, 503),
|
||||
|
||||
// VALIDATION
|
||||
INVALID_SIGNATURE(101, 400),
|
||||
|
||||
@@ -8,6 +8,7 @@ import io.swagger.v3.oas.annotations.parameters.RequestBody;
|
||||
import io.swagger.v3.oas.annotations.responses.ApiResponse;
|
||||
import io.swagger.v3.oas.annotations.tags.Tag;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@@ -29,9 +30,8 @@ import org.qortal.data.network.PeerData;
|
||||
import org.qortal.network.Network;
|
||||
import org.qortal.network.PeerAddress;
|
||||
import org.qortal.repository.DataException;
|
||||
import org.qortal.repository.Repository;
|
||||
import org.qortal.repository.RepositoryManager;
|
||||
import org.qortal.utils.ExecuteProduceConsume;
|
||||
import org.qortal.utils.NTP;
|
||||
|
||||
@Path("/peers")
|
||||
@Tag(name = "Peers")
|
||||
@@ -81,11 +81,7 @@ public class PeersResource {
|
||||
ApiError.REPOSITORY_ISSUE
|
||||
})
|
||||
public List<PeerData> getKnownPeers() {
|
||||
try (final Repository repository = RepositoryManager.getRepository()) {
|
||||
return repository.getNetworkRepository().getAllPeers();
|
||||
} catch (DataException e) {
|
||||
throw ApiExceptionFactory.INSTANCE.createException(request, ApiError.REPOSITORY_ISSUE, e);
|
||||
}
|
||||
return Network.getInstance().getAllKnownPeers();
|
||||
}
|
||||
|
||||
@GET
|
||||
@@ -166,12 +162,14 @@ public class PeersResource {
|
||||
public String addPeer(String address) {
|
||||
Security.checkApiCallAllowed(request);
|
||||
|
||||
try (final Repository repository = RepositoryManager.getRepository()) {
|
||||
final Long addedWhen = NTP.getTime();
|
||||
if (addedWhen == null)
|
||||
throw ApiExceptionFactory.INSTANCE.createException(request, ApiError.NO_TIME_SYNC);
|
||||
|
||||
try {
|
||||
PeerAddress peerAddress = PeerAddress.fromString(address);
|
||||
|
||||
PeerData peerData = new PeerData(peerAddress, System.currentTimeMillis(), "API");
|
||||
repository.getNetworkRepository().save(peerData);
|
||||
repository.saveChanges();
|
||||
Network.getInstance().mergePeers("API", addedWhen, Arrays.asList(peerAddress));
|
||||
|
||||
return "true";
|
||||
} catch (IllegalArgumentException e) {
|
||||
|
||||
@@ -304,8 +304,10 @@ public class Block {
|
||||
|
||||
// Fetch our list of online accounts
|
||||
List<OnlineAccountData> onlineAccounts = Controller.getInstance().getOnlineAccounts();
|
||||
if (onlineAccounts.isEmpty())
|
||||
throw new IllegalStateException("No online accounts - not even our own?");
|
||||
if (onlineAccounts.isEmpty()) {
|
||||
LOGGER.error("No online accounts - not even our own?");
|
||||
return null;
|
||||
}
|
||||
|
||||
// Find newest online accounts timestamp
|
||||
long onlineAccountsTimestamp = 0;
|
||||
@@ -321,7 +323,11 @@ public class Block {
|
||||
if (onlineAccountData.getTimestamp() != onlineAccountsTimestamp)
|
||||
continue;
|
||||
|
||||
int accountIndex = repository.getAccountRepository().getRewardShareIndex(onlineAccountData.getPublicKey());
|
||||
Integer accountIndex = repository.getAccountRepository().getRewardShareIndex(onlineAccountData.getPublicKey());
|
||||
if (accountIndex == null)
|
||||
// Online account (reward-share) with current timestamp but reward-share cancelled
|
||||
continue;
|
||||
|
||||
indexedOnlineAccounts.put(accountIndex, onlineAccountData);
|
||||
}
|
||||
List<Integer> accountIndexes = new ArrayList<>(indexedOnlineAccounts.keySet());
|
||||
@@ -350,8 +356,10 @@ public class Block {
|
||||
|
||||
// Qortal: minter is always a reward-share, so find actual minter and get their effective minting level
|
||||
int minterLevel = Account.getRewardShareEffectiveMintingLevel(repository, minter.getPublicKey());
|
||||
if (minterLevel == 0)
|
||||
throw new IllegalStateException("Minter effective level returned zero?");
|
||||
if (minterLevel == 0) {
|
||||
LOGGER.error("Minter effective level returned zero?");
|
||||
return null;
|
||||
}
|
||||
|
||||
long timestamp = calcTimestamp(parentBlockData, minter.getPublicKey(), minterLevel);
|
||||
|
||||
@@ -419,8 +427,10 @@ public class Block {
|
||||
|
||||
// Qortal: minter is always a reward-share, so find actual minter and get their effective minting level
|
||||
int minterLevel = Account.getRewardShareEffectiveMintingLevel(repository, minter.getPublicKey());
|
||||
if (minterLevel == 0)
|
||||
throw new IllegalStateException("Minter effective level returned zero?");
|
||||
if (minterLevel == 0){
|
||||
LOGGER.error("Minter effective level returned zero?");
|
||||
return null;
|
||||
}
|
||||
|
||||
long timestamp = calcTimestamp(parentBlockData, minter.getPublicKey(), minterLevel);
|
||||
|
||||
|
||||
@@ -225,6 +225,19 @@ public class BlockChain {
|
||||
Throwable linkedException = e.getLinkedException();
|
||||
if (linkedException instanceof XMLMarshalException) {
|
||||
String message = ((XMLMarshalException) linkedException).getInternalException().getLocalizedMessage();
|
||||
|
||||
if (message == null && linkedException.getCause() != null && linkedException.getCause().getCause() != null )
|
||||
message = linkedException.getCause().getCause().getLocalizedMessage();
|
||||
|
||||
if (message == null && linkedException.getCause() != null)
|
||||
message = linkedException.getCause().getLocalizedMessage();
|
||||
|
||||
if (message == null)
|
||||
message = linkedException.getLocalizedMessage();
|
||||
|
||||
if (message == null)
|
||||
message = e.getLocalizedMessage();
|
||||
|
||||
LOGGER.error(message);
|
||||
throw new RuntimeException(message, e);
|
||||
}
|
||||
|
||||
@@ -40,6 +40,8 @@ public class BlockMinter extends Thread {
|
||||
|
||||
// Other properties
|
||||
private static final Logger LOGGER = LogManager.getLogger(BlockMinter.class);
|
||||
private static Long lastLogTimestamp;
|
||||
private static Long logTimeout;
|
||||
|
||||
// Constructors
|
||||
|
||||
@@ -151,6 +153,9 @@ public class BlockMinter extends Thread {
|
||||
if (previousBlock == null || !Arrays.equals(previousBlock.getSignature(), lastBlockData.getSignature())) {
|
||||
previousBlock = new Block(repository, lastBlockData);
|
||||
newBlocks.clear();
|
||||
|
||||
// Reduce log timeout
|
||||
logTimeout = 10 * 1000L;
|
||||
}
|
||||
|
||||
// Discard accounts we have already built blocks with
|
||||
@@ -163,11 +168,23 @@ public class BlockMinter extends Thread {
|
||||
// First block does the AT heavy-lifting
|
||||
if (newBlocks.isEmpty()) {
|
||||
Block newBlock = Block.mint(repository, previousBlock.getBlockData(), mintingAccount);
|
||||
if (newBlock == null) {
|
||||
// For some reason we can't mint right now
|
||||
moderatedLog(() -> LOGGER.error("Couldn't build a to-be-minted block"));
|
||||
continue;
|
||||
}
|
||||
|
||||
newBlocks.add(newBlock);
|
||||
} else {
|
||||
// The blocks for other minters require less effort...
|
||||
Block newBlock = newBlocks.get(0);
|
||||
newBlocks.add(newBlock.remint(mintingAccount));
|
||||
Block newBlock = newBlocks.get(0).remint(mintingAccount);
|
||||
if (newBlock == null) {
|
||||
// For some reason we can't mint right now
|
||||
moderatedLog(() -> LOGGER.error("Couldn't rebuild a to-be-minted block"));
|
||||
continue;
|
||||
}
|
||||
|
||||
newBlocks.add(newBlock);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -177,15 +194,22 @@ public class BlockMinter extends Thread {
|
||||
|
||||
// Make sure we're the only thread modifying the blockchain
|
||||
ReentrantLock blockchainLock = Controller.getInstance().getBlockchainLock();
|
||||
if (!blockchainLock.tryLock(30, TimeUnit.SECONDS))
|
||||
if (!blockchainLock.tryLock(30, TimeUnit.SECONDS)) {
|
||||
LOGGER.warn("Couldn't acquire blockchain lock even after waiting 30 seconds");
|
||||
continue;
|
||||
}
|
||||
|
||||
boolean newBlockMinted = false;
|
||||
|
||||
try {
|
||||
// Clear repository's "in transaction" state so we don't cause a repository deadlock
|
||||
// Clear repository session state so we have latest view of data
|
||||
repository.discardChanges();
|
||||
|
||||
// Now that we have blockchain lock, do final check that chain hasn't changed
|
||||
BlockData latestBlockData = blockRepository.getLastBlock();
|
||||
if (!Arrays.equals(lastBlockData.getSignature(), latestBlockData.getSignature()))
|
||||
continue;
|
||||
|
||||
List<Block> goodBlocks = new ArrayList<>();
|
||||
for (Block testBlock : newBlocks) {
|
||||
// Is new block's timestamp valid yet?
|
||||
@@ -194,8 +218,12 @@ public class BlockMinter extends Thread {
|
||||
continue;
|
||||
|
||||
// Is new block valid yet? (Before adding unconfirmed transactions)
|
||||
if (testBlock.isValid() != ValidationResult.OK)
|
||||
ValidationResult result = testBlock.isValid();
|
||||
if (result != ValidationResult.OK) {
|
||||
moderatedLog(() -> LOGGER.error(String.format("To-be-minted block invalid '%s' before adding transactions?", result.name())));
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
goodBlocks.add(testBlock);
|
||||
}
|
||||
@@ -346,10 +374,14 @@ public class BlockMinter extends Thread {
|
||||
// Ensure mintingAccount is 'online' so blocks can be minted
|
||||
Controller.getInstance().ensureTestingAccountsOnline(mintingAndOnlineAccounts);
|
||||
|
||||
BlockData previousBlockData = repository.getBlockRepository().getLastBlock();
|
||||
|
||||
PrivateKeyAccount mintingAccount = mintingAndOnlineAccounts[0];
|
||||
|
||||
mintTestingBlockRetainingTimestamps(repository, mintingAccount);
|
||||
}
|
||||
|
||||
public static void mintTestingBlockRetainingTimestamps(Repository repository, PrivateKeyAccount mintingAccount) throws DataException {
|
||||
BlockData previousBlockData = repository.getBlockRepository().getLastBlock();
|
||||
|
||||
Block newBlock = Block.mint(repository, previousBlockData, mintingAccount);
|
||||
|
||||
// Make sure we're the only thread modifying the blockchain
|
||||
@@ -377,4 +409,15 @@ public class BlockMinter extends Thread {
|
||||
}
|
||||
}
|
||||
|
||||
private static void moderatedLog(Runnable logFunction) {
|
||||
// We only log if logging at TRACE or previous log timeout has expired
|
||||
if (!LOGGER.isTraceEnabled() && lastLogTimestamp != null && lastLogTimestamp + logTimeout > System.currentTimeMillis())
|
||||
return;
|
||||
|
||||
lastLogTimestamp = System.currentTimeMillis();
|
||||
logTimeout = 2 * 60 * 1000L; // initial timeout, can be reduced if new block appears
|
||||
|
||||
logFunction.run();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -336,7 +336,7 @@ public class Controller extends Thread {
|
||||
try {
|
||||
Network network = Network.getInstance();
|
||||
network.start();
|
||||
} catch (IOException e) {
|
||||
} catch (IOException | DataException e) {
|
||||
LOGGER.error("Unable to start networking", e);
|
||||
Controller.getInstance().shutdown();
|
||||
Gui.getInstance().fatalError("Networking failure", e);
|
||||
@@ -549,17 +549,7 @@ public class Controller extends Thread {
|
||||
LOGGER.info(String.format("Failed to synchronize with peer %s (%s) - cooling off", peer, syncResult.name()));
|
||||
|
||||
// Don't use this peer again for a while
|
||||
PeerData peerData = peer.getPeerData();
|
||||
peerData.setLastMisbehaved(NTP.getTime());
|
||||
|
||||
// Only save to repository if outbound peer
|
||||
if (peer.isOutbound())
|
||||
try (final Repository repository = RepositoryManager.getRepository()) {
|
||||
repository.getNetworkRepository().save(peerData);
|
||||
repository.saveChanges();
|
||||
} catch (DataException e) {
|
||||
LOGGER.warn("Repository issue while updating peer synchronization info", e);
|
||||
}
|
||||
Network.getInstance().peerMisbehaved(peer);
|
||||
break;
|
||||
}
|
||||
|
||||
|
||||
@@ -7,7 +7,6 @@ import java.net.InetSocketAddress;
|
||||
import java.net.StandardSocketOptions;
|
||||
import java.net.UnknownHostException;
|
||||
import java.nio.channels.CancelledKeyException;
|
||||
import java.nio.channels.ClosedChannelException;
|
||||
import java.nio.channels.SelectionKey;
|
||||
import java.nio.channels.Selector;
|
||||
import java.nio.channels.ServerSocketChannel;
|
||||
@@ -56,6 +55,7 @@ import org.qortal.repository.Repository;
|
||||
import org.qortal.repository.RepositoryManager;
|
||||
import org.qortal.settings.Settings;
|
||||
import org.qortal.utils.ExecuteProduceConsume;
|
||||
import org.qortal.utils.ExecutorDumper;
|
||||
import org.qortal.utils.ExecuteProduceConsume.StatsSnapshot;
|
||||
import org.qortal.utils.NTP;
|
||||
|
||||
@@ -104,6 +104,8 @@ public class Network {
|
||||
|
||||
private final byte[] ourPeerId;
|
||||
private final int maxMessageSize;
|
||||
|
||||
private List<PeerData> allKnownPeers;
|
||||
private List<Peer> connectedPeers;
|
||||
private List<PeerAddress> selfPeers;
|
||||
|
||||
@@ -152,7 +154,7 @@ public class Network {
|
||||
networkEPC = new NetworkProcessor(networkExecutor);
|
||||
}
|
||||
|
||||
public void start() throws IOException {
|
||||
public void start() throws IOException, DataException {
|
||||
// Grab P2P port from settings
|
||||
int listenPort = Settings.getInstance().getListenPort();
|
||||
|
||||
@@ -177,6 +179,11 @@ public class Network {
|
||||
throw new IOException("Can't create listen socket", e);
|
||||
}
|
||||
|
||||
// Load all known peers from repository
|
||||
try (final Repository repository = RepositoryManager.getRepository()) {
|
||||
allKnownPeers = repository.getNetworkRepository().getAllPeers();
|
||||
}
|
||||
|
||||
// Start up first networking thread
|
||||
networkEPC.start();
|
||||
}
|
||||
@@ -209,6 +216,12 @@ public class Network {
|
||||
|
||||
// Peer lists
|
||||
|
||||
public List<PeerData> getAllKnownPeers() {
|
||||
synchronized (this.allKnownPeers) {
|
||||
return new ArrayList<>(this.allKnownPeers);
|
||||
}
|
||||
}
|
||||
|
||||
public List<Peer> getConnectedPeers() {
|
||||
synchronized (this.connectedPeers) {
|
||||
return new ArrayList<>(this.connectedPeers);
|
||||
@@ -223,24 +236,16 @@ public class Network {
|
||||
|
||||
/** Returns list of connected peers that have completed handshaking. */
|
||||
public List<Peer> getHandshakedPeers() {
|
||||
List<Peer> peers = new ArrayList<>();
|
||||
|
||||
synchronized (this.connectedPeers) {
|
||||
peers = this.connectedPeers.stream().filter(peer -> peer.getHandshakeStatus() == Handshake.COMPLETED).collect(Collectors.toList());
|
||||
return this.connectedPeers.stream().filter(peer -> peer.getHandshakeStatus() == Handshake.COMPLETED).collect(Collectors.toList());
|
||||
}
|
||||
|
||||
return peers;
|
||||
}
|
||||
|
||||
/** Returns list of connected peers that have completed handshaking, with inbound duplicates removed. */
|
||||
public List<Peer> getUniqueHandshakedPeers() {
|
||||
final List<Peer> peers;
|
||||
List<Peer> peers = getHandshakedPeers();
|
||||
|
||||
synchronized (this.connectedPeers) {
|
||||
peers = this.connectedPeers.stream().filter(peer -> peer.getHandshakeStatus() == Handshake.COMPLETED).collect(Collectors.toList());
|
||||
}
|
||||
|
||||
// Returns true if this [inbound] peer has corresponding outbound peer with same ID
|
||||
// Returns true if this peer is inbound and has corresponding outbound peer with same ID
|
||||
Predicate<Peer> hasOutboundWithSameId = peer -> {
|
||||
// Peer is outbound so return fast
|
||||
if (peer.isOutbound())
|
||||
@@ -249,7 +254,7 @@ public class Network {
|
||||
return peers.stream().anyMatch(otherPeer -> otherPeer.isOutbound() && Arrays.equals(otherPeer.getPeerId(), peer.getPeerId()));
|
||||
};
|
||||
|
||||
// Filter out [inbound] peers that have corresponding outbound peer with the same ID
|
||||
// Filter out inbound peers that have corresponding outbound peer with the same ID
|
||||
peers.removeIf(hasOutboundWithSameId);
|
||||
|
||||
return peers;
|
||||
@@ -276,6 +281,32 @@ public class Network {
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// Peer list filters
|
||||
|
||||
/** Must be inside <tt>synchronized (this.selfPeers) {...}</tt> */
|
||||
private final Predicate<PeerData> isSelfPeer = peerData -> {
|
||||
PeerAddress peerAddress = peerData.getAddress();
|
||||
return this.selfPeers.stream().anyMatch(selfPeer -> selfPeer.equals(peerAddress));
|
||||
};
|
||||
|
||||
/** Must be inside <tt>synchronized (this.connectedPeers) {...}</tt> */
|
||||
private final Predicate<PeerData> isConnectedPeer = peerData -> {
|
||||
PeerAddress peerAddress = peerData.getAddress();
|
||||
return this.connectedPeers.stream().anyMatch(peer -> peer.getPeerData().getAddress().equals(peerAddress));
|
||||
};
|
||||
|
||||
/** Must be inside <tt>synchronized (this.connectedPeers) {...}</tt> */
|
||||
private final Predicate<PeerData> isResolvedAsConnectedPeer = peerData -> {
|
||||
try {
|
||||
InetSocketAddress resolvedSocketAddress = peerData.getAddress().toSocketAddress();
|
||||
return this.connectedPeers.stream().anyMatch(peer -> peer.getResolvedAddress().equals(resolvedSocketAddress));
|
||||
} catch (UnknownHostException e) {
|
||||
// Can't resolve - no point even trying to connect
|
||||
return true;
|
||||
}
|
||||
};
|
||||
|
||||
// Initial setup
|
||||
|
||||
public static void installInitialPeers(Repository repository) throws DataException {
|
||||
@@ -297,6 +328,12 @@ public class Network {
|
||||
super(executor);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void onSpawnFailure() {
|
||||
// For debugging:
|
||||
// ExecutorDumper.dump(this.executor, 3, ExecuteProduceConsume.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Task produceTask(boolean canBlock) throws InterruptedException {
|
||||
Task task;
|
||||
@@ -504,7 +541,7 @@ public class Network {
|
||||
|
||||
LOGGER.debug(() -> String.format("Connection accepted from peer %s", PeerAddress.fromSocket(socketChannel.socket())));
|
||||
|
||||
newPeer = new Peer(socketChannel);
|
||||
newPeer = new Peer(socketChannel, channelSelector);
|
||||
this.connectedPeers.add(newPeer);
|
||||
}
|
||||
} catch (IOException e) {
|
||||
@@ -512,105 +549,15 @@ public class Network {
|
||||
try {
|
||||
socketChannel.close();
|
||||
} catch (IOException ce) {
|
||||
// Couldn't close?
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
socketChannel.setOption(StandardSocketOptions.TCP_NODELAY, true);
|
||||
socketChannel.configureBlocking(false);
|
||||
socketChannel.register(channelSelector, SelectionKey.OP_READ);
|
||||
} catch (IOException e) {
|
||||
// Remove from connected peers
|
||||
synchronized (this.connectedPeers) {
|
||||
this.connectedPeers.remove(newPeer);
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
this.onPeerReady(newPeer);
|
||||
}
|
||||
|
||||
public void prunePeers() throws InterruptedException, DataException {
|
||||
final Long now = NTP.getTime();
|
||||
if (now == null)
|
||||
return;
|
||||
|
||||
// Disconnect peers that are stuck during handshake
|
||||
List<Peer> handshakePeers = this.getConnectedPeers();
|
||||
|
||||
// Disregard peers that have completed handshake or only connected recently
|
||||
handshakePeers.removeIf(peer -> peer.getHandshakeStatus() == Handshake.COMPLETED || peer.getConnectionTimestamp() == null || peer.getConnectionTimestamp() > now - HANDSHAKE_TIMEOUT);
|
||||
|
||||
for (Peer peer : handshakePeers)
|
||||
peer.disconnect(String.format("handshake timeout at %s", peer.getHandshakeStatus().name()));
|
||||
|
||||
// Prune 'old' peers from repository...
|
||||
// Pruning peers isn't critical so no need to block for a repository instance.
|
||||
try (final Repository repository = RepositoryManager.tryRepository()) {
|
||||
if (repository == null)
|
||||
return;
|
||||
|
||||
// Fetch all known peers
|
||||
List<PeerData> peers = repository.getNetworkRepository().getAllPeers();
|
||||
|
||||
// 'Old' peers:
|
||||
// we have attempted to connect within the last day
|
||||
// we last managed to connect over a week ago
|
||||
Predicate<PeerData> isNotOldPeer = peerData -> {
|
||||
if (peerData.getLastAttempted() == null || peerData.getLastAttempted() < now - OLD_PEER_ATTEMPTED_PERIOD)
|
||||
return true;
|
||||
|
||||
if (peerData.getLastConnected() == null || peerData.getLastConnected() > now - OLD_PEER_CONNECTION_PERIOD)
|
||||
return true;
|
||||
|
||||
return false;
|
||||
};
|
||||
|
||||
// Disregard peers that are NOT 'old'
|
||||
peers.removeIf(isNotOldPeer);
|
||||
|
||||
// Don't consider already connected peers (simple address match)
|
||||
Predicate<PeerData> isConnectedPeer = peerData -> {
|
||||
PeerAddress peerAddress = peerData.getAddress();
|
||||
return this.connectedPeers.stream().anyMatch(peer -> peer.getPeerData().getAddress().equals(peerAddress));
|
||||
};
|
||||
|
||||
synchronized (this.connectedPeers) {
|
||||
peers.removeIf(isConnectedPeer);
|
||||
}
|
||||
|
||||
for (PeerData peerData : peers) {
|
||||
LOGGER.debug(String.format("Deleting old peer %s from repository", peerData.getAddress().toString()));
|
||||
repository.getNetworkRepository().delete(peerData.getAddress());
|
||||
}
|
||||
|
||||
repository.saveChanges();
|
||||
}
|
||||
}
|
||||
|
||||
private final Predicate<PeerData> isSelfPeer = peerData -> {
|
||||
PeerAddress peerAddress = peerData.getAddress();
|
||||
return this.selfPeers.stream().anyMatch(selfPeer -> selfPeer.equals(peerAddress));
|
||||
};
|
||||
|
||||
private final Predicate<PeerData> isConnectedPeer = peerData -> {
|
||||
PeerAddress peerAddress = peerData.getAddress();
|
||||
return this.connectedPeers.stream().anyMatch(peer -> peer.getPeerData().getAddress().equals(peerAddress));
|
||||
};
|
||||
|
||||
private final Predicate<PeerData> isResolvedAsConnectedPeer = peerData -> {
|
||||
try {
|
||||
InetSocketAddress resolvedSocketAddress = peerData.getAddress().toSocketAddress();
|
||||
return this.connectedPeers.stream().anyMatch(peer -> peer.getResolvedAddress().equals(resolvedSocketAddress));
|
||||
} catch (UnknownHostException e) {
|
||||
// Can't resolve - no point even trying to connect
|
||||
return true;
|
||||
}
|
||||
};
|
||||
|
||||
private Peer getConnectablePeer(final Long now) throws InterruptedException {
|
||||
// We can't block here so use tryRepository(). We don't NEED to connect a new peer.
|
||||
try (final Repository repository = RepositoryManager.tryRepository()) {
|
||||
@@ -618,7 +565,7 @@ public class Network {
|
||||
return null;
|
||||
|
||||
// Find an address to connect to
|
||||
List<PeerData> peers = repository.getNetworkRepository().getAllPeers();
|
||||
List<PeerData> peers = this.getAllKnownPeers();
|
||||
|
||||
// Don't consider peers with recent connection failures
|
||||
final long lastAttemptedThreshold = now - CONNECT_FAILURE_BACKOFF;
|
||||
@@ -631,14 +578,12 @@ public class Network {
|
||||
peers.removeIf(isSelfPeer);
|
||||
}
|
||||
|
||||
// Don't consider already connected peers (simple address match)
|
||||
synchronized (this.connectedPeers) {
|
||||
// Don't consider already connected peers (simple address match)
|
||||
peers.removeIf(isConnectedPeer);
|
||||
}
|
||||
|
||||
// Don't consider already connected peers (resolved address match)
|
||||
// XXX This might be too slow if we end up waiting a long time for hostnames to resolve via DNS
|
||||
synchronized (this.connectedPeers) {
|
||||
// Don't consider already connected peers (resolved address match)
|
||||
// XXX This might be too slow if we end up waiting a long time for hostnames to resolve via DNS
|
||||
peers.removeIf(isResolvedAsConnectedPeer);
|
||||
}
|
||||
|
||||
@@ -647,17 +592,18 @@ public class Network {
|
||||
return null;
|
||||
|
||||
// Pick random peer
|
||||
int peerIndex = new SecureRandom().nextInt(peers.size());
|
||||
int peerIndex = new Random().nextInt(peers.size());
|
||||
|
||||
// Pick candidate
|
||||
PeerData peerData = peers.get(peerIndex);
|
||||
Peer newPeer = new Peer(peerData);
|
||||
|
||||
// Update connection attempt info
|
||||
repository.discardChanges();
|
||||
peerData.setLastAttempted(now);
|
||||
repository.getNetworkRepository().save(peerData);
|
||||
repository.saveChanges();
|
||||
synchronized (this.allKnownPeers) {
|
||||
repository.getNetworkRepository().save(peerData);
|
||||
repository.saveChanges();
|
||||
}
|
||||
|
||||
return newPeer;
|
||||
} catch (DataException e) {
|
||||
@@ -667,7 +613,7 @@ public class Network {
|
||||
}
|
||||
|
||||
private void connectPeer(Peer newPeer) throws InterruptedException {
|
||||
SocketChannel socketChannel = newPeer.connect();
|
||||
SocketChannel socketChannel = newPeer.connect(this.channelSelector);
|
||||
if (socketChannel == null)
|
||||
return;
|
||||
|
||||
@@ -678,15 +624,6 @@ public class Network {
|
||||
this.connectedPeers.add(newPeer);
|
||||
}
|
||||
|
||||
try {
|
||||
socketChannel.register(channelSelector, SelectionKey.OP_READ);
|
||||
} catch (ClosedChannelException e) {
|
||||
// If channel has somehow already closed then remove from connectedPeers
|
||||
synchronized (this.connectedPeers) {
|
||||
this.connectedPeers.remove(newPeer);
|
||||
}
|
||||
}
|
||||
|
||||
this.onPeerReady(newPeer);
|
||||
}
|
||||
|
||||
@@ -718,15 +655,21 @@ public class Network {
|
||||
synchronized (this.connectedPeers) {
|
||||
this.connectedPeers.remove(peer);
|
||||
}
|
||||
}
|
||||
|
||||
// If this is an inbound peer then remove from known peers list
|
||||
// as remote port is not likely to be remote peer's listen port
|
||||
if (!peer.isOutbound())
|
||||
public void peerMisbehaved(Peer peer) {
|
||||
PeerData peerData = peer.getPeerData();
|
||||
peerData.setLastMisbehaved(NTP.getTime());
|
||||
|
||||
// Only update repository if outbound peer
|
||||
if (peer.isOutbound())
|
||||
try (final Repository repository = RepositoryManager.getRepository()) {
|
||||
repository.getNetworkRepository().delete(peer.getPeerData().getAddress());
|
||||
repository.saveChanges();
|
||||
synchronized (this.allKnownPeers) {
|
||||
repository.getNetworkRepository().save(peerData);
|
||||
repository.saveChanges();
|
||||
}
|
||||
} catch (DataException e) {
|
||||
LOGGER.error(String.format("Repository issue while trying to delete inbound peer %s", peer), e);
|
||||
LOGGER.warn("Repository issue while updating peer synchronization info", e);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -828,7 +771,7 @@ public class Network {
|
||||
|
||||
private void onGetPeersMessage(Peer peer, Message message) {
|
||||
// Send our known peers
|
||||
if (!peer.sendMessage(buildPeersMessage(peer)))
|
||||
if (!peer.sendMessage(this.buildPeersMessage(peer)))
|
||||
peer.disconnect("failed to send peers list");
|
||||
}
|
||||
|
||||
@@ -845,7 +788,7 @@ public class Network {
|
||||
// Also add peer's details
|
||||
peerAddresses.add(PeerAddress.fromString(peer.getPeerData().getAddress().getHost()));
|
||||
|
||||
mergePeers(peer.toString(), peerAddresses);
|
||||
opportunisticMergePeers(peer.toString(), peerAddresses);
|
||||
}
|
||||
|
||||
private void onPingMessage(Peer peer, Message message) {
|
||||
@@ -875,7 +818,7 @@ public class Network {
|
||||
peerV2Addresses.add(0, sendingPeerAddress);
|
||||
}
|
||||
|
||||
mergePeers(peer.toString(), peerV2Addresses);
|
||||
opportunisticMergePeers(peer.toString(), peerV2Addresses);
|
||||
}
|
||||
|
||||
private void onPeerVerifyMessage(Peer peer, Message message) {
|
||||
@@ -925,8 +868,10 @@ public class Network {
|
||||
// Update connection info for outbound peers only
|
||||
if (peer.isOutbound())
|
||||
try (final Repository repository = RepositoryManager.getRepository()) {
|
||||
repository.getNetworkRepository().save(peer.getPeerData());
|
||||
repository.saveChanges();
|
||||
synchronized (this.allKnownPeers) {
|
||||
repository.getNetworkRepository().save(peer.getPeerData());
|
||||
repository.saveChanges();
|
||||
}
|
||||
} catch (DataException e) {
|
||||
LOGGER.error(String.format("Repository issue while trying to update outbound peer %s", peer), e);
|
||||
}
|
||||
@@ -964,77 +909,72 @@ public class Network {
|
||||
|
||||
/** Returns PEERS message made from peers we've connected to recently, and this node's details */
|
||||
public Message buildPeersMessage(Peer peer) {
|
||||
try (final Repository repository = RepositoryManager.getRepository()) {
|
||||
List<PeerData> knownPeers = repository.getNetworkRepository().getAllPeers();
|
||||
List<PeerData> knownPeers = this.getAllKnownPeers();
|
||||
|
||||
// Filter out peers that we've not connected to ever or within X milliseconds
|
||||
final long connectionThreshold = NTP.getTime() - RECENT_CONNECTION_THRESHOLD;
|
||||
Predicate<PeerData> notRecentlyConnected = peerData -> {
|
||||
final Long lastAttempted = peerData.getLastAttempted();
|
||||
final Long lastConnected = peerData.getLastConnected();
|
||||
// Filter out peers that we've not connected to ever or within X milliseconds
|
||||
final long connectionThreshold = NTP.getTime() - RECENT_CONNECTION_THRESHOLD;
|
||||
Predicate<PeerData> notRecentlyConnected = peerData -> {
|
||||
final Long lastAttempted = peerData.getLastAttempted();
|
||||
final Long lastConnected = peerData.getLastConnected();
|
||||
|
||||
if (lastAttempted == null || lastConnected == null)
|
||||
return true;
|
||||
if (lastAttempted == null || lastConnected == null)
|
||||
return true;
|
||||
|
||||
if (lastConnected < lastAttempted)
|
||||
return true;
|
||||
if (lastConnected < lastAttempted)
|
||||
return true;
|
||||
|
||||
if (lastConnected < connectionThreshold)
|
||||
return true;
|
||||
if (lastConnected < connectionThreshold)
|
||||
return true;
|
||||
|
||||
return false;
|
||||
};
|
||||
knownPeers.removeIf(notRecentlyConnected);
|
||||
return false;
|
||||
};
|
||||
knownPeers.removeIf(notRecentlyConnected);
|
||||
|
||||
if (peer.getVersion() >= 2) {
|
||||
List<PeerAddress> peerAddresses = new ArrayList<>();
|
||||
if (peer.getVersion() >= 2) {
|
||||
List<PeerAddress> peerAddresses = new ArrayList<>();
|
||||
|
||||
for (PeerData peerData : knownPeers) {
|
||||
try {
|
||||
InetAddress address = InetAddress.getByName(peerData.getAddress().getHost());
|
||||
for (PeerData peerData : knownPeers) {
|
||||
try {
|
||||
InetAddress address = InetAddress.getByName(peerData.getAddress().getHost());
|
||||
|
||||
// Don't send 'local' addresses if peer is not 'local'. e.g. don't send localhost:9084 to node4.qortal.org
|
||||
if (!peer.getIsLocal() && Peer.isAddressLocal(address))
|
||||
continue;
|
||||
// Don't send 'local' addresses if peer is not 'local'. e.g. don't send localhost:9084 to node4.qortal.org
|
||||
if (!peer.getIsLocal() && Peer.isAddressLocal(address))
|
||||
continue;
|
||||
|
||||
peerAddresses.add(peerData.getAddress());
|
||||
} catch (UnknownHostException e) {
|
||||
// Couldn't resolve hostname to IP address so discard
|
||||
}
|
||||
peerAddresses.add(peerData.getAddress());
|
||||
} catch (UnknownHostException e) {
|
||||
// Couldn't resolve hostname to IP address so discard
|
||||
}
|
||||
|
||||
// New format PEERS_V2 message that supports hostnames, IPv6 and ports
|
||||
return new PeersV2Message(peerAddresses);
|
||||
} else {
|
||||
// Map to socket addresses
|
||||
List<InetAddress> peerAddresses = new ArrayList<>();
|
||||
|
||||
for (PeerData peerData : knownPeers) {
|
||||
try {
|
||||
// We have to resolve to literal IP address to check for IPv4-ness.
|
||||
// This isn't great if hostnames have both IPv6 and IPv4 DNS entries.
|
||||
InetAddress address = InetAddress.getByName(peerData.getAddress().getHost());
|
||||
|
||||
// Legacy PEERS message doesn't support IPv6
|
||||
if (address instanceof Inet6Address)
|
||||
continue;
|
||||
|
||||
// Don't send 'local' addresses if peer is not 'local'. e.g. don't send localhost:9084 to node4.qortal.org
|
||||
if (!peer.getIsLocal() && !Peer.isAddressLocal(address))
|
||||
continue;
|
||||
|
||||
peerAddresses.add(address);
|
||||
} catch (UnknownHostException e) {
|
||||
// Couldn't resolve hostname to IP address so discard
|
||||
}
|
||||
}
|
||||
|
||||
// Legacy PEERS message that only sends IPv4 addresses
|
||||
return new PeersMessage(peerAddresses);
|
||||
}
|
||||
} catch (DataException e) {
|
||||
LOGGER.error("Repository issue while building PEERS message", e);
|
||||
return new PeersMessage(Collections.emptyList());
|
||||
|
||||
// New format PEERS_V2 message that supports hostnames, IPv6 and ports
|
||||
return new PeersV2Message(peerAddresses);
|
||||
} else {
|
||||
// Map to socket addresses
|
||||
List<InetAddress> peerAddresses = new ArrayList<>();
|
||||
|
||||
for (PeerData peerData : knownPeers) {
|
||||
try {
|
||||
// We have to resolve to literal IP address to check for IPv4-ness.
|
||||
// This isn't great if hostnames have both IPv6 and IPv4 DNS entries.
|
||||
InetAddress address = InetAddress.getByName(peerData.getAddress().getHost());
|
||||
|
||||
// Legacy PEERS message doesn't support IPv6
|
||||
if (address instanceof Inet6Address)
|
||||
continue;
|
||||
|
||||
// Don't send 'local' addresses if peer is not 'local'. e.g. don't send localhost:9084 to node4.qortal.org
|
||||
if (!peer.getIsLocal() && !Peer.isAddressLocal(address))
|
||||
continue;
|
||||
|
||||
peerAddresses.add(address);
|
||||
} catch (UnknownHostException e) {
|
||||
// Couldn't resolve hostname to IP address so discard
|
||||
}
|
||||
}
|
||||
|
||||
// Legacy PEERS message that only sends IPv4 addresses
|
||||
return new PeersMessage(peerAddresses);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1077,26 +1017,38 @@ public class Network {
|
||||
}
|
||||
|
||||
public boolean forgetPeer(PeerAddress peerAddress) throws DataException {
|
||||
try (final Repository repository = RepositoryManager.getRepository()) {
|
||||
int numDeleted = repository.getNetworkRepository().delete(peerAddress);
|
||||
repository.saveChanges();
|
||||
int numDeleted;
|
||||
|
||||
disconnectPeer(peerAddress);
|
||||
synchronized (this.allKnownPeers) {
|
||||
this.allKnownPeers.removeIf(peerData -> peerData.getAddress().equals(peerAddress));
|
||||
|
||||
return numDeleted != 0;
|
||||
try (final Repository repository = RepositoryManager.getRepository()) {
|
||||
numDeleted = repository.getNetworkRepository().delete(peerAddress);
|
||||
repository.saveChanges();
|
||||
}
|
||||
}
|
||||
|
||||
disconnectPeer(peerAddress);
|
||||
|
||||
return numDeleted != 0;
|
||||
}
|
||||
|
||||
public int forgetAllPeers() throws DataException {
|
||||
try (final Repository repository = RepositoryManager.getRepository()) {
|
||||
int numDeleted = repository.getNetworkRepository().deleteAllPeers();
|
||||
repository.saveChanges();
|
||||
int numDeleted;
|
||||
|
||||
for (Peer peer : this.getConnectedPeers())
|
||||
peer.disconnect("to be forgotten");
|
||||
synchronized (this.allKnownPeers) {
|
||||
this.allKnownPeers.clear();
|
||||
|
||||
return numDeleted;
|
||||
try (final Repository repository = RepositoryManager.getRepository()) {
|
||||
numDeleted = repository.getNetworkRepository().deleteAllPeers();
|
||||
repository.saveChanges();
|
||||
}
|
||||
}
|
||||
|
||||
for (Peer peer : this.getConnectedPeers())
|
||||
peer.disconnect("to be forgotten");
|
||||
|
||||
return numDeleted;
|
||||
}
|
||||
|
||||
private void disconnectPeer(PeerAddress peerAddress) {
|
||||
@@ -1116,7 +1068,72 @@ public class Network {
|
||||
|
||||
// Network-wide calls
|
||||
|
||||
private void mergePeers(String addedBy, List<PeerAddress> peerAddresses) {
|
||||
public void prunePeers() throws DataException {
|
||||
final Long now = NTP.getTime();
|
||||
if (now == null)
|
||||
return;
|
||||
|
||||
// Disconnect peers that are stuck during handshake
|
||||
List<Peer> handshakePeers = this.getConnectedPeers();
|
||||
|
||||
// Disregard peers that have completed handshake or only connected recently
|
||||
handshakePeers.removeIf(peer -> peer.getHandshakeStatus() == Handshake.COMPLETED || peer.getConnectionTimestamp() == null || peer.getConnectionTimestamp() > now - HANDSHAKE_TIMEOUT);
|
||||
|
||||
for (Peer peer : handshakePeers)
|
||||
peer.disconnect(String.format("handshake timeout at %s", peer.getHandshakeStatus().name()));
|
||||
|
||||
// Prune 'old' peers from repository...
|
||||
// Pruning peers isn't critical so no need to block for a repository instance.
|
||||
try (final Repository repository = RepositoryManager.tryRepository()) {
|
||||
if (repository == null)
|
||||
return;
|
||||
|
||||
synchronized (this.allKnownPeers) {
|
||||
// Fetch all known peers
|
||||
List<PeerData> peers = new ArrayList<>(this.allKnownPeers);
|
||||
|
||||
// 'Old' peers:
|
||||
// We attempted to connect within the last day
|
||||
// but we last managed to connect over a week ago.
|
||||
Predicate<PeerData> isNotOldPeer = peerData -> {
|
||||
if (peerData.getLastAttempted() == null || peerData.getLastAttempted() < now - OLD_PEER_ATTEMPTED_PERIOD)
|
||||
return true;
|
||||
|
||||
if (peerData.getLastConnected() == null || peerData.getLastConnected() > now - OLD_PEER_CONNECTION_PERIOD)
|
||||
return true;
|
||||
|
||||
return false;
|
||||
};
|
||||
|
||||
// Disregard peers that are NOT 'old'
|
||||
peers.removeIf(isNotOldPeer);
|
||||
|
||||
// Don't consider already connected peers (simple address match)
|
||||
synchronized (this.connectedPeers) {
|
||||
peers.removeIf(isConnectedPeer);
|
||||
}
|
||||
|
||||
for (PeerData peerData : peers) {
|
||||
LOGGER.debug(() -> String.format("Deleting old peer %s from repository", peerData.getAddress().toString()));
|
||||
repository.getNetworkRepository().delete(peerData.getAddress());
|
||||
}
|
||||
|
||||
repository.saveChanges();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void mergePeers(String addedBy, long addedWhen, List<PeerAddress> peerAddresses) throws DataException {
|
||||
mergePeersLock.lock();
|
||||
|
||||
try (final Repository repository = RepositoryManager.getRepository()) {
|
||||
this.mergePeers(repository, addedBy, addedWhen, peerAddresses);
|
||||
} finally {
|
||||
mergePeersLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
private void opportunisticMergePeers(String addedBy, List<PeerAddress> peerAddresses) {
|
||||
final Long addedWhen = NTP.getTime();
|
||||
if (addedWhen == null)
|
||||
return;
|
||||
@@ -1131,27 +1148,42 @@ public class Network {
|
||||
if (repository == null)
|
||||
return;
|
||||
|
||||
List<PeerData> knownPeers = repository.getNetworkRepository().getAllPeers();
|
||||
this.mergePeers(repository, addedBy, addedWhen, peerAddresses);
|
||||
|
||||
// Filter out duplicates
|
||||
Predicate<PeerAddress> isKnownAddress = peerAddress -> knownPeers.stream().anyMatch(knownPeerData -> knownPeerData.getAddress().equals(peerAddress));
|
||||
} catch (DataException e) {
|
||||
// Already logged by this.mergePeers()
|
||||
}
|
||||
} finally {
|
||||
mergePeersLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
private void mergePeers(Repository repository, String addedBy, long addedWhen, List<PeerAddress> peerAddresses) throws DataException {
|
||||
List<PeerData> newPeers;
|
||||
synchronized (this.allKnownPeers) {
|
||||
for (PeerData knownPeerData : this.allKnownPeers) {
|
||||
// Filter out duplicates, without resolving via DNS
|
||||
Predicate<PeerAddress> isKnownAddress = peerAddress -> knownPeerData.getAddress().equals(peerAddress);
|
||||
peerAddresses.removeIf(isKnownAddress);
|
||||
}
|
||||
|
||||
repository.discardChanges();
|
||||
// Add leftover peer addresses to known peers list
|
||||
newPeers = peerAddresses.stream().map(peerAddress -> new PeerData(peerAddress, addedWhen, addedBy)).collect(Collectors.toList());
|
||||
|
||||
// Save the rest into database
|
||||
for (PeerAddress peerAddress : peerAddresses) {
|
||||
PeerData peerData = new PeerData(peerAddress, addedWhen, addedBy);
|
||||
LOGGER.info(String.format("Adding new peer %s to repository", peerAddress));
|
||||
this.allKnownPeers.addAll(newPeers);
|
||||
|
||||
try {
|
||||
// Save new peers into database
|
||||
for (PeerData peerData : newPeers) {
|
||||
LOGGER.info(String.format("Adding new peer %s to repository", peerData.getAddress()));
|
||||
repository.getNetworkRepository().save(peerData);
|
||||
}
|
||||
|
||||
repository.saveChanges();
|
||||
} catch (DataException e) {
|
||||
LOGGER.error("Repository issue while merging peers list from remote node", e);
|
||||
LOGGER.error(String.format("Repository issue while merging peers list from %s", addedBy), e);
|
||||
throw e;
|
||||
}
|
||||
} finally {
|
||||
mergePeersLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -7,6 +7,8 @@ import java.net.SocketTimeoutException;
|
||||
import java.net.StandardSocketOptions;
|
||||
import java.net.UnknownHostException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.SelectionKey;
|
||||
import java.nio.channels.Selector;
|
||||
import java.nio.channels.SocketChannel;
|
||||
import java.security.SecureRandom;
|
||||
import java.util.Collections;
|
||||
@@ -108,10 +110,10 @@ public class Peer {
|
||||
}
|
||||
|
||||
/** Construct Peer using existing, connected socket */
|
||||
public Peer(SocketChannel socketChannel) throws IOException {
|
||||
public Peer(SocketChannel socketChannel, Selector channelSelector) throws IOException {
|
||||
this.isOutbound = false;
|
||||
this.socketChannel = socketChannel;
|
||||
sharedSetup();
|
||||
sharedSetup(channelSelector);
|
||||
|
||||
this.resolvedAddress = ((InetSocketAddress) socketChannel.socket().getRemoteSocketAddress());
|
||||
this.isLocal = isAddressLocal(this.resolvedAddress.getAddress());
|
||||
@@ -254,17 +256,18 @@ public class Peer {
|
||||
new SecureRandom().nextBytes(verificationCodeExpected);
|
||||
}
|
||||
|
||||
private void sharedSetup() throws IOException {
|
||||
private void sharedSetup(Selector channelSelector) throws IOException {
|
||||
this.connectionTimestamp = NTP.getTime();
|
||||
this.socketChannel.setOption(StandardSocketOptions.TCP_NODELAY, true);
|
||||
this.socketChannel.configureBlocking(false);
|
||||
this.socketChannel.register(channelSelector, SelectionKey.OP_READ);
|
||||
this.byteBuffer = null; // Defer allocation to when we need it, to save memory. Sorry GC!
|
||||
this.replyQueues = Collections.synchronizedMap(new HashMap<Integer, BlockingQueue<Message>>());
|
||||
this.pendingMessages = new LinkedBlockingQueue<>();
|
||||
}
|
||||
|
||||
public SocketChannel connect() {
|
||||
LOGGER.trace(String.format("Connecting to peer %s", this));
|
||||
public SocketChannel connect(Selector channelSelector) {
|
||||
LOGGER.trace(() -> String.format("Connecting to peer %s", this));
|
||||
|
||||
try {
|
||||
this.resolvedAddress = this.peerData.getAddress().toSocketAddress();
|
||||
@@ -272,10 +275,6 @@ public class Peer {
|
||||
|
||||
this.socketChannel = SocketChannel.open();
|
||||
this.socketChannel.socket().connect(resolvedAddress, CONNECT_TIMEOUT);
|
||||
|
||||
LOGGER.debug(String.format("Connected to peer %s", this));
|
||||
sharedSetup();
|
||||
return socketChannel;
|
||||
} catch (SocketTimeoutException e) {
|
||||
LOGGER.trace(String.format("Connection timed out to peer %s", this));
|
||||
return null;
|
||||
@@ -286,6 +285,20 @@ public class Peer {
|
||||
LOGGER.trace(String.format("Connection failed to peer %s", this));
|
||||
return null;
|
||||
}
|
||||
|
||||
try {
|
||||
LOGGER.debug(() -> String.format("Connected to peer %s", this));
|
||||
sharedSetup(channelSelector);
|
||||
return socketChannel;
|
||||
} catch (IOException e) {
|
||||
LOGGER.trace(String.format("Post-connection setup failed, peer %s", this));
|
||||
try {
|
||||
socketChannel.close();
|
||||
} catch (IOException ce) {
|
||||
// Failed to close?
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -250,8 +250,10 @@ public class Payment {
|
||||
* For QORT amounts only: If recipient's last reference is this transaction's signature, then they can't have made any transactions of their own
|
||||
* (which would have changed their last reference) thus this is their first reference so remove it.
|
||||
*/
|
||||
if ((alwaysUninitializeRecipientReference || assetId == Asset.QORT) && Arrays.equals(recipient.getLastReference(), signature))
|
||||
if ((alwaysUninitializeRecipientReference || assetId == Asset.QORT) && Arrays.equals(recipient.getLastReference(), signature)) {
|
||||
recipient.setLastReference(null);
|
||||
this.repository.getAccountRepository().delete(recipient.getAddress(), assetId);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -139,6 +139,8 @@ public interface AccountRepository {
|
||||
*/
|
||||
public RewardShareData getRewardShareByIndex(int index) throws DataException;
|
||||
|
||||
public boolean rewardShareExists(byte[] rewardSharePublicKey) throws DataException;
|
||||
|
||||
public void save(RewardShareData rewardShareData) throws DataException;
|
||||
|
||||
/** Delete reward-share from repository using passed minting account's public key and recipient's address. */
|
||||
|
||||
@@ -689,13 +689,13 @@ public class HSQLDBAccountRepository implements AccountRepository {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Integer getRewardShareIndex(byte[] publicKey) throws DataException {
|
||||
public Integer getRewardShareIndex(byte[] rewardSharePublicKey) throws DataException {
|
||||
if (!this.rewardShareExists(rewardSharePublicKey))
|
||||
return null;
|
||||
|
||||
String sql = "SELECT COUNT(*) FROM RewardShares WHERE reward_share_public_key < ?";
|
||||
|
||||
try (ResultSet resultSet = this.repository.checkedExecute(sql, publicKey)) {
|
||||
if (resultSet == null)
|
||||
return null;
|
||||
|
||||
try (ResultSet resultSet = this.repository.checkedExecute(sql, rewardSharePublicKey)) {
|
||||
return resultSet.getInt(1);
|
||||
} catch (SQLException e) {
|
||||
throw new DataException("Unable to determine reward-share index in repository", e);
|
||||
@@ -724,6 +724,15 @@ public class HSQLDBAccountRepository implements AccountRepository {
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean rewardShareExists(byte[] rewardSharePublicKey) throws DataException {
|
||||
try {
|
||||
return this.repository.exists("RewardShares", "reward_share_public_key = ?", rewardSharePublicKey);
|
||||
} catch (SQLException e) {
|
||||
throw new DataException("Unable to check reward-share exists in repository", e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void save(RewardShareData rewardShareData) throws DataException {
|
||||
HSQLDBSaver saveHelper = new HSQLDBSaver("RewardShares");
|
||||
|
||||
@@ -188,6 +188,9 @@ public class HSQLDBAssetRepository implements AssetRepository {
|
||||
public void delete(long assetId) throws DataException {
|
||||
try {
|
||||
this.repository.delete("Assets", "asset_id = ?", assetId);
|
||||
|
||||
// Also delete account balances that refer to asset
|
||||
this.repository.delete("AccountBalances", "asset_id = ?", assetId);
|
||||
} catch (SQLException e) {
|
||||
throw new DataException("Unable to delete asset from repository", e);
|
||||
}
|
||||
|
||||
@@ -81,7 +81,7 @@ public class Settings {
|
||||
/** Port number for inbound peer-to-peer connections. */
|
||||
private Integer listenPort;
|
||||
/** Minimum number of peers to allow block minting / synchronization. */
|
||||
private int minBlockchainPeers = 8;
|
||||
private int minBlockchainPeers = 5;
|
||||
/** Target number of outbound connections to peers we should make. */
|
||||
private int minOutboundPeers = 16;
|
||||
/** Maximum number of peer connections we allow. */
|
||||
|
||||
@@ -40,6 +40,7 @@ public class CreateGroupTransactionTransformer extends TransactionTransformer {
|
||||
layout.add("transaction's groupID", TransformationType.INT);
|
||||
layout.add("reference", TransformationType.SIGNATURE);
|
||||
layout.add("group creator's public key", TransformationType.PUBLIC_KEY);
|
||||
layout.add("group owner's address", TransformationType.ADDRESS);
|
||||
layout.add("group's name length", TransformationType.INT);
|
||||
layout.add("group's name", TransformationType.STRING);
|
||||
layout.add("group's description length", TransformationType.INT);
|
||||
|
||||
@@ -39,6 +39,7 @@ public class CreatePollTransactionTransformer extends TransactionTransformer {
|
||||
layout.add("transaction's groupID", TransformationType.INT);
|
||||
layout.add("reference", TransformationType.SIGNATURE);
|
||||
layout.add("poll creator's public key", TransformationType.PUBLIC_KEY);
|
||||
layout.add("poll owner's address", TransformationType.ADDRESS);
|
||||
layout.add("poll name length", TransformationType.INT);
|
||||
layout.add("poll name", TransformationType.STRING);
|
||||
layout.add("poll description length", TransformationType.INT);
|
||||
|
||||
@@ -34,8 +34,8 @@ public class TransferAssetTransactionTransformer extends TransactionTransformer
|
||||
layout.add("reference", TransformationType.SIGNATURE);
|
||||
layout.add("asset owner's public key", TransformationType.PUBLIC_KEY);
|
||||
layout.add("recipient", TransformationType.ADDRESS);
|
||||
layout.add("asset quantity", TransformationType.ASSET_QUANTITY);
|
||||
layout.add("asset ID", TransformationType.LONG);
|
||||
layout.add("asset quantity", TransformationType.ASSET_QUANTITY);
|
||||
layout.add("fee", TransformationType.AMOUNT);
|
||||
layout.add("signature", TransformationType.SIGNATURE);
|
||||
}
|
||||
|
||||
@@ -30,7 +30,7 @@ public abstract class ExecuteProduceConsume implements Runnable {
|
||||
private final Logger logger;
|
||||
private final boolean isLoggerTraceEnabled;
|
||||
|
||||
private ExecutorService executor;
|
||||
protected ExecutorService executor;
|
||||
|
||||
// These are volatile to prevent thread-local caching of values
|
||||
// but all are updated inside synchronized blocks
|
||||
@@ -85,6 +85,10 @@ public abstract class ExecuteProduceConsume implements Runnable {
|
||||
return snapshot;
|
||||
}
|
||||
|
||||
protected void onSpawnFailure() {
|
||||
/* Allow override in subclasses */
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a Task to be performed, possibly blocking.
|
||||
*
|
||||
@@ -180,6 +184,7 @@ public abstract class ExecuteProduceConsume implements Runnable {
|
||||
++this.spawnFailures;
|
||||
this.hasThreadPending = false;
|
||||
this.logger.trace(() -> String.format("[%d] failed to spawn another thread", Thread.currentThread().getId()));
|
||||
this.onSpawnFailure();
|
||||
}
|
||||
} else {
|
||||
this.logger.trace(() -> String.format("[%d] NOT spawning another thread", Thread.currentThread().getId()));
|
||||
|
||||
81
src/main/java/org/qortal/utils/ExecutorDumper.java
Normal file
81
src/main/java/org/qortal/utils/ExecutorDumper.java
Normal file
@@ -0,0 +1,81 @@
|
||||
package org.qortal.utils;
|
||||
|
||||
import java.lang.reflect.Field;
|
||||
import java.util.HashSet;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
public abstract class ExecutorDumper {
|
||||
|
||||
private static final String OUR_CLASS_NAME = ExecutorDumper.class.getName();
|
||||
|
||||
public static void dump(ExecutorService executor, int checkDepth, Class<?> skipClass) {
|
||||
if (executor instanceof ThreadPoolExecutor)
|
||||
dumpThreadPoolExecutor((ThreadPoolExecutor) executor, checkDepth, skipClass);
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
private static void dumpThreadPoolExecutor(ThreadPoolExecutor executor, int checkDepth, Class<?> skipClass) {
|
||||
try {
|
||||
Field mainLockField = executor.getClass().getDeclaredField("mainLock");
|
||||
mainLockField.setAccessible(true);
|
||||
|
||||
Field workersField = executor.getClass().getDeclaredField("workers");
|
||||
workersField.setAccessible(true);
|
||||
|
||||
Class<?>[] declaredClasses = executor.getClass().getDeclaredClasses();
|
||||
|
||||
Class<?> workerClass = null;
|
||||
for (int i = 0; i < declaredClasses.length; ++i)
|
||||
if (declaredClasses[i].getSimpleName().equals("Worker")) {
|
||||
workerClass = declaredClasses[i];
|
||||
break;
|
||||
}
|
||||
|
||||
if (workerClass == null)
|
||||
return;
|
||||
|
||||
Field workerThreadField = workerClass.getDeclaredField("thread");
|
||||
workerThreadField.setAccessible(true);
|
||||
|
||||
String skipClassName = skipClass.getName();
|
||||
|
||||
ReentrantLock mainLock = (ReentrantLock) mainLockField.get(executor);
|
||||
mainLock.lock();
|
||||
|
||||
try {
|
||||
@SuppressWarnings("unchecked")
|
||||
HashSet<Object> workers = (HashSet<Object>) workersField.get(executor);
|
||||
|
||||
WORKER_LOOP:
|
||||
for (Object workerObj : workers) {
|
||||
Thread thread = (Thread) workerThreadField.get(workerObj);
|
||||
|
||||
StackTraceElement[] stackTrace = thread.getStackTrace();
|
||||
if (stackTrace.length == 0)
|
||||
continue;
|
||||
|
||||
for (int d = 0; d < checkDepth; ++d) {
|
||||
String stackClassName = stackTrace[d].getClassName();
|
||||
if (stackClassName.equals(skipClassName) || stackClassName.equals(OUR_CLASS_NAME))
|
||||
continue WORKER_LOOP;
|
||||
}
|
||||
|
||||
System.out.println(String.format("[%d] %s:", thread.getId(), thread.getName()));
|
||||
|
||||
for (int d = 0; d < stackTrace.length; ++d)
|
||||
System.out.println(String.format("\t\t%s.%s at %s:%d",
|
||||
stackTrace[d].getClassName(), stackTrace[d].getMethodName(),
|
||||
stackTrace[d].getFileName(), stackTrace[d].getLineNumber()));
|
||||
}
|
||||
} finally {
|
||||
mainLock.unlock();
|
||||
}
|
||||
} catch (Exception e) {
|
||||
//
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
125
src/test/java/org/qortal/test/minting/DisagreementTests.java
Normal file
125
src/test/java/org/qortal/test/minting/DisagreementTests.java
Normal file
@@ -0,0 +1,125 @@
|
||||
package org.qortal.test.minting;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
import java.math.BigDecimal;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.qortal.account.PrivateKeyAccount;
|
||||
import org.qortal.block.BlockMinter;
|
||||
import org.qortal.controller.Controller;
|
||||
import org.qortal.data.account.RewardShareData;
|
||||
import org.qortal.data.block.BlockData;
|
||||
import org.qortal.data.transaction.TransactionData;
|
||||
import org.qortal.repository.DataException;
|
||||
import org.qortal.repository.Repository;
|
||||
import org.qortal.repository.RepositoryManager;
|
||||
import org.qortal.test.common.AccountUtils;
|
||||
import org.qortal.test.common.Common;
|
||||
import org.qortal.test.common.TestAccount;
|
||||
import org.qortal.test.common.TransactionUtils;
|
||||
import org.qortal.transform.block.BlockTransformer;
|
||||
import org.roaringbitmap.IntIterator;
|
||||
|
||||
import io.druid.extendedset.intset.ConciseSet;
|
||||
|
||||
public class DisagreementTests extends Common {
|
||||
|
||||
private static final BigDecimal CANCEL_SHARE_PERCENT = BigDecimal.ONE.negate();
|
||||
|
||||
@Before
|
||||
public void beforeTest() throws DataException {
|
||||
Common.useDefaultSettings();
|
||||
}
|
||||
|
||||
@After
|
||||
public void afterTest() throws DataException {
|
||||
Common.orphanCheck();
|
||||
}
|
||||
|
||||
/**
|
||||
* Testing minting a block when there is a signed online account timestamp present
|
||||
* that no longer has a corresponding reward-share in DB.
|
||||
* <p>
|
||||
* Something like:
|
||||
* <ul>
|
||||
* <li>Mint block, with tx to create reward-share R</li>
|
||||
* <li>Sign current timestamp with R</li>
|
||||
* <li>Mint block including R as online account</li>
|
||||
* <li>Mint block, with tx to cancel reward-share R</li>
|
||||
* <li>Mint another block: R's timestamp should be excluded</li>
|
||||
* </ul>
|
||||
*
|
||||
* @throws DataException
|
||||
*/
|
||||
@Test
|
||||
public void testOnlineAccounts() throws DataException {
|
||||
final BigDecimal sharePercent = new BigDecimal("12.8");
|
||||
|
||||
try (final Repository repository = RepositoryManager.getRepository()) {
|
||||
TestAccount mintingAccount = Common.getTestAccount(repository, "alice-reward-share");
|
||||
TestAccount signingAccount = Common.getTestAccount(repository, "alice");
|
||||
|
||||
// Create reward-share
|
||||
byte[] testRewardSharePrivateKey = AccountUtils.rewardShare(repository, "alice", "bob", sharePercent);
|
||||
PrivateKeyAccount testRewardShareAccount = new PrivateKeyAccount(repository, testRewardSharePrivateKey);
|
||||
|
||||
// Confirm reward-share info set correctly
|
||||
RewardShareData testRewardShareData = repository.getAccountRepository().getRewardShare(testRewardShareAccount.getPublicKey());
|
||||
assertNotNull(testRewardShareData);
|
||||
|
||||
// Create signed timestamps
|
||||
Controller.getInstance().ensureTestingAccountsOnline(mintingAccount, testRewardShareAccount);
|
||||
|
||||
// Mint another block
|
||||
BlockMinter.mintTestingBlockRetainingTimestamps(repository, mintingAccount);
|
||||
|
||||
// Confirm reward-share's signed timestamp is included
|
||||
BlockData blockData = repository.getBlockRepository().getLastBlock();
|
||||
List<RewardShareData> rewardSharesData = fetchRewardSharesForBlock(repository, blockData);
|
||||
boolean doesContainRewardShare = rewardSharesData.stream().anyMatch(rewardShareData -> Arrays.equals(rewardShareData.getRewardSharePublicKey(), testRewardShareData.getRewardSharePublicKey()));
|
||||
assertTrue(doesContainRewardShare);
|
||||
|
||||
// Cancel reward-share
|
||||
TransactionData cancelRewardShareTransactionData = AccountUtils.createRewardShare(repository, "alice", "bob", CANCEL_SHARE_PERCENT);
|
||||
TransactionUtils.signAsUnconfirmed(repository, cancelRewardShareTransactionData, signingAccount);
|
||||
BlockMinter.mintTestingBlockRetainingTimestamps(repository, mintingAccount);
|
||||
|
||||
// Confirm reward-share no longer exists in repository
|
||||
RewardShareData cancelledRewardShareData = repository.getAccountRepository().getRewardShare(testRewardShareAccount.getPublicKey());
|
||||
assertNull("Reward-share shouldn't exist", cancelledRewardShareData);
|
||||
|
||||
// Attempt to mint with cancelled reward-share
|
||||
BlockMinter.mintTestingBlockRetainingTimestamps(repository, mintingAccount);
|
||||
|
||||
// Confirm reward-share's signed timestamp is NOT included
|
||||
blockData = repository.getBlockRepository().getLastBlock();
|
||||
rewardSharesData = fetchRewardSharesForBlock(repository, blockData);
|
||||
doesContainRewardShare = rewardSharesData.stream().anyMatch(rewardShareData -> Arrays.equals(rewardShareData.getRewardSharePublicKey(), testRewardShareData.getRewardSharePublicKey()));
|
||||
assertFalse(doesContainRewardShare);
|
||||
}
|
||||
}
|
||||
|
||||
private List<RewardShareData> fetchRewardSharesForBlock(Repository repository, BlockData blockData) throws DataException {
|
||||
byte[] encodedOnlineAccounts = blockData.getEncodedOnlineAccounts();
|
||||
ConciseSet accountIndexes = BlockTransformer.decodeOnlineAccounts(encodedOnlineAccounts);
|
||||
|
||||
List<RewardShareData> rewardSharesData = new ArrayList<>();
|
||||
|
||||
IntIterator iterator = accountIndexes.iterator();
|
||||
while (iterator.hasNext()) {
|
||||
int accountIndex = iterator.next();
|
||||
|
||||
RewardShareData rewardShareData = repository.getAccountRepository().getRewardShareByIndex(accountIndex);
|
||||
rewardSharesData.add(rewardShareData);
|
||||
}
|
||||
|
||||
return rewardSharesData;
|
||||
}
|
||||
|
||||
}
|
||||
Reference in New Issue
Block a user