diff --git a/core/src/main/java/org/bitcoinj/net/BlockingClient.java b/core/src/main/java/org/bitcoinj/net/BlockingClient.java index b3c07525..2aa4d717 100644 --- a/core/src/main/java/org/bitcoinj/net/BlockingClient.java +++ b/core/src/main/java/org/bitcoinj/net/BlockingClient.java @@ -42,7 +42,6 @@ public class BlockingClient implements MessageWriteTarget { private static final int BUFFER_SIZE_LOWER_BOUND = 4096; private static final int BUFFER_SIZE_UPPER_BOUND = 65536; - private final ByteBuffer dbuf; private Socket socket; private volatile boolean vCloseRequested = false; private SettableFuture connectFuture; @@ -60,11 +59,11 @@ public class BlockingClient implements MessageWriteTarget { * @param clientSet A set which this object will add itself to after initialization, and then remove itself from */ public BlockingClient(final SocketAddress serverAddress, final StreamParser parser, - final int connectTimeoutMillis, final SocketFactory socketFactory, @Nullable final Set clientSet) throws IOException { + final int connectTimeoutMillis, final SocketFactory socketFactory, + @Nullable final Set clientSet) throws IOException { connectFuture = SettableFuture.create(); // Try to fit at least one message in the network buffer, but place an upper and lower limit on its size to make // sure it doesnt get too large or have to call read too often. - dbuf = ByteBuffer.allocateDirect(Math.min(Math.max(parser.getMaxMessageSize(), BUFFER_SIZE_LOWER_BOUND), BUFFER_SIZE_UPPER_BOUND)); parser.setWriteTarget(this); socket = socketFactory.createSocket(); final Context context = Context.get(); @@ -79,25 +78,7 @@ public class BlockingClient implements MessageWriteTarget { parser.connectionOpened(); connectFuture.set(serverAddress); InputStream stream = socket.getInputStream(); - byte[] readBuff = new byte[dbuf.capacity()]; - - while (true) { - // TODO Kill the message duplication here - checkState(dbuf.remaining() > 0 && dbuf.remaining() <= readBuff.length); - int read = stream.read(readBuff, 0, Math.max(1, Math.min(dbuf.remaining(), stream.available()))); - if (read == -1) - return; - dbuf.put(readBuff, 0, read); - // "flip" the buffer - setting the limit to the current position and setting position to 0 - dbuf.flip(); - // Use parser.receiveBytes's return value as a double-check that it stopped reading at the right - // location - int bytesConsumed = parser.receiveBytes(dbuf); - checkState(dbuf.position() == bytesConsumed); - // Now drop the bytes which were read by compacting dbuf (resetting limit and keeping relative - // position) - dbuf.compact(); - } + runReadLoop(stream, parser); } catch (Exception e) { if (!vCloseRequested) { log.error("Error trying to open/read from connection: {}: {}", serverAddress, e.getMessage()); @@ -120,6 +101,32 @@ public class BlockingClient implements MessageWriteTarget { t.start(); } + /** + * A blocking call that never returns, except by throwing an exception. It reads bytes from the input stream + * and feeds them to the provided {@link StreamParser}, for example, a {@link Peer}. + */ + public static void runReadLoop(InputStream stream, StreamParser parser) throws Exception { + ByteBuffer dbuf = ByteBuffer.allocateDirect(Math.min(Math.max(parser.getMaxMessageSize(), BUFFER_SIZE_LOWER_BOUND), BUFFER_SIZE_UPPER_BOUND)); + byte[] readBuff = new byte[dbuf.capacity()]; + while (true) { + // TODO Kill the message duplication here + checkState(dbuf.remaining() > 0 && dbuf.remaining() <= readBuff.length); + int read = stream.read(readBuff, 0, Math.max(1, Math.min(dbuf.remaining(), stream.available()))); + if (read == -1) + return; + dbuf.put(readBuff, 0, read); + // "flip" the buffer - setting the limit to the current position and setting position to 0 + dbuf.flip(); + // Use parser.receiveBytes's return value as a double-check that it stopped reading at the right + // location + int bytesConsumed = parser.receiveBytes(dbuf); + checkState(dbuf.position() == bytesConsumed); + // Now drop the bytes which were read by compacting dbuf (resetting limit and keeping relative + // position) + dbuf.compact(); + } + } + /** * Closes the connection to the server, triggering the {@link StreamParser#connectionClosed()} * event on the network-handling thread where all callbacks occur.