mirror of
https://github.com/Qortal/altcoinj.git
synced 2025-02-07 14:54:15 +00:00
Make Protobuf{Server,Client} more generic.
This commit is contained in:
parent
146a6dd37e
commit
9980903572
@ -19,7 +19,7 @@ package com.google.bitcoin.protocols.channels;
|
|||||||
import com.google.bitcoin.core.ECKey;
|
import com.google.bitcoin.core.ECKey;
|
||||||
import com.google.bitcoin.core.Sha256Hash;
|
import com.google.bitcoin.core.Sha256Hash;
|
||||||
import com.google.bitcoin.core.Wallet;
|
import com.google.bitcoin.core.Wallet;
|
||||||
import com.google.bitcoin.protocols.niowrapper.ProtobufClient;
|
import com.google.bitcoin.protocols.niowrapper.NioClient;
|
||||||
import com.google.bitcoin.protocols.niowrapper.ProtobufParser;
|
import com.google.bitcoin.protocols.niowrapper.ProtobufParser;
|
||||||
import com.google.common.util.concurrent.ListenableFuture;
|
import com.google.common.util.concurrent.ListenableFuture;
|
||||||
import com.google.common.util.concurrent.SettableFuture;
|
import com.google.common.util.concurrent.SettableFuture;
|
||||||
@ -106,7 +106,7 @@ public class PaymentChannelClientConnection {
|
|||||||
|
|
||||||
// Initiate the outbound network connection. We don't need to keep this around. The wireParser object will handle
|
// Initiate the outbound network connection. We don't need to keep this around. The wireParser object will handle
|
||||||
// things from here on out.
|
// things from here on out.
|
||||||
new ProtobufClient(server, wireParser, timeoutSeconds * 1000);
|
new NioClient(server, wireParser, timeoutSeconds * 1000);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -153,7 +153,7 @@ public class PaymentChannelClientConnection {
|
|||||||
//
|
//
|
||||||
// This call will cause the CLOSE message to be written to the wire, and then the destroyConnection() method that
|
// This call will cause the CLOSE message to be written to the wire, and then the destroyConnection() method that
|
||||||
// we defined above will be called, which in turn will call wireParser.closeConnection(), which in turn will invoke
|
// we defined above will be called, which in turn will call wireParser.closeConnection(), which in turn will invoke
|
||||||
// ProtobufClient.closeConnection(), which will then close the socket triggering interruption of the network
|
// NioClient.closeConnection(), which will then close the socket triggering interruption of the network
|
||||||
// thread it had created. That causes the background thread to die, which on its way out calls
|
// thread it had created. That causes the background thread to die, which on its way out calls
|
||||||
// ProtobufParser.connectionClosed which invokes the connectionClosed method we defined above which in turn
|
// ProtobufParser.connectionClosed which invokes the connectionClosed method we defined above which in turn
|
||||||
// then configures the open-future correctly and closes the state object. Phew!
|
// then configures the open-future correctly and closes the state object. Phew!
|
||||||
|
@ -27,9 +27,9 @@ import javax.annotation.Nullable;
|
|||||||
import com.google.bitcoin.core.Sha256Hash;
|
import com.google.bitcoin.core.Sha256Hash;
|
||||||
import com.google.bitcoin.core.TransactionBroadcaster;
|
import com.google.bitcoin.core.TransactionBroadcaster;
|
||||||
import com.google.bitcoin.core.Wallet;
|
import com.google.bitcoin.core.Wallet;
|
||||||
|
import com.google.bitcoin.protocols.niowrapper.NioServer;
|
||||||
import com.google.bitcoin.protocols.niowrapper.ProtobufParser;
|
import com.google.bitcoin.protocols.niowrapper.ProtobufParser;
|
||||||
import com.google.bitcoin.protocols.niowrapper.ProtobufParserFactory;
|
import com.google.bitcoin.protocols.niowrapper.StreamParserFactory;
|
||||||
import com.google.bitcoin.protocols.niowrapper.ProtobufServer;
|
|
||||||
import org.bitcoin.paymentchannel.Protos;
|
import org.bitcoin.paymentchannel.Protos;
|
||||||
|
|
||||||
import static com.google.common.base.Preconditions.checkNotNull;
|
import static com.google.common.base.Preconditions.checkNotNull;
|
||||||
@ -48,7 +48,7 @@ public class PaymentChannelServerListener {
|
|||||||
private final HandlerFactory eventHandlerFactory;
|
private final HandlerFactory eventHandlerFactory;
|
||||||
private final BigInteger minAcceptedChannelSize;
|
private final BigInteger minAcceptedChannelSize;
|
||||||
|
|
||||||
private final ProtobufServer server;
|
private final NioServer server;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A factory which generates connection-specific event handlers.
|
* A factory which generates connection-specific event handlers.
|
||||||
@ -160,7 +160,7 @@ public class PaymentChannelServerListener {
|
|||||||
this.eventHandlerFactory = checkNotNull(eventHandlerFactory);
|
this.eventHandlerFactory = checkNotNull(eventHandlerFactory);
|
||||||
this.minAcceptedChannelSize = checkNotNull(minAcceptedChannelSize);
|
this.minAcceptedChannelSize = checkNotNull(minAcceptedChannelSize);
|
||||||
|
|
||||||
server = new ProtobufServer(new ProtobufParserFactory() {
|
server = new NioServer(new StreamParserFactory() {
|
||||||
@Override
|
@Override
|
||||||
public ProtobufParser getNewParser(InetAddress inetAddress, int port) {
|
public ProtobufParser getNewParser(InetAddress inetAddress, int port) {
|
||||||
return new ServerHandler(new InetSocketAddress(inetAddress, port), timeoutSeconds).socketProtobufHandler;
|
return new ServerHandler(new InetSocketAddress(inetAddress, port), timeoutSeconds).socketProtobufHandler;
|
||||||
|
@ -0,0 +1,80 @@
|
|||||||
|
/*
|
||||||
|
* Copyright 2013 Google Inc.
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package com.google.bitcoin.protocols.niowrapper;
|
||||||
|
|
||||||
|
import java.util.Timer;
|
||||||
|
import java.util.TimerTask;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* <p>A stream parser that provides functionality for creating timeouts between arbitrary events.</p>
|
||||||
|
*/
|
||||||
|
public abstract class AbstractTimeoutHandler {
|
||||||
|
// TimerTask and timeout value which are added to a timer to kill the connection on timeout
|
||||||
|
private TimerTask timeoutTask;
|
||||||
|
private long timeoutMillis = 0;
|
||||||
|
private boolean timeoutEnabled = true;
|
||||||
|
|
||||||
|
// A timer which manages expiring channels as their timeouts occur (if configured).
|
||||||
|
private static final Timer timeoutTimer = new Timer("ProtobufParser timeouts", true);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* <p>Enables or disables the timeout entirely. This may be useful if you want to store the timeout value but wish
|
||||||
|
* to temporarily disable/enable timeouts.</p>
|
||||||
|
*
|
||||||
|
* <p>The default is for timeoutEnabled to be true but timeoutMillis to be set to 0 (ie disabled).</p>
|
||||||
|
*
|
||||||
|
* <p>This call will reset the current progress towards the timeout.</p>
|
||||||
|
*/
|
||||||
|
public synchronized void setTimeoutEnabled(boolean timeoutEnabled) {
|
||||||
|
this.timeoutEnabled = timeoutEnabled;
|
||||||
|
resetTimeout();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* <p>Sets the receive timeout to the given number of milliseconds, automatically killing the connection if no
|
||||||
|
* messages are received for this long</p>
|
||||||
|
*
|
||||||
|
* <p>A timeout of 0 is interpreted as no timeout.</p>
|
||||||
|
*
|
||||||
|
* <p>The default is for timeoutEnabled to be true but timeoutMillis to be set to 0 (ie disabled).</p>
|
||||||
|
*
|
||||||
|
* <p>This call will reset the current progress towards the timeout.</p>
|
||||||
|
*/
|
||||||
|
public synchronized void setSocketTimeout(int timeoutMillis) {
|
||||||
|
this.timeoutMillis = timeoutMillis;
|
||||||
|
resetTimeout();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Resets the current progress towards timeout to 0.
|
||||||
|
*/
|
||||||
|
protected synchronized void resetTimeout() {
|
||||||
|
if (timeoutTask != null)
|
||||||
|
timeoutTask.cancel();
|
||||||
|
if (timeoutMillis == 0 || !timeoutEnabled)
|
||||||
|
return;
|
||||||
|
timeoutTask = new TimerTask() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
timeoutOccurred();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
timeoutTimer.schedule(timeoutTask, timeoutMillis);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected abstract void timeoutOccurred();
|
||||||
|
}
|
@ -29,10 +29,10 @@ import org.slf4j.LoggerFactory;
|
|||||||
import static com.google.common.base.Preconditions.checkState;
|
import static com.google.common.base.Preconditions.checkState;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a simple connection to a server using a {@link ProtobufParser} to process data.
|
* Creates a simple connection to a server using a {@link StreamParser} to process data.
|
||||||
*/
|
*/
|
||||||
public class ProtobufClient extends MessageWriteTarget {
|
public class NioClient extends MessageWriteTarget {
|
||||||
private static final org.slf4j.Logger log = LoggerFactory.getLogger(ProtobufClient.class);
|
private static final org.slf4j.Logger log = LoggerFactory.getLogger(NioClient.class);
|
||||||
|
|
||||||
private static final int BUFFER_SIZE_LOWER_BOUND = 4096;
|
private static final int BUFFER_SIZE_LOWER_BOUND = 4096;
|
||||||
private static final int BUFFER_SIZE_UPPER_BOUND = 65536;
|
private static final int BUFFER_SIZE_UPPER_BOUND = 65536;
|
||||||
@ -41,19 +41,19 @@ public class ProtobufClient extends MessageWriteTarget {
|
|||||||
@Nonnull private final SocketChannel sc;
|
@Nonnull private final SocketChannel sc;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* <p>Creates a new client to the given server address using the given {@link ProtobufParser} to decode the data.
|
* <p>Creates a new client to the given server address using the given {@link StreamParser} to decode the data.
|
||||||
* The given parser <b>MUST</b> be unique to this object. This does not block while waiting for the connection to
|
* The given parser <b>MUST</b> be unique to this object. This does not block while waiting for the connection to
|
||||||
* open, but will call either the {@link ProtobufParser#connectionOpen()} or {@link ProtobufParser#connectionClosed()}
|
* open, but will call either the {@link StreamParser#connectionOpened()} or
|
||||||
* callback on the created network event processing thread.</p>
|
* {@link StreamParser#connectionClosed()} callback on the created network event processing thread.</p>
|
||||||
*
|
*
|
||||||
* @param connectTimeoutMillis The connect timeout set on the connection (in milliseconds). 0 is interpreted as no
|
* @param connectTimeoutMillis The connect timeout set on the connection (in milliseconds). 0 is interpreted as no
|
||||||
* timeout.
|
* timeout.
|
||||||
*/
|
*/
|
||||||
public ProtobufClient(final InetSocketAddress serverAddress, final ProtobufParser parser,
|
public NioClient(final InetSocketAddress serverAddress, final StreamParser parser,
|
||||||
final int connectTimeoutMillis) throws IOException {
|
final int connectTimeoutMillis) throws IOException {
|
||||||
// Try to fit at least one message in the network buffer, but place an upper and lower limit on its size to make
|
// Try to fit at least one message in the network buffer, but place an upper and lower limit on its size to make
|
||||||
// sure it doesnt get too large or have to call read too often.
|
// sure it doesnt get too large or have to call read too often.
|
||||||
dbuf = ByteBuffer.allocateDirect(Math.min(Math.max(parser.maxMessageSize, BUFFER_SIZE_LOWER_BOUND), BUFFER_SIZE_UPPER_BOUND));
|
dbuf = ByteBuffer.allocateDirect(Math.min(Math.max(parser.getMaxMessageSize(), BUFFER_SIZE_LOWER_BOUND), BUFFER_SIZE_UPPER_BOUND));
|
||||||
parser.setWriteTarget(this);
|
parser.setWriteTarget(this);
|
||||||
sc = SocketChannel.open();
|
sc = SocketChannel.open();
|
||||||
|
|
||||||
@ -62,7 +62,7 @@ public class ProtobufClient extends MessageWriteTarget {
|
|||||||
public void run() {
|
public void run() {
|
||||||
try {
|
try {
|
||||||
sc.socket().connect(serverAddress, connectTimeoutMillis);
|
sc.socket().connect(serverAddress, connectTimeoutMillis);
|
||||||
parser.connectionOpen();
|
parser.connectionOpened();
|
||||||
|
|
||||||
while (true) {
|
while (true) {
|
||||||
int read = sc.read(dbuf);
|
int read = sc.read(dbuf);
|
||||||
@ -72,9 +72,9 @@ public class ProtobufClient extends MessageWriteTarget {
|
|||||||
return;
|
return;
|
||||||
// "flip" the buffer - setting the limit to the current position and setting position to 0
|
// "flip" the buffer - setting the limit to the current position and setting position to 0
|
||||||
dbuf.flip();
|
dbuf.flip();
|
||||||
// Use parser.receive's return value as a double-check that it stopped reading at the right
|
// Use parser.receiveBytes's return value as a double-check that it stopped reading at the right
|
||||||
// location
|
// location
|
||||||
int bytesConsumed = parser.receive(dbuf);
|
int bytesConsumed = parser.receiveBytes(dbuf);
|
||||||
checkState(dbuf.position() == bytesConsumed);
|
checkState(dbuf.position() == bytesConsumed);
|
||||||
// Now drop the bytes which were read by compacting dbuf (resetting limit and keeping relative
|
// Now drop the bytes which were read by compacting dbuf (resetting limit and keeping relative
|
||||||
// position)
|
// position)
|
||||||
@ -97,7 +97,7 @@ public class ProtobufClient extends MessageWriteTarget {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Closes the connection to the server, triggering the {@link ProtobufParser#connectionClosed()}
|
* Closes the connection to the server, triggering the {@link StreamParser#connectionClosed()}
|
||||||
* event on the network-handling thread where all callbacks occur.
|
* event on the network-handling thread where all callbacks occur.
|
||||||
*/
|
*/
|
||||||
public void closeConnection() {
|
public void closeConnection() {
|
||||||
@ -109,7 +109,7 @@ public class ProtobufClient extends MessageWriteTarget {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Writes raw bytes to the channel (used by the write method in ProtobufParser)
|
// Writes raw bytes to the channel (used by the write method in StreamParser)
|
||||||
@Override
|
@Override
|
||||||
synchronized void writeBytes(byte[] message) {
|
synchronized void writeBytes(byte[] message) {
|
||||||
try {
|
try {
|
@ -35,13 +35,13 @@ import static com.google.common.base.Preconditions.checkNotNull;
|
|||||||
import static com.google.common.base.Preconditions.checkState;
|
import static com.google.common.base.Preconditions.checkState;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a simple server listener which listens for incoming client connections and uses a {@link ProtobufParser} to
|
* Creates a simple server listener which listens for incoming client connections and uses a {@link StreamParser} to
|
||||||
* process data.
|
* process data.
|
||||||
*/
|
*/
|
||||||
public class ProtobufServer {
|
public class NioServer {
|
||||||
private static final org.slf4j.Logger log = LoggerFactory.getLogger(ProtobufServer.class);
|
private static final org.slf4j.Logger log = LoggerFactory.getLogger(NioServer.class);
|
||||||
|
|
||||||
private final ProtobufParserFactory parserFactory;
|
private final StreamParserFactory parserFactory;
|
||||||
|
|
||||||
@VisibleForTesting final Thread handlerThread;
|
@VisibleForTesting final Thread handlerThread;
|
||||||
private final ServerSocketChannel sc;
|
private final ServerSocketChannel sc;
|
||||||
@ -50,21 +50,21 @@ public class ProtobufServer {
|
|||||||
private static final int BUFFER_SIZE_UPPER_BOUND = 65536;
|
private static final int BUFFER_SIZE_UPPER_BOUND = 65536;
|
||||||
|
|
||||||
private class ConnectionHandler extends MessageWriteTarget {
|
private class ConnectionHandler extends MessageWriteTarget {
|
||||||
private final ReentrantLock lock = Threading.lock("protobufServerConnectionHandler");
|
private final ReentrantLock lock = Threading.lock("nioServerConnectionHandler");
|
||||||
private final ByteBuffer dbuf;
|
private final ByteBuffer dbuf;
|
||||||
private final SocketChannel channel;
|
private final SocketChannel channel;
|
||||||
private final ProtobufParser parser;
|
private final StreamParser parser;
|
||||||
private boolean closeCalled = false;
|
private boolean closeCalled = false;
|
||||||
|
|
||||||
ConnectionHandler(SocketChannel channel) throws IOException {
|
ConnectionHandler(SocketChannel channel) throws IOException {
|
||||||
this.channel = checkNotNull(channel);
|
this.channel = checkNotNull(channel);
|
||||||
ProtobufParser newParser = parserFactory.getNewParser(channel.socket().getInetAddress(), channel.socket().getPort());
|
StreamParser newParser = parserFactory.getNewParser(channel.socket().getInetAddress(), channel.socket().getPort());
|
||||||
if (newParser == null) {
|
if (newParser == null) {
|
||||||
closeConnection();
|
closeConnection();
|
||||||
throw new IOException("Parser factory.getNewParser returned null");
|
throw new IOException("Parser factory.getNewParser returned null");
|
||||||
}
|
}
|
||||||
this.parser = newParser;
|
this.parser = newParser;
|
||||||
dbuf = ByteBuffer.allocateDirect(Math.min(Math.max(newParser.maxMessageSize, BUFFER_SIZE_LOWER_BOUND), BUFFER_SIZE_UPPER_BOUND));
|
dbuf = ByteBuffer.allocateDirect(Math.min(Math.max(newParser.getMaxMessageSize(), BUFFER_SIZE_LOWER_BOUND), BUFFER_SIZE_UPPER_BOUND));
|
||||||
newParser.setWriteTarget(this);
|
newParser.setWriteTarget(this);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -114,14 +114,14 @@ public class ProtobufServer {
|
|||||||
newChannel.configureBlocking(false);
|
newChannel.configureBlocking(false);
|
||||||
ConnectionHandler handler = new ConnectionHandler(newChannel);
|
ConnectionHandler handler = new ConnectionHandler(newChannel);
|
||||||
newChannel.register(selector, SelectionKey.OP_READ).attach(handler);
|
newChannel.register(selector, SelectionKey.OP_READ).attach(handler);
|
||||||
handler.parser.connectionOpen();
|
handler.parser.connectionOpened();
|
||||||
} else { // Got a closing channel or a channel to a client connection
|
} else { // Got a closing channel or a channel to a client connection
|
||||||
ConnectionHandler handler = ((ConnectionHandler)key.attachment());
|
ConnectionHandler handler = ((ConnectionHandler)key.attachment());
|
||||||
try {
|
try {
|
||||||
if (!key.isValid() && handler != null)
|
if (!key.isValid() && handler != null)
|
||||||
handler.closeConnection(); // Key has been cancelled, make sure the socket gets closed
|
handler.closeConnection(); // Key has been cancelled, make sure the socket gets closed
|
||||||
else if (handler != null && key.isReadable()) {
|
else if (handler != null && key.isReadable()) {
|
||||||
// Do a socket read and invoke the parser's receive message
|
// Do a socket read and invoke the parser's receiveBytes message
|
||||||
int read = handler.channel.read(handler.dbuf);
|
int read = handler.channel.read(handler.dbuf);
|
||||||
if (read == 0)
|
if (read == 0)
|
||||||
return; // Should probably never happen, but just in case it actually can just return 0
|
return; // Should probably never happen, but just in case it actually can just return 0
|
||||||
@ -132,8 +132,8 @@ public class ProtobufServer {
|
|||||||
}
|
}
|
||||||
// "flip" the buffer - setting the limit to the current position and setting position to 0
|
// "flip" the buffer - setting the limit to the current position and setting position to 0
|
||||||
handler.dbuf.flip();
|
handler.dbuf.flip();
|
||||||
// Use parser.receive's return value as a double-check that it stopped reading at the right location
|
// Use parser.receiveBytes's return value as a check that it stopped reading at the right location
|
||||||
int bytesConsumed = handler.parser.receive(handler.dbuf);
|
int bytesConsumed = handler.parser.receiveBytes(handler.dbuf);
|
||||||
checkState(handler.dbuf.position() == bytesConsumed);
|
checkState(handler.dbuf.position() == bytesConsumed);
|
||||||
// Now drop the bytes which were read by compacting dbuf (resetting limit and keeping relative
|
// Now drop the bytes which were read by compacting dbuf (resetting limit and keeping relative
|
||||||
// position)
|
// position)
|
||||||
@ -141,7 +141,7 @@ public class ProtobufServer {
|
|||||||
}
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
// This can happen eg if the channel closes while the tread is about to get killed
|
// This can happen eg if the channel closes while the tread is about to get killed
|
||||||
// (ClosedByInterruptException), or if parser.parser.receive throws something
|
// (ClosedByInterruptException), or if parser.parser.receiveBytes throws something
|
||||||
log.error("Error handling SelectionKey", e);
|
log.error("Error handling SelectionKey", e);
|
||||||
if (handler != null)
|
if (handler != null)
|
||||||
handler.closeConnection();
|
handler.closeConnection();
|
||||||
@ -151,11 +151,11 @@ public class ProtobufServer {
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a new server which is capable of listening for incoming connections and processing client provided data
|
* Creates a new server which is capable of listening for incoming connections and processing client provided data
|
||||||
* using {@link ProtobufParser}s created by the given {@link ProtobufParserFactory}
|
* using {@link StreamParser}s created by the given {@link StreamParserFactory}
|
||||||
*
|
*
|
||||||
* @throws IOException If there is an issue opening the server socket (note that we don't bind yet)
|
* @throws IOException If there is an issue opening the server socket (note that we don't bind yet)
|
||||||
*/
|
*/
|
||||||
public ProtobufServer(final ProtobufParserFactory parserFactory) throws IOException {
|
public NioServer(final StreamParserFactory parserFactory) throws IOException {
|
||||||
this.parserFactory = parserFactory;
|
this.parserFactory = parserFactory;
|
||||||
|
|
||||||
sc = ServerSocketChannel.open();
|
sc = ServerSocketChannel.open();
|
@ -22,20 +22,18 @@ import com.google.protobuf.MessageLite;
|
|||||||
|
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
import java.nio.ByteOrder;
|
import java.nio.ByteOrder;
|
||||||
import java.util.Timer;
|
|
||||||
import java.util.TimerTask;
|
|
||||||
|
|
||||||
import static com.google.common.base.Preconditions.checkNotNull;
|
import static com.google.common.base.Preconditions.checkNotNull;
|
||||||
import static com.google.common.base.Preconditions.checkState;
|
import static com.google.common.base.Preconditions.checkState;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* <p>A handler which is used in {@link ProtobufServer} and {@link ProtobufClient} to split up incoming data streams
|
* <p>A handler which is used in {@link NioServer} and {@link NioClient} to split up incoming data streams
|
||||||
* into protobufs and provide an interface for writing protobufs to the connections.</p>
|
* into protobufs and provide an interface for writing protobufs to the connections.</p>
|
||||||
*
|
*
|
||||||
* <p>Messages are encoded with a 4-byte signed integer (big endian) prefix to indicate their length followed by the
|
* <p>Messages are encoded with a 4-byte signed integer (big endian) prefix to indicate their length followed by the
|
||||||
* serialized protobuf</p>
|
* serialized protobuf</p>
|
||||||
*/
|
*/
|
||||||
public class ProtobufParser<MessageType extends MessageLite> {
|
public class ProtobufParser<MessageType extends MessageLite> extends AbstractTimeoutHandler implements StreamParser {
|
||||||
/**
|
/**
|
||||||
* An interface which can be implemented to handle callbacks as new messages are generated and socket events occur.
|
* An interface which can be implemented to handle callbacks as new messages are generated and socket events occur.
|
||||||
* @param <MessageType> The protobuf type which is used on this socket.
|
* @param <MessageType> The protobuf type which is used on this socket.
|
||||||
@ -68,13 +66,6 @@ public class ProtobufParser<MessageType extends MessageLite> {
|
|||||||
|
|
||||||
private MessageWriteTarget writeTarget;
|
private MessageWriteTarget writeTarget;
|
||||||
|
|
||||||
// TimerTask and timeout value which are added to a timer to kill the connection on timeout
|
|
||||||
private TimerTask timeoutTask;
|
|
||||||
private long timeoutMillis;
|
|
||||||
|
|
||||||
// A timer which manages expiring connections as their timeouts occur (if configured).
|
|
||||||
private static final Timer timeoutTimer = new Timer("ProtobufParser timeouts", true);
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a new protobuf handler.
|
* Creates a new protobuf handler.
|
||||||
*
|
*
|
||||||
@ -89,16 +80,22 @@ public class ProtobufParser<MessageType extends MessageLite> {
|
|||||||
public ProtobufParser(Listener<MessageType> handler, MessageType prototype, int maxMessageSize, int timeoutMillis) {
|
public ProtobufParser(Listener<MessageType> handler, MessageType prototype, int maxMessageSize, int timeoutMillis) {
|
||||||
this.handler = handler;
|
this.handler = handler;
|
||||||
this.prototype = prototype;
|
this.prototype = prototype;
|
||||||
this.timeoutMillis = timeoutMillis;
|
|
||||||
this.maxMessageSize = Math.min(maxMessageSize, Integer.MAX_VALUE - 4);
|
this.maxMessageSize = Math.min(maxMessageSize, Integer.MAX_VALUE - 4);
|
||||||
|
setTimeoutEnabled(false);
|
||||||
|
setSocketTimeout(timeoutMillis);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Sets the upstream write channel
|
@Override
|
||||||
synchronized void setWriteTarget(MessageWriteTarget writeTarget) {
|
public synchronized void setWriteTarget(MessageWriteTarget writeTarget) {
|
||||||
checkState(this.writeTarget == null);
|
checkState(this.writeTarget == null);
|
||||||
this.writeTarget = checkNotNull(writeTarget);
|
this.writeTarget = checkNotNull(writeTarget);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getMaxMessageSize() {
|
||||||
|
return maxMessageSize;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Closes this connection, eventually triggering a {@link ProtobufParser.Listener#connectionClosed()} event.
|
* Closes this connection, eventually triggering a {@link ProtobufParser.Listener#connectionClosed()} event.
|
||||||
*/
|
*/
|
||||||
@ -106,6 +103,11 @@ public class ProtobufParser<MessageType extends MessageLite> {
|
|||||||
this.writeTarget.closeConnection();
|
this.writeTarget.closeConnection();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void timeoutOccurred() {
|
||||||
|
closeConnection();
|
||||||
|
}
|
||||||
|
|
||||||
// Deserializes and provides a listener event (buff must not have the length prefix in it)
|
// Deserializes and provides a listener event (buff must not have the length prefix in it)
|
||||||
// Does set the buffers's position to its limit
|
// Does set the buffers's position to its limit
|
||||||
private void deserializeMessage(ByteBuffer buff) throws Exception {
|
private void deserializeMessage(ByteBuffer buff) throws Exception {
|
||||||
@ -114,18 +116,8 @@ public class ProtobufParser<MessageType extends MessageLite> {
|
|||||||
handler.messageReceived(this, msg);
|
handler.messageReceived(this, msg);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
@Override
|
||||||
* Called when new bytes are available from the remote end.
|
public synchronized int receiveBytes(ByteBuffer buff) throws Exception {
|
||||||
* * buff will start with its limit set to the position we can read to and its position set to the location we will
|
|
||||||
* start reading at
|
|
||||||
* * May read more than one message (recursively) if there are enough bytes available
|
|
||||||
* * Uses messageBytes/messageBytesOffset to store message which are larger (incl their length prefix) than buff's
|
|
||||||
* capacity(), ie it is up to this method to ensure we dont run out of buffer space to decode the next message.
|
|
||||||
* * buff will end with its limit the same as it was previously, and its position set to the position up to which
|
|
||||||
* bytes have been read (the same as its return value)
|
|
||||||
* @return The amount of bytes consumed which should not be provided again
|
|
||||||
*/
|
|
||||||
synchronized int receive(ByteBuffer buff) throws Exception {
|
|
||||||
if (messageBytes != null) {
|
if (messageBytes != null) {
|
||||||
// Just keep filling up the currently being worked on message
|
// Just keep filling up the currently being worked on message
|
||||||
int bytesToGet = Math.min(messageBytes.length - messageBytesOffset, buff.remaining());
|
int bytesToGet = Math.min(messageBytes.length - messageBytesOffset, buff.remaining());
|
||||||
@ -136,7 +128,7 @@ public class ProtobufParser<MessageType extends MessageLite> {
|
|||||||
deserializeMessage(ByteBuffer.wrap(messageBytes));
|
deserializeMessage(ByteBuffer.wrap(messageBytes));
|
||||||
messageBytes = null;
|
messageBytes = null;
|
||||||
if (buff.hasRemaining())
|
if (buff.hasRemaining())
|
||||||
return bytesToGet + receive(buff);
|
return bytesToGet + receiveBytes(buff);
|
||||||
}
|
}
|
||||||
return bytesToGet;
|
return bytesToGet;
|
||||||
}
|
}
|
||||||
@ -181,19 +173,19 @@ public class ProtobufParser<MessageType extends MessageLite> {
|
|||||||
|
|
||||||
// If there are still bytes remaining, see if we can pull out another message since we won't get called again
|
// If there are still bytes remaining, see if we can pull out another message since we won't get called again
|
||||||
if (buff.hasRemaining())
|
if (buff.hasRemaining())
|
||||||
return len + 4 + receive(buff);
|
return len + 4 + receiveBytes(buff);
|
||||||
else
|
else
|
||||||
return len + 4;
|
return len + 4;
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Called by the upstream connection manager if this connection closes */
|
@Override
|
||||||
void connectionClosed() {
|
public void connectionClosed() {
|
||||||
handler.connectionClosed(this);
|
handler.connectionClosed(this);
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Called by the upstream connection manager when this connection is open */
|
@Override
|
||||||
void connectionOpen() {
|
public void connectionOpened() {
|
||||||
resetTimeout();
|
setTimeoutEnabled(true);
|
||||||
handler.connectionOpen(this);
|
handler.connectionOpen(this);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -212,29 +204,4 @@ public class ProtobufParser<MessageType extends MessageLite> {
|
|||||||
writeTarget.writeBytes(messageLength);
|
writeTarget.writeBytes(messageLength);
|
||||||
writeTarget.writeBytes(messageBytes);
|
writeTarget.writeBytes(messageBytes);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* <p>Sets the receive timeout to the given number of milliseconds, automatically killing the connection if no
|
|
||||||
* messages are received for this long</p>
|
|
||||||
*
|
|
||||||
* <p>A timeout of 0 is interpreted as no timeout</p>
|
|
||||||
*/
|
|
||||||
public synchronized void setSocketTimeout(int timeoutMillis) {
|
|
||||||
this.timeoutMillis = timeoutMillis;
|
|
||||||
resetTimeout();
|
|
||||||
}
|
|
||||||
|
|
||||||
private synchronized void resetTimeout() {
|
|
||||||
if (timeoutTask != null)
|
|
||||||
timeoutTask.cancel();
|
|
||||||
if (timeoutMillis == 0)
|
|
||||||
return;
|
|
||||||
timeoutTask = new TimerTask() {
|
|
||||||
@Override
|
|
||||||
public void run() {
|
|
||||||
closeConnection();
|
|
||||||
}
|
|
||||||
};
|
|
||||||
timeoutTimer.schedule(timeoutTask, timeoutMillis);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,55 @@
|
|||||||
|
/*
|
||||||
|
* Copyright 2013 Google Inc.
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package com.google.bitcoin.protocols.niowrapper;
|
||||||
|
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A generic handler which is used in {@link NioServer} and {@link NioClient} to handle incoming data streams.
|
||||||
|
*/
|
||||||
|
public interface StreamParser {
|
||||||
|
/** Called when the connection socket is closed */
|
||||||
|
void connectionClosed();
|
||||||
|
|
||||||
|
/** Called when the connection socket is first opened */
|
||||||
|
void connectionOpened();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Called when new bytes are available from the remote end.
|
||||||
|
* * buff will start with its limit set to the position we can read to and its position set to the location we will
|
||||||
|
* start reading at
|
||||||
|
* * May read more than one message (recursively) if there are enough bytes available
|
||||||
|
* * Uses messageBytes/messageBytesOffset to store message which are larger (incl their length prefix) than buff's
|
||||||
|
* capacity(), ie it is up to this method to ensure we dont run out of buffer space to decode the next message.
|
||||||
|
* * buff will end with its limit the same as it was previously, and its position set to the position up to which
|
||||||
|
* bytes have been read (the same as its return value)
|
||||||
|
* @return The amount of bytes consumed which should not be provided again
|
||||||
|
*/
|
||||||
|
int receiveBytes(ByteBuffer buff) throws Exception;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Called when this parser is attached to an upstream write target (ie a low-level connection handler). This
|
||||||
|
* writeTarget should be stored and used to close the connection or write data to the socket.
|
||||||
|
*/
|
||||||
|
void setWriteTarget(MessageWriteTarget writeTarget);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the maximum message size of a message on the socket. This is used in calculating size of buffers to
|
||||||
|
* allocate.
|
||||||
|
*/
|
||||||
|
int getMaxMessageSize();
|
||||||
|
}
|
@ -20,13 +20,13 @@ import java.net.InetAddress;
|
|||||||
import javax.annotation.Nullable;
|
import javax.annotation.Nullable;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A factory which generates new {@link ProtobufParser}s when a new connection is opened.
|
* A factory which generates new {@link StreamParser}s when a new connection is opened.
|
||||||
*/
|
*/
|
||||||
public interface ProtobufParserFactory {
|
public interface StreamParserFactory {
|
||||||
/**
|
/**
|
||||||
* Returns a new handler or null to have the connection close.
|
* Returns a new handler or null to have the connection close.
|
||||||
* @param inetAddress The client's (IP) address
|
* @param inetAddress The client's (IP) address
|
||||||
* @param port The remote port on the client side
|
* @param port The remote port on the client side
|
||||||
*/
|
*/
|
||||||
@Nullable public ProtobufParser getNewParser(InetAddress inetAddress, int port);
|
@Nullable public StreamParser getNewParser(InetAddress inetAddress, int port);
|
||||||
}
|
}
|
@ -55,7 +55,7 @@ public class NioWrapperTest {
|
|||||||
final SettableFuture<Void> clientConnectionClosed = SettableFuture.create();
|
final SettableFuture<Void> clientConnectionClosed = SettableFuture.create();
|
||||||
final SettableFuture<Protos.TwoWayChannelMessage> clientMessage1Received = SettableFuture.create();
|
final SettableFuture<Protos.TwoWayChannelMessage> clientMessage1Received = SettableFuture.create();
|
||||||
final SettableFuture<Protos.TwoWayChannelMessage> clientMessage2Received = SettableFuture.create();
|
final SettableFuture<Protos.TwoWayChannelMessage> clientMessage2Received = SettableFuture.create();
|
||||||
ProtobufServer server = new ProtobufServer(new ProtobufParserFactory() {
|
NioServer server = new NioServer(new StreamParserFactory() {
|
||||||
@Override
|
@Override
|
||||||
public ProtobufParser getNewParser(InetAddress inetAddress, int port) {
|
public ProtobufParser getNewParser(InetAddress inetAddress, int port) {
|
||||||
return new ProtobufParser<Protos.TwoWayChannelMessage>(new ProtobufParser.Listener<Protos.TwoWayChannelMessage>() {
|
return new ProtobufParser<Protos.TwoWayChannelMessage>(new ProtobufParser.Listener<Protos.TwoWayChannelMessage>() {
|
||||||
@ -100,7 +100,7 @@ public class NioWrapperTest {
|
|||||||
}
|
}
|
||||||
}, Protos.TwoWayChannelMessage.getDefaultInstance(), 1000, 0);
|
}, Protos.TwoWayChannelMessage.getDefaultInstance(), 1000, 0);
|
||||||
|
|
||||||
ProtobufClient client = new ProtobufClient(new InetSocketAddress("localhost", 4243), clientHandler, 0);
|
NioClient client = new NioClient(new InetSocketAddress("localhost", 4243), clientHandler, 0);
|
||||||
|
|
||||||
clientConnectionOpen.get();
|
clientConnectionOpen.get();
|
||||||
serverConnectionOpen.get();
|
serverConnectionOpen.get();
|
||||||
@ -130,7 +130,7 @@ public class NioWrapperTest {
|
|||||||
final SettableFuture<Void> clientConnection2Open = SettableFuture.create();
|
final SettableFuture<Void> clientConnection2Open = SettableFuture.create();
|
||||||
final SettableFuture<Void> serverConnection2Closed = SettableFuture.create();
|
final SettableFuture<Void> serverConnection2Closed = SettableFuture.create();
|
||||||
final SettableFuture<Void> clientConnection2Closed = SettableFuture.create();
|
final SettableFuture<Void> clientConnection2Closed = SettableFuture.create();
|
||||||
ProtobufServer server = new ProtobufServer(new ProtobufParserFactory() {
|
NioServer server = new NioServer(new StreamParserFactory() {
|
||||||
@Override
|
@Override
|
||||||
public ProtobufParser getNewParser(InetAddress inetAddress, int port) {
|
public ProtobufParser getNewParser(InetAddress inetAddress, int port) {
|
||||||
return new ProtobufParser<Protos.TwoWayChannelMessage>(new ProtobufParser.Listener<Protos.TwoWayChannelMessage>() {
|
return new ProtobufParser<Protos.TwoWayChannelMessage>(new ProtobufParser.Listener<Protos.TwoWayChannelMessage>() {
|
||||||
@ -160,7 +160,7 @@ public class NioWrapperTest {
|
|||||||
});
|
});
|
||||||
server.start(new InetSocketAddress("localhost", 4243));
|
server.start(new InetSocketAddress("localhost", 4243));
|
||||||
|
|
||||||
new ProtobufClient(new InetSocketAddress("localhost", 4243), new ProtobufParser<Protos.TwoWayChannelMessage>(
|
new NioClient(new InetSocketAddress("localhost", 4243), new ProtobufParser<Protos.TwoWayChannelMessage>(
|
||||||
new ProtobufParser.Listener<Protos.TwoWayChannelMessage>() {
|
new ProtobufParser.Listener<Protos.TwoWayChannelMessage>() {
|
||||||
@Override
|
@Override
|
||||||
public void messageReceived(ProtobufParser handler, Protos.TwoWayChannelMessage msg) {
|
public void messageReceived(ProtobufParser handler, Protos.TwoWayChannelMessage msg) {
|
||||||
@ -200,7 +200,7 @@ public class NioWrapperTest {
|
|||||||
clientConnection2Closed.set(null);
|
clientConnection2Closed.set(null);
|
||||||
}
|
}
|
||||||
}, Protos.TwoWayChannelMessage.getDefaultInstance(), 1000, 0);
|
}, Protos.TwoWayChannelMessage.getDefaultInstance(), 1000, 0);
|
||||||
ProtobufClient client2 = new ProtobufClient(new InetSocketAddress("localhost", 4243), client2Handler, 0);
|
NioClient client2 = new NioClient(new InetSocketAddress("localhost", 4243), client2Handler, 0);
|
||||||
|
|
||||||
clientConnection2Open.get();
|
clientConnection2Open.get();
|
||||||
serverConnection2Open.get();
|
serverConnection2Open.get();
|
||||||
@ -216,7 +216,7 @@ public class NioWrapperTest {
|
|||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void largeDataTest() throws Exception {
|
public void largeDataTest() throws Exception {
|
||||||
/** Test various large-data handling, essentially testing {@link ProtobufParser#receive(java.nio.ByteBuffer)} */
|
/** Test various large-data handling, essentially testing {@link ProtobufParser#receiveBytes(java.nio.ByteBuffer)} */
|
||||||
final SettableFuture<Void> serverConnectionOpen = SettableFuture.create();
|
final SettableFuture<Void> serverConnectionOpen = SettableFuture.create();
|
||||||
final SettableFuture<Void> clientConnectionOpen = SettableFuture.create();
|
final SettableFuture<Void> clientConnectionOpen = SettableFuture.create();
|
||||||
final SettableFuture<Void> serverConnectionClosed = SettableFuture.create();
|
final SettableFuture<Void> serverConnectionClosed = SettableFuture.create();
|
||||||
@ -225,7 +225,7 @@ public class NioWrapperTest {
|
|||||||
final SettableFuture<Protos.TwoWayChannelMessage> clientMessage2Received = SettableFuture.create();
|
final SettableFuture<Protos.TwoWayChannelMessage> clientMessage2Received = SettableFuture.create();
|
||||||
final SettableFuture<Protos.TwoWayChannelMessage> clientMessage3Received = SettableFuture.create();
|
final SettableFuture<Protos.TwoWayChannelMessage> clientMessage3Received = SettableFuture.create();
|
||||||
final SettableFuture<Protos.TwoWayChannelMessage> clientMessage4Received = SettableFuture.create();
|
final SettableFuture<Protos.TwoWayChannelMessage> clientMessage4Received = SettableFuture.create();
|
||||||
ProtobufServer server = new ProtobufServer(new ProtobufParserFactory() {
|
NioServer server = new NioServer(new StreamParserFactory() {
|
||||||
@Override
|
@Override
|
||||||
public ProtobufParser getNewParser(InetAddress inetAddress, int port) {
|
public ProtobufParser getNewParser(InetAddress inetAddress, int port) {
|
||||||
return new ProtobufParser<Protos.TwoWayChannelMessage>(new ProtobufParser.Listener<Protos.TwoWayChannelMessage>() {
|
return new ProtobufParser<Protos.TwoWayChannelMessage>(new ProtobufParser.Listener<Protos.TwoWayChannelMessage>() {
|
||||||
@ -277,7 +277,7 @@ public class NioWrapperTest {
|
|||||||
}
|
}
|
||||||
}, Protos.TwoWayChannelMessage.getDefaultInstance(), 0x10000, 0);
|
}, Protos.TwoWayChannelMessage.getDefaultInstance(), 0x10000, 0);
|
||||||
|
|
||||||
ProtobufClient client = new ProtobufClient(new InetSocketAddress("localhost", 4243), clientHandler, 0);
|
NioClient client = new NioClient(new InetSocketAddress("localhost", 4243), clientHandler, 0);
|
||||||
|
|
||||||
clientConnectionOpen.get();
|
clientConnectionOpen.get();
|
||||||
serverConnectionOpen.get();
|
serverConnectionOpen.get();
|
||||||
@ -376,7 +376,7 @@ public class NioWrapperTest {
|
|||||||
final SettableFuture<Protos.TwoWayChannelMessage> client1MessageReceived = SettableFuture.create();
|
final SettableFuture<Protos.TwoWayChannelMessage> client1MessageReceived = SettableFuture.create();
|
||||||
final SettableFuture<Protos.TwoWayChannelMessage> client2MessageReceived = SettableFuture.create();
|
final SettableFuture<Protos.TwoWayChannelMessage> client2MessageReceived = SettableFuture.create();
|
||||||
final SettableFuture<Protos.TwoWayChannelMessage> client3MessageReceived = SettableFuture.create();
|
final SettableFuture<Protos.TwoWayChannelMessage> client3MessageReceived = SettableFuture.create();
|
||||||
ProtobufServer server = new ProtobufServer(new ProtobufParserFactory() {
|
NioServer server = new NioServer(new StreamParserFactory() {
|
||||||
@Override
|
@Override
|
||||||
public ProtobufParser getNewParser(InetAddress inetAddress, int port) {
|
public ProtobufParser getNewParser(InetAddress inetAddress, int port) {
|
||||||
return new ProtobufParser<Protos.TwoWayChannelMessage>(new ProtobufParser.Listener<Protos.TwoWayChannelMessage>() {
|
return new ProtobufParser<Protos.TwoWayChannelMessage>(new ProtobufParser.Listener<Protos.TwoWayChannelMessage>() {
|
||||||
@ -429,7 +429,7 @@ public class NioWrapperTest {
|
|||||||
client1ConnectionClosed.set(null);
|
client1ConnectionClosed.set(null);
|
||||||
}
|
}
|
||||||
}, Protos.TwoWayChannelMessage.getDefaultInstance(), 1000, 0);
|
}, Protos.TwoWayChannelMessage.getDefaultInstance(), 1000, 0);
|
||||||
ProtobufClient client1 = new ProtobufClient(new InetSocketAddress("localhost", 4243), client1Handler, 0);
|
NioClient client1 = new NioClient(new InetSocketAddress("localhost", 4243), client1Handler, 0);
|
||||||
|
|
||||||
client1ConnectionOpen.get();
|
client1ConnectionOpen.get();
|
||||||
serverConnection1Open.get();
|
serverConnection1Open.get();
|
||||||
@ -451,7 +451,7 @@ public class NioWrapperTest {
|
|||||||
client2ConnectionClosed.set(null);
|
client2ConnectionClosed.set(null);
|
||||||
}
|
}
|
||||||
}, Protos.TwoWayChannelMessage.getDefaultInstance(), 1000, 0);
|
}, Protos.TwoWayChannelMessage.getDefaultInstance(), 1000, 0);
|
||||||
ProtobufClient client2 = new ProtobufClient(new InetSocketAddress("localhost", 4243), client2Handler, 0);
|
NioClient client2 = new NioClient(new InetSocketAddress("localhost", 4243), client2Handler, 0);
|
||||||
|
|
||||||
client2ConnectionOpen.get();
|
client2ConnectionOpen.get();
|
||||||
serverConnection2Open.get();
|
serverConnection2Open.get();
|
||||||
@ -474,7 +474,7 @@ public class NioWrapperTest {
|
|||||||
client3ConnectionClosed.set(null);
|
client3ConnectionClosed.set(null);
|
||||||
}
|
}
|
||||||
}, Protos.TwoWayChannelMessage.getDefaultInstance(), 1000, 0);
|
}, Protos.TwoWayChannelMessage.getDefaultInstance(), 1000, 0);
|
||||||
ProtobufClient client3 = new ProtobufClient(new InetSocketAddress("localhost", 4243), client3Handler, 0);
|
NioClient client3 = new NioClient(new InetSocketAddress("localhost", 4243), client3Handler, 0);
|
||||||
|
|
||||||
client3ConnectionOpen.get();
|
client3ConnectionOpen.get();
|
||||||
serverConnection3Open.get();
|
serverConnection3Open.get();
|
||||||
|
Loading…
x
Reference in New Issue
Block a user