diff --git a/core/pom.xml b/core/pom.xml
index d3959f13..ed62bf34 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -192,12 +192,6 @@
true
-
- io.netty
- netty
- 3.6.3.Final
-
-
com.madgagsc-light-jdk15on
diff --git a/core/src/main/java/com/google/bitcoin/core/BitcoinSerializer.java b/core/src/main/java/com/google/bitcoin/core/BitcoinSerializer.java
index 057e5d43..54009048 100644
--- a/core/src/main/java/com/google/bitcoin/core/BitcoinSerializer.java
+++ b/core/src/main/java/com/google/bitcoin/core/BitcoinSerializer.java
@@ -21,9 +21,10 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.io.InputStream;
import java.io.OutputStream;
import java.io.UnsupportedEncodingException;
+import java.nio.BufferUnderflowException;
+import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
@@ -129,9 +130,9 @@ public class BitcoinSerializer {
}
/**
- * Reads a message from the given InputStream and returns it.
+ * Reads a message from the given ByteBuffer and returns it.
*/
- public Message deserialize(InputStream in) throws ProtocolException, IOException {
+ public Message deserialize(ByteBuffer in) throws ProtocolException, IOException {
// A Bitcoin protocol message has the following format.
//
// - 4 byte magic number: 0xfabfb5da for the testnet or
@@ -156,7 +157,7 @@ public class BitcoinSerializer {
* Deserializes only the header in case packet meta data is needed before decoding
* the payload. This method assumes you have already called seekPastMagicBytes()
*/
- public BitcoinPacketHeader deserializeHeader(InputStream in) throws ProtocolException, IOException {
+ public BitcoinPacketHeader deserializeHeader(ByteBuffer in) throws ProtocolException, IOException {
return new BitcoinPacketHeader(in);
}
@@ -164,16 +165,9 @@ public class BitcoinSerializer {
* Deserialize payload only. You must provide a header, typically obtained by calling
* {@link BitcoinSerializer#deserializeHeader}.
*/
- public Message deserializePayload(BitcoinPacketHeader header, InputStream in) throws ProtocolException, IOException {
- int readCursor = 0;
+ public Message deserializePayload(BitcoinPacketHeader header, ByteBuffer in) throws ProtocolException, BufferUnderflowException {
byte[] payloadBytes = new byte[header.size];
- while (readCursor < payloadBytes.length - 1) {
- int bytesRead = in.read(payloadBytes, readCursor, header.size - readCursor);
- if (bytesRead == -1) {
- throw new IOException("Socket is disconnected");
- }
- readCursor += bytesRead;
- }
+ in.get(payloadBytes, 0, header.size);
// Verify the checksum.
byte[] hash;
@@ -246,17 +240,13 @@ public class BitcoinSerializer {
return message;
}
- public void seekPastMagicBytes(InputStream in) throws IOException {
+ public void seekPastMagicBytes(ByteBuffer in) throws BufferUnderflowException {
int magicCursor = 3; // Which byte of the magic we're looking for currently.
while (true) {
- int b = in.read(); // Read a byte.
- if (b == -1) {
- // There's no more data to read.
- throw new IOException("Socket is disconnected");
- }
+ byte b = in.get();
// We're looking for a run of bytes that is the same as the packet magic but we want to ignore partial
// magics that aren't complete. So we keep track of where we're up to with magicCursor.
- int expectedByte = 0xFF & (int) (params.getPacketMagic() >>> (magicCursor * 8));
+ byte expectedByte = (byte)(0xFF & params.getPacketMagic() >>> (magicCursor * 8));
if (b == expectedByte) {
magicCursor--;
if (magicCursor < 0) {
@@ -287,22 +277,17 @@ public class BitcoinSerializer {
public static class BitcoinPacketHeader {
+ /** The largest number of bytes that a header can represent */
+ public static final int HEADER_LENGTH = COMMAND_LEN + 4 + 4;
+
public final byte[] header;
public final String command;
public final int size;
public final byte[] checksum;
- public BitcoinPacketHeader(InputStream in) throws ProtocolException, IOException {
- header = new byte[COMMAND_LEN + 4 + 4];
- int readCursor = 0;
- while (readCursor < header.length) {
- int bytesRead = in.read(header, readCursor, header.length - readCursor);
- if (bytesRead == -1) {
- // There's no more data to read.
- throw new IOException("Incomplete packet in underlying stream");
- }
- readCursor += bytesRead;
- }
+ public BitcoinPacketHeader(ByteBuffer in) throws ProtocolException, BufferUnderflowException {
+ header = new byte[HEADER_LENGTH];
+ in.get(header, 0, header.length);
int cursor = 0;
diff --git a/core/src/main/java/com/google/bitcoin/core/NetworkConnection.java b/core/src/main/java/com/google/bitcoin/core/NetworkConnection.java
deleted file mode 100644
index d11ed95c..00000000
--- a/core/src/main/java/com/google/bitcoin/core/NetworkConnection.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Copyright 2011 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.google.bitcoin.core;
-
-import java.io.IOException;
-
-/**
- *
A NetworkConnection handles talking to a remote Bitcoin peer at a low level. It understands how to read and write
- * messages, but doesn't asynchronously communicate with the peer or handle the higher level details
- * of the protocol. A NetworkConnection is typically stateless, so after constructing a NetworkConnection, give it to a
- * newly created {@link Peer} to handle messages to and from that specific peer.
- *
- *
If you just want to "get on the network" and don't care about the details, you want to use a {@link PeerGroup}
- * instead. A {@link PeerGroup} handles the process of setting up connections to multiple peers, running background threads
- * for them, and many other things.
- *
- *
NetworkConnection is an interface in order to support multiple low level protocols. You likely want a
- * {@link TCPNetworkConnection} as it's currently the only NetworkConnection implementation. In future there may be
- * others that support connections over Bluetooth, NFC, UNIX domain sockets and so on.
- */
-public interface NetworkConnection {
- /**
- * Sends a "ping" message to the remote node. The protocol doesn't presently use this feature much.
- *
- * @throws IOException
- */
- public void ping() throws IOException;
-
- /**
- * Writes the given message out over the network using the protocol tag. For a Transaction
- * this should be "tx" for example. It's safe to call this from multiple threads simultaneously,
- * the actual writing will be serialized.
- *
- * @throws IOException
- */
- public void writeMessage(Message message) throws IOException;
-
- /**
- * Returns the version message received from the other end of the connection during the handshake.
- */
- public VersionMessage getVersionMessage();
-
- /**
- * @return The address of the other side of the network connection.
- */
- public PeerAddress getPeerAddress();
-
- /**
- * Does whatever needed to clean up the given connection, if necessary.
- */
- public void close();
-}
diff --git a/core/src/main/java/com/google/bitcoin/core/NetworkParameters.java b/core/src/main/java/com/google/bitcoin/core/NetworkParameters.java
index 4a1579c0..8ef1e84a 100644
--- a/core/src/main/java/com/google/bitcoin/core/NetworkParameters.java
+++ b/core/src/main/java/com/google/bitcoin/core/NetworkParameters.java
@@ -16,10 +16,7 @@
package com.google.bitcoin.core;
-import com.google.bitcoin.params.MainNetParams;
-import com.google.bitcoin.params.TestNet2Params;
-import com.google.bitcoin.params.TestNet3Params;
-import com.google.bitcoin.params.UnitTestParams;
+import com.google.bitcoin.params.*;
import com.google.bitcoin.script.Script;
import com.google.bitcoin.script.ScriptOpCodes;
import com.google.common.base.Objects;
@@ -162,6 +159,12 @@ public abstract class NetworkParameters implements Serializable {
return UnitTestParams.get();
}
+ /** Returns a standard regression test params (similar to unitTests) */
+ @Deprecated
+ public static NetworkParameters regTests() {
+ return RegTestParams.get();
+ }
+
/**
* A Java package style string acting as unique ID for these parameters
*/
diff --git a/core/src/main/java/com/google/bitcoin/core/Peer.java b/core/src/main/java/com/google/bitcoin/core/Peer.java
index 6fbe0208..2f7b62ec 100644
--- a/core/src/main/java/com/google/bitcoin/core/Peer.java
+++ b/core/src/main/java/com/google/bitcoin/core/Peer.java
@@ -28,13 +28,10 @@ import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import net.jcip.annotations.GuardedBy;
-import org.jboss.netty.channel.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
-import java.io.IOException;
-import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
@@ -47,27 +44,37 @@ import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
/**
- * A Peer handles the high level communication with a Bitcoin node.
+ *
A Peer handles the high level communication with a Bitcoin node, extending a {@link PeerSocketHandler} which
+ * handles low-level message (de)serialization.
*
- *
{@link Peer#getHandler()} is part of a Netty Pipeline with a Bitcoin serializer downstream of it.
+ *
Note that timeouts are handled by the extended
+ * {@link com.google.bitcoin.networkabstraction.AbstractTimeoutHandler} and timeout is automatically disabled (using
+ * {@link com.google.bitcoin.networkabstraction.AbstractTimeoutHandler#setTimeoutEnabled(boolean)}) once the version
+ * handshake completes.
*/
-public class Peer {
- interface PeerLifecycleListener {
- /** Called when the peer is connected */
- public void onPeerConnected(Peer peer);
- /** Called when the peer is disconnected */
- public void onPeerDisconnected(Peer peer);
- }
-
+public class Peer extends PeerSocketHandler {
private static final Logger log = LoggerFactory.getLogger(Peer.class);
protected final ReentrantLock lock = Threading.lock("peer");
private final NetworkParameters params;
private final AbstractBlockChain blockChain;
- private volatile PeerAddress vAddress;
- private final CopyOnWriteArrayList> eventListeners;
- private final CopyOnWriteArrayList lifecycleListeners;
+
+ // onPeerDisconnected should not be called directly by Peers when a PeerGroup is involved (we don't know the total
+ // number of connected peers), thus we use a wrapper that PeerGroup can use to register listeners that wont get
+ // onPeerDisconnected calls
+ static class PeerListenerRegistration extends ListenerRegistration {
+ boolean callOnDisconnect = true;
+ public PeerListenerRegistration(PeerEventListener listener, Executor executor) {
+ super(listener, executor);
+ }
+
+ public PeerListenerRegistration(PeerEventListener listener, Executor executor, boolean callOnDisconnect) {
+ this(listener, executor);
+ this.callOnDisconnect = callOnDisconnect;
+ }
+ }
+ private final CopyOnWriteArrayList eventListeners;
// Whether to try and download blocks and transactions from this peer. Set to false by PeerGroup if not the
// primary peer. This is to avoid redundant work and concurrency problems with downloading the same chain
// in parallel.
@@ -130,46 +137,74 @@ public class Peer {
private final CopyOnWriteArrayList pendingPings;
private static final int PING_MOVING_AVERAGE_WINDOW = 20;
- private volatile Channel vChannel;
private volatile VersionMessage vPeerVersionMessage;
private boolean isAcked;
- private final PeerHandler handler;
+
+ // A settable future which completes (with this) when the connection is open
+ private final SettableFuture connectionOpenFuture = SettableFuture.create();
/**
- * Construct a peer that reads/writes from the given block chain.
+ *
Construct a peer that reads/writes from the given block chain.
+ *
+ *
Note that this does NOT make a connection to the given remoteAddress, it only creates a handler for a
+ * connection. If you want to create a one-off connection, create a Peer and pass it to
+ * {@link com.google.bitcoin.networkabstraction.NioClientManager#openConnection(java.net.SocketAddress, com.google.bitcoin.networkabstraction.StreamParser)}
+ * or
+ * {@link com.google.bitcoin.networkabstraction.NioClient#NioClient(java.net.SocketAddress, com.google.bitcoin.networkabstraction.StreamParser, int)}.
+ *
+ *
The remoteAddress provided should match the remote address of the peer which is being connected to, and is
+ * used to keep track of which peers relayed transactions and offer more descriptive logging.
*/
- public Peer(NetworkParameters params, AbstractBlockChain chain, VersionMessage ver) {
- this(params, chain, ver, null);
+ public Peer(NetworkParameters params, VersionMessage ver, @Nullable AbstractBlockChain chain, InetSocketAddress remoteAddress) {
+ this(params, ver, remoteAddress, chain, null);
}
/**
- * Construct a peer that reads/writes from the given block chain and memory pool. Transactions stored
- * in a memory pool will have their confidence levels updated when a peer announces it, to reflect the greater
- * likelyhood that the transaction is valid.
+ *
Construct a peer that reads/writes from the given block chain and memory pool. Transactions stored in a memory
+ * pool will have their confidence levels updated when a peer announces it, to reflect the greater likelyhood that
+ * the transaction is valid.
+ *
+ *
Note that this does NOT make a connection to the given remoteAddress, it only creates a handler for a
+ * connection. If you want to create a one-off connection, create a Peer and pass it to
+ * {@link com.google.bitcoin.networkabstraction.NioClientManager#openConnection(java.net.SocketAddress, com.google.bitcoin.networkabstraction.StreamParser)}
+ * or
+ * {@link com.google.bitcoin.networkabstraction.NioClient#NioClient(java.net.SocketAddress, com.google.bitcoin.networkabstraction.StreamParser, int)}.
+ *
+ *
The remoteAddress provided should match the remote address of the peer which is being connected to, and is
+ * used to keep track of which peers relayed transactions and offer more descriptive logging.
*/
- public Peer(NetworkParameters params, @Nullable AbstractBlockChain chain, VersionMessage ver, @Nullable MemoryPool mempool) {
+ public Peer(NetworkParameters params, VersionMessage ver, InetSocketAddress remoteAddress,
+ @Nullable AbstractBlockChain chain, @Nullable MemoryPool mempool) {
+ super(params, remoteAddress);
this.params = Preconditions.checkNotNull(params);
this.versionMessage = Preconditions.checkNotNull(ver);
this.blockChain = chain; // Allowed to be null.
this.vDownloadData = chain != null;
this.getDataFutures = new CopyOnWriteArrayList();
- this.eventListeners = new CopyOnWriteArrayList>();
- this.lifecycleListeners = new CopyOnWriteArrayList();
+ this.eventListeners = new CopyOnWriteArrayList();
this.fastCatchupTimeSecs = params.getGenesisBlock().getTimeSeconds();
this.isAcked = false;
- this.handler = new PeerHandler();
this.pendingPings = new CopyOnWriteArrayList();
this.wallets = new CopyOnWriteArrayList();
this.memoryPool = mempool;
}
/**
- * Construct a peer that reads/writes from the given chain. Automatically creates a VersionMessage for you from the
- * given software name/version strings, which should be something like "MySimpleTool", "1.0" and which will tell the
- * remote node to relay transaction inv messages before it has received a filter.
+ *
Construct a peer that reads/writes from the given chain. Automatically creates a VersionMessage for you from
+ * the given software name/version strings, which should be something like "MySimpleTool", "1.0" and which will tell
+ * the remote node to relay transaction inv messages before it has received a filter.
+ *
+ *
Note that this does NOT make a connection to the given remoteAddress, it only creates a handler for a
+ * connection. If you want to create a one-off connection, create a Peer and pass it to
+ * {@link com.google.bitcoin.networkabstraction.NioClientManager#openConnection(java.net.SocketAddress, com.google.bitcoin.networkabstraction.StreamParser)}
+ * or
+ * {@link com.google.bitcoin.networkabstraction.NioClient#NioClient(java.net.SocketAddress, com.google.bitcoin.networkabstraction.StreamParser, int)}.
+ *
+ *
The remoteAddress provided should match the remote address of the peer which is being connected to, and is
+ * used to keep track of which peers relayed transactions and offer more descriptive logging.
*/
- public Peer(NetworkParameters params, AbstractBlockChain blockChain, String thisSoftwareName, String thisSoftwareVersion) {
- this(params, blockChain, new VersionMessage(params, blockChain.getBestChainHeight(), true));
+ public Peer(NetworkParameters params, AbstractBlockChain blockChain, InetSocketAddress remoteAddress, String thisSoftwareName, String thisSoftwareVersion) {
+ this(params, new VersionMessage(params, blockChain.getBestChainHeight(), true), blockChain, remoteAddress);
this.versionMessage.appendToSubVer(thisSoftwareName, thisSoftwareVersion, null);
}
@@ -191,24 +226,21 @@ public class Peer {
* threads in order to get the results of those hook methods.
*/
public void addEventListener(PeerEventListener listener, Executor executor) {
- eventListeners.add(new ListenerRegistration(listener, executor));
+ eventListeners.add(new PeerListenerRegistration(listener, executor));
+ }
+
+ // Package-local version for PeerGroup
+ void addEventListenerWithoutOnDisconnect(PeerEventListener listener, Executor executor) {
+ eventListeners.add(new PeerListenerRegistration(listener, executor, false));
}
public boolean removeEventListener(PeerEventListener listener) {
return ListenerRegistration.removeFromList(listener, eventListeners);
}
- void addLifecycleListener(PeerLifecycleListener listener) {
- lifecycleListeners.add(listener);
- }
-
- boolean removeLifecycleListener(PeerLifecycleListener listener) {
- return lifecycleListeners.remove(listener);
- }
-
@Override
public String toString() {
- PeerAddress addr = vAddress;
+ PeerAddress addr = getAddress();
if (addr == null) {
// User-provided NetworkConnection object.
return "Peer()";
@@ -217,59 +249,40 @@ public class Peer {
}
}
- private void notifyDisconnect() {
- for (PeerLifecycleListener listener : lifecycleListeners) {
- listener.onPeerDisconnected(Peer.this);
+ @Override
+ public void connectionClosed() {
+ for (final PeerListenerRegistration registration : eventListeners) {
+ if (registration.callOnDisconnect)
+ registration.executor.execute(new Runnable() {
+ @Override
+ public void run() {
+ registration.listener.onPeerDisconnected(Peer.this, 0);
+ }
+ });
}
}
- class PeerHandler extends SimpleChannelHandler {
- @Override
- public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
- super.channelClosed(ctx, e);
- notifyDisconnect();
- }
-
- @Override
- public void connectRequested(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
- vAddress = new PeerAddress((InetSocketAddress)e.getValue());
- vChannel = e.getChannel();
- super.connectRequested(ctx, e);
- }
-
- /** Catch any exceptions, logging them and then closing the channel. */
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
- String s;
- PeerAddress addr = vAddress;
- s = addr == null ? "?" : addr.toString();
- final Throwable cause = e.getCause();
- if (cause instanceof ConnectException || cause instanceof IOException) {
- // Short message for network errors
- log.info(s + " - " + cause.getMessage());
- } else {
- log.warn(s + " - ", cause);
- Thread.UncaughtExceptionHandler handler = Threading.uncaughtExceptionHandler;
- if (handler != null)
- handler.uncaughtException(Thread.currentThread(), cause);
- }
-
- e.getChannel().close();
- }
-
- /** Handle incoming Bitcoin messages */
- @Override
- public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
- Message m = (Message)e.getMessage();
- processMessage(e, m);
- }
-
- public Peer getPeer() {
- return Peer.this;
- }
+ @Override
+ public void connectionOpened() {
+ // Announce ourselves. This has to come first to connect to clients beyond v0.3.20.2 which wait to hear
+ // from us until they send their version message back.
+ PeerAddress address = getAddress();
+ log.info("Announcing to {} as: {}", address == null ? "Peer" : address.toSocketAddress(), versionMessage.subVer);
+ sendMessage(versionMessage);
+ connectionOpenFuture.set(this);
+ // When connecting, the remote peer sends us a version message with various bits of
+ // useful data in it. We need to know the peer protocol version before we can talk to it.
}
- private void processMessage(MessageEvent e, Message m) throws Exception {
+ /**
+ * Provides a ListenableFuture that can be used to wait for the socket to connect. A socket connection does not
+ * mean that protocol handshake has occurred.
+ */
+ public ListenableFuture getConnectionOpenFuture() {
+ return connectionOpenFuture;
+ }
+
+ protected void processMessage(Message m) throws Exception {
// Allow event listeners to filter the message stream. Listeners are allowed to drop messages by
// returning null.
for (ListenerRegistration registration : eventListeners) {
@@ -312,7 +325,7 @@ public class Peer {
} else if (m instanceof AlertMessage) {
processAlert((AlertMessage) m);
} else if (m instanceof VersionMessage) {
- vPeerVersionMessage = (VersionMessage) m;
+ processVersionMessage((VersionMessage) m);
} else if (m instanceof VersionAck) {
if (vPeerVersionMessage == null) {
throw new ProtocolException("got a version ack before version");
@@ -321,15 +334,22 @@ public class Peer {
throw new ProtocolException("got more than one version ack");
}
isAcked = true;
- for (PeerLifecycleListener listener : lifecycleListeners)
- listener.onPeerConnected(this);
+ this.setTimeoutEnabled(false);
+ for (final ListenerRegistration registration : eventListeners) {
+ registration.executor.execute(new Runnable() {
+ @Override
+ public void run() {
+ registration.listener.onPeerConnected(Peer.this, 1);
+ }
+ });
+ }
// We check min version after onPeerConnected as channel.close() will
// call onPeerDisconnected, and we should probably call onPeerConnected first.
final int version = vMinProtocolVersion;
if (vPeerVersionMessage.clientVersion < version) {
log.warn("Connected to a peer speaking protocol version {} but need {}, closing",
vPeerVersionMessage.clientVersion, version);
- e.getChannel().close();
+ close();
}
} else if (m instanceof Ping) {
if (((Ping) m).hasNonce())
@@ -341,7 +361,36 @@ public class Peer {
}
}
- private void startFilteredBlock(FilteredBlock m) throws IOException {
+ private void processVersionMessage(VersionMessage m) throws ProtocolException {
+ if (vPeerVersionMessage != null)
+ throw new ProtocolException("Got two version messages from peer");
+ vPeerVersionMessage = m;
+ // Switch to the new protocol version.
+ int peerVersion = vPeerVersionMessage.clientVersion;
+ PeerAddress peerAddress = getAddress();
+ log.info("Connected to {}: version={}, subVer='{}', services=0x{}, time={}, blocks={}", new Object[] {
+ peerAddress == null ? "Peer" : peerAddress.getAddr().getHostAddress(),
+ peerVersion,
+ vPeerVersionMessage.subVer,
+ vPeerVersionMessage.localServices,
+ new Date(vPeerVersionMessage.time * 1000),
+ vPeerVersionMessage.bestHeight
+ });
+ // Now it's our turn ...
+ // Send an ACK message stating we accept the peers protocol version.
+ sendMessage(new VersionAck());
+ // bitcoinj is a client mode implementation. That means there's not much point in us talking to other client
+ // mode nodes because we can't download the data from them we need to find/verify transactions. Some bogus
+ // implementations claim to have a block chain in their services field but then report a height of zero, filter
+ // them out here.
+ if (!vPeerVersionMessage.hasBlockChain() ||
+ (!params.allowEmptyPeerChain() && vPeerVersionMessage.bestHeight <= 0)) {
+ // Shut down the channel
+ throw new ProtocolException("Peer does not have a copy of the block chain.");
+ }
+ }
+
+ private void startFilteredBlock(FilteredBlock m) {
// Filtered blocks come before the data that they refer to, so stash it here and then fill it out as
// messages stream in. We'll call endFilteredBlock when a non-tx message arrives (eg, another
// FilteredBlock) or when a tx that isn't needed by that block is found. A ping message is sent after
@@ -390,12 +439,7 @@ public class Peer {
}
}
- /** Returns the Netty Pipeline stage handling the high level Bitcoin protocol. */
- public PeerHandler getHandler() {
- return handler;
- }
-
- private void processHeaders(HeadersMessage m) throws IOException, ProtocolException {
+ private void processHeaders(HeadersMessage m) throws ProtocolException {
// Runs in network loop thread for this peer.
//
// This method can run if a peer just randomly sends us a "headers" message (should never happen), or more
@@ -475,8 +519,8 @@ public class Peer {
}
}
- private void processGetData(GetDataMessage getdata) throws IOException {
- log.info("{}: Received getdata message: {}", vAddress, getdata.toString());
+ private void processGetData(GetDataMessage getdata) {
+ log.info("{}: Received getdata message: {}", getAddress(), getdata.toString());
ArrayList items = new ArrayList();
for (ListenerRegistration registration : eventListeners) {
if (registration.executor != Threading.SAME_THREAD) continue;
@@ -487,19 +531,19 @@ public class Peer {
if (items.size() == 0) {
return;
}
- log.info("{}: Sending {} items gathered from listeners to peer", vAddress, items.size());
+ log.info("{}: Sending {} items gathered from listeners to peer", getAddress(), items.size());
for (Message item : items) {
sendMessage(item);
}
}
- private void processTransaction(Transaction tx) throws VerificationException, IOException {
+ private void processTransaction(Transaction tx) throws VerificationException {
// Check a few basic syntax issues to ensure the received TX isn't nonsense.
tx.verify();
final Transaction fTx;
lock.lock();
try {
- log.debug("{}: Received tx {}", vAddress, tx.getHashAsString());
+ log.debug("{}: Received tx {}", getAddress(), tx.getHashAsString());
if (memoryPool != null) {
// We may get back a different transaction object.
tx = memoryPool.seen(tx, getAddress());
@@ -537,11 +581,11 @@ public class Peer {
Futures.addCallback(downloadDependencies(fTx), new FutureCallback>() {
public void onSuccess(List dependencies) {
try {
- log.info("{}: Dependency download complete!", vAddress);
+ log.info("{}: Dependency download complete!", getAddress());
wallet.receivePending(fTx, dependencies);
} catch (VerificationException e) {
log.error("{}: Wallet failed to process pending transaction {}",
- vAddress, fTx.getHashAsString());
+ getAddress(), fTx.getHashAsString());
log.error("Error was: ", e);
// Not much more we can do at this point.
}
@@ -595,7 +639,7 @@ public class Peer {
checkNotNull(memoryPool, "Must have a configured MemoryPool object to download dependencies.");
TransactionConfidence.ConfidenceType txConfidence = tx.getConfidence().getConfidenceType();
Preconditions.checkArgument(txConfidence != TransactionConfidence.ConfidenceType.BUILDING);
- log.info("{}: Downloading dependencies of {}", vAddress, tx.getHashAsString());
+ log.info("{}: Downloading dependencies of {}", getAddress(), tx.getHashAsString());
final LinkedList results = new LinkedList();
// future will be invoked when the entire dependency tree has been walked and the results compiled.
final ListenableFuture future = downloadDependenciesInternal(tx, new Object(), results);
@@ -646,7 +690,7 @@ public class Peer {
GetDataMessage getdata = new GetDataMessage(params);
final long nonce = (long)(Math.random()*Long.MAX_VALUE);
if (needToRequest.size() > 1)
- log.info("{}: Requesting {} transactions for dep resolution", vAddress, needToRequest.size());
+ log.info("{}: Requesting {} transactions for dep resolution", getAddress(), needToRequest.size());
for (Sha256Hash hash : needToRequest) {
getdata.addTransaction(hash);
GetDataRequest req = new GetDataRequest();
@@ -670,7 +714,7 @@ public class Peer {
List> childFutures = Lists.newLinkedList();
for (Transaction tx : transactions) {
if (tx == null) continue;
- log.info("{}: Downloaded dependency of {}: {}", vAddress, rootTxHash, tx.getHashAsString());
+ log.info("{}: Downloaded dependency of {}: {}", getAddress(), rootTxHash, tx.getHashAsString());
results.add(tx);
// Now recurse into the dependencies of this transaction too.
childFutures.add(downloadDependenciesInternal(tx, marker, results));
@@ -727,9 +771,9 @@ public class Peer {
return resultFuture;
}
- private void processBlock(Block m) throws IOException {
+ private void processBlock(Block m) {
if (log.isDebugEnabled()) {
- log.debug("{}: Received broadcast block {}", vAddress, m.getHashAsString());
+ log.debug("{}: Received broadcast block {}", getAddress(), m.getHashAsString());
}
// Was this block requested by getBlock()?
if (maybeHandleRequestedData(m)) return;
@@ -739,7 +783,7 @@ public class Peer {
}
// Did we lose download peer status after requesting block data?
if (!vDownloadData) {
- log.debug("{}: Received block we did not ask for: {}", vAddress, m.getHashAsString());
+ log.debug("{}: Received block we did not ask for: {}", getAddress(), m.getHashAsString());
return;
}
pendingBlockDownloads.remove(m.getHash());
@@ -781,7 +825,7 @@ public class Peer {
}
} catch (VerificationException e) {
// We don't want verification failures to kill the thread.
- log.warn("{}: Block verification failed", vAddress, e);
+ log.warn("{}: Block verification failed", getAddress(), e);
} catch (PrunedException e) {
// Unreachable when in SPV mode.
throw new RuntimeException(e);
@@ -789,12 +833,12 @@ public class Peer {
}
// TODO: Fix this duplication.
- private void endFilteredBlock(FilteredBlock m) throws IOException {
+ private void endFilteredBlock(FilteredBlock m) {
if (log.isDebugEnabled()) {
- log.debug("{}: Received broadcast filtered block {}", vAddress, m.getHash().toString());
+ log.debug("{}: Received broadcast filtered block {}", getAddress(), m.getHash().toString());
}
if (!vDownloadData) {
- log.debug("{}: Received block we did not ask for: {}", vAddress, m.getHash().toString());
+ log.debug("{}: Received block we did not ask for: {}", getAddress(), m.getHash().toString());
return;
}
if (blockChain == null) {
@@ -850,7 +894,7 @@ public class Peer {
}
} catch (VerificationException e) {
// We don't want verification failures to kill the thread.
- log.warn("{}: FilteredBlock verification failed", vAddress, e);
+ log.warn("{}: FilteredBlock verification failed", getAddress(), e);
} catch (PrunedException e) {
// We pruned away some of the data we need to properly handle this block. We need to request the needed
// data from the remote peer and fix things. Or just give up.
@@ -888,7 +932,7 @@ public class Peer {
}
}
- private void processInv(InventoryMessage inv) throws IOException {
+ private void processInv(InventoryMessage inv) {
List items = inv.getItems();
// Separate out the blocks and transactions, we'll handle them differently
@@ -945,7 +989,7 @@ public class Peer {
// Some other peer already announced this so don't download.
it.remove();
} else {
- log.debug("{}: getdata on tx {}", vAddress, item.hash);
+ log.debug("{}: getdata on tx {}", getAddress(), item.hash);
getdata.addItem(item);
}
// This can trigger transaction confidence listeners.
@@ -1017,7 +1061,7 @@ public class Peer {
* If you want the block right away and don't mind waiting for it, just call .get() on the result. Your thread
* will block until the peer answers.
*/
- public ListenableFuture getBlock(Sha256Hash blockHash) throws IOException {
+ public ListenableFuture getBlock(Sha256Hash blockHash) {
// This does not need to be locked.
log.info("Request to fetch block {}", blockHash);
GetDataMessage getdata = new GetDataMessage(params);
@@ -1030,7 +1074,7 @@ public class Peer {
* retrieved this way because peers don't have a transaction ID to transaction-pos-on-disk index, and besides,
* in future many peers will delete old transaction data they don't need.
*/
- public ListenableFuture getPeerMempoolTransaction(Sha256Hash hash) throws IOException {
+ public ListenableFuture getPeerMempoolTransaction(Sha256Hash hash) {
// This does not need to be locked.
// TODO: Unit test this method.
log.info("Request to fetch peer mempool tx {}", hash);
@@ -1040,7 +1084,7 @@ public class Peer {
}
/** Sends a getdata with a single item in it. */
- private ListenableFuture sendSingleGetData(GetDataMessage getdata) throws IOException {
+ private ListenableFuture sendSingleGetData(GetDataMessage getdata) {
// This does not need to be locked.
Preconditions.checkArgument(getdata.getItems().size() == 1);
GetDataRequest req = new GetDataRequest();
@@ -1095,21 +1139,13 @@ public class Peer {
wallets.remove(wallet);
}
- /**
- * Sends the given message on the peers Channel.
- */
- public ChannelFuture sendMessage(Message m) {
- // This does not need to be locked.
- return Channels.write(vChannel, m);
- }
-
// Keep track of the last request we made to the peer in blockChainDownloadLocked so we can avoid redundant and harmful
// getblocks requests.
@GuardedBy("lock")
private Sha256Hash lastGetBlocksBegin, lastGetBlocksEnd;
@GuardedBy("lock")
- private void blockChainDownloadLocked(Sha256Hash toHash) throws IOException {
+ private void blockChainDownloadLocked(Sha256Hash toHash) {
checkState(lock.isHeldByCurrentThread());
// The block chain download process is a bit complicated. Basically, we start with one or more blocks in a
// chain that we have from a previous session. We want to catch up to the head of the chain BUT we don't know
@@ -1197,7 +1233,7 @@ public class Peer {
* Starts an asynchronous download of the block chain. The chain download is deemed to be complete once we've
* downloaded the same number of blocks that the peer advertised having in its version handshake message.
*/
- public void startBlockChainDownload() throws IOException {
+ public void startBlockChainDownload() {
setDownloadData(true);
// TODO: peer might still have blocks that we don't have, and even have a heavier
// chain even if the chain block count is lower.
@@ -1271,11 +1307,11 @@ public class Peer {
* updated.
* @throws ProtocolException if the peer version is too low to support measurable pings.
*/
- public ListenableFuture ping() throws IOException, ProtocolException {
+ public ListenableFuture ping() throws ProtocolException {
return ping((long) (Math.random() * Long.MAX_VALUE));
}
- protected ListenableFuture ping(long nonce) throws IOException, ProtocolException {
+ protected ListenableFuture ping(long nonce) throws ProtocolException {
final VersionMessage ver = vPeerVersionMessage;
if (!ver.isPingPongSupported())
throw new ProtocolException("Peer version is too low for measurable pings: " + ver);
@@ -1366,13 +1402,6 @@ public class Peer {
this.vDownloadData = downloadData;
}
- /**
- * @return the IP address and port of peer.
- */
- public PeerAddress getAddress() {
- return vAddress;
- }
-
/** Returns version data announced by the remote peer. */
public VersionMessage getPeerVersionMessage() {
return vPeerVersionMessage;
@@ -1393,16 +1422,16 @@ public class Peer {
/**
* The minimum P2P protocol version that is accepted. If the peer speaks a protocol version lower than this, it
* will be disconnected.
- * @return if not-null then this is the future for the Peer disconnection event.
+ * @return true if the peer was disconnected as a result
*/
- @Nullable public ChannelFuture setMinProtocolVersion(int minProtocolVersion) {
+ public boolean setMinProtocolVersion(int minProtocolVersion) {
this.vMinProtocolVersion = minProtocolVersion;
if (getVersionMessage().clientVersion < minProtocolVersion) {
log.warn("{}: Disconnecting due to new min protocol version {}", this, minProtocolVersion);
- return Channels.close(vChannel);
- } else {
- return null;
+ close();
+ return true;
}
+ return false;
}
/**
@@ -1420,7 +1449,7 @@ public class Peer {
*
If the remote peer doesn't support Bloom filtering, then this call is ignored. Once set you presently cannot
* unset a filter, though the underlying p2p protocol does support it.
*/
- public void setBloomFilter(BloomFilter filter) throws IOException {
+ public void setBloomFilter(BloomFilter filter) {
checkNotNull(filter, "Clearing filters is not currently supported");
final VersionMessage ver = vPeerVersionMessage;
if (ver == null || !ver.isBloomFilteringSupported())
@@ -1428,13 +1457,8 @@ public class Peer {
vBloomFilter = filter;
boolean shouldQueryMemPool = memoryPool != null || vDownloadData;
log.info("{}: Sending Bloom filter{}", this, shouldQueryMemPool ? " and querying mempool" : "");
- ChannelFuture future = sendMessage(filter);
- if (shouldQueryMemPool)
- future.addListener(new ChannelFutureListener() {
- public void operationComplete(ChannelFuture future) throws Exception {
- sendMessage(new MemoryPoolMessage());
- }
- });
+ sendMessage(filter);
+ sendMessage(new MemoryPoolMessage());
}
/**
diff --git a/core/src/main/java/com/google/bitcoin/core/PeerEventListener.java b/core/src/main/java/com/google/bitcoin/core/PeerEventListener.java
index 385279f7..5caa87b9 100644
--- a/core/src/main/java/com/google/bitcoin/core/PeerEventListener.java
+++ b/core/src/main/java/com/google/bitcoin/core/PeerEventListener.java
@@ -45,7 +45,7 @@ public interface PeerEventListener {
/**
* Called when a peer is connected. If this listener is registered to a {@link Peer} instead of a {@link PeerGroup},
- * this will never be called.
+ * peerCount will always be 1.
*
* @param peer
* @param peerCount the total number of connected peers
@@ -55,7 +55,7 @@ public interface PeerEventListener {
/**
* Called when a peer is disconnected. Note that this won't be called if the listener is registered on a
* {@link PeerGroup} and the group is in the process of shutting down. If this listener is registered to a
- * {@link Peer} instead of a {@link PeerGroup}, this will never be called.
+ * {@link Peer} instead of a {@link PeerGroup}, peerCount will always be 0.
*
* @param peer
* @param peerCount the total number of connected peers
@@ -79,8 +79,11 @@ public interface PeerEventListener {
public void onTransaction(Peer peer, Transaction t);
/**
- * Called when a peer receives a getdata message, usually in response to an "inv" being broadcast. Return as many
- * items as possible which appear in the {@link GetDataMessage}, or null if you're not interested in responding.
+ *
Called when a peer receives a getdata message, usually in response to an "inv" being broadcast. Return as many
+ * items as possible which appear in the {@link GetDataMessage}, or null if you're not interested in responding.
+ *
+ *
Note that this will never be called if registered with any executor other than
+ * {@link com.google.bitcoin.utils.Threading#SAME_THREAD}
*/
public List getData(Peer peer, GetDataMessage m);
}
diff --git a/core/src/main/java/com/google/bitcoin/core/PeerGroup.java b/core/src/main/java/com/google/bitcoin/core/PeerGroup.java
index 58923f50..5fc1b253 100644
--- a/core/src/main/java/com/google/bitcoin/core/PeerGroup.java
+++ b/core/src/main/java/com/google/bitcoin/core/PeerGroup.java
@@ -17,30 +17,24 @@
package com.google.bitcoin.core;
-import com.google.bitcoin.core.Peer.PeerHandler;
import com.google.bitcoin.discovery.PeerDiscovery;
import com.google.bitcoin.discovery.PeerDiscoveryException;
import com.google.bitcoin.script.Script;
+import com.google.bitcoin.networkabstraction.ClientConnectionManager;
+import com.google.bitcoin.networkabstraction.NioClientManager;
import com.google.bitcoin.utils.ListenerRegistration;
import com.google.bitcoin.utils.Threading;
import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.*;
import net.jcip.annotations.GuardedBy;
-import org.jboss.netty.bootstrap.ClientBootstrap;
-import org.jboss.netty.channel.*;
-import org.jboss.netty.channel.group.ChannelGroup;
-import org.jboss.netty.channel.group.DefaultChannelGroup;
-import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
-import java.io.IOException;
import java.math.BigInteger;
import java.net.InetAddress;
import java.net.InetSocketAddress;
-import java.net.SocketAddress;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
@@ -83,7 +77,7 @@ public class PeerGroup extends AbstractIdleService implements TransactionBroadca
private final CopyOnWriteArrayList peers;
// Currently connecting peers.
private final CopyOnWriteArrayList pendingPeers;
- private final ChannelGroup channels;
+ private final ClientConnectionManager channels;
// The peer that has been selected for the purposes of downloading announced data.
@GuardedBy("lock") private Peer downloadPeer;
@@ -126,7 +120,6 @@ public class PeerGroup extends AbstractIdleService implements TransactionBroadca
}
};
- private ClientBootstrap bootstrap;
private int minBroadcastConnections = 0;
private AbstractWalletEventListener walletEventListener = new AbstractWalletEventListener() {
private void onChanged() {
@@ -138,19 +131,21 @@ public class PeerGroup extends AbstractIdleService implements TransactionBroadca
@Override public void onCoinsSent(Wallet wallet, Transaction tx, BigInteger prevBalance, BigInteger newBalance) { onChanged(); }
};
- private class PeerStartupListener implements Peer.PeerLifecycleListener {
- public void onPeerConnected(Peer peer) {
+ private class PeerStartupListener extends AbstractPeerEventListener {
+ @Override
+ public void onPeerConnected(Peer peer, int peerCount) {
handleNewPeer(peer);
}
- public void onPeerDisconnected(Peer peer) {
+ @Override
+ public void onPeerDisconnected(Peer peer, int peerCount) {
// The channel will be automatically removed from channels.
handlePeerDeath(peer);
}
}
// Visible for testing
- Peer.PeerLifecycleListener startupListener = new PeerStartupListener();
+ PeerEventListener startupListener = new PeerStartupListener();
// A bloom filter generated from all connected wallets that is given to new peers
private BloomFilter bloomFilter;
@@ -164,6 +159,10 @@ public class PeerGroup extends AbstractIdleService implements TransactionBroadca
private final long bloomFilterTweak = (long) (Math.random() * Long.MAX_VALUE);
private int lastBloomFilterElementCount;
+ /** The default timeout between when a connection attempt begins and version message exchange completes */
+ public static final int DEFAULT_CONNECT_TIMEOUT_MILLIS = 2000;
+ private volatile int vConnectTimeoutMillis = DEFAULT_CONNECT_TIMEOUT_MILLIS;
+
/**
* Creates a PeerGroup with the given parameters. No chain is provided so this node will report its chain height
* as zero to other peers. This constructor is useful if you just want to explore the network but aren't interested
@@ -179,29 +178,15 @@ public class PeerGroup extends AbstractIdleService implements TransactionBroadca
* Creates a PeerGroup for the given network and chain. Blocks will be passed to the chain as they are broadcast
* and downloaded. This is probably the constructor you want to use.
*/
- public PeerGroup(NetworkParameters params, AbstractBlockChain chain) {
- this(params, chain, null);
+ public PeerGroup(NetworkParameters params, @Nullable AbstractBlockChain chain) {
+ this(params, chain, new NioClientManager());
}
-
+
/**
- *
Creates a PeerGroup for the given network and chain, using the provided Netty {@link ClientBootstrap} object.
- *
- *
- *
A ClientBootstrap creates raw (TCP) connections to other nodes on the network. Normally you won't need to
- * provide one - use the other constructors. Providing your own bootstrap is useful if you want to control
- * details like how many network threads are used, the connection timeout value and so on. To do this, you can
- * use {@link PeerGroup#createClientBootstrap()} method and then customize the resulting object. Example:
The ClientBootstrap provided does not need a channel pipeline factory set. If one wasn't set, the provided
- * bootstrap will be modified to have one that sets up the pipelines correctly.
+ * Creates a new PeerGroup allowing you to specify the {@link ClientConnectionManager} which is used to create new
+ * connections and keep track of existing ones.
*/
- public PeerGroup(NetworkParameters params, @Nullable AbstractBlockChain chain, @Nullable ClientBootstrap bootstrap) {
+ public PeerGroup(NetworkParameters params, @Nullable AbstractBlockChain chain, ClientConnectionManager connectionManager) {
this.params = checkNotNull(params);
this.chain = chain;
this.fastCatchupTimeSecs = params.getGenesisBlock().getTimeSeconds();
@@ -219,64 +204,14 @@ public class PeerGroup extends AbstractIdleService implements TransactionBroadca
memoryPool = new MemoryPool();
- // Configure Netty. The "ClientBootstrap" creates connections to other nodes. It can be configured in various
- // ways to control the network.
- if (bootstrap == null) {
- this.bootstrap = createClientBootstrap();
- this.bootstrap.setPipelineFactory(makePipelineFactory(params, chain));
- } else {
- this.bootstrap = bootstrap;
- }
-
inactives = new ArrayList();
peers = new CopyOnWriteArrayList();
pendingPeers = new CopyOnWriteArrayList();
- channels = new DefaultChannelGroup();
- peerDiscoverers = new CopyOnWriteArraySet();
+ channels = connectionManager;
+ peerDiscoverers = new CopyOnWriteArraySet();
peerEventListeners = new CopyOnWriteArrayList>();
}
- /**
- * Helper method that just sets up a normal Netty ClientBootstrap using the default options, except for a custom
- * thread factory that gives worker threads useful names and lowers their priority (to avoid competing with UI
- * threads). You don't normally need to call this - if you aren't sure what it does, just use the regular
- * constructors for {@link PeerGroup} that don't take a ClientBootstrap object.
- */
- public static ClientBootstrap createClientBootstrap() {
- ExecutorService bossExecutor = Executors.newCachedThreadPool(new PeerGroupThreadFactory());
- ExecutorService workerExecutor = Executors.newCachedThreadPool(new PeerGroupThreadFactory());
- NioClientSocketChannelFactory channelFactory = new NioClientSocketChannelFactory(bossExecutor, workerExecutor);
- ClientBootstrap bs = new ClientBootstrap(channelFactory);
- bs.setOption("connectTimeoutMillis", 2000);
- return bs;
- }
-
- // Create a Netty pipeline factory. The pipeline factory will create a network processing
- // pipeline with the bitcoin serializer ({@code TCPNetworkConnection}) downstream
- // of the higher level {@code Peer}. Received packets will first be decoded, then passed
- // {@code Peer}. Sent packets will be created by the {@code Peer}, then encoded and sent.
- private ChannelPipelineFactory makePipelineFactory(final NetworkParameters params, @Nullable final AbstractBlockChain chain) {
- return new ChannelPipelineFactory() {
- public ChannelPipeline getPipeline() throws Exception {
- // This runs unlocked.
- VersionMessage ver = getVersionMessage().duplicate();
- ver.bestHeight = chain == null ? 0 : chain.getBestChainHeight();
- ver.time = Utils.now().getTime() / 1000;
-
- ChannelPipeline p = Channels.pipeline();
-
- Peer peer = new Peer(params, chain, ver, memoryPool);
- peer.addLifecycleListener(startupListener);
- peer.setMinProtocolVersion(vMinRequiredProtocolVersion);
- pendingPeers.add(peer);
- TCPNetworkConnection codec = new TCPNetworkConnection(params, peer.getVersionMessage());
- p.addLast("codec", codec.getHandler());
- p.addLast("peer", peer.getHandler());
- return p;
- }
- };
- }
-
/**
* Adjusts the desired number of connections that we will create to peers. Note that if there are already peers
* open and the new value is lower than the current number of peers, those connections will be terminated. Likewise
@@ -292,7 +227,7 @@ public class PeerGroup extends AbstractIdleService implements TransactionBroadca
lock.unlock();
}
// We may now have too many or too few open connections. Add more or drop some to get to the right amount.
- adjustment = maxConnections - channels.size();
+ adjustment = maxConnections - channels.getConnectedClientCount();
while (adjustment > 0) {
try {
connectToAnyPeer();
@@ -301,10 +236,8 @@ public class PeerGroup extends AbstractIdleService implements TransactionBroadca
}
adjustment--;
}
- while (adjustment < 0) {
- channels.iterator().next().close();
- adjustment++;
- }
+ if (adjustment < 0)
+ channels.closeConnections(-adjustment);
}
/** The maximum number of connections that we will create to peers. */
@@ -576,6 +509,7 @@ public class PeerGroup extends AbstractIdleService implements TransactionBroadca
protected void startUp() throws Exception {
// This is run in a background thread by the AbstractIdleService implementation.
vPingTimer = new Timer("Peer pinging thread", true);
+ channels.startAndWait();
// Bring up the requested number of connections. If a connect attempt fails,
// new peers will be tried until there is a success, so just calling connectToAnyPeer for the wanted number
// of peers is sufficient.
@@ -593,11 +527,8 @@ public class PeerGroup extends AbstractIdleService implements TransactionBroadca
protected void shutDown() throws Exception {
// This is run on a separate thread by the AbstractIdleService implementation.
vPingTimer.cancel();
- // Blocking close of all sockets. TODO: there is a race condition here, for the solution see:
- // http://biasedbit.com/netty-releaseexternalresources-hangs/
- channels.close().await();
- // All thread pools should be stopped by this call.
- bootstrap.releaseExternalResources();
+ // Blocking close of all sockets.
+ channels.stopAndWait();
for (PeerDiscovery peerDiscovery : peerDiscoverers) {
peerDiscovery.shutdown();
}
@@ -701,11 +632,7 @@ public class PeerGroup extends AbstractIdleService implements TransactionBroadca
if (!filter.equals(bloomFilter)) {
bloomFilter = filter;
for (Peer peer : peers)
- try {
- peer.setBloomFilter(filter);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
+ peer.setBloomFilter(filter);
}
}
// Now adjust the earliest key time backwards by a week to handle the case of clock drift. This can occur
@@ -747,38 +674,32 @@ public class PeerGroup extends AbstractIdleService implements TransactionBroadca
}
/**
- * Connect to a peer by creating a Netty channel to the destination address.
+ * Connect to a peer by creating a channel to the destination address.
*
* @param address destination IP and port.
- * @return a ChannelFuture that can be used to wait for the socket to connect. A socket
- * connection does not mean that protocol handshake has occured.
+ * @return The newly created Peer object. Use {@link com.google.bitcoin.core.Peer#getConnectionOpenFuture()} if you
+ * want a future which completes when the connection is open.
*/
- public ChannelFuture connectTo(SocketAddress address) {
+ public Peer connectTo(InetSocketAddress address) {
return connectTo(address, true);
}
// Internal version.
- protected ChannelFuture connectTo(SocketAddress address, boolean incrementMaxConnections) {
- ChannelFuture future = bootstrap.connect(address);
- // Make sure that the channel group gets access to the channel only if it connects successfully (otherwise
- // it cannot be closed and trying to do so will cause problems).
- future.addListener(new ChannelFutureListener() {
- public void operationComplete(ChannelFuture future) throws Exception {
- if (future.isSuccess())
- channels.add(future.getChannel());
- }
- });
+ protected Peer connectTo(InetSocketAddress address, boolean incrementMaxConnections) {
+ VersionMessage ver = getVersionMessage().duplicate();
+ ver.bestHeight = chain == null ? 0 : chain.getBestChainHeight();
+ ver.time = Utils.now().getTime() / 1000;
+
+ Peer peer = new Peer(params, ver, address, chain, memoryPool);
+ peer.addEventListener(startupListener, Threading.SAME_THREAD);
+ peer.setMinProtocolVersion(vMinRequiredProtocolVersion);
+ pendingPeers.add(peer);
+
+ channels.openConnection(address, peer);
+ peer.setSocketTimeout(vConnectTimeoutMillis);
// When the channel has connected and version negotiated successfully, handleNewPeer will end up being called on
// a worker thread.
- // Set up the address on the TCPNetworkConnection handler object.
- // TODO: This is stupid and racy, get rid of it.
- TCPNetworkConnection.NetworkHandler networkHandler =
- (TCPNetworkConnection.NetworkHandler) future.getChannel().getPipeline().get("codec");
- if (networkHandler != null) {
- // This can be null in unit tests or apps that don't use TCP connections.
- networkHandler.getOwnerObject().setRemoteAddress(address);
- }
if (incrementMaxConnections) {
// We don't use setMaxConnections here as that would trigger a recursive attempt to establish a new
// outbound connection.
@@ -789,15 +710,15 @@ public class PeerGroup extends AbstractIdleService implements TransactionBroadca
lock.unlock();
}
}
- return future;
+ return peer;
}
- static public Peer peerFromChannelFuture(ChannelFuture future) {
- return peerFromChannel(future.getChannel());
- }
-
- static public Peer peerFromChannel(Channel channel) {
- return ((PeerHandler)channel.getPipeline().get("peer")).getPeer();
+ /**
+ * Sets the timeout between when a connection attempt to a peer begins and when the version message exchange
+ * completes. This does not apply to currently pending peers.
+ */
+ public void setConnectTimeoutMillis(int connectTimeoutMillis) {
+ this.vConnectTimeoutMillis = connectTimeoutMillis;
}
/**
@@ -852,11 +773,7 @@ public class PeerGroup extends AbstractIdleService implements TransactionBroadca
// Give the peer a filter that can be used to probabilistically drop transactions that
// aren't relevant to our wallet. We may still receive some false positives, which is
// OK because it helps improve wallet privacy. Old nodes will just ignore the message.
- try {
- if (bloomFilter != null) peer.setBloomFilter(bloomFilter);
- } catch (IOException e) {
- // That was quick...already disconnected
- }
+ if (bloomFilter != null) peer.setBloomFilter(bloomFilter);
// Link the peer to the memory pool so broadcast transactions have their confidence levels updated.
peer.setDownloadData(false);
// TODO: The peer should calculate the fast catchup time from the added wallets here.
@@ -875,7 +792,7 @@ public class PeerGroup extends AbstractIdleService implements TransactionBroadca
peer.addEventListener(getDataListener, Threading.SAME_THREAD);
// And set up event listeners for clients. This will allow them to find out about new transactions and blocks.
for (ListenerRegistration registration : peerEventListeners) {
- peer.addEventListener(registration.listener, registration.executor);
+ peer.addEventListenerWithoutOnDisconnect(registration.listener, registration.executor);
}
setupPingingForNewPeer(peer);
} finally {
@@ -1080,8 +997,6 @@ public class PeerGroup extends AbstractIdleService implements TransactionBroadca
setDownloadPeer(peer);
// startBlockChainDownload will setDownloadData(true) on itself automatically.
peer.startBlockChainDownload();
- } catch (IOException e) {
- log.error("failed to start block chain download from " + peer, e);
} finally {
lock.unlock();
}
@@ -1335,6 +1250,7 @@ public class PeerGroup extends AbstractIdleService implements TransactionBroadca
// zap peers if they upgrade early. If we can't find any peers that have our preferred protocol version or
// better then we'll settle for the highest we found instead.
int highestVersion = 0, preferredVersion = 0;
+ // If/when PREFERRED_VERSION is not equal to vMinRequiredProtocolVersion, reenable the last test in PeerGroupTest.downloadPeerSelection
final int PREFERRED_VERSION = FilteredBlock.MIN_PROTOCOL_VERSION;
for (Peer peer : candidates) {
highestVersion = Math.max(peer.getPeerVersionMessage().clientVersion, highestVersion);
diff --git a/core/src/main/java/com/google/bitcoin/core/PeerSocketHandler.java b/core/src/main/java/com/google/bitcoin/core/PeerSocketHandler.java
new file mode 100644
index 00000000..afb6ae9e
--- /dev/null
+++ b/core/src/main/java/com/google/bitcoin/core/PeerSocketHandler.java
@@ -0,0 +1,232 @@
+/*
+ * Copyright 2013 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.bitcoin.core;
+
+import java.io.*;
+import java.net.ConnectException;
+import java.net.InetSocketAddress;
+import java.nio.BufferUnderflowException;
+import java.nio.ByteBuffer;
+import java.nio.channels.NotYetConnectedException;
+import java.util.concurrent.locks.Lock;
+
+import com.google.bitcoin.networkabstraction.AbstractTimeoutHandler;
+import com.google.bitcoin.networkabstraction.MessageWriteTarget;
+import com.google.bitcoin.networkabstraction.StreamParser;
+import com.google.bitcoin.utils.Threading;
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+/**
+ * Handles high-level message (de)serialization for peers, acting as the bridge between the
+ * {@link com.google.bitcoin.networkabstraction} classes and {@link Peer}.
+ */
+public abstract class PeerSocketHandler extends AbstractTimeoutHandler implements StreamParser {
+ private static final Logger log = LoggerFactory.getLogger(PeerSocketHandler.class);
+
+ // The IP address to which we are connecting.
+ @VisibleForTesting
+ InetSocketAddress remoteIp;
+
+ private final BitcoinSerializer serializer;
+
+ /** If we close() before we know our writeTarget, set this to true to call writeTarget.closeConnection() right away */
+ private boolean closePending = false;
+ // writeTarget will be thread-safe, and may call into PeerGroup, which calls us, so we should call it unlocked
+ @VisibleForTesting MessageWriteTarget writeTarget = null;
+
+ // The ByteBuffers passed to us from the writeTarget are static in size, and usually smaller than some messages we
+ // will receive. For SPV clients, this should be rare (ie we're mostly dealing with small transactions), but for
+ // messages which are larger than the read buffer, we have to keep a temporary buffer with its bytes.
+ private byte[] largeReadBuffer;
+ private int largeReadBufferPos;
+ private BitcoinSerializer.BitcoinPacketHeader header;
+
+ private Lock lock = Threading.lock("PeerSocketHandler");
+
+ public PeerSocketHandler(NetworkParameters params, InetSocketAddress peerAddress) {
+ serializer = new BitcoinSerializer(checkNotNull(params));
+ this.remoteIp = checkNotNull(peerAddress);
+ }
+
+ /**
+ * Sends the given message to the peer. Due to the asynchronousness of network programming, there is no guarantee
+ * the peer will have received it. Throws NotYetConnectedException if we are not yet connected to the remote peer.
+ * TODO: Maybe use something other than the unchecked NotYetConnectedException here
+ */
+ public void sendMessage(Message message) throws NotYetConnectedException {
+ lock.lock();
+ try {
+ if (writeTarget == null)
+ throw new NotYetConnectedException();
+ } finally {
+ lock.unlock();
+ }
+ // TODO: Some round-tripping could be avoided here
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ try {
+ serializer.serialize(message, out);
+ writeTarget.writeBytes(out.toByteArray());
+ } catch (IOException e) {
+ exceptionCaught(e);
+ }
+ }
+
+ /**
+ * Closes the connection to the peer if one exists, or immediately closes the connection as soon as it opens
+ */
+ public void close() {
+ lock.lock();
+ try {
+ if (writeTarget == null) {
+ closePending = true;
+ return;
+ }
+ } finally {
+ lock.unlock();
+ }
+ writeTarget.closeConnection();
+ }
+
+ @Override
+ protected void timeoutOccurred() {
+ close();
+ }
+
+ /**
+ * Called every time a message is received from the network
+ */
+ protected abstract void processMessage(Message m) throws Exception;
+
+ @Override
+ public int receiveBytes(ByteBuffer buff) {
+ checkArgument(buff.position() == 0 &&
+ buff.capacity() >= BitcoinSerializer.BitcoinPacketHeader.HEADER_LENGTH + 4);
+ try {
+ // Repeatedly try to deserialize messages until we hit a BufferUnderflowException
+ for (int i = 0; true; i++) {
+ // If we are in the middle of reading a message, try to fill that one first, before we expect another
+ if (largeReadBuffer != null) {
+ // This can only happen in the first iteration
+ checkState(i == 0);
+ // Read new bytes into the largeReadBuffer
+ int bytesToGet = Math.min(buff.remaining(), largeReadBuffer.length - largeReadBufferPos);
+ buff.get(largeReadBuffer, largeReadBufferPos, bytesToGet);
+ largeReadBufferPos += bytesToGet;
+ // Check the largeReadBuffer's status
+ if (largeReadBufferPos == largeReadBuffer.length) {
+ // ...processing a message if one is available
+ processMessage(serializer.deserializePayload(header, ByteBuffer.wrap(largeReadBuffer)));
+ largeReadBuffer = null;
+ header = null;
+ } else // ...or just returning if we don't have enough bytes yet
+ return buff.position();
+ }
+ // Now try to deserialize any messages left in buff
+ Message message;
+ int preSerializePosition = buff.position();
+ try {
+ message = serializer.deserialize(buff);
+ } catch (BufferUnderflowException e) {
+ // If we went through the whole buffer without a full message, we need to use the largeReadBuffer
+ if (i == 0 && buff.limit() == buff.capacity()) {
+ // ...so reposition the buffer to 0 and read the next message header
+ buff.position(0);
+ try {
+ serializer.seekPastMagicBytes(buff);
+ header = serializer.deserializeHeader(buff);
+ // Initialize the largeReadBuffer with the next message's size and fill it with any bytes
+ // left in buff
+ largeReadBuffer = new byte[header.size];
+ largeReadBufferPos = buff.remaining();
+ buff.get(largeReadBuffer, 0, largeReadBufferPos);
+ } catch (BufferUnderflowException e1) {
+ // If we went through a whole buffer's worth of bytes without getting a header, give up
+ // In cases where the buff is just really small, we could create a second largeReadBuffer
+ // that we use to deserialize the magic+header, but that is rather complicated when the buff
+ // should probably be at least that big anyway (for efficiency)
+ throw new ProtocolException("No magic bytes+header after reading " + buff.capacity() + " bytes");
+ }
+ } else {
+ // Reposition the buffer to its original position, which saves us from skipping messages by
+ // seeking past part of the magic bytes before all of them are in the buffer
+ buff.position(preSerializePosition);
+ }
+ return buff.position();
+ }
+ // Process our freshly deserialized message
+ processMessage(message);
+ }
+ } catch (Exception e) {
+ exceptionCaught(e);
+ return -1; // Returning -1 also throws an IllegalStateException upstream and kills the connection
+ }
+ }
+
+ /**
+ * Sets the {@link MessageWriteTarget} used to write messages to the peer. This should almost never be called, it is
+ * called automatically by {@link com.google.bitcoin.networkabstraction.NioClient} or
+ * {@link com.google.bitcoin.networkabstraction.NioClientManager} once the socket finishes initialization.
+ */
+ @Override
+ public void setWriteTarget(MessageWriteTarget writeTarget) {
+ lock.lock();
+ boolean closeNow = false;
+ try {
+ closeNow = closePending;
+ this.writeTarget = writeTarget;
+ } finally {
+ lock.unlock();
+ }
+ if (closeNow)
+ writeTarget.closeConnection();
+ }
+
+ @Override
+ public int getMaxMessageSize() {
+ return Message.MAX_SIZE;
+ }
+
+ /**
+ * @return the IP address and port of peer.
+ */
+ public PeerAddress getAddress() {
+ return new PeerAddress(remoteIp);
+ }
+
+ /** Catch any exceptions, logging them and then closing the channel. */
+ private void exceptionCaught(Exception e) {
+ PeerAddress addr = getAddress();
+ String s = addr == null ? "?" : addr.toString();
+ if (e instanceof ConnectException || e instanceof IOException) {
+ // Short message for network errors
+ log.info(s + " - " + e.getMessage());
+ } else {
+ log.warn(s + " - ", e);
+ Thread.UncaughtExceptionHandler handler = Threading.uncaughtExceptionHandler;
+ if (handler != null)
+ handler.uncaughtException(Thread.currentThread(), e);
+ }
+
+ close();
+ }
+}
diff --git a/core/src/main/java/com/google/bitcoin/core/TCPNetworkConnection.java b/core/src/main/java/com/google/bitcoin/core/TCPNetworkConnection.java
deleted file mode 100644
index 5c082540..00000000
--- a/core/src/main/java/com/google/bitcoin/core/TCPNetworkConnection.java
+++ /dev/null
@@ -1,259 +0,0 @@
-/*
- * Copyright 2011 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.google.bitcoin.core;
-
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
-import org.jboss.netty.bootstrap.ClientBootstrap;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBufferInputStream;
-import org.jboss.netty.buffer.ChannelBufferOutputStream;
-import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.channel.*;
-import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
-import org.jboss.netty.handler.codec.replay.ReplayingDecoder;
-import org.jboss.netty.handler.codec.replay.VoidEnum;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Nullable;
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.util.Date;
-import java.util.Random;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-import static org.jboss.netty.channel.Channels.write;
-
-// TODO: Remove this class and refactor the way we build Netty pipelines.
-
-/**
- *
A {@code TCPNetworkConnection} is used for connecting to a Bitcoin node over the standard TCP/IP protocol.
- *
- *
{@link TCPNetworkConnection#getHandler()} is part of a Netty Pipeline, downstream of other pipeline stages.
- *
- */
-public class TCPNetworkConnection implements NetworkConnection {
- private static final Logger log = LoggerFactory.getLogger(TCPNetworkConnection.class);
-
- // The IP address to which we are connecting.
- private InetAddress remoteIp;
- private final NetworkParameters params;
- private VersionMessage versionMessage;
-
- private BitcoinSerializer serializer = null;
-
- private VersionMessage myVersionMessage;
- private Channel channel;
-
- private NetworkHandler handler;
- // For ping nonces.
- private Random random = new Random();
-
- /**
- * Construct a network connection with the given params and version. If you use this constructor you need to set
- * up the Netty pipelines and infrastructure yourself. If all you have is an IP address and port, use the static
- * connectTo method.
- *
- * @param params Defines which network to connect to and details of the protocol.
- * @param ver The VersionMessage to announce to the other side of the connection.
- */
- public TCPNetworkConnection(NetworkParameters params, VersionMessage ver) {
- this.params = params;
- this.myVersionMessage = ver;
- this.serializer = new BitcoinSerializer(this.params);
- this.handler = new NetworkHandler();
- }
-
- // Some members that are used for convenience APIs. If the app only uses PeerGroup then these won't be used.
- private static NioClientSocketChannelFactory channelFactory;
- private SettableFuture handshakeFuture;
-
- /**
- * Returns a future for a TCPNetworkConnection that is connected and version negotiated to the given remote address.
- * Behind the scenes this method sets up a thread pool and a Netty pipeline that uses it. The equivalent Netty code
- * is quite complex so use this method if you aren't writing a complex app. The future completes once version
- * handshaking is done, use .get() on the response to wait for it.
- *
- * @param params The network parameters to use (production or testnet)
- * @param address IP address and port to use
- * @param connectTimeoutMsec How long to wait before giving up and setting the future to failure.
- * @param peer If not null, this peer will be added to the pipeline.
- */
- public static ListenableFuture connectTo(NetworkParameters params, InetSocketAddress address,
- int connectTimeoutMsec, @Nullable Peer peer) {
- synchronized (TCPNetworkConnection.class) {
- if (channelFactory == null) {
- ExecutorService bossExecutor = Executors.newCachedThreadPool();
- ExecutorService workerExecutor = Executors.newCachedThreadPool();
- channelFactory = new NioClientSocketChannelFactory(bossExecutor, workerExecutor);
- }
- }
- // Run the connection in the thread pool and wait for it to complete.
- ClientBootstrap clientBootstrap = new ClientBootstrap(channelFactory);
- ChannelPipeline pipeline = Channels.pipeline();
- final TCPNetworkConnection conn = new TCPNetworkConnection(params, new VersionMessage(params, 0));
- conn.handshakeFuture = SettableFuture.create();
- conn.setRemoteAddress(address);
- pipeline.addLast("codec", conn.getHandler());
- if (peer != null) pipeline.addLast("peer", peer.getHandler());
- clientBootstrap.setPipeline(pipeline);
- clientBootstrap.setOption("connectTimeoutMillis", connectTimeoutMsec);
- ChannelFuture socketFuture = clientBootstrap.connect(address);
- // Once the socket is either connected on the TCP level, or failed ...
- socketFuture.addListener(new ChannelFutureListener() {
- public void operationComplete(ChannelFuture channelFuture) throws Exception {
- // Check if it failed ...
- if (channelFuture.isDone() && !channelFuture.isSuccess()) {
- // And complete the returned future with an exception.
- conn.handshakeFuture.setException(channelFuture.getCause());
- }
- // Otherwise the handshakeFuture will be marked as completed once we did ver/verack exchange.
- }
- });
- return conn.handshakeFuture;
- }
-
- public void writeMessage(Message message) throws IOException {
- write(channel, message);
- }
-
- private void onVersionMessage(Message m) throws IOException, ProtocolException {
- if (!(m instanceof VersionMessage)) {
- // Bad peers might not follow the protocol. This has been seen in the wild (issue 81).
- log.info("First message received was not a version message but rather " + m);
- return;
- }
- versionMessage = (VersionMessage) m;
- // Switch to the new protocol version.
- int peerVersion = versionMessage.clientVersion;
- log.info("Connected to {}: version={}, subVer='{}', services=0x{}, time={}, blocks={}",
- getPeerAddress().getAddr().getHostAddress(),
- peerVersion,
- versionMessage.subVer,
- versionMessage.localServices,
- new Date(versionMessage.time * 1000),
- versionMessage.bestHeight);
- // Now it's our turn ...
- // Send an ACK message stating we accept the peers protocol version.
- write(channel, new VersionAck());
- // bitcoinj is a client mode implementation. That means there's not much point in us talking to other client
- // mode nodes because we can't download the data from them we need to find/verify transactions. Some bogus
- // implementations claim to have a block chain in their services field but then report a height of zero, filter
- // them out here.
- if (!versionMessage.hasBlockChain() ||
- (!params.allowEmptyPeerChain() && versionMessage.bestHeight <= 0)) {
- // Shut down the channel
- throw new ProtocolException("Peer does not have a copy of the block chain.");
- }
- // Handshake is done!
- if (handshakeFuture != null)
- handshakeFuture.set(this);
- }
-
- public void ping() throws IOException {
- // pong/nonce messages were added to any protocol version greater than 60000
- if (versionMessage.clientVersion > 60000) {
- write(channel, new Ping(random.nextLong()));
- }
- else
- write(channel, new Ping());
- }
-
- @Override
- public String toString() {
- return "[" + remoteIp.getHostAddress() + "]:" + params.getPort();
- }
-
- public class NetworkHandler extends ReplayingDecoder implements ChannelDownstreamHandler {
- @Override
- public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
- super.channelConnected(ctx, e);
- channel = e.getChannel();
- // The version message does not use checksumming, until Feb 2012 when it magically does.
- // Announce ourselves. This has to come first to connect to clients beyond v0.30.20.2 which wait to hear
- // from us until they send their version message back.
- log.info("Announcing to {} as: {}", channel.getRemoteAddress(), myVersionMessage.subVer);
- write(channel, myVersionMessage);
- // When connecting, the remote peer sends us a version message with various bits of
- // useful data in it. We need to know the peer protocol version before we can talk to it.
- }
-
- // Attempt to decode a Bitcoin message passing upstream in the channel.
- //
- // By extending ReplayingDecoder, reading past the end of buffer will throw a special Error
- // causing the channel to read more and retry.
- //
- // On VMs/systems where exception handling is slow, this will impact performance. On the
- // other hand, implementing a FrameDecoder will increase code complexity due to having
- // to implement retries ourselves.
- //
- // TODO: consider using a decoder state and checkpoint() if performance is an issue.
- @Override
- protected Object decode(ChannelHandlerContext ctx, Channel chan,
- ChannelBuffer buffer, VoidEnum state) throws Exception {
- Message message = serializer.deserialize(new ChannelBufferInputStream(buffer));
- if (message instanceof VersionMessage)
- onVersionMessage(message);
- return message;
- }
-
- /** Serialize outgoing Bitcoin messages passing downstream in the channel. */
- public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent evt) throws Exception {
- if (!(evt instanceof MessageEvent)) {
- ctx.sendDownstream(evt);
- return;
- }
-
- MessageEvent e = (MessageEvent) evt;
- Message message = (Message)e.getMessage();
-
- ChannelBuffer buffer = ChannelBuffers.dynamicBuffer();
- serializer.serialize(message, new ChannelBufferOutputStream(buffer));
- write(ctx, e.getFuture(), buffer, e.getRemoteAddress());
- }
-
- public TCPNetworkConnection getOwnerObject() {
- return TCPNetworkConnection.this;
- }
- }
-
- /** Returns the Netty Pipeline stage handling Bitcoin serialization for this connection. */
- public NetworkHandler getHandler() {
- return handler;
- }
-
- public VersionMessage getVersionMessage() {
- return versionMessage;
- }
-
- public PeerAddress getPeerAddress() {
- return new PeerAddress(remoteIp, params.getPort());
- }
-
- public void close() {
- channel.close();
- }
-
- public void setRemoteAddress(SocketAddress address) {
- if (address instanceof InetSocketAddress)
- remoteIp = ((InetSocketAddress)address).getAddress();
- }
-}
diff --git a/core/src/main/java/com/google/bitcoin/core/Wallet.java b/core/src/main/java/com/google/bitcoin/core/Wallet.java
index 17239e6d..cf0044cb 100644
--- a/core/src/main/java/com/google/bitcoin/core/Wallet.java
+++ b/core/src/main/java/com/google/bitcoin/core/Wallet.java
@@ -1774,7 +1774,7 @@ public class Wallet implements Serializable, BlockChainListener, PeerFilterProvi
* @throws InsufficientMoneyException if the request could not be completed due to not enough balance.
* @throws IOException if there was a problem broadcasting the transaction
*/
- public Transaction sendCoins(Peer peer, SendRequest request) throws IOException, InsufficientMoneyException {
+ public Transaction sendCoins(Peer peer, SendRequest request) throws InsufficientMoneyException {
Transaction tx = sendCoinsOffline(request);
peer.sendMessage(tx);
return tx;
diff --git a/core/src/main/java/com/google/bitcoin/protocols/niowrapper/AbstractTimeoutHandler.java b/core/src/main/java/com/google/bitcoin/networkabstraction/AbstractTimeoutHandler.java
similarity index 87%
rename from core/src/main/java/com/google/bitcoin/protocols/niowrapper/AbstractTimeoutHandler.java
rename to core/src/main/java/com/google/bitcoin/networkabstraction/AbstractTimeoutHandler.java
index 830dd6f0..dabe3077 100644
--- a/core/src/main/java/com/google/bitcoin/protocols/niowrapper/AbstractTimeoutHandler.java
+++ b/core/src/main/java/com/google/bitcoin/networkabstraction/AbstractTimeoutHandler.java
@@ -14,13 +14,15 @@
* limitations under the License.
*/
-package com.google.bitcoin.protocols.niowrapper;
+package com.google.bitcoin.networkabstraction;
import java.util.Timer;
import java.util.TimerTask;
/**
- *
A stream parser that provides functionality for creating timeouts between arbitrary events.
+ *
A base class which provides basic support for socket timeouts. It is used instead of integrating timeouts into the
+ * NIO select thread both for simplicity and to keep code shared between NIO and blocking sockets as much as possible.
+ *
*/
public abstract class AbstractTimeoutHandler {
// TimerTask and timeout value which are added to a timer to kill the connection on timeout
@@ -29,7 +31,7 @@ public abstract class AbstractTimeoutHandler {
private boolean timeoutEnabled = true;
// A timer which manages expiring channels as their timeouts occur (if configured).
- private static final Timer timeoutTimer = new Timer("ProtobufParser timeouts", true);
+ private static final Timer timeoutTimer = new Timer("AbstractTimeoutHandler timeouts", true);
/**
*
Enables or disables the timeout entirely. This may be useful if you want to store the timeout value but wish
diff --git a/core/src/main/java/com/google/bitcoin/protocols/niowrapper/NioClient.java b/core/src/main/java/com/google/bitcoin/networkabstraction/BlockingClient.java
similarity index 60%
rename from core/src/main/java/com/google/bitcoin/protocols/niowrapper/NioClient.java
rename to core/src/main/java/com/google/bitcoin/networkabstraction/BlockingClient.java
index 66652d90..e41590dd 100644
--- a/core/src/main/java/com/google/bitcoin/protocols/niowrapper/NioClient.java
+++ b/core/src/main/java/com/google/bitcoin/networkabstraction/BlockingClient.java
@@ -14,31 +14,37 @@
* limitations under the License.
*/
-package com.google.bitcoin.protocols.niowrapper;
+package com.google.bitcoin.networkabstraction;
import java.io.IOException;
-import java.net.InetSocketAddress;
+import java.io.InputStream;
+import java.net.Socket;
+import java.net.SocketAddress;
import java.nio.ByteBuffer;
-import java.nio.channels.AsynchronousCloseException;
-import java.nio.channels.ClosedChannelException;
-import java.nio.channels.SocketChannel;
+import java.util.Set;
import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
import org.slf4j.LoggerFactory;
import static com.google.common.base.Preconditions.checkState;
/**
- * Creates a simple connection to a server using a {@link StreamParser} to process data.
+ *
Creates a simple connection to a server using a {@link StreamParser} to process data.
+ *
+ *
Generally, using {@link NioClient} and {@link NioClientManager} should be preferred over {@link BlockingClient}
+ * and {@link BlockingClientManager}, unless you wish to connect over a proxy or use some other network settings that
+ * cannot be set using NIO.
*/
-public class NioClient implements MessageWriteTarget {
- private static final org.slf4j.Logger log = LoggerFactory.getLogger(NioClient.class);
+public class BlockingClient implements MessageWriteTarget {
+ private static final org.slf4j.Logger log = LoggerFactory.getLogger(BlockingClient.class);
private static final int BUFFER_SIZE_LOWER_BOUND = 4096;
private static final int BUFFER_SIZE_UPPER_BOUND = 65536;
@Nonnull private final ByteBuffer dbuf;
- @Nonnull private final SocketChannel sc;
+ @Nonnull private final Socket socket;
+ private volatile boolean vCloseRequested = false;
/**
*
Creates a new client to the given server address using the given {@link StreamParser} to decode the data.
@@ -48,28 +54,35 @@ public class NioClient implements MessageWriteTarget {
*
* @param connectTimeoutMillis The connect timeout set on the connection (in milliseconds). 0 is interpreted as no
* timeout.
+ * @param clientSet A set which this object will add itself to after initialization, and then remove itself from
+ * when the connection dies. Note that this set must be thread-safe.
*/
- public NioClient(final InetSocketAddress serverAddress, final StreamParser parser,
- final int connectTimeoutMillis) throws IOException {
+ public BlockingClient(final SocketAddress serverAddress, final StreamParser parser,
+ final int connectTimeoutMillis, @Nullable final Set clientSet) throws IOException {
// Try to fit at least one message in the network buffer, but place an upper and lower limit on its size to make
// sure it doesnt get too large or have to call read too often.
dbuf = ByteBuffer.allocateDirect(Math.min(Math.max(parser.getMaxMessageSize(), BUFFER_SIZE_LOWER_BOUND), BUFFER_SIZE_UPPER_BOUND));
parser.setWriteTarget(this);
- sc = SocketChannel.open();
+ socket = new Socket();
- new Thread() {
+ Thread t = new Thread() {
@Override
public void run() {
+ if (clientSet != null)
+ clientSet.add(BlockingClient.this);
try {
- sc.socket().connect(serverAddress, connectTimeoutMillis);
+ socket.connect(serverAddress, connectTimeoutMillis);
parser.connectionOpened();
+ InputStream stream = socket.getInputStream();
+ byte[] readBuff = new byte[dbuf.capacity()];
while (true) {
- int read = sc.read(dbuf);
- if (read == 0)
- continue;
- else if (read == -1)
+ // TODO Kill the message duplication here
+ checkState(dbuf.remaining() > 0 && dbuf.remaining() <= readBuff.length);
+ int read = stream.read(readBuff, 0, Math.max(1, Math.min(dbuf.remaining(), stream.available())));
+ if (read == -1)
return;
+ dbuf.put(readBuff, 0, read);
// "flip" the buffer - setting the limit to the current position and setting position to 0
dbuf.flip();
// Use parser.receiveBytes's return value as a double-check that it stopped reading at the right
@@ -80,20 +93,24 @@ public class NioClient implements MessageWriteTarget {
// position)
dbuf.compact();
}
- } catch (AsynchronousCloseException e) {// Expected if the connection is closed
- } catch (ClosedChannelException e) { // Expected if the connection is closed
} catch (Exception e) {
- log.error("Error trying to open/read from connection", e);
+ if (!vCloseRequested)
+ log.error("Error trying to open/read from connection", e);
} finally {
try {
- sc.close();
+ socket.close();
} catch (IOException e1) {
// At this point there isn't much we can do, and we can probably assume the channel is closed
}
+ if (clientSet != null)
+ clientSet.remove(BlockingClient.this);
parser.connectionClosed();
}
}
- }.start();
+ };
+ t.setName("BlockingClient network thread for " + serverAddress);
+ t.setDaemon(true);
+ t.start();
}
/**
@@ -103,21 +120,21 @@ public class NioClient implements MessageWriteTarget {
public void closeConnection() {
// Closes the channel, triggering an exception in the network-handling thread triggering connectionClosed()
try {
- sc.close();
+ vCloseRequested = true;
+ socket.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
- // Writes raw bytes to the channel (used by the write method in StreamParser)
@Override
- public synchronized void writeBytes(byte[] message) {
+ public synchronized void writeBytes(byte[] message) throws IOException {
try {
- if (sc.write(ByteBuffer.wrap(message)) != message.length)
- throw new IOException("Couldn't write all of message to socket");
+ socket.getOutputStream().write(message);
} catch (IOException e) {
log.error("Error writing message to connection, closing connection", e);
closeConnection();
+ throw e;
}
}
}
diff --git a/core/src/main/java/com/google/bitcoin/networkabstraction/BlockingClientManager.java b/core/src/main/java/com/google/bitcoin/networkabstraction/BlockingClientManager.java
new file mode 100644
index 00000000..e09d3576
--- /dev/null
+++ b/core/src/main/java/com/google/bitcoin/networkabstraction/BlockingClientManager.java
@@ -0,0 +1,75 @@
+/*
+ * Copyright 2013 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.bitcoin.networkabstraction;
+
+import com.google.common.util.concurrent.AbstractIdleService;
+
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArraySet;
+
+/**
+ *
A thin wrapper around a set of {@link BlockingClient}s.
+ *
+ *
Generally, using {@link NioClient} and {@link NioClientManager} should be preferred over {@link BlockingClient}
+ * and {@link BlockingClientManager} as they scale significantly better, unless you wish to connect over a proxy or use
+ * some other network settings that cannot be set using NIO.
+ */
+public class BlockingClientManager extends AbstractIdleService implements ClientConnectionManager {
+ private final Set clients = Collections.synchronizedSet(new HashSet());
+ @Override
+ public void openConnection(SocketAddress serverAddress, StreamParser parser) {
+ if (!isRunning())
+ throw new IllegalStateException();
+ try {
+ new BlockingClient(serverAddress, parser, 1000, clients);
+ } catch (IOException e) {
+ throw new RuntimeException(e); // This should only happen if we are, eg, out of system resources
+ }
+ }
+
+ @Override
+ protected void startUp() throws Exception { }
+
+ @Override
+ protected void shutDown() throws Exception {
+ synchronized (clients) {
+ for (BlockingClient client : clients)
+ client.closeConnection();
+ }
+ }
+
+ @Override
+ public int getConnectedClientCount() {
+ return clients.size();
+ }
+
+ @Override
+ public void closeConnections(int n) {
+ if (!isRunning())
+ throw new IllegalStateException();
+ synchronized (clients) {
+ Iterator it = clients.iterator();
+ while (n-- > 0 && it.hasNext())
+ it.next().closeConnection();
+ }
+ }
+}
diff --git a/core/src/main/java/com/google/bitcoin/networkabstraction/ClientConnectionManager.java b/core/src/main/java/com/google/bitcoin/networkabstraction/ClientConnectionManager.java
new file mode 100644
index 00000000..d42720b0
--- /dev/null
+++ b/core/src/main/java/com/google/bitcoin/networkabstraction/ClientConnectionManager.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2013 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.bitcoin.networkabstraction;
+
+import com.google.common.util.concurrent.Service;
+
+import java.io.IOException;
+import java.net.SocketAddress;
+
+/**
+ *
A generic interface for an object which keeps track of a set of open client connections, creates new ones and
+ * ensures they are serviced properly.
+ *
+ *
When the service is {@link com.google.common.util.concurrent.Service#stop()}ed, all connections will be closed and
+ * the appropriate connectionClosed() calls must be made.
+ */
+public interface ClientConnectionManager extends Service {
+ /**
+ * Creates a new connection to the given address, with the given parser used to handle incoming data.
+ */
+ void openConnection(SocketAddress serverAddress, StreamParser parser);
+
+ /** Gets the number of connected peers */
+ int getConnectedClientCount();
+
+ /** Closes n peer connections */
+ void closeConnections(int n);
+}
diff --git a/core/src/main/java/com/google/bitcoin/networkabstraction/ConnectionHandler.java b/core/src/main/java/com/google/bitcoin/networkabstraction/ConnectionHandler.java
new file mode 100644
index 00000000..b8c72128
--- /dev/null
+++ b/core/src/main/java/com/google/bitcoin/networkabstraction/ConnectionHandler.java
@@ -0,0 +1,219 @@
+/*
+ * Copyright 2013 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.bitcoin.networkabstraction;
+
+import com.google.bitcoin.core.Message;
+import com.google.bitcoin.utils.Threading;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Set;
+import java.util.concurrent.locks.ReentrantLock;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+/**
+ * A simple NIO MessageWriteTarget which handles all the business logic of a connection (reading+writing bytes).
+ * Used only by the NioClient and NioServer classes
+ */
+class ConnectionHandler implements MessageWriteTarget {
+ private static final org.slf4j.Logger log = LoggerFactory.getLogger(ConnectionHandler.class);
+
+ private static final int BUFFER_SIZE_LOWER_BOUND = 4096;
+ private static final int BUFFER_SIZE_UPPER_BOUND = 65536;
+
+ private static final int OUTBOUND_BUFFER_BYTE_COUNT = Message.MAX_SIZE + 24; // 24 byte message header
+
+ // We lock when touching local flags and when writing data, but NEVER when calling any methods which leave this
+ // class into non-Java classes.
+ private final ReentrantLock lock = Threading.lock("nioConnectionHandler");
+ @GuardedBy("lock") private final ByteBuffer readBuff;
+ @GuardedBy("lock") private final SocketChannel channel;
+ @GuardedBy("lock") private final SelectionKey key;
+ @GuardedBy("lock") final StreamParser parser;
+ @GuardedBy("lock") private boolean closeCalled = false;
+
+ @GuardedBy("lock") private long bytesToWriteRemaining = 0;
+ @GuardedBy("lock") private final LinkedList bytesToWrite = new LinkedList();
+
+ private Set connectedHandlers;
+
+ public ConnectionHandler(StreamParserFactory parserFactory, SelectionKey key) throws IOException {
+ this(parserFactory.getNewParser(((SocketChannel)key.channel()).socket().getInetAddress(), ((SocketChannel)key.channel()).socket().getPort()), key);
+ if (parser == null)
+ throw new IOException("Parser factory.getNewParser returned null");
+ }
+
+ private ConnectionHandler(StreamParser parser, SelectionKey key) {
+ this.key = key;
+ this.channel = checkNotNull(((SocketChannel)key.channel()));
+ this.parser = parser;
+ if (parser == null) {
+ readBuff = null;
+ closeConnection();
+ return;
+ }
+ readBuff = ByteBuffer.allocateDirect(Math.min(Math.max(parser.getMaxMessageSize(), BUFFER_SIZE_LOWER_BOUND), BUFFER_SIZE_UPPER_BOUND));
+ parser.setWriteTarget(this); // May callback into us (eg closeConnection() now)
+ connectedHandlers = null;
+ }
+
+ public ConnectionHandler(StreamParser parser, SelectionKey key, Set connectedHandlers) {
+ this(checkNotNull(parser), key);
+
+ // closeConnection() may have already happened, in which case we shouldn't add ourselves to the connectedHandlers set
+ lock.lock();
+ boolean alreadyClosed = false;
+ try {
+ alreadyClosed = closeCalled;
+ this.connectedHandlers = connectedHandlers;
+ } finally {
+ lock.unlock();
+ }
+ if (!alreadyClosed)
+ checkState(connectedHandlers.add(this));
+ }
+
+ // Tries to write any outstanding write bytes, runs in any thread (possibly unlocked)
+ private void tryWriteBytes() throws IOException {
+ lock.lock();
+ try {
+ // Iterate through the outbound ByteBuff queue, pushing as much as possible into the OS' network buffer.
+ Iterator bytesIterator = bytesToWrite.iterator();
+ while (bytesIterator.hasNext()) {
+ ByteBuffer buff = bytesIterator.next();
+ bytesToWriteRemaining -= channel.write(buff);
+ if (!buff.hasRemaining())
+ bytesIterator.remove();
+ else {
+ // Make sure we are registered to get updated when writing is available again
+ key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
+ // Refresh the selector to make sure it gets the new interestOps
+ key.selector().wakeup();
+ break;
+ }
+ }
+ // If we are done writing, clear the OP_WRITE interestOps
+ if (bytesToWrite.isEmpty())
+ key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
+ // Don't bother waking up the selector here, since we're just removing an op, not adding
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public void writeBytes(byte[] message) throws IOException {
+ lock.lock();
+ try {
+ // Network buffers are not unlimited (and are often smaller than some messages we may wish to send), and
+ // thus we have to buffer outbound messages sometimes. To do this, we use a queue of ByteBuffers and just
+ // append to it when we want to send a message. We then let tryWriteBytes() either send the message or
+ // register our SelectionKey to wakeup when we have free outbound buffer space available.
+
+ if (bytesToWriteRemaining + message.length > OUTBOUND_BUFFER_BYTE_COUNT)
+ throw new IOException("Outbound buffer overflowed");
+ // Just dump the message onto the write buffer and call tryWriteBytes
+ // TODO: Kill the needless message duplication when the write completes right away
+ bytesToWrite.offer(ByteBuffer.wrap(Arrays.copyOf(message, message.length)));
+ bytesToWriteRemaining += message.length;
+ tryWriteBytes();
+ } catch (IOException e) {
+ lock.unlock();
+ log.error("Error writing message to connection, closing connection", e);
+ closeConnection();
+ throw e;
+ }
+ lock.unlock();
+ }
+
+ @Override
+ // May NOT be called with lock held
+ public void closeConnection() {
+ try {
+ channel.close();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ connectionClosed();
+ }
+
+ private void connectionClosed() {
+ boolean callClosed = false;
+ lock.lock();
+ try {
+ callClosed = !closeCalled;
+ closeCalled = true;
+ } finally {
+ lock.unlock();
+ }
+ if (callClosed) {
+ checkState(connectedHandlers == null || connectedHandlers.remove(this));
+ parser.connectionClosed();
+ }
+ }
+
+ // Handle a SelectionKey which was selected
+ // Runs unlocked as the caller is single-threaded (or if not, should enforce that handleKey is only called
+ // atomically for a given ConnectionHandler)
+ public static void handleKey(SelectionKey key) {
+ ConnectionHandler handler = ((ConnectionHandler)key.attachment());
+ try {
+ if (handler == null)
+ return;
+ if (!key.isValid()) {
+ handler.closeConnection(); // Key has been cancelled, make sure the socket gets closed
+ return;
+ }
+ if (key.isReadable()) {
+ // Do a socket read and invoke the parser's receiveBytes message
+ int read = handler.channel.read(handler.readBuff);
+ if (read == 0)
+ return; // Was probably waiting on a write
+ else if (read == -1) { // Socket was closed
+ key.cancel();
+ handler.closeConnection();
+ return;
+ }
+ // "flip" the buffer - setting the limit to the current position and setting position to 0
+ handler.readBuff.flip();
+ // Use parser.receiveBytes's return value as a check that it stopped reading at the right location
+ int bytesConsumed = handler.parser.receiveBytes(handler.readBuff);
+ checkState(handler.readBuff.position() == bytesConsumed);
+ // Now drop the bytes which were read by compacting readBuff (resetting limit and keeping relative
+ // position)
+ handler.readBuff.compact();
+ }
+ if (key.isWritable())
+ handler.tryWriteBytes();
+ } catch (Exception e) {
+ // This can happen eg if the channel closes while the thread is about to get killed
+ // (ClosedByInterruptException), or if handler.parser.receiveBytes throws something
+ log.error("Error handling SelectionKey", e);
+ if (handler != null)
+ handler.closeConnection();
+ }
+ }
+}
diff --git a/core/src/main/java/com/google/bitcoin/protocols/niowrapper/MessageWriteTarget.java b/core/src/main/java/com/google/bitcoin/networkabstraction/MessageWriteTarget.java
similarity index 73%
rename from core/src/main/java/com/google/bitcoin/protocols/niowrapper/MessageWriteTarget.java
rename to core/src/main/java/com/google/bitcoin/networkabstraction/MessageWriteTarget.java
index 6bee48be..704f8f0b 100644
--- a/core/src/main/java/com/google/bitcoin/protocols/niowrapper/MessageWriteTarget.java
+++ b/core/src/main/java/com/google/bitcoin/networkabstraction/MessageWriteTarget.java
@@ -14,7 +14,7 @@
* limitations under the License.
*/
-package com.google.bitcoin.protocols.niowrapper;
+package com.google.bitcoin.networkabstraction;
import java.io.IOException;
@@ -22,6 +22,13 @@ import java.io.IOException;
* A target to which messages can be written/connection can be closed
*/
public interface MessageWriteTarget {
+ /**
+ * Writes the given bytes to the remote server.
+ */
void writeBytes(byte[] message) throws IOException;
+ /**
+ * Closes the connection to the server, triggering the {@link StreamParser#connectionClosed()}
+ * event on the network-handling thread where all callbacks occur.
+ */
void closeConnection();
}
diff --git a/core/src/main/java/com/google/bitcoin/networkabstraction/NioClient.java b/core/src/main/java/com/google/bitcoin/networkabstraction/NioClient.java
new file mode 100644
index 00000000..496b54ba
--- /dev/null
+++ b/core/src/main/java/com/google/bitcoin/networkabstraction/NioClient.java
@@ -0,0 +1,117 @@
+/*
+ * Copyright 2013 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.bitcoin.networkabstraction;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.AsynchronousCloseException;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.SocketChannel;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.LinkedList;
+import javax.annotation.Nonnull;
+
+import org.slf4j.LoggerFactory;
+
+import static com.google.common.base.Preconditions.checkState;
+
+/**
+ * Creates a simple connection to a server using a {@link StreamParser} to process data.
+ */
+public class NioClient implements MessageWriteTarget {
+ private final Handler handler;
+ private final NioClientManager manager = new NioClientManager();
+
+ class Handler extends AbstractTimeoutHandler implements StreamParser {
+ private final StreamParser upstreamParser;
+ private MessageWriteTarget writeTarget;
+ private boolean closeOnOpen = false;
+ Handler(StreamParser upstreamParser, int connectTimeoutMillis) {
+ this.upstreamParser = upstreamParser;
+ setSocketTimeout(connectTimeoutMillis);
+ setTimeoutEnabled(true);
+ }
+
+ @Override
+ protected synchronized void timeoutOccurred() {
+ upstreamParser.connectionClosed();
+ closeOnOpen = true;
+ }
+
+ @Override
+ public void connectionClosed() {
+ upstreamParser.connectionClosed();
+ manager.stop();
+ }
+
+ @Override
+ public synchronized void connectionOpened() {
+ if (!closeOnOpen)
+ upstreamParser.connectionOpened();
+ }
+
+ @Override
+ public int receiveBytes(ByteBuffer buff) throws Exception {
+ return upstreamParser.receiveBytes(buff);
+ }
+
+ @Override
+ public synchronized void setWriteTarget(MessageWriteTarget writeTarget) {
+ if (closeOnOpen)
+ writeTarget.closeConnection();
+ else {
+ setTimeoutEnabled(false);
+ this.writeTarget = writeTarget;
+ upstreamParser.setWriteTarget(writeTarget);
+ }
+ }
+
+ @Override
+ public int getMaxMessageSize() {
+ return upstreamParser.getMaxMessageSize();
+ }
+ }
+
+ /**
+ *
Creates a new client to the given server address using the given {@link StreamParser} to decode the data.
+ * The given parser MUST be unique to this object. This does not block while waiting for the connection to
+ * open, but will call either the {@link StreamParser#connectionOpened()} or
+ * {@link StreamParser#connectionClosed()} callback on the created network event processing thread.
+ *
+ * @param connectTimeoutMillis The connect timeout set on the connection (in milliseconds). 0 is interpreted as no
+ * timeout.
+ */
+ public NioClient(final SocketAddress serverAddress, final StreamParser parser,
+ final int connectTimeoutMillis) throws IOException {
+ manager.startAndWait();
+ handler = new Handler(parser, connectTimeoutMillis);
+ manager.openConnection(serverAddress, handler);
+ }
+
+ @Override
+ public void closeConnection() {
+ handler.writeTarget.closeConnection();
+ }
+
+ @Override
+ public synchronized void writeBytes(byte[] message) throws IOException {
+ handler.writeTarget.writeBytes(message);
+ }
+}
diff --git a/core/src/main/java/com/google/bitcoin/networkabstraction/NioClientManager.java b/core/src/main/java/com/google/bitcoin/networkabstraction/NioClientManager.java
new file mode 100644
index 00000000..e9611046
--- /dev/null
+++ b/core/src/main/java/com/google/bitcoin/networkabstraction/NioClientManager.java
@@ -0,0 +1,177 @@
+/*
+ * Copyright 2013 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.bitcoin.networkabstraction;
+
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.nio.channels.*;
+import java.nio.channels.spi.SelectorProvider;
+import java.util.*;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.AbstractExecutionThreadService;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A class which manages a set of client connections. Uses Java NIO to select network events and processes them in a
+ * single network processing thread.
+ */
+public class NioClientManager extends AbstractExecutionThreadService implements ClientConnectionManager {
+ private static final org.slf4j.Logger log = LoggerFactory.getLogger(NioClientManager.class);
+
+ private final Selector selector;
+
+ // SocketChannels and StreamParsers of newly-created connections which should be registered with OP_CONNECT
+ class SocketChannelAndParser {
+ SocketChannel sc; StreamParser parser;
+ SocketChannelAndParser(SocketChannel sc, StreamParser parser) { this.sc = sc; this.parser = parser; }
+ }
+ final Queue newConnectionChannels = new LinkedBlockingQueue();
+
+ // Added to/removed from by the individual ConnectionHandler's, thus must by synchronized on its own.
+ private final Set connectedHandlers = Collections.synchronizedSet(new HashSet());
+
+ // Handle a SelectionKey which was selected
+ private void handleKey(SelectionKey key) throws IOException {
+ // We could have a !isValid() key here if the connection is already closed at this point
+ if (key.isValid() && key.isConnectable()) { // ie a client connection which has finished the initial connect process
+ // Create a ConnectionHandler and hook everything together
+ StreamParser parser = (StreamParser) key.attachment();
+ SocketChannel sc = (SocketChannel) key.channel();
+ ConnectionHandler handler = new ConnectionHandler(parser, key, connectedHandlers);
+ try {
+ if (sc.finishConnect()) {
+ log.info("Successfully connected to {}", sc.socket().getRemoteSocketAddress());
+ handler.parser.connectionOpened();
+ key.interestOps(SelectionKey.OP_READ).attach(handler);
+ } else {
+ log.error("Failed to connect to {}", sc.socket().getRemoteSocketAddress());
+ handler.closeConnection(); // Failed to connect for some reason
+ }
+ } catch (IOException e) {
+ // Calling sc.socket().getRemoteSocketAddress() here throws an exception, so we can only log the error itself
+ log.error("Failed to connect with exception", e);
+ handler.closeConnection();
+ } catch (CancelledKeyException e) { // There is a race to get to interestOps after finishConnect() which may cause this
+ // Calling sc.socket().getRemoteSocketAddress() here throws an exception, so we can only log the error itself
+ log.error("Failed to connect with exception", e);
+ handler.closeConnection();
+ }
+ } else // Process bytes read
+ ConnectionHandler.handleKey(key);
+ }
+
+ /**
+ * Creates a new client manager which uses Java NIO for socket management. Uses a single thread to handle all select
+ * calls.
+ */
+ public NioClientManager() {
+ try {
+ selector = SelectorProvider.provider().openSelector();
+ } catch (IOException e) {
+ throw new RuntimeException(e); // Shouldn't ever happen
+ }
+ }
+
+ @Override
+ public void run() {
+ try {
+ while (isRunning()) {
+ SocketChannelAndParser conn;
+ while ((conn = newConnectionChannels.poll()) != null) {
+ SelectionKey key = null;
+ try {
+ key = conn.sc.register(selector, SelectionKey.OP_CONNECT);
+ } catch (ClosedChannelException e) {
+ log.info("SocketChannel was closed before it could be registered");
+ }
+ key.attach(conn.parser);
+ }
+
+ selector.select();
+
+ Iterator keyIterator = selector.selectedKeys().iterator();
+ while (keyIterator.hasNext()) {
+ SelectionKey key = keyIterator.next();
+ keyIterator.remove();
+
+ handleKey(key);
+ }
+ }
+ } catch (Exception e) {
+ log.error("Error trying to open/read from connection: ", e);
+ } finally {
+ // Go through and close everything, without letting IOExceptions get in our way
+ for (SelectionKey key : selector.keys()) {
+ try {
+ key.channel().close();
+ } catch (IOException e) {
+ log.error("Error closing channel", e);
+ }
+ key.cancel();
+ if (key.attachment() instanceof ConnectionHandler)
+ ConnectionHandler.handleKey(key); // Close connection if relevant
+ }
+ try {
+ selector.close();
+ } catch (IOException e) {
+ log.error("Error closing client manager selector", e);
+ }
+ }
+ }
+
+ @Override
+ public void openConnection(SocketAddress serverAddress, StreamParser parser) {
+ if (!isRunning())
+ throw new IllegalStateException();
+ // Create a new connection, give it a parser as an attachment
+ try {
+ SocketChannel sc = SocketChannel.open();
+ sc.configureBlocking(false);
+ sc.connect(serverAddress);
+ newConnectionChannels.offer(new SocketChannelAndParser(sc, parser));
+ selector.wakeup();
+ } catch (IOException e) {
+ throw new RuntimeException(e); // This should only happen if we are, eg, out of system resources
+ }
+ }
+
+ @Override
+ public void triggerShutdown() {
+ selector.wakeup();
+ }
+
+ @Override
+ public int getConnectedClientCount() {
+ return connectedHandlers.size();
+ }
+
+ @Override
+ public void closeConnections(int n) {
+ while (n-- > 0) {
+ ConnectionHandler handler;
+ synchronized (connectedHandlers) {
+ handler = connectedHandlers.iterator().next();
+ }
+ if (handler != null)
+ handler.closeConnection(); // Removes handler from connectedHandlers before returning
+ }
+ }
+}
diff --git a/core/src/main/java/com/google/bitcoin/networkabstraction/NioServer.java b/core/src/main/java/com/google/bitcoin/networkabstraction/NioServer.java
new file mode 100644
index 00000000..8c75f58d
--- /dev/null
+++ b/core/src/main/java/com/google/bitcoin/networkabstraction/NioServer.java
@@ -0,0 +1,129 @@
+/*
+ * Copyright 2013 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.bitcoin.networkabstraction;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.channels.*;
+import java.nio.channels.spi.SelectorProvider;
+import java.util.Iterator;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.AbstractExecutionThreadService;
+import org.slf4j.LoggerFactory;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+/**
+ * Creates a simple server listener which listens for incoming client connections and uses a {@link StreamParser} to
+ * process data.
+ */
+public class NioServer extends AbstractExecutionThreadService {
+ private static final org.slf4j.Logger log = LoggerFactory.getLogger(NioServer.class);
+
+ private final StreamParserFactory parserFactory;
+
+ private final ServerSocketChannel sc;
+ @VisibleForTesting final Selector selector;
+
+ // Handle a SelectionKey which was selected
+ private void handleKey(Selector selector, SelectionKey key) throws IOException {
+ if (key.isValid() && key.isAcceptable()) {
+ // Accept a new connection, give it a parser as an attachment
+ SocketChannel newChannel = sc.accept();
+ newChannel.configureBlocking(false);
+ SelectionKey newKey = newChannel.register(selector, SelectionKey.OP_READ);
+ ConnectionHandler handler = new ConnectionHandler(parserFactory, newKey);
+ newKey.attach(handler);
+ handler.parser.connectionOpened();
+ } else { // Got a closing channel or a channel to a client connection
+ ConnectionHandler.handleKey(key);
+ }
+ }
+
+ /**
+ * Creates a new server which is capable of listening for incoming connections and processing client provided data
+ * using {@link StreamParser}s created by the given {@link StreamParserFactory}
+ *
+ * @throws IOException If there is an issue opening the server socket or binding fails for some reason
+ */
+ public NioServer(final StreamParserFactory parserFactory, InetSocketAddress bindAddress) throws IOException {
+ this.parserFactory = parserFactory;
+
+ sc = ServerSocketChannel.open();
+ sc.configureBlocking(false);
+ sc.socket().bind(bindAddress);
+ selector = SelectorProvider.provider().openSelector();
+ sc.register(selector, SelectionKey.OP_ACCEPT);
+ }
+
+ @Override
+ protected void run() throws Exception {
+ try {
+ while (isRunning()) {
+ selector.select();
+
+ Iterator keyIterator = selector.selectedKeys().iterator();
+ while (keyIterator.hasNext()) {
+ SelectionKey key = keyIterator.next();
+ keyIterator.remove();
+
+ handleKey(selector, key);
+ }
+ }
+ } catch (Exception e) {
+ log.error("Error trying to open/read from connection: {}", e);
+ } finally {
+ // Go through and close everything, without letting IOExceptions get in our way
+ for (SelectionKey key : selector.keys()) {
+ try {
+ key.channel().close();
+ } catch (IOException e) {
+ log.error("Error closing channel", e);
+ }
+ try {
+ key.cancel();
+ handleKey(selector, key);
+ } catch (IOException e) {
+ log.error("Error closing selection key", e);
+ }
+ }
+ try {
+ selector.close();
+ } catch (IOException e) {
+ log.error("Error closing server selector", e);
+ }
+ try {
+ sc.close();
+ } catch (IOException e) {
+ log.error("Error closing server channel", e);
+ }
+ }
+ }
+
+ /**
+ * Invoked by the Execution service when it's time to stop.
+ * Calling this method directly will NOT stop the service, call
+ * {@link com.google.common.util.concurrent.AbstractExecutionThreadService#stop()} instead.
+ */
+ @Override
+ public void triggerShutdown() {
+ // Wake up the selector and let the selection thread break its loop as the ExecutionService !isRunning()
+ selector.wakeup();
+ }
+}
diff --git a/core/src/main/java/com/google/bitcoin/protocols/niowrapper/ProtobufParser.java b/core/src/main/java/com/google/bitcoin/networkabstraction/ProtobufParser.java
similarity index 97%
rename from core/src/main/java/com/google/bitcoin/protocols/niowrapper/ProtobufParser.java
rename to core/src/main/java/com/google/bitcoin/networkabstraction/ProtobufParser.java
index 1c7c8852..257d975e 100644
--- a/core/src/main/java/com/google/bitcoin/protocols/niowrapper/ProtobufParser.java
+++ b/core/src/main/java/com/google/bitcoin/networkabstraction/ProtobufParser.java
@@ -14,10 +14,11 @@
* limitations under the License.
*/
-package com.google.bitcoin.protocols.niowrapper;
+package com.google.bitcoin.networkabstraction;
import com.google.bitcoin.core.Utils;
import com.google.bitcoin.utils.Threading;
+import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ByteString;
import com.google.protobuf.MessageLite;
import org.slf4j.Logger;
@@ -74,7 +75,7 @@ public class ProtobufParser extends AbstractTim
@GuardedBy("lock") private byte[] messageBytes;
private final ReentrantLock lock = Threading.lock("ProtobufParser");
- private final AtomicReference writeTarget = new AtomicReference();
+ @VisibleForTesting final AtomicReference writeTarget = new AtomicReference();
/**
* Creates a new protobuf handler.
diff --git a/core/src/main/java/com/google/bitcoin/protocols/niowrapper/StreamParser.java b/core/src/main/java/com/google/bitcoin/networkabstraction/StreamParser.java
similarity index 52%
rename from core/src/main/java/com/google/bitcoin/protocols/niowrapper/StreamParser.java
rename to core/src/main/java/com/google/bitcoin/networkabstraction/StreamParser.java
index 87e45306..ecc9f6da 100644
--- a/core/src/main/java/com/google/bitcoin/protocols/niowrapper/StreamParser.java
+++ b/core/src/main/java/com/google/bitcoin/networkabstraction/StreamParser.java
@@ -14,12 +14,13 @@
* limitations under the License.
*/
-package com.google.bitcoin.protocols.niowrapper;
+package com.google.bitcoin.networkabstraction;
import java.nio.ByteBuffer;
/**
- * A generic handler which is used in {@link NioServer} and {@link NioClient} to handle incoming data streams.
+ * A generic handler which is used in {@link NioServer}, {@link NioClient} and {@link BlockingClient} to handle incoming
+ * data streams.
*/
public interface StreamParser {
/** Called when the connection socket is closed */
@@ -29,14 +30,22 @@ public interface StreamParser {
void connectionOpened();
/**
- * Called when new bytes are available from the remote end.
- * * buff will start with its limit set to the position we can read to and its position set to the location we will
- * start reading at
- * * May read more than one message (recursively) if there are enough bytes available
- * * Uses messageBytes/messageBytesOffset to store message which are larger (incl their length prefix) than buff's
- * capacity(), ie it is up to this method to ensure we dont run out of buffer space to decode the next message.
- * * buff will end with its limit the same as it was previously, and its position set to the position up to which
- * bytes have been read (the same as its return value)
+ *