forked from Qortal/qortal
Rework BlockMessage caching with new pseudo outgoing-only message that only caches raw bytes
This commit is contained in:
parent
745528a9b1
commit
b319d6db6b
@ -67,8 +67,8 @@ import org.qortal.gui.SysTray;
|
|||||||
import org.qortal.network.Network;
|
import org.qortal.network.Network;
|
||||||
import org.qortal.network.Peer;
|
import org.qortal.network.Peer;
|
||||||
import org.qortal.network.message.ArbitraryDataMessage;
|
import org.qortal.network.message.ArbitraryDataMessage;
|
||||||
import org.qortal.network.message.BlockMessage;
|
|
||||||
import org.qortal.network.message.BlockSummariesMessage;
|
import org.qortal.network.message.BlockSummariesMessage;
|
||||||
|
import org.qortal.network.message.CachedBlockMessage;
|
||||||
import org.qortal.network.message.GetArbitraryDataMessage;
|
import org.qortal.network.message.GetArbitraryDataMessage;
|
||||||
import org.qortal.network.message.GetBlockMessage;
|
import org.qortal.network.message.GetBlockMessage;
|
||||||
import org.qortal.network.message.GetBlockSummariesMessage;
|
import org.qortal.network.message.GetBlockSummariesMessage;
|
||||||
@ -148,9 +148,9 @@ public class Controller extends Thread {
|
|||||||
|
|
||||||
/** Cache of BlockMessages, indexed by block signature */
|
/** Cache of BlockMessages, indexed by block signature */
|
||||||
@SuppressWarnings("serial")
|
@SuppressWarnings("serial")
|
||||||
private final LinkedHashMap<ByteArray, BlockMessage> blockMessageCache = new LinkedHashMap<>() {
|
private final LinkedHashMap<ByteArray, CachedBlockMessage> blockMessageCache = new LinkedHashMap<>() {
|
||||||
@Override
|
@Override
|
||||||
protected boolean removeEldestEntry(Map.Entry<ByteArray, BlockMessage> eldest) {
|
protected boolean removeEldestEntry(Map.Entry<ByteArray, CachedBlockMessage> eldest) {
|
||||||
return this.size() > Settings.getInstance().getBlockCacheSize();
|
return this.size() > Settings.getInstance().getBlockCacheSize();
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
@ -1151,7 +1151,7 @@ public class Controller extends Thread {
|
|||||||
|
|
||||||
ByteArray signatureAsByteArray = new ByteArray(signature);
|
ByteArray signatureAsByteArray = new ByteArray(signature);
|
||||||
|
|
||||||
BlockMessage cachedBlockMessage = this.blockMessageCache.get(signatureAsByteArray);
|
CachedBlockMessage cachedBlockMessage = this.blockMessageCache.get(signatureAsByteArray);
|
||||||
int blockCacheSize = Settings.getInstance().getBlockCacheSize();
|
int blockCacheSize = Settings.getInstance().getBlockCacheSize();
|
||||||
|
|
||||||
// Check cached latest block message
|
// Check cached latest block message
|
||||||
@ -1159,7 +1159,7 @@ public class Controller extends Thread {
|
|||||||
this.stats.getBlockMessageStats.cacheHits.incrementAndGet();
|
this.stats.getBlockMessageStats.cacheHits.incrementAndGet();
|
||||||
|
|
||||||
// We need to duplicate it to prevent multiple threads setting ID on the same message
|
// We need to duplicate it to prevent multiple threads setting ID on the same message
|
||||||
BlockMessage clonedBlockMessage = cachedBlockMessage.cloneWithNewId(message.getId());
|
CachedBlockMessage clonedBlockMessage = cachedBlockMessage.cloneWithNewId(message.getId());
|
||||||
|
|
||||||
if (!peer.sendMessage(clonedBlockMessage))
|
if (!peer.sendMessage(clonedBlockMessage))
|
||||||
peer.disconnect("failed to send block");
|
peer.disconnect("failed to send block");
|
||||||
@ -1187,12 +1187,15 @@ public class Controller extends Thread {
|
|||||||
|
|
||||||
Block block = new Block(repository, blockData);
|
Block block = new Block(repository, blockData);
|
||||||
|
|
||||||
BlockMessage blockMessage = new BlockMessage(block);
|
CachedBlockMessage blockMessage = new CachedBlockMessage(block);
|
||||||
blockMessage.setId(message.getId());
|
blockMessage.setId(message.getId());
|
||||||
|
|
||||||
// This call also causes the other needed data to be pulled in from repository
|
// This call also causes the other needed data to be pulled in from repository
|
||||||
if (!peer.sendMessage(blockMessage))
|
if (!peer.sendMessage(blockMessage)) {
|
||||||
peer.disconnect("failed to send block");
|
peer.disconnect("failed to send block");
|
||||||
|
// Don't fall-through to caching because failure to send might be from failure to build message
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
// If request is for a recent block, cache it
|
// If request is for a recent block, cache it
|
||||||
if (getChainHeight() - blockData.getHeight() <= blockCacheSize) {
|
if (getChainHeight() - blockData.getHeight() <= blockCacheSize) {
|
||||||
|
@ -0,0 +1,70 @@
|
|||||||
|
package org.qortal.network.message;
|
||||||
|
|
||||||
|
import java.io.ByteArrayOutputStream;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.UnsupportedEncodingException;
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
|
|
||||||
|
import org.qortal.block.Block;
|
||||||
|
import org.qortal.transform.TransformationException;
|
||||||
|
import org.qortal.transform.block.BlockTransformer;
|
||||||
|
|
||||||
|
import com.google.common.primitives.Ints;
|
||||||
|
|
||||||
|
// This is an OUTGOING-only Message which more readily lends itself to being cached
|
||||||
|
public class CachedBlockMessage extends Message {
|
||||||
|
|
||||||
|
private Block block = null;
|
||||||
|
private byte[] cachedBytes = null;
|
||||||
|
|
||||||
|
public CachedBlockMessage(Block block) {
|
||||||
|
super(MessageType.BLOCK);
|
||||||
|
|
||||||
|
this.block = block;
|
||||||
|
}
|
||||||
|
|
||||||
|
private CachedBlockMessage(byte[] cachedBytes) {
|
||||||
|
super(MessageType.BLOCK);
|
||||||
|
|
||||||
|
this.block = null;
|
||||||
|
this.cachedBytes = cachedBytes;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static Message fromByteBuffer(int id, ByteBuffer byteBuffer) throws UnsupportedEncodingException {
|
||||||
|
throw new UnsupportedOperationException("CachedBlockMessage is for outgoing messages only");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected byte[] toData() {
|
||||||
|
// Already serialized?
|
||||||
|
if (this.cachedBytes != null)
|
||||||
|
return cachedBytes;
|
||||||
|
|
||||||
|
if (this.block == null)
|
||||||
|
return null;
|
||||||
|
|
||||||
|
try {
|
||||||
|
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
|
||||||
|
|
||||||
|
bytes.write(Ints.toByteArray(this.block.getBlockData().getHeight()));
|
||||||
|
|
||||||
|
bytes.write(BlockTransformer.toBytes(this.block));
|
||||||
|
|
||||||
|
this.cachedBytes = bytes.toByteArray();
|
||||||
|
// We no longer need source Block
|
||||||
|
// and Block contains repository handle which is highly likely to be invalid after this call
|
||||||
|
this.block = null;
|
||||||
|
|
||||||
|
return this.cachedBytes;
|
||||||
|
} catch (TransformationException | IOException e) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public CachedBlockMessage cloneWithNewId(int newId) {
|
||||||
|
CachedBlockMessage clone = new CachedBlockMessage(this.cachedBytes);
|
||||||
|
clone.setId(newId);
|
||||||
|
return clone;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user