Netty network stack.

This should resolve some scalability and robustness issues.
This commit is contained in:
Miron Cuperman
2012-04-01 18:37:21 -07:00
committed by Mike Hearn
parent 2d8f25a2da
commit fa10523007
14 changed files with 915 additions and 693 deletions

View File

@@ -162,6 +162,12 @@
<artifactId>derby</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
<version>3.3.1.Final</version>
</dependency>
<dependency>
<groupId>com.madgag</groupId>
<artifactId>sc-light-jdk15on</artifactId>
@@ -178,4 +184,4 @@
</dependency>
</dependencies>
</project>
</project>

View File

@@ -31,15 +31,6 @@ import java.io.IOException;
* Construction is blocking whilst the protocol version is negotiated.
*/
public interface NetworkConnection {
/**
* Connect to the remote peer.
*
* @param peerAddress the address of the remote peer
* @param connectTimeoutMsec timeout in milliseconds
*/
public void connect(PeerAddress peerAddress, int connectTimeoutMsec)
throws IOException, ProtocolException;
/**
* Sends a "ping" message to the remote node. The protocol doesn't presently use this feature much.
*
@@ -47,20 +38,6 @@ public interface NetworkConnection {
*/
void ping() throws IOException;
/**
* Shuts down the network socket. Note that there's no way to wait for a socket to be fully flushed out to the
* wire, so if you call this immediately after sending a message it might not get sent.
*/
void shutdown() throws IOException;
/**
* Reads a network message from the wire, blocking until the message is fully received.
*
* @return An instance of a Message subclass
* @throws ProtocolException if the message is badly formatted, failed checksum or there was a TCP failure.
*/
Message readMessage() throws IOException, ProtocolException;
/**
* Writes the given message out over the network using the protocol tag. For a Transaction
* this should be "tx" for example. It's safe to call this from multiple threads simultaneously,

View File

@@ -16,32 +16,33 @@
package com.google.bitcoin.core;
import java.io.IOException;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.*;
import java.util.concurrent.*;
import org.jboss.netty.channel.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.bitcoin.store.BlockStore;
import com.google.bitcoin.store.BlockStoreException;
import com.google.bitcoin.utils.EventListenerInvoker;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.*;
/**
* A Peer handles the high level communication with a BitCoin node.
*
* <p>After making the connection with connect(), call run() to start the message handling loop.
* <p>{@link Peer#getHandler()} is part of a Netty Pipeline with a Bitcoin serializer downstream of it.
*/
public class Peer {
private static final Logger log = LoggerFactory.getLogger(Peer.class);
public static final int CONNECT_TIMEOUT_MSEC = 60000;
private NetworkConnection conn;
private final NetworkParameters params;
// Whether the peer loop is supposed to be running or not. Set to false during shutdown so the peer loop
// knows to quit when the socket goes away.
private volatile boolean running;
private final BlockChain blockChain;
// When an API user explicitly requests a block or transaction from a peer, the InventoryItem is put here
// whilst waiting for the response. Synchronized on itself. Is not used for downloads Peer generates itself.
@@ -73,49 +74,25 @@ public class Peer {
// simultaneously if we were to receive a newly solved block whilst parts of the chain are streaming to us.
private HashSet<Sha256Hash> pendingBlockDownloads = new HashSet<Sha256Hash>();
/**
* Construct a peer that reads/writes from the given block chain. Note that communication won't occur until
* you call connect(), which will set up a new NetworkConnection.
*
* @param bestHeight our current best chain height, to facilitate downloading
*/
public Peer(NetworkParameters params, PeerAddress address, int bestHeight, BlockChain blockChain) {
this(params, address, blockChain, new VersionMessage(params, bestHeight));
}
private Channel channel;
private VersionMessage peerVersionMessage;
boolean isAcked;
private PeerHandler handler;
/**
* Construct a peer that reads/writes from the given block chain. Note that communication won't occur until
* you call connect(), which will set up a new NetworkConnection.
*
* @param ver The version data to announce to the other side.
* Construct a peer that reads/writes from the given block chain.
*/
public Peer(NetworkParameters params, PeerAddress address, BlockChain blockChain, VersionMessage ver) {
public Peer(NetworkParameters params, BlockChain blockChain, VersionMessage ver) {
this.params = params;
this.address = address;
this.blockChain = blockChain;
this.pendingGetBlockFutures = new ArrayList<GetDataFuture<Block>>();
this.eventListeners = new ArrayList<PeerEventListener>();
this.fastCatchupTimeSecs = params.genesisBlock.getTimeSeconds();
this.versionMessage = ver;
this.pendingGetBlockFutures = new ArrayList<GetDataFuture<Block>>();
this.eventListeners = new CopyOnWriteArrayList<PeerEventListener>();
this.fastCatchupTimeSecs = params.genesisBlock.getTimeSeconds();
this.isAcked = false;
this.handler = new PeerHandler();
}
/**
* Construct a peer that reads/writes from the given block chain. Note that communication won't occur until
* you call connect(), which will set up a new NetworkConnection.
*/
public Peer(NetworkParameters params, PeerAddress address, BlockChain blockChain) {
this(params, address, 0, blockChain);
}
/**
* Construct a peer that uses the given, already connected network connection object.
*/
public Peer(NetworkParameters params, BlockChain blockChain, NetworkConnection connection) {
this(params, null, 0, blockChain);
this.conn = connection;
this.address = connection.getPeerAddress();
}
public synchronized void addEventListener(PeerEventListener listener) {
eventListeners.add(listener);
}
@@ -140,100 +117,108 @@ public class Peer {
public String toString() {
if (address == null) {
// User-provided NetworkConnection object.
return "Peer(NetworkConnection:" + conn + ")";
return "Peer()";
} else {
return "Peer(" + address.getAddr() + ":" + address.getPort() + ")";
}
}
/**
* Connects to the peer.
*
* @throws PeerException when there is a temporary problem with the peer and we should retry later
*/
public synchronized void connect() throws PeerException {
try {
conn = new TCPNetworkConnection(params, versionMessage);
conn.connect(address, CONNECT_TIMEOUT_MSEC);
} catch (IOException ex) {
throw new PeerException(ex);
} catch (ProtocolException ex) {
throw new PeerException(ex);
private void notifyDisconnect() {
for (PeerEventListener listener : eventListeners) {
synchronized (listener) {
listener.onPeerDisconnected(Peer.this, 0);
}
}
}
// For testing
void setConnection(NetworkConnection conn) {
this.conn = conn;
}
class PeerHandler extends SimpleChannelHandler {
@Override
public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e)
throws Exception {
super.channelClosed(ctx, e);
notifyDisconnect();
}
/**
* Runs in the peers network loop and manages communication with the peer.
*
* <p>connect() must be called first
*
* @throws PeerException when there is a temporary problem with the peer and we should retry later
*/
public void run() throws PeerException {
// This should be called in the network loop thread for this peer
if (conn == null)
throw new RuntimeException("please call connect() first");
@Override
public void connectRequested(ChannelHandlerContext ctx, ChannelStateEvent e)
throws Exception {
super.connectRequested(ctx, e);
channel = e.getChannel();
address = new PeerAddress((InetSocketAddress)e.getValue());
}
running = true;
try {
while (running) {
Message m = conn.readMessage();
// Allow event listeners to filter the message stream. Listeners are allowed to drop messages by
// returning null.
for (PeerEventListener listener : eventListeners) {
synchronized (listener) {
m = listener.onPreMessageReceived(this, m);
if (m == null) break;
}
}
if (m == null) continue;
if (m instanceof InventoryMessage) {
processInv((InventoryMessage) m);
} else if (m instanceof Block) {
processBlock((Block) m);
} else if (m instanceof Transaction) {
processTransaction((Transaction) m);
} else if (m instanceof GetDataMessage) {
processGetData((GetDataMessage) m);
} else if (m instanceof AddressMessage) {
// We don't care about addresses of the network right now. But in future,
// we should save them in the wallet so we don't put too much load on the seed nodes and can
// properly explore the network.
} else if (m instanceof HeadersMessage) {
processHeaders((HeadersMessage) m);
} else if (m instanceof AlertMessage) {
processAlert((AlertMessage)m);
} else {
// TODO: Handle the other messages we can receive.
log.warn("Received unhandled message: {}", m);
}
}
} catch (IOException e) {
if (!running) {
// This exception was expected because we are tearing down the socket as part of quitting.
log.info("{}: Shutting down peer loop", address);
/** Catch any exceptions, logging them and then closing the channel. */
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
throws Exception {
if (e.getCause() instanceof ConnectException || e.getCause() instanceof IOException) {
// Short message for network errors
log.info(toString() + " - " + e.getCause().getMessage());
} else {
disconnect();
throw new PeerException(e);
log.warn(toString() + " - ", e.getCause());
}
} catch (ProtocolException e) {
disconnect();
throw new PeerException(e);
} catch (RuntimeException e) {
log.error("Unexpected exception in peer loop", e);
disconnect();
throw e;
e.getChannel().close();
}
disconnect();
/** Handle incoming Bitcoin messages */
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
throws Exception {
Message m = (Message)e.getMessage();
// Allow event listeners to filter the message stream. Listeners are allowed to drop messages by 178 synchronized (listener) {
// returning null.
for (PeerEventListener listener : eventListeners) {
synchronized (listener) {
m = listener.onPreMessageReceived(Peer.this, m);
if (m == null) break;
}
}
if (m == null) return;
if (m instanceof InventoryMessage) {
processInv((InventoryMessage) m);
} else if (m instanceof Block) {
processBlock((Block) m);
} else if (m instanceof Transaction) {
processTransaction((Transaction) m);
} else if (m instanceof GetDataMessage) {
processGetData((GetDataMessage) m);
} else if (m instanceof AddressMessage) {
// We don't care about addresses of the network right now. But in future,
// we should save them in the wallet so we don't put too much load on the seed nodes and can
// properly explore the network.
} else if (m instanceof HeadersMessage) {
processHeaders((HeadersMessage) m);
} else if (m instanceof AlertMessage) {
processAlert((AlertMessage)m);
} else if (m instanceof VersionMessage) {
peerVersionMessage = (VersionMessage)m;
EventListenerInvoker.invoke(eventListeners, new EventListenerInvoker<PeerEventListener>() {
@Override
public void invoke(PeerEventListener listener) {
listener.onPeerConnected(Peer.this, 1);
}
});
} else if (m instanceof VersionAck) {
if (peerVersionMessage == null) {
throw new ProtocolException("got a version ack before version");
}
if (isAcked) {
throw new ProtocolException("got more than one version ack");
}
isAcked = true;
} else {
// TODO: Handle the other messages we can receive.
log.warn("Received unhandled message: {}", m);
}
}
public Peer getPeer() {
return Peer.this;
}
}
private void processAlert(AlertMessage m) {
@@ -250,6 +235,11 @@ public class Peer {
log.error("Failed to check signature: bug in platform libraries?", t);
}
}
/** Returns the Netty Pipeline stage handling the high level Bitcoin protocol. */
public PeerHandler getHandler() {
return handler;
}
private void processHeaders(HeadersMessage m) throws IOException, ProtocolException {
// Runs in network loop thread for this peer.
@@ -470,7 +460,7 @@ public class Peer {
if (!getdata.getItems().isEmpty()) {
// This will cause us to receive a bunch of block or tx messages.
conn.writeMessage(getdata);
sendMessage(getdata);
}
}
@@ -494,7 +484,8 @@ public class Peer {
synchronized (pendingGetBlockFutures) {
pendingGetBlockFutures.add(future);
}
conn.writeMessage(getdata);
sendMessage(getdata);
return future;
}
@@ -588,10 +579,10 @@ public class Peer {
}
/**
* Sends the given message on the peers network connection. Just uses {@link NetworkConnection#writeMessage(Message)}.
* Sends the given message on the peers Channel.
*/
public void sendMessage(Message m) throws IOException {
conn.writeMessage(m);
Channels.write(channel, m);
}
// Keep track of the last request we made to the peer in blockChainDownload so we can avoid redundant and harmful
@@ -676,11 +667,11 @@ public class Peer {
if (downloadBlockBodies) {
GetBlocksMessage message = new GetBlocksMessage(params, blockLocator, toHash);
conn.writeMessage(message);
sendMessage(message);
} else {
// Downloading headers for a while instead of full blocks.
GetHeadersMessage message = new GetHeadersMessage(params, blockLocator, toHash);
conn.writeMessage(message);
sendMessage(message);
}
}
@@ -710,7 +701,7 @@ public class Peer {
*/
public int getPeerBlockHeightDifference() {
// Chain will overflow signed int blocks in ~41,000 years.
int chainHeight = (int) conn.getVersionMessage().bestHeight;
int chainHeight = (int) peerVersionMessage.bestHeight;
// chainHeight should not be zero/negative because we shouldn't have given the user a Peer that is to another
// client-mode node, nor should it be unconnected. If that happens it means the user overrode us somewhere or
// there is a bug in the peer management code.
@@ -718,23 +709,6 @@ public class Peer {
return chainHeight - blockChain.getChainHead().getHeight();
}
/**
* Terminates the network connection and stops the message handling loop.
*
* <p>This does not wait for the loop to terminate.
*/
public synchronized void disconnect() {
log.debug("Disconnecting peer");
running = false;
try {
// This is the correct way to stop an IO bound loop
if (conn != null)
conn.shutdown();
} catch (IOException e) {
// Don't care about this.
}
}
/**
* Returns true if this peer will try and download things it is sent in "inv" messages. Normally you only need
* one peer to be downloading data. Defaults to true.
@@ -757,25 +731,25 @@ public class Peer {
public PeerAddress getAddress() {
return address;
}
/**
* @return various version numbers claimed by peer.
*/
public VersionMessage getPeerVersionMessage() {
return peerVersionMessage;
}
/**
* @return various version numbers we claim.
*/
public VersionMessage getVersionMessage() {
return conn.getVersionMessage();
return versionMessage;
}
/**
* @return the height of the best chain as claimed by peer.
*/
public long getBestHeight() {
return conn.getVersionMessage().bestHeight;
}
/**
* @return whether the peer is currently connected and the message loop is running.
*/
public boolean isConnected() {
return running;
return peerVersionMessage.bestHeight;
}
}

View File

@@ -249,4 +249,8 @@ public class PeerAddress extends ChildMessage {
public int hashCode() {
return addr.hashCode() ^ port ^ (int) time ^ services.hashCode();
}
public InetSocketAddress toSocketAddress() {
return new InetSocketAddress(addr, port);
}
}

View File

@@ -17,17 +17,21 @@
package com.google.bitcoin.core;
import com.google.bitcoin.core.Peer.PeerHandler;
import com.google.bitcoin.discovery.PeerDiscovery;
import com.google.bitcoin.discovery.PeerDiscoveryException;
import com.google.bitcoin.utils.EventListenerInvoker;
import com.google.common.base.Preconditions;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.*;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
import java.net.SocketAddress;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
@@ -56,7 +60,6 @@ public class PeerGroup {
private static final Logger log = LoggerFactory.getLogger(PeerGroup.class);
public static final int DEFAULT_CONNECTION_DELAY_MILLIS = 5 * 1000;
private static final int THREAD_KEEP_ALIVE_SECONDS = 1;
// Addresses to try to connect to, excluding active peers
private BlockingQueue<PeerAddress> inactives;
@@ -64,12 +67,11 @@ public class PeerGroup {
private PeerGroupThread peerGroupThread;
// True if the connection initiation thread should be running
private boolean running;
// A pool of threads for peers, of size maxConnection
private ThreadPoolExecutor peerPool;
// Currently active peers
private Set<Peer> peers;
// Currently connecting peers
private Set<Peer> pendingPeers;
private Map<Peer, ChannelFuture> channelFutures;
// The peer we are currently downloading the chain from
private Peer downloadPeer;
// Callback for events related to chain download
@@ -83,14 +85,35 @@ public class PeerGroup {
// A class that tracks recent transactions that have been broadcast across the network, counts how many
// peers announced them and updates the transaction confidence data. It is passed to each Peer.
private final MemoryPool memoryPool;
private int maxConnections;
private NetworkParameters params;
private BlockChain chain;
private final NetworkParameters params;
private final BlockChain chain;
private int connectionDelayMillis;
private long fastCatchupTimeSecs;
private ArrayList<Wallet> wallets;
private AbstractPeerEventListener getDataListener;
private ClientBootstrap bootstrap;
private class PeerStartupListener extends AbstractPeerEventListener {
public void onPeerConnected(Peer peer, int peerCount) {
pendingPeers.remove(peer);
peers.add(peer);
handleNewPeer(peer);
}
public void onPeerDisconnected(Peer peer, int peerCount) {
pendingPeers.remove(peer);
peers.remove(peer);
channelFutures.remove(peer);
handlePeerDeath(peer);
}
}
// Visible for testing
PeerEventListener startupListener = new PeerStartupListener();
/**
* Creates a PeerGroup with the given parameters and a default 5 second connection timeout. If you don't care
* about blocks or pending transactions, you can just provide a MemoryBlockStore and a newly created Wallet.
@@ -103,33 +126,44 @@ public class PeerGroup {
}
/**
* Creates a PeerGroup with the given parameters. The connectionDelayMillis parameter controls how long the
* PeerGroup will wait between attempts to connect to nodes or read from any added peer discovery sources.
* Creates a PeerGroup with the given parameters. The connectionDelayMillis parameter controls
*
* @param params bitcoin network parameters
* @param chain the block chain maintained by the network
* @param connectionDelayMillis how long to wait between attempts to connect to nodes or read
* from any added peer discovery sources
*/
public PeerGroup(NetworkParameters params, BlockChain chain, int connectionDelayMillis) {
public PeerGroup(final NetworkParameters params, final BlockChain chain,
int connectionDelayMillis) {
this(params, chain, connectionDelayMillis, new ClientBootstrap(
new NioClientSocketChannelFactory(
Executors.newCachedThreadPool(),
Executors.newCachedThreadPool())));
bootstrap.setPipelineFactory(makePipelineFactory(params, chain));
}
PeerGroup(final NetworkParameters params, final BlockChain chain,
int connectionDelayMillis, ClientBootstrap bootstrap) {
this.params = params;
this.chain = chain;
this.connectionDelayMillis = connectionDelayMillis;
this.fastCatchupTimeSecs = params.genesisBlock.getTimeSeconds();
this.wallets = new ArrayList<Wallet>(1);
this.maxConnections = DEFAULT_CONNECTIONS;
// Set up a default template version message that doesn't tell the other side what kind of BitCoinJ user
// this is.
this.versionMessage = new VersionMessage(params, chain.getBestChainHeight());
memoryPool = new MemoryPool();
this.bootstrap = bootstrap;
inactives = new LinkedBlockingQueue<PeerAddress>();
// TODO: Remove usage of synchronized sets here in favor of simple coarse-grained locking.
peers = Collections.synchronizedSet(new HashSet<Peer>());
pendingPeers = Collections.synchronizedSet(new HashSet<Peer>());
channelFutures = Collections.synchronizedMap(new HashMap<Peer, ChannelFuture>());
peerDiscoverers = new CopyOnWriteArraySet<PeerDiscovery>();
peerPool = new ThreadPoolExecutor(
DEFAULT_CONNECTIONS,
DEFAULT_CONNECTIONS,
THREAD_KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(1),
new PeerGroupThreadFactory());
peerEventListeners = new ArrayList<PeerEventListener>();
// This event listener is added to every peer. It's here so when we announce transactions via an "inv", every
// peer can fetch them.
@@ -141,6 +175,43 @@ public class PeerGroup {
};
}
// Create a Netty pipeline factory. The pipeline factory will create a network processing
// pipeline with the bitcoin serializer ({@code TCPNetworkConnection}) downstream
// of the higher level {@code Peer}. Received packets will first be decoded, then passed
// {@code Peer}. Sent packets will be created by the {@code Peer}, then encoded and sent.
private ChannelPipelineFactory makePipelineFactory(
final NetworkParameters params, final BlockChain chain) {
return new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() throws Exception {
VersionMessage ver = getVersionMessage().duplicate();
ver.bestHeight = chain.getBestChainHeight();
ver.time = Utils.now().getTime() / 1000;
ChannelPipeline p = Channels.pipeline();
Peer peer = new Peer(params, chain, ver);
peer.addEventListener(startupListener);
pendingPeers.add(peer);
TCPNetworkConnection codec =
new TCPNetworkConnection(params,
peer.getVersionMessage());
p.addLast("codec", codec.getHandler());
p.addLast("peer", peer.getHandler());
return p;
}
};
}
/** The maximum number of connections that we will create to peers. */
public void setMaxConnections(int maxConnections) {
this.maxConnections = maxConnections;
}
/** The maximum number of connections that we will create to peers. */
public int getMaxConnections() {
return maxConnections;
}
private synchronized List<Message> handleGetData(GetDataMessage m) {
// Scans the wallets for transactions in the getdata message and returns them. Invoked in parallel
// on peer threads.
@@ -227,40 +298,6 @@ public class PeerGroup {
public synchronized boolean removeEventListener(PeerEventListener listener) {
return peerEventListeners.remove(checkNotNull(listener));
}
/**
* Use this to directly add an already initialized and connected {@link Peer} object. Normally, you would prefer
* to use {@link PeerGroup#addAddress(PeerAddress)} and let this object handle construction of the peer for you.
* This method is useful when you are working closely with the network code (and in unit tests).<p>
*
* Note that if this peer group already has the maximum number of peers running (see {@link PeerGroup#DEFAULT_CONNECTIONS})
* then this method will block until other peers are disconnected.<p>
*
* Calling this will result in calls to any registered {@link PeerEventListener}s. Block chain download may occur.
*/
public void addPeer(Peer peer) {
synchronized (this) {
Preconditions.checkState(running, "Must call start() before adding peers.");
log.info("Adding directly to group: {}", peer);
}
// This starts the peer thread running. Note: this is not synchronized. If it were, we could not
// use WAIT_FOR_STARTUP mode below because the newly created thread will call handleNewPeer() which is locked.
executePeer(null, peer, false, ExecuteBlockMode.WAIT_FOR_STARTUP);
}
/**
* Depending on the environment, this should normally be between 1 and 10, default is 4.
*
* @param maxConnections the maximum number of peer connections that this group will try to make.
*/
public synchronized void setMaxConnections(int maxConnections) {
peerPool.setCorePoolSize(Math.min(maxConnections, DEFAULT_CONNECTIONS));
peerPool.setMaximumPoolSize(maxConnections);
}
public synchronized int getMaxConnections() {
return peerPool.getMaximumPoolSize();
}
/**
* Returns a newly allocated list containing the currently connected peers. If all you care about is the count,
@@ -295,6 +332,11 @@ public class PeerGroup {
running = true;
this.peerGroupThread.start();
}
void mockStart(PeerGroupThread peerGroupThread) {
this.peerGroupThread = peerGroupThread;
running = true;
}
/**
* Stop this PeerGroup.
@@ -405,7 +447,7 @@ public class PeerGroup {
* the right level, runs peer discovery if we run out, and broadcasts transactions that were submitted via
* broadcastTransaction().
*/
private final class PeerGroupThread extends Thread {
class PeerGroupThread extends Thread {
private LinkedBlockingQueue<FutureTask> tasks;
public PeerGroupThread() {
@@ -451,25 +493,20 @@ public class PeerGroup {
} catch (InterruptedException ex) {
}
// We were asked to stop. Reset running flag and disconnect all peers. Peers could
// still linger until their event loop is scheduled.
log.info("shutdown start");
// We were asked to stop. Reset running flag and disconnect all peer channels asynchronously.
// Peers could still linger until their channels close.
synchronized (PeerGroup.this) {
running = false;
peerPool.shutdown();
shutdownPeerDiscovery();
synchronized (peers) {
for (Peer peer : peers) {
peer.disconnect();
}
for (ChannelFuture future : channelFutures.values()) {
future.getChannel().close();
}
peers = null; // Fail quickly if someone tries to access peers while we are shutting down.
synchronized (pendingPeers) {
for (Peer peer : pendingPeers) {
peer.disconnect();
}
}
pendingPeers = null;
bootstrap.releaseExternalResources();
}
log.info("shutdown done");
}
private void discoverPeers() {
@@ -498,37 +535,11 @@ public class PeerGroup {
}
/**
* Try connecting to a peer. If we exceed the number of connections, delay and try again.
* Try connecting to a peer.
*/
private void tryNextPeer() throws InterruptedException {
PeerAddress address = inactives.take();
while (true) {
try {
VersionMessage ver = getVersionMessage().duplicate();
ver.bestHeight = chain.getBestChainHeight();
ver.time = Utils.now().getTime() / 1000;
Peer peer = new Peer(params, address, chain, ver);
executePeer(address, peer, true, ExecuteBlockMode.RETURN_IMMEDIATELY);
break;
} catch (RejectedExecutionException e) {
// Reached maxConnections, try again after a delay
// TODO - consider being smarter about retry. No need to retry
// if we reached maxConnections or if peer queue is empty. Also consider
// exponential backoff on peers and adjusting the sleep time according to the
// lowest backoff value in queue.
}
synchronized (PeerGroup.this) {
// Check if we are shutting down before next try
if (!running)
break;
}
// If we got here, we should retry this address because an error unrelated
// to the peer has occurred.
Thread.sleep(connectionDelayMillis);
}
connectTo(address.toSocketAddress());
}
/**
@@ -540,89 +551,32 @@ public class PeerGroup {
}
}
private enum ExecuteBlockMode {
WAIT_FOR_STARTUP, RETURN_IMMEDIATELY
/**
* Connect to a peer by creating a Netty channel to the destination address.
*
* @param address destination IP and port
*
* @return a ChannelFuture that can be used to wait for the socket to connect. A socket
* connection does not mean that protocol handshake has occured.
*/
public ChannelFuture connectTo(SocketAddress address) {
ChannelFuture future = bootstrap.connect(address);
TCPNetworkConnection connection = (TCPNetworkConnection)future.getChannel().getPipeline().get("codec");
if (connection != null)
connection.setRemoteAddress(address);
Peer peer = peerFromChannelFuture(future);
channelFutures.put(peer, future);
return future;
}
private void executePeer(final PeerAddress address, final Peer peer, final boolean shouldConnect,
final ExecuteBlockMode blockUntilRunning) {
final CountDownLatch latch = new CountDownLatch(1);
peerPool.execute(new Runnable() {
public void run() {
try {
synchronized (PeerGroup.this) {
// Add peer to pendingPeers so that we can shut it down if PeerGroup shuts down
pendingPeers.add(peer);
}
// Recheck if running, in case we shut down in the mean time
if (running && shouldConnect) {
log.info("Connecting to " + peer);
peer.connect();
}
synchronized (PeerGroup.this) {
pendingPeers.remove(peer);
// We may have started shutting down the group since we started connecting.
// In this case, we must not add ourself to the list of peers because the controller
// thread already went through it.
if (!running) {
peer.disconnect();
return;
}
peers.add(peer);
}
handleNewPeer(peer);
if (blockUntilRunning == ExecuteBlockMode.WAIT_FOR_STARTUP)
latch.countDown();
peer.run();
} catch (PeerException ex) {
// Do not propagate PeerException - log and try next peer. Suppress stack traces for
// exceptions we expect as part of normal network behaviour.
final Throwable cause = ex.getCause();
if (cause instanceof SocketTimeoutException) {
log.info("Timeout talking to " + peer + ": " + cause.getMessage());
} else if (cause instanceof ConnectException) {
log.info("Could not connect to " + peer + ": " + cause.getMessage());
} else if (cause instanceof IOException) {
log.info("Error talking to " + peer + ": " + cause.getMessage());
} else {
log.error("Unexpected exception whilst talking to " + peer, ex);
}
} finally {
boolean needHandleDeath;
synchronized (PeerGroup.this) {
// We may be terminating because of a controlled shutdown. If so, don't inform the user of individual
// peer connections or select a new download peer. Disconnection is the responsibility of the controlling
// thread in this case.
if (!running)
return;
// Disconnect and put the address back on the queue. We will retry this peer after all
// other peers have been tried.
peer.disconnect();
needHandleDeath = peers.remove(peer);
}
// This is unsynchronized since it can take a while.
if (needHandleDeath)
handlePeerDeath(peer);
// We may not know the address if the peer was added directly.
if (address != null)
inactives.add(address);
}
}
});
if (blockUntilRunning == ExecuteBlockMode.WAIT_FOR_STARTUP) {
try {
latch.await();
} catch (InterruptedException e) {
}
}
static public Peer peerFromChannelFuture(ChannelFuture future) {
return peerFromChannel(future.getChannel());
}
static public Peer peerFromChannel(Channel channel) {
return ((PeerHandler)channel.getPipeline().get("peer")).getPeer();
}
/**
* Start downloading the blockchain from the first available peer.
* <p/>

View File

@@ -16,31 +16,36 @@
package com.google.bitcoin.core;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.jboss.netty.channel.Channels.write;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.util.Date;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBufferInputStream;
import org.jboss.netty.buffer.ChannelBufferOutputStream;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.*;
import org.jboss.netty.handler.codec.replay.ReplayingDecoder;
import org.jboss.netty.handler.codec.replay.VoidEnum;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A {@code TCPNetworkConnection} is used for connecting to a Bitcoin node over the standard TCP/IP protocol.<p>
*
* Multiple {@code TCPNetworkConnection}s can wait if another NetworkConnection instance is deserializing a
* <p>{@link TCPNetworkConnection#getHandler()} is part of a Netty Pipeline, downstream of other pipeline stages.
* <p>Multiple {@code TCPNetworkConnection}s can wait if another NetworkConnection instance is deserializing a
* message and discard duplicates before reading them. This is intended to avoid memory usage spikes in constrained
* environments like Android where deserializing a large message (like a block) on multiple threads simultaneously is
* both wasteful and can cause OOM failures. This feature is controlled at construction time.
*/
public class TCPNetworkConnection implements NetworkConnection {
public class TCPNetworkConnection {
private static final Logger log = LoggerFactory.getLogger(TCPNetworkConnection.class);
private final Socket socket;
private OutputStream out;
private InputStream in;
// The IP address to which we are connecting.
private InetAddress remoteIp;
private final NetworkParameters params;
@@ -50,71 +55,42 @@ public class TCPNetworkConnection implements NetworkConnection {
private VersionMessage myVersionMessage;
private static final Date checksummingProtocolChangeDate = new Date(1329696000000L);
private long messageCount;
private Channel channel;
private NetworkHandler handler;
/**
* Construct a network connection with the given params and version. To actually connect to a remote node, call
* {@link TCPNetworkConnection#connect(PeerAddress, int)}.
* Construct a network connection with the given params and version.
*
* @param params Defines which network to connect to and details of the protocol.
* @param ver The VersionMessage to announce to the other side of the connection.
* @throws IOException if there is a network related failure.
* @throws ProtocolException if the version negotiation failed.
*/
public TCPNetworkConnection(NetworkParameters params, VersionMessage ver)
throws IOException, ProtocolException {
public TCPNetworkConnection(NetworkParameters params, VersionMessage ver) {
this.params = params;
this.myVersionMessage = ver;
socket = new Socket();
// So pre-Feb 2012, update checkumming property after version is read.
this.serializer = new BitcoinSerializer(this.params, false);
this.serializer.setUseChecksumming(Utils.now().after(checksummingProtocolChangeDate));
this.handler = new NetworkHandler();
}
/**
* Connect to the given IP address using the port specified as part of the network parameters. Once construction
* is complete a functioning network channel is set up and running.
*
* @param params Defines which network to connect to and details of the protocol.
* @param bestHeight The height of the best chain we know about, sent to the other side.
* @throws IOException if there is a network related failure.
* @throws ProtocolException if the version negotiation failed.
*/
public TCPNetworkConnection(NetworkParameters params, int bestHeight)
throws IOException, ProtocolException {
this(params, new VersionMessage(params, bestHeight));
}
public void connect(PeerAddress peerAddress, int connectTimeoutMsec) throws IOException, ProtocolException {
remoteIp = peerAddress.getAddr();
int port = (peerAddress.getPort() > 0) ? peerAddress.getPort() : this.params.port;
InetSocketAddress address = new InetSocketAddress(remoteIp, port);
socket.connect(address, connectTimeoutMsec);
out = socket.getOutputStream();
in = socket.getInputStream();
// The version message does not use checksumming, until Feb 2012 when it magically does.
// Announce ourselves. This has to come first to connect to clients beyond v0.30.20.2 which wait to hear
// from us until they send their version message back.
log.info("Announcing ourselves as: {}", myVersionMessage.subVer);
writeMessage(myVersionMessage);
// When connecting, the remote peer sends us a version message with various bits of
// useful data in it. We need to know the peer protocol version before we can talk to it.
// There is a bug in Satoshis code such that it can sometimes send us alert messages before version negotiation
// has completed. There's no harm in ignoring them (they're meant for Bitcoin-Qt users anyway) so we just cycle
// here until we find the right message.
Message m;
while (!((m = readMessage()) instanceof VersionMessage));
private void onFirstMessage(Message m) throws IOException, ProtocolException {
if (!(m instanceof VersionMessage)) {
// Bad peers might not follow the protocol. This has been seen in the wild (issue 81).
log.info("First message received was not a version message but rather " + m);
return;
}
versionMessage = (VersionMessage) m;
// Now it's our turn ...
// Send an ACK message stating we accept the peers protocol version.
writeMessage(new VersionAck());
// And get one back ...
readMessage();
write(channel, new VersionAck());
}
private void onSecondMessage(Message m) throws IOException, ProtocolException {
// Switch to the new protocol version.
int peerVersion = versionMessage.clientVersion;
log.info("Connected to peer: version={}, subVer='{}', services=0x{}, time={}, blocks={}", new Object[] {
@@ -129,12 +105,7 @@ public class TCPNetworkConnection implements NetworkConnection {
// implementations claim to have a block chain in their services field but then report a height of zero, filter
// them out here.
if (!versionMessage.hasBlockChain() || versionMessage.bestHeight <= 0) {
// Shut down the socket
try {
shutdown();
} catch (IOException ex) {
// ignore exceptions while aborting
}
// Shut down the channel
throw new ProtocolException("Peer does not have a copy of the block chain.");
}
// Newer clients use checksumming.
@@ -143,33 +114,70 @@ public class TCPNetworkConnection implements NetworkConnection {
}
public void ping() throws IOException {
writeMessage(new Ping());
}
public void shutdown() throws IOException {
socket.close();
write(channel, new Ping());
}
@Override
public String toString() {
return "[" + remoteIp.getHostAddress() + "]:" + params.port + " (" + (socket.isConnected() ? "connected" :
"disconnected") + ")";
return "[" + remoteIp.getHostAddress() + "]:" + params.port;
}
public Message readMessage() throws IOException, ProtocolException {
Message message;
do {
message = serializer.deserialize(in);
// If message is null, it means deduping was enabled, we read a duplicated message and skipped parsing to
// avoid doing redundant work. So go around and wait for another message.
} while (message == null);
return message;
}
public void writeMessage(Message message) throws IOException {
synchronized (out) {
serializer.serialize(message, out);
public class NetworkHandler extends ReplayingDecoder<VoidEnum> implements ChannelDownstreamHandler {
@Override
public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e)
throws Exception {
channel = e.getChannel();
// The version message does not use checksumming, until Feb 2012 when it magically does.
// Announce ourselves. This has to come first to connect to clients beyond v0.30.20.2 which wait to hear
// from us until they send their version message back.
log.info("Announcing ourselves as: {}", myVersionMessage.subVer);
write(channel, myVersionMessage);
// When connecting, the remote peer sends us a version message with various bits of
// useful data in it. We need to know the peer protocol version before we can talk to it.
}
// Attempt to decode a Bitcoin message passing upstream in the channel.
//
// By extending ReplayingDecoder, reading past the end of buffer will throw a special Error
// causing the channel to read more and retry.
//
// On VMs/systems where exception handling is slow, this will impact performance. On the
// other hand, implementing a FrameDecoder will increase code complexity due to having
// to implement retries ourselves.
//
// TODO: consider using a decoder state and checkpoint() if performance is an issue.
@Override
protected Object decode(ChannelHandlerContext ctx, Channel chan,
ChannelBuffer buffer, VoidEnum state) throws Exception {
Message message = serializer.deserialize(new ChannelBufferInputStream(buffer));
messageCount++;
if (messageCount == 1) {
onFirstMessage(message);
} else if (messageCount == 2) {
onSecondMessage(message);
}
return message;
}
/** Serialize outgoing Bitcoin messages passing downstream in the channel. */
public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent evt) throws Exception {
if (!(evt instanceof MessageEvent)) {
ctx.sendDownstream(evt);
return;
}
MessageEvent e = (MessageEvent) evt;
Message message = (Message)e.getMessage();
ChannelBuffer buffer = ChannelBuffers.dynamicBuffer();
serializer.serialize(message, new ChannelBufferOutputStream(buffer));
write(ctx, e.getFuture(), buffer, e.getRemoteAddress());
}
}
/** Returns the Netty Pipeline stage handling Bitcoin serialization for this connection. */
public NetworkHandler getHandler() {
return handler;
}
public VersionMessage getVersionMessage() {
@@ -179,4 +187,9 @@ public class TCPNetworkConnection implements NetworkConnection {
public PeerAddress getPeerAddress() {
return new PeerAddress(remoteIp, params.port);
}
public void setRemoteAddress(SocketAddress address) {
if (address instanceof InetSocketAddress)
remoteIp = ((InetSocketAddress)address).getAddress();
}
}

View File

@@ -0,0 +1,54 @@
package com.google.bitcoin.core;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.concurrent.BlockingQueue;
import org.jboss.netty.channel.*;
import org.jboss.netty.util.internal.QueueFactory;
public class FakeChannel extends AbstractChannel {
final BlockingQueue<ChannelEvent> events = QueueFactory.createQueue(ChannelEvent.class);
private final ChannelConfig config;
private SocketAddress localAddress;
private SocketAddress remoteAddress;
protected FakeChannel(ChannelFactory factory,
ChannelPipeline pipeline, ChannelSink sink) {
super(null, factory, pipeline, sink);
config = new DefaultChannelConfig();
localAddress = new InetSocketAddress("127.0.0.1", 2000);
}
public ChannelConfig getConfig() {
return config;
}
public SocketAddress getLocalAddress() {
return localAddress;
}
public SocketAddress getRemoteAddress() {
return remoteAddress;
}
@Override
public ChannelFuture connect(SocketAddress remoteAddress) {
this.remoteAddress = remoteAddress;
return super.connect(remoteAddress);
}
public boolean isBound() {
return true;
}
public boolean isConnected() {
return true;
}
public ChannelEvent nextEvent() {
return events.poll();
}
}

View File

@@ -0,0 +1,54 @@
package com.google.bitcoin.core;
import static org.jboss.netty.channel.Channels.fireChannelConnected;
import org.jboss.netty.channel.*;
public class FakeChannelSink extends AbstractChannelSink {
public void eventSunk(ChannelPipeline pipeline, ChannelEvent e)
throws Exception {
if (e instanceof ChannelStateEvent) {
ChannelStateEvent event = (ChannelStateEvent) e;
FakeChannel channel =
(FakeChannel) event.getChannel();
ChannelFuture future = event.getFuture();
ChannelState state = event.getState();
Object value = event.getValue();
switch (state) {
case OPEN:
if (Boolean.FALSE.equals(value)) {
// Close
}
break;
case BOUND:
if (value != null) {
// Bind
} else {
// Close
}
break;
case CONNECTED:
if (value != null) {
future.setSuccess();
fireChannelConnected(channel, channel.getRemoteAddress());
} else {
// Close
}
break;
case INTEREST_OPS:
// Unsupported - discard silently.
future.setSuccess();
break;
}
boolean offered = channel.events.offer(event);
assert offered;
} else if (e instanceof MessageEvent) {
MessageEvent event = (MessageEvent) e;
FakeChannel channel = (FakeChannel) event.getChannel();
boolean offered = channel.events.offer(event);
assert offered;
}
}
}

View File

@@ -16,52 +16,69 @@
package com.google.bitcoin.core;
import com.google.bitcoin.discovery.PeerDiscovery;
import com.google.bitcoin.discovery.PeerDiscoveryException;
import org.junit.Before;
import org.junit.Test;
import static org.easymock.EasyMock.anyObject;
import static org.junit.Assert.*;
import java.io.IOException;
import java.math.BigInteger;
import java.net.InetSocketAddress;
import java.util.Date;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.FutureTask;
import java.util.concurrent.Semaphore;
import static org.junit.Assert.*;
import org.easymock.EasyMock;
import org.easymock.IAnswer;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.ChannelSink;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.Channels;
import org.junit.Before;
import org.junit.Test;
import com.google.bitcoin.core.PeerGroup.PeerGroupThread;
import com.google.bitcoin.discovery.PeerDiscovery;
import com.google.bitcoin.discovery.PeerDiscoveryException;
public class PeerGroupTest extends TestWithNetworkConnections {
static final NetworkParameters params = NetworkParameters.unitTests();
private PeerGroup peerGroup;
private final BlockingQueue<Peer> disconnectedPeers = new LinkedBlockingQueue<Peer>();
// FIXME Some tests here are non-deterministic due to the peers running on a separate thread.
// FIXME Fix this by having exchangeAndWait and inboundAndWait methods in MockNetworkConnection.
private VersionMessage remoteVersionMessage;
@Override
@Before
public void setUp() throws Exception {
super.setUp();
peerGroup = new PeerGroup(params, blockChain, 1000);
peerGroup.addWallet(wallet);
// Support for testing disconnect events in a non-racy manner.
peerGroup.addEventListener(new AbstractPeerEventListener() {
@Override
public void onPeerDisconnected(Peer peer, int peerCount) {
super.onPeerDisconnected(peer, peerCount);
try {
disconnectedPeers.put(peer);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
remoteVersionMessage = new VersionMessage(params, 1);
ClientBootstrap bootstrap = new ClientBootstrap(new ChannelFactory() {
public void releaseExternalResources() {}
public Channel newChannel(ChannelPipeline pipeline) {
ChannelSink sink = new FakeChannelSink();
return new FakeChannel(this, pipeline, sink);
}
});
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() throws Exception {
VersionMessage ver = new VersionMessage(params, 1);
ChannelPipeline p = Channels.pipeline();
Peer peer = new Peer(params, blockChain, ver);
peer.addEventListener(peerGroup.startupListener);
p.addLast("peer", peer.getHandler());
return p;
}
});
peerGroup = new PeerGroup(params, blockChain, 1, bootstrap);
peerGroup.addWallet(wallet);
}
@Test
@@ -104,47 +121,57 @@ public class PeerGroupTest extends TestWithNetworkConnections {
@Test
public void receiveTxBroadcast() throws Exception {
// Check that when we receive transactions on all our peers, we do the right thing.
peerGroup.start();
// Create a couple of peers.
MockNetworkConnection n1 = createMockNetworkConnection();
Peer p1 = new Peer(params, blockChain, n1);
MockNetworkConnection n2 = createMockNetworkConnection();
Peer p2 = new Peer(params, blockChain, n2);
peerGroup.start();
peerGroup.addPeer(p1);
peerGroup.addPeer(p2);
peerGroup.addWallet(wallet);
FakeChannel p1 = connectPeer(1);
FakeChannel p2 = connectPeer(2);
// Check the peer accessors.
assertEquals(2, peerGroup.numConnectedPeers());
Set<Peer> tmp = new HashSet<Peer>(peerGroup.getConnectedPeers());
Set<Peer> expectedPeers = new HashSet<Peer>();
expectedPeers.add(p1);
expectedPeers.add(p2);
expectedPeers.add(peerOf(p1));
expectedPeers.add(peerOf(p2));
assertEquals(tmp, expectedPeers);
BigInteger value = Utils.toNanoCoins(1, 0);
Transaction t1 = TestUtils.createFakeTx(unitTestParams, value, address);
InventoryMessage inv = new InventoryMessage(unitTestParams);
inv.addTransaction(t1);
assertTrue(n1.exchange(inv) instanceof GetDataMessage);
assertNull(n2.exchange(inv)); // Only one peer is used to download.
assertNull(n1.exchange(t1));
inbound(p1, inv);
assertTrue(outbound(p1) instanceof GetDataMessage);
inbound(p2, inv);
assertNull(outbound(p2)); // Only one peer is used to download.
inbound(p1, t1);
assertNull(outbound(p2));
assertEquals(value, wallet.getBalance(Wallet.BalanceType.ESTIMATED));
peerGroup.stop();
}
private FakeChannel connectPeer(int id) {
return connectPeer(id, remoteVersionMessage);
}
private FakeChannel connectPeer(int id, VersionMessage versionMessage) {
InetSocketAddress remoteAddress = new InetSocketAddress("127.0.0.1", 2000 + id);
FakeChannel p =
(FakeChannel) peerGroup.connectTo(remoteAddress).getChannel();
assertTrue(p.nextEvent() instanceof ChannelStateEvent);
inbound(p, versionMessage);
return p;
}
@Test
public void singleDownloadPeer1() throws Exception {
// Check that we don't attempt to retrieve blocks on multiple peers.
peerGroup.start();
// Create a couple of peers.
MockNetworkConnection n1 = createMockNetworkConnection();
Peer p1 = new Peer(params, blockChain, n1);
MockNetworkConnection n2 = createMockNetworkConnection();
Peer p2 = new Peer(params, blockChain, n2);
peerGroup.start();
peerGroup.addPeer(p1);
peerGroup.addPeer(p2);
FakeChannel p1 = connectPeer(1);
FakeChannel p2 = connectPeer(2);
assertEquals(2, peerGroup.numConnectedPeers());
// Set up a little block chain. We heard about b1 but not b2 (it is pending download). b3 is solved whilst we
@@ -158,12 +185,15 @@ public class PeerGroupTest extends TestWithNetworkConnections {
InventoryMessage inv = new InventoryMessage(params);
inv.addBlock(b3);
// Only peer 1 tries to download it.
assertTrue(n1.exchange(inv) instanceof GetDataMessage);
assertNull(n2.exchange(inv));
inbound(p1, inv);
assertTrue(outbound(p1) instanceof GetDataMessage);
assertNull(outbound(p2));
// Peer 1 goes away.
disconnectAndWait(n1);
closePeer(peerOf(p1));
// Peer 2 fetches it next time it hears an inv (should it fetch immediately?).
assertTrue(n2.exchange(inv) instanceof GetDataMessage);
inbound(p2, inv);
assertTrue(outbound(p2) instanceof GetDataMessage);
peerGroup.stop();
}
@@ -172,51 +202,42 @@ public class PeerGroupTest extends TestWithNetworkConnections {
// Check that we don't attempt multiple simultaneous block chain downloads, when adding a new peer in the
// middle of an existing chain download.
// Create a couple of peers.
MockNetworkConnection n1 = createMockNetworkConnection();
Peer p1 = new Peer(params, blockChain, n1);
MockNetworkConnection n2 = createMockNetworkConnection();
Peer p2 = new Peer(params, blockChain, n2);
peerGroup.start();
peerGroup.addPeer(p1);
// Create a couple of peers.
FakeChannel p1 = connectPeer(1);
// Set up a little block chain.
Block b1 = TestUtils.createFakeBlock(params, blockStore).block;
Block b2 = TestUtils.makeSolvedTestBlock(params, b1);
Block b3 = TestUtils.makeSolvedTestBlock(params, b2);
n1.setVersionMessageForHeight(params, 3);
n2.setVersionMessageForHeight(params, 3);
// Expect a zero hash getblocks on p1. This is how the process starts.
peerGroup.startBlockChainDownload(new AbstractPeerEventListener() {
});
GetBlocksMessage getblocks = (GetBlocksMessage) n1.outbound();
GetBlocksMessage getblocks = (GetBlocksMessage) outbound(p1);
assertEquals(Sha256Hash.ZERO_HASH, getblocks.getStopHash());
// We give back an inv with some blocks in it.
InventoryMessage inv = new InventoryMessage(params);
inv.addBlock(b1);
inv.addBlock(b2);
inv.addBlock(b3);
assertTrue(n1.exchange(inv) instanceof GetDataMessage);
inbound(p1, inv);
assertTrue(outbound(p1) instanceof GetDataMessage);
// We hand back the first block.
n1.inbound(b1);
inbound(p1, b1);
// Now we successfully connect to another peer. There should be no messages sent.
peerGroup.addPeer(p2);
Message message = n2.outbound();
FakeChannel p2 = connectPeer(2);
Message message = (Message)outbound(p2);
assertNull(message == null ? "" : message.toString(), message);
peerGroup.stop();
}
@Test
public void transactionConfidence() throws Exception {
// Checks that we correctly count how many peers broadcast a transaction, so we can establish some measure of
// its trustworthyness assuming an untampered with internet connection. This is done via the MemoryPool class.
MockNetworkConnection n1 = createMockNetworkConnection();
Peer p1 = new Peer(params, blockChain, n1);
MockNetworkConnection n2 = createMockNetworkConnection();
Peer p2 = new Peer(params, blockChain, n2);
MockNetworkConnection n3 = createMockNetworkConnection();
Peer p3 = new Peer(params, blockChain, n3);
// its trustworthyness assuming an untampered with internet connection.
final Transaction[] event = new Transaction[2];
peerGroup.addEventListener(new AbstractPeerEventListener() {
@Override
@@ -225,38 +246,47 @@ public class PeerGroupTest extends TestWithNetworkConnections {
}
});
peerGroup.start();
peerGroup.addPeer(p1);
peerGroup.addPeer(p2);
peerGroup.addPeer(p3);
FakeChannel p1 = connectPeer(1);
FakeChannel p2 = connectPeer(2);
FakeChannel p3 = connectPeer(3);
Transaction tx = TestUtils.createFakeTx(params, Utils.toNanoCoins(20, 0), address);
InventoryMessage inv = new InventoryMessage(params);
inv.addTransaction(tx);
// Peer 2 advertises the tx and requests a download of it, because it came first.
assertTrue(n2.exchange(inv) instanceof GetDataMessage);
// Peer 2 advertises the tx but does not download it.
inbound(p2, inv);
assertTrue(outbound(p2) instanceof GetDataMessage);
assertEquals(0, tx.getConfidence().numBroadcastPeers());
assertTrue(peerGroup.getMemoryPool().maybeWasSeen(tx.getHash()));
assertEquals(null, event[0]);
// Peer 1 advertises the tx, we don't do anything as it's already been requested.
assertNull(n1.exchange(inv));
assertNull(n2.exchange(tx));
assertNull(event[0]);
// Peer 1 advertises the tx, we don't do anything as it's already been requested.
inbound(p1, inv);
assertNull(outbound(p1));
inbound(p2, tx);
assertNull(outbound(p2));
tx = event[0]; // We want to use the canonical copy delivered by the PeerGroup from now on.
assertNotNull(tx);
event[0] = null;
// Peer 1 (the download peer) advertises the tx, we download it.
inbound(p1, inv); // returns getdata
inbound(p1, tx); // returns nothing after a queue drain.
// Two peers saw this tx hash.
assertEquals(2, tx.getConfidence().numBroadcastPeers());
assertTrue(tx.getConfidence().getBroadcastBy().contains(n1.getPeerAddress()));
assertTrue(tx.getConfidence().getBroadcastBy().contains(n2.getPeerAddress()));
assertTrue(tx.getConfidence().getBroadcastBy().contains(peerOf(p1).getAddress()));
assertTrue(tx.getConfidence().getBroadcastBy().contains(peerOf(p2).getAddress()));
tx.getConfidence().addEventListener(new TransactionConfidence.Listener() {
public void onConfidenceChanged(Transaction tx) {
event[1] = tx;
}
});
// A straggler reports in.
n3.exchange(inv);
inbound(p3, inv);
assertEquals(tx, event[1]);
assertEquals(3, tx.getConfidence().numBroadcastPeers());
assertTrue(tx.getConfidence().getBroadcastBy().contains(n3.getPeerAddress()));
assertTrue(tx.getConfidence().getBroadcastBy().contains(peerOf(p3).getAddress()));
}
@Test
@@ -264,54 +294,68 @@ public class PeerGroupTest extends TestWithNetworkConnections {
// Make sure we can create spends, and that they are announced. Then do the same with offline mode.
// Set up connections and block chain.
MockNetworkConnection n1 = createMockNetworkConnection();
Peer p1 = new Peer(params, blockChain, n1);
MockNetworkConnection n2 = createMockNetworkConnection();
Peer p2 = new Peer(params, blockChain, n2);
peerGroup.start();
peerGroup.addPeer(p1);
peerGroup.addPeer(p2);
FakeChannel p1 = connectPeer(1, new VersionMessage(params, 2));
FakeChannel p2 = connectPeer(2);
PeerGroupThread peerGroupThread = control.createMock(PeerGroupThread.class);
peerGroup.mockStart(peerGroupThread);
peerGroupThread.addTask((FutureTask<Transaction>) anyObject());
EasyMock.expectLastCall().andAnswer(new IAnswer<Void>() {
@SuppressWarnings("unchecked")
public Void answer() throws Throwable {
((FutureTask<Transaction>)EasyMock.getCurrentArguments()[0]).run();
return null;
}
});
peerGroupThread.interrupt();
EasyMock.expectLastCall();
control.replay();
// Send ourselves a bit of money.
Block b1 = TestUtils.makeSolvedTestBlock(params, blockStore, address);
n1.setVersionMessageForHeight(params, 2);
n1.exchange(b1);
inbound(p1, b1);
assertNull(outbound(p1));
assertEquals(Utils.toNanoCoins(50, 0), wallet.getBalance());
// Now create a spend, and expect the announcement.
Address dest = new ECKey().toAddress(params);
assertNotNull(wallet.sendCoins(peerGroup, dest, Utils.toNanoCoins(1, 0)));
Transaction t1 = (Transaction) n1.outbound();
Transaction t1 = (Transaction) outbound(p1);
assertNotNull(t1);
// 49 BTC in change.
assertEquals(Utils.toNanoCoins(49, 0), t1.getValueSentToMe(wallet));
Transaction t2 = (Transaction) n2.outbound();
Transaction t2 = (Transaction) outbound(p2);
assertEquals(t1, t2);
Block b2 = TestUtils.createFakeBlock(params, blockStore, t1).block;
n1.exchange(b2);
inbound(p1, b2);
assertNull(outbound(p1));
// Do the same thing with an offline transaction.
peerGroup.removeWallet(wallet);
Transaction t3 = wallet.sendCoinsOffline(dest, Utils.toNanoCoins(2, 0));
assertNull(n1.outbound()); // Nothing sent.
assertNull(outbound(p1)); // Nothing sent.
// Add the wallet to the peer group (simulate initialization). Transactions should be announced.
peerGroup.addWallet(wallet);
// Transaction announced on the peers.
InventoryMessage inv1 = (InventoryMessage) n1.outbound();
InventoryMessage inv2 = (InventoryMessage) n2.outbound();
InventoryMessage inv1 = (InventoryMessage) outbound(p1);
InventoryMessage inv2 = (InventoryMessage) outbound(p2);
assertEquals(t3.getHash(), inv1.getItems().get(0).hash);
assertEquals(t3.getHash(), inv2.getItems().get(0).hash);
// Peers ask for the transaction, and get it.
GetDataMessage getdata = new GetDataMessage(params);
getdata.addItem(inv1.getItems().get(0));
Transaction t4 = (Transaction) n1.exchange(getdata);
inbound(p1, getdata);
Transaction t4 = (Transaction) outbound(p1);
assertEquals(t3, t4);
assertEquals(t3, n2.exchange(getdata));
MockNetworkConnection n3 = createMockNetworkConnection();
Peer p3 = new Peer(params, blockChain, n3);
peerGroup.addPeer(p3);
assertTrue(n3.outbound() instanceof InventoryMessage);
inbound(p2, getdata);
assertEquals(t3, outbound(p2));
FakeChannel p3 = connectPeer(3);
assertTrue(outbound(p3) instanceof InventoryMessage);
peerGroup.stop();
control.verify();
}
@Test
@@ -333,11 +377,6 @@ public class PeerGroupTest extends TestWithNetworkConnections {
assertEquals(peerGroup.getFastCatchupTimeSecs(), time - 100000);
}
private void disconnectAndWait(MockNetworkConnection conn) throws IOException, InterruptedException {
conn.disconnect();
disconnectedPeers.take();
}
@Test
public void testSetMaximumConnections() {
peerGroup.setMaxConnections(1);

View File

@@ -16,8 +16,8 @@
package com.google.bitcoin.core;
import org.junit.Before;
import org.junit.Test;
import static org.easymock.EasyMock.*;
import static org.junit.Assert.*;
import java.io.IOException;
import java.math.BigInteger;
@@ -26,25 +26,49 @@ import java.util.List;
import java.util.concurrent.Future;
import static com.google.bitcoin.core.TestUtils.*;
import static org.easymock.EasyMock.*;
import static org.junit.Assert.*;
import org.easymock.Capture;
import org.jboss.netty.channel.*;
import org.junit.Before;
import org.junit.Test;
import com.google.bitcoin.core.Peer.PeerHandler;
public class PeerTest extends TestWithNetworkConnections {
private Peer peer;
private MockNetworkConnection conn;
private Capture<DownstreamMessageEvent> event;
private PeerHandler handler;
@Override
@Before
public void setUp() throws Exception {
super.setUp();
conn = createMockNetworkConnection();
peer = new Peer(unitTestParams, blockChain, conn);
VersionMessage ver = new VersionMessage(unitTestParams, 100);
peer = new Peer(unitTestParams, blockChain, ver);
peer.addWallet(wallet);
handler = peer.getHandler();
event = new Capture<DownstreamMessageEvent>();
pipeline.sendDownstream(capture(event));
expectLastCall().anyTimes();
}
private void connect() throws Exception {
connect(handler, channel, ctx);
}
private void connect(PeerHandler handler, Channel channel, ChannelHandlerContext ctx) throws Exception {
handler.connectRequested(ctx, new UpstreamChannelStateEvent(channel, ChannelState.CONNECTED, socketAddress));
VersionMessage peerVersion = new VersionMessage(unitTestParams, 110);
DownstreamMessageEvent versionEvent =
new DownstreamMessageEvent(channel, Channels.future(channel), peerVersion, null);
handler.messageReceived(ctx, versionEvent);
}
@Test
public void testAddEventListener() {
public void testAddEventListener() throws Exception {
control.replay();
connect();
PeerEventListener listener = new AbstractPeerEventListener();
peer.addEventListener(listener);
assertTrue(peer.removeEventListener(listener));
@@ -54,31 +78,28 @@ public class PeerTest extends TestWithNetworkConnections {
// Check that the connection is shut down if there's a read error and that the exception is propagated.
@Test
public void testRun_exception() throws Exception {
conn.exceptionOnRead(new IOException("done"));
try {
peer.run();
fail("did not throw");
} catch (PeerException e) {
assertTrue(e.getCause() instanceof IOException);
}
expect(channel.close()).andReturn(null);
control.replay();
handler.exceptionCaught(ctx,
new DefaultExceptionEvent(channel, new IOException("proto")));
control.verify();
}
@Test
public void testRun_protocolException() throws Exception {
conn.exceptionOnRead(new ProtocolException("proto"));
try {
peer.run();
fail("did not throw");
} catch (PeerException e) {
// expected
assertTrue(e.toString(), e.getCause() instanceof ProtocolException);
}
expect(channel.close()).andReturn(null);
replay(channel);
handler.exceptionCaught(ctx,
new DefaultExceptionEvent(channel, new ProtocolException("proto")));
verify(channel);
}
// Check that it runs through the event loop and shut down correctly
@Test
public void shutdown() throws Exception {
runPeer(peer, conn);
closePeer(peer);
}
@Test
@@ -90,10 +111,13 @@ public class PeerTest extends TestWithNetworkConnections {
Block b3 = makeSolvedTestBlock(unitTestParams, b2);
Block b4 = makeSolvedTestBlock(unitTestParams, b3);
Block b5 = makeSolvedTestBlock(unitTestParams, b4);
conn.setVersionMessageForHeight(unitTestParams, 6);
control.replay();
connect();
peer.startBlockChainDownload();
runPeerAsync(peer, conn);
GetBlocksMessage getblocks = (GetBlocksMessage) conn.outbound();
GetBlocksMessage getblocks = (GetBlocksMessage)outbound();
assertEquals(blockStore.getChainHead().getHeader().getHash(), getblocks.getLocator().get(0));
assertEquals(Sha256Hash.ZERO_HASH, getblocks.getStopHash());
// Remote peer sends us an inv with some blocks.
@@ -101,23 +125,27 @@ public class PeerTest extends TestWithNetworkConnections {
inv.addBlock(b2);
inv.addBlock(b3);
// We do a getdata on them.
GetDataMessage getdata = (GetDataMessage) conn.exchange(inv);
inbound(peer, inv);
GetDataMessage getdata = (GetDataMessage)outbound();
assertEquals(b2.getHash(), getdata.getItems().get(0).hash);
assertEquals(b3.getHash(), getdata.getItems().get(1).hash);
assertEquals(2, getdata.getItems().size());
// Remote peer sends us the blocks. The act of doing a getdata for b3 results in getting an inv with just the
// best chain head in it.
conn.inbound(b2);
conn.inbound(b3);
inbound(peer, b2);
inbound(peer, b3);
inv = new InventoryMessage(unitTestParams);
inv.addBlock(b5);
// We request the head block.
getdata = (GetDataMessage) conn.exchange(inv);
inbound(peer, inv);
getdata = (GetDataMessage)outbound();
assertEquals(b5.getHash(), getdata.getItems().get(0).hash);
assertEquals(1, getdata.getItems().size());
// Peer sends us the head block. The act of receiving the orphan block triggers a getblocks to fill in the
// rest of the chain.
getblocks = (GetBlocksMessage) conn.exchange(b5);
inbound(peer, b5);
getblocks = (GetBlocksMessage)outbound();
assertEquals(b5.getHash(), getblocks.getStopHash());
assertEquals(b3.getHash(), getblocks.getLocator().get(0));
// At this point another block is solved and broadcast. The inv triggers a getdata but we do NOT send another
@@ -128,43 +156,54 @@ public class PeerTest extends TestWithNetworkConnections {
Block b6 = makeSolvedTestBlock(unitTestParams, b5);
inv = new InventoryMessage(unitTestParams);
inv.addBlock(b6);
getdata = (GetDataMessage) conn.exchange(inv);
inbound(peer, inv);
getdata = (GetDataMessage)outbound();
assertEquals(1, getdata.getItems().size());
assertEquals(b6.getHash(), getdata.getItems().get(0).hash);
assertNull(conn.exchange(b6)); // Nothing is sent at this point.
inbound(peer, b6);
assertFalse(event.hasCaptured()); // Nothing is sent at this point.
// We're still waiting for the response to the getblocks (b3,b5) sent above.
inv = new InventoryMessage(unitTestParams);
inv.addBlock(b4);
inv.addBlock(b5);
getdata = (GetDataMessage) conn.exchange(inv);
inbound(peer, inv);
getdata = (GetDataMessage)outbound();
assertEquals(1, getdata.getItems().size());
assertEquals(b4.getHash(), getdata.getItems().get(0).hash);
// We already have b5 from before, so it's not requested again.
assertNull(conn.exchange(b4));
inbound(peer, b4);
assertFalse(event.hasCaptured());
// b5 and b6 are now connected by the block chain and we're done.
closePeer(peer);
control.verify();
}
// Check that an inventory tickle is processed correctly when downloading missing blocks is active.
@Test
public void invTickle() throws Exception {
control.replay();
connect();
Block b1 = createFakeBlock(unitTestParams, blockStore).block;
blockChain.add(b1);
// Make a missing block.
Block b2 = makeSolvedTestBlock(unitTestParams, b1);
Block b3 = makeSolvedTestBlock(unitTestParams, b2);
conn.inbound(b3);
inbound(peer, b3);
InventoryMessage inv = new InventoryMessage(unitTestParams);
InventoryItem item = new InventoryItem(InventoryItem.Type.Block, b3.getHash());
inv.addItem(item);
conn.inbound(inv);
runPeer(peer, conn);
GetBlocksMessage getblocks = (GetBlocksMessage) conn.popOutbound();
inbound(peer, inv);
GetBlocksMessage getblocks = (GetBlocksMessage)outbound();
List<Sha256Hash> expectedLocator = new ArrayList<Sha256Hash>();
expectedLocator.add(b1.getHash());
expectedLocator.add(unitTestParams.genesisBlock.getHash());
assertEquals(getblocks.getLocator(), expectedLocator);
assertEquals(getblocks.getStopHash(), b3.getHash());
control.verify();
}
// Check that an inv to a peer that is not set to download missing blocks does nothing.
@@ -172,6 +211,10 @@ public class PeerTest extends TestWithNetworkConnections {
public void invNoDownload() throws Exception {
// Don't download missing blocks.
peer.setDownloadData(false);
control.replay();
connect();
// Make a missing block that we receive.
Block b1 = createFakeBlock(unitTestParams, blockStore).block;
@@ -182,15 +225,18 @@ public class PeerTest extends TestWithNetworkConnections {
InventoryMessage inv = new InventoryMessage(unitTestParams);
InventoryItem item = new InventoryItem(InventoryItem.Type.Block, b2.getHash());
inv.addItem(item);
conn.inbound(inv);
inbound(peer, inv);
// Peer does nothing with it.
runPeer(peer, conn);
Message message = conn.popOutbound();
assertNull(message != null ? message.toString() : "", message);
control.verify();
}
@Test
public void invDownloadTx() throws Exception {
control.replay();
connect();
peer.setDownloadData(true);
// Make a transaction and tell the peer we have it.
BigInteger value = Utils.toNanoCoins(1, 0);
@@ -198,30 +244,37 @@ public class PeerTest extends TestWithNetworkConnections {
InventoryMessage inv = new InventoryMessage(unitTestParams);
InventoryItem item = new InventoryItem(InventoryItem.Type.Transaction, tx.getHash());
inv.addItem(item);
conn.inbound(inv);
inbound(peer, inv);
// Peer hasn't seen it before, so will ask for it.
runPeer(peer, conn);
conn.popInbound(); // Pop the disconnect marker so we can reuse
GetDataMessage message = (GetDataMessage) conn.popOutbound();
GetDataMessage message = (GetDataMessage) event.getValue().getMessage();
assertEquals(1, message.getItems().size());
assertEquals(tx.getHash(), message.getItems().get(0).hash);
conn.inbound(tx);
runPeer(peer, conn);
inbound(peer, tx);
assertEquals(value, wallet.getBalance(Wallet.BalanceType.ESTIMATED));
}
@Test
public void invDownloadTxMultiPeer() throws Exception {
ChannelHandlerContext ctx2 = createChannelHandlerContext();
Channel channel2 = createChannel();
createPipeline(channel2);
control.replay();
// Check co-ordination of which peer to download via the memory pool.
MemoryPool pool = new MemoryPool();
peer.setMemoryPool(pool);
MockNetworkConnection conn2 = createMockNetworkConnection();
Peer peer2 = new Peer(unitTestParams, blockChain, conn2);
VersionMessage ver = new VersionMessage(unitTestParams, 100);
Peer peer2 = new Peer(unitTestParams, blockChain, ver);
peer2.addWallet(wallet);
peer2.setMemoryPool(pool);
connect();
connect(peer2.getHandler(), channel2, ctx2);
// Make a tx and advertise it to one of the peers.
BigInteger value = Utils.toNanoCoins(1, 0);
Transaction tx = createFakeTx(unitTestParams, value, address);
@@ -229,50 +282,48 @@ public class PeerTest extends TestWithNetworkConnections {
InventoryItem item = new InventoryItem(InventoryItem.Type.Transaction, tx.getHash());
inv.addItem(item);
conn.inbound(inv);
runPeer(peer, conn);
conn.popInbound(); // Remove the disconnect marker.
inbound(peer, inv);
// We got a getdata message.
GetDataMessage message = (GetDataMessage) conn.popOutbound();
GetDataMessage message = (GetDataMessage)outbound();
assertEquals(1, message.getItems().size());
assertEquals(tx.getHash(), message.getItems().get(0).hash);
assertTrue(pool.maybeWasSeen(tx.getHash()));
// Advertising to peer2 results in no getdata message.
conn2.inbound(inv);
runPeer(peer2, conn2);
assertNull(conn.popOutbound());
assertFalse(event.hasCaptured());
}
// Check that inventory message containing blocks we want is processed correctly.
@Test
public void newBlock() throws Exception {
PeerEventListener listener = control.createMock(PeerEventListener.class);
peer.addEventListener(listener);
Block b1 = createFakeBlock(unitTestParams, blockStore).block;
blockChain.add(b1);
Block b2 = makeSolvedTestBlock(unitTestParams, b1);
conn.setVersionMessageForHeight(unitTestParams, 100);
// Receive notification of a new block.
InventoryMessage inv = new InventoryMessage(unitTestParams);
InventoryItem item = new InventoryItem(InventoryItem.Type.Block, b2.getHash());
inv.addItem(item);
conn.inbound(inv);
// Response to the getdata message.
conn.inbound(b2);
expect(listener.onPreMessageReceived(eq(peer), eq(inv))).andReturn(inv);
expect(listener.onPreMessageReceived(eq(peer), eq(b2))).andReturn(b2);
listener.onBlocksDownloaded(eq(peer), anyObject(Block.class), eq(98));
listener.onBlocksDownloaded(eq(peer), anyObject(Block.class), eq(108));
expectLastCall();
control.replay();
runPeer(peer, conn);
connect();
peer.addEventListener(listener);
inbound(peer, inv);
// Response to the getdata message.
inbound(peer, b2);
control.verify();
GetDataMessage getdata = (GetDataMessage) conn.popOutbound();
GetDataMessage getdata = (GetDataMessage) event.getValue().getMessage();
List<InventoryItem> items = getdata.getItems();
assertEquals(1, items.size());
assertEquals(b2.getHash(), items.get(0).hash);
@@ -283,20 +334,21 @@ public class PeerTest extends TestWithNetworkConnections {
@Test
public void startBlockChainDownload() throws Exception {
PeerEventListener listener = control.createMock(PeerEventListener.class);
peer.addEventListener(listener);
Block b1 = createFakeBlock(unitTestParams, blockStore).block;
blockChain.add(b1);
Block b2 = makeSolvedTestBlock(unitTestParams, b1);
blockChain.add(b2);
conn.setVersionMessageForHeight(unitTestParams, 100);
listener.onChainDownloadStarted(peer, 98);
listener.onChainDownloadStarted(peer, 108);
expectLastCall();
control.replay();
connect();
peer.addEventListener(listener);
peer.startBlockChainDownload();
runPeer(peer, conn);
control.verify();
List<Sha256Hash> expectedLocator = new ArrayList<Sha256Hash>();
@@ -304,35 +356,41 @@ public class PeerTest extends TestWithNetworkConnections {
expectedLocator.add(b1.getHash());
expectedLocator.add(unitTestParams.genesisBlock.getHash());
GetBlocksMessage message = (GetBlocksMessage) conn.popOutbound();
GetBlocksMessage message = (GetBlocksMessage) event.getValue().getMessage();
assertEquals(message.getLocator(), expectedLocator);
assertEquals(message.getStopHash(), Sha256Hash.ZERO_HASH);
}
@Test
public void getBlock() throws Exception {
control.replay();
connect();
Block b1 = createFakeBlock(unitTestParams, blockStore).block;
blockChain.add(b1);
Block b2 = makeSolvedTestBlock(unitTestParams, b1);
Block b3 = makeSolvedTestBlock(unitTestParams, b2);
conn.setVersionMessageForHeight(unitTestParams, 100);
runPeerAsync(peer, conn);
// Request the block.
Future<Block> resultFuture = peer.getBlock(b3.getHash());
assertFalse(resultFuture.isDone());
// Peer asks for it.
GetDataMessage message = (GetDataMessage) conn.outbound();
GetDataMessage message = (GetDataMessage) event.getValue().getMessage();
assertEquals(message.getItems().get(0).hash, b3.getHash());
assertFalse(resultFuture.isDone());
// Peer receives it.
conn.inbound(b3);
inbound(peer, b3);
Block b = resultFuture.get();
assertEquals(b, b3);
conn.disconnect();
}
@Test
public void fastCatchup() throws Exception {
control.replay();
connect();
// Check that blocks before the fast catchup point are retrieved using getheaders, and after using getblocks.
// This test is INCOMPLETE because it does not check we handle >2000 blocks correctly.
Block b1 = createFakeBlock(unitTestParams, blockStore).block;
@@ -343,12 +401,11 @@ public class PeerTest extends TestWithNetworkConnections {
Block b3 = makeSolvedTestBlock(unitTestParams, b2);
Utils.rollMockClock(60 * 10);
Block b4 = makeSolvedTestBlock(unitTestParams, b3);
conn.setVersionMessageForHeight(unitTestParams, 4);
// Request headers until the last 2 blocks.
peer.setFastCatchupTime((Utils.now().getTime() / 1000) - (600*2) + 1);
runPeerAsync(peer, conn);
peer.startBlockChainDownload();
GetHeadersMessage getheaders = (GetHeadersMessage) conn.outbound();
GetHeadersMessage getheaders = (GetHeadersMessage) event.getValue().getMessage();
List<Sha256Hash> expectedLocator = new ArrayList<Sha256Hash>();
expectedLocator.add(b1.getHash());
expectedLocator.add(unitTestParams.genesisBlock.getHash());
@@ -362,15 +419,23 @@ public class PeerTest extends TestWithNetworkConnections {
expectedLocator.add(b2.getHash());
expectedLocator.add(b1.getHash());
expectedLocator.add(unitTestParams.genesisBlock.getHash());
GetBlocksMessage getblocks = (GetBlocksMessage) conn.exchange(headers);
inbound(peer, headers);
GetBlocksMessage getblocks = (GetBlocksMessage) event.getValue().getMessage();
assertEquals(expectedLocator, getblocks.getLocator());
assertEquals(b3.getHash(), getblocks.getStopHash());
// We're supposed to get an inv here.
InventoryMessage inv = new InventoryMessage(unitTestParams);
inv.addItem(new InventoryItem(InventoryItem.Type.Block, b3.getHash()));
GetDataMessage getdata = (GetDataMessage) conn.exchange(inv);
inbound(peer, inv);
GetDataMessage getdata = (GetDataMessage) event.getValue().getMessage();
assertEquals(b3.getHash(), getdata.getItems().get(0).hash);
// All done.
assertEquals(null, conn.exchange(b3));
inbound(peer, b3);
}
private Message outbound() {
Message message = (Message)event.getValue().getMessage();
event.reset();
return message;
}
}

View File

@@ -16,15 +16,19 @@
package com.google.bitcoin.core;
import com.google.bitcoin.store.MemoryBlockStore;
import com.google.bitcoin.utils.BriefLogFormatter;
import org.easymock.IMocksControl;
import static org.easymock.EasyMock.*;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.UnknownHostException;
import static org.easymock.EasyMock.createStrictControl;
import org.easymock.EasyMock;
import org.easymock.IMocksControl;
import org.jboss.netty.channel.*;
import com.google.bitcoin.store.MemoryBlockStore;
import com.google.bitcoin.utils.BriefLogFormatter;
/**
* Utility class that makes it easy to work with mock NetworkConnections.
@@ -38,12 +42,16 @@ public class TestWithNetworkConnections {
protected ECKey key;
protected Address address;
private static int fakePort;
protected ChannelHandlerContext ctx;
protected Channel channel;
protected SocketAddress socketAddress;
protected ChannelPipeline pipeline;
public void setUp() throws Exception {
BriefLogFormatter.init();
control = createStrictControl();
control.checkOrder(true);
control.checkOrder(false);
unitTestParams = NetworkParameters.unitTests();
blockStore = new MemoryBlockStore(unitTestParams);
@@ -52,6 +60,33 @@ public class TestWithNetworkConnections {
address = key.toAddress(unitTestParams);
wallet.addKey(key);
blockChain = new BlockChain(unitTestParams, wallet, blockStore);
socketAddress = new InetSocketAddress("127.0.0.1", 1111);
ctx = createChannelHandlerContext();
channel = createChannel();
pipeline = createPipeline(channel);
}
protected ChannelPipeline createPipeline(Channel channel) {
ChannelPipeline pipeline = control.createMock(ChannelPipeline.class);
expect(channel.getPipeline()).andStubReturn(pipeline);
return pipeline;
}
protected Channel createChannel() {
Channel channel = control.createMock(Channel.class);
expect(channel.getRemoteAddress()).andStubReturn(socketAddress);
return channel;
}
protected ChannelHandlerContext createChannelHandlerContext() {
ChannelHandlerContext ctx1 = control.createMock(ChannelHandlerContext.class);
ctx1.sendDownstream(EasyMock.anyObject(ChannelEvent.class));
EasyMock.expectLastCall().anyTimes();
ctx1.sendUpstream(EasyMock.anyObject(ChannelEvent.class));
EasyMock.expectLastCall().anyTimes();
return ctx1;
}
protected MockNetworkConnection createMockNetworkConnection() {
@@ -64,26 +99,28 @@ public class TestWithNetworkConnections {
return conn;
}
protected void runPeer(Peer peer, MockNetworkConnection connection) throws IOException, PeerException {
connection.disconnect();
try {
peer.run();
} catch (PeerException e) {
if (!e.getCause().getMessage().equals("done"))
throw e;
}
protected void closePeer(Peer peer) throws Exception {
peer.getHandler().channelClosed(ctx,
new UpstreamChannelStateEvent(channel, ChannelState.CONNECTED, null));
}
protected void inbound(Peer peer, Message message) throws Exception {
peer.getHandler().messageReceived(ctx,
new UpstreamMessageEvent(channel, message, socketAddress));
}
protected void runPeerAsync(final Peer peer, MockNetworkConnection connection) {
new Thread("Test Peer Thread") {
@Override
public void run() {
try {
peer.run();
} catch (PeerException e) {
if (!e.getCause().getMessage().equals("done")) throw new RuntimeException(e);
}
}
}.start();
protected void inbound(FakeChannel peerChannel, Message message) {
Channels.fireMessageReceived(peerChannel, message);
}
protected Object outbound(FakeChannel p1) {
MessageEvent nextEvent = (MessageEvent)p1.nextEvent();
if (nextEvent == null)
return null;
return nextEvent.getMessage();
}
protected Peer peerOf(Channel ch) {
return PeerGroup.peerFromChannel(ch);
}
}

View File

@@ -21,35 +21,44 @@ import com.google.bitcoin.store.BlockStore;
import com.google.bitcoin.store.MemoryBlockStore;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import org.jboss.netty.channel.ChannelFuture;
/**
* Downloads the block given a block hash from the localhost node and prints it out.
*/
public class FetchBlock {
public static void main(String[] args) throws Exception {
System.out.println("Connecting to node");
final NetworkParameters params = NetworkParameters.prodNet();
final NetworkParameters params = NetworkParameters.testNet();
BlockStore blockStore = new MemoryBlockStore(params);
BlockChain chain = new BlockChain(params, blockStore);
final Peer peer = new Peer(params, new PeerAddress(InetAddress.getLocalHost()), chain);
peer.connect();
new Thread(new Runnable() {
public void run() {
try {
peer.run();
} catch (PeerException e) {
throw new RuntimeException(e);
}
PeerGroup peerGroup = new PeerGroup(params, chain);
peerGroup.start();
final CountDownLatch latch = new CountDownLatch(1);
peerGroup.addEventListener(new AbstractPeerEventListener() {
@Override
public void onPeerConnected(Peer peer, int peerCount) {
latch.countDown();
}
}).start();
});
ChannelFuture channelFuture =
peerGroup.connectTo(new InetSocketAddress(InetAddress.getLocalHost(), params.port));
latch.await();
Peer peer = PeerGroup.peerFromChannelFuture(channelFuture);
Sha256Hash blockHash = new Sha256Hash(args[0]);
Future<Block> future = peer.getBlock(blockHash);
System.out.println("Waiting for node to send us the requested block: " + blockHash);
Block block = future.get();
System.out.println(block);
peer.disconnect();
peerGroup.stop();
}
}

View File

@@ -16,21 +16,26 @@
package com.google.bitcoin.examples;
import com.google.bitcoin.core.NetworkConnection;
import com.google.bitcoin.core.NetworkParameters;
import com.google.bitcoin.core.PeerAddress;
import com.google.bitcoin.core.TCPNetworkConnection;
import com.google.bitcoin.discovery.DnsDiscovery;
import com.google.bitcoin.discovery.IrcDiscovery;
import com.google.bitcoin.discovery.PeerDiscoveryException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.*;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import com.google.bitcoin.core.Message;
import com.google.bitcoin.core.NetworkParameters;
import com.google.bitcoin.core.TCPNetworkConnection;
import com.google.bitcoin.core.VersionMessage;
import com.google.bitcoin.discovery.DnsDiscovery;
import com.google.bitcoin.discovery.IrcDiscovery;
import com.google.bitcoin.discovery.PeerDiscoveryException;
/**
* Prints a list of IP addresses connected to the rendezvous point on the LFnet IRC channel.
*/
@@ -51,7 +56,7 @@ public class PrintPeers {
private static void printIRC() throws PeerDiscoveryException {
long start = System.currentTimeMillis();
IrcDiscovery d = new IrcDiscovery("#bitcoin") {
IrcDiscovery d = new IrcDiscovery("#bitcoinTEST") {
@Override
protected void onIRCReceive(String message) {
System.out.println("<- " + message);
@@ -88,26 +93,50 @@ public class PrintPeers {
for (InetSocketAddress peer : ircPeers) addrs.add(peer.getAddress());
System.out.println("Scanning " + addrs.size() + " peers:");
final NetworkParameters params = NetworkParameters.testNet();
final Object lock = new Object();
final long[] bestHeight = new long[1];
final ClientBootstrap bootstrap = new ClientBootstrap(
new NioClientSocketChannelFactory(
Executors.newCachedThreadPool(),
Executors.newCachedThreadPool()));
for (final InetAddress addr : addrs) {
pool.submit(new Runnable() {
public void run() {
try {
NetworkConnection conn =
new TCPNetworkConnection(NetworkParameters.prodNet(), 0);
conn.connect(new PeerAddress(addr), 1000);
synchronized (lock) {
long nodeHeight = conn.getVersionMessage().bestHeight;
long diff = bestHeight[0] - nodeHeight;
if (diff > 0) {
System.out.println("Node is behind by " + diff + " blocks: " + addr.toString());
} else {
bestHeight[0] = nodeHeight;
}
}
conn.shutdown();
ChannelPipeline pipeline = Channels.pipeline();
final CountDownLatch latch = new CountDownLatch(1);
TCPNetworkConnection conn =
new TCPNetworkConnection(params, new VersionMessage(params, 0));
pipeline.addLast("codec", conn);
pipeline.addLast("peer", new SimpleChannelHandler() {
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
Message m = (Message)e.getMessage();
if (m instanceof VersionMessage) {
VersionMessage ver = (VersionMessage)m;
long nodeHeight = ver.bestHeight;
synchronized (lock) {
long diff = bestHeight[0] - nodeHeight;
if (diff > 0) {
System.out.println("Node is behind by " + diff + " blocks: " + addr.toString());
} else {
bestHeight[0] = nodeHeight;
}
}
e.getChannel().close();
latch.countDown();
}
};
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
System.err.println(addr.toString() + " : " + e.getCause().getMessage());
latch.countDown();
};
});
bootstrap.setPipeline(pipeline);
bootstrap.connect(new InetSocketAddress(addr, params.port));
latch.await();
} catch (Exception e) {
}
}

View File

@@ -195,6 +195,13 @@
<optional>true</optional>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
<version>3.3.1.Final</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.madgag</groupId>
<artifactId>sc-light-jdk15on</artifactId>