Merge pull request #267 from kennycud/master

The latest QDN optimizations
crowetic, 2025-07-29 18:28:02 -07:00, committed by GitHub
11 changed files with 180 additions and 180 deletions

View File

@@ -27,8 +27,6 @@ public class UnsignedFeesSocket extends ApiWebSocket implements Listener {
@Override
public void configure(WebSocketServletFactory factory) {
LOGGER.info("configure");
factory.register(UnsignedFeesSocket.class);
EventBus.INSTANCE.addListener(this);
@@ -65,7 +63,6 @@ public class UnsignedFeesSocket extends ApiWebSocket implements Listener {
@OnWebSocketMessage
public void onWebSocketMessage(Session session, String message) {
LOGGER.info("onWebSocketMessage: message = " + message);
}
private void sendUnsignedFeeEvent(Session session, UnsignedFeeEvent unsignedFeeEvent) {

View File

@@ -177,7 +177,7 @@ public class ArbitraryDataFile {
File file = path.toFile();
if (file.exists()) {
try {
- byte[] digest = Crypto.digest(file);
+ byte[] digest = Crypto.digestFileStream(file);
ArbitraryDataFile arbitraryDataFile = ArbitraryDataFile.fromHash(digest, signature);
// Copy file to data directory if needed
@@ -352,7 +352,7 @@ public class ArbitraryDataFile {
return this.chunks.size();
}
public boolean join() {
// Ensure we have chunks
if (this.chunks != null && !this.chunks.isEmpty()) {
@@ -373,7 +373,7 @@ public class ArbitraryDataFile {
for (ArbitraryDataFileChunk chunk : this.chunks) {
File sourceFile = chunk.filePath.toFile();
BufferedInputStream in = new BufferedInputStream(new FileInputStream(sourceFile));
- byte[] buffer = new byte[2048];
+ byte[] buffer = new byte[8192];
int inSize;
while ((inSize = in.read(buffer)) != -1) {
out.write(buffer, 0, inSize);
@@ -398,6 +398,8 @@ public class ArbitraryDataFile {
return false;
}
public boolean delete() {
// Delete the complete file
// ... but only if it's inside the Qortal data or temp directory
@@ -667,6 +669,9 @@ public class ArbitraryDataFile {
}
}
public boolean containsChunk(byte[] hash) {
for (ArbitraryDataFileChunk chunk : this.chunks) {
if (Arrays.equals(hash, chunk.getHash())) {
@@ -781,18 +786,17 @@ public class ArbitraryDataFile {
return this.filePath;
}
- public byte[] digest() {
- File file = this.getFile();
- if (file != null && file.exists()) {
- try {
- return Crypto.digest(file);
- } catch (IOException e) {
- LOGGER.error("Couldn't compute digest for ArbitraryDataFile");
- }
- }
- return null;
- }
+ public byte[] digest() {
+ File file = this.getFile();
+ if (file != null && file.exists()) {
+ try {
+ return Crypto.digestFileStream(file);
+ } catch (IOException e) {
+ LOGGER.error("Couldn't compute digest for ArbitraryDataFile");
+ }
+ }
+ return null;
+ }
public String digest58() {
if (this.digest() != null) {
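
Note: both digest() and the hash check in the first hunk now go through Crypto.digestFileStream(), and the chunk-join loop copies through an 8 KB buffer instead of 2 KB. As a rough sense of scale (the file size is purely illustrative), joining a 1 MiB file now takes about 128 read() calls rather than 512:

1 MiB / 8 KiB = 128 reads (new 8192-byte buffer)
1 MiB / 2 KiB = 512 reads (old 2048-byte buffer)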

View File

@@ -437,16 +437,24 @@ public class ArbitraryDataReader {
throw new IOException(String.format("File doesn't exist: %s", arbitraryDataFile));
}
// Ensure the complete hash matches the joined chunks
- if (!Arrays.equals(arbitraryDataFile.digest(), transactionData.getData())) {
- // Delete the invalid file
- LOGGER.info("Deleting invalid file: path = " + arbitraryDataFile.getFilePath());
- if( arbitraryDataFile.delete() ) {
- LOGGER.info("Deleted invalid file successfully: path = " + arbitraryDataFile.getFilePath());
- }
- else {
- LOGGER.warn("Could not delete invalid file: path = " + arbitraryDataFile.getFilePath());
- }
+ if (!Arrays.equals(arbitraryDataFile.digest(), transactionData.getData())) {
+ // Delete the invalid file
+ LOGGER.info("Deleting invalid file: path = {}", arbitraryDataFile.getFilePath());
+ if (arbitraryDataFile.delete()) {
+ LOGGER.info("Deleted invalid file successfully: path = {}", arbitraryDataFile.getFilePath());
+ } else {
+ LOGGER.warn("Could not delete invalid file: path = {}", arbitraryDataFile.getFilePath());
+ }
+ // Also delete its chunks
+ if (arbitraryDataFile.deleteAllChunks()) {
+ LOGGER.info("Deleted all chunks associated with invalid file: {}", arbitraryDataFile.getFilePath());
+ } else {
+ LOGGER.warn("Failed to delete one or more chunks for invalid file: {}", arbitraryDataFile.getFilePath());
+ }
throw new DataException("Unable to validate complete file hash");
}
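
Note: the reworked block now cleans up both the joined file and its chunks when the complete-file hash check fails. A condensed restatement, using only names that appear in this hunk (the local variable names expected/actual are mine):

byte[] expected = transactionData.getData(); // complete-file hash recorded in the transaction
byte[] actual = arbitraryDataFile.digest(); // hash of the file joined from local chunks
if (!Arrays.equals(expected, actual)) {
    arbitraryDataFile.delete(); // remove the bad joined file
    arbitraryDataFile.deleteAllChunks(); // and the chunks it was built from
    throw new DataException("Unable to validate complete file hash");
}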

View File

@@ -157,6 +157,8 @@ public class Synchronizer extends Thread {
// Clear interrupted flag so we can shutdown trim threads
Thread.interrupted();
// Fall-through to exit
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
}
}

View File

@@ -772,6 +772,8 @@ public class ArbitraryDataFileListManager {
String ourAddress = Network.getInstance().getOurExternalIpAddressAndPort();
ArbitraryDataFileListMessage arbitraryDataFileListMessage;
+ Collections.shuffle(hashes);
// Remove optional parameters if the requesting peer doesn't support it yet
// A message with less statistical data is better than no message at all
if (!peer.isAtLeastVersion(MIN_PEER_VERSION_FOR_FILE_LIST_STATS)) {

View File

@@ -32,6 +32,8 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
+ import org.qortal.crypto.Crypto;
public class ArbitraryDataFileManager extends Thread {
public static final int SEND_TIMEOUT_MS = 500;
@@ -129,7 +131,7 @@ public class ArbitraryDataFileManager extends Thread {
public boolean fetchArbitraryDataFiles(Peer peer,
byte[] signature,
ArbitraryTransactionData arbitraryTransactionData,
- List<byte[]> hashes) throws DataException {
+ List<byte[]> hashes, ArbitraryFileListResponseInfo responseInfo) throws DataException {
// Load data file(s)
ArbitraryDataFile arbitraryDataFile = ArbitraryDataFile.fromTransactionData(arbitraryTransactionData);
@@ -161,6 +163,8 @@ public class ArbitraryDataFileManager extends Thread {
}
else {
LOGGER.trace("Already requesting data file {} for signature {} from peer {}", arbitraryDataFile, Base58.encode(signature), peer);
+ this.addResponse(responseInfo);
}
}
}
@@ -247,6 +251,18 @@ public class ArbitraryDataFileManager extends Thread {
ArbitraryDataFileMessage peersArbitraryDataFileMessage = (ArbitraryDataFileMessage) response;
arbitraryDataFile = peersArbitraryDataFileMessage.getArbitraryDataFile();
byte[] fileBytes = arbitraryDataFile.getBytes();
+ if (fileBytes == null) {
+ LOGGER.debug(String.format("Failed to read bytes for file hash %s", hash58));
+ return null;
+ }
+ byte[] actualHash = Crypto.digest(fileBytes);
+ if (!Arrays.equals(hash, actualHash)) {
+ LOGGER.debug(String.format("Hash mismatch for chunk: expected %s but got %s",
+ hash58, Base58.encode(actualHash)));
+ return null;
+ }
} else {
LOGGER.debug(String.format("File hash %s already exists, so skipping the request", hash58));
arbitraryDataFile = existingFile;
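
Note: two behavioural additions in this file. A hash that is already being requested now re-queues its responseInfo via addResponse() instead of being dropped, and each chunk received from a peer is verified against its expected hash before it is accepted. Condensed sketch of the verification, using the names from the hunk above:

byte[] fileBytes = arbitraryDataFile.getBytes();
if (fileBytes == null || !Arrays.equals(hash, Crypto.digest(fileBytes))) {
    return null; // unreadable or corrupted chunk data, reject it
}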

View File

@@ -180,7 +180,8 @@ public class ArbitraryDataFileRequestThread {
responseInfo.getPeer(),
arbitraryTransactionData.getSignature(),
arbitraryTransactionData,
- Arrays.asList(Base58.decode(responseInfo.getHash58()))
+ Arrays.asList(Base58.decode(responseInfo.getHash58())),
+ responseInfo
);
} catch (DataException e) {
LOGGER.warn("Unable to process file hashes: {}", e.getMessage());

View File

@@ -1,6 +1,7 @@
package org.qortal.crypto;
import com.google.common.primitives.Bytes;
import org.bouncycastle.crypto.params.Ed25519PrivateKeyParameters;
import org.bouncycastle.crypto.params.X25519PrivateKeyParameters;
import org.bouncycastle.crypto.params.X25519PublicKeyParameters;
@@ -11,6 +12,7 @@ import org.qortal.utils.Base58;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
+ import java.io.InputStream;
import java.nio.ByteBuffer;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
@@ -66,6 +68,20 @@ public abstract class Crypto {
}
}
+ public static byte[] digestFileStream(File file) throws IOException {
+ try (InputStream fis = new FileInputStream(file)) {
+ MessageDigest digest = MessageDigest.getInstance("SHA-256");
+ byte[] buffer = new byte[8192]; // 8 KB buffer
+ int bytesRead;
+ while ((bytesRead = fis.read(buffer)) != -1) {
+ digest.update(buffer, 0, bytesRead);
+ }
+ return digest.digest();
+ } catch (NoSuchAlgorithmException e) {
+ throw new IOException("SHA-256 algorithm not available", e);
+ }
+ }
/**
* Returns 32-byte digest of two rounds of SHA-256 on message passed in input.
*
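
Note: digestFileStream() should produce the same SHA-256 digest as hashing the file's full contents in one pass, but it reads through a fixed 8 KB buffer so memory use stays constant for large files. Minimal usage sketch (the path is purely illustrative):

File chunkFile = new File("/path/to/chunk"); // illustrative path
byte[] sha256 = Crypto.digestFileStream(chunkFile); // streams the file in 8 KB blocks
String digest58 = Base58.encode(sha256); // Base58 form, as used elsewhere in QDN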

View File

@@ -63,7 +63,7 @@ public class ArbitraryResourceStatus {
this.description = status.description;
this.localChunkCount = localChunkCount;
this.totalChunkCount = totalChunkCount;
- this.percentLoaded = (this.localChunkCount != null && this.totalChunkCount != null && this.totalChunkCount > 0) ? this.localChunkCount / (float)this.totalChunkCount * 100.0f : null;
+ this.percentLoaded = (this.localChunkCount != null && this.totalChunkCount != null && this.totalChunkCount > 0 && this.totalChunkCount >= this.localChunkCount) ? this.localChunkCount / (float)this.totalChunkCount * 100.0f : null;
}
public ArbitraryResourceStatus(Status status) {
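
Note: the added totalChunkCount >= localChunkCount guard keeps percentLoaded from exceeding 100%. With illustrative counts: localChunkCount = 3, totalChunkCount = 10 gives 3 / 10.0f * 100.0f = 30.0, while localChunkCount = 12, totalChunkCount = 10 previously produced 120.0 but now yields null, the same as when either count is missing or the total is zero.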

View File

@@ -706,7 +706,9 @@ public class Group {
// Save reference to invite transaction so invite can be rebuilt during orphaning.
GroupInviteData groupInviteData = this.getInvite(invitee);
- cancelGroupInviteTransactionData.setInviteReference(groupInviteData.getReference());
+ if( groupInviteData != null) {
+ cancelGroupInviteTransactionData.setInviteReference(groupInviteData.getReference());
+ }
// Delete invite
this.deleteInvite(invitee);
@@ -715,7 +717,9 @@ public class Group {
public void uncancelInvite(CancelGroupInviteTransactionData cancelGroupInviteTransactionData) throws DataException {
// Reinstate invite
TransactionData transactionData = this.repository.getTransactionRepository().fromSignature(cancelGroupInviteTransactionData.getInviteReference());
- this.addInvite((GroupInviteTransactionData) transactionData);
+ if( transactionData != null ) {
+ this.addInvite((GroupInviteTransactionData) transactionData);
+ }
// Clear cached reference to invite transaction
cancelGroupInviteTransactionData.setInviteReference(null);

View File

@@ -235,6 +235,8 @@ public class Network {
this.allKnownPeers.addAll(repository.getNetworkRepository().getAllPeers());
}
}
LOGGER.debug("starting with {} known peers", this.allKnownPeers.size());
}
// Attempt to set up UPnP. All errors are ignored.
@@ -711,63 +713,49 @@ public class Network {
}
private Peer getConnectablePeer(final Long now) throws InterruptedException {
- // We can't block here so use tryRepository(). We don't NEED to connect a new peer.
- try (Repository repository = RepositoryManager.tryRepository()) {
- if (repository == null) {
- LOGGER.warn("Unable to get repository connection : Network.getConnectablePeer()");
- return null;
- }
- // Find an address to connect to
- List<PeerData> peers = this.getAllKnownPeers();
+ // Find an address to connect to
+ List<PeerData> peers = this.getAllKnownPeers();
- // Don't consider peers with recent connection failures
- final long lastAttemptedThreshold = now - CONNECT_FAILURE_BACKOFF;
- peers.removeIf(peerData -> peerData.getLastAttempted() != null
- && (peerData.getLastConnected() == null
- || peerData.getLastConnected() < peerData.getLastAttempted())
- && peerData.getLastAttempted() > lastAttemptedThreshold);
+ // Don't consider peers with recent connection failures
+ final long lastAttemptedThreshold = now - CONNECT_FAILURE_BACKOFF;
+ peers.removeIf(peerData -> peerData.getLastAttempted() != null
+ && (peerData.getLastConnected() == null
+ || peerData.getLastConnected() < peerData.getLastAttempted())
+ && peerData.getLastAttempted() > lastAttemptedThreshold);
- // Don't consider peers that we know loop back to ourself
- synchronized (this.selfPeers) {
- peers.removeIf(isSelfPeer);
- }
+ // Don't consider peers that we know loop back to ourself
+ synchronized (this.selfPeers) {
+ peers.removeIf(isSelfPeer);
+ }
- // Don't consider already connected peers (simple address match)
- peers.removeIf(isConnectedPeer);
+ // Don't consider already connected peers (simple address match)
+ peers.removeIf(isConnectedPeer);
- // Don't consider already connected peers (resolved address match)
- // Disabled because this might be too slow if we end up waiting a long time for hostnames to resolve via DNS
- // Which is ok because duplicate connections to the same peer are handled during handshaking
- // peers.removeIf(isResolvedAsConnectedPeer);
+ // Don't consider already connected peers (resolved address match)
+ // Disabled because this might be too slow if we end up waiting a long time for hostnames to resolve via DNS
+ // Which is ok because duplicate connections to the same peer are handled during handshaking
+ // peers.removeIf(isResolvedAsConnectedPeer);
- this.checkLongestConnection(now);
+ this.checkLongestConnection(now);
- // Any left?
- if (peers.isEmpty()) {
- return null;
- }
- // Pick random peer
- int peerIndex = new Random().nextInt(peers.size());
- // Pick candidate
- PeerData peerData = peers.get(peerIndex);
- Peer newPeer = new Peer(peerData);
- newPeer.setIsDataPeer(false);
- // Update connection attempt info
- peerData.setLastAttempted(now);
- synchronized (this.allKnownPeers) {
- repository.getNetworkRepository().save(peerData);
- repository.saveChanges();
- }
- return newPeer;
- } catch (DataException e) {
- LOGGER.error("Repository issue while finding a connectable peer", e);
+ // Any left?
+ if (peers.isEmpty()) {
+ return null;
+ }
+ // Pick random peer
+ int peerIndex = new Random().nextInt(peers.size());
+ // Pick candidate
+ PeerData peerData = peers.get(peerIndex);
+ Peer newPeer = new Peer(peerData);
+ newPeer.setIsDataPeer(false);
+ // Update connection attempt info
+ peerData.setLastAttempted(now);
+ return newPeer;
}
public boolean connectPeer(Peer newPeer) throws InterruptedException {
@@ -947,18 +935,6 @@ public class Network {
public void peerMisbehaved(Peer peer) {
PeerData peerData = peer.getPeerData();
peerData.setLastMisbehaved(NTP.getTime());
- // Only update repository if outbound peer
- if (peer.isOutbound()) {
- try (Repository repository = RepositoryManager.getRepository()) {
- synchronized (this.allKnownPeers) {
- repository.getNetworkRepository().save(peerData);
- repository.saveChanges();
- }
- } catch (DataException e) {
- LOGGER.warn("Repository issue while updating peer synchronization info", e);
- }
- }
}
/**
@@ -1148,19 +1124,6 @@ public class Network {
// Make a note that we've successfully completed handshake (and when)
peer.getPeerData().setLastConnected(NTP.getTime());
- // Update connection info for outbound peers only
- if (peer.isOutbound()) {
- try (Repository repository = RepositoryManager.getRepository()) {
- synchronized (this.allKnownPeers) {
- repository.getNetworkRepository().save(peer.getPeerData());
- repository.saveChanges();
- }
- } catch (DataException e) {
- LOGGER.error("[{}] Repository issue while trying to update outbound peer {}",
- peer.getPeerConnectionId(), peer, e);
- }
- }
// Process any pending signature requests, as this peer may have been connected for this purpose only
List<byte[]> pendingSignatureRequests = new ArrayList<>(peer.getPendingSignatureRequests());
if (pendingSignatureRequests != null && !pendingSignatureRequests.isEmpty()) {
@@ -1424,32 +1387,23 @@ public class Network {
}
public boolean forgetPeer(PeerAddress peerAddress) throws DataException {
- int numDeleted;
+ boolean numDeleted;
synchronized (this.allKnownPeers) {
- this.allKnownPeers.removeIf(peerData -> peerData.getAddress().equals(peerAddress));
- try (Repository repository = RepositoryManager.getRepository()) {
- numDeleted = repository.getNetworkRepository().delete(peerAddress);
- repository.saveChanges();
- }
+ numDeleted = this.allKnownPeers.removeIf(peerData -> peerData.getAddress().equals(peerAddress));
}
disconnectPeer(peerAddress);
- return numDeleted != 0;
+ return numDeleted;
}
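
Note: numDeleted becomes a boolean because Collection.removeIf already reports whether anything was removed, which is all forgetPeer() needs now that there is no repository row count to check. Standard java.util behaviour, with illustrative addresses:

List<String> addresses = new ArrayList<>(List.of("203.0.113.5:12392", "198.51.100.7:12392"));
boolean removed = addresses.removeIf(addr -> addr.startsWith("203.0.113.5")); // true, one entry matched
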
public int forgetAllPeers() throws DataException {
int numDeleted;
synchronized (this.allKnownPeers) {
+ numDeleted = this.allKnownPeers.size();
this.allKnownPeers.clear();
- try (Repository repository = RepositoryManager.getRepository()) {
- numDeleted = repository.getNetworkRepository().deleteAllPeers();
- repository.saveChanges();
- }
}
for (Peer peer : this.getImmutableConnectedPeers()) {
@@ -1498,48 +1452,36 @@ public class Network {
// Prune 'old' peers from repository...
// Pruning peers isn't critical so no need to block for a repository instance.
- try (Repository repository = RepositoryManager.tryRepository()) {
- if (repository == null) {
- LOGGER.warn("Unable to get repository connection : Network.prunePeers()");
- return;
- }
- synchronized (this.allKnownPeers) {
- // Fetch all known peers
- List<PeerData> peers = new ArrayList<>(this.allKnownPeers);
+ synchronized (this.allKnownPeers) {
+ // Fetch all known peers
+ List<PeerData> peers = new ArrayList<>(this.allKnownPeers);
- // 'Old' peers:
- // We attempted to connect within the last day
- // but we last managed to connect over a week ago.
- Predicate<PeerData> isNotOldPeer = peerData -> {
- if (peerData.getLastAttempted() == null
- || peerData.getLastAttempted() < now - OLD_PEER_ATTEMPTED_PERIOD) {
- return true;
- }
- if (peerData.getLastConnected() == null
- || peerData.getLastConnected() > now - OLD_PEER_CONNECTION_PERIOD) {
- return true;
- }
- return false;
- };
- // Disregard peers that are NOT 'old'
- peers.removeIf(isNotOldPeer);
- // Don't consider already connected peers (simple address match)
- peers.removeIf(isConnectedPeer);
- for (PeerData peerData : peers) {
- LOGGER.debug("Deleting old peer {} from repository", peerData.getAddress().toString());
- repository.getNetworkRepository().delete(peerData.getAddress());
- // Delete from known peer cache too
- this.allKnownPeers.remove(peerData);
- }
- repository.saveChanges();
+ // 'Old' peers:
+ // We attempted to connect within the last day
+ // but we last managed to connect over a week ago.
+ Predicate<PeerData> isNotOldPeer = peerData -> {
+ if (peerData.getLastAttempted() == null
+ || peerData.getLastAttempted() < now - OLD_PEER_ATTEMPTED_PERIOD) {
+ return true;
+ }
+ if (peerData.getLastConnected() == null
+ || peerData.getLastConnected() > now - OLD_PEER_CONNECTION_PERIOD) {
+ return true;
+ }
+ return false;
+ };
+ // Disregard peers that are NOT 'old'
+ peers.removeIf(isNotOldPeer);
+ // Don't consider already connected peers (simple address match)
+ peers.removeIf(isConnectedPeer);
+ for (PeerData peerData : peers) {
+ // Delete from known peer cache too
+ this.allKnownPeers.remove(peerData);
}
}
}
@@ -1547,8 +1489,8 @@ public class Network {
public boolean mergePeers(String addedBy, long addedWhen, List<PeerAddress> peerAddresses) throws DataException {
mergePeersLock.lock();
- try (Repository repository = RepositoryManager.getRepository()) {
- return this.mergePeers(repository, addedBy, addedWhen, peerAddresses);
+ try {
+ return this.mergePeersUnlocked(addedBy, addedWhen, peerAddresses);
} finally {
mergePeersLock.unlock();
}
@@ -1567,23 +1509,17 @@ public class Network {
try {
// Merging peers isn't critical so don't block for a repository instance.
- try (Repository repository = RepositoryManager.tryRepository()) {
- if (repository == null) {
- LOGGER.warn("Unable to get repository connection : Network.opportunisticMergePeers()");
- return;
- }
- this.mergePeers(repository, addedBy, addedWhen, peerAddresses);
+ this.mergePeersUnlocked(addedBy, addedWhen, peerAddresses);
- } catch (DataException e) {
- // Already logged by this.mergePeers()
- }
+ } catch (DataException e) {
+ // Already logged by this.mergePeersUnlocked()
} finally {
mergePeersLock.unlock();
}
}
- private boolean mergePeers(Repository repository, String addedBy, long addedWhen, List<PeerAddress> peerAddresses)
+ private boolean mergePeersUnlocked(String addedBy, long addedWhen, List<PeerAddress> peerAddresses)
throws DataException {
List<String> fixedNetwork = Settings.getInstance().getFixedNetwork();
if (fixedNetwork != null && !fixedNetwork.isEmpty()) {
@@ -1608,19 +1544,6 @@ public class Network {
this.allKnownPeers.addAll(newPeers);
- try {
- // Save new peers into database
- for (PeerData peerData : newPeers) {
- LOGGER.info("Adding new peer {} to repository", peerData.getAddress());
- repository.getNetworkRepository().save(peerData);
- }
- repository.saveChanges();
- } catch (DataException e) {
- LOGGER.error("Repository issue while merging peers list from {}", addedBy, e);
- throw e;
- }
return true;
}
}
@@ -1665,6 +1588,33 @@ public class Network {
LOGGER.warn("Interrupted while waiting for networking threads to terminate");
}
+ try( Repository repository = RepositoryManager.getRepository() ){
+ // reset all known peers in database
+ int deletedCount = repository.getNetworkRepository().deleteAllPeers();
+ LOGGER.debug("Deleted {} known peers", deletedCount);
+ List<PeerData> knownPeersToProcess;
+ synchronized (this.allKnownPeers) {
+ knownPeersToProcess = new ArrayList<>(this.allKnownPeers);
+ }
+ int addedPeerCount = 0;
+ // save all known peers for next start up
+ for (PeerData knownPeerToProcess : knownPeersToProcess) {
+ repository.getNetworkRepository().save(knownPeerToProcess);
+ addedPeerCount++;
+ }
+ repository.saveChanges();
+ LOGGER.debug("Added {} known peers", addedPeerCount);
+ } catch (DataException e) {
+ LOGGER.error(e.getMessage(), e);
+ }
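
Note: together with the repository writes removed from peerMisbehaved(), the handshake-completion path, peer pruning and the old mergePeers(), known peers now live only in allKnownPeers while the node runs and are written back in a single pass at shutdown. Rough lifecycle sketch, using only calls that appear in this diff:

// startup (existing code near the top of Network.java)
this.allKnownPeers.addAll(repository.getNetworkRepository().getAllPeers());
// runtime: only the in-memory set is changed (mergePeersUnlocked, forgetPeer, prunePeers, ...)
// shutdown (block added above): replace the stored peers with the in-memory set
repository.getNetworkRepository().deleteAllPeers();
for (PeerData knownPeer : knownPeersToProcess)
    repository.getNetworkRepository().save(knownPeer);
repository.saveChanges();
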
// Close all peer connections
for (Peer peer : this.getImmutableConnectedPeers()) {
peer.shutdown();