3
0
mirror of https://github.com/Qortal/altcoinj.git synced 2025-01-31 23:32:16 +00:00

BlockingClient: some fixes for Orchid

This commit is contained in:
Mike Hearn 2014-01-15 23:16:04 +01:00
parent ae1e3691f5
commit 654543bb2b
4 changed files with 49 additions and 18 deletions

View File

@ -19,8 +19,10 @@ package com.google.bitcoin.net;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import javax.net.SocketFactory;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket; import java.net.Socket;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
@ -42,7 +44,7 @@ public class BlockingClient implements MessageWriteTarget {
private static final int BUFFER_SIZE_UPPER_BOUND = 65536; private static final int BUFFER_SIZE_UPPER_BOUND = 65536;
private final ByteBuffer dbuf; private final ByteBuffer dbuf;
private final Socket socket; private Socket socket;
private volatile boolean vCloseRequested = false; private volatile boolean vCloseRequested = false;
/** /**
@ -53,17 +55,17 @@ public class BlockingClient implements MessageWriteTarget {
* *
* @param connectTimeoutMillis The connect timeout set on the connection (in milliseconds). 0 is interpreted as no * @param connectTimeoutMillis The connect timeout set on the connection (in milliseconds). 0 is interpreted as no
* timeout. * timeout.
* @param socketFactory An object that creates {@link Socket} objects on demand, which may be customised to control
* how this client connects to the internet. If not sure, use SocketFactory.getDefault()
* @param clientSet A set which this object will add itself to after initialization, and then remove itself from * @param clientSet A set which this object will add itself to after initialization, and then remove itself from
* when the connection dies. Note that this set must be thread-safe.
*/ */
public BlockingClient(final SocketAddress serverAddress, final StreamParser parser, public BlockingClient(final SocketAddress serverAddress, final StreamParser parser,
final int connectTimeoutMillis, @Nullable final Set<BlockingClient> clientSet) throws IOException { final int connectTimeoutMillis, final SocketFactory socketFactory, @Nullable final Set<BlockingClient> clientSet) throws IOException {
// Try to fit at least one message in the network buffer, but place an upper and lower limit on its size to make // Try to fit at least one message in the network buffer, but place an upper and lower limit on its size to make
// sure it doesnt get too large or have to call read too often. // sure it doesnt get too large or have to call read too often.
dbuf = ByteBuffer.allocateDirect(Math.min(Math.max(parser.getMaxMessageSize(), BUFFER_SIZE_LOWER_BOUND), BUFFER_SIZE_UPPER_BOUND)); dbuf = ByteBuffer.allocateDirect(Math.min(Math.max(parser.getMaxMessageSize(), BUFFER_SIZE_LOWER_BOUND), BUFFER_SIZE_UPPER_BOUND));
parser.setWriteTarget(this); parser.setWriteTarget(this);
socket = new Socket(); socket = socketFactory.createSocket();
Thread t = new Thread() { Thread t = new Thread() {
@Override @Override
public void run() { public void run() {
@ -129,7 +131,9 @@ public class BlockingClient implements MessageWriteTarget {
@Override @Override
public synchronized void writeBytes(byte[] message) throws IOException { public synchronized void writeBytes(byte[] message) throws IOException {
try { try {
socket.getOutputStream().write(message); OutputStream stream = socket.getOutputStream();
stream.write(message);
stream.flush();
} catch (IOException e) { } catch (IOException e) {
log.error("Error writing message to connection, closing connection", e); log.error("Error writing message to connection, closing connection", e);
closeConnection(); closeConnection();

View File

@ -18,6 +18,7 @@ package com.google.bitcoin.net;
import com.google.common.util.concurrent.AbstractIdleService; import com.google.common.util.concurrent.AbstractIdleService;
import javax.net.SocketFactory;
import java.io.IOException; import java.io.IOException;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.util.Collections; import java.util.Collections;
@ -25,6 +26,8 @@ import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.Set; import java.util.Set;
import static com.google.common.base.Preconditions.checkNotNull;
/** /**
* <p>A thin wrapper around a set of {@link BlockingClient}s.</p> * <p>A thin wrapper around a set of {@link BlockingClient}s.</p>
* *
@ -33,18 +36,39 @@ import java.util.Set;
* some other network settings that cannot be set using NIO.</p> * some other network settings that cannot be set using NIO.</p>
*/ */
public class BlockingClientManager extends AbstractIdleService implements ClientConnectionManager { public class BlockingClientManager extends AbstractIdleService implements ClientConnectionManager {
private final SocketFactory socketFactory;
private final Set<BlockingClient> clients = Collections.synchronizedSet(new HashSet<BlockingClient>()); private final Set<BlockingClient> clients = Collections.synchronizedSet(new HashSet<BlockingClient>());
private int connectTimeoutMillis = 1000;
public BlockingClientManager() {
socketFactory = SocketFactory.getDefault();
}
/**
* Creates a blocking client manager that will obtain sockets from the given factory. Useful for customising how
* bitcoinj connects to the P2P network.
*/
public BlockingClientManager(SocketFactory socketFactory) {
this.socketFactory = checkNotNull(socketFactory);
}
@Override @Override
public void openConnection(SocketAddress serverAddress, StreamParser parser) { public void openConnection(SocketAddress serverAddress, StreamParser parser) {
if (!isRunning()) if (!isRunning())
throw new IllegalStateException(); throw new IllegalStateException();
try { try {
new BlockingClient(serverAddress, parser, 1000, clients); new BlockingClient(serverAddress, parser, connectTimeoutMillis, socketFactory, clients);
} catch (IOException e) { } catch (IOException e) {
throw new RuntimeException(e); // This should only happen if we are, eg, out of system resources throw new RuntimeException(e); // This should only happen if we are, eg, out of system resources
} }
} }
/** Sets the number of milliseconds to wait before giving up on a connect attempt */
public void setConnectTimeoutMillis(int connectTimeoutMillis) {
this.connectTimeoutMillis = connectTimeoutMillis;
}
@Override @Override
protected void startUp() throws Exception { } protected void startUp() throws Exception { }

View File

@ -24,6 +24,8 @@ import com.google.bitcoin.utils.BriefLogFormatter;
import com.google.bitcoin.utils.Threading; import com.google.bitcoin.utils.Threading;
import com.google.common.util.concurrent.SettableFuture; import com.google.common.util.concurrent.SettableFuture;
import javax.annotation.Nullable;
import javax.net.SocketFactory;
import java.io.IOException; import java.io.IOException;
import java.math.BigInteger; import java.math.BigInteger;
import java.net.InetAddress; import java.net.InetAddress;
@ -32,7 +34,6 @@ import java.net.SocketAddress;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nullable;
import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkArgument;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
@ -152,7 +153,7 @@ public class TestWithNetworkConnections {
else if (clientType == ClientType.NIO_CLIENT) else if (clientType == ClientType.NIO_CLIENT)
new NioClient(new InetSocketAddress("127.0.0.1", 2000), peer, 100); new NioClient(new InetSocketAddress("127.0.0.1", 2000), peer, 100);
else if (clientType == ClientType.BLOCKING_CLIENT) else if (clientType == ClientType.BLOCKING_CLIENT)
new BlockingClient(new InetSocketAddress("127.0.0.1", 2000), peer, 100, null); new BlockingClient(new InetSocketAddress("127.0.0.1", 2000), peer, 100, SocketFactory.getDefault(), null);
else else
throw new RuntimeException(); throw new RuntimeException();
// Claim we are connected to a different IP that what we really are, so tx confidence broadcastBy sets work // Claim we are connected to a different IP that what we really are, so tx confidence broadcastBy sets work

View File

@ -16,13 +16,6 @@
package com.google.bitcoin.net; package com.google.bitcoin.net;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicBoolean;
import com.google.bitcoin.core.Utils; import com.google.bitcoin.core.Utils;
import com.google.common.util.concurrent.SettableFuture; import com.google.common.util.concurrent.SettableFuture;
import com.google.protobuf.ByteString; import com.google.protobuf.ByteString;
@ -33,8 +26,17 @@ import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.junit.runners.Parameterized; import org.junit.runners.Parameterized;
import javax.net.SocketFactory;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicBoolean;
import static com.google.common.base.Preconditions.checkState; import static com.google.common.base.Preconditions.checkState;
import static org.junit.Assert.*; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@RunWith(value = Parameterized.class) @RunWith(value = Parameterized.class)
public class NetworkAbstractionTests { public class NetworkAbstractionTests {
@ -68,7 +70,7 @@ public class NetworkAbstractionTests {
} else if (clientType == 2) } else if (clientType == 2)
return new NioClient(addr, parser, 100); return new NioClient(addr, parser, 100);
else if (clientType == 3) else if (clientType == 3)
return new BlockingClient(addr, parser, 100, null); return new BlockingClient(addr, parser, 100, SocketFactory.getDefault(), null);
else else
throw new RuntimeException(); throw new RuntimeException();
} }