More caching for GetBlockMessage. Added API call GET /admin/enginestats to monitor cache usage

This commit is contained in:
catbref 2020-10-29 11:02:02 +00:00
parent 9c18a33d7f
commit d2a92db921
4 changed files with 212 additions and 42 deletions

View File

@ -1,5 +1,6 @@
package org.qortal.api.model;
import java.util.Collections;
import java.util.EnumMap;
import java.util.Map;
@ -13,17 +14,61 @@ import org.qortal.transaction.Transaction.TransactionType;
@XmlAccessorType(XmlAccessType.FIELD)
public class ActivitySummary {
public int blockCount;
public int transactionCount;
public int assetsIssued;
public int namesRegistered;
private int blockCount;
private int assetsIssued;
private int namesRegistered;
// Assuming TransactionType values are contiguous so 'length' equals count
@XmlJavaTypeAdapter(TransactionCountMapXmlAdapter.class)
public Map<TransactionType, Integer> transactionCountByType = new EnumMap<>(TransactionType.class);
private Map<TransactionType, Integer> transactionCountByType = new EnumMap<>(TransactionType.class);
private int totalTransactionCount = 0;
public ActivitySummary() {
// Needed for JAXB
}
public int getBlockCount() {
return this.blockCount;
}
public void setBlockCount(int blockCount) {
this.blockCount = blockCount;
}
public int getTotalTransactionCount() {
return this.totalTransactionCount;
}
public int getAssetsIssued() {
return this.assetsIssued;
}
public void setAssetsIssued(int assetsIssued) {
this.assetsIssued = assetsIssued;
}
public int getNamesRegistered() {
return this.namesRegistered;
}
public void setNamesRegistered(int namesRegistered) {
this.namesRegistered = namesRegistered;
}
public Map<TransactionType, Integer> getTransactionCountByType() {
return Collections.unmodifiableMap(this.transactionCountByType);
}
public void setTransactionCountByType(TransactionType transactionType, int transactionCount) {
this.transactionCountByType.put(transactionType, transactionCount);
this.totalTransactionCount = this.transactionCountByType.values().stream().mapToInt(Integer::intValue).sum();
}
public void setTransactionCountByType(Map<TransactionType, Integer> transactionCountByType) {
this.transactionCountByType = new EnumMap<>(transactionCountByType);
this.totalTransactionCount = this.transactionCountByType.values().stream().mapToInt(Integer::intValue).sum();
}
}

View File

