3
0
mirror of https://github.com/Qortal/altcoinj.git synced 2025-02-12 10:15:52 +00:00

Move cycle detecting lock creation out into a new Locks class, rather than stuff it into Utils.

Convert PeerGroup and Peer to also use cycle detecting locks, and add a unit test to Wallet to check that cycle detection works.
 Change default policy to warn. Now warnings are being triggered, the followup commits will fix them.
This commit is contained in:
Mike Hearn 2013-03-07 13:58:04 +01:00
parent 94670f3df0
commit 9de6dca8c1
7 changed files with 745 additions and 499 deletions

View File

@ -19,6 +19,7 @@ package com.google.bitcoin.core;
import com.google.bitcoin.store.BlockStore; import com.google.bitcoin.store.BlockStore;
import com.google.bitcoin.store.BlockStoreException; import com.google.bitcoin.store.BlockStoreException;
import com.google.bitcoin.utils.EventListenerInvoker; import com.google.bitcoin.utils.EventListenerInvoker;
import com.google.bitcoin.utils.Locks;
import com.google.common.base.Objects; import com.google.common.base.Objects;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
@ -38,6 +39,8 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
import static com.google.common.base.Preconditions.checkState;
/** /**
* A Peer handles the high level communication with a Bitcoin node. * A Peer handles the high level communication with a Bitcoin node.
* *
@ -52,6 +55,7 @@ public class Peer {
} }
private static final Logger log = LoggerFactory.getLogger(Peer.class); private static final Logger log = LoggerFactory.getLogger(Peer.class);
protected final ReentrantLock lock = Locks.lock("peer");
private final NetworkParameters params; private final NetworkParameters params;
private final AbstractBlockChain blockChain; private final AbstractBlockChain blockChain;
@ -171,13 +175,18 @@ public class Peer {
} }
@Override @Override
public synchronized String toString() { public String toString() {
PeerAddress addr = address.get(); lock.lock();
if (addr == null) { try {
// User-provided NetworkConnection object. PeerAddress addr = address.get();
return "Peer()"; if (addr == null) {
} else { // User-provided NetworkConnection object.
return "Peer(" + addr.getAddr() + ":" + addr.getPort() + ")"; return "Peer()";
} else {
return "Peer(" + addr.getAddr() + ":" + addr.getPort() + ")";
}
} finally {
lock.unlock();
} }
} }
@ -226,22 +235,22 @@ public class Peer {
// Allow event listeners to filter the message stream. Listeners are allowed to drop messages by // Allow event listeners to filter the message stream. Listeners are allowed to drop messages by
// returning null. // returning null.
synchronized (Peer.this) { lock.lock();
try {
for (PeerEventListener listener : eventListeners) { for (PeerEventListener listener : eventListeners) {
synchronized (listener) { synchronized (listener) {
m = listener.onPreMessageReceived(Peer.this, m); m = listener.onPreMessageReceived(Peer.this, m);
if (m == null) break; if (m == null) break;
} }
} }
} if (m == null) return;
if (m == null) return;
synchronized (Peer.this) {
if (currentFilteredBlock != null && !(m instanceof Transaction)) { if (currentFilteredBlock != null && !(m instanceof Transaction)) {
processFilteredBlock(currentFilteredBlock); processFilteredBlock(currentFilteredBlock);
currentFilteredBlock = null; currentFilteredBlock = null;
} }
} finally {
lock.unlock();
} }
if (m instanceof NotFoundMessage) { if (m instanceof NotFoundMessage) {
@ -257,8 +266,11 @@ public class Peer {
// messages stream in. We'll call processFilteredBlock when a non-tx message arrives (eg, another // messages stream in. We'll call processFilteredBlock when a non-tx message arrives (eg, another
// FilteredBlock) or when a tx that isn't needed by that block is found. A ping message is sent after // FilteredBlock) or when a tx that isn't needed by that block is found. A ping message is sent after
// a getblocks, to force the non-tx message path. // a getblocks, to force the non-tx message path.
synchronized (Peer.this) { lock.lock();
currentFilteredBlock = (FilteredBlock)m; try {
currentFilteredBlock = (FilteredBlock) m;
} finally {
lock.unlock();
} }
} else if (m instanceof Transaction) { } else if (m instanceof Transaction) {
processTransaction((Transaction) m); processTransaction((Transaction) m);
@ -309,6 +321,8 @@ public class Peer {
} }
private void processNotFoundMessage(NotFoundMessage m) { private void processNotFoundMessage(NotFoundMessage m) {
// This does not need to be locked.
// This is received when we previously did a getdata but the peer couldn't find what we requested in it's // This is received when we previously did a getdata but the peer couldn't find what we requested in it's
// memory pool. Typically, because we are downloading dependencies of a relevant transaction and reached // memory pool. Typically, because we are downloading dependencies of a relevant transaction and reached
// the bottom of the dependency tree (where the unconfirmed transactions connect to transactions that are // the bottom of the dependency tree (where the unconfirmed transactions connect to transactions that are
@ -327,7 +341,8 @@ public class Peer {
} }
} }
private synchronized void processAlert(AlertMessage m) { private void processAlert(AlertMessage m) {
// This does not need to be locked.
try { try {
if (m.isSignatureValid()) { if (m.isSignatureValid()) {
log.info("Received alert from peer {}: {}", toString(), m.getStatusBar()); log.info("Received alert from peer {}: {}", toString(), m.getStatusBar());
@ -347,16 +362,16 @@ public class Peer {
return handler; return handler;
} }
private synchronized void processHeaders(HeadersMessage m) throws IOException, ProtocolException { private void processHeaders(HeadersMessage m) throws IOException, ProtocolException {
// Runs in network loop thread for this peer. // Runs in network loop thread for this peer.
// //
// This method can run if a peer just randomly sends us a "headers" message (should never happen), or more // This method can run if a peer just randomly sends us a "headers" message (should never happen), or more
// likely when we've requested them as part of chain download using fast catchup. We need to add each block to // likely when we've requested them as part of chain download using fast catchup. We need to add each block to
// the chain if it pre-dates the fast catchup time. If we go past it, we can stop processing the headers and // the chain if it pre-dates the fast catchup time. If we go past it, we can stop processing the headers and
// request the full blocks from that point on instead. // request the full blocks from that point on instead.
Preconditions.checkState(!downloadBlockBodies, toString()); lock.lock();
try { try {
checkState(!downloadBlockBodies, toString());
for (int i = 0; i < m.getBlockHeaders().size(); i++) { for (int i = 0; i < m.getBlockHeaders().size(); i++) {
Block header = m.getBlockHeaders().get(i); Block header = m.getBlockHeaders().get(i);
if (header.getTimeSeconds() < fastCatchupTimeSecs) { if (header.getTimeSeconds() < fastCatchupTimeSecs) {
@ -392,10 +407,13 @@ public class Peer {
} catch (PrunedException e) { } catch (PrunedException e) {
// Unreachable when in SPV mode. // Unreachable when in SPV mode.
throw new RuntimeException(e); throw new RuntimeException(e);
} finally {
lock.unlock();
} }
} }
private synchronized void processGetData(GetDataMessage getdata) throws IOException { private void processGetData(GetDataMessage getdata) throws IOException {
// This does not need to be locked.
log.info("{}: Received getdata message: {}", address.get(), getdata.toString()); log.info("{}: Received getdata message: {}", address.get(), getdata.toString());
ArrayList<Message> items = new ArrayList<Message>(); ArrayList<Message> items = new ArrayList<Message>();
for (PeerEventListener listener : eventListeners) { for (PeerEventListener listener : eventListeners) {
@ -414,77 +432,82 @@ public class Peer {
} }
} }
private synchronized void processTransaction(Transaction tx) throws VerificationException, IOException { private void processTransaction(Transaction tx) throws VerificationException, IOException {
log.debug("{}: Received tx {}", address.get(), tx.getHashAsString()); lock.lock();
if (memoryPool != null) { try {
// We may get back a different transaction object. log.debug("{}: Received tx {}", address.get(), tx.getHashAsString());
tx = memoryPool.seen(tx, getAddress()); if (memoryPool != null) {
} // We may get back a different transaction object.
final Transaction fTx = tx; tx = memoryPool.seen(tx, getAddress());
// Label the transaction as coming in from the P2P network (as opposed to being created by us, direct import,
// etc). This helps the wallet decide how to risk analyze it later.
fTx.getConfidence().setSource(TransactionConfidence.Source.NETWORK);
if (maybeHandleRequestedData(fTx)) {
return;
}
if (currentFilteredBlock != null) {
if (!currentFilteredBlock.provideTransaction(tx)) {
// Got a tx that didn't fit into the filtered block, so we must have received everything.
processFilteredBlock(currentFilteredBlock);
currentFilteredBlock = null;
} }
// Don't tell wallets or listeners about this tx as they'll learn about it when the filtered block is final Transaction fTx = tx;
// fully downloaded instead. // Label the transaction as coming in from the P2P network (as opposed to being created by us, direct import,
return; // etc). This helps the wallet decide how to risk analyze it later.
} fTx.getConfidence().setSource(TransactionConfidence.Source.NETWORK);
// It's a broadcast transaction. Tell all wallets about this tx so they can check if it's relevant or not. if (maybeHandleRequestedData(fTx)) {
for (ListIterator<Wallet> it = wallets.listIterator(); it.hasNext();) { return;
final Wallet wallet = it.next(); }
try { if (currentFilteredBlock != null) {
if (wallet.isPendingTransactionRelevant(fTx)) { if (!currentFilteredBlock.provideTransaction(tx)) {
// This transaction seems interesting to us, so let's download its dependencies. This has several // Got a tx that didn't fit into the filtered block, so we must have received everything.
// purposes: we can check that the sender isn't attacking us by engaging in protocol abuse games, processFilteredBlock(currentFilteredBlock);
// like depending on a time-locked transaction that will never confirm, or building huge chains currentFilteredBlock = null;
// of unconfirmed transactions (again - so they don't confirm and the money can be taken }
// back with a Finney attack). Knowing the dependencies also lets us store them in a serialized // Don't tell wallets or listeners about this tx as they'll learn about it when the filtered block is
// wallet so we always have enough data to re-announce to the network and get the payment into // fully downloaded instead.
// the chain, in case the sender goes away and the network starts to forget. return;
// TODO: Not all the above things are implemented. }
// It's a broadcast transaction. Tell all wallets about this tx so they can check if it's relevant or not.
for (ListIterator<Wallet> it = wallets.listIterator(); it.hasNext(); ) {
final Wallet wallet = it.next();
try {
if (wallet.isPendingTransactionRelevant(fTx)) {
// This transaction seems interesting to us, so let's download its dependencies. This has several
// purposes: we can check that the sender isn't attacking us by engaging in protocol abuse games,
// like depending on a time-locked transaction that will never confirm, or building huge chains
// of unconfirmed transactions (again - so they don't confirm and the money can be taken
// back with a Finney attack). Knowing the dependencies also lets us store them in a serialized
// wallet so we always have enough data to re-announce to the network and get the payment into
// the chain, in case the sender goes away and the network starts to forget.
// TODO: Not all the above things are implemented.
Futures.addCallback(downloadDependencies(fTx), new FutureCallback<List<Transaction>>() { Futures.addCallback(downloadDependencies(fTx), new FutureCallback<List<Transaction>>() {
public void onSuccess(List<Transaction> dependencies) { public void onSuccess(List<Transaction> dependencies) {
try { try {
log.info("{}: Dependency download complete!", address.get()); log.info("{}: Dependency download complete!", address.get());
wallet.receivePending(fTx, dependencies); wallet.receivePending(fTx, dependencies);
} catch (VerificationException e) { } catch (VerificationException e) {
log.error("{}: Wallet failed to process pending transaction {}", log.error("{}: Wallet failed to process pending transaction {}",
address.get(), fTx.getHashAsString()); address.get(), fTx.getHashAsString());
log.error("Error was: ", e); log.error("Error was: ", e);
// Not much more we can do at this point.
}
}
public void onFailure(Throwable throwable) {
log.error("Could not download dependencies of tx {}", fTx.getHashAsString());
log.error("Error was: ", throwable);
// Not much more we can do at this point. // Not much more we can do at this point.
} }
} });
public void onFailure(Throwable throwable) {
log.error("Could not download dependencies of tx {}", fTx.getHashAsString());
log.error("Error was: ", throwable);
// Not much more we can do at this point.
}
});
}
} catch (VerificationException e) {
log.error("Wallet failed to verify tx", e);
// Carry on, listeners may still want to know.
} }
} catch (VerificationException e) {
log.error("Wallet failed to verify tx", e);
// Carry on, listeners may still want to know.
} }
// Tell all listeners about this tx so they can decide whether to keep it or not. If no listener keeps a
// reference around then the memory pool will forget about it after a while too because it uses weak references.
EventListenerInvoker.invoke(eventListeners, new EventListenerInvoker<PeerEventListener>() {
@Override
public void invoke(PeerEventListener listener) {
listener.onTransaction(Peer.this, fTx);
}
});
} finally {
lock.unlock();
} }
// Tell all listeners about this tx so they can decide whether to keep it or not. If no listener keeps a
// reference around then the memory pool will forget about it after a while too because it uses weak references.
EventListenerInvoker.invoke(eventListeners, new EventListenerInvoker<PeerEventListener>() {
@Override
public void invoke(PeerEventListener listener) {
listener.onTransaction(Peer.this, fTx);
}
});
} }
/** /**
@ -541,16 +564,15 @@ public class Peer {
for (TransactionInput input : tx.getInputs()) { for (TransactionInput input : tx.getInputs()) {
// There may be multiple inputs that connect to the same transaction. // There may be multiple inputs that connect to the same transaction.
Sha256Hash hash = input.getOutpoint().getHash(); Sha256Hash hash = input.getOutpoint().getHash();
synchronized (this) { Transaction dep = memoryPool.get(hash);
Transaction dep = memoryPool.get(hash); if (dep == null) {
if (dep == null) { needToRequest.add(hash);
needToRequest.add(hash); } else {
} else { dependencies.add(dep);
dependencies.add(dep);
}
} }
} }
results.addAll(dependencies); results.addAll(dependencies);
lock.lock();
try { try {
// Build the request for the missing dependencies. // Build the request for the missing dependencies.
List<ListenableFuture<Transaction>> futures = Lists.newArrayList(); List<ListenableFuture<Transaction>> futures = Lists.newArrayList();
@ -631,13 +653,16 @@ public class Peer {
log.error("Couldn't send getdata in downloadDependencies({})", tx.getHash()); log.error("Couldn't send getdata in downloadDependencies({})", tx.getHash());
resultFuture.setException(e); resultFuture.setException(e);
return resultFuture; return resultFuture;
} finally {
lock.unlock();
} }
return resultFuture; return resultFuture;
} }
private synchronized void processBlock(Block m) throws IOException { private void processBlock(Block m) throws IOException {
if (log.isDebugEnabled()) if (log.isDebugEnabled())
log.debug("{}: Received broadcast block {}", address.get(), m.getHashAsString()); log.debug("{}: Received broadcast block {}", address.get(), m.getHashAsString());
lock.lock();
try { try {
// Was this block requested by getBlock()? // Was this block requested by getBlock()?
if (maybeHandleRequestedData(m)) return; if (maybeHandleRequestedData(m)) return;
@ -683,13 +708,16 @@ public class Peer {
} catch (PrunedException e) { } catch (PrunedException e) {
// Unreachable when in SPV mode. // Unreachable when in SPV mode.
throw new RuntimeException(e); throw new RuntimeException(e);
} finally {
lock.unlock();
} }
} }
// TODO: Fix this duplication. // TODO: Fix this duplication.
private synchronized void processFilteredBlock(FilteredBlock m) throws IOException { private void processFilteredBlock(FilteredBlock m) throws IOException {
if (log.isDebugEnabled()) if (log.isDebugEnabled())
log.debug("{}: Received broadcast filtered block {}", address.get(), m.getHash().toString()); log.debug("{}: Received broadcast filtered block {}", address.get(), m.getHash().toString());
lock.lock();
try { try {
if (!downloadData.get()) { if (!downloadData.get()) {
log.debug("{}: Received block we did not ask for: {}", address.get(), m.getHash().toString()); log.debug("{}: Received block we did not ask for: {}", address.get(), m.getHash().toString());
@ -731,10 +759,13 @@ public class Peer {
// data from the remote peer and fix things. Or just give up. // data from the remote peer and fix things. Or just give up.
// TODO: Request e.getHash() and submit it to the block store before any other blocks // TODO: Request e.getHash() and submit it to the block store before any other blocks
throw new RuntimeException(e); throw new RuntimeException(e);
} finally {
lock.unlock();
} }
} }
private boolean maybeHandleRequestedData(Message m) { private boolean maybeHandleRequestedData(Message m) {
checkState(lock.isLocked());
boolean found = false; boolean found = false;
Sha256Hash hash = m.getHash(); Sha256Hash hash = m.getHash();
for (ListIterator<GetDataRequest> it = getDataFutures.listIterator(); it.hasNext();) { for (ListIterator<GetDataRequest> it = getDataFutures.listIterator(); it.hasNext();) {
@ -749,7 +780,8 @@ public class Peer {
return found; return found;
} }
private synchronized void invokeOnBlocksDownloaded(final Block m) { private void invokeOnBlocksDownloaded(final Block m) {
checkState(lock.isLocked());
// It is possible for the peer block height difference to be negative when blocks have been solved and broadcast // It is possible for the peer block height difference to be negative when blocks have been solved and broadcast
// since the time we first connected to the peer. However, it's weird and unexpected to receive a callback // since the time we first connected to the peer. However, it's weird and unexpected to receive a callback
// with negative "blocks left" in this case, so we clamp to zero so the API user doesn't have to think about it. // with negative "blocks left" in this case, so we clamp to zero so the API user doesn't have to think about it.
@ -762,117 +794,127 @@ public class Peer {
}); });
} }
private synchronized void processInv(InventoryMessage inv) throws IOException { private void processInv(InventoryMessage inv) throws IOException {
// This should be called in the network loop thread for this peer. lock.lock();
List<InventoryItem> items = inv.getItems(); try {
// This should be called in the network loop thread for this peer.
List<InventoryItem> items = inv.getItems();
// Separate out the blocks and transactions, we'll handle them differently // Separate out the blocks and transactions, we'll handle them differently
List<InventoryItem> transactions = new LinkedList<InventoryItem>(); List<InventoryItem> transactions = new LinkedList<InventoryItem>();
List<InventoryItem> blocks = new LinkedList<InventoryItem>(); List<InventoryItem> blocks = new LinkedList<InventoryItem>();
for (InventoryItem item : items) { for (InventoryItem item : items) {
switch (item.type) { switch (item.type) {
case Transaction: transactions.add(item); break; case Transaction:
case Block: blocks.add(item); break; transactions.add(item);
default: throw new IllegalStateException("Not implemented: " + item.type); break;
case Block:
blocks.add(item);
break;
default:
throw new IllegalStateException("Not implemented: " + item.type);
}
} }
}
final boolean downloadData = this.downloadData.get(); final boolean downloadData = this.downloadData.get();
if (transactions.size() == 0 && blocks.size() == 1) { if (transactions.size() == 0 && blocks.size() == 1) {
// Single block announcement. If we're downloading the chain this is just a tickle to make us continue // Single block announcement. If we're downloading the chain this is just a tickle to make us continue
// (the block chain download protocol is very implicit and not well thought out). If we're not downloading // (the block chain download protocol is very implicit and not well thought out). If we're not downloading
// the chain then this probably means a new block was solved and the peer believes it connects to the best // the chain then this probably means a new block was solved and the peer believes it connects to the best
// chain, so count it. This way getBestChainHeight() can be accurate. // chain, so count it. This way getBestChainHeight() can be accurate.
if (downloadData) { if (downloadData) {
if (!blockChain.isOrphan(blocks.get(0).hash)) { if (!blockChain.isOrphan(blocks.get(0).hash)) {
blocksAnnounced.incrementAndGet();
}
} else {
blocksAnnounced.incrementAndGet(); blocksAnnounced.incrementAndGet();
} }
} else {
blocksAnnounced.incrementAndGet();
} }
}
GetDataMessage getdata = new GetDataMessage(params); GetDataMessage getdata = new GetDataMessage(params);
Iterator<InventoryItem> it = transactions.iterator(); Iterator<InventoryItem> it = transactions.iterator();
while (it.hasNext()) { while (it.hasNext()) {
InventoryItem item = it.next(); InventoryItem item = it.next();
if (memoryPool == null) { if (memoryPool == null) {
if (downloadData) { if (downloadData) {
// If there's no memory pool only download transactions if we're configured to. // If there's no memory pool only download transactions if we're configured to.
getdata.addItem(item); getdata.addItem(item);
} }
} else {
// Only download the transaction if we are the first peer that saw it be advertised. Other peers will also
// see it be advertised in inv packets asynchronously, they co-ordinate via the memory pool. We could
// potentially download transactions faster by always asking every peer for a tx when advertised, as remote
// peers run at different speeds. However to conserve bandwidth on mobile devices we try to only download a
// transaction once. This means we can miss broadcasts if the peer disconnects between sending us an inv and
// sending us the transaction: currently we'll never try to re-fetch after a timeout.
if (memoryPool.maybeWasSeen(item.hash)) {
// Some other peer already announced this so don't download.
it.remove();
} else { } else {
log.debug("{}: getdata on tx {}", address.get(), item.hash); // Only download the transaction if we are the first peer that saw it be advertised. Other peers will also
getdata.addItem(item); // see it be advertised in inv packets asynchronously, they co-ordinate via the memory pool. We could
// potentially download transactions faster by always asking every peer for a tx when advertised, as remote
// peers run at different speeds. However to conserve bandwidth on mobile devices we try to only download a
// transaction once. This means we can miss broadcasts if the peer disconnects between sending us an inv and
// sending us the transaction: currently we'll never try to re-fetch after a timeout.
if (memoryPool.maybeWasSeen(item.hash)) {
// Some other peer already announced this so don't download.
it.remove();
} else {
log.debug("{}: getdata on tx {}", address.get(), item.hash);
getdata.addItem(item);
}
memoryPool.seen(item.hash, this.getAddress());
} }
memoryPool.seen(item.hash, this.getAddress());
} }
}
// If we are requesting filteredblocks we have to send a ping after the getdata so that we have a clear // If we are requesting filteredblocks we have to send a ping after the getdata so that we have a clear
// end to the final FilteredBlock's transactions (in the form of a pong) sent to us // end to the final FilteredBlock's transactions (in the form of a pong) sent to us
boolean pingAfterGetData = false; boolean pingAfterGetData = false;
if (blocks.size() > 0 && downloadData && blockChain != null) { if (blocks.size() > 0 && downloadData && blockChain != null) {
// Ideally, we'd only ask for the data here if we actually needed it. However that can imply a lot of // Ideally, we'd only ask for the data here if we actually needed it. However that can imply a lot of
// disk IO to figure out what we've got. Normally peers will not send us inv for things we already have // disk IO to figure out what we've got. Normally peers will not send us inv for things we already have
// so we just re-request it here, and if we get duplicates the block chain / wallet will filter them out. // so we just re-request it here, and if we get duplicates the block chain / wallet will filter them out.
for (InventoryItem item : blocks) { for (InventoryItem item : blocks) {
if (blockChain.isOrphan(item.hash) && downloadBlockBodies) { if (blockChain.isOrphan(item.hash) && downloadBlockBodies) {
// If an orphan was re-advertised, ask for more blocks unless we are not currently downloading // If an orphan was re-advertised, ask for more blocks unless we are not currently downloading
// full block data because we have a getheaders outstanding. // full block data because we have a getheaders outstanding.
blockChainDownload(blockChain.getOrphanRoot(item.hash).getHash()); blockChainDownload(blockChain.getOrphanRoot(item.hash).getHash());
} else { } else {
// Don't re-request blocks we already requested. Normally this should not happen. However there is // Don't re-request blocks we already requested. Normally this should not happen. However there is
// an edge case: if a block is solved and we complete the inv<->getdata<->block<->getblocks cycle // an edge case: if a block is solved and we complete the inv<->getdata<->block<->getblocks cycle
// whilst other parts of the chain are streaming in, then the new getblocks request won't match the // whilst other parts of the chain are streaming in, then the new getblocks request won't match the
// previous one: whilst the stopHash is the same (because we use the orphan root), the start hash // previous one: whilst the stopHash is the same (because we use the orphan root), the start hash
// will be different and so the getblocks req won't be dropped as a duplicate. We'll end up // will be different and so the getblocks req won't be dropped as a duplicate. We'll end up
// requesting a subset of what we already requested, which can lead to parallel chain downloads // requesting a subset of what we already requested, which can lead to parallel chain downloads
// and other nastyness. So we just do a quick removal of redundant getdatas here too. // and other nastyness. So we just do a quick removal of redundant getdatas here too.
// //
// Note that as of June 2012 the Satoshi client won't actually ever interleave blocks pushed as // Note that as of June 2012 the Satoshi client won't actually ever interleave blocks pushed as
// part of chain download with newly announced blocks, so it should always be taken care of by // part of chain download with newly announced blocks, so it should always be taken care of by
// the duplicate check in blockChainDownload(). But the satoshi client may change in future so // the duplicate check in blockChainDownload(). But the satoshi client may change in future so
// it's better to be safe here. // it's better to be safe here.
if (!pendingBlockDownloads.contains(item.hash)) { if (!pendingBlockDownloads.contains(item.hash)) {
if (getPeerVersionMessage().isBloomFilteringSupported() && useFilteredBlocks) { if (getPeerVersionMessage().isBloomFilteringSupported() && useFilteredBlocks) {
getdata.addItem(new InventoryItem(InventoryItem.Type.FilteredBlock, item.hash)); getdata.addItem(new InventoryItem(InventoryItem.Type.FilteredBlock, item.hash));
pingAfterGetData = true; pingAfterGetData = true;
} else { } else {
getdata.addItem(item); getdata.addItem(item);
}
pendingBlockDownloads.add(item.hash);
} }
pendingBlockDownloads.add(item.hash);
} }
} }
// If we're downloading the chain, doing a getdata on the last block we were told about will cause the
// peer to advertize the head block to us in a single-item inv. When we download THAT, it will be an
// orphan block, meaning we'll re-enter blockChainDownload() to trigger another getblocks between the
// current best block we have and the orphan block. If more blocks arrive in the meantime they'll also
// become orphan.
} }
// If we're downloading the chain, doing a getdata on the last block we were told about will cause the
// peer to advertize the head block to us in a single-item inv. When we download THAT, it will be an
// orphan block, meaning we'll re-enter blockChainDownload() to trigger another getblocks between the
// current best block we have and the orphan block. If more blocks arrive in the meantime they'll also
// become orphan.
}
if (!getdata.getItems().isEmpty()) { if (!getdata.getItems().isEmpty()) {
// This will cause us to receive a bunch of block or tx messages. // This will cause us to receive a bunch of block or tx messages.
sendMessage(getdata); sendMessage(getdata);
} }
if (pingAfterGetData) if (pingAfterGetData)
sendMessage(new Ping((long) Math.random() * Long.MAX_VALUE)); sendMessage(new Ping((long) Math.random() * Long.MAX_VALUE));
} finally {
lock.unlock();
}
} }
/** /**
@ -881,6 +923,7 @@ public class Peer {
* will block until the peer answers. * will block until the peer answers.
*/ */
public ListenableFuture<Block> getBlock(Sha256Hash blockHash) throws IOException { public ListenableFuture<Block> getBlock(Sha256Hash blockHash) throws IOException {
// This does not need to be locked.
log.info("Request to fetch block {}", blockHash); log.info("Request to fetch block {}", blockHash);
GetDataMessage getdata = new GetDataMessage(params); GetDataMessage getdata = new GetDataMessage(params);
getdata.addBlock(blockHash); getdata.addBlock(blockHash);
@ -893,6 +936,7 @@ public class Peer {
* in future many peers will delete old transaction data they don't need. * in future many peers will delete old transaction data they don't need.
*/ */
public ListenableFuture<Transaction> getPeerMempoolTransaction(Sha256Hash hash) throws IOException { public ListenableFuture<Transaction> getPeerMempoolTransaction(Sha256Hash hash) throws IOException {
// This does not need to be locked.
// TODO: Unit test this method. // TODO: Unit test this method.
log.info("Request to fetch peer mempool tx {}", hash); log.info("Request to fetch peer mempool tx {}", hash);
GetDataMessage getdata = new GetDataMessage(params); GetDataMessage getdata = new GetDataMessage(params);
@ -902,6 +946,7 @@ public class Peer {
/** Sends a getdata with a single item in it. */ /** Sends a getdata with a single item in it. */
private ListenableFuture sendSingleGetData(GetDataMessage getdata) throws IOException { private ListenableFuture sendSingleGetData(GetDataMessage getdata) throws IOException {
// This does not need to be locked.
Preconditions.checkArgument(getdata.getItems().size() == 1); Preconditions.checkArgument(getdata.getItems().size() == 1);
GetDataRequest req = new GetDataRequest(); GetDataRequest req = new GetDataRequest();
req.future = SettableFuture.create(); req.future = SettableFuture.create();
@ -920,20 +965,25 @@ public class Peer {
* *
* @param secondsSinceEpoch Time in seconds since the epoch or 0 to reset to always downloading block bodies. * @param secondsSinceEpoch Time in seconds since the epoch or 0 to reset to always downloading block bodies.
*/ */
public synchronized void setDownloadParameters(long secondsSinceEpoch, boolean useFilteredBlocks) { public void setDownloadParameters(long secondsSinceEpoch, boolean useFilteredBlocks) {
Preconditions.checkNotNull(blockChain); lock.lock();
if (secondsSinceEpoch == 0) { try {
fastCatchupTimeSecs = params.genesisBlock.getTimeSeconds(); Preconditions.checkNotNull(blockChain);
downloadBlockBodies = true; if (secondsSinceEpoch == 0) {
} else { fastCatchupTimeSecs = params.genesisBlock.getTimeSeconds();
fastCatchupTimeSecs = secondsSinceEpoch; downloadBlockBodies = true;
// If the given time is before the current chains head block time, then this has no effect (we already } else {
// downloaded everything we need). fastCatchupTimeSecs = secondsSinceEpoch;
if (fastCatchupTimeSecs > blockChain.getChainHead().getHeader().getTimeSeconds()) { // If the given time is before the current chains head block time, then this has no effect (we already
downloadBlockBodies = false; // downloaded everything we need).
if (fastCatchupTimeSecs > blockChain.getChainHead().getHeader().getTimeSeconds()) {
downloadBlockBodies = false;
}
} }
this.useFilteredBlocks = useFilteredBlocks;
} finally {
lock.unlock();
} }
this.useFilteredBlocks = useFilteredBlocks;
} }
/** /**
@ -942,11 +992,13 @@ public class Peer {
* independently, otherwise the wallet will receive duplicate notifications. * independently, otherwise the wallet will receive duplicate notifications.
*/ */
public void addWallet(Wallet wallet) { public void addWallet(Wallet wallet) {
// This does not need to be locked.
wallets.add(wallet); wallets.add(wallet);
} }
/** Unlinks the given wallet from peer. See {@link Peer#addWallet(Wallet)}. */ /** Unlinks the given wallet from peer. See {@link Peer#addWallet(Wallet)}. */
public void removeWallet(Wallet wallet) { public void removeWallet(Wallet wallet) {
// This does not need to be locked.
wallets.remove(wallet); wallets.remove(wallet);
} }
@ -954,6 +1006,7 @@ public class Peer {
* Sends the given message on the peers Channel. * Sends the given message on the peers Channel.
*/ */
public ChannelFuture sendMessage(Message m) throws IOException { public ChannelFuture sendMessage(Message m) throws IOException {
// This does not need to be locked.
return Channels.write(channel, m); return Channels.write(channel, m);
} }
@ -962,7 +1015,7 @@ public class Peer {
// multiple threads simultaneously. // multiple threads simultaneously.
private Sha256Hash lastGetBlocksBegin, lastGetBlocksEnd; private Sha256Hash lastGetBlocksBegin, lastGetBlocksEnd;
private synchronized void blockChainDownload(Sha256Hash toHash) throws IOException { private void blockChainDownload(Sha256Hash toHash) throws IOException {
// The block chain download process is a bit complicated. Basically, we start with one or more blocks in a // The block chain download process is a bit complicated. Basically, we start with one or more blocks in a
// chain that we have from a previous session. We want to catch up to the head of the chain BUT we don't know // chain that we have from a previous session. We want to catch up to the head of the chain BUT we don't know
// where that chain is up to or even if the top block we have is even still in the chain - we // where that chain is up to or even if the top block we have is even still in the chain - we
@ -996,52 +1049,57 @@ public class Peer {
// headers and then request the blocks from that point onwards. "getheaders" does not send us an inv, it just // headers and then request the blocks from that point onwards. "getheaders" does not send us an inv, it just
// sends us the data we requested in a "headers" message. // sends us the data we requested in a "headers" message.
// TODO: Block locators should be abstracted out rather than special cased here. lock.lock();
List<Sha256Hash> blockLocator = new ArrayList<Sha256Hash>(51); try {
// For now we don't do the exponential thinning as suggested here: // TODO: Block locators should be abstracted out rather than special cased here.
// List<Sha256Hash> blockLocator = new ArrayList<Sha256Hash>(51);
// https://en.bitcoin.it/wiki/Protocol_specification#getblocks // For now we don't do the exponential thinning as suggested here:
// //
// This is because it requires scanning all the block chain headers, which is very slow. Instead we add the top // https://en.bitcoin.it/wiki/Protocol_specification#getblocks
// 50 block headers. If there is a re-org deeper than that, we'll end up downloading the entire chain. We //
// must always put the genesis block as the first entry. // This is because it requires scanning all the block chain headers, which is very slow. Instead we add the top
BlockStore store = blockChain.getBlockStore(); // 50 block headers. If there is a re-org deeper than that, we'll end up downloading the entire chain. We
StoredBlock chainHead = blockChain.getChainHead(); // must always put the genesis block as the first entry.
Sha256Hash chainHeadHash = chainHead.getHeader().getHash(); BlockStore store = blockChain.getBlockStore();
// Did we already make this request? If so, don't do it again. StoredBlock chainHead = blockChain.getChainHead();
if (Objects.equal(lastGetBlocksBegin, chainHeadHash) && Objects.equal(lastGetBlocksEnd, toHash)) { Sha256Hash chainHeadHash = chainHead.getHeader().getHash();
log.info("blockChainDownload({}): ignoring duplicated request", toHash.toString()); // Did we already make this request? If so, don't do it again.
return; if (Objects.equal(lastGetBlocksBegin, chainHeadHash) && Objects.equal(lastGetBlocksEnd, toHash)) {
} log.info("blockChainDownload({}): ignoring duplicated request", toHash.toString());
log.debug("{}: blockChainDownload({}) current head = {}", new Object[] { toString(), return;
toHash.toString(), chainHead.getHeader().getHashAsString() }); }
StoredBlock cursor = chainHead; log.debug("{}: blockChainDownload({}) current head = {}", new Object[]{toString(),
for (int i = 100; cursor != null && i > 0; i--) { toHash.toString(), chainHead.getHeader().getHashAsString()});
blockLocator.add(cursor.getHeader().getHash()); StoredBlock cursor = chainHead;
try { for (int i = 100; cursor != null && i > 0; i--) {
cursor = cursor.getPrev(store); blockLocator.add(cursor.getHeader().getHash());
} catch (BlockStoreException e) { try {
log.error("Failed to walk the block chain whilst constructing a locator"); cursor = cursor.getPrev(store);
throw new RuntimeException(e); } catch (BlockStoreException e) {
log.error("Failed to walk the block chain whilst constructing a locator");
throw new RuntimeException(e);
}
}
// Only add the locator if we didn't already do so. If the chain is < 50 blocks we already reached it.
if (cursor != null) {
blockLocator.add(params.genesisBlock.getHash());
} }
}
// Only add the locator if we didn't already do so. If the chain is < 50 blocks we already reached it.
if (cursor != null) {
blockLocator.add(params.genesisBlock.getHash());
}
// Record that we requested this range of blocks so we can filter out duplicate requests in the event of a // Record that we requested this range of blocks so we can filter out duplicate requests in the event of a
// block being solved during chain download. // block being solved during chain download.
lastGetBlocksBegin = chainHeadHash; lastGetBlocksBegin = chainHeadHash;
lastGetBlocksEnd = toHash; lastGetBlocksEnd = toHash;
if (downloadBlockBodies) { if (downloadBlockBodies) {
GetBlocksMessage message = new GetBlocksMessage(params, blockLocator, toHash); GetBlocksMessage message = new GetBlocksMessage(params, blockLocator, toHash);
sendMessage(message); sendMessage(message);
} else { } else {
// Downloading headers for a while instead of full blocks. // Downloading headers for a while instead of full blocks.
GetHeadersMessage message = new GetHeadersMessage(params, blockLocator, toHash); GetHeadersMessage message = new GetHeadersMessage(params, blockLocator, toHash);
sendMessage(message); sendMessage(message);
}
} finally {
lock.unlock();
} }
} }
@ -1049,15 +1107,17 @@ public class Peer {
* Starts an asynchronous download of the block chain. The chain download is deemed to be complete once we've * Starts an asynchronous download of the block chain. The chain download is deemed to be complete once we've
* downloaded the same number of blocks that the peer advertised having in its version handshake message. * downloaded the same number of blocks that the peer advertised having in its version handshake message.
*/ */
public synchronized void startBlockChainDownload() throws IOException { public void startBlockChainDownload() throws IOException {
// This does not need to be locked.
setDownloadData(true); setDownloadData(true);
// TODO: peer might still have blocks that we don't have, and even have a heavier // TODO: peer might still have blocks that we don't have, and even have a heavier
// chain even if the chain block count is lower. // chain even if the chain block count is lower.
if (getPeerBlockHeightDifference() >= 0) { final int peerBlockHeightDifference = getPeerBlockHeightDifference();
if (peerBlockHeightDifference >= 0) {
EventListenerInvoker.invoke(eventListeners, new EventListenerInvoker<PeerEventListener>() { EventListenerInvoker.invoke(eventListeners, new EventListenerInvoker<PeerEventListener>() {
@Override @Override
public void invoke(PeerEventListener listener) { public void invoke(PeerEventListener listener) {
listener.onChainDownloadStarted(Peer.this, getPeerBlockHeightDifference()); listener.onChainDownloadStarted(Peer.this, peerBlockHeightDifference);
} }
}); });
@ -1116,11 +1176,12 @@ public class Peer {
* updated. * updated.
* @throws ProtocolException if the peer version is too low to support measurable pings. * @throws ProtocolException if the peer version is too low to support measurable pings.
*/ */
public synchronized ListenableFuture<Long> ping() throws IOException, ProtocolException { public ListenableFuture<Long> ping() throws IOException, ProtocolException {
return ping((long) Math.random() * Long.MAX_VALUE); return ping((long) Math.random() * Long.MAX_VALUE);
} }
protected synchronized ListenableFuture<Long> ping(long nonce) throws IOException, ProtocolException { protected ListenableFuture<Long> ping(long nonce) throws IOException, ProtocolException {
// This does not need to be locked.
if (!getPeerVersionMessage().isPingPongSupported()) if (!getPeerVersionMessage().isPingPongSupported())
throw new ProtocolException("Peer version is too low for measurable pings: " + getPeerVersionMessage()); throw new ProtocolException("Peer version is too low for measurable pings: " + getPeerVersionMessage());
PendingPing pendingPing = new PendingPing(nonce); PendingPing pendingPing = new PendingPing(nonce);
@ -1163,6 +1224,7 @@ public class Peer {
} }
private void processPong(Pong m) { private void processPong(Pong m) {
// This does not need to be locked.
PendingPing ping = null; PendingPing ping = null;
// Iterates over a snapshot of the list, so we can run unlocked here. // Iterates over a snapshot of the list, so we can run unlocked here.
ListIterator<PendingPing> it = pendingPings.listIterator(); ListIterator<PendingPing> it = pendingPings.listIterator();
@ -1182,14 +1244,19 @@ public class Peer {
* Returns the difference between our best chain height and the peers, which can either be positive if we are * Returns the difference between our best chain height and the peers, which can either be positive if we are
* behind the peer, or negative if the peer is ahead of us. * behind the peer, or negative if the peer is ahead of us.
*/ */
public synchronized int getPeerBlockHeightDifference() { public int getPeerBlockHeightDifference() {
// Chain will overflow signed int blocks in ~41,000 years. lock.lock();
int chainHeight = (int) getBestHeight(); try {
// chainHeight should not be zero/negative because we shouldn't have given the user a Peer that is to another // Chain will overflow signed int blocks in ~41,000 years.
// client-mode node, nor should it be unconnected. If that happens it means the user overrode us somewhere or int chainHeight = (int) getBestHeight();
// there is a bug in the peer management code. // chainHeight should not be zero/negative because we shouldn't have given the user a Peer that is to another
Preconditions.checkState(params.allowEmptyPeerChains || chainHeight > 0, "Connected to peer with zero/negative chain height", chainHeight); // client-mode node, nor should it be unconnected. If that happens it means the user overrode us somewhere or
return chainHeight - blockChain.getBestChainHeight(); // there is a bug in the peer management code.
checkState(params.allowEmptyPeerChains || chainHeight > 0, "Connected to peer with zero/negative chain height", chainHeight);
return chainHeight - blockChain.getBestChainHeight();
} finally {
lock.unlock();
}
} }
private boolean isNotFoundMessageSupported() { private boolean isNotFoundMessageSupported() {
@ -1243,8 +1310,11 @@ public class Peer {
* @return if not-null then this is the future for the Peer disconnection event. * @return if not-null then this is the future for the Peer disconnection event.
*/ */
public ChannelFuture setMinProtocolVersion(int minProtocolVersion) { public ChannelFuture setMinProtocolVersion(int minProtocolVersion) {
synchronized (this) { lock.lock();
try {
this.minProtocolVersion = minProtocolVersion; this.minProtocolVersion = minProtocolVersion;
} finally {
lock.unlock();
} }
if (getVersionMessage().clientVersion < minProtocolVersion) { if (getVersionMessage().clientVersion < minProtocolVersion) {
log.warn("{}: Disconnecting due to new min protocol version {}", this, minProtocolVersion); log.warn("{}: Disconnecting due to new min protocol version {}", this, minProtocolVersion);
@ -1272,8 +1342,11 @@ public class Peer {
if (!getPeerVersionMessage().isBloomFilteringSupported()) if (!getPeerVersionMessage().isBloomFilteringSupported())
return; return;
boolean shouldQueryMemPool; boolean shouldQueryMemPool;
synchronized (this) { lock.lock();
try {
shouldQueryMemPool = memoryPool != null || downloadData.get(); shouldQueryMemPool = memoryPool != null || downloadData.get();
} finally {
lock.unlock();
} }
log.info("{}: Sending Bloom filter{}", this, shouldQueryMemPool ? " and querying mempool" : ""); log.info("{}: Sending Bloom filter{}", this, shouldQueryMemPool ? " and querying mempool" : "");
ChannelFuture future = sendMessage(filter); ChannelFuture future = sendMessage(filter);

View File

@ -1,5 +1,5 @@
/** /**
* Copyright 2011 Google Inc. * Copyright 2013 Google Inc.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -21,6 +21,7 @@ import com.google.bitcoin.core.Peer.PeerHandler;
import com.google.bitcoin.discovery.PeerDiscovery; import com.google.bitcoin.discovery.PeerDiscovery;
import com.google.bitcoin.discovery.PeerDiscoveryException; import com.google.bitcoin.discovery.PeerDiscoveryException;
import com.google.bitcoin.utils.EventListenerInvoker; import com.google.bitcoin.utils.EventListenerInvoker;
import com.google.bitcoin.utils.Locks;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import com.google.common.util.concurrent.*; import com.google.common.util.concurrent.*;
@ -39,9 +40,11 @@ import java.net.SocketAddress;
import java.util.*; import java.util.*;
import java.util.concurrent.*; import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
/** /**
* <p>Runs a set of connections to the P2P network, brings up connections to replace disconnected nodes and manages * <p>Runs a set of connections to the P2P network, brings up connections to replace disconnected nodes and manages
@ -69,12 +72,12 @@ public class PeerGroup extends AbstractIdleService {
private static final int DEFAULT_CONNECTIONS = 4; private static final int DEFAULT_CONNECTIONS = 4;
private static final Logger log = LoggerFactory.getLogger(PeerGroup.class); private static final Logger log = LoggerFactory.getLogger(PeerGroup.class);
protected final ReentrantLock lock = Locks.lock("peergroup");
// These lists are all thread-safe so do not have to be accessed under the PeerGroup lock. // These lists are all thread-safe so do not have to be accessed under the PeerGroup lock.
// Addresses to try to connect to, excluding active peers. // Addresses to try to connect to, excluding active peers.
private List<PeerAddress> inactives; private List<PeerAddress> inactives;
// Currently active peers. This is an ordered list rather than a set to make unit tests predictable. This is a // Currently active peers. This is an ordered list rather than a set to make unit tests predictable.
// synchronized list. Locking order is: PeerGroup < Peer < peers. Same for pendingPeers.
private List<Peer> peers; private List<Peer> peers;
// Currently connecting peers. // Currently connecting peers.
private List<Peer> pendingPeers; private List<Peer> pendingPeers;
@ -85,9 +88,9 @@ public class PeerGroup extends AbstractIdleService {
// Callback for events related to chain download // Callback for events related to chain download
private PeerEventListener downloadListener; private PeerEventListener downloadListener;
// Callbacks for events related to peer connection/disconnection // Callbacks for events related to peer connection/disconnection
private List<PeerEventListener> peerEventListeners; private final CopyOnWriteArrayList<PeerEventListener> peerEventListeners;
// Peer discovery sources, will be polled occasionally if there aren't enough inactives. // Peer discovery sources, will be polled occasionally if there aren't enough inactives.
private Set<PeerDiscovery> peerDiscoverers; private CopyOnWriteArraySet<PeerDiscovery> peerDiscoverers;
// The version message to use for new connections. // The version message to use for new connections.
private VersionMessage versionMessage; private VersionMessage versionMessage;
// A class that tracks recent transactions that have been broadcast across the network, counts how many // A class that tracks recent transactions that have been broadcast across the network, counts how many
@ -99,7 +102,7 @@ public class PeerGroup extends AbstractIdleService {
// Runs a background thread that we use for scheduling pings to our peers, so we can measure their performance // Runs a background thread that we use for scheduling pings to our peers, so we can measure their performance
// and network latency. We ping peers every pingIntervalMsec milliseconds. // and network latency. We ping peers every pingIntervalMsec milliseconds.
private Timer pingTimer; private volatile Timer pingTimer;
/** How many milliseconds to wait after receiving a pong before sending another ping. */ /** How many milliseconds to wait after receiving a pong before sending another ping. */
public static final long DEFAULT_PING_INTERVAL_MSEC = 2000; public static final long DEFAULT_PING_INTERVAL_MSEC = 2000;
private long pingIntervalMsec = DEFAULT_PING_INTERVAL_MSEC; private long pingIntervalMsec = DEFAULT_PING_INTERVAL_MSEC;
@ -107,7 +110,7 @@ public class PeerGroup extends AbstractIdleService {
private final NetworkParameters params; private final NetworkParameters params;
private final AbstractBlockChain chain; private final AbstractBlockChain chain;
private long fastCatchupTimeSecs; private long fastCatchupTimeSecs;
private ArrayList<Wallet> wallets; private final CopyOnWriteArrayList<Wallet> wallets;
private AbstractPeerEventListener getDataListener; private AbstractPeerEventListener getDataListener;
private ClientBootstrap bootstrap; private ClientBootstrap bootstrap;
@ -190,7 +193,7 @@ public class PeerGroup extends AbstractIdleService {
this.params = params; this.params = params;
this.chain = chain; // Can be null. this.chain = chain; // Can be null.
this.fastCatchupTimeSecs = params.genesisBlock.getTimeSeconds(); this.fastCatchupTimeSecs = params.genesisBlock.getTimeSeconds();
this.wallets = new ArrayList<Wallet>(1); this.wallets = new CopyOnWriteArrayList<Wallet>();
// This default sentinel value will be overridden by one of two actions: // This default sentinel value will be overridden by one of two actions:
// - adding a peer discovery source sets it to the default // - adding a peer discovery source sets it to the default
@ -217,7 +220,7 @@ public class PeerGroup extends AbstractIdleService {
pendingPeers = Collections.synchronizedList(new ArrayList<Peer>()); pendingPeers = Collections.synchronizedList(new ArrayList<Peer>());
channels = new DefaultChannelGroup(); channels = new DefaultChannelGroup();
peerDiscoverers = new CopyOnWriteArraySet<PeerDiscovery>(); peerDiscoverers = new CopyOnWriteArraySet<PeerDiscovery>();
peerEventListeners = new ArrayList<PeerEventListener>(); peerEventListeners = new CopyOnWriteArrayList<PeerEventListener>();
// This event listener is added to every peer. It's here so when we announce transactions via an "inv", every // This event listener is added to every peer. It's here so when we announce transactions via an "inv", every
// peer can fetch them. // peer can fetch them.
getDataListener = new AbstractPeerEventListener() { getDataListener = new AbstractPeerEventListener() {
@ -250,6 +253,7 @@ public class PeerGroup extends AbstractIdleService {
private ChannelPipelineFactory makePipelineFactory(final NetworkParameters params, final AbstractBlockChain chain) { private ChannelPipelineFactory makePipelineFactory(final NetworkParameters params, final AbstractBlockChain chain) {
return new ChannelPipelineFactory() { return new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() throws Exception { public ChannelPipeline getPipeline() throws Exception {
// This runs unlocked.
VersionMessage ver = getVersionMessage().duplicate(); VersionMessage ver = getVersionMessage().duplicate();
ver.bestHeight = chain == null ? 0 : chain.getBestChainHeight(); ver.bestHeight = chain == null ? 0 : chain.getBestChainHeight();
ver.time = Utils.now().getTime() / 1000; ver.time = Utils.now().getTime() / 1000;
@ -274,9 +278,12 @@ public class PeerGroup extends AbstractIdleService {
*/ */
public void setMaxConnections(int maxConnections) { public void setMaxConnections(int maxConnections) {
int adjustment; int adjustment;
synchronized (this) { lock.lock();
try {
this.maxConnections = maxConnections; this.maxConnections = maxConnections;
if (!isRunning()) return; if (!isRunning()) return;
} finally {
lock.unlock();
} }
// We may now have too many or too few open connections. Add more or drop some to get to the right amount. // We may now have too many or too few open connections. Add more or drop some to get to the right amount.
adjustment = maxConnections - channels.size(); adjustment = maxConnections - channels.size();
@ -295,37 +302,47 @@ public class PeerGroup extends AbstractIdleService {
} }
/** The maximum number of connections that we will create to peers. */ /** The maximum number of connections that we will create to peers. */
public synchronized int getMaxConnections() { public int getMaxConnections() {
return maxConnections; lock.lock();
try {
return maxConnections;
} finally {
lock.unlock();
}
} }
private synchronized List<Message> handleGetData(GetDataMessage m) { private List<Message> handleGetData(GetDataMessage m) {
// Scans the wallets and memory pool for transactions in the getdata message and returns them. // Scans the wallets and memory pool for transactions in the getdata message and returns them.
// Runs on peer threads. // Runs on peer threads.
LinkedList<Message> transactions = new LinkedList<Message>(); lock.lock();
LinkedList<InventoryItem> items = new LinkedList<InventoryItem>(m.getItems()); try {
Iterator<InventoryItem> it = items.iterator(); LinkedList<Message> transactions = new LinkedList<Message>();
while (it.hasNext()) { LinkedList<InventoryItem> items = new LinkedList<InventoryItem>(m.getItems());
InventoryItem item = it.next(); Iterator<InventoryItem> it = items.iterator();
// Check the mempool first. while (it.hasNext()) {
Transaction tx = memoryPool.get(item.hash); InventoryItem item = it.next();
if (tx != null) { // Check the mempool first.
transactions.add(tx); Transaction tx = memoryPool.get(item.hash);
it.remove(); if (tx != null) {
} else { transactions.add(tx);
// Check the wallets. it.remove();
for (Wallet w : wallets) { } else {
synchronized (w) { // Check the wallets.
tx = w.getTransaction(item.hash); for (Wallet w : wallets) {
if (tx == null) continue; synchronized (w) {
transactions.add(tx); tx = w.getTransaction(item.hash);
it.remove(); if (tx == null) continue;
break; transactions.add(tx);
it.remove();
break;
}
} }
} }
} }
return transactions;
} finally {
lock.unlock();
} }
return transactions;
} }
/** /**
@ -337,15 +354,25 @@ public class PeerGroup extends AbstractIdleService {
* The VersionMessage you provide is copied and the best chain height/time filled in for each new connection, * The VersionMessage you provide is copied and the best chain height/time filled in for each new connection,
* therefore you don't have to worry about setting that. The provided object is really more of a template. * therefore you don't have to worry about setting that. The provided object is really more of a template.
*/ */
public synchronized void setVersionMessage(VersionMessage ver) { public void setVersionMessage(VersionMessage ver) {
versionMessage = ver; lock.lock();
try {
versionMessage = ver;
} finally {
lock.unlock();
}
} }
/** /**
* Returns the version message provided by setVersionMessage or a default if none was given. * Returns the version message provided by setVersionMessage or a default if none was given.
*/ */
public synchronized VersionMessage getVersionMessage() { public VersionMessage getVersionMessage() {
return versionMessage; lock.lock();
try {
return versionMessage;
} finally {
lock.unlock();
}
} }
/** /**
@ -367,11 +394,16 @@ public class PeerGroup extends AbstractIdleService {
} }
// Updates the relayTxesBeforeFilter flag of ver // Updates the relayTxesBeforeFilter flag of ver
private synchronized void updateVersionMessageRelayTxesBeforeFilter(VersionMessage ver) { private void updateVersionMessageRelayTxesBeforeFilter(VersionMessage ver) {
// We will provide the remote node with a bloom filter (ie they shouldn't relay yet) // We will provide the remote node with a bloom filter (ie they shouldn't relay yet)
// iff chain == null || !chain.shouldVerifyTransactions() and a wallet is added // iff chain == null || !chain.shouldVerifyTransactions() and a wallet is added
// Note that the default here means that no tx invs will be received if no wallet is ever added // Note that the default here means that no tx invs will be received if no wallet is ever added
ver.relayTxesBeforeFilter = chain != null && chain.shouldVerifyTransactions() && wallets.size() > 0; lock.lock();
try {
ver.relayTxesBeforeFilter = chain != null && chain.shouldVerifyTransactions() && wallets.size() > 0;
} finally {
lock.unlock();
}
} }
/** /**
@ -401,12 +433,12 @@ public class PeerGroup extends AbstractIdleService {
* <p>The listener will be locked during callback execution, which in turn will cause network message processing * <p>The listener will be locked during callback execution, which in turn will cause network message processing
* to stop until the listener returns.</p> * to stop until the listener returns.</p>
*/ */
public synchronized void addEventListener(PeerEventListener listener) { public void addEventListener(PeerEventListener listener) {
peerEventListeners.add(checkNotNull(listener)); peerEventListeners.add(checkNotNull(listener));
} }
/** The given event listener will no longer be called with events. */ /** The given event listener will no longer be called with events. */
public synchronized boolean removeEventListener(PeerEventListener listener) { public boolean removeEventListener(PeerEventListener listener) {
return peerEventListeners.remove(checkNotNull(listener)); return peerEventListeners.remove(checkNotNull(listener));
} }
@ -437,9 +469,12 @@ public class PeerGroup extends AbstractIdleService {
*/ */
public void addAddress(PeerAddress peerAddress) { public void addAddress(PeerAddress peerAddress) {
int newMax; int newMax;
synchronized (this) { lock.lock();
try {
inactives.add(peerAddress); inactives.add(peerAddress);
newMax = getMaxConnections() + 1; newMax = getMaxConnections() + 1;
} finally {
lock.unlock();
} }
setMaxConnections(newMax); setMaxConnections(newMax);
} }
@ -453,13 +488,19 @@ public class PeerGroup extends AbstractIdleService {
* Add addresses from a discovery source to the list of potential peers to connect to. If max connections has not * Add addresses from a discovery source to the list of potential peers to connect to. If max connections has not
* been configured, or set to zero, then it's set to the default at this point. * been configured, or set to zero, then it's set to the default at this point.
*/ */
public synchronized void addPeerDiscovery(PeerDiscovery peerDiscovery) { public void addPeerDiscovery(PeerDiscovery peerDiscovery) {
if (getMaxConnections() == 0) lock.lock();
setMaxConnections(DEFAULT_CONNECTIONS); try {
peerDiscoverers.add(peerDiscovery); if (getMaxConnections() == 0)
setMaxConnections(DEFAULT_CONNECTIONS);
peerDiscoverers.add(peerDiscovery);
} finally {
lock.unlock();
}
} }
protected void discoverPeers() throws PeerDiscoveryException { protected void discoverPeers() throws PeerDiscoveryException {
// This does not need to be locked.
long start = System.currentTimeMillis(); long start = System.currentTimeMillis();
Set<PeerAddress> addressSet = Sets.newHashSet(); Set<PeerAddress> addressSet = Sets.newHashSet();
for (PeerDiscovery peerDiscovery : peerDiscoverers) { for (PeerDiscovery peerDiscovery : peerDiscoverers) {
@ -477,9 +518,11 @@ public class PeerGroup extends AbstractIdleService {
/** Picks a peer from discovery and connects to it. If connection fails, picks another and tries again. */ /** Picks a peer from discovery and connects to it. If connection fails, picks another and tries again. */
protected void connectToAnyPeer() throws PeerDiscoveryException { protected void connectToAnyPeer() throws PeerDiscoveryException {
// Do not call this method whilst synchronized on the PeerGroup lock. checkState(!lock.isLocked());
final State state = state();
if (!(state == State.STARTING || state == State.RUNNING)) return;
final PeerAddress addr; final PeerAddress addr;
if (!(state() == State.STARTING || state() == State.RUNNING)) return;
synchronized (inactives) { synchronized (inactives) {
if (inactives.size() == 0) { if (inactives.size() == 0) {
discoverPeers(); discoverPeers();
@ -503,9 +546,7 @@ public class PeerGroup extends AbstractIdleService {
@Override @Override
protected void startUp() throws Exception { protected void startUp() throws Exception {
// This is run in a background thread by the AbstractIdleService implementation. // This is run in a background thread by the AbstractIdleService implementation.
synchronized (this) { pingTimer = new Timer("Peer pinging thread", true);
pingTimer = new Timer("Peer pinging thread", true);
}
// Bring up the requested number of connections. If a connect attempt fails, // Bring up the requested number of connections. If a connect attempt fails,
// new peers will be tried until there is a success, so just calling connectToAnyPeer for the wanted number // new peers will be tried until there is a success, so just calling connectToAnyPeer for the wanted number
// of peers is sufficient. // of peers is sufficient.
@ -530,9 +571,7 @@ public class PeerGroup extends AbstractIdleService {
for (PeerDiscovery peerDiscovery : peerDiscoverers) { for (PeerDiscovery peerDiscovery : peerDiscoverers) {
peerDiscovery.shutdown(); peerDiscovery.shutdown();
} }
synchronized (this) { pingTimer.cancel();
pingTimer.cancel();
}
} }
/** /**
@ -546,27 +585,38 @@ public class PeerGroup extends AbstractIdleService {
* <p>Note that this should be done before chain download commences because if you add a wallet with keys earlier * <p>Note that this should be done before chain download commences because if you add a wallet with keys earlier
* than the current chain head, the relevant parts of the chain won't be redownloaded for you.</p> * than the current chain head, the relevant parts of the chain won't be redownloaded for you.</p>
*/ */
public synchronized void addWallet(Wallet wallet) { public void addWallet(Wallet wallet) {
Preconditions.checkNotNull(wallet); lock.lock();
Preconditions.checkState(!wallets.contains(wallet)); try {
wallets.add(wallet); Preconditions.checkNotNull(wallet);
announcePendingWalletTransactions(Collections.singletonList(wallet), peers); Preconditions.checkState(!wallets.contains(wallet));
wallets.add(wallet);
announcePendingWalletTransactions(Collections.singletonList(wallet), peers);
// Don't bother downloading block bodies before the oldest keys in all our wallets. Make sure we recalculate // Don't bother downloading block bodies before the oldest keys in all our wallets. Make sure we recalculate
// if a key is added. Of course, by then we may have downloaded the chain already. Ideally adding keys would // if a key is added. Of course, by then we may have downloaded the chain already. Ideally adding keys would
// automatically rewind the block chain and redownload the blocks to find transactions relevant to those keys, // automatically rewind the block chain and redownload the blocks to find transactions relevant to those keys,
// all transparently and in the background. But we are a long way from that yet. // all transparently and in the background. But we are a long way from that yet.
wallet.addEventListener(new AbstractWalletEventListener() { wallet.addEventListener(new AbstractWalletEventListener() {
@Override @Override
public void onKeyAdded(ECKey key) { public void onKeyAdded(ECKey key) {
recalculateFastCatchupAndFilter(); lock.lock();
} try {
}); recalculateFastCatchupAndFilter();
recalculateFastCatchupAndFilter(); } finally {
updateVersionMessageRelayTxesBeforeFilter(getVersionMessage()); lock.unlock();
}
}
});
recalculateFastCatchupAndFilter();
updateVersionMessageRelayTxesBeforeFilter(getVersionMessage());
} finally {
lock.unlock();
}
} }
private synchronized void recalculateFastCatchupAndFilter() { private void recalculateFastCatchupAndFilter() {
checkState(lock.isLocked());
// Fully verifying mode doesn't use this optimization (it can't as it needs to see all transactions). // Fully verifying mode doesn't use this optimization (it can't as it needs to see all transactions).
if (chain != null && chain.shouldVerifyTransactions()) if (chain != null && chain.shouldVerifyTransactions())
return; return;
@ -606,18 +656,21 @@ public class PeerGroup extends AbstractIdleService {
* See the docs for {@link BloomFilter#BloomFilter(int, double, long)} for a brief explanation of anonymity when * See the docs for {@link BloomFilter#BloomFilter(int, double, long)} for a brief explanation of anonymity when
* using bloom filters. * using bloom filters.
*/ */
public synchronized void setBloomFilterFalsePositiveRate(double bloomFilterFPRate) { public void setBloomFilterFalsePositiveRate(double bloomFilterFPRate) {
this.bloomFilterFPRate = bloomFilterFPRate; lock.lock();
recalculateFastCatchupAndFilter(); try {
this.bloomFilterFPRate = bloomFilterFPRate;
recalculateFastCatchupAndFilter();
} finally {
lock.unlock();
}
} }
/** /**
* Unlinks the given wallet so it no longer receives broadcast transactions or has its transactions announced. * Unlinks the given wallet so it no longer receives broadcast transactions or has its transactions announced.
*/ */
public void removeWallet(Wallet wallet) { public void removeWallet(Wallet wallet) {
if (wallet == null) wallets.remove(checkNotNull(wallet));
throw new IllegalArgumentException("wallet is null");
wallets.remove(wallet);
} }
/** /**
@ -639,8 +692,9 @@ public class PeerGroup extends AbstractIdleService {
return connectTo(address, true); return connectTo(address, true);
} }
// Internal version. Do not call whilst holding the PeerGroup lock. // Internal version.
protected ChannelFuture connectTo(SocketAddress address, boolean incrementMaxConnections) { protected ChannelFuture connectTo(SocketAddress address, boolean incrementMaxConnections) {
checkState(!lock.isLocked());
ChannelFuture future = bootstrap.connect(address); ChannelFuture future = bootstrap.connect(address);
// Make sure that the channel group gets access to the channel only if it connects successfully (otherwise // Make sure that the channel group gets access to the channel only if it connects successfully (otherwise
// it cannot be closed and trying to do so will cause problems). // it cannot be closed and trying to do so will cause problems).
@ -661,11 +715,14 @@ public class PeerGroup extends AbstractIdleService {
// This can be null in unit tests or apps that don't use TCP connections. // This can be null in unit tests or apps that don't use TCP connections.
networkHandler.getOwnerObject().setRemoteAddress(address); networkHandler.getOwnerObject().setRemoteAddress(address);
} }
synchronized (this) { if (incrementMaxConnections) {
if (incrementMaxConnections) { // We don't use setMaxConnections here as that would trigger a recursive attempt to establish a new
// We don't use setMaxConnections here as that would trigger a recursive attempt to establish a new // outbound connection.
// outbound connection. lock.lock();
try {
maxConnections++; maxConnections++;
} finally {
lock.unlock();
} }
} }
return future; return future;
@ -687,15 +744,20 @@ public class PeerGroup extends AbstractIdleService {
* *
* @param listener a listener for chain download events, may not be null * @param listener a listener for chain download events, may not be null
*/ */
public synchronized void startBlockChainDownload(PeerEventListener listener) { public void startBlockChainDownload(PeerEventListener listener) {
this.downloadListener = listener; lock.lock();
// TODO: be more nuanced about which peer to download from. We can also try try {
// downloading from multiple peers and handle the case when a new peer comes along this.downloadListener = listener;
// with a longer chain after we thought we were done. // TODO: be more nuanced about which peer to download from. We can also try
synchronized (peers) { // downloading from multiple peers and handle the case when a new peer comes along
if (!peers.isEmpty()) { // with a longer chain after we thought we were done.
startBlockChainDownloadFromPeer(peers.iterator().next()); synchronized (peers) {
if (!peers.isEmpty()) {
startBlockChainDownloadFromPeer(peers.iterator().next());
}
} }
} finally {
lock.unlock();
} }
} }
@ -715,77 +777,85 @@ public class PeerGroup extends AbstractIdleService {
} }
} }
protected synchronized void handleNewPeer(final Peer peer) { protected void handleNewPeer(final Peer peer) {
// Runs on a netty worker thread for every peer that is newly connected. Peer is not locked at this point. lock.lock();
// Sets up the newly connected peer so it can do everything it needs to.
log.info("{}: New peer", peer);
// Give the peer a filter that can be used to probabilistically drop transactions that
// aren't relevant to our wallet. We may still receive some false positives, which is
// OK because it helps improve wallet privacy. Old nodes will just ignore the message.
try { try {
if (bloomFilter != null) peer.setBloomFilter(bloomFilter); // Runs on a netty worker thread for every peer that is newly connected. Peer is not locked at this point.
} catch (IOException e) { } // That was quick...already disconnected // Sets up the newly connected peer so it can do everything it needs to.
// Link the peer to the memory pool so broadcast transactions have their confidence levels updated. log.info("{}: New peer", peer);
peer.setDownloadData(false); // Give the peer a filter that can be used to probabilistically drop transactions that
// TODO: The peer should calculate the fast catchup time from the added wallets here. // aren't relevant to our wallet. We may still receive some false positives, which is
for (Wallet wallet : wallets) // OK because it helps improve wallet privacy. Old nodes will just ignore the message.
peer.addWallet(wallet); try {
// Re-evaluate download peers. if (bloomFilter != null) peer.setBloomFilter(bloomFilter);
Peer newDownloadPeer = selectDownloadPeer(peers); } catch (IOException e) {
if (downloadPeer != newDownloadPeer) { } // That was quick...already disconnected
setDownloadPeer(newDownloadPeer); // Link the peer to the memory pool so broadcast transactions have their confidence levels updated.
boolean shouldDownloadChain = downloadListener != null && chain != null; peer.setDownloadData(false);
if (shouldDownloadChain) { // TODO: The peer should calculate the fast catchup time from the added wallets here.
startBlockChainDownloadFromPeer(downloadPeer); for (Wallet wallet : wallets)
peer.addWallet(wallet);
// Re-evaluate download peers.
Peer newDownloadPeer = selectDownloadPeer(peers);
if (downloadPeer != newDownloadPeer) {
setDownloadPeer(newDownloadPeer);
boolean shouldDownloadChain = downloadListener != null && chain != null;
if (shouldDownloadChain) {
startBlockChainDownloadFromPeer(downloadPeer);
}
} }
} // Make sure the peer knows how to upload transactions that are requested from us.
// Make sure the peer knows how to upload transactions that are requested from us. peer.addEventListener(getDataListener);
peer.addEventListener(getDataListener); // Now tell the peers about any transactions we have which didn't appear in the chain yet. These are not
// Now tell the peers about any transactions we have which didn't appear in the chain yet. These are not // necessarily spends we created. They may also be transactions broadcast across the network that we saw,
// necessarily spends we created. They may also be transactions broadcast across the network that we saw, // which are relevant to us, and which we therefore wish to help propagate (ie they send us coins).
// which are relevant to us, and which we therefore wish to help propagate (ie they send us coins). //
// // Note that this can cause a DoS attack against us if a malicious remote peer knows what keys we own, and
// Note that this can cause a DoS attack against us if a malicious remote peer knows what keys we own, and // then sends us fake relevant transactions. We'll attempt to relay the bad transactions, our badness score
// then sends us fake relevant transactions. We'll attempt to relay the bad transactions, our badness score // in the Satoshi client will increase and we'll get disconnected.
// in the Satoshi client will increase and we'll get disconnected. //
// // TODO: Find a way to balance the desire to propagate useful transactions against DoS attacks.
// TODO: Find a way to balance the desire to propagate useful transactions against DoS attacks. announcePendingWalletTransactions(wallets, Collections.singletonList(peer));
announcePendingWalletTransactions(wallets, Collections.singletonList(peer)); // And set up event listeners for clients. This will allow them to find out about new transactions and blocks.
// And set up event listeners for clients. This will allow them to find out about new transactions and blocks. for (PeerEventListener listener : peerEventListeners) {
for (PeerEventListener listener : peerEventListeners) { peer.addEventListener(listener);
peer.addEventListener(listener);
}
setupPingingForNewPeer(peer);
EventListenerInvoker.invoke(peerEventListeners, new EventListenerInvoker<PeerEventListener>() {
@Override
public void invoke(PeerEventListener listener) {
listener.onPeerConnected(peer, peers.size());
} }
}); setupPingingForNewPeer(peer);
final PeerGroup thisGroup = this; EventListenerInvoker.invoke(peerEventListeners, new EventListenerInvoker<PeerEventListener>() {
// TODO: Move this into the Peer object itself. @Override
peer.addEventListener(new AbstractPeerEventListener() { public void invoke(PeerEventListener listener) {
int filteredBlocksReceivedFromPeer = 0; listener.onPeerConnected(peer, peers.size());
@Override }
public Message onPreMessageReceived(Peer peer, Message m) { });
if (m instanceof FilteredBlock) { // TODO: Move this into the Peer object itself.
filteredBlocksReceivedFromPeer++; peer.addEventListener(new AbstractPeerEventListener() {
if (filteredBlocksReceivedFromPeer % RESEND_BLOOM_FILTER_BLOCK_COUNT == RESEND_BLOOM_FILTER_BLOCK_COUNT-1) { int filteredBlocksReceivedFromPeer = 0;
try {
synchronized(thisGroup) { @Override
public Message onPreMessageReceived(Peer peer, Message m) {
if (m instanceof FilteredBlock) {
filteredBlocksReceivedFromPeer++;
if (filteredBlocksReceivedFromPeer % RESEND_BLOOM_FILTER_BLOCK_COUNT == RESEND_BLOOM_FILTER_BLOCK_COUNT - 1) {
lock.lock();
try {
peer.sendMessage(bloomFilter); peer.sendMessage(bloomFilter);
} catch (IOException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
} }
} catch (IOException e) {
throw new RuntimeException(e);
} }
} }
return m;
} }
return m; });
} } finally {
}); lock.unlock();
}
} }
private void setupPingingForNewPeer(final Peer peer) { private void setupPingingForNewPeer(final Peer peer) {
checkState(lock.isLocked());
if (peer.getPeerVersionMessage().clientVersion < Pong.MIN_PROTOCOL_VERSION) if (peer.getPeerVersionMessage().clientVersion < Pong.MIN_PROTOCOL_VERSION)
return; return;
if (getPingIntervalMsec() <= 0) if (getPingIntervalMsec() <= 0)
@ -831,6 +901,7 @@ public class PeerGroup extends AbstractIdleService {
/** Returns true if at least one peer received an inv. */ /** Returns true if at least one peer received an inv. */
private synchronized boolean announcePendingWalletTransactions(List<Wallet> announceWallets, private synchronized boolean announcePendingWalletTransactions(List<Wallet> announceWallets,
List<Peer> announceToPeers) { List<Peer> announceToPeers) {
checkState(lock.isLocked());
// Build up an inv announcing the hashes of all pending transactions in all our wallets. // Build up an inv announcing the hashes of all pending transactions in all our wallets.
InventoryMessage inv = new InventoryMessage(params); InventoryMessage inv = new InventoryMessage(params);
for (Wallet w : announceWallets) { for (Wallet w : announceWallets) {
@ -855,25 +926,30 @@ public class PeerGroup extends AbstractIdleService {
return success; return success;
} }
private synchronized void setDownloadPeer(Peer peer) { private void setDownloadPeer(Peer peer) {
if (downloadPeer == peer) { lock.lock();
return; try {
} if (downloadPeer == peer) {
if (chain == null) { return;
// PeerGroup creator did not want us to download any data. We still track the download peer for }
// informational purposes. if (chain == null) {
// PeerGroup creator did not want us to download any data. We still track the download peer for
// informational purposes.
downloadPeer = peer;
return;
}
if (downloadPeer != null) {
log.info("Unsetting download peer: {}", downloadPeer);
downloadPeer.setDownloadData(false);
}
downloadPeer = peer; downloadPeer = peer;
return; if (downloadPeer != null) {
} log.info("Setting download peer: {}", downloadPeer);
if (downloadPeer != null) { downloadPeer.setDownloadData(true);
log.info("Unsetting download peer: {}", downloadPeer); downloadPeer.setDownloadParameters(fastCatchupTimeSecs, bloomFilter != null);
downloadPeer.setDownloadData(false); }
} } finally {
downloadPeer = peer; lock.unlock();
if (downloadPeer != null) {
log.info("Setting download peer: {}", downloadPeer);
downloadPeer.setDownloadData(true);
downloadPeer.setDownloadParameters(fastCatchupTimeSecs, bloomFilter != null);
} }
} }
@ -884,19 +960,23 @@ public class PeerGroup extends AbstractIdleService {
* have that it's really valid. * have that it's really valid.
*/ */
public MemoryPool getMemoryPool() { public MemoryPool getMemoryPool() {
// Locking unneeded as memoryPool is final.
return memoryPool; return memoryPool;
} }
/** /**
* Tells the PeerGroup to download only block headers before a certain time and bodies after that. See * Tells the PeerGroup to download only block headers before a certain time and bodies after that. Call this
* {@link Peer#setFastCatchupTime(long)} for further explanation. Call this before starting block chain download. * before starting block chain download.
*/ */
public synchronized void setFastCatchupTimeSecs(long secondsSinceEpoch) { public void setFastCatchupTimeSecs(long secondsSinceEpoch) {
Preconditions.checkState(chain == null || !chain.shouldVerifyTransactions(), "Fast catchup is incompatible with fully verifying"); lock.lock();
fastCatchupTimeSecs = secondsSinceEpoch; try {
if (downloadPeer != null) { Preconditions.checkState(chain == null || !chain.shouldVerifyTransactions(), "Fast catchup is incompatible with fully verifying");
downloadPeer.setDownloadParameters(secondsSinceEpoch, bloomFilter != null); fastCatchupTimeSecs = secondsSinceEpoch;
if (downloadPeer != null) {
downloadPeer.setDownloadParameters(secondsSinceEpoch, bloomFilter != null);
}
} finally {
lock.unlock();
} }
} }
@ -906,8 +986,13 @@ public class PeerGroup extends AbstractIdleService {
* the min of the wallets earliest key times. * the min of the wallets earliest key times.
* @return a time in seconds since the epoch * @return a time in seconds since the epoch
*/ */
public synchronized long getFastCatchupTimeSecs() { public long getFastCatchupTimeSecs() {
return fastCatchupTimeSecs; lock.lock();
try {
return fastCatchupTimeSecs;
} finally {
lock.unlock();
}
} }
protected void handlePeerDeath(final Peer peer) { protected void handlePeerDeath(final Peer peer) {
@ -921,9 +1006,12 @@ public class PeerGroup extends AbstractIdleService {
checkArgument(!peers.contains(peer)); checkArgument(!peers.contains(peer));
final Peer downloadPeer; final Peer downloadPeer;
final PeerEventListener downloadListener; final PeerEventListener downloadListener;
synchronized (this) { lock.lock();
try {
downloadPeer = this.downloadPeer; downloadPeer = this.downloadPeer;
downloadListener = this.downloadListener; downloadListener = this.downloadListener;
} finally {
lock.unlock();
} }
if (peer == downloadPeer) { if (peer == downloadPeer) {
log.info("Download peer died. Picking a new one."); log.info("Download peer died. Picking a new one.");
@ -961,7 +1049,8 @@ public class PeerGroup extends AbstractIdleService {
}); });
} }
private synchronized void startBlockChainDownloadFromPeer(Peer peer) { private void startBlockChainDownloadFromPeer(Peer peer) {
lock.lock();
try { try {
peer.addEventListener(downloadListener); peer.addEventListener(downloadListener);
setDownloadPeer(peer); setDownloadPeer(peer);
@ -969,7 +1058,8 @@ public class PeerGroup extends AbstractIdleService {
peer.startBlockChainDownload(); peer.startBlockChainDownload();
} catch (IOException e) { } catch (IOException e) {
log.error("failed to start block chain download from " + peer, e); log.error("failed to start block chain download from " + peer, e);
return; } finally {
lock.unlock();
} }
} }
@ -1006,21 +1096,31 @@ public class PeerGroup extends AbstractIdleService {
* @return * @return
*/ */
public int getMinBroadcastConnections() { public int getMinBroadcastConnections() {
if (minBroadcastConnections == 0) { lock.lock();
int max = getMaxConnections(); try {
if (max <= 1) if (minBroadcastConnections == 0) {
return max; int max = getMaxConnections();
else if (max <= 1)
return (int)Math.round(getMaxConnections() / 2.0); return max;
else
return (int) Math.round(getMaxConnections() / 2.0);
}
return minBroadcastConnections;
} finally {
lock.unlock();
} }
return minBroadcastConnections;
} }
/** /**
* See {@link com.google.bitcoin.core.PeerGroup#getMinBroadcastConnections()}. * See {@link com.google.bitcoin.core.PeerGroup#getMinBroadcastConnections()}.
*/ */
public void setMinBroadcastConnections(int value) { public void setMinBroadcastConnections(int value) {
minBroadcastConnections = value; lock.lock();
try {
minBroadcastConnections = value;
} finally {
lock.unlock();
}
} }
/** /**
@ -1076,7 +1176,8 @@ public class PeerGroup extends AbstractIdleService {
boolean done = false; boolean done = false;
log.info("broadcastTransaction: TX {} seen by {} peers", pinnedTx.getHashAsString(), log.info("broadcastTransaction: TX {} seen by {} peers", pinnedTx.getHashAsString(),
numSeenPeers); numSeenPeers);
synchronized (PeerGroup.this) { lock.lock();
try {
if (numSeenPeers >= minConnections) { if (numSeenPeers >= minConnections) {
// We've seen the min required number of peers announce the transaction. Note that we // We've seen the min required number of peers announce the transaction. Note that we
// can't wait for the current number of connected peers right now because we could have // can't wait for the current number of connected peers right now because we could have
@ -1101,6 +1202,8 @@ public class PeerGroup extends AbstractIdleService {
} }
done = true; done = true;
} }
} finally {
lock.unlock();
} }
if (done) { if (done) {
// We're done! Run this outside of the peer group lock as setting the future may immediately // We're done! Run this outside of the peer group lock as setting the future may immediately
@ -1128,7 +1231,8 @@ public class PeerGroup extends AbstractIdleService {
if (minConnections == 1) { if (minConnections == 1) {
sendComplete.addListener(new ChannelFutureListener() { sendComplete.addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture _) throws Exception { public void operationComplete(ChannelFuture _) throws Exception {
synchronized (PeerGroup.this) { lock.lock();
try {
for (Wallet wallet : wallets) { for (Wallet wallet : wallets) {
try { try {
if (wallet.isPendingTransactionRelevant(pinnedTx)) { if (wallet.isPendingTransactionRelevant(pinnedTx)) {
@ -1141,6 +1245,8 @@ public class PeerGroup extends AbstractIdleService {
return; return;
} }
} }
} finally {
lock.unlock();
} }
future.set(pinnedTx); future.set(pinnedTx);
return; return;
@ -1161,8 +1267,13 @@ public class PeerGroup extends AbstractIdleService {
* times are available via {@link com.google.bitcoin.core.Peer#getLastPingTime()} but it increases load on the * times are available via {@link com.google.bitcoin.core.Peer#getLastPingTime()} but it increases load on the
* remote node. It defaults to 5000. * remote node. It defaults to 5000.
*/ */
public synchronized long getPingIntervalMsec() { public long getPingIntervalMsec() {
return pingIntervalMsec; lock.lock();
try {
return pingIntervalMsec;
} finally {
lock.unlock();
}
} }
/** /**
@ -1172,8 +1283,13 @@ public class PeerGroup extends AbstractIdleService {
* Setting the value to be <= 0 disables pinging entirely, although you can still request one yourself * Setting the value to be <= 0 disables pinging entirely, although you can still request one yourself
* using {@link com.google.bitcoin.core.Peer#ping()}. * using {@link com.google.bitcoin.core.Peer#ping()}.
*/ */
public synchronized void setPingIntervalMsec(long pingIntervalMsec) { public void setPingIntervalMsec(long pingIntervalMsec) {
this.pingIntervalMsec = pingIntervalMsec; lock.lock();
try {
this.pingIntervalMsec = pingIntervalMsec;
} finally {
lock.unlock();
}
} }
/** /**
@ -1181,7 +1297,7 @@ public class PeerGroup extends AbstractIdleService {
* If no peers are connected, returns zero. * If no peers are connected, returns zero.
*/ */
public int getMostCommonChainHeight() { public int getMostCommonChainHeight() {
// Copy the peers list so we can calculate on it without violating lock ordering: Peer < peers // Copy the peers list so we can calculate on it without violating lock ordering.
ArrayList<Peer> peers; ArrayList<Peer> peers;
synchronized (this.peers) { synchronized (this.peers) {
peers = new ArrayList<Peer>(this.peers); peers = new ArrayList<Peer>(this.peers);
@ -1307,6 +1423,11 @@ public class PeerGroup extends AbstractIdleService {
* returns. Can return null if no peer was selected. * returns. Can return null if no peer was selected.
*/ */
public Peer getDownloadPeer() { public Peer getDownloadPeer() {
return downloadPeer; lock.lock();
try {
return downloadPeer;
} finally {
lock.unlock();
}
} }
} }

View File

@ -37,27 +37,13 @@ import static com.google.common.base.Preconditions.checkArgument;
* To enable debug logging from the library, run with -Dbitcoinj.logging=true on your command line. * To enable debug logging from the library, run with -Dbitcoinj.logging=true on your command line.
*/ */
public class Utils { public class Utils {
public static final CycleDetectingLockFactory cycleDetectingLockFactory;
private static final MessageDigest digest; private static final MessageDigest digest;
static { static {
try { try {
digest = MessageDigest.getInstance("SHA-256"); digest = MessageDigest.getInstance("SHA-256");
} catch (NoSuchAlgorithmException e) { } catch (NoSuchAlgorithmException e) {
throw new RuntimeException(e); // Can't happen. throw new RuntimeException(e); // Can't happen.
} }
cycleDetectingLockFactory = CycleDetectingLockFactory.newInstance(CycleDetectingLockFactory.Policies.THROW);
}
private static final boolean detectLockCycles = true;
/** Returns a re-entrant lock that may be cycle detecting, depending on {@link Utils#detectLockCycles}. */
public static ReentrantLock lock(String name) {
if (detectLockCycles)
return cycleDetectingLockFactory.newReentrantLock(name);
else
return new ReentrantLock();
} }
/** The string that prefixes all text messages signed using Bitcoin keys. */ /** The string that prefixes all text messages signed using Bitcoin keys. */

View File

@ -20,6 +20,7 @@ import com.google.bitcoin.core.TransactionConfidence.ConfidenceType;
import com.google.bitcoin.core.WalletTransaction.Pool; import com.google.bitcoin.core.WalletTransaction.Pool;
import com.google.bitcoin.store.WalletProtobufSerializer; import com.google.bitcoin.store.WalletProtobufSerializer;
import com.google.bitcoin.utils.EventListenerInvoker; import com.google.bitcoin.utils.EventListenerInvoker;
import com.google.bitcoin.utils.Locks;
import com.google.common.base.Objects; import com.google.common.base.Objects;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables; import com.google.common.collect.Iterables;
@ -74,7 +75,7 @@ public class Wallet implements Serializable, BlockChainListener {
private static final Logger log = LoggerFactory.getLogger(Wallet.class); private static final Logger log = LoggerFactory.getLogger(Wallet.class);
private static final long serialVersionUID = 2L; private static final long serialVersionUID = 2L;
protected final ReentrantLock lock = Utils.lock("wallet"); protected final ReentrantLock lock = Locks.lock("wallet");
// Algorithm for movement of transactions between pools. Outbound tx = us spending coins. Inbound tx = us // Algorithm for movement of transactions between pools. Outbound tx = us spending coins. Inbound tx = us
// receiving coins. If a tx is both inbound and outbound (spend with change) it is considered outbound for the // receiving coins. If a tx is both inbound and outbound (spend with change) it is considered outbound for the
@ -1388,44 +1389,50 @@ public class Wallet implements Serializable, BlockChainListener {
* @param includeInactive If true, transactions that are on side chains (are unspendable) are included. * @param includeInactive If true, transactions that are on side chains (are unspendable) are included.
*/ */
public Set<Transaction> getTransactions(boolean includeDead, boolean includeInactive) { public Set<Transaction> getTransactions(boolean includeDead, boolean includeInactive) {
Set<Transaction> all = new HashSet<Transaction>();
lock.lock(); lock.lock();
all.addAll(unspent.values()); try {
all.addAll(spent.values()); Set<Transaction> all = new HashSet<Transaction>();
all.addAll(pending.values()); all.addAll(unspent.values());
if (includeDead) all.addAll(spent.values());
all.addAll(dead.values()); all.addAll(pending.values());
if (includeInactive) if (includeDead)
all.addAll(inactive.values()); all.addAll(dead.values());
lock.unlock(); if (includeInactive)
return all; all.addAll(inactive.values());
return all;
} finally {
lock.unlock();
}
} }
/** /**
* Returns a set of all WalletTransactions in the wallet. * Returns a set of all WalletTransactions in the wallet.
*/ */
public Iterable<WalletTransaction> getWalletTransactions() { public Iterable<WalletTransaction> getWalletTransactions() {
HashSet<Transaction> pendingInactive = new HashSet<Transaction>();
lock.lock(); lock.lock();
pendingInactive.addAll(pending.values()); try {
pendingInactive.retainAll(inactive.values()); HashSet<Transaction> pendingInactive = new HashSet<Transaction>();
HashSet<Transaction> onlyPending = new HashSet<Transaction>(); pendingInactive.addAll(pending.values());
HashSet<Transaction> onlyInactive = new HashSet<Transaction>(); pendingInactive.retainAll(inactive.values());
onlyPending.addAll(pending.values()); HashSet<Transaction> onlyPending = new HashSet<Transaction>();
onlyPending.removeAll(pendingInactive); HashSet<Transaction> onlyInactive = new HashSet<Transaction>();
onlyInactive.addAll(inactive.values()); onlyPending.addAll(pending.values());
onlyInactive.removeAll(pendingInactive); onlyPending.removeAll(pendingInactive);
onlyInactive.addAll(inactive.values());
onlyInactive.removeAll(pendingInactive);
Set<WalletTransaction> all = new HashSet<WalletTransaction>(); Set<WalletTransaction> all = new HashSet<WalletTransaction>();
addWalletTransactionsToSet(all, Pool.UNSPENT, unspent.values()); addWalletTransactionsToSet(all, Pool.UNSPENT, unspent.values());
addWalletTransactionsToSet(all, Pool.SPENT, spent.values()); addWalletTransactionsToSet(all, Pool.SPENT, spent.values());
addWalletTransactionsToSet(all, Pool.DEAD, dead.values()); addWalletTransactionsToSet(all, Pool.DEAD, dead.values());
addWalletTransactionsToSet(all, Pool.PENDING, onlyPending); addWalletTransactionsToSet(all, Pool.PENDING, onlyPending);
addWalletTransactionsToSet(all, Pool.INACTIVE, onlyInactive); addWalletTransactionsToSet(all, Pool.INACTIVE, onlyInactive);
addWalletTransactionsToSet(all, Pool.PENDING_INACTIVE, pendingInactive); addWalletTransactionsToSet(all, Pool.PENDING_INACTIVE, pendingInactive);
lock.unlock(); return all;
return all; } finally {
lock.unlock();
}
} }
private static void addWalletTransactionsToSet(Set<WalletTransaction> txs, private static void addWalletTransactionsToSet(Set<WalletTransaction> txs,
@ -1452,7 +1459,7 @@ public class Wallet implements Serializable, BlockChainListener {
/** /**
* Adds the given transaction to the given pools and registers a confidence change listener on it. * Adds the given transaction to the given pools and registers a confidence change listener on it.
*/ */
private synchronized void addWalletTransaction(Pool pool, Transaction tx) { private void addWalletTransaction(Pool pool, Transaction tx) {
checkState(lock.isLocked()); checkState(lock.isLocked());
switch (pool) { switch (pool) {
case UNSPENT: case UNSPENT:

View File

@ -17,6 +17,7 @@
package com.google.bitcoin.store; package com.google.bitcoin.store;
import com.google.bitcoin.core.*; import com.google.bitcoin.core.*;
import com.google.bitcoin.utils.Locks;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -54,7 +55,7 @@ public class SPVBlockStore implements BlockStore {
protected int numHeaders; protected int numHeaders;
protected NetworkParameters params; protected NetworkParameters params;
protected ReentrantLock lock = Utils.lock("SPVBlockStore"); protected ReentrantLock lock = Locks.lock("SPVBlockStore");
// The entire ring-buffer is mmapped and accessing it should be as fast as accessing regular memory once it's // The entire ring-buffer is mmapped and accessing it should be as fast as accessing regular memory once it's
// faulted in. Unfortunately, in theory practice and theory are the same. In practice they aren't. // faulted in. Unfortunately, in theory practice and theory are the same. In practice they aren't.

View File

@ -0,0 +1,55 @@
/**
* Copyright 2013 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.bitcoin.utils;
import com.google.common.util.concurrent.CycleDetectingLockFactory;
import java.util.concurrent.locks.ReentrantLock;
/**
* A wrapper around explicit lock creation that lets you control whether bitcoinj performs cycle detection or not.
*/
public class Locks {
static {
// Default policy goes here. If you want to change this, use one of the static methods before
// instantiating any bitcoinj objects. The policy change will take effect only on new objects
// from that point onwards.
warnOnLockCycles();
}
public static CycleDetectingLockFactory factory = null;
public static ReentrantLock lock(String name) {
if (factory != null)
return factory.newReentrantLock(name);
else
return new ReentrantLock();
}
public static void warnOnLockCycles() {
factory = CycleDetectingLockFactory.newInstance(CycleDetectingLockFactory.Policies.WARN);
}
public static void throwOnLockCycles() {
factory = CycleDetectingLockFactory.newInstance(CycleDetectingLockFactory.Policies.THROW);
}
public static void ignoreLockCycles() {
factory = null;
}
}

View File

@ -21,6 +21,7 @@ import com.google.bitcoin.core.WalletTransaction.Pool;
import com.google.bitcoin.store.BlockStore; import com.google.bitcoin.store.BlockStore;
import com.google.bitcoin.store.MemoryBlockStore; import com.google.bitcoin.store.MemoryBlockStore;
import com.google.bitcoin.utils.BriefLogFormatter; import com.google.bitcoin.utils.BriefLogFormatter;
import com.google.bitcoin.utils.Locks;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.util.concurrent.CycleDetectingLockFactory; import com.google.common.util.concurrent.CycleDetectingLockFactory;
import org.junit.Before; import org.junit.Before;
@ -917,7 +918,9 @@ public class WalletTest {
@Test @Test
public void lockCycles() { public void lockCycles() {
final ReentrantLock lock = Utils.cycleDetectingLockFactory.newReentrantLock("test"); Locks.throwOnLockCycles();
final ReentrantLock lock = Locks.lock("test");
wallet = new Wallet(params);
lock.lock(); lock.lock();
int foo = wallet.getKeychainSize(); int foo = wallet.getKeychainSize();
lock.unlock(); lock.unlock();