3
0
mirror of https://github.com/Qortal/altcoinj.git synced 2025-01-30 23:02:15 +00:00

ClientConnectionManager.openConnection now returns a future. Some logging about connection failures was removed.

This commit is contained in:
Mike Hearn 2014-12-18 02:23:21 +01:00
parent 200dc1294c
commit ae585608e6
4 changed files with 48 additions and 24 deletions

View File

@ -16,6 +16,8 @@
package org.bitcoinj.net;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
@ -47,6 +49,7 @@ public class BlockingClient implements MessageWriteTarget {
private final ByteBuffer dbuf;
private Socket socket;
private volatile boolean vCloseRequested = false;
private SettableFuture<SocketAddress> connectFuture;
/**
* <p>Creates a new client to the given server address using the given {@link StreamParser} to decode the data.
@ -62,6 +65,7 @@ public class BlockingClient implements MessageWriteTarget {
*/
public BlockingClient(final SocketAddress serverAddress, final StreamParser parser,
final int connectTimeoutMillis, final SocketFactory socketFactory, @Nullable final Set<BlockingClient> clientSet) throws IOException {
connectFuture = SettableFuture.create();
// Try to fit at least one message in the network buffer, but place an upper and lower limit on its size to make
// sure it doesnt get too large or have to call read too often.
dbuf = ByteBuffer.allocateDirect(Math.min(Math.max(parser.getMaxMessageSize(), BUFFER_SIZE_LOWER_BOUND), BUFFER_SIZE_UPPER_BOUND));
@ -73,9 +77,9 @@ public class BlockingClient implements MessageWriteTarget {
if (clientSet != null)
clientSet.add(BlockingClient.this);
try {
InetSocketAddress iServerAddress = (InetSocketAddress)serverAddress;
socket.connect(serverAddress, connectTimeoutMillis);
parser.connectionOpened();
connectFuture.set(serverAddress);
InputStream stream = socket.getInputStream();
byte[] readBuff = new byte[dbuf.capacity()];
@ -97,8 +101,10 @@ public class BlockingClient implements MessageWriteTarget {
dbuf.compact();
}
} catch (Exception e) {
if (!vCloseRequested)
if (!vCloseRequested) {
log.error("Error trying to open/read from connection: " + serverAddress, e);
connectFuture.setException(e);
}
} finally {
try {
socket.close();
@ -143,4 +149,9 @@ public class BlockingClient implements MessageWriteTarget {
throw e;
}
}
/** Returns a future that completes once connection has occurred at the socket level or with an exception if failed to connect. */
public ListenableFuture<SocketAddress> getConnectFuture() {
return connectFuture;
}
}

View File

@ -17,6 +17,7 @@
package org.bitcoinj.net;
import com.google.common.util.concurrent.AbstractIdleService;
import com.google.common.util.concurrent.ListenableFuture;
import javax.net.SocketFactory;
import java.io.IOException;
@ -54,11 +55,11 @@ public class BlockingClientManager extends AbstractIdleService implements Client
}
@Override
public void openConnection(SocketAddress serverAddress, StreamParser parser) {
if (!isRunning())
throw new IllegalStateException();
public ListenableFuture<SocketAddress> openConnection(SocketAddress serverAddress, StreamParser parser) {
try {
new BlockingClient(serverAddress, parser, connectTimeoutMillis, socketFactory, clients);
if (!isRunning())
throw new IllegalStateException();
return new BlockingClient(serverAddress, parser, connectTimeoutMillis, socketFactory, clients).getConnectFuture();
} catch (IOException e) {
throw new RuntimeException(e); // This should only happen if we are, eg, out of system resources
}

View File

@ -16,6 +16,7 @@
package org.bitcoinj.net;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.Service;
import java.net.SocketAddress;
@ -31,7 +32,7 @@ public interface ClientConnectionManager extends Service {
/**
* Creates a new connection to the given address, with the given parser used to handle incoming data.
*/
void openConnection(SocketAddress serverAddress, StreamParser parser);
ListenableFuture<SocketAddress> openConnection(SocketAddress serverAddress, StreamParser parser);
/** Gets the number of connected peers */
int getConnectedClientCount();

View File

@ -18,9 +18,13 @@ package org.bitcoinj.net;
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.AbstractExecutionThreadService;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.ConnectException;
import java.net.SocketAddress;
import java.nio.channels.*;
import java.nio.channels.spi.SelectorProvider;
@ -36,12 +40,15 @@ public class NioClientManager extends AbstractExecutionThreadService implements
private final Selector selector;
// SocketChannels and StreamParsers of newly-created connections which should be registered with OP_CONNECT
class SocketChannelAndParser {
SocketChannel sc; StreamParser parser;
SocketChannelAndParser(SocketChannel sc, StreamParser parser) { this.sc = sc; this.parser = parser; }
class PendingConnect {
SocketChannel sc;
StreamParser parser;
SocketAddress address;
SettableFuture<SocketAddress> future = SettableFuture.create();
PendingConnect(SocketChannel sc, StreamParser parser, SocketAddress address) { this.sc = sc; this.parser = parser; this.address = address; }
}
final Queue<SocketChannelAndParser> newConnectionChannels = new LinkedBlockingQueue<SocketChannelAndParser>();
final Queue<PendingConnect> newConnectionChannels = new LinkedBlockingQueue<PendingConnect>();
// Added to/removed from by the individual ConnectionHandler's, thus must by synchronized on its own.
private final Set<ConnectionHandler> connectedHandlers = Collections.synchronizedSet(new HashSet<ConnectionHandler>());
@ -51,17 +58,21 @@ public class NioClientManager extends AbstractExecutionThreadService implements
// We could have a !isValid() key here if the connection is already closed at this point
if (key.isValid() && key.isConnectable()) { // ie a client connection which has finished the initial connect process
// Create a ConnectionHandler and hook everything together
StreamParser parser = (StreamParser) key.attachment();
PendingConnect data = (PendingConnect) key.attachment();
StreamParser parser = data.parser;
SocketChannel sc = (SocketChannel) key.channel();
ConnectionHandler handler = new ConnectionHandler(parser, key, connectedHandlers);
try {
if (sc.finishConnect()) {
log.info("Successfully connected to {}", sc.socket().getRemoteSocketAddress());
key.interestOps((key.interestOps() | SelectionKey.OP_READ) & ~SelectionKey.OP_CONNECT).attach(handler);
handler.parser.connectionOpened();
parser.connectionOpened();
data.future.set(data.address);
} else {
log.error("Failed to connect to {}", sc.socket().getRemoteSocketAddress());
handler.closeConnection(); // Failed to connect for some reason
data.future.setException(new ConnectException("Unknown reason"));
data.future = null;
}
} catch (Exception e) {
// If e is a CancelledKeyException, there is a race to get to interestOps after finishConnect() which
@ -70,6 +81,8 @@ public class NioClientManager extends AbstractExecutionThreadService implements
Throwable cause = Throwables.getRootCause(e);
log.error("Failed to connect with exception: {}: {}", cause.getClass().getName(), cause.getMessage());
handler.closeConnection();
data.future.setException(cause);
data.future = null;
}
} else // Process bytes read
ConnectionHandler.handleKey(key);
@ -92,11 +105,11 @@ public class NioClientManager extends AbstractExecutionThreadService implements
try {
Thread.currentThread().setPriority(Thread.MIN_PRIORITY);
while (isRunning()) {
SocketChannelAndParser conn;
PendingConnect conn;
while ((conn = newConnectionChannels.poll()) != null) {
try {
SelectionKey key = conn.sc.register(selector, SelectionKey.OP_CONNECT);
key.attach(conn.parser);
key.attach(conn);
} catch (ClosedChannelException e) {
log.info("SocketChannel was closed before it could be registered");
}
@ -134,7 +147,7 @@ public class NioClientManager extends AbstractExecutionThreadService implements
}
@Override
public void openConnection(SocketAddress serverAddress, StreamParser parser) {
public ListenableFuture<SocketAddress> openConnection(SocketAddress serverAddress, StreamParser parser) {
if (!isRunning())
throw new IllegalStateException();
// Create a new connection, give it a parser as an attachment
@ -142,14 +155,12 @@ public class NioClientManager extends AbstractExecutionThreadService implements
SocketChannel sc = SocketChannel.open();
sc.configureBlocking(false);
sc.connect(serverAddress);
newConnectionChannels.offer(new SocketChannelAndParser(sc, parser));
PendingConnect data = new PendingConnect(sc, parser, serverAddress);
newConnectionChannels.offer(data);
selector.wakeup();
} catch (IOException e) {
log.error("Could not connect to " + serverAddress);
throw new RuntimeException(e); // This should only happen if we are, eg, out of system resources
} catch (AssertionError e) {
log.error("Could not connect to " + serverAddress);
throw new RuntimeException(e); // Happens on Android when libcore.io.Posix.getsockname() throws libcore.io.ErrnoException.
return data.future;
} catch (Throwable e) {
return Futures.immediateFailedFuture(e);
}
}