diff --git a/src/main/java/org/qortal/controller/Controller.java b/src/main/java/org/qortal/controller/Controller.java index a0ca1d05..f57915b8 100644 --- a/src/main/java/org/qortal/controller/Controller.java +++ b/src/main/java/org/qortal/controller/Controller.java @@ -639,6 +639,21 @@ public class Controller extends Thread { // Disregard peers that are on the same block as last sync attempt and we didn't like their chain peers.removeIf(hasInferiorChainTip); + final int peersBeforeComparison = peers.size(); + + // Request recent block summaries from the remaining peers, and locate our common block with each + Synchronizer.getInstance().findCommonBlocksWithPeers(peers); + + // Compare the peers against each other, and against our chain, which will return an updated list excluding those without common blocks + peers = Synchronizer.getInstance().comparePeers(peers); + + // We may have added more inferior chain tips when comparing peers, so remove any peers that are currently on those chains + peers.removeIf(hasInferiorChainTip); + + final int peersRemoved = peersBeforeComparison - peers.size(); + if (peersRemoved > 0) + LOGGER.debug(String.format("Ignoring %d peers on inferior chains. Peers remaining: %d", peersRemoved, peers.size())); + if (peers.isEmpty()) return; @@ -744,6 +759,13 @@ public class Controller extends Thread { } } + public void addInferiorChainSignature(byte[] inferiorSignature) { + // Update our list of inferior chain tips + ByteArray inferiorChainSignature = new ByteArray(inferiorSignature); + if (!inferiorChainSignatures.contains(inferiorChainSignature)) + inferiorChainSignatures.add(inferiorChainSignature); + } + public static class StatusChangeEvent implements Event { public StatusChangeEvent() { } diff --git a/src/main/java/org/qortal/controller/Synchronizer.java b/src/main/java/org/qortal/controller/Synchronizer.java index c35f37a3..54d05a06 100644 --- a/src/main/java/org/qortal/controller/Synchronizer.java +++ b/src/main/java/org/qortal/controller/Synchronizer.java @@ -8,6 +8,7 @@ import java.util.Arrays; import java.util.List; import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; +import java.util.Iterator; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -17,6 +18,7 @@ import org.qortal.block.Block; import org.qortal.block.Block.ValidationResult; import org.qortal.data.block.BlockData; import org.qortal.data.block.BlockSummaryData; +import org.qortal.data.block.CommonBlockData; import org.qortal.data.network.PeerChainTipData; import org.qortal.data.transaction.RewardShareTransactionData; import org.qortal.data.transaction.TransactionData; @@ -75,6 +77,362 @@ public class Synchronizer { return instance; } + + /** + * Iterate through a list of supplied peers, and attempt to find our common block with each. + * If a common block is found, its summary will be retained in the peer's commonBlockSummary property, for processing later. + *

+ * Will return SynchronizationResult.OK on success. + *

+ * @param peers + * @return SynchronizationResult.OK if the process completed successfully, or a different SynchronizationResult if something went wrong. + * @throws InterruptedException + */ + public SynchronizationResult findCommonBlocksWithPeers(List peers) throws InterruptedException { + try (final Repository repository = RepositoryManager.getRepository()) { + try { + + if (peers.size() == 0) + return SynchronizationResult.NOTHING_TO_DO; + + // If our latest block is very old, it's best that we don't try and determine the best peers to sync to. + // This is because it can involve very large chain comparisons, which is too intensive. + // In reality, most forking problems occur near the chain tips, so we will reserve this functionality for those situations. + final Long minLatestBlockTimestamp = Controller.getMinimumLatestBlockTimestamp(); + if (minLatestBlockTimestamp == null) + return SynchronizationResult.REPOSITORY_ISSUE; + + final BlockData ourLatestBlockData = repository.getBlockRepository().getLastBlock(); + if (ourLatestBlockData.getTimestamp() < minLatestBlockTimestamp) { + LOGGER.debug(String.format("Our latest block is very old, so we won't collect common block info from peers")); + return SynchronizationResult.NOTHING_TO_DO; + } + + LOGGER.debug(String.format("Searching for common blocks with %d peers...", peers.size())); + final long startTime = System.currentTimeMillis(); + + for (Peer peer : peers) { + // Are we shutting down? + if (Controller.isStopping()) + return SynchronizationResult.SHUTTING_DOWN; + + // Check if we can use the cached common block data, by comparing the peer's current chain tip against the peer's chain tip when we last found our common block + PeerChainTipData peerChainTipData = peer.getChainTipData(); + CommonBlockData commonBlockData = peer.getCommonBlockData(); + + if (peerChainTipData != null && commonBlockData != null) { + PeerChainTipData commonBlockChainTipData = commonBlockData.getChainTipData(); + if (peerChainTipData.getLastBlockSignature() != null && commonBlockChainTipData != null && commonBlockChainTipData.getLastBlockSignature() != null) { + if (Arrays.equals(peerChainTipData.getLastBlockSignature(), commonBlockChainTipData.getLastBlockSignature())) { + LOGGER.debug(String.format("Skipping peer %s because we already have the latest common block data in our cache. Cached common block sig is %.08s", peer, Base58.encode(commonBlockData.getCommonBlockSummary().getSignature()))); + continue; + } + } + } + + // Cached data is stale, so clear it and repopulate + peer.setCommonBlockData(null); + + // Search for the common block + Synchronizer.getInstance().findCommonBlockWithPeer(peer, repository); + } + + final long totalTimeTaken = System.currentTimeMillis() - startTime; + LOGGER.info(String.format("Finished searching for common blocks with %d peer%s. Total time taken: %d ms", peers.size(), (peers.size() != 1 ? "s" : ""), totalTimeTaken)); + + return SynchronizationResult.OK; + } finally { + repository.discardChanges(); // Free repository locks, if any, also in case anything went wrong + } + } catch (DataException e) { + LOGGER.error("Repository issue during synchronization with peer", e); + return SynchronizationResult.REPOSITORY_ISSUE; + } + } + + /** + * Attempt to find the find our common block with supplied peer. + * If a common block is found, its summary will be retained in the peer's commonBlockSummary property, for processing later. + *

+ * Will return SynchronizationResult.OK on success. + *

+ * @param peer + * @param repository + * @return SynchronizationResult.OK if the process completed successfully, or a different SynchronizationResult if something went wrong. + * @throws InterruptedException + */ + public SynchronizationResult findCommonBlockWithPeer(Peer peer, Repository repository) throws InterruptedException { + try { + final BlockData ourLatestBlockData = repository.getBlockRepository().getLastBlock(); + final int ourInitialHeight = ourLatestBlockData.getHeight(); + + PeerChainTipData peerChainTipData = peer.getChainTipData(); + int peerHeight = peerChainTipData.getLastHeight(); + byte[] peersLastBlockSignature = peerChainTipData.getLastBlockSignature(); + + byte[] ourLastBlockSignature = ourLatestBlockData.getSignature(); + LOGGER.debug(String.format("Fetching summaries from peer %s at height %d, sig %.8s, ts %d; our height %d, sig %.8s, ts %d", peer, + peerHeight, Base58.encode(peersLastBlockSignature), peer.getChainTipData().getLastBlockTimestamp(), + ourInitialHeight, Base58.encode(ourLastBlockSignature), ourLatestBlockData.getTimestamp())); + + List peerBlockSummaries = new ArrayList<>(); + SynchronizationResult findCommonBlockResult = fetchSummariesFromCommonBlock(repository, peer, ourInitialHeight, false, peerBlockSummaries); + if (findCommonBlockResult != SynchronizationResult.OK) { + // Logging performed by fetchSummariesFromCommonBlock() above + peer.setCommonBlockData(null); + return findCommonBlockResult; + } + + // First summary is common block + final BlockData commonBlockData = repository.getBlockRepository().fromSignature(peerBlockSummaries.get(0).getSignature()); + final BlockSummaryData commonBlockSummary = new BlockSummaryData(commonBlockData); + final int commonBlockHeight = commonBlockData.getHeight(); + final byte[] commonBlockSig = commonBlockData.getSignature(); + final String commonBlockSig58 = Base58.encode(commonBlockSig); + LOGGER.debug(String.format("Common block with peer %s is at height %d, sig %.8s, ts %d", peer, + commonBlockHeight, commonBlockSig58, commonBlockData.getTimestamp())); + peerBlockSummaries.remove(0); + + // Store the common block summary against the peer, and the current chain tip (for caching) + peer.setCommonBlockData(new CommonBlockData(commonBlockSummary, peerChainTipData)); + + return SynchronizationResult.OK; + } catch (DataException e) { + LOGGER.error("Repository issue during synchronization with peer", e); + return SynchronizationResult.REPOSITORY_ISSUE; + } + } + + + /** + * Compare a list of peers to determine the best peer(s) to sync to next. + *

+ * Will return a filtered list of peers on success, or an identical list of peers on failure. + * This allows us to fall back to legacy behaviour (random selection from the entire list of peers), if we are unable to make the comparison. + *

+ * @param peers + * @return a list of peers, possibly filtered. + * @throws InterruptedException + */ + public List comparePeers(List peers) throws InterruptedException { + try (final Repository repository = RepositoryManager.getRepository()) { + try { + + // If our latest block is very old, it's best that we don't try and determine the best peers to sync to. + // This is because it can involve very large chain comparisons, which is too intensive. + // In reality, most forking problems occur near the chain tips, so we will reserve this functionality for those situations. + final Long minLatestBlockTimestamp = Controller.getMinimumLatestBlockTimestamp(); + if (minLatestBlockTimestamp == null) + return peers; + + final BlockData ourLatestBlockData = repository.getBlockRepository().getLastBlock(); + if (ourLatestBlockData.getTimestamp() < minLatestBlockTimestamp) { + LOGGER.debug(String.format("Our latest block is very old, so we won't filter the peers list")); + return peers; + } + + // Retrieve a list of unique common blocks from this list of peers + List commonBlocks = this.uniqueCommonBlocks(peers); + + // Order common blocks by height, in ascending order + // This is essential for the logic below to make the correct decisions when discarding chains - do not remove + commonBlocks.sort((b1, b2) -> Integer.valueOf(b1.getHeight()).compareTo(Integer.valueOf(b2.getHeight()))); + + // Get our latest height + final int ourHeight = ourLatestBlockData.getHeight(); + + // Create a placeholder to track of common blocks that we can discard due to being inferior chains + int dropPeersAfterCommonBlockHeight = 0; + + // Remove peers with no common block data + Iterator iterator = peers.iterator(); + while (iterator.hasNext()) { + Peer peer = (Peer) iterator.next(); + if (peer.getCommonBlockData() == null) { + LOGGER.debug(String.format("Removed peer %s because it has no common block data")); + iterator.remove(); + } + } + + // Loop through each group of common blocks + for (BlockSummaryData commonBlockSummary : commonBlocks) { + List peersSharingCommonBlock = peers.stream().filter(peer -> peer.getCommonBlockData().getCommonBlockSummary().equals(commonBlockSummary)).collect(Collectors.toList()); + + // Check if we need to discard this group of peers + if (dropPeersAfterCommonBlockHeight > 0) { + if (commonBlockSummary.getHeight() > dropPeersAfterCommonBlockHeight) { + // We have already determined that the correct chain diverged from a lower height. We are safe to skip these peers. + for (Peer peer : peersSharingCommonBlock) { + LOGGER.debug(String.format("Peer %s has common block at height %d but the superior chain is at height %d. Removing it from this round.", peer, commonBlockSummary.getHeight(), dropPeersAfterCommonBlockHeight)); + Controller.getInstance().addInferiorChainSignature(peer.getChainTipData().getLastBlockSignature()); + } + continue; + } + } + + // Calculate the length of the shortest peer chain sharing this common block, including our chain + final int ourAdditionalBlocksAfterCommonBlock = ourHeight - commonBlockSummary.getHeight(); + int minChainLength = this.calculateMinChainLength(commonBlockSummary, ourAdditionalBlocksAfterCommonBlock, peersSharingCommonBlock); + + // Fetch block summaries from each peer + for (Peer peer : peersSharingCommonBlock) { + + // Count the number of blocks this peer has beyond our common block + final int peerHeight = peer.getChainTipData().getLastHeight(); + final int peerAdditionalBlocksAfterCommonBlock = peerHeight - commonBlockSummary.getHeight(); + // Limit the number of blocks we are comparing. FUTURE: we could request more in batches, but there may not be a case when this is needed + final int summariesRequired = Math.min(peerAdditionalBlocksAfterCommonBlock, MAXIMUM_REQUEST_SIZE); + + if (summariesRequired > 0) { + LOGGER.trace(String.format("Requesting %d block summar%s from peer %s after common block %.8s. Peer height: %d", summariesRequired, (summariesRequired != 1 ? "ies" : "y"), peer, Base58.encode(commonBlockSummary.getSignature()), peerHeight)); + + List blockSummaries = this.getBlockSummaries(peer, commonBlockSummary.getSignature(), summariesRequired); + peer.getCommonBlockData().setBlockSummariesAfterCommonBlock(blockSummaries); + + if (blockSummaries != null) { + LOGGER.trace(String.format("Peer %s returned %d block summar%s", peer, blockSummaries.size(), (blockSummaries.size() != 1 ? "ies" : "y"))); + + // We need to adjust minChainLength if peers fail to return all expected block summaries + if (blockSummaries.size() < summariesRequired) { + // This could mean that the peer has re-orged. But we still have the same common block, so it's safe to proceed with this set of signatures instead. + LOGGER.debug(String.format("Peer %s returned %d block summar%s instead of expected %d", peer, blockSummaries.size(), (blockSummaries.size() != 1 ? "ies" : "y"), summariesRequired)); + + // Update minChainLength if we have at least 1 block for this peer. If we don't have any blocks, this peer will be excluded from chain weight comparisons later in the process, so we shouldn't update minChainLength + if (blockSummaries.size() > 0) + minChainLength = blockSummaries.size(); + } + } + } + else { + // There are no block summaries after this common block + peer.getCommonBlockData().setBlockSummariesAfterCommonBlock(null); + } + } + + // Fetch our corresponding block summaries. Limit to MAXIMUM_REQUEST_SIZE, in order to make the comparison fairer, as peers have been limited too + final int ourSummariesRequired = Math.min(ourAdditionalBlocksAfterCommonBlock, MAXIMUM_REQUEST_SIZE); + LOGGER.trace(String.format("About to fetch our block summaries from %d to %d. Our height: %d", commonBlockSummary.getHeight() + 1, commonBlockSummary.getHeight() + ourSummariesRequired, ourHeight)); + List ourBlockSummaries = repository.getBlockRepository().getBlockSummaries(commonBlockSummary.getHeight() + 1, commonBlockSummary.getHeight() + ourSummariesRequired); + if (ourBlockSummaries.isEmpty()) + LOGGER.debug(String.format("We don't have any block summaries so can't compare our chain against peers with this common block. We can still compare them against each other.")); + else + populateBlockSummariesMinterLevels(repository, ourBlockSummaries); + + // Create array to hold peers for comparison + List superiorPeersForComparison = new ArrayList<>(); + + // Calculate our chain weight + BigInteger ourChainWeight = BigInteger.valueOf(0); + if (ourBlockSummaries.size() > 0) + ourChainWeight = Block.calcChainWeight(commonBlockSummary.getHeight(), commonBlockSummary.getSignature(), ourBlockSummaries, minChainLength); + + NumberFormat formatter = new DecimalFormat("0.###E0"); + LOGGER.debug(String.format("Our chain weight based on %d blocks is %s", ourBlockSummaries.size(), formatter.format(ourChainWeight))); + + LOGGER.debug(String.format("Listing peers with common block %.8s...", Base58.encode(commonBlockSummary.getSignature()))); + iterator = peersSharingCommonBlock.iterator(); + while (iterator.hasNext()) { + Peer peer = (Peer)iterator.next(); + + final int peerHeight = peer.getChainTipData().getLastHeight(); + final int peerAdditionalBlocksAfterCommonBlock = peerHeight - commonBlockSummary.getHeight(); + final CommonBlockData peerCommonBlockData = peer.getCommonBlockData(); + + if (peerCommonBlockData == null || peerCommonBlockData.getBlockSummariesAfterCommonBlock() == null || peerCommonBlockData.getBlockSummariesAfterCommonBlock().isEmpty()) { + // No response - remove this peer for now + LOGGER.debug(String.format("Peer %s doesn't have any block summaries - removing it from this round", peer)); + iterator.remove(); + continue; + } + + final List peerBlockSummariesAfterCommonBlock = peerCommonBlockData.getBlockSummariesAfterCommonBlock(); + populateBlockSummariesMinterLevels(repository, peerBlockSummariesAfterCommonBlock); + + // Calculate cumulative chain weight of this blockchain subset, from common block to highest mutual block held by all peers in this group. + LOGGER.debug(String.format("About to calculate chain weight based on %d blocks for peer %s with common block %.8s (peer has %d blocks after common block)", peerBlockSummariesAfterCommonBlock.size(), peer, Base58.encode(commonBlockSummary.getSignature()), peerAdditionalBlocksAfterCommonBlock)); + BigInteger peerChainWeight = Block.calcChainWeight(commonBlockSummary.getHeight(), commonBlockSummary.getSignature(), peerBlockSummariesAfterCommonBlock, minChainLength); + peer.getCommonBlockData().setChainWeight(peerChainWeight); + LOGGER.debug(String.format("Chain weight of peer %s based on %d blocks (%d - %d) is %s", peer, peerBlockSummariesAfterCommonBlock.size(), peerBlockSummariesAfterCommonBlock.get(0).getHeight(), peerBlockSummariesAfterCommonBlock.get(peerBlockSummariesAfterCommonBlock.size()-1).getHeight(), formatter.format(peerChainWeight))); + + // Compare against our chain - if our blockchain has greater weight then don't synchronize with peer (or any others in this group) + if (ourChainWeight.compareTo(peerChainWeight) > 0) { + // This peer is on an inferior chain - remove it + LOGGER.debug(String.format("Peer %s is on an inferior chain to us - added %.8s to inferior chain signatures list", peer, Base58.encode(peer.getChainTipData().getLastBlockSignature()))); + Controller.getInstance().addInferiorChainSignature(peer.getChainTipData().getLastBlockSignature()); + } + else { + // Our chain is inferior + LOGGER.debug(String.format("Peer %s is on a better chain to us. We will compare the other peers sharing this common block against each other, and drop all peers sharing higher common blocks.", peer)); + dropPeersAfterCommonBlockHeight = commonBlockSummary.getHeight(); + superiorPeersForComparison.add(peer); + } + } + + // Now that we have selected the best peers, compare them against each other and remove any with lower weights + if (superiorPeersForComparison.size() > 0) { + BigInteger bestChainWeight = null; + for (Peer peer : superiorPeersForComparison) { + // Increase bestChainWeight if needed + if (bestChainWeight == null || peer.getCommonBlockData().getChainWeight().compareTo(bestChainWeight) >= 0) + bestChainWeight = peer.getCommonBlockData().getChainWeight(); + } + for (Peer peer : superiorPeersForComparison) { + // Check if we should discard an inferior peer + if (peer.getCommonBlockData().getChainWeight().compareTo(bestChainWeight) < 0) { + LOGGER.debug(String.format("Peer %s has a lower chain weight than other peer(s) in this group - removing it from this round.", peer)); + Controller.getInstance().addInferiorChainSignature(peer.getChainTipData().getLastBlockSignature()); + } + } + } + } + + return peers; + } finally { + repository.discardChanges(); // Free repository locks, if any, also in case anything went wrong + } + } catch (DataException e) { + LOGGER.error("Repository issue during peer comparison", e); + return peers; + } + } + + private List uniqueCommonBlocks(List peers) { + List commonBlocks = new ArrayList<>(); + + Iterator iterator = peers.iterator(); + while (iterator.hasNext()) { + Peer peer = iterator.next(); + + if (peer.getCommonBlockData() != null && peer.getCommonBlockData().getCommonBlockSummary() != null) { + LOGGER.debug(String.format("Peer %s has common block %.8s", peer, Base58.encode(peer.getCommonBlockData().getCommonBlockSummary().getSignature()))); + + BlockSummaryData commonBlockSummary = peer.getCommonBlockData().getCommonBlockSummary(); + if (!commonBlocks.contains(commonBlockSummary)) + commonBlocks.add(commonBlockSummary); + } + else { + LOGGER.debug(String.format("Peer %s has no common block data. Skipping...", peer)); + iterator.remove(); + } + } + + return commonBlocks; + } + + private int calculateMinChainLength(BlockSummaryData commonBlockSummary, int ourAdditionalBlocksAfterCommonBlock, List peersSharingCommonBlock) { + // Calculate the length of the shortest peer chain sharing this common block, including our chain + int minChainLength = ourAdditionalBlocksAfterCommonBlock; + for (Peer peer : peersSharingCommonBlock) { + final int peerHeight = peer.getChainTipData().getLastHeight(); + final int peerAdditionalBlocksAfterCommonBlock = peerHeight - commonBlockSummary.getHeight(); + + if (peerAdditionalBlocksAfterCommonBlock < minChainLength) + minChainLength = peerAdditionalBlocksAfterCommonBlock; + } + return minChainLength; + } + + /** * Attempt to synchronize blockchain with peer. *

diff --git a/src/main/java/org/qortal/data/block/CommonBlockData.java b/src/main/java/org/qortal/data/block/CommonBlockData.java new file mode 100644 index 00000000..dd502df7 --- /dev/null +++ b/src/main/java/org/qortal/data/block/CommonBlockData.java @@ -0,0 +1,56 @@ +package org.qortal.data.block; + +import org.qortal.data.network.PeerChainTipData; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import java.math.BigInteger; +import java.util.List; + +@XmlAccessorType(XmlAccessType.FIELD) +public class CommonBlockData { + + // Properties + private BlockSummaryData commonBlockSummary = null; + private List blockSummariesAfterCommonBlock = null; + private BigInteger chainWeight = null; + private PeerChainTipData chainTipData = null; + + // Constructors + + protected CommonBlockData() { + } + + public CommonBlockData(BlockSummaryData commonBlockSummary, PeerChainTipData chainTipData) { + this.commonBlockSummary = commonBlockSummary; + this.chainTipData = chainTipData; + } + + + // Getters / setters + + public BlockSummaryData getCommonBlockSummary() { + return this.commonBlockSummary; + } + + public List getBlockSummariesAfterCommonBlock() { + return this.blockSummariesAfterCommonBlock; + } + + public void setBlockSummariesAfterCommonBlock(List blockSummariesAfterCommonBlock) { + this.blockSummariesAfterCommonBlock = blockSummariesAfterCommonBlock; + } + + public BigInteger getChainWeight() { + return this.chainWeight; + } + + public void setChainWeight(BigInteger chainWeight) { + this.chainWeight = chainWeight; + } + + public PeerChainTipData getChainTipData() { + return this.chainTipData; + } + +} diff --git a/src/main/java/org/qortal/network/Peer.java b/src/main/java/org/qortal/network/Peer.java index f619111a..3061431e 100644 --- a/src/main/java/org/qortal/network/Peer.java +++ b/src/main/java/org/qortal/network/Peer.java @@ -11,10 +11,7 @@ import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.security.SecureRandom; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import java.util.Random; +import java.util.*; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; @@ -22,6 +19,7 @@ import java.util.concurrent.TimeUnit; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.qortal.data.block.CommonBlockData; import org.qortal.data.network.PeerChainTipData; import org.qortal.data.network.PeerData; import org.qortal.network.message.ChallengeMessage; @@ -106,6 +104,9 @@ public class Peer { /** Latest block info as reported by peer. */ private PeerChainTipData peersChainTipData; + /** Our common block with this peer */ + private CommonBlockData commonBlockData; + // Constructors /** Construct unconnected, outbound Peer using socket address in peer data */ @@ -272,6 +273,18 @@ public class Peer { } } + public CommonBlockData getCommonBlockData() { + synchronized (this.peerInfoLock) { + return this.commonBlockData; + } + } + + public void setCommonBlockData(CommonBlockData commonBlockData) { + synchronized (this.peerInfoLock) { + this.commonBlockData = commonBlockData; + } + } + /*package*/ void queueMessage(Message message) { if (!this.pendingMessages.offer(message)) LOGGER.info(() -> String.format("No room to queue message from peer %s - discarding", this));