@ -182,6 +182,8 @@ public class AdminResource {
)
@ApiErrors({ApiError.REPOSITORY_ISSUE})
public ActivitySummary summary() {
Security.checkApiCallAllowed(request);
ActivitySummary summary = new ActivitySummary();
LocalDate date = LocalDate.now();
@ -193,16 +195,13 @@ public class AdminResource {
int startHeight = repository.getBlockRepository().getHeightFromTimestamp(start);
int endHeight = repository.getBlockRepository().getBlockchainHeight();
summary.blockCount = endHeight - startHeight;
summary.setBlockCount(endHeight - startHeight);
summary.transactionCountByType = repository.getTransactionRepository().getTransactionSummary(startHeight + 1, endHeight);
summary.setTransactionCountByType(repository.getTransactionRepository().getTransactionSummary(startHeight + 1, endHeight));
for (Integer count : summary.transactionCountByType.values())
summary.transactionCount += count;
summary.setAssetsIssued(repository.getAssetRepository().getRecentAssetIds(start).size());
summary.assetsIssued = repository.getAssetRepository().getRecentAssetIds(start).size();
summary.namesRegistered = repository.getNameRepository().getRecentNames(start).size();
summary.setNamesRegistered (repository.getNameRepository().getRecentNames(start).size());
return summary;
} catch (DataException e) {
@ -210,6 +209,29 @@ public class AdminResource {
}
}
@GET
@Path("/enginestats")
@Operation(
summary = "Fetch statistics snapshot for core engine",
responses = {
@ApiResponse(
content = @Content(
mediaType = MediaType.APPLICATION_JSON,
array = @ArraySchema(
schema = @Schema(
implementation = Controller.StatsSnapshot.class
)
)
)
)
}
)
public Controller.StatsSnapshot getEngineStats() {
Security.checkApiCallAllowed(request);
return Controller.getInstance().getStatsSnapshot();
}
@GET
@Path("/mintingaccounts")
@Operation(

View File

@ -16,6 +16,7 @@ import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@ -24,11 +25,15 @@ import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.bouncycastle.jce.provider.BouncyCastleProvider;
@ -137,10 +142,18 @@ public class Controller extends Thread {
private ExecutorService callbackExecutor = Executors.newFixedThreadPool(3);
private volatile boolean notifyGroupMembershipChange = false;
private static final int LATEST_BLOCKS_SIZE = 10; // To cover typical Synchronizer request + a few spare
private static final int BLOCK_CACHE_SIZE = 10; // To cover typical Synchronizer request + a few spare
/** Latest blocks on our chain. Note: tail/last is the latest block. */
private final Deque<BlockData> latestBlocks = new LinkedList<>();
private volatile BlockMessage latestBlockMessage = null;
/** Cache of BlockMessages, indexed by block signature */
@SuppressWarnings("serial")
private final LinkedHashMap<ByteArray, BlockMessage> blockMessageCache = new LinkedHashMap<>() {
@Override
protected boolean removeEldestEntry(Map.Entry<ByteArray, BlockMessage> eldest) {
return this.size() > BLOCK_CACHE_SIZE;
}
};
private long repositoryBackupTimestamp = startTime; // ms
private long ntpCheckTimestamp = startTime; // ms
@ -188,6 +201,47 @@ public class Controller extends Thread {
/** Cache of latest blocks' online accounts */
Deque<List<OnlineAccountData>> latestBlocksOnlineAccounts = new ArrayDeque<>(MAX_BLOCKS_CACHED_ONLINE_ACCOUNTS);
// Stats
@XmlAccessorType(XmlAccessType.FIELD)
public static class StatsSnapshot {
public static class GetBlockMessageStats {
public AtomicLong requests = new AtomicLong();
public AtomicLong cacheHits = new AtomicLong();
public AtomicLong unknownBlocks = new AtomicLong();
public AtomicLong cacheFills = new AtomicLong();
public GetBlockMessageStats() {
}
}
public GetBlockMessageStats getBlockMessageStats = new GetBlockMessageStats();
public static class GetBlockSummariesStats {
public AtomicLong requests = new AtomicLong();
public AtomicLong cacheHits = new AtomicLong();
public AtomicLong fullyFromCache = new AtomicLong();
public GetBlockSummariesStats() {
}
}
public GetBlockSummariesStats getBlockSummariesStats = new GetBlockSummariesStats();
public static class GetBlockSignaturesV2Stats {
public AtomicLong requests = new AtomicLong();
public AtomicLong cacheHits = new AtomicLong();
public AtomicLong fullyFromCache = new AtomicLong();
public GetBlockSignaturesV2Stats() {
}
}
public GetBlockSignaturesV2Stats getBlockSignaturesV2Stats = new GetBlockSignaturesV2Stats();
public AtomicLong latestBlocksCacheRefills = new AtomicLong();
public StatsSnapshot() {
}
}
private final StatsSnapshot stats = new StatsSnapshot();
// Constructors
private Controller(String[] args) {
@ -267,7 +321,7 @@ public class Controller extends Thread {
synchronized (this.latestBlocks) {
this.latestBlocks.clear();
for (int i = 0; i < LATEST_BLOCKS_SIZE && blockData != null; ++i) {
for (int i = 0; i < BLOCK_CACHE_SIZE && blockData != null; ++i) {
this.latestBlocks.addFirst(blockData);
blockData = repository.getBlockRepository().fromHeight(blockData.getHeight() - 1);
}
@ -870,6 +924,10 @@ public class Controller extends Thread {
if (cachedChainTip != null && Arrays.equals(cachedChainTip.getSignature(), blockDataCopy.getReference())) {
// Chain tip is parent for new latest block, so we can safely add new latest block
this.latestBlocks.addLast(latestBlockData);
// Trim if necessary
if (latestBlockData.getHeight() - this.latestBlocks.peekFirst().getHeight() >= BLOCK_CACHE_SIZE)
this.latestBlocks.pollFirst();
} else {
if (cachedChainTip != null)
// Chain tip didn't match - potentially abnormal behaviour?
@ -878,8 +936,10 @@ public class Controller extends Thread {
Base58.encode(blockDataCopy.getSignature()),
Base58.encode(blockDataCopy.getReference())));
// Protectively rebuild cache
// Defensively rebuild cache
try {
this.stats.latestBlocksCacheRefills.incrementAndGet();
this.refillLatestBlocksCache();
} catch (DataException e) {
LOGGER.warn(() -> "Couldn't refill latest blocks cache?", e);
@ -925,8 +985,10 @@ public class Controller extends Thread {
Base58.encode(cachedChainTip.getReference()),
Base58.encode(blockDataCopy.getSignature())));
// Protectively rebuild cache
// Defensively rebuild cache
try {
this.stats.latestBlocksCacheRefills.incrementAndGet();
this.refillLatestBlocksCache();
} catch (DataException e) {
LOGGER.warn(() -> "Couldn't refill latest blocks cache?", e);
@ -1041,14 +1103,20 @@ public class Controller extends Thread {
private void onNetworkGetBlockMessage(Peer peer, Message message) {
GetBlockMessage getBlockMessage = (GetBlockMessage) message;
byte[] signature = getBlockMessage.getSignature();
this.stats.getBlockMessageStats.requests.incrementAndGet();
BlockMessage blockMessage = this.latestBlockMessage;
ByteArray signatureAsByteArray = new ByteArray(signature);
BlockMessage cachedBlockMessage = this.blockMessageCache.get(signatureAsByteArray);
// Check cached latest block message
if (blockMessage != null && Arrays.equals(blockMessage.getBlockData().getSignature(), signature)) {
blockMessage.setId(message.getId());
if (cachedBlockMessage != null) {
this.stats.getBlockMessageStats.cacheHits.incrementAndGet();
if (!peer.sendMessage(blockMessage))
// We need to duplicate it to prevent multiple threads setting ID on the same message
BlockMessage clonedBlockMessage = cachedBlockMessage.cloneWithNewId(message.getId());
if (!peer.sendMessage(clonedBlockMessage))
peer.disconnect("failed to send block");
return;
@ -1059,6 +1127,7 @@ public class Controller extends Thread {
if (blockData == null) {
// We don't have this block
this.stats.getBlockMessageStats.unknownBlocks.getAndIncrement();
// Send valid, yet unexpected message type in response, so peer's synchronizer doesn't have to wait for timeout
LOGGER.debug(() -> String.format("Sending 'block unknown' response to peer %s for GET_BLOCK request for unknown block %s", peer, Base58.encode(signature)));
@ -1073,16 +1142,19 @@ public class Controller extends Thread {
Block block = new Block(repository, blockData);
blockMessage = new BlockMessage(block);
BlockMessage blockMessage = new BlockMessage(block);
blockMessage.setId(message.getId());
// This call also causes the other needed data to be pulled in from repository
if (!peer.sendMessage(blockMessage))
peer.disconnect("failed to send block");
// If request is for latest block, cache it
if (Arrays.equals(this.getChainTip().getSignature(), signature))
this.latestBlockMessage = blockMessage;
// If request is for a recent block, cache it
if (getChainHeight() - blockData.getHeight() <= BLOCK_CACHE_SIZE) {
this.stats.getBlockMessageStats.cacheFills.incrementAndGet();
this.blockMessageCache.put(new ByteArray(blockData.getSignature()), blockMessage);
}
} catch (DataException e) {
LOGGER.error(String.format("Repository issue while send block %s to peer %s", Base58.encode(signature), peer), e);
}
@ -1130,6 +1202,7 @@ public class Controller extends Thread {
private void onNetworkGetBlockSummariesMessage(Peer peer, Message message) {
GetBlockSummariesMessage getBlockSummariesMessage = (GetBlockSummariesMessage) message;
final byte[] parentSignature = getBlockSummariesMessage.getParentSignature();
this.stats.getBlockSummariesStats.requests.incrementAndGet();
List<BlockSummaryData> blockSummaries = new ArrayList<>();
@ -1141,7 +1214,7 @@ public class Controller extends Thread {
.collect(Collectors.toList());
}
if (blockSummaries.isEmpty())
if (blockSummaries.isEmpty()) {
try (final Repository repository = RepositoryManager.getRepository()) {
int numberRequested = Math.min(Network.MAX_BLOCK_SUMMARIES_PER_REPLY, getBlockSummariesMessage.getNumberRequested());
@ -1156,6 +1229,12 @@ public class Controller extends Thread {
} catch (DataException e) {
LOGGER.error(String.format("Repository issue while sending block summaries after %s to peer %s", Base58.encode(parentSignature), peer), e);
}
} else {
this.stats.getBlockSummariesStats.cacheHits.incrementAndGet();
if (blockSummaries.size() >= getBlockSummariesMessage.getNumberRequested())
this.stats.getBlockSummariesStats.fullyFromCache.incrementAndGet();
}
Message blockSummariesMessage = new BlockSummariesMessage(blockSummaries);
blockSummariesMessage.setId(message.getId());
@ -1165,29 +1244,43 @@ public class Controller extends Thread {
private void onNetworkGetSignaturesV2Message(Peer peer, Message message) {
GetSignaturesV2Message getSignaturesMessage = (GetSignaturesV2Message) message;
byte[] parentSignature = getSignaturesMessage.getParentSignature();
final byte[] parentSignature = getSignaturesMessage.getParentSignature();
this.stats.getBlockSignaturesV2Stats.requests.incrementAndGet();
try (final Repository repository = RepositoryManager.getRepository()) {
List<byte[]> signatures = new ArrayList<>();
List<byte[]> signatures = new ArrayList<>();
do {
// Attempt to serve from our cache of latest blocks
synchronized (this.latestBlocks) {
signatures = this.latestBlocks.stream()
.dropWhile(cachedBlockData -> Arrays.equals(cachedBlockData.getSignature(), parentSignature))
.map(BlockData::getSignature)
.collect(Collectors.toList());
}
if (signatures.isEmpty()) {
try (final Repository repository = RepositoryManager.getRepository()) {
int numberRequested = getSignaturesMessage.getNumberRequested();
BlockData blockData = repository.getBlockRepository().fromReference(parentSignature);
if (blockData == null)
// No more signatures to send to peer
break;
while (blockData != null && signatures.size() < numberRequested) {
signatures.add(blockData.getSignature());
parentSignature = blockData.getSignature();
signatures.add(parentSignature);
} while (signatures.size() < getSignaturesMessage.getNumberRequested());
blockData = repository.getBlockRepository().fromReference(blockData.getSignature());
}
} catch (DataException e) {
LOGGER.error(String.format("Repository issue while sending V2 signatures after %s to peer %s", Base58.encode(parentSignature), peer), e);
}
} else {
this.stats.getBlockSignaturesV2Stats.cacheHits.incrementAndGet();
Message signaturesMessage = new SignaturesMessage(signatures);
signaturesMessage.setId(message.getId());
if (!peer.sendMessage(signaturesMessage))
peer.disconnect("failed to send signatures (v2)");
} catch (DataException e) {
LOGGER.error(String.format("Repository issue while sending V2 signatures after %s to peer %s", Base58.encode(parentSignature), peer), e);
if (signatures.size() >= getSignaturesMessage.getNumberRequested())
this.stats.getBlockSignaturesV2Stats.fullyFromCache.incrementAndGet();
}
Message signaturesMessage = new SignaturesMessage(signatures);
signaturesMessage.setId(message.getId());
if (!peer.sendMessage(signaturesMessage))
peer.disconnect("failed to send signatures (v2)");
}
private void onNetworkHeightV2Message(Peer peer, Message message) {
@ -1798,4 +1891,8 @@ public class Controller extends Thread {
return now - offset;
}
public StatsSnapshot getStatsSnapshot() {
return this.stats;
}
}

View File

@ -94,4 +94,10 @@ public class BlockMessage extends Message {
}
}
public BlockMessage cloneWithNewId(int newId) {
BlockMessage clone = new BlockMessage(this.block);
clone.setId(newId);
return clone;
}
}