3
0
mirror of https://github.com/Qortal/altcoinj.git synced 2025-01-31 07:12:17 +00:00

Use ping messages to work around the lack of notfound messages on old nodes.

For now optimistically assume Gavin merges the notfound patch into bitcoin-qt, version wise.
This commit is contained in:
Mike Hearn 2013-01-19 19:27:55 +01:00
parent b7b52c3fc9
commit e9babb2772
2 changed files with 43 additions and 10 deletions

View File

@ -23,10 +23,7 @@ import com.google.bitcoin.utils.EventListenerInvoker;
import com.google.common.base.Objects; import com.google.common.base.Objects;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.*;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import org.jboss.netty.channel.*; import org.jboss.netty.channel.*;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -93,11 +90,15 @@ public class Peer {
private static class GetDataRequest { private static class GetDataRequest {
Sha256Hash hash; Sha256Hash hash;
SettableFuture future; SettableFuture future;
// If the peer does not support the notfound message, we'll use ping/pong messages to simulate it. This is
// a nasty hack that relies on the fact that bitcoin-qt is single threaded and processes messages in order.
// The nonce field records which pong should clear this request as "not found".
long nonce;
} }
private final List<GetDataRequest> getDataFutures; private final CopyOnWriteArrayList<GetDataRequest> getDataFutures;
// Outstanding pings against this peer and how long the last one took to complete. // Outstanding pings against this peer and how long the last one took to complete.
private CopyOnWriteArrayList<PendingPing> pendingPings; private final CopyOnWriteArrayList<PendingPing> pendingPings;
private long[] lastPingTimes; private long[] lastPingTimes;
private static final int PING_MOVING_AVERAGE_WINDOW = 20; private static final int PING_MOVING_AVERAGE_WINDOW = 20;
@ -316,6 +317,7 @@ public class Peer {
if (item.hash.equals(req.hash)) { if (item.hash.equals(req.hash)) {
req.future.cancel(true); req.future.cancel(true);
getDataFutures.remove(req); getDataFutures.remove(req);
break;
} }
} }
} }
@ -489,11 +491,15 @@ public class Peer {
// Build the request for the missing dependencies. // Build the request for the missing dependencies.
List<ListenableFuture<Transaction>> futures = Lists.newArrayList(); List<ListenableFuture<Transaction>> futures = Lists.newArrayList();
GetDataMessage getdata = new GetDataMessage(params); GetDataMessage getdata = new GetDataMessage(params);
final long nonce = (long)(Math.random()*Long.MAX_VALUE);
for (Sha256Hash hash : needToRequest) { for (Sha256Hash hash : needToRequest) {
getdata.addTransaction(hash); getdata.addTransaction(hash);
GetDataRequest req = new GetDataRequest(); GetDataRequest req = new GetDataRequest();
req.hash = hash; req.hash = hash;
req.future = SettableFuture.create(); req.future = SettableFuture.create();
if (!isNotFoundMessageSupported()) {
req.nonce = nonce;
}
futures.add(req.future); futures.add(req.future);
getDataFutures.add(req); getDataFutures.add(req);
} }
@ -538,7 +544,25 @@ public class Peer {
}); });
// Start the operation. // Start the operation.
sendMessage(getdata); sendMessage(getdata);
} catch (IOException e) { if (!isNotFoundMessageSupported()) {
// If the peer isn't new enough to support the notfound message, we use a nasty hack instead and
// assume if we send a ping message after the getdata message, it'll be processed after all answers
// from getdata are done, so we can watch for the pong message as a substitute.
ping(nonce).addListener(new Runnable() {
public void run() {
// The pong came back so clear out any transactions we requested but didn't get.
for (ListIterator<GetDataRequest> it = getDataFutures.listIterator(); it.hasNext();) {
GetDataRequest req = it.next();
if (req.nonce == nonce) {
req.future.cancel(true);
getDataFutures.remove(req);
break;
}
}
}
}, MoreExecutors.sameThreadExecutor());
}
} catch (Exception e) {
log.error("Couldn't send getdata in downloadDependencies({})", tx.getHash()); log.error("Couldn't send getdata in downloadDependencies({})", tx.getHash());
resultFuture.setException(e); resultFuture.setException(e);
return resultFuture; return resultFuture;
@ -973,9 +997,9 @@ public class Peer {
// Measurement of the time elapsed. // Measurement of the time elapsed.
public long startTimeMsec; public long startTimeMsec;
public PendingPing() { public PendingPing(long nonce) {
future = SettableFuture.create(); future = SettableFuture.create();
nonce = (long) Math.random() * Long.MAX_VALUE; this.nonce = nonce;
startTimeMsec = Utils.now().getTime(); startTimeMsec = Utils.now().getTime();
} }
@ -1011,10 +1035,14 @@ public class Peer {
* @throws ProtocolException if the peer version is too low to support measurable pings. * @throws ProtocolException if the peer version is too low to support measurable pings.
*/ */
public synchronized ListenableFuture<Long> ping() throws IOException, ProtocolException { public synchronized ListenableFuture<Long> ping() throws IOException, ProtocolException {
return ping((long) Math.random() * Long.MAX_VALUE);
}
protected synchronized ListenableFuture<Long> ping(long nonce) throws IOException, ProtocolException {
int peerVersion = getPeerVersionMessage().clientVersion; int peerVersion = getPeerVersionMessage().clientVersion;
if (peerVersion < Pong.MIN_PROTOCOL_VERSION) if (peerVersion < Pong.MIN_PROTOCOL_VERSION)
throw new ProtocolException("Peer version is too low for measurable pings: " + peerVersion); throw new ProtocolException("Peer version is too low for measurable pings: " + peerVersion);
PendingPing pendingPing = new PendingPing(); PendingPing pendingPing = new PendingPing(nonce);
pendingPings.add(pendingPing); pendingPings.add(pendingPing);
sendMessage(new Ping(pendingPing.nonce)); sendMessage(new Ping(pendingPing.nonce));
return pendingPing.future; return pendingPing.future;
@ -1073,6 +1101,10 @@ public class Peer {
return chainHeight - blockChain.getBestChainHeight(); return chainHeight - blockChain.getBestChainHeight();
} }
private boolean isNotFoundMessageSupported() {
return getPeerVersionMessage().clientVersion >= 70001;
}
/** /**
* Returns true if this peer will try and download things it is sent in "inv" messages. Normally you only need * Returns true if this peer will try and download things it is sent in "inv" messages. Normally you only need
* one peer to be downloading data. Defaults to true. * one peer to be downloading data. Defaults to true.

View File

@ -64,6 +64,7 @@ public class PeerTest extends TestWithNetworkConnections {
private void connect(PeerHandler handler, Channel channel, ChannelHandlerContext ctx) throws Exception { private void connect(PeerHandler handler, Channel channel, ChannelHandlerContext ctx) throws Exception {
handler.connectRequested(ctx, new UpstreamChannelStateEvent(channel, ChannelState.CONNECTED, socketAddress)); handler.connectRequested(ctx, new UpstreamChannelStateEvent(channel, ChannelState.CONNECTED, socketAddress));
VersionMessage peerVersion = new VersionMessage(unitTestParams, OTHER_PEER_CHAIN_HEIGHT); VersionMessage peerVersion = new VersionMessage(unitTestParams, OTHER_PEER_CHAIN_HEIGHT);
peerVersion.clientVersion = 70001;
DownstreamMessageEvent versionEvent = DownstreamMessageEvent versionEvent =
new DownstreamMessageEvent(channel, Channels.future(channel), peerVersion, null); new DownstreamMessageEvent(channel, Channels.future(channel), peerVersion, null);
handler.messageReceived(ctx, versionEvent); handler.messageReceived(ctx, versionEvent);