3
0
mirror of https://github.com/Qortal/altcoinj.git synced 2025-02-07 06:44:16 +00:00

Make PeerGroup remember discovery sources and retry them after a while.

This commit is contained in:
Mike Hearn 2011-09-05 17:09:30 +00:00
parent 6a049b633e
commit eae1130a31
2 changed files with 81 additions and 34 deletions

View File

@ -25,13 +25,12 @@ import com.google.bitcoin.store.BlockStoreException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.xml.bind.annotation.XmlElementRef;
import java.io.IOException;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
@ -61,7 +60,7 @@ public class PeerGroup {
private static final Logger log = LoggerFactory.getLogger(PeerGroup.class);
private static final int CONNECTION_DELAY_MILLIS = 5 * 1000;
public static final int DEFAULT_CONNECTION_DELAY_MILLIS = 5 * 1000;
private static final int CORE_THREADS = 1;
private static final int THREAD_KEEP_ALIVE_SECONDS = 1;
@ -79,28 +78,37 @@ public class PeerGroup {
private Peer downloadPeer;
// Callback for events related to chain download
private PeerEventListener downloadListener;
// Callbacks for events related to peer connection/disconnection
private Set<PeerEventListener> peerEventListeners;
// Peer discovery sources, will be polled occasionally if there aren't enough inactives.
private Set<PeerDiscovery> peerDiscoverers;
private NetworkParameters params;
private BlockStore blockStore;
private BlockChain chain;
private int connectionDelayMillis;
/**
* Create a PeerGroup
* Creates a PeerGroup with the given parameters and a default 5 second connection timeout.
*/
public PeerGroup(BlockStore blockStore, NetworkParameters params, BlockChain chain) {
this(blockStore, params, chain, DEFAULT_CONNECTION_DELAY_MILLIS);
}
/**
* Creates a PeerGroup with the given parameters. The connectionDelayMillis parameter controls how long the
* PeerGroup will wait between attempts to connect to nodes or read from any added peer discovery sources.
*/
public PeerGroup(BlockStore blockStore, NetworkParameters params, BlockChain chain, int connectionDelayMillis) {
this.blockStore = blockStore;
this.params = params;
this.chain = chain;
this.connectionDelayMillis = connectionDelayMillis;
inactives = new LinkedBlockingQueue<PeerAddress>();
peers = Collections.synchronizedSet(new HashSet<Peer>());
peerEventListeners = Collections.synchronizedSet(new HashSet<PeerEventListener>());
peerDiscoverers = Collections.synchronizedSet(new HashSet<PeerDiscovery>());
peerPool = new ThreadPoolExecutor(CORE_THREADS, DEFAULT_CONNECTIONS,
THREAD_KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(1),
@ -140,18 +148,7 @@ public class PeerGroup {
/** Add addresses from a discovery source to the list of potential peers to connect to */
public void addPeerDiscovery(PeerDiscovery peerDiscovery) {
// TODO(miron) consider remembering the discovery source and retrying occasionally
InetSocketAddress[] addresses;
try {
addresses = peerDiscovery.getPeers();
} catch (PeerDiscoveryException e) {
log.error("Failed to discover peer addresses from discovery source", e);
return;
}
for (int i = 0; i < addresses.length; i++) {
inactives.add(new PeerAddress(addresses[i]));
}
peerDiscoverers.add(peerDiscovery);
}
/** Starts the background thread that makes connections. */
@ -205,19 +202,21 @@ public class PeerGroup {
public void run() {
try {
while (running) {
tryNextPeer();
if (inactives.size() == 0) {
discoverPeers();
} else {
tryNextPeer();
}
// We started a new peer connection, delay before trying another one
Thread.sleep(CONNECTION_DELAY_MILLIS);
Thread.sleep(connectionDelayMillis);
}
} catch (InterruptedException ex) {
synchronized (this) {
running = false;
}
}
peerPool.shutdownNow();
synchronized (peers) {
for (Peer peer : peers) {
peer.disconnect();
@ -225,16 +224,31 @@ public class PeerGroup {
}
}
/*
* Try connecting to a peer. If we exceed the number of connections, delay and try
* again.
*/
private void discoverPeers() {
for (PeerDiscovery peerDiscovery : peerDiscoverers) {
InetSocketAddress[] addresses;
try {
addresses = peerDiscovery.getPeers();
} catch (PeerDiscoveryException e) {
// Will try again later.
log.error("Failed to discover peer addresses from discovery source", e);
return;
}
for (int i = 0; i < addresses.length; i++) {
inactives.add(new PeerAddress(addresses[i]));
}
if (inactives.size() > 0) break;
}
}
/** Try connecting to a peer. If we exceed the number of connections, delay and try again. */
private void tryNextPeer() throws InterruptedException {
final PeerAddress address = inactives.take();
while (true) {
try {
final Peer peer = new Peer(params, address,
blockStore.getChainHead().getHeight(), chain);
final Peer peer = new Peer(params, address, blockStore.getChainHead().getHeight(), chain);
Runnable command = new Runnable() {
public void run() {
try {
@ -285,7 +299,7 @@ public class PeerGroup {
// If we got here, we should retry this address because an error unrelated
// to the peer has occurred.
Thread.sleep(CONNECTION_DELAY_MILLIS);
Thread.sleep(connectionDelayMillis);
}
}
}

View File

@ -16,14 +16,20 @@
package com.google.bitcoin.core;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import com.google.bitcoin.discovery.PeerDiscovery;
import com.google.bitcoin.discovery.PeerDiscoveryException;
import com.google.bitcoin.store.BlockStore;
import com.google.bitcoin.store.MemoryBlockStore;
import org.junit.Before;
import org.junit.Test;
import java.net.InetSocketAddress;
import java.util.concurrent.Semaphore;
public class PeerGroupTest {
static final NetworkParameters params = NetworkParameters.unitTests();
@ -36,7 +42,7 @@ public class PeerGroupTest {
wallet = new Wallet(params);
blockStore = new MemoryBlockStore(params);
BlockChain chain = new BlockChain(params, wallet, blockStore);
peerGroup = new PeerGroup(blockStore, params, chain);
peerGroup = new PeerGroup(blockStore, params, chain, 1000);
}
@Test
@ -46,4 +52,31 @@ public class PeerGroupTest {
peerGroup.addEventListener(listener);
assertTrue(peerGroup.removeEventListener(listener));
}
@Test
public void peerDiscoveryPolling() throws Exception {
// Check that if peer discovery fails, we keep trying until we have some nodes to talk with.
final Semaphore sem = new Semaphore(0);
final boolean[] result = new boolean[1];
result[0] = false;
peerGroup.addPeerDiscovery(new PeerDiscovery() {
public InetSocketAddress[] getPeers() throws PeerDiscoveryException {
if (result[0] == false) {
// Pretend we are not connected to the internet.
result[0] = true;
throw new PeerDiscoveryException("test failure");
} else {
// Return a bogus address.
sem.release();
return new InetSocketAddress[]{new InetSocketAddress("localhost", 0)};
}
}
});
peerGroup.start();
sem.acquire();
// Check that we did indeed throw an exception. If we got here it means we threw and then PeerGroup tried
// again a bit later.
assertTrue(result[0]);
peerGroup.stop();
}
}