diff --git a/core/src/main/java/com/google/bitcoin/protocols/channels/PaymentChannelClientConnection.java b/core/src/main/java/com/google/bitcoin/protocols/channels/PaymentChannelClientConnection.java index 795027c4..b80a82a9 100644 --- a/core/src/main/java/com/google/bitcoin/protocols/channels/PaymentChannelClientConnection.java +++ b/core/src/main/java/com/google/bitcoin/protocols/channels/PaymentChannelClientConnection.java @@ -19,7 +19,7 @@ package com.google.bitcoin.protocols.channels; import com.google.bitcoin.core.ECKey; import com.google.bitcoin.core.Sha256Hash; import com.google.bitcoin.core.Wallet; -import com.google.bitcoin.protocols.niowrapper.ProtobufClient; +import com.google.bitcoin.protocols.niowrapper.NioClient; import com.google.bitcoin.protocols.niowrapper.ProtobufParser; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; @@ -106,7 +106,7 @@ public class PaymentChannelClientConnection { // Initiate the outbound network connection. We don't need to keep this around. The wireParser object will handle // things from here on out. - new ProtobufClient(server, wireParser, timeoutSeconds * 1000); + new NioClient(server, wireParser, timeoutSeconds * 1000); } /** @@ -153,7 +153,7 @@ public class PaymentChannelClientConnection { // // This call will cause the CLOSE message to be written to the wire, and then the destroyConnection() method that // we defined above will be called, which in turn will call wireParser.closeConnection(), which in turn will invoke - // ProtobufClient.closeConnection(), which will then close the socket triggering interruption of the network + // NioClient.closeConnection(), which will then close the socket triggering interruption of the network // thread it had created. That causes the background thread to die, which on its way out calls // ProtobufParser.connectionClosed which invokes the connectionClosed method we defined above which in turn // then configures the open-future correctly and closes the state object. Phew! diff --git a/core/src/main/java/com/google/bitcoin/protocols/channels/PaymentChannelServerListener.java b/core/src/main/java/com/google/bitcoin/protocols/channels/PaymentChannelServerListener.java index 4c0bb38b..038af1c8 100644 --- a/core/src/main/java/com/google/bitcoin/protocols/channels/PaymentChannelServerListener.java +++ b/core/src/main/java/com/google/bitcoin/protocols/channels/PaymentChannelServerListener.java @@ -27,9 +27,9 @@ import javax.annotation.Nullable; import com.google.bitcoin.core.Sha256Hash; import com.google.bitcoin.core.TransactionBroadcaster; import com.google.bitcoin.core.Wallet; +import com.google.bitcoin.protocols.niowrapper.NioServer; import com.google.bitcoin.protocols.niowrapper.ProtobufParser; -import com.google.bitcoin.protocols.niowrapper.ProtobufParserFactory; -import com.google.bitcoin.protocols.niowrapper.ProtobufServer; +import com.google.bitcoin.protocols.niowrapper.StreamParserFactory; import org.bitcoin.paymentchannel.Protos; import static com.google.common.base.Preconditions.checkNotNull; @@ -48,7 +48,7 @@ public class PaymentChannelServerListener { private final HandlerFactory eventHandlerFactory; private final BigInteger minAcceptedChannelSize; - private final ProtobufServer server; + private final NioServer server; /** * A factory which generates connection-specific event handlers. @@ -160,7 +160,7 @@ public class PaymentChannelServerListener { this.eventHandlerFactory = checkNotNull(eventHandlerFactory); this.minAcceptedChannelSize = checkNotNull(minAcceptedChannelSize); - server = new ProtobufServer(new ProtobufParserFactory() { + server = new NioServer(new StreamParserFactory() { @Override public ProtobufParser getNewParser(InetAddress inetAddress, int port) { return new ServerHandler(new InetSocketAddress(inetAddress, port), timeoutSeconds).socketProtobufHandler; diff --git a/core/src/main/java/com/google/bitcoin/protocols/niowrapper/AbstractTimeoutHandler.java b/core/src/main/java/com/google/bitcoin/protocols/niowrapper/AbstractTimeoutHandler.java new file mode 100644 index 00000000..830dd6f0 --- /dev/null +++ b/core/src/main/java/com/google/bitcoin/protocols/niowrapper/AbstractTimeoutHandler.java @@ -0,0 +1,80 @@ +/* + * Copyright 2013 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.bitcoin.protocols.niowrapper; + +import java.util.Timer; +import java.util.TimerTask; + +/** + *
A stream parser that provides functionality for creating timeouts between arbitrary events.
+ */ +public abstract class AbstractTimeoutHandler { + // TimerTask and timeout value which are added to a timer to kill the connection on timeout + private TimerTask timeoutTask; + private long timeoutMillis = 0; + private boolean timeoutEnabled = true; + + // A timer which manages expiring channels as their timeouts occur (if configured). + private static final Timer timeoutTimer = new Timer("ProtobufParser timeouts", true); + + /** + *Enables or disables the timeout entirely. This may be useful if you want to store the timeout value but wish + * to temporarily disable/enable timeouts.
+ * + *The default is for timeoutEnabled to be true but timeoutMillis to be set to 0 (ie disabled).
+ * + *This call will reset the current progress towards the timeout.
+ */ + public synchronized void setTimeoutEnabled(boolean timeoutEnabled) { + this.timeoutEnabled = timeoutEnabled; + resetTimeout(); + } + + /** + *Sets the receive timeout to the given number of milliseconds, automatically killing the connection if no + * messages are received for this long
+ * + *A timeout of 0 is interpreted as no timeout.
+ * + *The default is for timeoutEnabled to be true but timeoutMillis to be set to 0 (ie disabled).
+ * + *This call will reset the current progress towards the timeout.
+ */ + public synchronized void setSocketTimeout(int timeoutMillis) { + this.timeoutMillis = timeoutMillis; + resetTimeout(); + } + + /** + * Resets the current progress towards timeout to 0. + */ + protected synchronized void resetTimeout() { + if (timeoutTask != null) + timeoutTask.cancel(); + if (timeoutMillis == 0 || !timeoutEnabled) + return; + timeoutTask = new TimerTask() { + @Override + public void run() { + timeoutOccurred(); + } + }; + timeoutTimer.schedule(timeoutTask, timeoutMillis); + } + + protected abstract void timeoutOccurred(); +} diff --git a/core/src/main/java/com/google/bitcoin/protocols/niowrapper/ProtobufClient.java b/core/src/main/java/com/google/bitcoin/protocols/niowrapper/NioClient.java similarity index 82% rename from core/src/main/java/com/google/bitcoin/protocols/niowrapper/ProtobufClient.java rename to core/src/main/java/com/google/bitcoin/protocols/niowrapper/NioClient.java index 875534ab..3a63ee9e 100644 --- a/core/src/main/java/com/google/bitcoin/protocols/niowrapper/ProtobufClient.java +++ b/core/src/main/java/com/google/bitcoin/protocols/niowrapper/NioClient.java @@ -29,10 +29,10 @@ import org.slf4j.LoggerFactory; import static com.google.common.base.Preconditions.checkState; /** - * Creates a simple connection to a server using a {@link ProtobufParser} to process data. + * Creates a simple connection to a server using a {@link StreamParser} to process data. */ -public class ProtobufClient extends MessageWriteTarget { - private static final org.slf4j.Logger log = LoggerFactory.getLogger(ProtobufClient.class); +public class NioClient extends MessageWriteTarget { + private static final org.slf4j.Logger log = LoggerFactory.getLogger(NioClient.class); private static final int BUFFER_SIZE_LOWER_BOUND = 4096; private static final int BUFFER_SIZE_UPPER_BOUND = 65536; @@ -41,19 +41,19 @@ public class ProtobufClient extends MessageWriteTarget { @Nonnull private final SocketChannel sc; /** - *Creates a new client to the given server address using the given {@link ProtobufParser} to decode the data. + *
Creates a new client to the given server address using the given {@link StreamParser} to decode the data. * The given parser MUST be unique to this object. This does not block while waiting for the connection to - * open, but will call either the {@link ProtobufParser#connectionOpen()} or {@link ProtobufParser#connectionClosed()} - * callback on the created network event processing thread.
+ * open, but will call either the {@link StreamParser#connectionOpened()} or + * {@link StreamParser#connectionClosed()} callback on the created network event processing thread. * * @param connectTimeoutMillis The connect timeout set on the connection (in milliseconds). 0 is interpreted as no * timeout. */ - public ProtobufClient(final InetSocketAddress serverAddress, final ProtobufParser parser, - final int connectTimeoutMillis) throws IOException { + public NioClient(final InetSocketAddress serverAddress, final StreamParser parser, + final int connectTimeoutMillis) throws IOException { // Try to fit at least one message in the network buffer, but place an upper and lower limit on its size to make // sure it doesnt get too large or have to call read too often. - dbuf = ByteBuffer.allocateDirect(Math.min(Math.max(parser.maxMessageSize, BUFFER_SIZE_LOWER_BOUND), BUFFER_SIZE_UPPER_BOUND)); + dbuf = ByteBuffer.allocateDirect(Math.min(Math.max(parser.getMaxMessageSize(), BUFFER_SIZE_LOWER_BOUND), BUFFER_SIZE_UPPER_BOUND)); parser.setWriteTarget(this); sc = SocketChannel.open(); @@ -62,7 +62,7 @@ public class ProtobufClient extends MessageWriteTarget { public void run() { try { sc.socket().connect(serverAddress, connectTimeoutMillis); - parser.connectionOpen(); + parser.connectionOpened(); while (true) { int read = sc.read(dbuf); @@ -72,9 +72,9 @@ public class ProtobufClient extends MessageWriteTarget { return; // "flip" the buffer - setting the limit to the current position and setting position to 0 dbuf.flip(); - // Use parser.receive's return value as a double-check that it stopped reading at the right + // Use parser.receiveBytes's return value as a double-check that it stopped reading at the right // location - int bytesConsumed = parser.receive(dbuf); + int bytesConsumed = parser.receiveBytes(dbuf); checkState(dbuf.position() == bytesConsumed); // Now drop the bytes which were read by compacting dbuf (resetting limit and keeping relative // position) @@ -97,7 +97,7 @@ public class ProtobufClient extends MessageWriteTarget { } /** - * Closes the connection to the server, triggering the {@link ProtobufParser#connectionClosed()} + * Closes the connection to the server, triggering the {@link StreamParser#connectionClosed()} * event on the network-handling thread where all callbacks occur. */ public void closeConnection() { @@ -109,7 +109,7 @@ public class ProtobufClient extends MessageWriteTarget { } } - // Writes raw bytes to the channel (used by the write method in ProtobufParser) + // Writes raw bytes to the channel (used by the write method in StreamParser) @Override synchronized void writeBytes(byte[] message) { try { diff --git a/core/src/main/java/com/google/bitcoin/protocols/niowrapper/ProtobufServer.java b/core/src/main/java/com/google/bitcoin/protocols/niowrapper/NioServer.java similarity index 89% rename from core/src/main/java/com/google/bitcoin/protocols/niowrapper/ProtobufServer.java rename to core/src/main/java/com/google/bitcoin/protocols/niowrapper/NioServer.java index cac5e57d..12f31e94 100644 --- a/core/src/main/java/com/google/bitcoin/protocols/niowrapper/ProtobufServer.java +++ b/core/src/main/java/com/google/bitcoin/protocols/niowrapper/NioServer.java @@ -35,13 +35,13 @@ import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; /** - * Creates a simple server listener which listens for incoming client connections and uses a {@link ProtobufParser} to + * Creates a simple server listener which listens for incoming client connections and uses a {@link StreamParser} to * process data. */ -public class ProtobufServer { - private static final org.slf4j.Logger log = LoggerFactory.getLogger(ProtobufServer.class); +public class NioServer { + private static final org.slf4j.Logger log = LoggerFactory.getLogger(NioServer.class); - private final ProtobufParserFactory parserFactory; + private final StreamParserFactory parserFactory; @VisibleForTesting final Thread handlerThread; private final ServerSocketChannel sc; @@ -50,21 +50,21 @@ public class ProtobufServer { private static final int BUFFER_SIZE_UPPER_BOUND = 65536; private class ConnectionHandler extends MessageWriteTarget { - private final ReentrantLock lock = Threading.lock("protobufServerConnectionHandler"); + private final ReentrantLock lock = Threading.lock("nioServerConnectionHandler"); private final ByteBuffer dbuf; private final SocketChannel channel; - private final ProtobufParser parser; + private final StreamParser parser; private boolean closeCalled = false; ConnectionHandler(SocketChannel channel) throws IOException { this.channel = checkNotNull(channel); - ProtobufParser newParser = parserFactory.getNewParser(channel.socket().getInetAddress(), channel.socket().getPort()); + StreamParser newParser = parserFactory.getNewParser(channel.socket().getInetAddress(), channel.socket().getPort()); if (newParser == null) { closeConnection(); throw new IOException("Parser factory.getNewParser returned null"); } this.parser = newParser; - dbuf = ByteBuffer.allocateDirect(Math.min(Math.max(newParser.maxMessageSize, BUFFER_SIZE_LOWER_BOUND), BUFFER_SIZE_UPPER_BOUND)); + dbuf = ByteBuffer.allocateDirect(Math.min(Math.max(newParser.getMaxMessageSize(), BUFFER_SIZE_LOWER_BOUND), BUFFER_SIZE_UPPER_BOUND)); newParser.setWriteTarget(this); } @@ -114,14 +114,14 @@ public class ProtobufServer { newChannel.configureBlocking(false); ConnectionHandler handler = new ConnectionHandler(newChannel); newChannel.register(selector, SelectionKey.OP_READ).attach(handler); - handler.parser.connectionOpen(); + handler.parser.connectionOpened(); } else { // Got a closing channel or a channel to a client connection ConnectionHandler handler = ((ConnectionHandler)key.attachment()); try { if (!key.isValid() && handler != null) handler.closeConnection(); // Key has been cancelled, make sure the socket gets closed else if (handler != null && key.isReadable()) { - // Do a socket read and invoke the parser's receive message + // Do a socket read and invoke the parser's receiveBytes message int read = handler.channel.read(handler.dbuf); if (read == 0) return; // Should probably never happen, but just in case it actually can just return 0 @@ -132,8 +132,8 @@ public class ProtobufServer { } // "flip" the buffer - setting the limit to the current position and setting position to 0 handler.dbuf.flip(); - // Use parser.receive's return value as a double-check that it stopped reading at the right location - int bytesConsumed = handler.parser.receive(handler.dbuf); + // Use parser.receiveBytes's return value as a check that it stopped reading at the right location + int bytesConsumed = handler.parser.receiveBytes(handler.dbuf); checkState(handler.dbuf.position() == bytesConsumed); // Now drop the bytes which were read by compacting dbuf (resetting limit and keeping relative // position) @@ -141,7 +141,7 @@ public class ProtobufServer { } } catch (Exception e) { // This can happen eg if the channel closes while the tread is about to get killed - // (ClosedByInterruptException), or if parser.parser.receive throws something + // (ClosedByInterruptException), or if parser.parser.receiveBytes throws something log.error("Error handling SelectionKey", e); if (handler != null) handler.closeConnection(); @@ -151,11 +151,11 @@ public class ProtobufServer { /** * Creates a new server which is capable of listening for incoming connections and processing client provided data - * using {@link ProtobufParser}s created by the given {@link ProtobufParserFactory} + * using {@link StreamParser}s created by the given {@link StreamParserFactory} * * @throws IOException If there is an issue opening the server socket (note that we don't bind yet) */ - public ProtobufServer(final ProtobufParserFactory parserFactory) throws IOException { + public NioServer(final StreamParserFactory parserFactory) throws IOException { this.parserFactory = parserFactory; sc = ServerSocketChannel.open(); diff --git a/core/src/main/java/com/google/bitcoin/protocols/niowrapper/ProtobufParser.java b/core/src/main/java/com/google/bitcoin/protocols/niowrapper/ProtobufParser.java index a025f7d2..ec875b7d 100644 --- a/core/src/main/java/com/google/bitcoin/protocols/niowrapper/ProtobufParser.java +++ b/core/src/main/java/com/google/bitcoin/protocols/niowrapper/ProtobufParser.java @@ -22,20 +22,18 @@ import com.google.protobuf.MessageLite; import java.nio.ByteBuffer; import java.nio.ByteOrder; -import java.util.Timer; -import java.util.TimerTask; import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; /** - *A handler which is used in {@link ProtobufServer} and {@link ProtobufClient} to split up incoming data streams + *
A handler which is used in {@link NioServer} and {@link NioClient} to split up incoming data streams * into protobufs and provide an interface for writing protobufs to the connections.
* *Messages are encoded with a 4-byte signed integer (big endian) prefix to indicate their length followed by the * serialized protobuf
*/ -public class ProtobufParserSets the receive timeout to the given number of milliseconds, automatically killing the connection if no - * messages are received for this long
- * - *A timeout of 0 is interpreted as no timeout
- */ - public synchronized void setSocketTimeout(int timeoutMillis) { - this.timeoutMillis = timeoutMillis; - resetTimeout(); - } - - private synchronized void resetTimeout() { - if (timeoutTask != null) - timeoutTask.cancel(); - if (timeoutMillis == 0) - return; - timeoutTask = new TimerTask() { - @Override - public void run() { - closeConnection(); - } - }; - timeoutTimer.schedule(timeoutTask, timeoutMillis); - } } diff --git a/core/src/main/java/com/google/bitcoin/protocols/niowrapper/StreamParser.java b/core/src/main/java/com/google/bitcoin/protocols/niowrapper/StreamParser.java new file mode 100644 index 00000000..87e45306 --- /dev/null +++ b/core/src/main/java/com/google/bitcoin/protocols/niowrapper/StreamParser.java @@ -0,0 +1,55 @@ +/* + * Copyright 2013 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.bitcoin.protocols.niowrapper; + +import java.nio.ByteBuffer; + +/** + * A generic handler which is used in {@link NioServer} and {@link NioClient} to handle incoming data streams. + */ +public interface StreamParser { + /** Called when the connection socket is closed */ + void connectionClosed(); + + /** Called when the connection socket is first opened */ + void connectionOpened(); + + /** + * Called when new bytes are available from the remote end. + * * buff will start with its limit set to the position we can read to and its position set to the location we will + * start reading at + * * May read more than one message (recursively) if there are enough bytes available + * * Uses messageBytes/messageBytesOffset to store message which are larger (incl their length prefix) than buff's + * capacity(), ie it is up to this method to ensure we dont run out of buffer space to decode the next message. + * * buff will end with its limit the same as it was previously, and its position set to the position up to which + * bytes have been read (the same as its return value) + * @return The amount of bytes consumed which should not be provided again + */ + int receiveBytes(ByteBuffer buff) throws Exception; + + /** + * Called when this parser is attached to an upstream write target (ie a low-level connection handler). This + * writeTarget should be stored and used to close the connection or write data to the socket. + */ + void setWriteTarget(MessageWriteTarget writeTarget); + + /** + * Returns the maximum message size of a message on the socket. This is used in calculating size of buffers to + * allocate. + */ + int getMaxMessageSize(); +} diff --git a/core/src/main/java/com/google/bitcoin/protocols/niowrapper/ProtobufParserFactory.java b/core/src/main/java/com/google/bitcoin/protocols/niowrapper/StreamParserFactory.java similarity index 80% rename from core/src/main/java/com/google/bitcoin/protocols/niowrapper/ProtobufParserFactory.java rename to core/src/main/java/com/google/bitcoin/protocols/niowrapper/StreamParserFactory.java index c2a115ef..e9f49e75 100644 --- a/core/src/main/java/com/google/bitcoin/protocols/niowrapper/ProtobufParserFactory.java +++ b/core/src/main/java/com/google/bitcoin/protocols/niowrapper/StreamParserFactory.java @@ -20,13 +20,13 @@ import java.net.InetAddress; import javax.annotation.Nullable; /** - * A factory which generates new {@link ProtobufParser}s when a new connection is opened. + * A factory which generates new {@link StreamParser}s when a new connection is opened. */ -public interface ProtobufParserFactory { +public interface StreamParserFactory { /** * Returns a new handler or null to have the connection close. * @param inetAddress The client's (IP) address * @param port The remote port on the client side */ - @Nullable public ProtobufParser getNewParser(InetAddress inetAddress, int port); + @Nullable public StreamParser getNewParser(InetAddress inetAddress, int port); } diff --git a/core/src/test/java/com/google/bitcoin/protocols/niowrapper/NioWrapperTest.java b/core/src/test/java/com/google/bitcoin/protocols/niowrapper/NioWrapperTest.java index d54a251d..faa08d6f 100644 --- a/core/src/test/java/com/google/bitcoin/protocols/niowrapper/NioWrapperTest.java +++ b/core/src/test/java/com/google/bitcoin/protocols/niowrapper/NioWrapperTest.java @@ -55,7 +55,7 @@ public class NioWrapperTest { final SettableFuture