From d2a92db921490a42ddf8b80fec28cbbf827041ce Mon Sep 17 00:00:00 2001 From: catbref Date: Thu, 29 Oct 2020 11:02:02 +0000 Subject: [PATCH] More caching for GetBlockMessage. Added API call GET /admin/enginestats to monitor cache usage --- .../org/qortal/api/model/ActivitySummary.java | 55 +++++- .../qortal/api/resource/AdminResource.java | 36 +++- .../org/qortal/controller/Controller.java | 157 ++++++++++++++---- .../qortal/network/message/BlockMessage.java | 6 + 4 files changed, 212 insertions(+), 42 deletions(-) diff --git a/src/main/java/org/qortal/api/model/ActivitySummary.java b/src/main/java/org/qortal/api/model/ActivitySummary.java index 27b5ed8d..e8e9a3aa 100644 --- a/src/main/java/org/qortal/api/model/ActivitySummary.java +++ b/src/main/java/org/qortal/api/model/ActivitySummary.java @@ -1,5 +1,6 @@ package org.qortal.api.model; +import java.util.Collections; import java.util.EnumMap; import java.util.Map; @@ -13,17 +14,61 @@ import org.qortal.transaction.Transaction.TransactionType; @XmlAccessorType(XmlAccessType.FIELD) public class ActivitySummary { - public int blockCount; - public int transactionCount; - public int assetsIssued; - public int namesRegistered; + private int blockCount; + private int assetsIssued; + private int namesRegistered; // Assuming TransactionType values are contiguous so 'length' equals count @XmlJavaTypeAdapter(TransactionCountMapXmlAdapter.class) - public Map transactionCountByType = new EnumMap<>(TransactionType.class); + private Map transactionCountByType = new EnumMap<>(TransactionType.class); + private int totalTransactionCount = 0; public ActivitySummary() { // Needed for JAXB } + public int getBlockCount() { + return this.blockCount; + } + + public void setBlockCount(int blockCount) { + this.blockCount = blockCount; + } + + public int getTotalTransactionCount() { + return this.totalTransactionCount; + } + + public int getAssetsIssued() { + return this.assetsIssued; + } + + public void setAssetsIssued(int assetsIssued) { + this.assetsIssued = assetsIssued; + } + + public int getNamesRegistered() { + return this.namesRegistered; + } + + public void setNamesRegistered(int namesRegistered) { + this.namesRegistered = namesRegistered; + } + + public Map getTransactionCountByType() { + return Collections.unmodifiableMap(this.transactionCountByType); + } + + public void setTransactionCountByType(TransactionType transactionType, int transactionCount) { + this.transactionCountByType.put(transactionType, transactionCount); + + this.totalTransactionCount = this.transactionCountByType.values().stream().mapToInt(Integer::intValue).sum(); + } + + public void setTransactionCountByType(Map transactionCountByType) { + this.transactionCountByType = new EnumMap<>(transactionCountByType); + + this.totalTransactionCount = this.transactionCountByType.values().stream().mapToInt(Integer::intValue).sum(); + } + } diff --git a/src/main/java/org/qortal/api/resource/AdminResource.java b/src/main/java/org/qortal/api/resource/AdminResource.java index 52d7a9e7..6fbadb96 100644 --- a/src/main/java/org/qortal/api/resource/AdminResource.java +++ b/src/main/java/org/qortal/api/resource/AdminResource.java @@ -182,6 +182,8 @@ public class AdminResource { ) @ApiErrors({ApiError.REPOSITORY_ISSUE}) public ActivitySummary summary() { + Security.checkApiCallAllowed(request); + ActivitySummary summary = new ActivitySummary(); LocalDate date = LocalDate.now(); @@ -193,16 +195,13 @@ public class AdminResource { int startHeight = repository.getBlockRepository().getHeightFromTimestamp(start); int endHeight = repository.getBlockRepository().getBlockchainHeight(); - summary.blockCount = endHeight - startHeight; + summary.setBlockCount(endHeight - startHeight); - summary.transactionCountByType = repository.getTransactionRepository().getTransactionSummary(startHeight + 1, endHeight); + summary.setTransactionCountByType(repository.getTransactionRepository().getTransactionSummary(startHeight + 1, endHeight)); - for (Integer count : summary.transactionCountByType.values()) - summary.transactionCount += count; + summary.setAssetsIssued(repository.getAssetRepository().getRecentAssetIds(start).size()); - summary.assetsIssued = repository.getAssetRepository().getRecentAssetIds(start).size(); - - summary.namesRegistered = repository.getNameRepository().getRecentNames(start).size(); + summary.setNamesRegistered (repository.getNameRepository().getRecentNames(start).size()); return summary; } catch (DataException e) { @@ -210,6 +209,29 @@ public class AdminResource { } } + @GET + @Path("/enginestats") + @Operation( + summary = "Fetch statistics snapshot for core engine", + responses = { + @ApiResponse( + content = @Content( + mediaType = MediaType.APPLICATION_JSON, + array = @ArraySchema( + schema = @Schema( + implementation = Controller.StatsSnapshot.class + ) + ) + ) + ) + } + ) + public Controller.StatsSnapshot getEngineStats() { + Security.checkApiCallAllowed(request); + + return Controller.getInstance().getStatsSnapshot(); + } + @GET @Path("/mintingaccounts") @Operation( diff --git a/src/main/java/org/qortal/controller/Controller.java b/src/main/java/org/qortal/controller/Controller.java index 6e52ae2c..f1116364 100644 --- a/src/main/java/org/qortal/controller/Controller.java +++ b/src/main/java/org/qortal/controller/Controller.java @@ -16,6 +16,7 @@ import java.util.Collections; import java.util.Deque; import java.util.HashMap; import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -24,11 +25,15 @@ import java.util.Random; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Function; import java.util.function.Predicate; import java.util.stream.Collectors; +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; + import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.bouncycastle.jce.provider.BouncyCastleProvider; @@ -137,10 +142,18 @@ public class Controller extends Thread { private ExecutorService callbackExecutor = Executors.newFixedThreadPool(3); private volatile boolean notifyGroupMembershipChange = false; - private static final int LATEST_BLOCKS_SIZE = 10; // To cover typical Synchronizer request + a few spare + private static final int BLOCK_CACHE_SIZE = 10; // To cover typical Synchronizer request + a few spare /** Latest blocks on our chain. Note: tail/last is the latest block. */ private final Deque latestBlocks = new LinkedList<>(); - private volatile BlockMessage latestBlockMessage = null; + + /** Cache of BlockMessages, indexed by block signature */ + @SuppressWarnings("serial") + private final LinkedHashMap blockMessageCache = new LinkedHashMap<>() { + @Override + protected boolean removeEldestEntry(Map.Entry eldest) { + return this.size() > BLOCK_CACHE_SIZE; + } + }; private long repositoryBackupTimestamp = startTime; // ms private long ntpCheckTimestamp = startTime; // ms @@ -188,6 +201,47 @@ public class Controller extends Thread { /** Cache of latest blocks' online accounts */ Deque> latestBlocksOnlineAccounts = new ArrayDeque<>(MAX_BLOCKS_CACHED_ONLINE_ACCOUNTS); + // Stats + @XmlAccessorType(XmlAccessType.FIELD) + public static class StatsSnapshot { + public static class GetBlockMessageStats { + public AtomicLong requests = new AtomicLong(); + public AtomicLong cacheHits = new AtomicLong(); + public AtomicLong unknownBlocks = new AtomicLong(); + public AtomicLong cacheFills = new AtomicLong(); + + public GetBlockMessageStats() { + } + } + public GetBlockMessageStats getBlockMessageStats = new GetBlockMessageStats(); + + public static class GetBlockSummariesStats { + public AtomicLong requests = new AtomicLong(); + public AtomicLong cacheHits = new AtomicLong(); + public AtomicLong fullyFromCache = new AtomicLong(); + + public GetBlockSummariesStats() { + } + } + public GetBlockSummariesStats getBlockSummariesStats = new GetBlockSummariesStats(); + + public static class GetBlockSignaturesV2Stats { + public AtomicLong requests = new AtomicLong(); + public AtomicLong cacheHits = new AtomicLong(); + public AtomicLong fullyFromCache = new AtomicLong(); + + public GetBlockSignaturesV2Stats() { + } + } + public GetBlockSignaturesV2Stats getBlockSignaturesV2Stats = new GetBlockSignaturesV2Stats(); + + public AtomicLong latestBlocksCacheRefills = new AtomicLong(); + + public StatsSnapshot() { + } + } + private final StatsSnapshot stats = new StatsSnapshot(); + // Constructors private Controller(String[] args) { @@ -267,7 +321,7 @@ public class Controller extends Thread { synchronized (this.latestBlocks) { this.latestBlocks.clear(); - for (int i = 0; i < LATEST_BLOCKS_SIZE && blockData != null; ++i) { + for (int i = 0; i < BLOCK_CACHE_SIZE && blockData != null; ++i) { this.latestBlocks.addFirst(blockData); blockData = repository.getBlockRepository().fromHeight(blockData.getHeight() - 1); } @@ -870,6 +924,10 @@ public class Controller extends Thread { if (cachedChainTip != null && Arrays.equals(cachedChainTip.getSignature(), blockDataCopy.getReference())) { // Chain tip is parent for new latest block, so we can safely add new latest block this.latestBlocks.addLast(latestBlockData); + + // Trim if necessary + if (latestBlockData.getHeight() - this.latestBlocks.peekFirst().getHeight() >= BLOCK_CACHE_SIZE) + this.latestBlocks.pollFirst(); } else { if (cachedChainTip != null) // Chain tip didn't match - potentially abnormal behaviour? @@ -878,8 +936,10 @@ public class Controller extends Thread { Base58.encode(blockDataCopy.getSignature()), Base58.encode(blockDataCopy.getReference()))); - // Protectively rebuild cache + // Defensively rebuild cache try { + this.stats.latestBlocksCacheRefills.incrementAndGet(); + this.refillLatestBlocksCache(); } catch (DataException e) { LOGGER.warn(() -> "Couldn't refill latest blocks cache?", e); @@ -925,8 +985,10 @@ public class Controller extends Thread { Base58.encode(cachedChainTip.getReference()), Base58.encode(blockDataCopy.getSignature()))); - // Protectively rebuild cache + // Defensively rebuild cache try { + this.stats.latestBlocksCacheRefills.incrementAndGet(); + this.refillLatestBlocksCache(); } catch (DataException e) { LOGGER.warn(() -> "Couldn't refill latest blocks cache?", e); @@ -1041,14 +1103,20 @@ public class Controller extends Thread { private void onNetworkGetBlockMessage(Peer peer, Message message) { GetBlockMessage getBlockMessage = (GetBlockMessage) message; byte[] signature = getBlockMessage.getSignature(); + this.stats.getBlockMessageStats.requests.incrementAndGet(); - BlockMessage blockMessage = this.latestBlockMessage; + ByteArray signatureAsByteArray = new ByteArray(signature); + + BlockMessage cachedBlockMessage = this.blockMessageCache.get(signatureAsByteArray); // Check cached latest block message - if (blockMessage != null && Arrays.equals(blockMessage.getBlockData().getSignature(), signature)) { - blockMessage.setId(message.getId()); + if (cachedBlockMessage != null) { + this.stats.getBlockMessageStats.cacheHits.incrementAndGet(); - if (!peer.sendMessage(blockMessage)) + // We need to duplicate it to prevent multiple threads setting ID on the same message + BlockMessage clonedBlockMessage = cachedBlockMessage.cloneWithNewId(message.getId()); + + if (!peer.sendMessage(clonedBlockMessage)) peer.disconnect("failed to send block"); return; @@ -1059,6 +1127,7 @@ public class Controller extends Thread { if (blockData == null) { // We don't have this block + this.stats.getBlockMessageStats.unknownBlocks.getAndIncrement(); // Send valid, yet unexpected message type in response, so peer's synchronizer doesn't have to wait for timeout LOGGER.debug(() -> String.format("Sending 'block unknown' response to peer %s for GET_BLOCK request for unknown block %s", peer, Base58.encode(signature))); @@ -1073,16 +1142,19 @@ public class Controller extends Thread { Block block = new Block(repository, blockData); - blockMessage = new BlockMessage(block); + BlockMessage blockMessage = new BlockMessage(block); blockMessage.setId(message.getId()); // This call also causes the other needed data to be pulled in from repository if (!peer.sendMessage(blockMessage)) peer.disconnect("failed to send block"); - // If request is for latest block, cache it - if (Arrays.equals(this.getChainTip().getSignature(), signature)) - this.latestBlockMessage = blockMessage; + // If request is for a recent block, cache it + if (getChainHeight() - blockData.getHeight() <= BLOCK_CACHE_SIZE) { + this.stats.getBlockMessageStats.cacheFills.incrementAndGet(); + + this.blockMessageCache.put(new ByteArray(blockData.getSignature()), blockMessage); + } } catch (DataException e) { LOGGER.error(String.format("Repository issue while send block %s to peer %s", Base58.encode(signature), peer), e); } @@ -1130,6 +1202,7 @@ public class Controller extends Thread { private void onNetworkGetBlockSummariesMessage(Peer peer, Message message) { GetBlockSummariesMessage getBlockSummariesMessage = (GetBlockSummariesMessage) message; final byte[] parentSignature = getBlockSummariesMessage.getParentSignature(); + this.stats.getBlockSummariesStats.requests.incrementAndGet(); List blockSummaries = new ArrayList<>(); @@ -1141,7 +1214,7 @@ public class Controller extends Thread { .collect(Collectors.toList()); } - if (blockSummaries.isEmpty()) + if (blockSummaries.isEmpty()) { try (final Repository repository = RepositoryManager.getRepository()) { int numberRequested = Math.min(Network.MAX_BLOCK_SUMMARIES_PER_REPLY, getBlockSummariesMessage.getNumberRequested()); @@ -1156,6 +1229,12 @@ public class Controller extends Thread { } catch (DataException e) { LOGGER.error(String.format("Repository issue while sending block summaries after %s to peer %s", Base58.encode(parentSignature), peer), e); } + } else { + this.stats.getBlockSummariesStats.cacheHits.incrementAndGet(); + + if (blockSummaries.size() >= getBlockSummariesMessage.getNumberRequested()) + this.stats.getBlockSummariesStats.fullyFromCache.incrementAndGet(); + } Message blockSummariesMessage = new BlockSummariesMessage(blockSummaries); blockSummariesMessage.setId(message.getId()); @@ -1165,29 +1244,43 @@ public class Controller extends Thread { private void onNetworkGetSignaturesV2Message(Peer peer, Message message) { GetSignaturesV2Message getSignaturesMessage = (GetSignaturesV2Message) message; - byte[] parentSignature = getSignaturesMessage.getParentSignature(); + final byte[] parentSignature = getSignaturesMessage.getParentSignature(); + this.stats.getBlockSignaturesV2Stats.requests.incrementAndGet(); - try (final Repository repository = RepositoryManager.getRepository()) { - List signatures = new ArrayList<>(); + List signatures = new ArrayList<>(); - do { + // Attempt to serve from our cache of latest blocks + synchronized (this.latestBlocks) { + signatures = this.latestBlocks.stream() + .dropWhile(cachedBlockData -> Arrays.equals(cachedBlockData.getSignature(), parentSignature)) + .map(BlockData::getSignature) + .collect(Collectors.toList()); + } + + if (signatures.isEmpty()) { + try (final Repository repository = RepositoryManager.getRepository()) { + int numberRequested = getSignaturesMessage.getNumberRequested(); BlockData blockData = repository.getBlockRepository().fromReference(parentSignature); - if (blockData == null) - // No more signatures to send to peer - break; + while (blockData != null && signatures.size() < numberRequested) { + signatures.add(blockData.getSignature()); - parentSignature = blockData.getSignature(); - signatures.add(parentSignature); - } while (signatures.size() < getSignaturesMessage.getNumberRequested()); + blockData = repository.getBlockRepository().fromReference(blockData.getSignature()); + } + } catch (DataException e) { + LOGGER.error(String.format("Repository issue while sending V2 signatures after %s to peer %s", Base58.encode(parentSignature), peer), e); + } + } else { + this.stats.getBlockSignaturesV2Stats.cacheHits.incrementAndGet(); - Message signaturesMessage = new SignaturesMessage(signatures); - signaturesMessage.setId(message.getId()); - if (!peer.sendMessage(signaturesMessage)) - peer.disconnect("failed to send signatures (v2)"); - } catch (DataException e) { - LOGGER.error(String.format("Repository issue while sending V2 signatures after %s to peer %s", Base58.encode(parentSignature), peer), e); + if (signatures.size() >= getSignaturesMessage.getNumberRequested()) + this.stats.getBlockSignaturesV2Stats.fullyFromCache.incrementAndGet(); } + + Message signaturesMessage = new SignaturesMessage(signatures); + signaturesMessage.setId(message.getId()); + if (!peer.sendMessage(signaturesMessage)) + peer.disconnect("failed to send signatures (v2)"); } private void onNetworkHeightV2Message(Peer peer, Message message) { @@ -1798,4 +1891,8 @@ public class Controller extends Thread { return now - offset; } + public StatsSnapshot getStatsSnapshot() { + return this.stats; + } + } diff --git a/src/main/java/org/qortal/network/message/BlockMessage.java b/src/main/java/org/qortal/network/message/BlockMessage.java index e63dce92..b07dc8b1 100644 --- a/src/main/java/org/qortal/network/message/BlockMessage.java +++ b/src/main/java/org/qortal/network/message/BlockMessage.java @@ -94,4 +94,10 @@ public class BlockMessage extends Message { } } + public BlockMessage cloneWithNewId(int newId) { + BlockMessage clone = new BlockMessage(this.block); + clone.setId(newId); + return clone; + } + }