3
0
mirror of https://github.com/Qortal/altcoinj.git synced 2025-01-30 23:02:15 +00:00

Add a ping() method to the Peer class. It returns a future that can be used to find out when the pong returns and how slow it was. Temporarily, use this in the PeerMonitor example. Next up: moving pinging into the PeerGroup.

This commit is contained in:
Mike Hearn 2012-11-02 16:19:13 +01:00
parent 547bc29864
commit fc573d5f1c
3 changed files with 134 additions and 7 deletions

View File

@ -21,6 +21,9 @@ import com.google.bitcoin.store.BlockStoreException;
import com.google.bitcoin.utils.EventListenerInvoker;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import org.jboss.netty.channel.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -79,6 +82,10 @@ public class Peer {
// simultaneously if we were to receive a newly solved block whilst parts of the chain are streaming to us.
private HashSet<Sha256Hash> pendingBlockDownloads = new HashSet<Sha256Hash>();
// Outstanding pings against this peer and how long the last one took to complete. Locked under the Peer lock.
private List<PendingPing> pendingPings;
private long lastPingTime;
private Channel channel;
private VersionMessage peerVersionMessage;
private boolean isAcked;
@ -97,6 +104,8 @@ public class Peer {
this.fastCatchupTimeSecs = params.genesisBlock.getTimeSeconds();
this.isAcked = false;
this.handler = new PeerHandler();
this.pendingPings = Lists.newLinkedList();
this.lastPingTime = Long.MAX_VALUE;
}
/**
@ -235,7 +244,7 @@ public class Peer {
if (((Ping) m).hasNonce())
sendMessage(new Pong(((Ping) m).getNonce()));
} else if (m instanceof Pong) {
// We don't do anything with pongs right now, leave that to eventListeners
processPong((Pong)m);
} else {
// TODO: Handle the other messages we can receive.
log.warn("Received unhandled message: {}", m);
@ -729,6 +738,70 @@ public class Peer {
}
}
private class PendingPing {
// The future that will be invoked when the pong is heard back.
public SettableFuture<Long> future;
// The random nonce that lets us tell apart overlapping pings/pongs.
public long nonce;
// Measurement of the time elapsed.
public long startTimeMsec;
public PendingPing() {
future = SettableFuture.create();
nonce = (long) Math.random() * Long.MAX_VALUE;
startTimeMsec = Utils.now().getTime();
}
public void complete() {
Preconditions.checkNotNull(future, "Already completed");
Long elapsed = Long.valueOf(Utils.now().getTime() - startTimeMsec);
synchronized (Peer.this) {
Peer.this.lastPingTime = elapsed.longValue();
}
log.debug("{}: ping time is {} msec", Peer.this.toString(), elapsed);
future.set(elapsed);
future = null;
}
}
/**
* Sends the peer a ping message and returns a future that will be invoked when the pong is received back.
* The future provides a number which is the number of milliseconds elapsed between the ping and the pong.
* @throws ProtocolException if the peer version is too low to support measurable pings.
*/
public ListenableFuture<Long> ping() throws IOException, ProtocolException {
int peerVersion = getPeerVersionMessage().clientVersion;
if (peerVersion < 60000)
throw new ProtocolException("Peer version is too low for measurable pings: " + peerVersion);
PendingPing pendingPing = new PendingPing();
pendingPings.add(pendingPing);
sendMessage(new Ping(pendingPing.nonce));
return pendingPing.future;
}
/**
* Returns the elapsed time of the last ping/pong cycle. If {@link com.google.bitcoin.core.Peer#ping()} has never
* been called or we did not hear back the "pong" message yet, returns {@link Long#MAX_VALUE}.
*/
public long getLastPingTime() {
return lastPingTime;
}
private void processPong(Pong m) {
ListIterator<PendingPing> it = pendingPings.listIterator();
PendingPing ping = null;
while (it.hasNext()) {
ping = it.next();
if (m.getNonce() == ping.nonce) {
it.remove();
break;
}
}
// This line may trigger an event listener being run on the same thread, if one is attached to the
// pending ping future. That event listener may in turn re-run ping, so we need to do it last.
if (ping != null) ping.complete();
}
/**
* Returns the difference between our best chain height and the peers, which can either be positive if we are
* behind the peer, or negative if the peer is ahead of us.

View File

@ -17,6 +17,7 @@
package com.google.bitcoin.core;
import com.google.bitcoin.core.Peer.PeerHandler;
import com.google.common.util.concurrent.ListenableFuture;
import org.easymock.Capture;
import org.jboss.netty.channel.*;
import org.junit.Before;
@ -432,6 +433,25 @@ public class PeerTest extends TestWithNetworkConnections {
inbound(peer, b3);
}
@Test
public void pingPong() throws Exception {
control.replay();
connect();
Utils.rollMockClock(0);
// No ping pong happened yet.
assertEquals(Long.MAX_VALUE, peer.getLastPingTime());
ListenableFuture<Long> future = peer.ping();
Ping pingMsg = (Ping) outbound();
assertEquals(Long.MAX_VALUE, peer.getLastPingTime());
assertFalse(future.isDone());
Utils.rollMockClock(5);
inbound(peer, new Pong(pingMsg.getNonce()));
assertTrue(future.isDone());
long elapsed = future.get();
assertTrue("" + elapsed, elapsed > 1000);
assertEquals(elapsed, peer.getLastPingTime());
}
private Message outbound() {
Message message = (Message)event.getValue().getMessage();
event.reset();

View File

@ -16,17 +16,18 @@
package com.google.bitcoin.examples;
import com.google.bitcoin.core.AbstractPeerEventListener;
import com.google.bitcoin.core.NetworkParameters;
import com.google.bitcoin.core.Peer;
import com.google.bitcoin.core.PeerGroup;
import com.google.bitcoin.core.*;
import com.google.bitcoin.discovery.DnsDiscovery;
import com.google.bitcoin.utils.BriefLogFormatter;
import com.google.common.util.concurrent.MoreExecutors;
import javax.swing.*;
import javax.swing.table.AbstractTableModel;
import java.awt.*;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* Shows connected peers in a table view, so you can watch as they come and go.
@ -35,6 +36,7 @@ public class PeerMonitor {
private NetworkParameters params;
private PeerGroup peerGroup;
private PeerTableModel peerTableModel;
private ScheduledThreadPoolExecutor pingService;
public static void main(String[] args) throws Exception {
BriefLogFormatter.init();
@ -51,10 +53,17 @@ public class PeerMonitor {
peerGroup = new PeerGroup(params, null /* no chain */);
peerGroup.setUserAgent("PeerMonitor", "1.0");
peerGroup.addPeerDiscovery(new DnsDiscovery(params));
pingService = new ScheduledThreadPoolExecutor(1);
peerGroup.addEventListener(new AbstractPeerEventListener() {
@Override
public void onPeerConnected(Peer peer, int peerCount) {
public void onPeerConnected(final Peer peer, int peerCount) {
refreshUI();
// Ping the peer with a 1 second delay between pings.
pingService.scheduleWithFixedDelay(new Runnable() {
public void run() {
pingPeer(peer);
}
}, 0, 1, TimeUnit.SECONDS);
}
@Override
@ -65,6 +74,23 @@ public class PeerMonitor {
peerGroup.start();
}
private void pingPeer(final Peer peer) {
try {
// Annoyingly, java.awt.EventQueue is not an executor, so we can't
// dispatch the listener directly to the right thread.
peer.ping().addListener(new Runnable() {
public void run() {
// When we get the pong message back, refresh the table.
refreshUI();
}
}, MoreExecutors.sameThreadExecutor());
} catch (IOException e) {
e.printStackTrace();
} catch (ProtocolException e) {
// Peer is too old to support pinging, so just ignore this here.
}
}
private void refreshUI() {
// Tell the Swing UI thread to redraw the peers table.
SwingUtilities.invokeLater(new Runnable() {
@ -93,6 +119,7 @@ public class PeerMonitor {
private final int PROTOCOL_VERSION = 1;
private final int USER_AGENT = 2;
private final int CHAIN_HEIGHT = 3;
private final int PING_TIME = 4;
public int getRowCount() {
return peerGroup.numConnectedPeers();
@ -105,12 +132,13 @@ public class PeerMonitor {
case PROTOCOL_VERSION: return "Protocol version";
case USER_AGENT: return "User Agent";
case CHAIN_HEIGHT: return "Chain height";
case PING_TIME: return "Ping time";
default: throw new RuntimeException();
}
}
public int getColumnCount() {
return 4;
return 5;
}
public Object getValueAt(int row, int col) {
@ -125,6 +153,12 @@ public class PeerMonitor {
return peer.getPeerVersionMessage().subVer;
case CHAIN_HEIGHT:
return peer.getBestHeight();
case PING_TIME:
long time = peer.getLastPingTime();
if (time == Long.MAX_VALUE)
return "";
else
return String.format("%d ms", time);
default: throw new RuntimeException();
}