3
0
mirror of https://github.com/Qortal/altcoinj.git synced 2025-01-31 23:32:16 +00:00

Peer/Group: Clear some static analysis warnings related to the (buggy) optionality of the block chain and some threading issues.

This commit is contained in:
Mike Hearn 2013-07-10 15:32:20 +02:00
parent 4c0930a961
commit d92314dd18
3 changed files with 158 additions and 103 deletions

View File

@ -23,12 +23,16 @@ import com.google.bitcoin.utils.Threading;
import com.google.common.base.Objects; import com.google.common.base.Objects;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.util.concurrent.*; import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import net.jcip.annotations.GuardedBy; import net.jcip.annotations.GuardedBy;
import org.jboss.netty.channel.*; import org.jboss.netty.channel.*;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.io.IOException; import java.io.IOException;
import java.net.ConnectException; import java.net.ConnectException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
@ -143,7 +147,7 @@ public class Peer {
* in a memory pool will have their confidence levels updated when a peer announces it, to reflect the greater * in a memory pool will have their confidence levels updated when a peer announces it, to reflect the greater
* likelyhood that the transaction is valid. * likelyhood that the transaction is valid.
*/ */
public Peer(NetworkParameters params, AbstractBlockChain chain, VersionMessage ver, MemoryPool mempool) { public Peer(NetworkParameters params, @Nullable AbstractBlockChain chain, VersionMessage ver, @Nullable MemoryPool mempool) {
this.params = Preconditions.checkNotNull(params); this.params = Preconditions.checkNotNull(params);
this.versionMessage = Preconditions.checkNotNull(ver); this.versionMessage = Preconditions.checkNotNull(ver);
this.blockChain = chain; // Allowed to be null. this.blockChain = chain; // Allowed to be null.
@ -402,9 +406,17 @@ public class Peer {
long fastCatchupTimeSecs; long fastCatchupTimeSecs;
lock.lock(); lock.lock();
fastCatchupTimeSecs = this.fastCatchupTimeSecs; try {
downloadBlockBodies = this.downloadBlockBodies; if (blockChain == null) {
lock.unlock(); // Can happen if we are receiving unrequested data, or due to programmer error.
log.warn("Received headers when Peer is not configured with a chain.");
return;
}
fastCatchupTimeSecs = this.fastCatchupTimeSecs;
downloadBlockBodies = this.downloadBlockBodies;
} finally {
lock.unlock();
}
try { try {
checkState(!downloadBlockBodies, toString()); checkState(!downloadBlockBodies, toString());
@ -426,19 +438,30 @@ public class Peer {
throw new ProtocolException("Got unconnected header from peer: " + header.getHashAsString()); throw new ProtocolException("Got unconnected header from peer: " + header.getHashAsString());
} }
} else { } else {
log.info("Passed the fast catchup time, discarding {} headers and requesting full blocks", lock.lock();
m.getBlockHeaders().size() - i); try {
this.downloadBlockBodies = true; log.info("Passed the fast catchup time, discarding {} headers and requesting full blocks",
// Prevent this request being seen as a duplicate. m.getBlockHeaders().size() - i);
this.lastGetBlocksBegin = Sha256Hash.ZERO_HASH; this.downloadBlockBodies = true;
blockChainDownload(Sha256Hash.ZERO_HASH); // Prevent this request being seen as a duplicate.
this.lastGetBlocksBegin = Sha256Hash.ZERO_HASH;
blockChainDownloadLocked(Sha256Hash.ZERO_HASH);
} finally {
lock.unlock();
}
return; return;
} }
} }
// We added all headers in the message to the chain. Request some more if we got up to the limit, otherwise // We added all headers in the message to the chain. Request some more if we got up to the limit, otherwise
// we are at the end of the chain. // we are at the end of the chain.
if (m.getBlockHeaders().size() >= HeadersMessage.MAX_HEADERS) if (m.getBlockHeaders().size() >= HeadersMessage.MAX_HEADERS) {
blockChainDownload(Sha256Hash.ZERO_HASH); lock.lock();
try {
blockChainDownloadLocked(Sha256Hash.ZERO_HASH);
} finally {
lock.unlock();
}
}
} catch (VerificationException e) { } catch (VerificationException e) {
log.warn("Block header verification failed", e); log.warn("Block header verification failed", e);
} catch (PrunedException e) { } catch (PrunedException e) {
@ -564,6 +587,7 @@ public class Peer {
* <p>Note that dependencies downloaded this way will not trigger the onTransaction method of event listeners.</p> * <p>Note that dependencies downloaded this way will not trigger the onTransaction method of event listeners.</p>
*/ */
public ListenableFuture<List<Transaction>> downloadDependencies(Transaction tx) { public ListenableFuture<List<Transaction>> downloadDependencies(Transaction tx) {
checkNotNull(memoryPool, "Must have a configured MemoryPool object to download dependencies.");
TransactionConfidence.ConfidenceType txConfidence = tx.getConfidence().getConfidenceType(); TransactionConfidence.ConfidenceType txConfidence = tx.getConfidence().getConfidenceType();
Preconditions.checkArgument(txConfidence != TransactionConfidence.ConfidenceType.BUILDING); Preconditions.checkArgument(txConfidence != TransactionConfidence.ConfidenceType.BUILDING);
log.info("{}: Downloading dependencies of {}", vAddress, tx.getHashAsString()); log.info("{}: Downloading dependencies of {}", vAddress, tx.getHashAsString());
@ -587,6 +611,7 @@ public class Peer {
private ListenableFuture<Object> downloadDependenciesInternal(final Transaction tx, private ListenableFuture<Object> downloadDependenciesInternal(final Transaction tx,
final Object marker, final Object marker,
final List<Transaction> results) { final List<Transaction> results) {
checkNotNull(memoryPool, "Must have a configured MemoryPool object to download dependencies.");
final SettableFuture<Object> resultFuture = SettableFuture.create(); final SettableFuture<Object> resultFuture = SettableFuture.create();
final Sha256Hash rootTxHash = tx.getHash(); final Sha256Hash rootTxHash = tx.getHash();
// We want to recursively grab its dependencies. This is so listeners can learn important information like // We want to recursively grab its dependencies. This is so listeners can learn important information like
@ -704,6 +729,10 @@ public class Peer {
} }
// Was this block requested by getBlock()? // Was this block requested by getBlock()?
if (maybeHandleRequestedData(m)) return; if (maybeHandleRequestedData(m)) return;
if (blockChain == null) {
log.warn("Received block but was not configured with an AbstractBlockChain");
return;
}
// Did we lose download peer status after requesting block data? // Did we lose download peer status after requesting block data?
if (!vDownloadData) { if (!vDownloadData) {
log.debug("{}: Received block we did not ask for: {}", vAddress, m.getHashAsString()); log.debug("{}: Received block we did not ask for: {}", vAddress, m.getHashAsString());
@ -724,20 +753,27 @@ public class Peer {
// //
// We must do two things here: // We must do two things here:
// (1) Request from current top of chain to the oldest ancestor of the received block in the orphan set // (1) Request from current top of chain to the oldest ancestor of the received block in the orphan set
// (2) Filter out duplicate getblock requests (done in blockChainDownload). // (2) Filter out duplicate getblock requests (done in blockChainDownloadLocked).
// //
// The reason for (1) is that otherwise if new blocks were solved during the middle of chain download // The reason for (1) is that otherwise if new blocks were solved during the middle of chain download
// we'd do a blockChainDownload() on the new best chain head, which would cause us to try and grab the // we'd do a blockChainDownloadLocked() on the new best chain head, which would cause us to try and grab the
// chain twice (or more!) on the same connection! The block chain would filter out the duplicates but // chain twice (or more!) on the same connection! The block chain would filter out the duplicates but
// only at a huge speed penalty. By finding the orphan root we ensure every getblocks looks the same // only at a huge speed penalty. By finding the orphan root we ensure every getblocks looks the same
// no matter how many blocks are solved, and therefore that the (2) duplicate filtering can work. // no matter how many blocks are solved, and therefore that the (2) duplicate filtering can work.
// //
// We only do this if we are not currently downloading headers. If we are then we don't want to kick // We only do this if we are not currently downloading headers. If we are then we don't want to kick
// off a request for lots more headers in parallel. // off a request for lots more headers in parallel.
if (downloadBlockBodies) lock.lock();
blockChainDownload(blockChain.getOrphanRoot(m.getHash()).getHash()); try {
else if (downloadBlockBodies) {
log.info("Did not start chain download on solved block due to in-flight header download."); final Block orphanRoot = checkNotNull(blockChain.getOrphanRoot(m.getHash()));
blockChainDownloadLocked(orphanRoot.getHash());
} else {
log.info("Did not start chain download on solved block due to in-flight header download.");
}
} finally {
lock.unlock();
}
} }
} catch (VerificationException e) { } catch (VerificationException e) {
// We don't want verification failures to kill the thread. // We don't want verification failures to kill the thread.
@ -757,6 +793,10 @@ public class Peer {
log.debug("{}: Received block we did not ask for: {}", vAddress, m.getHash().toString()); log.debug("{}: Received block we did not ask for: {}", vAddress, m.getHash().toString());
return; return;
} }
if (blockChain == null) {
log.warn("Received filtered block but was not configured with an AbstractBlockChain");
return;
}
// Note that we currently do nothing about peers which maliciously do not include transactions which // Note that we currently do nothing about peers which maliciously do not include transactions which
// actually match our filter or which simply do not send us all the transactions we need: it can be fixed // actually match our filter or which simply do not send us all the transactions we need: it can be fixed
// by cross-checking peers against each other. // by cross-checking peers against each other.
@ -789,14 +829,20 @@ public class Peer {
// //
// We must do two things here: // We must do two things here:
// (1) Request from current top of chain to the oldest ancestor of the received block in the orphan set // (1) Request from current top of chain to the oldest ancestor of the received block in the orphan set
// (2) Filter out duplicate getblock requests (done in blockChainDownload). // (2) Filter out duplicate getblock requests (done in blockChainDownloadLocked).
// //
// The reason for (1) is that otherwise if new blocks were solved during the middle of chain download // The reason for (1) is that otherwise if new blocks were solved during the middle of chain download
// we'd do a blockChainDownload() on the new best chain head, which would cause us to try and grab the // we'd do a blockChainDownloadLocked() on the new best chain head, which would cause us to try and grab the
// chain twice (or more!) on the same connection! The block chain would filter out the duplicates but // chain twice (or more!) on the same connection! The block chain would filter out the duplicates but
// only at a huge speed penalty. By finding the orphan root we ensure every getblocks looks the same // only at a huge speed penalty. By finding the orphan root we ensure every getblocks looks the same
// no matter how many blocks are solved, and therefore that the (2) duplicate filtering can work. // no matter how many blocks are solved, and therefore that the (2) duplicate filtering can work.
blockChainDownload(blockChain.getOrphanRoot(m.getHash()).getHash()); lock.lock();
try {
final Block orphanRoot = checkNotNull(blockChain.getOrphanRoot(m.getHash()));
blockChainDownloadLocked(orphanRoot.getHash());
} finally {
lock.unlock();
}
} }
} catch (VerificationException e) { } catch (VerificationException e) {
// We don't want verification failures to kill the thread. // We don't want verification failures to kill the thread.
@ -827,7 +873,7 @@ public class Peer {
// It is possible for the peer block height difference to be negative when blocks have been solved and broadcast // It is possible for the peer block height difference to be negative when blocks have been solved and broadcast
// since the time we first connected to the peer. However, it's weird and unexpected to receive a callback // since the time we first connected to the peer. However, it's weird and unexpected to receive a callback
// with negative "blocks left" in this case, so we clamp to zero so the API user doesn't have to think about it. // with negative "blocks left" in this case, so we clamp to zero so the API user doesn't have to think about it.
final int blocksLeft = Math.max(0, (int) vPeerVersionMessage.bestHeight - blockChain.getBestChainHeight()); final int blocksLeft = Math.max(0, (int) vPeerVersionMessage.bestHeight - checkNotNull(blockChain).getBestChainHeight());
for (final ListenerRegistration<PeerEventListener> registration : eventListeners) { for (final ListenerRegistration<PeerEventListener> registration : eventListeners) {
registration.executor.execute(new Runnable() { registration.executor.execute(new Runnable() {
@Override @Override
@ -865,7 +911,7 @@ public class Peer {
// (the block chain download protocol is very implicit and not well thought out). If we're not downloading // (the block chain download protocol is very implicit and not well thought out). If we're not downloading
// the chain then this probably means a new block was solved and the peer believes it connects to the best // the chain then this probably means a new block was solved and the peer believes it connects to the best
// chain, so count it. This way getBestChainHeight() can be accurate. // chain, so count it. This way getBestChainHeight() can be accurate.
if (downloadData) { if (downloadData && blockChain != null) {
if (!blockChain.isOrphan(blocks.get(0).hash)) { if (!blockChain.isOrphan(blocks.get(0).hash)) {
blocksAnnounced.incrementAndGet(); blocksAnnounced.incrementAndGet();
} }
@ -917,7 +963,8 @@ public class Peer {
if (blockChain.isOrphan(item.hash) && downloadBlockBodies) { if (blockChain.isOrphan(item.hash) && downloadBlockBodies) {
// If an orphan was re-advertised, ask for more blocks unless we are not currently downloading // If an orphan was re-advertised, ask for more blocks unless we are not currently downloading
// full block data because we have a getheaders outstanding. // full block data because we have a getheaders outstanding.
blockChainDownload(blockChain.getOrphanRoot(item.hash).getHash()); final Block orphanRoot = checkNotNull(blockChain.getOrphanRoot(item.hash));
blockChainDownloadLocked(orphanRoot.getHash());
} else { } else {
// Don't re-request blocks we already requested. Normally this should not happen. However there is // Don't re-request blocks we already requested. Normally this should not happen. However there is
// an edge case: if a block is solved and we complete the inv<->getdata<->block<->getblocks cycle // an edge case: if a block is solved and we complete the inv<->getdata<->block<->getblocks cycle
@ -929,7 +976,7 @@ public class Peer {
// //
// Note that as of June 2012 the Satoshi client won't actually ever interleave blocks pushed as // Note that as of June 2012 the Satoshi client won't actually ever interleave blocks pushed as
// part of chain download with newly announced blocks, so it should always be taken care of by // part of chain download with newly announced blocks, so it should always be taken care of by
// the duplicate check in blockChainDownload(). But the satoshi client may change in future so // the duplicate check in blockChainDownloadLocked(). But the satoshi client may change in future so
// it's better to be safe here. // it's better to be safe here.
if (!pendingBlockDownloads.contains(item.hash)) { if (!pendingBlockDownloads.contains(item.hash)) {
if (vPeerVersionMessage.isBloomFilteringSupported() && useFilteredBlocks) { if (vPeerVersionMessage.isBloomFilteringSupported() && useFilteredBlocks) {
@ -944,7 +991,7 @@ public class Peer {
} }
// If we're downloading the chain, doing a getdata on the last block we were told about will cause the // If we're downloading the chain, doing a getdata on the last block we were told about will cause the
// peer to advertize the head block to us in a single-item inv. When we download THAT, it will be an // peer to advertize the head block to us in a single-item inv. When we download THAT, it will be an
// orphan block, meaning we'll re-enter blockChainDownload() to trigger another getblocks between the // orphan block, meaning we'll re-enter blockChainDownloadLocked() to trigger another getblocks between the
// current best block we have and the orphan block. If more blocks arrive in the meantime they'll also // current best block we have and the orphan block. If more blocks arrive in the meantime they'll also
// become orphan. // become orphan.
} }
@ -1052,12 +1099,14 @@ public class Peer {
return Channels.write(vChannel, m); return Channels.write(vChannel, m);
} }
// Keep track of the last request we made to the peer in blockChainDownload so we can avoid redundant and harmful // Keep track of the last request we made to the peer in blockChainDownloadLocked so we can avoid redundant and harmful
// getblocks requests. This does not have to be synchronized because blockChainDownload cannot be called from // getblocks requests.
// multiple threads simultaneously. @GuardedBy("lock")
private Sha256Hash lastGetBlocksBegin, lastGetBlocksEnd; private Sha256Hash lastGetBlocksBegin, lastGetBlocksEnd;
private void blockChainDownload(Sha256Hash toHash) throws IOException { @GuardedBy("lock")
private void blockChainDownloadLocked(Sha256Hash toHash) throws IOException {
checkState(lock.isHeldByCurrentThread());
// The block chain download process is a bit complicated. Basically, we start with one or more blocks in a // The block chain download process is a bit complicated. Basically, we start with one or more blocks in a
// chain that we have from a previous session. We want to catch up to the head of the chain BUT we don't know // chain that we have from a previous session. We want to catch up to the head of the chain BUT we don't know
// where that chain is up to or even if the top block we have is even still in the chain - we // where that chain is up to or even if the top block we have is even still in the chain - we
@ -1078,7 +1127,7 @@ public class Peer {
// //
// The getblocks with the new locator gets us another inv with another bunch of blocks. We download them once // The getblocks with the new locator gets us another inv with another bunch of blocks. We download them once
// again. This time when the peer sends us an inv with the head block, we already have it so we won't download // again. This time when the peer sends us an inv with the head block, we already have it so we won't download
// it again - but we recognize this case as special and call back into blockChainDownload to continue the // it again - but we recognize this case as special and call back into blockChainDownloadLocked to continue the
// process. // process.
// //
// So this is a complicated process but it has the advantage that we can download a chain of enormous length // So this is a complicated process but it has the advantage that we can download a chain of enormous length
@ -1091,57 +1140,52 @@ public class Peer {
// headers and then request the blocks from that point onwards. "getheaders" does not send us an inv, it just // headers and then request the blocks from that point onwards. "getheaders" does not send us an inv, it just
// sends us the data we requested in a "headers" message. // sends us the data we requested in a "headers" message.
lock.lock(); // TODO: Block locators should be abstracted out rather than special cased here.
try { List<Sha256Hash> blockLocator = new ArrayList<Sha256Hash>(51);
// TODO: Block locators should be abstracted out rather than special cased here. // For now we don't do the exponential thinning as suggested here:
List<Sha256Hash> blockLocator = new ArrayList<Sha256Hash>(51); //
// For now we don't do the exponential thinning as suggested here: // https://en.bitcoin.it/wiki/Protocol_specification#getblocks
// //
// https://en.bitcoin.it/wiki/Protocol_specification#getblocks // This is because it requires scanning all the block chain headers, which is very slow. Instead we add the top
// // 50 block headers. If there is a re-org deeper than that, we'll end up downloading the entire chain. We
// This is because it requires scanning all the block chain headers, which is very slow. Instead we add the top // must always put the genesis block as the first entry.
// 50 block headers. If there is a re-org deeper than that, we'll end up downloading the entire chain. We BlockStore store = checkNotNull(blockChain).getBlockStore();
// must always put the genesis block as the first entry. StoredBlock chainHead = blockChain.getChainHead();
BlockStore store = blockChain.getBlockStore(); Sha256Hash chainHeadHash = chainHead.getHeader().getHash();
StoredBlock chainHead = blockChain.getChainHead(); // Did we already make this request? If so, don't do it again.
Sha256Hash chainHeadHash = chainHead.getHeader().getHash(); if (Objects.equal(lastGetBlocksBegin, chainHeadHash) && Objects.equal(lastGetBlocksEnd, toHash)) {
// Did we already make this request? If so, don't do it again. log.info("blockChainDownloadLocked({}): ignoring duplicated request", toHash.toString());
if (Objects.equal(lastGetBlocksBegin, chainHeadHash) && Objects.equal(lastGetBlocksEnd, toHash)) { return;
log.info("blockChainDownload({}): ignoring duplicated request", toHash.toString()); }
return; log.debug("{}: blockChainDownloadLocked({}) current head = {}", new Object[]{toString(),
} toHash.toString(), chainHead.getHeader().getHashAsString()});
log.debug("{}: blockChainDownload({}) current head = {}", new Object[]{toString(), StoredBlock cursor = chainHead;
toHash.toString(), chainHead.getHeader().getHashAsString()}); for (int i = 100; cursor != null && i > 0; i--) {
StoredBlock cursor = chainHead; blockLocator.add(cursor.getHeader().getHash());
for (int i = 100; cursor != null && i > 0; i--) { try {
blockLocator.add(cursor.getHeader().getHash()); cursor = cursor.getPrev(store);
try { } catch (BlockStoreException e) {
cursor = cursor.getPrev(store); log.error("Failed to walk the block chain whilst constructing a locator");
} catch (BlockStoreException e) { throw new RuntimeException(e);
log.error("Failed to walk the block chain whilst constructing a locator");
throw new RuntimeException(e);
}
}
// Only add the locator if we didn't already do so. If the chain is < 50 blocks we already reached it.
if (cursor != null) {
blockLocator.add(params.getGenesisBlock().getHash());
} }
}
// Only add the locator if we didn't already do so. If the chain is < 50 blocks we already reached it.
if (cursor != null) {
blockLocator.add(params.getGenesisBlock().getHash());
}
// Record that we requested this range of blocks so we can filter out duplicate requests in the event of a // Record that we requested this range of blocks so we can filter out duplicate requests in the event of a
// block being solved during chain download. // block being solved during chain download.
lastGetBlocksBegin = chainHeadHash; lastGetBlocksBegin = chainHeadHash;
lastGetBlocksEnd = toHash; lastGetBlocksEnd = toHash;
if (downloadBlockBodies) { if (downloadBlockBodies) {
GetBlocksMessage message = new GetBlocksMessage(params, blockLocator, toHash); GetBlocksMessage message = new GetBlocksMessage(params, blockLocator, toHash);
sendMessage(message); sendMessage(message);
} else { } else {
// Downloading headers for a while instead of full blocks. // Downloading headers for a while instead of full blocks.
GetHeadersMessage message = new GetHeadersMessage(params, blockLocator, toHash); GetHeadersMessage message = new GetHeadersMessage(params, blockLocator, toHash);
sendMessage(message); sendMessage(message);
}
} finally {
lock.unlock();
} }
} }
@ -1150,7 +1194,6 @@ public class Peer {
* downloaded the same number of blocks that the peer advertised having in its version handshake message. * downloaded the same number of blocks that the peer advertised having in its version handshake message.
*/ */
public void startBlockChainDownload() throws IOException { public void startBlockChainDownload() throws IOException {
// This does not need to be locked.
setDownloadData(true); setDownloadData(true);
// TODO: peer might still have blocks that we don't have, and even have a heavier // TODO: peer might still have blocks that we don't have, and even have a heavier
// chain even if the chain block count is lower. // chain even if the chain block count is lower.
@ -1165,7 +1208,12 @@ public class Peer {
}); });
} }
// When we just want as many blocks as possible, we can set the target hash to zero. // When we just want as many blocks as possible, we can set the target hash to zero.
blockChainDownload(Sha256Hash.ZERO_HASH); lock.lock();
try {
blockChainDownloadLocked(Sha256Hash.ZERO_HASH);
} finally {
lock.unlock();
}
} }
} }
@ -1283,6 +1331,7 @@ public class Peer {
* behind the peer, or negative if the peer is ahead of us. * behind the peer, or negative if the peer is ahead of us.
*/ */
public int getPeerBlockHeightDifference() { public int getPeerBlockHeightDifference() {
checkNotNull(blockChain, "No block chain configured");
// Chain will overflow signed int blocks in ~41,000 years. // Chain will overflow signed int blocks in ~41,000 years.
int chainHeight = (int) getBestHeight(); int chainHeight = (int) getBestHeight();
// chainHeight should not be zero/negative because we shouldn't have given the user a Peer that is to another // chainHeight should not be zero/negative because we shouldn't have given the user a Peer that is to another
@ -1342,7 +1391,7 @@ public class Peer {
* will be disconnected. * will be disconnected.
* @return if not-null then this is the future for the Peer disconnection event. * @return if not-null then this is the future for the Peer disconnection event.
*/ */
public ChannelFuture setMinProtocolVersion(int minProtocolVersion) { @Nullable public ChannelFuture setMinProtocolVersion(int minProtocolVersion) {
this.vMinProtocolVersion = minProtocolVersion; this.vMinProtocolVersion = minProtocolVersion;
if (getVersionMessage().clientVersion < minProtocolVersion) { if (getVersionMessage().clientVersion < minProtocolVersion) {
log.warn("{}: Disconnecting due to new min protocol version {}", this, minProtocolVersion); log.warn("{}: Disconnecting due to new min protocol version {}", this, minProtocolVersion);

View File

@ -34,6 +34,7 @@ import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.io.IOException; import java.io.IOException;
import java.net.InetAddress; import java.net.InetAddress;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
@ -74,13 +75,12 @@ public class PeerGroup extends AbstractIdleService implements TransactionBroadca
private static final Logger log = LoggerFactory.getLogger(PeerGroup.class); private static final Logger log = LoggerFactory.getLogger(PeerGroup.class);
protected final ReentrantLock lock = Threading.lock("peergroup"); protected final ReentrantLock lock = Threading.lock("peergroup");
// These lists are all thread-safe so do not have to be accessed under the PeerGroup lock.
// Addresses to try to connect to, excluding active peers. // Addresses to try to connect to, excluding active peers.
private final List<PeerAddress> inactives; @GuardedBy("lock") private final List<PeerAddress> inactives;
// Currently active peers. This is an ordered list rather than a set to make unit tests predictable. // Currently active peers. This is an ordered list rather than a set to make unit tests predictable.
@GuardedBy("lock") private final List<Peer> peers; private final CopyOnWriteArrayList<Peer> peers;
// Currently connecting peers. // Currently connecting peers.
@GuardedBy("lock") private final List<Peer> pendingPeers; private final CopyOnWriteArrayList<Peer> pendingPeers;
private final ChannelGroup channels; private final ChannelGroup channels;
// The peer that has been selected for the purposes of downloading announced data. // The peer that has been selected for the purposes of downloading announced data.
@ -90,9 +90,9 @@ public class PeerGroup extends AbstractIdleService implements TransactionBroadca
// Callbacks for events related to peer connection/disconnection // Callbacks for events related to peer connection/disconnection
private final CopyOnWriteArrayList<ListenerRegistration<PeerEventListener>> peerEventListeners; private final CopyOnWriteArrayList<ListenerRegistration<PeerEventListener>> peerEventListeners;
// Peer discovery sources, will be polled occasionally if there aren't enough inactives. // Peer discovery sources, will be polled occasionally if there aren't enough inactives.
private CopyOnWriteArraySet<PeerDiscovery> peerDiscoverers; private final CopyOnWriteArraySet<PeerDiscovery> peerDiscoverers;
// The version message to use for new connections. // The version message to use for new connections.
private VersionMessage versionMessage; @GuardedBy("lock") private VersionMessage versionMessage;
// A class that tracks recent transactions that have been broadcast across the network, counts how many // A class that tracks recent transactions that have been broadcast across the network, counts how many
// peers announced them and updates the transaction confidence data. It is passed to each Peer. // peers announced them and updates the transaction confidence data. It is passed to each Peer.
private final MemoryPool memoryPool; private final MemoryPool memoryPool;
@ -109,12 +109,12 @@ public class PeerGroup extends AbstractIdleService implements TransactionBroadca
private final NetworkParameters params; private final NetworkParameters params;
private final AbstractBlockChain chain; private final AbstractBlockChain chain;
private long fastCatchupTimeSecs; @GuardedBy("lock") private long fastCatchupTimeSecs;
private final CopyOnWriteArrayList<Wallet> wallets; private final CopyOnWriteArrayList<Wallet> wallets;
// This event listener is added to every peer. It's here so when we announce transactions via an "inv", every // This event listener is added to every peer. It's here so when we announce transactions via an "inv", every
// peer can fetch them. // peer can fetch them.
private AbstractPeerEventListener getDataListener = new AbstractPeerEventListener() { private final AbstractPeerEventListener getDataListener = new AbstractPeerEventListener() {
@Override @Override
public List<Message> getData(Peer peer, GetDataMessage m) { public List<Message> getData(Peer peer, GetDataMessage m) {
return handleGetData(m); return handleGetData(m);
@ -198,9 +198,9 @@ public class PeerGroup extends AbstractIdleService implements TransactionBroadca
* <p>The ClientBootstrap provided does not need a channel pipeline factory set. If one wasn't set, the provided * <p>The ClientBootstrap provided does not need a channel pipeline factory set. If one wasn't set, the provided
* bootstrap will be modified to have one that sets up the pipelines correctly.</p> * bootstrap will be modified to have one that sets up the pipelines correctly.</p>
*/ */
public PeerGroup(NetworkParameters params, AbstractBlockChain chain, ClientBootstrap bootstrap) { public PeerGroup(NetworkParameters params, @Nullable AbstractBlockChain chain, @Nullable ClientBootstrap bootstrap) {
this.params = params; this.params = checkNotNull(params);
this.chain = chain; // Can be null. this.chain = chain;
this.fastCatchupTimeSecs = params.getGenesisBlock().getTimeSeconds(); this.fastCatchupTimeSecs = params.getGenesisBlock().getTimeSeconds();
this.wallets = new CopyOnWriteArrayList<Wallet>(); this.wallets = new CopyOnWriteArrayList<Wallet>();
@ -224,9 +224,9 @@ public class PeerGroup extends AbstractIdleService implements TransactionBroadca
this.bootstrap = bootstrap; this.bootstrap = bootstrap;
} }
inactives = Collections.synchronizedList(new ArrayList<PeerAddress>()); inactives = new ArrayList<PeerAddress>();
peers = new ArrayList<Peer>(); peers = new CopyOnWriteArrayList<Peer>();
pendingPeers = new ArrayList<Peer>(); pendingPeers = new CopyOnWriteArrayList<Peer>();
channels = new DefaultChannelGroup(); channels = new DefaultChannelGroup();
peerDiscoverers = new CopyOnWriteArraySet<PeerDiscovery>(); peerDiscoverers = new CopyOnWriteArraySet<PeerDiscovery>();
peerEventListeners = new CopyOnWriteArrayList<ListenerRegistration<PeerEventListener>>(); peerEventListeners = new CopyOnWriteArrayList<ListenerRegistration<PeerEventListener>>();
@ -251,7 +251,7 @@ public class PeerGroup extends AbstractIdleService implements TransactionBroadca
// pipeline with the bitcoin serializer ({@code TCPNetworkConnection}) downstream // pipeline with the bitcoin serializer ({@code TCPNetworkConnection}) downstream
// of the higher level {@code Peer}. Received packets will first be decoded, then passed // of the higher level {@code Peer}. Received packets will first be decoded, then passed
// {@code Peer}. Sent packets will be created by the {@code Peer}, then encoded and sent. // {@code Peer}. Sent packets will be created by the {@code Peer}, then encoded and sent.
private ChannelPipelineFactory makePipelineFactory(final NetworkParameters params, final AbstractBlockChain chain) { private ChannelPipelineFactory makePipelineFactory(final NetworkParameters params, @Nullable final AbstractBlockChain chain) {
return new ChannelPipelineFactory() { return new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() throws Exception { public ChannelPipeline getPipeline() throws Exception {
// This runs unlocked. // This runs unlocked.
@ -511,7 +511,6 @@ public class PeerGroup extends AbstractIdleService implements TransactionBroadca
} }
protected void discoverPeers() throws PeerDiscoveryException { protected void discoverPeers() throws PeerDiscoveryException {
// This does not need to be locked.
long start = System.currentTimeMillis(); long start = System.currentTimeMillis();
Set<PeerAddress> addressSet = Sets.newHashSet(); Set<PeerAddress> addressSet = Sets.newHashSet();
for (PeerDiscovery peerDiscovery : peerDiscoverers) { for (PeerDiscovery peerDiscovery : peerDiscoverers) {
@ -520,8 +519,11 @@ public class PeerGroup extends AbstractIdleService implements TransactionBroadca
for (InetSocketAddress address : addresses) addressSet.add(new PeerAddress(address)); for (InetSocketAddress address : addresses) addressSet.add(new PeerAddress(address));
if (addressSet.size() > 0) break; if (addressSet.size() > 0) break;
} }
synchronized (inactives) { lock.lock();
try {
inactives.addAll(addressSet); inactives.addAll(addressSet);
} finally {
lock.unlock();
} }
log.info("Peer discovery took {}msec", System.currentTimeMillis() - start); log.info("Peer discovery took {}msec", System.currentTimeMillis() - start);
} }
@ -532,7 +534,8 @@ public class PeerGroup extends AbstractIdleService implements TransactionBroadca
if (!(state == State.STARTING || state == State.RUNNING)) return; if (!(state == State.STARTING || state == State.RUNNING)) return;
final PeerAddress addr; final PeerAddress addr;
synchronized (inactives) { lock.lock();
try {
if (inactives.size() == 0) { if (inactives.size() == 0) {
discoverPeers(); discoverPeers();
} }
@ -541,6 +544,8 @@ public class PeerGroup extends AbstractIdleService implements TransactionBroadca
return; return;
} }
addr = inactives.remove(inactives.size() - 1); addr = inactives.remove(inactives.size() - 1);
} finally {
lock.unlock();
} }
// Don't do connectTo whilst holding the PeerGroup lock because this can trigger some amazingly deep stacks // Don't do connectTo whilst holding the PeerGroup lock because this can trigger some amazingly deep stacks
// and potentially circular deadlock in the case of immediate failure (eg, attempt to access IPv6 node from // and potentially circular deadlock in the case of immediate failure (eg, attempt to access IPv6 node from
@ -795,7 +800,8 @@ public class PeerGroup extends AbstractIdleService implements TransactionBroadca
try { try {
if (bloomFilter != null) peer.setBloomFilter(bloomFilter); if (bloomFilter != null) peer.setBloomFilter(bloomFilter);
} catch (IOException e) { } catch (IOException e) {
} // That was quick...already disconnected // That was quick...already disconnected
}
// Link the peer to the memory pool so broadcast transactions have their confidence levels updated. // Link the peer to the memory pool so broadcast transactions have their confidence levels updated.
peer.setDownloadData(false); peer.setDownloadData(false);
// TODO: The peer should calculate the fast catchup time from the added wallets here. // TODO: The peer should calculate the fast catchup time from the added wallets here.

View File

@ -381,7 +381,7 @@ public class PeerTest extends TestWithNetworkConnections {
GetBlocksMessage message = (GetBlocksMessage) event.getValue().getMessage(); GetBlocksMessage message = (GetBlocksMessage) event.getValue().getMessage();
assertEquals(message.getLocator(), expectedLocator); assertEquals(message.getLocator(), expectedLocator);
assertEquals(message.getStopHash(), Sha256Hash.ZERO_HASH); assertEquals(Sha256Hash.ZERO_HASH, message.getStopHash());
} }
@Test @Test