mirror of
https://github.com/Qortal/altcoinj.git
synced 2025-01-30 23:02:15 +00:00
Some improvements to the network code:
- Delete the PeerGroupThread and replace it with a connectToAnyPeer() method. Getting connected to the network is now significantly faster as there is no sleep-wait loop between connections. - Delete the connectionTimeMillis param to the PeerGroup constructors. - Expose the PeerGroup c'tor that lets you specify the ClientBootstrap and expose a utility method to make one. This means users can now customize TCP options directly. - PeerGroup.setMaxConnections() now actually creates or tears down connections to meet the new required number, if the peer group is running. - Update the PeerMonitor so there's a spinner that lets you add or remove peers. Connecting and version handshaking is very fast now. - Misc changes and fixes.
This commit is contained in:
parent
e20c35bf50
commit
f440913c1d
@ -172,19 +172,20 @@ public class Peer {
|
||||
|
||||
@Override
|
||||
public void connectRequested(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
|
||||
super.connectRequested(ctx, e);
|
||||
channel = e.getChannel();
|
||||
address = new PeerAddress((InetSocketAddress)e.getValue());
|
||||
channel = e.getChannel();
|
||||
super.connectRequested(ctx, e);
|
||||
}
|
||||
|
||||
/** Catch any exceptions, logging them and then closing the channel. */
|
||||
@Override
|
||||
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
|
||||
String s = address == null ? "?" : address.toString();
|
||||
if (e.getCause() instanceof ConnectException || e.getCause() instanceof IOException) {
|
||||
// Short message for network errors
|
||||
log.info(toString() + " - " + e.getCause().getMessage());
|
||||
log.info(s + " - " + e.getCause().getMessage());
|
||||
} else {
|
||||
log.warn(toString() + " - ", e.getCause());
|
||||
log.warn(s + " - ", e.getCause());
|
||||
}
|
||||
|
||||
e.getChannel().close();
|
||||
|
@ -36,7 +36,10 @@ import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.SocketAddress;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.*;
|
||||
import java.util.concurrent.CopyOnWriteArraySet;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import static com.google.common.base.Preconditions.checkArgument;
|
||||
@ -62,20 +65,16 @@ public class PeerGroup {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(PeerGroup.class);
|
||||
|
||||
public static final int DEFAULT_CONNECTION_DELAY_MILLIS = 5 * 1000;
|
||||
|
||||
// Addresses to try to connect to, excluding active peers
|
||||
private BlockingQueue<PeerAddress> inactives;
|
||||
// Connection initiation thread
|
||||
private PeerGroupThread peerGroupThread;
|
||||
// True if the connection initiation thread should be running
|
||||
// True if start() has been called.
|
||||
private boolean running;
|
||||
|
||||
// TODO: Rationalize the data structures used here.
|
||||
// Currently active peers. This is a linked list rather than a set to make unit tests predictable.
|
||||
private LinkedList<Peer> peers;
|
||||
// These lists are all thread-safe so do not have to be accessed under the PeerGroup lock.
|
||||
// Addresses to try to connect to, excluding active peers.
|
||||
private List<PeerAddress> inactives;
|
||||
// Currently active peers. This is an ordered list rather than a set to make unit tests predictable.
|
||||
private List<Peer> peers;
|
||||
// Currently connecting peers
|
||||
private Set<Peer> pendingPeers;
|
||||
private List<Peer> pendingPeers;
|
||||
private Map<Peer, ChannelFuture> channelFutures;
|
||||
|
||||
// The peer we are currently downloading the chain from
|
||||
@ -95,7 +94,6 @@ public class PeerGroup {
|
||||
|
||||
private final NetworkParameters params;
|
||||
private final AbstractBlockChain chain;
|
||||
private int connectionDelayMillis;
|
||||
private long fastCatchupTimeSecs;
|
||||
private ArrayList<Wallet> wallets;
|
||||
private AbstractPeerEventListener getDataListener;
|
||||
@ -122,47 +120,46 @@ public class PeerGroup {
|
||||
Peer.PeerLifecycleListener startupListener = new PeerStartupListener();
|
||||
|
||||
/**
|
||||
* Creates a PeerGroup with the given parameters and a default 5 second connection timeout.
|
||||
*
|
||||
* @param params Network parameters
|
||||
* @param chain a BlockChain object that will receive and handle block messages.
|
||||
*/
|
||||
public PeerGroup(NetworkParameters params, AbstractBlockChain chain) {
|
||||
this(params, chain, DEFAULT_CONNECTION_DELAY_MILLIS);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a PeerGroup with the given parameters and a default 5 second connection timeout. No chain is
|
||||
* provided so this node will report its chain height as zero to other peers.
|
||||
* Creates a PeerGroup with the given parameters. No chain is provided so this node will report its chain height
|
||||
* as zero to other peers. This constructor is useful if you just want to explore the network but aren't interested
|
||||
* in downloading block data.
|
||||
*
|
||||
* @param params Network parameters
|
||||
*/
|
||||
public PeerGroup(NetworkParameters params) {
|
||||
this(params, null, DEFAULT_CONNECTION_DELAY_MILLIS);
|
||||
this(params, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a PeerGroup with the given parameters. The connectionDelayMillis parameter controls
|
||||
*
|
||||
* @param params bitcoin network parameters
|
||||
* @param chain the block chain maintained by the network
|
||||
* @param connectionDelayMillis how long to wait between attempts to connect to nodes or read
|
||||
* from any added peer discovery sources
|
||||
* Creates a PeerGroup for the given network and chain. Blocks will be passed to the chain as they are broadcast
|
||||
* and downloaded. This is probably the constructor you want to use.
|
||||
*/
|
||||
public PeerGroup(final NetworkParameters params, final AbstractBlockChain chain,
|
||||
int connectionDelayMillis) {
|
||||
this(params, chain, connectionDelayMillis, new ClientBootstrap(
|
||||
new NioClientSocketChannelFactory(
|
||||
Executors.newCachedThreadPool(new PeerGroupThreadFactory()),
|
||||
Executors.newCachedThreadPool(new PeerGroupThreadFactory()))));
|
||||
bootstrap.setPipelineFactory(makePipelineFactory(params, chain));
|
||||
public PeerGroup(NetworkParameters params, AbstractBlockChain chain) {
|
||||
this(params, chain, null);
|
||||
}
|
||||
|
||||
PeerGroup(final NetworkParameters params, final AbstractBlockChain chain,
|
||||
int connectionDelayMillis, ClientBootstrap bootstrap) {
|
||||
|
||||
/**
|
||||
* <p>Creates a PeerGroup for the given network and chain, using the provided Netty {@link ClientBootstrap} object.
|
||||
* </p>
|
||||
*
|
||||
* <p>A ClientBootstrap creates raw (TCP) connections to other nodes on the network. Normally you won't need to
|
||||
* provide one - use the other constructors. Providing your own bootstrap is useful if you want to control
|
||||
* details like how many network threads are used, the connection timeout value and so on. To do this, you can
|
||||
* use {@link PeerGroup.createClientBootstrap()} method and then customize the resulting object. Example:</p>
|
||||
*
|
||||
* <pre>
|
||||
* ClientBootstrap bootstrap = PeerGroup.createClientBootstrap();
|
||||
* bootstrap.setOption("oonnectionTimeoutMillis", 3000);
|
||||
* PeerGroup peerGroup = new PeerGroup(params, chain, bootstrap);
|
||||
* </pre>
|
||||
*
|
||||
* <p>The ClientBootstrap provided does not need a channel pipeline factory set. If one wasn't set, the provided
|
||||
* bootstrap will be modified to have one that sets up the pipelines correctly.</p>
|
||||
*/
|
||||
public PeerGroup(NetworkParameters params, AbstractBlockChain chain, ClientBootstrap bootstrap) {
|
||||
this.params = params;
|
||||
this.chain = chain; // Can be null.
|
||||
this.connectionDelayMillis = connectionDelayMillis;
|
||||
this.fastCatchupTimeSecs = params.genesisBlock.getTimeSeconds();
|
||||
this.wallets = new ArrayList<Wallet>(1);
|
||||
|
||||
@ -175,12 +172,17 @@ public class PeerGroup {
|
||||
this.versionMessage = new VersionMessage(params, height);
|
||||
|
||||
memoryPool = new MemoryPool();
|
||||
this.bootstrap = bootstrap;
|
||||
|
||||
inactives = new LinkedBlockingQueue<PeerAddress>();
|
||||
peers = new LinkedList<Peer>();
|
||||
pendingPeers = new HashSet<Peer>();
|
||||
channelFutures = new HashMap<Peer, ChannelFuture>();
|
||||
// Configure Netty. The "ClientBootstrap" creates connections to other nodes. It can be configured in various
|
||||
// ways to control the network.
|
||||
this.bootstrap = bootstrap != null ? bootstrap : createClientBootstrap();
|
||||
if (this.bootstrap.getPipelineFactory() == null)
|
||||
this.bootstrap.setPipelineFactory(makePipelineFactory(params, chain));
|
||||
|
||||
inactives = Collections.synchronizedList(new ArrayList<PeerAddress>());
|
||||
peers = Collections.synchronizedList(new ArrayList<Peer>());
|
||||
pendingPeers = Collections.synchronizedList(new ArrayList<Peer>());
|
||||
channelFutures = Collections.synchronizedMap(new HashMap<Peer, ChannelFuture>());
|
||||
peerDiscoverers = new CopyOnWriteArraySet<PeerDiscovery>();
|
||||
peerEventListeners = new ArrayList<PeerEventListener>();
|
||||
// This event listener is added to every peer. It's here so when we announce transactions via an "inv", every
|
||||
@ -193,12 +195,25 @@ public class PeerGroup {
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper method that just sets up a normal Netty ClientBootstrap using the default options, except for a custom
|
||||
* thread factory that gives worker threads useful names and lowers their priority (to avoid competing with UI
|
||||
* threads). You don't normally need to call this - if you aren't sure what it does, just use the regular
|
||||
* constructors for {@link PeerGroup} that don't take a ClientBootstrap object.
|
||||
*/
|
||||
public static ClientBootstrap createClientBootstrap() {
|
||||
ExecutorService bossExecutor = Executors.newCachedThreadPool(new PeerGroupThreadFactory());
|
||||
ExecutorService workerExecutor = Executors.newCachedThreadPool(new PeerGroupThreadFactory());
|
||||
NioClientSocketChannelFactory channelFactory = new NioClientSocketChannelFactory(bossExecutor, workerExecutor);
|
||||
ClientBootstrap bs = new ClientBootstrap(channelFactory);
|
||||
return bs;
|
||||
}
|
||||
|
||||
// Create a Netty pipeline factory. The pipeline factory will create a network processing
|
||||
// pipeline with the bitcoin serializer ({@code TCPNetworkConnection}) downstream
|
||||
// of the higher level {@code Peer}. Received packets will first be decoded, then passed
|
||||
// {@code Peer}. Sent packets will be created by the {@code Peer}, then encoded and sent.
|
||||
private ChannelPipelineFactory makePipelineFactory(
|
||||
final NetworkParameters params, final AbstractBlockChain chain) {
|
||||
private ChannelPipelineFactory makePipelineFactory(final NetworkParameters params, final AbstractBlockChain chain) {
|
||||
return new ChannelPipelineFactory() {
|
||||
public ChannelPipeline getPipeline() throws Exception {
|
||||
VersionMessage ver = getVersionMessage().duplicate();
|
||||
@ -218,9 +233,35 @@ public class PeerGroup {
|
||||
};
|
||||
}
|
||||
|
||||
/** The maximum number of connections that we will create to peers. */
|
||||
public synchronized void setMaxConnections(int maxConnections) {
|
||||
/**
|
||||
* Adjusts the desired number of connections that we will create to peers. Note that if there are already peers
|
||||
* open and the new value is lower than the current number of peers, those connections will be terminated. Likewise
|
||||
* if there aren't enough current connections to meet the new requested max size, some will be added.
|
||||
*/
|
||||
public void setMaxConnections(int maxConnections) {
|
||||
int adjustment;
|
||||
synchronized (this) {
|
||||
this.maxConnections = maxConnections;
|
||||
if (!running) return;
|
||||
}
|
||||
// We may now have too many or too few open connections. Adding the sizes together here is a race condition.
|
||||
adjustment = maxConnections - (peers.size() + pendingPeers.size());
|
||||
while (adjustment > 0) {
|
||||
try {
|
||||
connectToAnyPeer();
|
||||
} catch (PeerDiscoveryException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
adjustment--;
|
||||
}
|
||||
while (adjustment < 0) {
|
||||
Channel channel;
|
||||
synchronized (peers) {
|
||||
channel = channelFutures.get(peers.remove(peers.size() - 1)).getChannel();
|
||||
}
|
||||
channel.close();
|
||||
adjustment++;
|
||||
}
|
||||
}
|
||||
|
||||
/** The maximum number of connections that we will create to peers. */
|
||||
@ -332,30 +373,27 @@ public class PeerGroup {
|
||||
* Returns a newly allocated list containing the currently connected peers. If all you care about is the count,
|
||||
* use numConnectedPeers().
|
||||
*/
|
||||
public synchronized List<Peer> getConnectedPeers() {
|
||||
ArrayList<Peer> result = new ArrayList<Peer>(peers.size());
|
||||
public List<Peer> getConnectedPeers() {
|
||||
synchronized (peers) {
|
||||
ArrayList<Peer> result = new ArrayList<Peer>(peers.size());
|
||||
result.addAll(peers);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Add an address to the list of potential peers to connect to. This will increment the total number of max
|
||||
* connections by one, so if all you use is addAddress, it is guaranteed to be attempted. If you're using a
|
||||
* mix of peer discovery and addAddress, there's no guarantee this address will be picked in preference to
|
||||
* those found via discovery.
|
||||
* Add an address to the list of potential peers to connect to. It won't necessarily be used unless there's a need
|
||||
* to build new connections to reach the max connection count.
|
||||
*
|
||||
* @param peerAddress IP/port to use.
|
||||
*/
|
||||
public synchronized void addAddress(PeerAddress peerAddress) {
|
||||
// TODO(miron) consider deduplication
|
||||
public void addAddress(PeerAddress peerAddress) {
|
||||
inactives.add(peerAddress);
|
||||
maxConnections++;
|
||||
}
|
||||
|
||||
/**
|
||||
* Add addresses from a discovery source to the list of potential peers to connect to
|
||||
* Add addresses from a discovery source to the list of potential peers to connect to. If max connections has not
|
||||
* been configured, or set to zero, then it's set to the default at this point.
|
||||
*/
|
||||
public synchronized void addPeerDiscovery(PeerDiscovery peerDiscovery) {
|
||||
if (getMaxConnections() == 0)
|
||||
@ -363,20 +401,62 @@ public class PeerGroup {
|
||||
peerDiscoverers.add(peerDiscovery);
|
||||
}
|
||||
|
||||
/**
|
||||
* Starts the background thread that makes connections.
|
||||
*/
|
||||
public synchronized void start() {
|
||||
this.peerGroupThread = new PeerGroupThread();
|
||||
running = true;
|
||||
this.peerGroupThread.start();
|
||||
protected void discoverPeers() throws PeerDiscoveryException {
|
||||
for (PeerDiscovery peerDiscovery : peerDiscoverers) {
|
||||
// TODO: Run all peer discovery sources in parallel.
|
||||
InetSocketAddress[] addresses;
|
||||
addresses = peerDiscovery.getPeers();
|
||||
synchronized (inactives) {
|
||||
for (int i = 0; i < addresses.length; i++) {
|
||||
inactives.add(new PeerAddress(addresses[i]));
|
||||
}
|
||||
if (inactives.size() > 0) break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Visible for testing.
|
||||
synchronized void mockStart(PeerGroupThread peerGroupThread) {
|
||||
this.peerGroupThread = peerGroupThread;
|
||||
/** Picks a peer from discovery and connects to it. If connection fails, picks another and tries again. */
|
||||
protected void connectToAnyPeer() throws PeerDiscoveryException {
|
||||
// Do not call this method whilst synchronized on the PeerGroup lock.
|
||||
final PeerAddress addr;
|
||||
if (!isRunning()) return;
|
||||
synchronized (inactives) {
|
||||
if (inactives.size() == 0) {
|
||||
discoverPeers();
|
||||
}
|
||||
if (inactives.size() == 0) {
|
||||
log.debug("Peer discovery didn't provide us any more peers, not trying to build new connection.");
|
||||
return;
|
||||
}
|
||||
addr = inactives.remove(inactives.size() - 1);
|
||||
}
|
||||
// Don't do connectTo whilst holding the PeerGroup lock because this can trigger some amazingly deep stacks
|
||||
// and potentially circular deadlock in the case of immediate failure (eg, attempt to access IPv6 node from
|
||||
// a non-v6 capable machine). It doesn't relay control immediately to the netty boss thread as you may expect.
|
||||
//
|
||||
// This method eventually constructs a Peer and puts it into pendingPeers. If the connection fails to establish,
|
||||
// handlePeerDeath will be called, which will potentially call this method again to replace the dead or failed
|
||||
// connection.
|
||||
connectTo(addr.toSocketAddress(), false);
|
||||
}
|
||||
|
||||
/**
|
||||
* Starts the PeerGroup. This may block whilst peer discovery takes place.
|
||||
*/
|
||||
public void start() {
|
||||
synchronized (this) {
|
||||
running = true;
|
||||
}
|
||||
// Bring up the requested number of connections. If a connect attempt fails, new peers will be tried until
|
||||
// there is a success, so just calling connectToAnyPeer for the wanted number of peers is sufficient.
|
||||
for (int i = 0; i < getMaxConnections(); i++) {
|
||||
try {
|
||||
connectToAnyPeer();
|
||||
} catch (PeerDiscoveryException e) {
|
||||
log.error(e.getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Stop this PeerGroup.
|
||||
@ -389,7 +469,18 @@ public class PeerGroup {
|
||||
public synchronized void stop() {
|
||||
if (running) {
|
||||
running = false;
|
||||
peerGroupThread.interrupt();
|
||||
for (PeerDiscovery peerDiscovery : peerDiscoverers) {
|
||||
peerDiscovery.shutdown();
|
||||
}
|
||||
LinkedList<ChannelFuture> futures;
|
||||
synchronized (channelFutures) {
|
||||
// Copy the list here because the act of closing the channel modifies the channelFutures map.
|
||||
futures = new LinkedList<ChannelFuture>(channelFutures.values());
|
||||
}
|
||||
for (ChannelFuture future : futures) {
|
||||
future.getChannel().close();
|
||||
}
|
||||
bootstrap.releaseExternalResources();
|
||||
}
|
||||
}
|
||||
|
||||
@ -447,7 +538,7 @@ public class PeerGroup {
|
||||
* Returns the number of currently connected peers. To be informed when this count changes, register a
|
||||
* {@link PeerEventListener} and use the onPeerConnected/onPeerDisconnected methods.
|
||||
*/
|
||||
public synchronized int numConnectedPeers() {
|
||||
public int numConnectedPeers() {
|
||||
return peers.size();
|
||||
}
|
||||
|
||||
@ -455,96 +546,6 @@ public class PeerGroup {
|
||||
return running;
|
||||
}
|
||||
|
||||
/**
|
||||
* Performs various tasks for the peer group: connects to new nodes to keep the currently connected node count at
|
||||
* the right level, runs peer discovery if we run out, and broadcasts transactions that were submitted via
|
||||
* broadcastTransaction().
|
||||
*/
|
||||
class PeerGroupThread extends Thread {
|
||||
public PeerGroupThread() {
|
||||
super("Peer group thread");
|
||||
// Ensure we don't fight with UI threads.
|
||||
setPriority(Math.max(Thread.MIN_PRIORITY, Thread.currentThread().getPriority() - 1));
|
||||
setDaemon(true);
|
||||
}
|
||||
|
||||
public void run() {
|
||||
try {
|
||||
while (isRunning()) {
|
||||
// Modify the peer group under its lock, always.
|
||||
int numPeers;
|
||||
|
||||
synchronized (PeerGroup.this) {
|
||||
numPeers = peers.size();
|
||||
}
|
||||
|
||||
if (inactives.size() == 0) {
|
||||
discoverPeers();
|
||||
} else if (numPeers < getMaxConnections()) {
|
||||
tryNextPeer();
|
||||
}
|
||||
|
||||
// TODO: Remove this and replace it with a real observation of when work needs to be done.
|
||||
Thread.sleep(connectionDelayMillis);
|
||||
}
|
||||
} catch (InterruptedException ex) {
|
||||
}
|
||||
|
||||
log.info("shutdown start");
|
||||
|
||||
// We were asked to stop. Reset running flag and disconnect all peer channels asynchronously.
|
||||
// Peers could still linger until their channels close.
|
||||
synchronized (PeerGroup.this) {
|
||||
running = false;
|
||||
shutdownPeerDiscovery();
|
||||
LinkedList<ChannelFuture> futures;
|
||||
synchronized (channelFutures) {
|
||||
// Copy the list here because the act of closing the channel modifies the channelFutures map.
|
||||
futures = new LinkedList<ChannelFuture>(channelFutures.values());
|
||||
}
|
||||
for (ChannelFuture future : futures) {
|
||||
future.getChannel().close();
|
||||
}
|
||||
bootstrap.releaseExternalResources();
|
||||
}
|
||||
|
||||
log.info("shutdown done");
|
||||
}
|
||||
|
||||
private void discoverPeers() {
|
||||
for (PeerDiscovery peerDiscovery : peerDiscoverers) {
|
||||
InetSocketAddress[] addresses;
|
||||
try {
|
||||
addresses = peerDiscovery.getPeers();
|
||||
} catch (PeerDiscoveryException e) {
|
||||
// Will try again later.
|
||||
log.error("Failed to discover peer addresses from discovery source", e);
|
||||
return;
|
||||
}
|
||||
|
||||
for (int i = 0; i < addresses.length; i++) {
|
||||
inactives.add(new PeerAddress(addresses[i]));
|
||||
}
|
||||
|
||||
if (inactives.size() > 0) break;
|
||||
}
|
||||
}
|
||||
|
||||
private void shutdownPeerDiscovery() {
|
||||
for (PeerDiscovery peerDiscovery : peerDiscoverers) {
|
||||
peerDiscovery.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Try connecting to a peer.
|
||||
*/
|
||||
private void tryNextPeer() throws InterruptedException {
|
||||
PeerAddress address = inactives.take();
|
||||
connectTo(address.toSocketAddress(), false);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Connect to a peer by creating a Netty channel to the destination address.
|
||||
*
|
||||
@ -556,7 +557,7 @@ public class PeerGroup {
|
||||
return connectTo(address, true);
|
||||
}
|
||||
|
||||
// Internal version.
|
||||
// Internal version. Do not call whilst holding the PeerGroup lock.
|
||||
protected ChannelFuture connectTo(SocketAddress address, boolean incrementMaxConnections) {
|
||||
ChannelFuture future = bootstrap.connect(address);
|
||||
TCPNetworkConnection.NetworkHandler networkHandler =
|
||||
@ -566,11 +567,13 @@ public class PeerGroup {
|
||||
networkHandler.getOwnerObject().setRemoteAddress(address);
|
||||
}
|
||||
synchronized (this) {
|
||||
// TODO: This probably forces a wait for the channel to connect, should re-organize all this code.
|
||||
Peer peer = peerFromChannelFuture(future);
|
||||
channelFutures.put(peer, future);
|
||||
if (incrementMaxConnections)
|
||||
setMaxConnections(getMaxConnections() + 1);
|
||||
if (incrementMaxConnections) {
|
||||
// We don't use setMaxConnections here as that would trigger a recursive attempt to establish a new
|
||||
// outbound connection.
|
||||
maxConnections++;
|
||||
}
|
||||
}
|
||||
return future;
|
||||
}
|
||||
@ -731,19 +734,28 @@ public class PeerGroup {
|
||||
return fastCatchupTimeSecs;
|
||||
}
|
||||
|
||||
protected synchronized void handlePeerDeath(final Peer peer) {
|
||||
protected void handlePeerDeath(final Peer peer) {
|
||||
// This can run on any Netty worker thread. Because connectToAnyPeer() must run unlocked to avoid circular
|
||||
// deadlock, this method must run largely unlocked too. Some members are thread-safe and others aren't, so
|
||||
// we synchronize only the parts that need it.
|
||||
if (!isRunning()) {
|
||||
log.info("Peer death while shutting down");
|
||||
return;
|
||||
}
|
||||
checkArgument(!peers.contains(peer));
|
||||
final Peer downloadPeer;
|
||||
final PeerEventListener downloadListener;
|
||||
synchronized (this) {
|
||||
downloadPeer = this.downloadPeer;
|
||||
downloadListener = this.downloadListener;
|
||||
}
|
||||
if (peer == downloadPeer) {
|
||||
log.info("Download peer died. Picking a new one.");
|
||||
setDownloadPeer(null);
|
||||
// Pick a new one and possibly tell it to download the chain.
|
||||
synchronized (peers) {
|
||||
if (!peers.isEmpty()) {
|
||||
Peer next = peers.peekFirst();
|
||||
Peer next = peers.get(0);
|
||||
setDownloadPeer(next);
|
||||
if (downloadListener != null) {
|
||||
startBlockChainDownloadFromPeer(next);
|
||||
@ -751,6 +763,15 @@ public class PeerGroup {
|
||||
}
|
||||
}
|
||||
}
|
||||
// Replace this peer with a new one to keep our connection count up, if necessary.
|
||||
// The calculation is a little racy.
|
||||
if (peers.size() + pendingPeers.size() < getMaxConnections()) {
|
||||
try {
|
||||
connectToAnyPeer();
|
||||
} catch (PeerDiscoveryException e) {
|
||||
log.error(e.getMessage());
|
||||
}
|
||||
}
|
||||
// TODO: Remove peerEventListeners from the Peer here.
|
||||
peer.removeEventListener(getDataListener);
|
||||
EventListenerInvoker.invoke(peerEventListeners, new EventListenerInvoker<PeerEventListener>() {
|
||||
@ -781,7 +802,7 @@ public class PeerGroup {
|
||||
* @param numPeers How many peers to wait for.
|
||||
* @return a future that will be triggered when the number of connected peers >= numPeers
|
||||
*/
|
||||
public synchronized ListenableFuture<PeerGroup> waitForPeers(final int numPeers) {
|
||||
public ListenableFuture<PeerGroup> waitForPeers(final int numPeers) {
|
||||
if (peers.size() >= numPeers) {
|
||||
return Futures.immediateFuture(this);
|
||||
}
|
||||
@ -859,7 +880,7 @@ public class PeerGroup {
|
||||
public void run() {
|
||||
// This can be called immediately if we already have enough peers. Otherwise it'll be called from a
|
||||
// peer thread.
|
||||
final Peer somePeer = peers.getFirst();
|
||||
final Peer somePeer = peers.get(0);
|
||||
log.info("broadcastTransaction: Enough peers, adding {} to the memory pool and sending to {}",
|
||||
tx.getHashAsString(), somePeer);
|
||||
final Transaction pinnedTx = memoryPool.seen(tx, somePeer.getAddress());
|
||||
@ -902,7 +923,8 @@ public class PeerGroup {
|
||||
// This will run in a peer thread.
|
||||
final int numSeenPeers = tx.getConfidence().getBroadcastBy().size();
|
||||
boolean done = false;
|
||||
log.info("broadcastTransaction: TX {} seen by {} peers", pinnedTx.getHashAsString(), numSeenPeers);
|
||||
log.info("broadcastTransaction: TX {} seen by {} peers", pinnedTx.getHashAsString(),
|
||||
numSeenPeers);
|
||||
synchronized (PeerGroup.this) {
|
||||
if (numSeenPeers >= minConnections) {
|
||||
// We've seen the min required number of peers announce the transaction. Note that we
|
||||
@ -940,7 +962,7 @@ public class PeerGroup {
|
||||
return future;
|
||||
}
|
||||
|
||||
static class PeerGroupThreadFactory implements ThreadFactory {
|
||||
private static class PeerGroupThreadFactory implements ThreadFactory {
|
||||
static final AtomicInteger poolNumber = new AtomicInteger(1);
|
||||
final ThreadGroup group;
|
||||
final AtomicInteger threadNumber = new AtomicInteger(1);
|
||||
@ -948,15 +970,11 @@ public class PeerGroup {
|
||||
|
||||
PeerGroupThreadFactory() {
|
||||
group = Thread.currentThread().getThreadGroup();
|
||||
namePrefix = "PeerGroup-" +
|
||||
poolNumber.getAndIncrement() +
|
||||
"-thread-";
|
||||
namePrefix = "PeerGroup-" + poolNumber.getAndIncrement() + "-thread-";
|
||||
}
|
||||
|
||||
public Thread newThread(Runnable r) {
|
||||
Thread t = new Thread(group, r,
|
||||
namePrefix + threadNumber.getAndIncrement(),
|
||||
0);
|
||||
Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
|
||||
// Lower the priority of the peer threads. This is to avoid competing with UI threads created by the API
|
||||
// user when doing lots of work, like downloading the block chain. We select a priority level one lower
|
||||
// than the parent thread, or the minimum.
|
||||
|
@ -189,6 +189,7 @@ public class TCPNetworkConnection implements NetworkConnection {
|
||||
public class NetworkHandler extends ReplayingDecoder<VoidEnum> implements ChannelDownstreamHandler {
|
||||
@Override
|
||||
public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
|
||||
super.channelConnected(ctx, e);
|
||||
channel = e.getChannel();
|
||||
// The version message does not use checksumming, until Feb 2012 when it magically does.
|
||||
// Announce ourselves. This has to come first to connect to clients beyond v0.30.20.2 which wait to hear
|
||||
|
@ -16,10 +16,8 @@
|
||||
|
||||
package com.google.bitcoin.core;
|
||||
|
||||
import com.google.bitcoin.core.PeerGroup.PeerGroupThread;
|
||||
import com.google.bitcoin.discovery.PeerDiscovery;
|
||||
import com.google.bitcoin.discovery.PeerDiscoveryException;
|
||||
import org.easymock.EasyMock;
|
||||
import org.jboss.netty.bootstrap.ClientBootstrap;
|
||||
import org.jboss.netty.channel.*;
|
||||
import org.junit.Before;
|
||||
@ -67,7 +65,7 @@ public class PeerGroupTest extends TestWithNetworkConnections {
|
||||
}
|
||||
|
||||
});
|
||||
peerGroup = new PeerGroup(params, blockChain, 1, bootstrap);
|
||||
peerGroup = new PeerGroup(params, blockChain, bootstrap);
|
||||
peerGroup.addWallet(wallet);
|
||||
}
|
||||
|
||||
@ -147,8 +145,7 @@ public class PeerGroupTest extends TestWithNetworkConnections {
|
||||
|
||||
private FakeChannel connectPeer(int id, VersionMessage versionMessage) {
|
||||
InetSocketAddress remoteAddress = new InetSocketAddress("127.0.0.1", 2000 + id);
|
||||
FakeChannel p =
|
||||
(FakeChannel) peerGroup.connectTo(remoteAddress).getChannel();
|
||||
FakeChannel p = (FakeChannel) peerGroup.connectTo(remoteAddress).getChannel();
|
||||
assertTrue(p.nextEvent() instanceof ChannelStateEvent);
|
||||
inbound(p, versionMessage);
|
||||
return p;
|
||||
@ -287,11 +284,6 @@ public class PeerGroupTest extends TestWithNetworkConnections {
|
||||
FakeChannel p1 = connectPeer(1, new VersionMessage(params, 2));
|
||||
FakeChannel p2 = connectPeer(2);
|
||||
|
||||
PeerGroupThread peerGroupThread = control.createMock(PeerGroupThread.class);
|
||||
peerGroup.mockStart(peerGroupThread);
|
||||
peerGroupThread.interrupt();
|
||||
EasyMock.expectLastCall();
|
||||
|
||||
control.replay();
|
||||
|
||||
peerGroup.setMinBroadcastConnections(2);
|
||||
|
@ -22,6 +22,8 @@ import com.google.bitcoin.utils.BriefLogFormatter;
|
||||
import com.google.common.util.concurrent.MoreExecutors;
|
||||
|
||||
import javax.swing.*;
|
||||
import javax.swing.event.ChangeEvent;
|
||||
import javax.swing.event.ChangeListener;
|
||||
import javax.swing.table.AbstractTableModel;
|
||||
import java.awt.*;
|
||||
import java.io.IOException;
|
||||
@ -46,12 +48,14 @@ public class PeerMonitor {
|
||||
public PeerMonitor() {
|
||||
setupNetwork();
|
||||
setupGUI();
|
||||
peerGroup.start();
|
||||
}
|
||||
|
||||
private void setupNetwork() {
|
||||
params = NetworkParameters.prodNet();
|
||||
peerGroup = new PeerGroup(params, null /* no chain */);
|
||||
peerGroup.setUserAgent("PeerMonitor", "1.0");
|
||||
peerGroup.setMaxConnections(4);
|
||||
peerGroup.addPeerDiscovery(new DnsDiscovery(params));
|
||||
pingService = new ScheduledThreadPoolExecutor(1);
|
||||
peerGroup.addEventListener(new AbstractPeerEventListener() {
|
||||
@ -71,7 +75,6 @@ public class PeerMonitor {
|
||||
refreshUI();
|
||||
}
|
||||
});
|
||||
peerGroup.start();
|
||||
}
|
||||
|
||||
private void pingPeer(final Peer peer) {
|
||||
@ -103,8 +106,20 @@ public class PeerMonitor {
|
||||
private void setupGUI() {
|
||||
JFrame window = new JFrame("Network monitor");
|
||||
window.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE);
|
||||
JLabel instructions = new JLabel("Peers shown below. Download peer is in bold.");
|
||||
window.getContentPane().add(instructions, BorderLayout.NORTH);
|
||||
|
||||
JPanel panel = new JPanel();
|
||||
JLabel instructions = new JLabel("Number of peers to connect to: ");
|
||||
final SpinnerNumberModel spinnerModel = new SpinnerNumberModel(4, 0, 100, 1);
|
||||
spinnerModel.addChangeListener(new ChangeListener() {
|
||||
public void stateChanged(ChangeEvent changeEvent) {
|
||||
peerGroup.setMaxConnections(spinnerModel.getNumber().intValue());
|
||||
}
|
||||
});
|
||||
JSpinner numPeersSpinner = new JSpinner(spinnerModel);
|
||||
panel.add(instructions);
|
||||
panel.add(numPeersSpinner);
|
||||
window.getContentPane().add(panel, BorderLayout.NORTH);
|
||||
|
||||
peerTableModel = new PeerTableModel();
|
||||
JTable peerTable = new JTable(peerTableModel);
|
||||
JScrollPane scrollPane = new JScrollPane(peerTable);
|
||||
|
Loading…
Reference in New Issue
Block a user