diff --git a/core/src/main/java/com/google/bitcoin/core/Peer.java b/core/src/main/java/com/google/bitcoin/core/Peer.java index c1642a4c..442bbeef 100644 --- a/core/src/main/java/com/google/bitcoin/core/Peer.java +++ b/core/src/main/java/com/google/bitcoin/core/Peer.java @@ -21,6 +21,9 @@ import com.google.bitcoin.store.BlockStoreException; import com.google.bitcoin.utils.EventListenerInvoker; import com.google.common.base.Objects; import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; import org.jboss.netty.channel.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -79,6 +82,10 @@ public class Peer { // simultaneously if we were to receive a newly solved block whilst parts of the chain are streaming to us. private HashSet pendingBlockDownloads = new HashSet(); + // Outstanding pings against this peer and how long the last one took to complete. Locked under the Peer lock. + private List pendingPings; + private long lastPingTime; + private Channel channel; private VersionMessage peerVersionMessage; private boolean isAcked; @@ -97,6 +104,8 @@ public class Peer { this.fastCatchupTimeSecs = params.genesisBlock.getTimeSeconds(); this.isAcked = false; this.handler = new PeerHandler(); + this.pendingPings = Lists.newLinkedList(); + this.lastPingTime = Long.MAX_VALUE; } /** @@ -235,7 +244,7 @@ public class Peer { if (((Ping) m).hasNonce()) sendMessage(new Pong(((Ping) m).getNonce())); } else if (m instanceof Pong) { - // We don't do anything with pongs right now, leave that to eventListeners + processPong((Pong)m); } else { // TODO: Handle the other messages we can receive. log.warn("Received unhandled message: {}", m); @@ -729,6 +738,70 @@ public class Peer { } } + private class PendingPing { + // The future that will be invoked when the pong is heard back. + public SettableFuture future; + // The random nonce that lets us tell apart overlapping pings/pongs. + public long nonce; + // Measurement of the time elapsed. + public long startTimeMsec; + + public PendingPing() { + future = SettableFuture.create(); + nonce = (long) Math.random() * Long.MAX_VALUE; + startTimeMsec = Utils.now().getTime(); + } + + public void complete() { + Preconditions.checkNotNull(future, "Already completed"); + Long elapsed = Long.valueOf(Utils.now().getTime() - startTimeMsec); + synchronized (Peer.this) { + Peer.this.lastPingTime = elapsed.longValue(); + } + log.debug("{}: ping time is {} msec", Peer.this.toString(), elapsed); + future.set(elapsed); + future = null; + } + } + + /** + * Sends the peer a ping message and returns a future that will be invoked when the pong is received back. + * The future provides a number which is the number of milliseconds elapsed between the ping and the pong. + * @throws ProtocolException if the peer version is too low to support measurable pings. + */ + public ListenableFuture ping() throws IOException, ProtocolException { + int peerVersion = getPeerVersionMessage().clientVersion; + if (peerVersion < 60000) + throw new ProtocolException("Peer version is too low for measurable pings: " + peerVersion); + PendingPing pendingPing = new PendingPing(); + pendingPings.add(pendingPing); + sendMessage(new Ping(pendingPing.nonce)); + return pendingPing.future; + } + + /** + * Returns the elapsed time of the last ping/pong cycle. If {@link com.google.bitcoin.core.Peer#ping()} has never + * been called or we did not hear back the "pong" message yet, returns {@link Long#MAX_VALUE}. + */ + public long getLastPingTime() { + return lastPingTime; + } + + private void processPong(Pong m) { + ListIterator it = pendingPings.listIterator(); + PendingPing ping = null; + while (it.hasNext()) { + ping = it.next(); + if (m.getNonce() == ping.nonce) { + it.remove(); + break; + } + } + // This line may trigger an event listener being run on the same thread, if one is attached to the + // pending ping future. That event listener may in turn re-run ping, so we need to do it last. + if (ping != null) ping.complete(); + } + /** * Returns the difference between our best chain height and the peers, which can either be positive if we are * behind the peer, or negative if the peer is ahead of us. diff --git a/core/src/test/java/com/google/bitcoin/core/PeerTest.java b/core/src/test/java/com/google/bitcoin/core/PeerTest.java index ceaa72f3..38cf9a53 100644 --- a/core/src/test/java/com/google/bitcoin/core/PeerTest.java +++ b/core/src/test/java/com/google/bitcoin/core/PeerTest.java @@ -17,6 +17,7 @@ package com.google.bitcoin.core; import com.google.bitcoin.core.Peer.PeerHandler; +import com.google.common.util.concurrent.ListenableFuture; import org.easymock.Capture; import org.jboss.netty.channel.*; import org.junit.Before; @@ -431,6 +432,25 @@ public class PeerTest extends TestWithNetworkConnections { // All done. inbound(peer, b3); } + + @Test + public void pingPong() throws Exception { + control.replay(); + connect(); + Utils.rollMockClock(0); + // No ping pong happened yet. + assertEquals(Long.MAX_VALUE, peer.getLastPingTime()); + ListenableFuture future = peer.ping(); + Ping pingMsg = (Ping) outbound(); + assertEquals(Long.MAX_VALUE, peer.getLastPingTime()); + assertFalse(future.isDone()); + Utils.rollMockClock(5); + inbound(peer, new Pong(pingMsg.getNonce())); + assertTrue(future.isDone()); + long elapsed = future.get(); + assertTrue("" + elapsed, elapsed > 1000); + assertEquals(elapsed, peer.getLastPingTime()); + } private Message outbound() { Message message = (Message)event.getValue().getMessage(); diff --git a/examples/src/main/java/com/google/bitcoin/examples/PeerMonitor.java b/examples/src/main/java/com/google/bitcoin/examples/PeerMonitor.java index d73cf039..66f77423 100644 --- a/examples/src/main/java/com/google/bitcoin/examples/PeerMonitor.java +++ b/examples/src/main/java/com/google/bitcoin/examples/PeerMonitor.java @@ -16,17 +16,18 @@ package com.google.bitcoin.examples; -import com.google.bitcoin.core.AbstractPeerEventListener; -import com.google.bitcoin.core.NetworkParameters; -import com.google.bitcoin.core.Peer; -import com.google.bitcoin.core.PeerGroup; +import com.google.bitcoin.core.*; import com.google.bitcoin.discovery.DnsDiscovery; import com.google.bitcoin.utils.BriefLogFormatter; +import com.google.common.util.concurrent.MoreExecutors; import javax.swing.*; import javax.swing.table.AbstractTableModel; import java.awt.*; +import java.io.IOException; import java.util.List; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; /** * Shows connected peers in a table view, so you can watch as they come and go. @@ -35,6 +36,7 @@ public class PeerMonitor { private NetworkParameters params; private PeerGroup peerGroup; private PeerTableModel peerTableModel; + private ScheduledThreadPoolExecutor pingService; public static void main(String[] args) throws Exception { BriefLogFormatter.init(); @@ -51,10 +53,17 @@ public class PeerMonitor { peerGroup = new PeerGroup(params, null /* no chain */); peerGroup.setUserAgent("PeerMonitor", "1.0"); peerGroup.addPeerDiscovery(new DnsDiscovery(params)); + pingService = new ScheduledThreadPoolExecutor(1); peerGroup.addEventListener(new AbstractPeerEventListener() { @Override - public void onPeerConnected(Peer peer, int peerCount) { + public void onPeerConnected(final Peer peer, int peerCount) { refreshUI(); + // Ping the peer with a 1 second delay between pings. + pingService.scheduleWithFixedDelay(new Runnable() { + public void run() { + pingPeer(peer); + } + }, 0, 1, TimeUnit.SECONDS); } @Override @@ -65,6 +74,23 @@ public class PeerMonitor { peerGroup.start(); } + private void pingPeer(final Peer peer) { + try { + // Annoyingly, java.awt.EventQueue is not an executor, so we can't + // dispatch the listener directly to the right thread. + peer.ping().addListener(new Runnable() { + public void run() { + // When we get the pong message back, refresh the table. + refreshUI(); + } + }, MoreExecutors.sameThreadExecutor()); + } catch (IOException e) { + e.printStackTrace(); + } catch (ProtocolException e) { + // Peer is too old to support pinging, so just ignore this here. + } + } + private void refreshUI() { // Tell the Swing UI thread to redraw the peers table. SwingUtilities.invokeLater(new Runnable() { @@ -93,6 +119,7 @@ public class PeerMonitor { private final int PROTOCOL_VERSION = 1; private final int USER_AGENT = 2; private final int CHAIN_HEIGHT = 3; + private final int PING_TIME = 4; public int getRowCount() { return peerGroup.numConnectedPeers(); @@ -105,12 +132,13 @@ public class PeerMonitor { case PROTOCOL_VERSION: return "Protocol version"; case USER_AGENT: return "User Agent"; case CHAIN_HEIGHT: return "Chain height"; + case PING_TIME: return "Ping time"; default: throw new RuntimeException(); } } public int getColumnCount() { - return 4; + return 5; } public Object getValueAt(int row, int col) { @@ -125,6 +153,12 @@ public class PeerMonitor { return peer.getPeerVersionMessage().subVer; case CHAIN_HEIGHT: return peer.getBestHeight(); + case PING_TIME: + long time = peer.getLastPingTime(); + if (time == Long.MAX_VALUE) + return ""; + else + return String.format("%d ms", time); default: throw new RuntimeException(); }