3
0
mirror of https://github.com/Qortal/altcoinj.git synced 2025-02-12 02:05:53 +00:00

PeerGroup: Reimplement pinging to use the executor. New code is simpler and more correct.

This commit is contained in:
Mike Hearn 2014-11-13 23:31:36 +01:00
parent e7c00df740
commit 5a8ed59029
2 changed files with 23 additions and 47 deletions

View File

@ -824,7 +824,6 @@ public class PeerGroup implements TransactionBroadcaster {
@Override @Override
public void run() { public void run() {
log.info("Starting ..."); log.info("Starting ...");
vPingTimer = new Timer("Peer pinging thread", true);
if (torClient != null) { if (torClient != null) {
log.info("Starting Tor/Orchid ..."); log.info("Starting Tor/Orchid ...");
torClient.start(); torClient.start();
@ -838,6 +837,7 @@ public class PeerGroup implements TransactionBroadcaster {
channels.startAsync(); channels.startAsync();
channels.awaitRunning(); channels.awaitRunning();
triggerConnections(); triggerConnections();
setupPinging();
} }
}); });
} }
@ -860,7 +860,6 @@ public class PeerGroup implements TransactionBroadcaster {
@Override @Override
public void run() { public void run() {
log.info("Stopping ..."); log.info("Stopping ...");
vPingTimer.cancel();
// Blocking close of all sockets. // Blocking close of all sockets.
channels.stopAsync(); channels.stopAsync();
channels.awaitTerminated(); channels.awaitTerminated();
@ -1242,7 +1241,6 @@ public class PeerGroup implements TransactionBroadcaster {
for (ListenerRegistration<PeerEventListener> registration : peerEventListeners) { for (ListenerRegistration<PeerEventListener> registration : peerEventListeners) {
peer.addEventListenerWithoutOnDisconnect(registration.listener, registration.executor); peer.addEventListenerWithoutOnDisconnect(registration.listener, registration.executor);
} }
setupPingingForNewPeer(peer);
} finally { } finally {
lock.unlock(); lock.unlock();
} }
@ -1258,57 +1256,31 @@ public class PeerGroup implements TransactionBroadcaster {
} }
} }
private void setupPingingForNewPeer(final Peer peer) { @Nullable private volatile ListenableScheduledFuture<?> vPingTask;
checkState(lock.isHeldByCurrentThread());
if (peer.getPeerVersionMessage().clientVersion < Pong.MIN_PROTOCOL_VERSION) @SuppressWarnings("NonAtomicOperationOnVolatileField")
return; private void setupPinging() {
if (getPingIntervalMsec() <= 0) if (getPingIntervalMsec() <= 0)
return; // Disabled. return; // Disabled.
// Start the process of pinging the peer. Do a ping right now and then ensure there's a fixed delay between
// each ping. If the peer is taken out of the peers list then the cycle will stop. vPingTask = executor.scheduleAtFixedRate(new Runnable() {
//
// TODO: This should really be done by a timer integrated with the network thread to avoid races.
final Runnable[] pingRunnable = new Runnable[1];
pingRunnable[0] = new Runnable() {
private boolean firstRun = true;
@Override @Override
public void run() { public void run() {
// Ensure that the first ping happens immediately and later pings after the requested delay. if (getPingIntervalMsec() <= 0) {
if (firstRun) { ListenableScheduledFuture<?> task = vPingTask;
firstRun = false; if (task != null) {
try { task.cancel(false);
peer.ping().addListener(this, Threading.SAME_THREAD); vPingTask = null;
} catch (Exception e) {
log.warn("{}: Exception whilst trying to ping peer: {}", peer, e.toString());
return;
} }
return;
}
final long interval = getPingIntervalMsec();
if (interval <= 0)
return; // Disabled. return; // Disabled.
final TimerTask task = new TimerTask() { }
@Override for (Peer peer : getConnectedPeers()) {
public void run() { if (peer.getPeerVersionMessage().clientVersion < Pong.MIN_PROTOCOL_VERSION)
try { continue;
if (!peers.contains(peer) || !PeerGroup.this.isRunning()) peer.ping();
return; // Peer was removed/shut down.
peer.ping().addListener(pingRunnable[0], Threading.SAME_THREAD);
} catch (Exception e) {
log.warn("{}: Exception whilst trying to ping peer: {}", peer, e.toString());
}
}
};
try {
vPingTimer.schedule(task, interval);
} catch (IllegalStateException ignored) {
// This can happen if there's a shutdown race and this runnable is executing whilst the timer is
// simultaneously cancelled.
} }
} }
}; }, getPingIntervalMsec(), getPingIntervalMsec(), TimeUnit.MILLISECONDS);
pingRunnable[0].run();
} }
private void setDownloadPeer(@Nullable Peer peer) { private void setDownloadPeer(@Nullable Peer peer) {
@ -1646,6 +1618,10 @@ public class PeerGroup implements TransactionBroadcaster {
lock.lock(); lock.lock();
try { try {
this.pingIntervalMsec = pingIntervalMsec; this.pingIntervalMsec = pingIntervalMsec;
ListenableScheduledFuture<?> task = vPingTask;
if (task != null)
task.cancel(false);
setupPinging();
} finally { } finally {
lock.unlock(); lock.unlock();
} }

View File

@ -454,7 +454,7 @@ public class PeerGroupTest extends TestWithPeerGroup {
versionMessage.clientVersion = FilteredBlock.MIN_PROTOCOL_VERSION; versionMessage.clientVersion = FilteredBlock.MIN_PROTOCOL_VERSION;
versionMessage.localServices = VersionMessage.NODE_NETWORK; versionMessage.localServices = VersionMessage.NODE_NETWORK;
InboundMessageQueuer p1 = connectPeer(1, versionMessage); InboundMessageQueuer p1 = connectPeer(1, versionMessage);
Ping ping = (Ping) outbound(p1); Ping ping = (Ping) waitForOutbound(p1);
inbound(p1, new Pong(ping.getNonce())); inbound(p1, new Pong(ping.getNonce()));
pingAndWait(p1); pingAndWait(p1);
assertTrue(peerGroup.getConnectedPeers().get(0).getLastPingTime() < Long.MAX_VALUE); assertTrue(peerGroup.getConnectedPeers().get(0).getLastPingTime() < Long.MAX_VALUE);