3
0
mirror of https://github.com/Qortal/altcoinj.git synced 2025-02-07 06:44:16 +00:00

Clean up Peer.java a bit - remove the custom future implementation and use Guava futures instead.

This commit is contained in:
Mike Hearn 2013-01-14 22:30:32 +01:00
parent e4e4e45a47
commit 4254e276fb

View File

@ -31,7 +31,7 @@ import java.io.IOException;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.CopyOnWriteArrayList;
/**
* A Peer handles the high level communication with a Bitcoin node.
@ -50,10 +50,6 @@ public class Peer {
private final NetworkParameters params;
private final AbstractBlockChain blockChain;
// When an API user explicitly requests a block or transaction from a peer, the InventoryItem is put here
// whilst waiting for the response. Synchronized on itself. Is not used for downloads Peer generates itself.
// TODO: Make this work for transactions as well.
private final List<GetDataFuture<Block>> pendingGetBlockFutures;
private PeerAddress address;
private List<PeerEventListener> eventListeners;
private List<PeerLifecycleListener> lifecycleListeners;
@ -78,13 +74,21 @@ public class Peer {
// received at which point it gets set to true again. This isn't relevant unless downloadData is true.
private boolean downloadBlockBodies = true;
// Keeps track of things we requested internally with getdata but didn't receive yet, so we can avoid re-requests.
// It's not quite the same as pendingGetBlockFutures, as this is used only for getdatas done as part of downloading
// It's not quite the same as getDataFutures, as this is used only for getdatas done as part of downloading
// the chain and so is lighter weight (we just keep a bunch of hashes not futures).
//
// It is important to avoid a nasty edge case where we can end up with parallel chain downloads proceeding
// simultaneously if we were to receive a newly solved block whilst parts of the chain are streaming to us.
private HashSet<Sha256Hash> pendingBlockDownloads = new HashSet<Sha256Hash>();
// When an API user explicitly requests a block or transaction from a peer, the InventoryItem is put here
// whilst waiting for the response. Synchronized on itself. Is not used for downloads Peer generates itself.
private static class GetDataRequest {
Sha256Hash hash;
SettableFuture future;
}
private final List<GetDataRequest> getDataFutures;
// Outstanding pings against this peer and how long the last one took to complete.
private CopyOnWriteArrayList<PendingPing> pendingPings;
private long[] lastPingTimes;
@ -103,7 +107,7 @@ public class Peer {
this.versionMessage = Preconditions.checkNotNull(ver);
this.blockChain = chain; // Allowed to be null.
this.downloadData = chain != null;
this.pendingGetBlockFutures = new ArrayList<GetDataFuture<Block>>();
this.getDataFutures = new CopyOnWriteArrayList<GetDataRequest>();
this.eventListeners = new CopyOnWriteArrayList<PeerEventListener>();
this.lifecycleListeners = new CopyOnWriteArrayList<PeerLifecycleListener>();
this.fastCatchupTimeSecs = params.genesisBlock.getTimeSeconds();
@ -372,18 +376,8 @@ public class Peer {
log.debug("{}: Received broadcast block {}", address, m.getHashAsString());
try {
// Was this block requested by getBlock()?
synchronized (pendingGetBlockFutures) {
for (int i = 0; i < pendingGetBlockFutures.size(); i++) {
GetDataFuture<Block> f = pendingGetBlockFutures.get(i);
if (f.getItem().hash.equals(m.getHash())) {
// Yes, it was. So pass it through the future.
f.setResult(m);
// Blocks explicitly requested don't get sent to the block chain.
pendingGetBlockFutures.remove(i);
return;
}
}
}
if (maybeHandleRequestedData(m)) return;
if (!downloadData) {
log.warn("Received block we did not ask for: {}", m.getHashAsString());
return;
@ -423,6 +417,21 @@ public class Peer {
}
}
private boolean maybeHandleRequestedData(Block m) {
boolean found = false;
Sha256Hash hash = m.getHash();
for (ListIterator<GetDataRequest> it = getDataFutures.listIterator(); it.hasNext();) {
GetDataRequest req = it.next();
if (hash.equals(req.hash)) {
req.future.set(m);
getDataFutures.remove(req);
found = true;
// Keep going in case there are more.
}
}
return found;
}
private synchronized void invokeOnBlocksDownloaded(final Block m) {
// It is possible for the peer block height difference to be negative when blocks have been solved and broadcast
// since the time we first connected to the peer. However, it's weird and unexpected to receive a callback
@ -535,28 +544,24 @@ public class Peer {
}
/**
* Asks the connected peer for the block of the given hash, and returns a Future representing the answer.
* Asks the connected peer for the block of the given hash, and returns a future representing the answer.
* If you want the block right away and don't mind waiting for it, just call .get() on the result. Your thread
* will block until the peer answers. You can also use the Future object to wait with a timeout, or just check
* whether it's done later.
* will block until the peer answers.
*
* @param blockHash Hash of the block you wareare requesting.
* @throws IOException
*/
public Future<Block> getBlock(Sha256Hash blockHash) throws IOException {
public ListenableFuture<Block> getBlock(Sha256Hash blockHash) throws IOException {
log.info("Request to fetch block {}", blockHash);
GetDataMessage getdata = new GetDataMessage(params);
InventoryItem inventoryItem = new InventoryItem(InventoryItem.Type.Block, blockHash);
getdata.addItem(inventoryItem);
GetDataFuture<Block> future = new GetDataFuture<Block>(inventoryItem);
// Add to the list of things we're waiting for. It's important this come before the network send to avoid
// race conditions.
synchronized (pendingGetBlockFutures) {
pendingGetBlockFutures.add(future);
}
GetDataRequest req = new GetDataRequest();
req.future = SettableFuture.create();
req.hash = blockHash;
getDataFutures.add(req);
sendMessage(getdata);
return future;
return req.future;
}
/**
@ -597,58 +602,6 @@ public class Peer {
removeEventListener(wallet.getPeerEventListener());
}
// A GetDataFuture wraps the result of a getBlock or (in future) getTransaction so the owner of the object can
// decide whether to wait forever, wait for a short while or check later after doing other work.
private static class GetDataFuture<T extends Message> implements Future<T> {
private boolean cancelled;
private final InventoryItem item;
private final CountDownLatch latch;
private T result;
GetDataFuture(InventoryItem item) {
this.item = item;
this.latch = new CountDownLatch(1);
}
public boolean cancel(boolean b) {
// Cannot cancel a getdata - once sent, it's sent.
cancelled = true;
return false;
}
public boolean isCancelled() {
return cancelled;
}
public boolean isDone() {
return result != null || cancelled;
}
public T get() throws InterruptedException, ExecutionException {
latch.await();
return Preconditions.checkNotNull(result);
}
public T get(long l, TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException {
if (!latch.await(l, timeUnit))
throw new TimeoutException();
return Preconditions.checkNotNull(result);
}
InventoryItem getItem() {
return item;
}
/** Called by the Peer when the result has arrived. Completes the task. */
void setResult(T result) {
// This should be called in the network loop thread for this peer
this.result = result;
// Now release the thread that is waiting. We don't need to synchronize here as the latch establishes
// a memory barrier.
latch.countDown();
}
}
/**
* Sends the given message on the peers Channel.
*/