From 5a8ed590291a8aee96ecdd11a15c4a6ee8d991b1 Mon Sep 17 00:00:00 2001 From: Mike Hearn Date: Thu, 13 Nov 2014 23:31:36 +0100 Subject: [PATCH] PeerGroup: Reimplement pinging to use the executor. New code is simpler and more correct. --- .../java/org/bitcoinj/core/PeerGroup.java | 68 ++++++------------- .../java/org/bitcoinj/core/PeerGroupTest.java | 2 +- 2 files changed, 23 insertions(+), 47 deletions(-) diff --git a/core/src/main/java/org/bitcoinj/core/PeerGroup.java b/core/src/main/java/org/bitcoinj/core/PeerGroup.java index 4b44a546..e7538782 100644 --- a/core/src/main/java/org/bitcoinj/core/PeerGroup.java +++ b/core/src/main/java/org/bitcoinj/core/PeerGroup.java @@ -824,7 +824,6 @@ public class PeerGroup implements TransactionBroadcaster { @Override public void run() { log.info("Starting ..."); - vPingTimer = new Timer("Peer pinging thread", true); if (torClient != null) { log.info("Starting Tor/Orchid ..."); torClient.start(); @@ -838,6 +837,7 @@ public class PeerGroup implements TransactionBroadcaster { channels.startAsync(); channels.awaitRunning(); triggerConnections(); + setupPinging(); } }); } @@ -860,7 +860,6 @@ public class PeerGroup implements TransactionBroadcaster { @Override public void run() { log.info("Stopping ..."); - vPingTimer.cancel(); // Blocking close of all sockets. channels.stopAsync(); channels.awaitTerminated(); @@ -1242,7 +1241,6 @@ public class PeerGroup implements TransactionBroadcaster { for (ListenerRegistration registration : peerEventListeners) { peer.addEventListenerWithoutOnDisconnect(registration.listener, registration.executor); } - setupPingingForNewPeer(peer); } finally { lock.unlock(); } @@ -1258,57 +1256,31 @@ public class PeerGroup implements TransactionBroadcaster { } } - private void setupPingingForNewPeer(final Peer peer) { - checkState(lock.isHeldByCurrentThread()); - if (peer.getPeerVersionMessage().clientVersion < Pong.MIN_PROTOCOL_VERSION) - return; + @Nullable private volatile ListenableScheduledFuture vPingTask; + + @SuppressWarnings("NonAtomicOperationOnVolatileField") + private void setupPinging() { if (getPingIntervalMsec() <= 0) return; // Disabled. - // Start the process of pinging the peer. Do a ping right now and then ensure there's a fixed delay between - // each ping. If the peer is taken out of the peers list then the cycle will stop. - // - // TODO: This should really be done by a timer integrated with the network thread to avoid races. - final Runnable[] pingRunnable = new Runnable[1]; - pingRunnable[0] = new Runnable() { - private boolean firstRun = true; + + vPingTask = executor.scheduleAtFixedRate(new Runnable() { @Override public void run() { - // Ensure that the first ping happens immediately and later pings after the requested delay. - if (firstRun) { - firstRun = false; - try { - peer.ping().addListener(this, Threading.SAME_THREAD); - } catch (Exception e) { - log.warn("{}: Exception whilst trying to ping peer: {}", peer, e.toString()); - return; + if (getPingIntervalMsec() <= 0) { + ListenableScheduledFuture task = vPingTask; + if (task != null) { + task.cancel(false); + vPingTask = null; } - return; - } - - final long interval = getPingIntervalMsec(); - if (interval <= 0) return; // Disabled. - final TimerTask task = new TimerTask() { - @Override - public void run() { - try { - if (!peers.contains(peer) || !PeerGroup.this.isRunning()) - return; // Peer was removed/shut down. - peer.ping().addListener(pingRunnable[0], Threading.SAME_THREAD); - } catch (Exception e) { - log.warn("{}: Exception whilst trying to ping peer: {}", peer, e.toString()); - } - } - }; - try { - vPingTimer.schedule(task, interval); - } catch (IllegalStateException ignored) { - // This can happen if there's a shutdown race and this runnable is executing whilst the timer is - // simultaneously cancelled. + } + for (Peer peer : getConnectedPeers()) { + if (peer.getPeerVersionMessage().clientVersion < Pong.MIN_PROTOCOL_VERSION) + continue; + peer.ping(); } } - }; - pingRunnable[0].run(); + }, getPingIntervalMsec(), getPingIntervalMsec(), TimeUnit.MILLISECONDS); } private void setDownloadPeer(@Nullable Peer peer) { @@ -1646,6 +1618,10 @@ public class PeerGroup implements TransactionBroadcaster { lock.lock(); try { this.pingIntervalMsec = pingIntervalMsec; + ListenableScheduledFuture task = vPingTask; + if (task != null) + task.cancel(false); + setupPinging(); } finally { lock.unlock(); } diff --git a/core/src/test/java/org/bitcoinj/core/PeerGroupTest.java b/core/src/test/java/org/bitcoinj/core/PeerGroupTest.java index f5f2f37e..24a25bbd 100644 --- a/core/src/test/java/org/bitcoinj/core/PeerGroupTest.java +++ b/core/src/test/java/org/bitcoinj/core/PeerGroupTest.java @@ -454,7 +454,7 @@ public class PeerGroupTest extends TestWithPeerGroup { versionMessage.clientVersion = FilteredBlock.MIN_PROTOCOL_VERSION; versionMessage.localServices = VersionMessage.NODE_NETWORK; InboundMessageQueuer p1 = connectPeer(1, versionMessage); - Ping ping = (Ping) outbound(p1); + Ping ping = (Ping) waitForOutbound(p1); inbound(p1, new Pong(ping.getNonce())); pingAndWait(p1); assertTrue(peerGroup.getConnectedPeers().get(0).getLastPingTime() < Long.MAX_VALUE);