From 9980903572709fcda6b09059a1d1c6c4931d359a Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Tue, 23 Jul 2013 18:31:43 +0200 Subject: [PATCH] Make Protobuf{Server,Client} more generic. --- .../PaymentChannelClientConnection.java | 6 +- .../PaymentChannelServerListener.java | 8 +- .../niowrapper/AbstractTimeoutHandler.java | 80 ++++++++++++++++++ .../{ProtobufClient.java => NioClient.java} | 28 +++---- .../{ProtobufServer.java => NioServer.java} | 30 +++---- .../protocols/niowrapper/ProtobufParser.java | 83 ++++++------------- .../protocols/niowrapper/StreamParser.java | 55 ++++++++++++ ...rFactory.java => StreamParserFactory.java} | 6 +- .../protocols/niowrapper/NioWrapperTest.java | 24 +++--- 9 files changed, 211 insertions(+), 109 deletions(-) create mode 100644 core/src/main/java/com/google/bitcoin/protocols/niowrapper/AbstractTimeoutHandler.java rename core/src/main/java/com/google/bitcoin/protocols/niowrapper/{ProtobufClient.java => NioClient.java} (82%) rename core/src/main/java/com/google/bitcoin/protocols/niowrapper/{ProtobufServer.java => NioServer.java} (89%) create mode 100644 core/src/main/java/com/google/bitcoin/protocols/niowrapper/StreamParser.java rename core/src/main/java/com/google/bitcoin/protocols/niowrapper/{ProtobufParserFactory.java => StreamParserFactory.java} (80%) diff --git a/core/src/main/java/com/google/bitcoin/protocols/channels/PaymentChannelClientConnection.java b/core/src/main/java/com/google/bitcoin/protocols/channels/PaymentChannelClientConnection.java index 795027c4..b80a82a9 100644 --- a/core/src/main/java/com/google/bitcoin/protocols/channels/PaymentChannelClientConnection.java +++ b/core/src/main/java/com/google/bitcoin/protocols/channels/PaymentChannelClientConnection.java @@ -19,7 +19,7 @@ package com.google.bitcoin.protocols.channels; import com.google.bitcoin.core.ECKey; import com.google.bitcoin.core.Sha256Hash; import com.google.bitcoin.core.Wallet; -import com.google.bitcoin.protocols.niowrapper.ProtobufClient; +import com.google.bitcoin.protocols.niowrapper.NioClient; import com.google.bitcoin.protocols.niowrapper.ProtobufParser; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; @@ -106,7 +106,7 @@ public class PaymentChannelClientConnection { // Initiate the outbound network connection. We don't need to keep this around. The wireParser object will handle // things from here on out. - new ProtobufClient(server, wireParser, timeoutSeconds * 1000); + new NioClient(server, wireParser, timeoutSeconds * 1000); } /** @@ -153,7 +153,7 @@ public class PaymentChannelClientConnection { // // This call will cause the CLOSE message to be written to the wire, and then the destroyConnection() method that // we defined above will be called, which in turn will call wireParser.closeConnection(), which in turn will invoke - // ProtobufClient.closeConnection(), which will then close the socket triggering interruption of the network + // NioClient.closeConnection(), which will then close the socket triggering interruption of the network // thread it had created. That causes the background thread to die, which on its way out calls // ProtobufParser.connectionClosed which invokes the connectionClosed method we defined above which in turn // then configures the open-future correctly and closes the state object. Phew! diff --git a/core/src/main/java/com/google/bitcoin/protocols/channels/PaymentChannelServerListener.java b/core/src/main/java/com/google/bitcoin/protocols/channels/PaymentChannelServerListener.java index 4c0bb38b..038af1c8 100644 --- a/core/src/main/java/com/google/bitcoin/protocols/channels/PaymentChannelServerListener.java +++ b/core/src/main/java/com/google/bitcoin/protocols/channels/PaymentChannelServerListener.java @@ -27,9 +27,9 @@ import javax.annotation.Nullable; import com.google.bitcoin.core.Sha256Hash; import com.google.bitcoin.core.TransactionBroadcaster; import com.google.bitcoin.core.Wallet; +import com.google.bitcoin.protocols.niowrapper.NioServer; import com.google.bitcoin.protocols.niowrapper.ProtobufParser; -import com.google.bitcoin.protocols.niowrapper.ProtobufParserFactory; -import com.google.bitcoin.protocols.niowrapper.ProtobufServer; +import com.google.bitcoin.protocols.niowrapper.StreamParserFactory; import org.bitcoin.paymentchannel.Protos; import static com.google.common.base.Preconditions.checkNotNull; @@ -48,7 +48,7 @@ public class PaymentChannelServerListener { private final HandlerFactory eventHandlerFactory; private final BigInteger minAcceptedChannelSize; - private final ProtobufServer server; + private final NioServer server; /** * A factory which generates connection-specific event handlers. @@ -160,7 +160,7 @@ public class PaymentChannelServerListener { this.eventHandlerFactory = checkNotNull(eventHandlerFactory); this.minAcceptedChannelSize = checkNotNull(minAcceptedChannelSize); - server = new ProtobufServer(new ProtobufParserFactory() { + server = new NioServer(new StreamParserFactory() { @Override public ProtobufParser getNewParser(InetAddress inetAddress, int port) { return new ServerHandler(new InetSocketAddress(inetAddress, port), timeoutSeconds).socketProtobufHandler; diff --git a/core/src/main/java/com/google/bitcoin/protocols/niowrapper/AbstractTimeoutHandler.java b/core/src/main/java/com/google/bitcoin/protocols/niowrapper/AbstractTimeoutHandler.java new file mode 100644 index 00000000..830dd6f0 --- /dev/null +++ b/core/src/main/java/com/google/bitcoin/protocols/niowrapper/AbstractTimeoutHandler.java @@ -0,0 +1,80 @@ +/* + * Copyright 2013 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.bitcoin.protocols.niowrapper; + +import java.util.Timer; +import java.util.TimerTask; + +/** + *

A stream parser that provides functionality for creating timeouts between arbitrary events.

+ */ +public abstract class AbstractTimeoutHandler { + // TimerTask and timeout value which are added to a timer to kill the connection on timeout + private TimerTask timeoutTask; + private long timeoutMillis = 0; + private boolean timeoutEnabled = true; + + // A timer which manages expiring channels as their timeouts occur (if configured). + private static final Timer timeoutTimer = new Timer("ProtobufParser timeouts", true); + + /** + *

Enables or disables the timeout entirely. This may be useful if you want to store the timeout value but wish + * to temporarily disable/enable timeouts.

+ * + *

The default is for timeoutEnabled to be true but timeoutMillis to be set to 0 (ie disabled).

+ * + *

This call will reset the current progress towards the timeout.

+ */ + public synchronized void setTimeoutEnabled(boolean timeoutEnabled) { + this.timeoutEnabled = timeoutEnabled; + resetTimeout(); + } + + /** + *

Sets the receive timeout to the given number of milliseconds, automatically killing the connection if no + * messages are received for this long

+ * + *

A timeout of 0 is interpreted as no timeout.

+ * + *

The default is for timeoutEnabled to be true but timeoutMillis to be set to 0 (ie disabled).

+ * + *

This call will reset the current progress towards the timeout.

+ */ + public synchronized void setSocketTimeout(int timeoutMillis) { + this.timeoutMillis = timeoutMillis; + resetTimeout(); + } + + /** + * Resets the current progress towards timeout to 0. + */ + protected synchronized void resetTimeout() { + if (timeoutTask != null) + timeoutTask.cancel(); + if (timeoutMillis == 0 || !timeoutEnabled) + return; + timeoutTask = new TimerTask() { + @Override + public void run() { + timeoutOccurred(); + } + }; + timeoutTimer.schedule(timeoutTask, timeoutMillis); + } + + protected abstract void timeoutOccurred(); +} diff --git a/core/src/main/java/com/google/bitcoin/protocols/niowrapper/ProtobufClient.java b/core/src/main/java/com/google/bitcoin/protocols/niowrapper/NioClient.java similarity index 82% rename from core/src/main/java/com/google/bitcoin/protocols/niowrapper/ProtobufClient.java rename to core/src/main/java/com/google/bitcoin/protocols/niowrapper/NioClient.java index 875534ab..3a63ee9e 100644 --- a/core/src/main/java/com/google/bitcoin/protocols/niowrapper/ProtobufClient.java +++ b/core/src/main/java/com/google/bitcoin/protocols/niowrapper/NioClient.java @@ -29,10 +29,10 @@ import org.slf4j.LoggerFactory; import static com.google.common.base.Preconditions.checkState; /** - * Creates a simple connection to a server using a {@link ProtobufParser} to process data. + * Creates a simple connection to a server using a {@link StreamParser} to process data. */ -public class ProtobufClient extends MessageWriteTarget { - private static final org.slf4j.Logger log = LoggerFactory.getLogger(ProtobufClient.class); +public class NioClient extends MessageWriteTarget { + private static final org.slf4j.Logger log = LoggerFactory.getLogger(NioClient.class); private static final int BUFFER_SIZE_LOWER_BOUND = 4096; private static final int BUFFER_SIZE_UPPER_BOUND = 65536; @@ -41,19 +41,19 @@ public class ProtobufClient extends MessageWriteTarget { @Nonnull private final SocketChannel sc; /** - *

Creates a new client to the given server address using the given {@link ProtobufParser} to decode the data. + *

Creates a new client to the given server address using the given {@link StreamParser} to decode the data. * The given parser MUST be unique to this object. This does not block while waiting for the connection to - * open, but will call either the {@link ProtobufParser#connectionOpen()} or {@link ProtobufParser#connectionClosed()} - * callback on the created network event processing thread.

+ * open, but will call either the {@link StreamParser#connectionOpened()} or + * {@link StreamParser#connectionClosed()} callback on the created network event processing thread.

* * @param connectTimeoutMillis The connect timeout set on the connection (in milliseconds). 0 is interpreted as no * timeout. */ - public ProtobufClient(final InetSocketAddress serverAddress, final ProtobufParser parser, - final int connectTimeoutMillis) throws IOException { + public NioClient(final InetSocketAddress serverAddress, final StreamParser parser, + final int connectTimeoutMillis) throws IOException { // Try to fit at least one message in the network buffer, but place an upper and lower limit on its size to make // sure it doesnt get too large or have to call read too often. - dbuf = ByteBuffer.allocateDirect(Math.min(Math.max(parser.maxMessageSize, BUFFER_SIZE_LOWER_BOUND), BUFFER_SIZE_UPPER_BOUND)); + dbuf = ByteBuffer.allocateDirect(Math.min(Math.max(parser.getMaxMessageSize(), BUFFER_SIZE_LOWER_BOUND), BUFFER_SIZE_UPPER_BOUND)); parser.setWriteTarget(this); sc = SocketChannel.open(); @@ -62,7 +62,7 @@ public class ProtobufClient extends MessageWriteTarget { public void run() { try { sc.socket().connect(serverAddress, connectTimeoutMillis); - parser.connectionOpen(); + parser.connectionOpened(); while (true) { int read = sc.read(dbuf); @@ -72,9 +72,9 @@ public class ProtobufClient extends MessageWriteTarget { return; // "flip" the buffer - setting the limit to the current position and setting position to 0 dbuf.flip(); - // Use parser.receive's return value as a double-check that it stopped reading at the right + // Use parser.receiveBytes's return value as a double-check that it stopped reading at the right // location - int bytesConsumed = parser.receive(dbuf); + int bytesConsumed = parser.receiveBytes(dbuf); checkState(dbuf.position() == bytesConsumed); // Now drop the bytes which were read by compacting dbuf (resetting limit and keeping relative // position) @@ -97,7 +97,7 @@ public class ProtobufClient extends MessageWriteTarget { } /** - * Closes the connection to the server, triggering the {@link ProtobufParser#connectionClosed()} + * Closes the connection to the server, triggering the {@link StreamParser#connectionClosed()} * event on the network-handling thread where all callbacks occur. */ public void closeConnection() { @@ -109,7 +109,7 @@ public class ProtobufClient extends MessageWriteTarget { } } - // Writes raw bytes to the channel (used by the write method in ProtobufParser) + // Writes raw bytes to the channel (used by the write method in StreamParser) @Override synchronized void writeBytes(byte[] message) { try { diff --git a/core/src/main/java/com/google/bitcoin/protocols/niowrapper/ProtobufServer.java b/core/src/main/java/com/google/bitcoin/protocols/niowrapper/NioServer.java similarity index 89% rename from core/src/main/java/com/google/bitcoin/protocols/niowrapper/ProtobufServer.java rename to core/src/main/java/com/google/bitcoin/protocols/niowrapper/NioServer.java index cac5e57d..12f31e94 100644 --- a/core/src/main/java/com/google/bitcoin/protocols/niowrapper/ProtobufServer.java +++ b/core/src/main/java/com/google/bitcoin/protocols/niowrapper/NioServer.java @@ -35,13 +35,13 @@ import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; /** - * Creates a simple server listener which listens for incoming client connections and uses a {@link ProtobufParser} to + * Creates a simple server listener which listens for incoming client connections and uses a {@link StreamParser} to * process data. */ -public class ProtobufServer { - private static final org.slf4j.Logger log = LoggerFactory.getLogger(ProtobufServer.class); +public class NioServer { + private static final org.slf4j.Logger log = LoggerFactory.getLogger(NioServer.class); - private final ProtobufParserFactory parserFactory; + private final StreamParserFactory parserFactory; @VisibleForTesting final Thread handlerThread; private final ServerSocketChannel sc; @@ -50,21 +50,21 @@ public class ProtobufServer { private static final int BUFFER_SIZE_UPPER_BOUND = 65536; private class ConnectionHandler extends MessageWriteTarget { - private final ReentrantLock lock = Threading.lock("protobufServerConnectionHandler"); + private final ReentrantLock lock = Threading.lock("nioServerConnectionHandler"); private final ByteBuffer dbuf; private final SocketChannel channel; - private final ProtobufParser parser; + private final StreamParser parser; private boolean closeCalled = false; ConnectionHandler(SocketChannel channel) throws IOException { this.channel = checkNotNull(channel); - ProtobufParser newParser = parserFactory.getNewParser(channel.socket().getInetAddress(), channel.socket().getPort()); + StreamParser newParser = parserFactory.getNewParser(channel.socket().getInetAddress(), channel.socket().getPort()); if (newParser == null) { closeConnection(); throw new IOException("Parser factory.getNewParser returned null"); } this.parser = newParser; - dbuf = ByteBuffer.allocateDirect(Math.min(Math.max(newParser.maxMessageSize, BUFFER_SIZE_LOWER_BOUND), BUFFER_SIZE_UPPER_BOUND)); + dbuf = ByteBuffer.allocateDirect(Math.min(Math.max(newParser.getMaxMessageSize(), BUFFER_SIZE_LOWER_BOUND), BUFFER_SIZE_UPPER_BOUND)); newParser.setWriteTarget(this); } @@ -114,14 +114,14 @@ public class ProtobufServer { newChannel.configureBlocking(false); ConnectionHandler handler = new ConnectionHandler(newChannel); newChannel.register(selector, SelectionKey.OP_READ).attach(handler); - handler.parser.connectionOpen(); + handler.parser.connectionOpened(); } else { // Got a closing channel or a channel to a client connection ConnectionHandler handler = ((ConnectionHandler)key.attachment()); try { if (!key.isValid() && handler != null) handler.closeConnection(); // Key has been cancelled, make sure the socket gets closed else if (handler != null && key.isReadable()) { - // Do a socket read and invoke the parser's receive message + // Do a socket read and invoke the parser's receiveBytes message int read = handler.channel.read(handler.dbuf); if (read == 0) return; // Should probably never happen, but just in case it actually can just return 0 @@ -132,8 +132,8 @@ public class ProtobufServer { } // "flip" the buffer - setting the limit to the current position and setting position to 0 handler.dbuf.flip(); - // Use parser.receive's return value as a double-check that it stopped reading at the right location - int bytesConsumed = handler.parser.receive(handler.dbuf); + // Use parser.receiveBytes's return value as a check that it stopped reading at the right location + int bytesConsumed = handler.parser.receiveBytes(handler.dbuf); checkState(handler.dbuf.position() == bytesConsumed); // Now drop the bytes which were read by compacting dbuf (resetting limit and keeping relative // position) @@ -141,7 +141,7 @@ public class ProtobufServer { } } catch (Exception e) { // This can happen eg if the channel closes while the tread is about to get killed - // (ClosedByInterruptException), or if parser.parser.receive throws something + // (ClosedByInterruptException), or if parser.parser.receiveBytes throws something log.error("Error handling SelectionKey", e); if (handler != null) handler.closeConnection(); @@ -151,11 +151,11 @@ public class ProtobufServer { /** * Creates a new server which is capable of listening for incoming connections and processing client provided data - * using {@link ProtobufParser}s created by the given {@link ProtobufParserFactory} + * using {@link StreamParser}s created by the given {@link StreamParserFactory} * * @throws IOException If there is an issue opening the server socket (note that we don't bind yet) */ - public ProtobufServer(final ProtobufParserFactory parserFactory) throws IOException { + public NioServer(final StreamParserFactory parserFactory) throws IOException { this.parserFactory = parserFactory; sc = ServerSocketChannel.open(); diff --git a/core/src/main/java/com/google/bitcoin/protocols/niowrapper/ProtobufParser.java b/core/src/main/java/com/google/bitcoin/protocols/niowrapper/ProtobufParser.java index a025f7d2..ec875b7d 100644 --- a/core/src/main/java/com/google/bitcoin/protocols/niowrapper/ProtobufParser.java +++ b/core/src/main/java/com/google/bitcoin/protocols/niowrapper/ProtobufParser.java @@ -22,20 +22,18 @@ import com.google.protobuf.MessageLite; import java.nio.ByteBuffer; import java.nio.ByteOrder; -import java.util.Timer; -import java.util.TimerTask; import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; /** - *

A handler which is used in {@link ProtobufServer} and {@link ProtobufClient} to split up incoming data streams + *

A handler which is used in {@link NioServer} and {@link NioClient} to split up incoming data streams * into protobufs and provide an interface for writing protobufs to the connections.

* *

Messages are encoded with a 4-byte signed integer (big endian) prefix to indicate their length followed by the * serialized protobuf

*/ -public class ProtobufParser { +public class ProtobufParser extends AbstractTimeoutHandler implements StreamParser { /** * An interface which can be implemented to handle callbacks as new messages are generated and socket events occur. * @param The protobuf type which is used on this socket. @@ -68,13 +66,6 @@ public class ProtobufParser { private MessageWriteTarget writeTarget; - // TimerTask and timeout value which are added to a timer to kill the connection on timeout - private TimerTask timeoutTask; - private long timeoutMillis; - - // A timer which manages expiring connections as their timeouts occur (if configured). - private static final Timer timeoutTimer = new Timer("ProtobufParser timeouts", true); - /** * Creates a new protobuf handler. * @@ -89,16 +80,22 @@ public class ProtobufParser { public ProtobufParser(Listener handler, MessageType prototype, int maxMessageSize, int timeoutMillis) { this.handler = handler; this.prototype = prototype; - this.timeoutMillis = timeoutMillis; this.maxMessageSize = Math.min(maxMessageSize, Integer.MAX_VALUE - 4); + setTimeoutEnabled(false); + setSocketTimeout(timeoutMillis); } - // Sets the upstream write channel - synchronized void setWriteTarget(MessageWriteTarget writeTarget) { + @Override + public synchronized void setWriteTarget(MessageWriteTarget writeTarget) { checkState(this.writeTarget == null); this.writeTarget = checkNotNull(writeTarget); } + @Override + public int getMaxMessageSize() { + return maxMessageSize; + } + /** * Closes this connection, eventually triggering a {@link ProtobufParser.Listener#connectionClosed()} event. */ @@ -106,6 +103,11 @@ public class ProtobufParser { this.writeTarget.closeConnection(); } + @Override + protected void timeoutOccurred() { + closeConnection(); + } + // Deserializes and provides a listener event (buff must not have the length prefix in it) // Does set the buffers's position to its limit private void deserializeMessage(ByteBuffer buff) throws Exception { @@ -114,18 +116,8 @@ public class ProtobufParser { handler.messageReceived(this, msg); } - /** - * Called when new bytes are available from the remote end. - * * buff will start with its limit set to the position we can read to and its position set to the location we will - * start reading at - * * May read more than one message (recursively) if there are enough bytes available - * * Uses messageBytes/messageBytesOffset to store message which are larger (incl their length prefix) than buff's - * capacity(), ie it is up to this method to ensure we dont run out of buffer space to decode the next message. - * * buff will end with its limit the same as it was previously, and its position set to the position up to which - * bytes have been read (the same as its return value) - * @return The amount of bytes consumed which should not be provided again - */ - synchronized int receive(ByteBuffer buff) throws Exception { + @Override + public synchronized int receiveBytes(ByteBuffer buff) throws Exception { if (messageBytes != null) { // Just keep filling up the currently being worked on message int bytesToGet = Math.min(messageBytes.length - messageBytesOffset, buff.remaining()); @@ -136,7 +128,7 @@ public class ProtobufParser { deserializeMessage(ByteBuffer.wrap(messageBytes)); messageBytes = null; if (buff.hasRemaining()) - return bytesToGet + receive(buff); + return bytesToGet + receiveBytes(buff); } return bytesToGet; } @@ -181,19 +173,19 @@ public class ProtobufParser { // If there are still bytes remaining, see if we can pull out another message since we won't get called again if (buff.hasRemaining()) - return len + 4 + receive(buff); + return len + 4 + receiveBytes(buff); else return len + 4; } - /** Called by the upstream connection manager if this connection closes */ - void connectionClosed() { + @Override + public void connectionClosed() { handler.connectionClosed(this); } - /** Called by the upstream connection manager when this connection is open */ - void connectionOpen() { - resetTimeout(); + @Override + public void connectionOpened() { + setTimeoutEnabled(true); handler.connectionOpen(this); } @@ -212,29 +204,4 @@ public class ProtobufParser { writeTarget.writeBytes(messageLength); writeTarget.writeBytes(messageBytes); } - - /** - *

Sets the receive timeout to the given number of milliseconds, automatically killing the connection if no - * messages are received for this long

- * - *

A timeout of 0 is interpreted as no timeout

- */ - public synchronized void setSocketTimeout(int timeoutMillis) { - this.timeoutMillis = timeoutMillis; - resetTimeout(); - } - - private synchronized void resetTimeout() { - if (timeoutTask != null) - timeoutTask.cancel(); - if (timeoutMillis == 0) - return; - timeoutTask = new TimerTask() { - @Override - public void run() { - closeConnection(); - } - }; - timeoutTimer.schedule(timeoutTask, timeoutMillis); - } } diff --git a/core/src/main/java/com/google/bitcoin/protocols/niowrapper/StreamParser.java b/core/src/main/java/com/google/bitcoin/protocols/niowrapper/StreamParser.java new file mode 100644 index 00000000..87e45306 --- /dev/null +++ b/core/src/main/java/com/google/bitcoin/protocols/niowrapper/StreamParser.java @@ -0,0 +1,55 @@ +/* + * Copyright 2013 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.bitcoin.protocols.niowrapper; + +import java.nio.ByteBuffer; + +/** + * A generic handler which is used in {@link NioServer} and {@link NioClient} to handle incoming data streams. + */ +public interface StreamParser { + /** Called when the connection socket is closed */ + void connectionClosed(); + + /** Called when the connection socket is first opened */ + void connectionOpened(); + + /** + * Called when new bytes are available from the remote end. + * * buff will start with its limit set to the position we can read to and its position set to the location we will + * start reading at + * * May read more than one message (recursively) if there are enough bytes available + * * Uses messageBytes/messageBytesOffset to store message which are larger (incl their length prefix) than buff's + * capacity(), ie it is up to this method to ensure we dont run out of buffer space to decode the next message. + * * buff will end with its limit the same as it was previously, and its position set to the position up to which + * bytes have been read (the same as its return value) + * @return The amount of bytes consumed which should not be provided again + */ + int receiveBytes(ByteBuffer buff) throws Exception; + + /** + * Called when this parser is attached to an upstream write target (ie a low-level connection handler). This + * writeTarget should be stored and used to close the connection or write data to the socket. + */ + void setWriteTarget(MessageWriteTarget writeTarget); + + /** + * Returns the maximum message size of a message on the socket. This is used in calculating size of buffers to + * allocate. + */ + int getMaxMessageSize(); +} diff --git a/core/src/main/java/com/google/bitcoin/protocols/niowrapper/ProtobufParserFactory.java b/core/src/main/java/com/google/bitcoin/protocols/niowrapper/StreamParserFactory.java similarity index 80% rename from core/src/main/java/com/google/bitcoin/protocols/niowrapper/ProtobufParserFactory.java rename to core/src/main/java/com/google/bitcoin/protocols/niowrapper/StreamParserFactory.java index c2a115ef..e9f49e75 100644 --- a/core/src/main/java/com/google/bitcoin/protocols/niowrapper/ProtobufParserFactory.java +++ b/core/src/main/java/com/google/bitcoin/protocols/niowrapper/StreamParserFactory.java @@ -20,13 +20,13 @@ import java.net.InetAddress; import javax.annotation.Nullable; /** - * A factory which generates new {@link ProtobufParser}s when a new connection is opened. + * A factory which generates new {@link StreamParser}s when a new connection is opened. */ -public interface ProtobufParserFactory { +public interface StreamParserFactory { /** * Returns a new handler or null to have the connection close. * @param inetAddress The client's (IP) address * @param port The remote port on the client side */ - @Nullable public ProtobufParser getNewParser(InetAddress inetAddress, int port); + @Nullable public StreamParser getNewParser(InetAddress inetAddress, int port); } diff --git a/core/src/test/java/com/google/bitcoin/protocols/niowrapper/NioWrapperTest.java b/core/src/test/java/com/google/bitcoin/protocols/niowrapper/NioWrapperTest.java index d54a251d..faa08d6f 100644 --- a/core/src/test/java/com/google/bitcoin/protocols/niowrapper/NioWrapperTest.java +++ b/core/src/test/java/com/google/bitcoin/protocols/niowrapper/NioWrapperTest.java @@ -55,7 +55,7 @@ public class NioWrapperTest { final SettableFuture clientConnectionClosed = SettableFuture.create(); final SettableFuture clientMessage1Received = SettableFuture.create(); final SettableFuture clientMessage2Received = SettableFuture.create(); - ProtobufServer server = new ProtobufServer(new ProtobufParserFactory() { + NioServer server = new NioServer(new StreamParserFactory() { @Override public ProtobufParser getNewParser(InetAddress inetAddress, int port) { return new ProtobufParser(new ProtobufParser.Listener() { @@ -100,7 +100,7 @@ public class NioWrapperTest { } }, Protos.TwoWayChannelMessage.getDefaultInstance(), 1000, 0); - ProtobufClient client = new ProtobufClient(new InetSocketAddress("localhost", 4243), clientHandler, 0); + NioClient client = new NioClient(new InetSocketAddress("localhost", 4243), clientHandler, 0); clientConnectionOpen.get(); serverConnectionOpen.get(); @@ -130,7 +130,7 @@ public class NioWrapperTest { final SettableFuture clientConnection2Open = SettableFuture.create(); final SettableFuture serverConnection2Closed = SettableFuture.create(); final SettableFuture clientConnection2Closed = SettableFuture.create(); - ProtobufServer server = new ProtobufServer(new ProtobufParserFactory() { + NioServer server = new NioServer(new StreamParserFactory() { @Override public ProtobufParser getNewParser(InetAddress inetAddress, int port) { return new ProtobufParser(new ProtobufParser.Listener() { @@ -160,7 +160,7 @@ public class NioWrapperTest { }); server.start(new InetSocketAddress("localhost", 4243)); - new ProtobufClient(new InetSocketAddress("localhost", 4243), new ProtobufParser( + new NioClient(new InetSocketAddress("localhost", 4243), new ProtobufParser( new ProtobufParser.Listener() { @Override public void messageReceived(ProtobufParser handler, Protos.TwoWayChannelMessage msg) { @@ -200,7 +200,7 @@ public class NioWrapperTest { clientConnection2Closed.set(null); } }, Protos.TwoWayChannelMessage.getDefaultInstance(), 1000, 0); - ProtobufClient client2 = new ProtobufClient(new InetSocketAddress("localhost", 4243), client2Handler, 0); + NioClient client2 = new NioClient(new InetSocketAddress("localhost", 4243), client2Handler, 0); clientConnection2Open.get(); serverConnection2Open.get(); @@ -216,7 +216,7 @@ public class NioWrapperTest { @Test public void largeDataTest() throws Exception { - /** Test various large-data handling, essentially testing {@link ProtobufParser#receive(java.nio.ByteBuffer)} */ + /** Test various large-data handling, essentially testing {@link ProtobufParser#receiveBytes(java.nio.ByteBuffer)} */ final SettableFuture serverConnectionOpen = SettableFuture.create(); final SettableFuture clientConnectionOpen = SettableFuture.create(); final SettableFuture serverConnectionClosed = SettableFuture.create(); @@ -225,7 +225,7 @@ public class NioWrapperTest { final SettableFuture clientMessage2Received = SettableFuture.create(); final SettableFuture clientMessage3Received = SettableFuture.create(); final SettableFuture clientMessage4Received = SettableFuture.create(); - ProtobufServer server = new ProtobufServer(new ProtobufParserFactory() { + NioServer server = new NioServer(new StreamParserFactory() { @Override public ProtobufParser getNewParser(InetAddress inetAddress, int port) { return new ProtobufParser(new ProtobufParser.Listener() { @@ -277,7 +277,7 @@ public class NioWrapperTest { } }, Protos.TwoWayChannelMessage.getDefaultInstance(), 0x10000, 0); - ProtobufClient client = new ProtobufClient(new InetSocketAddress("localhost", 4243), clientHandler, 0); + NioClient client = new NioClient(new InetSocketAddress("localhost", 4243), clientHandler, 0); clientConnectionOpen.get(); serverConnectionOpen.get(); @@ -376,7 +376,7 @@ public class NioWrapperTest { final SettableFuture client1MessageReceived = SettableFuture.create(); final SettableFuture client2MessageReceived = SettableFuture.create(); final SettableFuture client3MessageReceived = SettableFuture.create(); - ProtobufServer server = new ProtobufServer(new ProtobufParserFactory() { + NioServer server = new NioServer(new StreamParserFactory() { @Override public ProtobufParser getNewParser(InetAddress inetAddress, int port) { return new ProtobufParser(new ProtobufParser.Listener() { @@ -429,7 +429,7 @@ public class NioWrapperTest { client1ConnectionClosed.set(null); } }, Protos.TwoWayChannelMessage.getDefaultInstance(), 1000, 0); - ProtobufClient client1 = new ProtobufClient(new InetSocketAddress("localhost", 4243), client1Handler, 0); + NioClient client1 = new NioClient(new InetSocketAddress("localhost", 4243), client1Handler, 0); client1ConnectionOpen.get(); serverConnection1Open.get(); @@ -451,7 +451,7 @@ public class NioWrapperTest { client2ConnectionClosed.set(null); } }, Protos.TwoWayChannelMessage.getDefaultInstance(), 1000, 0); - ProtobufClient client2 = new ProtobufClient(new InetSocketAddress("localhost", 4243), client2Handler, 0); + NioClient client2 = new NioClient(new InetSocketAddress("localhost", 4243), client2Handler, 0); client2ConnectionOpen.get(); serverConnection2Open.get(); @@ -474,7 +474,7 @@ public class NioWrapperTest { client3ConnectionClosed.set(null); } }, Protos.TwoWayChannelMessage.getDefaultInstance(), 1000, 0); - ProtobufClient client3 = new ProtobufClient(new InetSocketAddress("localhost", 4243), client3Handler, 0); + NioClient client3 = new NioClient(new InetSocketAddress("localhost", 4243), client3Handler, 0); client3ConnectionOpen.get(); serverConnection3Open.get();