diff --git a/core/src/main/java/com/google/bitcoin/core/Peer.java b/core/src/main/java/com/google/bitcoin/core/Peer.java index 6831fcc8..38de48cb 100644 --- a/core/src/main/java/com/google/bitcoin/core/Peer.java +++ b/core/src/main/java/com/google/bitcoin/core/Peer.java @@ -155,7 +155,7 @@ public class Peer extends PeerSocketHandler { *

The remoteAddress provided should match the remote address of the peer which is being connected to, and is * used to keep track of which peers relayed transactions and offer more descriptive logging.

*/ - public Peer(NetworkParameters params, VersionMessage ver, @Nullable AbstractBlockChain chain, InetSocketAddress remoteAddress) { + public Peer(NetworkParameters params, VersionMessage ver, @Nullable AbstractBlockChain chain, PeerAddress remoteAddress) { this(params, ver, remoteAddress, chain, null); } @@ -173,7 +173,7 @@ public class Peer extends PeerSocketHandler { *

The remoteAddress provided should match the remote address of the peer which is being connected to, and is * used to keep track of which peers relayed transactions and offer more descriptive logging.

*/ - public Peer(NetworkParameters params, VersionMessage ver, InetSocketAddress remoteAddress, + public Peer(NetworkParameters params, VersionMessage ver, PeerAddress remoteAddress, @Nullable AbstractBlockChain chain, @Nullable MemoryPool mempool) { super(params, remoteAddress); this.params = Preconditions.checkNotNull(params); @@ -203,8 +203,8 @@ public class Peer extends PeerSocketHandler { *

The remoteAddress provided should match the remote address of the peer which is being connected to, and is * used to keep track of which peers relayed transactions and offer more descriptive logging.

*/ - public Peer(NetworkParameters params, AbstractBlockChain blockChain, InetSocketAddress remoteAddress, String thisSoftwareName, String thisSoftwareVersion) { - this(params, new VersionMessage(params, blockChain.getBestChainHeight(), true), blockChain, remoteAddress); + public Peer(NetworkParameters params, AbstractBlockChain blockChain, PeerAddress peerAddress, String thisSoftwareName, String thisSoftwareVersion) { + this(params, new VersionMessage(params, blockChain.getBestChainHeight(), true), blockChain, peerAddress); this.versionMessage.appendToSubVer(thisSoftwareName, thisSoftwareVersion, null); } diff --git a/core/src/main/java/com/google/bitcoin/core/PeerAddress.java b/core/src/main/java/com/google/bitcoin/core/PeerAddress.java index 3d689fa4..7cd34a4b 100644 --- a/core/src/main/java/com/google/bitcoin/core/PeerAddress.java +++ b/core/src/main/java/com/google/bitcoin/core/PeerAddress.java @@ -252,6 +252,7 @@ public class PeerAddress extends ChildMessage { other.port == port && other.services.equals(services) && other.time == time; + //FIXME including services and time could cause same peer to be added multiple times in collections } @Override diff --git a/core/src/main/java/com/google/bitcoin/core/PeerGroup.java b/core/src/main/java/com/google/bitcoin/core/PeerGroup.java index 6cd4c61f..4b9b1bde 100644 --- a/core/src/main/java/com/google/bitcoin/core/PeerGroup.java +++ b/core/src/main/java/com/google/bitcoin/core/PeerGroup.java @@ -22,6 +22,7 @@ import com.google.bitcoin.net.NioClientManager; import com.google.bitcoin.net.discovery.PeerDiscovery; import com.google.bitcoin.net.discovery.PeerDiscoveryException; import com.google.bitcoin.script.Script; +import com.google.bitcoin.utils.ExponentialBackoff; import com.google.bitcoin.utils.ListenerRegistration; import com.google.bitcoin.utils.Threading; import com.google.common.base.Preconditions; @@ -36,10 +37,7 @@ import java.math.BigInteger; import java.net.InetAddress; import java.net.InetSocketAddress; import java.util.*; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.CopyOnWriteArraySet; -import java.util.concurrent.Executor; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import java.util.concurrent.locks.ReentrantLock; import static com.google.common.base.Preconditions.checkNotNull; @@ -67,14 +65,16 @@ import static com.google.common.base.Preconditions.checkState; * when finished. Note that not all methods of PeerGroup are safe to call from a UI thread as some may do * network IO, but starting and stopping the service should be fine.

*/ -public class PeerGroup extends AbstractIdleService implements TransactionBroadcaster { +public class PeerGroup extends AbstractExecutionThreadService implements TransactionBroadcaster { private static final int DEFAULT_CONNECTIONS = 4; private static final Logger log = LoggerFactory.getLogger(PeerGroup.class); protected final ReentrantLock lock = Threading.lock("peergroup"); // Addresses to try to connect to, excluding active peers. - @GuardedBy("lock") private final List inactives; + @GuardedBy("lock") private final PriorityQueue inactives; + @GuardedBy("lock") private final Map backoffMap; + // Currently active peers. This is an ordered list rather than a set to make unit tests predictable. private final CopyOnWriteArrayList peers; // Currently connecting peers. @@ -142,6 +142,21 @@ public class PeerGroup extends AbstractIdleService implements TransactionBroadca @Override public void onCoinsSent(Wallet wallet, Transaction tx, BigInteger prevBalance, BigInteger newBalance) { onChanged(); } }; + // Exponential backoff for peers starts at 1 second and maxes at 10 minutes. + private ExponentialBackoff.Params peerBackoffParams = new ExponentialBackoff.Params(1000, 1.5f, 10 * 60 * 1000); + // Tracks failures globally in case of a network failure + private ExponentialBackoff groupBackoff = new ExponentialBackoff(new ExponentialBackoff.Params(100, 1.1f, 30 * 1000)); + + private LinkedBlockingQueue morePeersMailbox = new LinkedBlockingQueue(); + + private void handleBlocksDownloaded() { + double rate = chain.getFalsePositiveRate(); + if (rate > bloomFilterFPRate * MAX_FP_RATE_INCREASE) { + log.info("Force update Bloom filter due to high false positive rate"); + recalculateFastCatchupAndFilter(true); + } + } + private class PeerStartupListener extends AbstractPeerEventListener { @Override public void onPeerConnected(Peer peer, int peerCount) { @@ -222,7 +237,17 @@ public class PeerGroup extends AbstractIdleService implements TransactionBroadca memoryPool = new MemoryPool(); - inactives = new ArrayList(); + inactives = new PriorityQueue(1, new Comparator() { + @Override + public int compare(PeerAddress a, PeerAddress b) { + int result = backoffMap.get(a).compareTo(backoffMap.get(b)); + // Sort by port if otherwise equals - for testing + if (result == 0) + result = Integer.valueOf(a.getPort()).compareTo(b.getPort()); + return result; + } + }); + backoffMap = new HashMap(); peers = new CopyOnWriteArrayList(); pendingPeers = new CopyOnWriteArrayList(); channels = connectionManager; @@ -246,18 +271,17 @@ public class PeerGroup extends AbstractIdleService implements TransactionBroadca } // We may now have too many or too few open connections. Add more or drop some to get to the right amount. adjustment = maxConnections - channels.getConnectedClientCount(); - while (adjustment > 0) { - try { - connectToAnyPeer(); - } catch (PeerDiscoveryException e) { - throw new RuntimeException(e); - } - adjustment--; - } + if (adjustment > 0) + notifyServiceThread(); + if (adjustment < 0) channels.closeConnections(-adjustment); } + private void notifyServiceThread() { + morePeersMailbox.offer(this); // Any non-null object will do. + } + /** The maximum number of connections that we will create to peers. */ public int getMaxConnections() { lock.lock(); @@ -442,7 +466,7 @@ public class PeerGroup extends AbstractIdleService implements TransactionBroadca int newMax; lock.lock(); try { - inactives.add(peerAddress); + addInactive(peerAddress); newMax = getMaxConnections() + 1; } finally { lock.unlock(); @@ -450,6 +474,14 @@ public class PeerGroup extends AbstractIdleService implements TransactionBroadca setMaxConnections(newMax); } + private void addInactive(PeerAddress peerAddress) { + // Deduplicate + if (backoffMap.containsKey(peerAddress)) + return; + backoffMap.put(peerAddress, new ExponentialBackoff(peerBackoffParams)); + inactives.offer(peerAddress); + } + /** Convenience method for addAddress(new PeerAddress(address, params.port)); */ public void addAddress(InetAddress address) { addAddress(new PeerAddress(address, params.getPort())); @@ -481,11 +513,37 @@ public class PeerGroup extends AbstractIdleService implements TransactionBroadca } lock.lock(); try { - inactives.addAll(addressSet); + for (PeerAddress address : addressSet) { + addInactive(address); + } } finally { lock.unlock(); } - log.info("Peer discovery took {}msec", System.currentTimeMillis() - start); + log.info("Peer discovery took {}msec and returned {} items", + System.currentTimeMillis() - start, addressSet.size()); + } + + @Override + protected void run() throws Exception { + while (isRunning()) { + int numPeers; + lock.lock(); + try { + numPeers = peers.size() + pendingPeers.size(); + } finally { + lock.unlock(); + } + + if (numPeers < getMaxConnections()) { + try { + connectToAnyPeer(); + } catch(PeerDiscoveryException e) { + groupBackoff.trackFailure(); + } + } + else + morePeersMailbox.take(); + } } /** Picks a peer from discovery and connects to it. If connection fails, picks another and tries again. */ @@ -494,46 +552,56 @@ public class PeerGroup extends AbstractIdleService implements TransactionBroadca if (!(state == State.STARTING || state == State.RUNNING)) return; final PeerAddress addr; + + long nowMillis = Utils.currentTimeMillis(); + lock.lock(); try { - if (inactives.size() == 0) { + if (!haveReadyInactivePeer(nowMillis)) { discoverPeers(); + groupBackoff.trackSuccess(); + nowMillis = Utils.currentTimeMillis(); } if (inactives.size() == 0) { log.debug("Peer discovery didn't provide us any more peers, not trying to build new connection."); return; } - addr = inactives.remove(inactives.size() - 1); + addr = inactives.poll(); } finally { lock.unlock(); } - // This method eventually constructs a Peer and puts it into pendingPeers. If the connection fails to establish, - // handlePeerDeath will be called, which will potentially call this method again to replace the dead or failed - // connection. - connectTo(addr.toSocketAddress(), false); + + // Delay if any backoff is required + long retryTime = Math.max(backoffMap.get(addr).getRetryTime(), groupBackoff.getRetryTime()); + if (retryTime > nowMillis) { + // Sleep until retry time + Utils.sleep(retryTime - nowMillis); + } + + // This method constructs a Peer and puts it into pendingPeers. + connectTo(addr, false); + } + + private boolean haveReadyInactivePeer(long nowMillis) { + // No inactive peers to try? + if (inactives.size() == 0) + return false; + // All peers have not reached backoff retry time? + if (backoffMap.get(inactives.peek()).getRetryTime() > nowMillis) + return false; + return true; } @Override protected void startUp() throws Exception { - // This is run in a background thread by the AbstractIdleService implementation. + // This is run in a background thread by the Service implementation. vPingTimer = new Timer("Peer pinging thread", true); channels.startAndWait(); - // Bring up the requested number of connections. If a connect attempt fails, - // new peers will be tried until there is a success, so just calling connectToAnyPeer for the wanted number - // of peers is sufficient. - for (int i = 0; i < getMaxConnections(); i++) { - try { - connectToAnyPeer(); - } catch (PeerDiscoveryException e) { - if (e.getCause() instanceof InterruptedException) return; - log.error(e.getMessage()); - } - } } @Override protected void shutDown() throws Exception { - // This is run on a separate thread by the AbstractIdleService implementation. + // This is run on a separate thread by the Service implementation. vPingTimer.cancel(); // Blocking close of all sockets. channels.stopAndWait(); @@ -542,6 +610,11 @@ public class PeerGroup extends AbstractIdleService implements TransactionBroadca } } + @Override + protected void triggerShutdown() { + notifyServiceThread(); + } + /** *

Link the given wallet to this PeerGroup. This is used for three purposes:

* @@ -690,20 +763,24 @@ public class PeerGroup extends AbstractIdleService implements TransactionBroadca } /** - * Connect to a peer by creating a channel to the destination address. + * Connect to a peer by creating a channel to the destination address. This should not be + * used normally - let the PeerGroup manage connections through {@link #start()} * * @param address destination IP and port. - * @return The newly created Peer object. Use {@link com.google.bitcoin.core.Peer#getConnectionOpenFuture()} if you - * want a future which completes when the connection is open, or null if the peer could not be connected. + * @return The newly created Peer object or null if the peer could not be connected. + * Use {@link com.google.bitcoin.core.Peer#getConnectionOpenFuture()} if you + * want a future which completes when the connection is open. */ @Nullable public Peer connectTo(InetSocketAddress address) { - return connectTo(address, true); + PeerAddress peerAddress = new PeerAddress(address); + backoffMap.put(peerAddress, new ExponentialBackoff(peerBackoffParams)); + return connectTo(peerAddress, true); } // Internal version. @Nullable - protected Peer connectTo(InetSocketAddress address, boolean incrementMaxConnections) { + protected Peer connectTo(PeerAddress address, boolean incrementMaxConnections) { VersionMessage ver = getVersionMessage().duplicate(); ver.bestHeight = chain == null ? 0 : chain.getBestChainHeight(); ver.time = Utils.now().getTime() / 1000; @@ -714,7 +791,7 @@ public class PeerGroup extends AbstractIdleService implements TransactionBroadca pendingPeers.add(peer); try { - channels.openConnection(address, peer); + channels.openConnection(address.toSocketAddress(), peer); } catch (Exception e) { log.warn("Failed to connect to " + address + ": " + e.getMessage()); handlePeerDeath(peer); @@ -790,6 +867,9 @@ public class PeerGroup extends AbstractIdleService implements TransactionBroadca int newSize = -1; lock.lock(); try { + groupBackoff.trackSuccess(); + backoffMap.get(peer.getAddress()).trackSuccess(); + // Sets up the newly connected peer so it can do everything it needs to. log.info("{}: New peer", peer); pendingPeers.remove(peer); @@ -972,7 +1052,10 @@ public class PeerGroup extends AbstractIdleService implements TransactionBroadca try { pendingPeers.remove(peer); peers.remove(peer); - log.info("{}: Peer died", peer.getAddress()); + + PeerAddress address = peer.getAddress(); + + log.info("{}: Peer died", address); if (peer == downloadPeer) { log.info("Download peer died. Picking a new one."); setDownloadPeer(null); @@ -987,17 +1070,21 @@ public class PeerGroup extends AbstractIdleService implements TransactionBroadca } numPeers = peers.size() + pendingPeers.size(); numConnectedPeers = peers.size(); + + groupBackoff.trackFailure(); + + //TODO: if network failure is suspected, do not backoff peer + backoffMap.get(address).trackFailure(); + // Put back on inactive list + inactives.offer(address); + + if (numPeers < getMaxConnections()) { + notifyServiceThread(); + } } finally { lock.unlock(); } - // Replace this peer with a new one to keep our connection count up, if necessary. - if (numPeers < getMaxConnections()) { - try { - connectToAnyPeer(); - } catch (PeerDiscoveryException e) { - log.error(e.getMessage()); - } - } + peer.removeEventListener(peerListener); for (Wallet wallet : wallets) { peer.removeWallet(wallet); diff --git a/core/src/main/java/com/google/bitcoin/core/PeerSocketHandler.java b/core/src/main/java/com/google/bitcoin/core/PeerSocketHandler.java index d5cb02eb..d9e262fa 100644 --- a/core/src/main/java/com/google/bitcoin/core/PeerSocketHandler.java +++ b/core/src/main/java/com/google/bitcoin/core/PeerSocketHandler.java @@ -42,13 +42,9 @@ import static com.google.common.base.Preconditions.*; public abstract class PeerSocketHandler extends AbstractTimeoutHandler implements StreamParser { private static final Logger log = LoggerFactory.getLogger(PeerSocketHandler.class); - // The IP address to which we are connecting. - @VisibleForTesting - InetSocketAddress remoteIp; - private final BitcoinSerializer serializer; - - /** If we close() before we know our writeTarget, set this to true to call writeTarget.closeConnection() right away */ + protected PeerAddress peerAddress; + // If we close() before we know our writeTarget, set this to true to call writeTarget.closeConnection() right away. private boolean closePending = false; // writeTarget will be thread-safe, and may call into PeerGroup, which calls us, so we should call it unlocked @VisibleForTesting MessageWriteTarget writeTarget = null; @@ -62,9 +58,14 @@ public abstract class PeerSocketHandler extends AbstractTimeoutHandler implement private Lock lock = Threading.lock("PeerSocketHandler"); - public PeerSocketHandler(NetworkParameters params, InetSocketAddress peerAddress) { + public PeerSocketHandler(NetworkParameters params, InetSocketAddress remoteIp) { serializer = new BitcoinSerializer(checkNotNull(params)); - this.remoteIp = checkNotNull(peerAddress); + this.peerAddress = new PeerAddress(remoteIp); + } + + public PeerSocketHandler(NetworkParameters params, PeerAddress peerAddress) { + serializer = new BitcoinSerializer(checkNotNull(params)); + this.peerAddress = checkNotNull(peerAddress); } /** @@ -212,7 +213,7 @@ public abstract class PeerSocketHandler extends AbstractTimeoutHandler implement * @return the IP address and port of peer. */ public PeerAddress getAddress() { - return new PeerAddress(remoteIp); + return peerAddress; } /** Catch any exceptions, logging them and then closing the channel. */ diff --git a/core/src/main/java/com/google/bitcoin/core/Utils.java b/core/src/main/java/com/google/bitcoin/core/Utils.java index 04c2d0d0..b4a11e44 100644 --- a/core/src/main/java/com/google/bitcoin/core/Utils.java +++ b/core/src/main/java/com/google/bitcoin/core/Utils.java @@ -30,8 +30,12 @@ import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; import java.util.Arrays; import java.util.Date; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; /** * A collection of various utility methods that are helpful for working with the Bitcoin protocol. @@ -71,6 +75,7 @@ public class Utils { * of them in a coin (whereas one would expect 1 billion). */ public static final BigInteger CENT = new BigInteger("1000000", 10); + private static BlockingQueue mockSleepQueue; /** * Convert an amount expressed in the way humans are used to into nanocoins. @@ -447,15 +452,21 @@ public class Utils { * Advances (or rewinds) the mock clock by the given number of seconds. */ public static Date rollMockClock(int seconds) { + return rollMockClockMillis(seconds * 1000); + } + + /** + * Advances (or rewinds) the mock clock by the given number of milliseconds. + */ + public static Date rollMockClockMillis(long millis) { if (mockTime == null) mockTime = new Date(); - mockTime = new Date(mockTime.getTime() + (seconds * 1000)); + mockTime = new Date(mockTime.getTime() + millis); return mockTime; } /** * Sets the mock clock to the given time (in seconds) - * @param mockClock */ public static void setMockClock(long mockClock) { mockTime = new Date(mockClock * 1000); @@ -470,6 +481,14 @@ public class Utils { else return new Date(); } + + /** Returns the current time in seconds since the epoch, or a mocked out equivalent. */ + public static long currentTimeMillis() { + if (mockTime != null) + return mockTime.getTime(); + else + return System.currentTimeMillis(); + } public static byte[] copyOf(byte[] in, int length) { byte[] out = new byte[length]; @@ -539,4 +558,42 @@ public class Utils { public static void setBitLE(byte[] data, int index) { data[index >>> 3] |= bitMask[7 & index]; } + + /** Sleep for a span of time, or mock sleep if enabled */ + public static void sleep(long millis) { + if (mockSleepQueue == null) { + sleepUninterruptibly(millis, TimeUnit.MILLISECONDS); + } else { + try { + boolean isMultiPass = mockSleepQueue.take(); + rollMockClockMillis(millis); + if (isMultiPass) + mockSleepQueue.offer(true); + } catch (InterruptedException e) { + // Ignored. + } + } + } + + /** Enable or disable mock sleep. If enabled, set mock time to current time. */ + public static void setMockSleep(boolean isEnable) { + if (isEnable) { + mockSleepQueue = new ArrayBlockingQueue(1); + mockTime = new Date(System.currentTimeMillis()); + } else { + mockSleepQueue = null; + } + } + + /** Let sleeping thread pass the synchronization point. */ + public static void passMockSleep() { + mockSleepQueue.offer(false); + } + + /** Let the sleeping thread pass the synchronization point any number of times. */ + public static void finishMockSleep() { + if (mockSleepQueue != null) { + mockSleepQueue.offer(true); + } + } } diff --git a/core/src/main/java/com/google/bitcoin/utils/ExponentialBackoff.java b/core/src/main/java/com/google/bitcoin/utils/ExponentialBackoff.java new file mode 100644 index 00000000..bee37dab --- /dev/null +++ b/core/src/main/java/com/google/bitcoin/utils/ExponentialBackoff.java @@ -0,0 +1,104 @@ +/** + * Copyright 2013 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.bitcoin.utils; + +import com.google.bitcoin.core.Utils; + +import static com.google.common.base.Preconditions.checkArgument; + +/** + *

Tracks successes and failures and calculates a time to retry the operation.

+ * + *

The retries are exponentially backed off, up to a maximum interval. On success the back off interval is reset.

+ */ +public class ExponentialBackoff implements Comparable { + public static final int DEFAULT_INITIAL_MILLIS = 100; + public static final float DEFAULT_MULTIPLIER = 1.1f; + public static final int DEFAULT_MAXIMUM_MILLIS = 30 * 1000; + + private float backoff; + private long retryTime; + private final Params params; + + /** + * Parameters to configure a particular kind of exponential backoff. + */ + public static class Params { + private final float initial; + private final float multiplier; + private final float maximum; + + /** + * @param initialMillis the initial interval to wait, in milliseconds + * @param multiplier the multiplier to apply on each failure + * @param maximumMillis the maximum interval to wait, in milliseconds + */ + public Params(long initialMillis, float multiplier, long maximumMillis) { + checkArgument(multiplier > 1.0f, "multiplier must be greater than 1.0"); + checkArgument(maximumMillis >= initialMillis, "maximum must not be less than initial"); + + this.initial = initialMillis; + this.multiplier = multiplier; + this.maximum = maximumMillis; + } + + /** + * Construct params with default values. + */ + public Params() { + initial = DEFAULT_INITIAL_MILLIS; + multiplier = DEFAULT_MULTIPLIER; + maximum = DEFAULT_MAXIMUM_MILLIS; + } + } + + public ExponentialBackoff(Params params) { + this.params = params; + trackSuccess(); + } + + /** Track a success - reset back off interval to the initial value */ + public void trackSuccess() { + backoff = params.initial; + retryTime = Utils.currentTimeMillis(); + } + + /** Track a failure - multiply the back off interval by the multiplier */ + public void trackFailure() { + retryTime = Utils.currentTimeMillis() + (long)backoff; + backoff = Math.min(backoff * params.multiplier, params.maximum); + } + + /** Get the next time to retry, in milliseconds since the epoch */ + public long getRetryTime() { + return retryTime; + } + + @Override + public int compareTo(ExponentialBackoff other) { + if (retryTime < other.retryTime) + return -1; + if (retryTime > other.retryTime) + return 1; + return 0; + } + + @Override + public String toString() { + return "ExponentialBackoff retry=" + retryTime + " backoff=" + backoff; + } +} diff --git a/core/src/test/java/com/google/bitcoin/core/PeerGroupTest.java b/core/src/test/java/com/google/bitcoin/core/PeerGroupTest.java index 3418ca28..54eb27d8 100644 --- a/core/src/test/java/com/google/bitcoin/core/PeerGroupTest.java +++ b/core/src/test/java/com/google/bitcoin/core/PeerGroupTest.java @@ -22,6 +22,7 @@ import com.google.bitcoin.params.UnitTestParams; import com.google.bitcoin.store.MemoryBlockStore; import com.google.bitcoin.utils.TestUtils; import com.google.bitcoin.utils.Threading; +import com.google.common.collect.Lists; import com.google.common.util.concurrent.SettableFuture; import org.junit.After; import org.junit.Before; @@ -44,6 +45,10 @@ import static org.junit.Assert.*; @RunWith(value = Parameterized.class) public class PeerGroupTest extends TestWithPeerGroup { static final NetworkParameters params = UnitTestParams.get(); + private BlockingQueue connectedPeers; + private BlockingQueue disconnectedPeers; + private PeerEventListener listener; + private Map peerToMessageCount; @Parameterized.Parameters public static Collection parameters() { @@ -58,24 +63,10 @@ public class PeerGroupTest extends TestWithPeerGroup { @Override @Before public void setUp() throws Exception { - super.setUp(new MemoryBlockStore(UnitTestParams.get())); - peerGroup.addWallet(wallet); - } - - @After - public void tearDown() throws Exception { - super.tearDown(); - peerGroup.stopAndWait(); - } - - @Test - public void listener() throws Exception { - final BlockingQueue connectedPeers = new LinkedBlockingQueue(); - final BlockingQueue disconnectedPeers = new LinkedBlockingQueue(); - final SettableFuture firstDisconnectFuture = SettableFuture.create(); - final SettableFuture secondDisconnectFuture = SettableFuture.create(); - final Map peerToMessageCount = new HashMap(); - AbstractPeerEventListener listener = new AbstractPeerEventListener() { + peerToMessageCount = new HashMap(); + connectedPeers = new LinkedBlockingQueue(); + disconnectedPeers = new LinkedBlockingQueue(); + listener = new AbstractPeerEventListener() { @Override public void onPeerConnected(Peer peer, int peerCount) { connectedPeers.add(peer); @@ -98,6 +89,21 @@ public class PeerGroupTest extends TestWithPeerGroup { return m; } }; + super.setUp(new MemoryBlockStore(UnitTestParams.get())); + peerGroup.addWallet(wallet); + } + + @After + public void tearDown() throws Exception { + super.tearDown(); + Utils.finishMockSleep(); + peerGroup.stopAndWait(); + } + + @Test + public void listener() throws Exception { + final SettableFuture firstDisconnectFuture = SettableFuture.create(); + final SettableFuture secondDisconnectFuture = SettableFuture.create(); peerGroup.startAndWait(); peerGroup.addEventListener(listener); @@ -430,6 +436,75 @@ public class PeerGroupTest extends TestWithPeerGroup { Thread.sleep(50); assertFalse(peerConnectedFuture.isDone() || peerDisconnectedFuture.isDone()); Thread.sleep(60); + assertTrue(!peerConnectedFuture.isDone()); assertTrue(!peerConnectedFuture.isDone() && peerDisconnectedFuture.isDone()); } + + @Test + public void peerPriority() throws Exception { + final List addresses = Lists.newArrayList( + new InetSocketAddress("localhost", 2000), + new InetSocketAddress("localhost", 2001), + new InetSocketAddress("localhost", 2002) + ); + peerGroup.addEventListener(listener); + peerGroup.addPeerDiscovery(new PeerDiscovery() { + public InetSocketAddress[] getPeers(long unused, TimeUnit unused2) throws PeerDiscoveryException { + return addresses.toArray(new InetSocketAddress[0]); + } + + public void shutdown() { + } + }); + peerGroup.setMaxConnections(3); + Utils.setMockSleep(true); + peerGroup.startAndWait(); + + handleConnectToPeer(0); + handleConnectToPeer(1); + handleConnectToPeer(2); + connectedPeers.take(); + connectedPeers.take(); + connectedPeers.take(); + addresses.clear(); + addresses.addAll(Lists.newArrayList(new InetSocketAddress("localhost", 2003))); + stopPeerServer(2); + assertEquals(2002, disconnectedPeers.take().getAddress().getPort()); // peer died + + // discovers, connects to new peer + handleConnectToPeer(3); + assertEquals(2003, connectedPeers.take().getAddress().getPort()); + + stopPeerServer(1); + assertEquals(2001, disconnectedPeers.take().getAddress().getPort()); // peer died + + // Alternates trying two offline peers + Utils.passMockSleep(); + assertEquals(2001, disconnectedPeers.take().getAddress().getPort()); + Utils.passMockSleep(); + assertEquals(2002, disconnectedPeers.take().getAddress().getPort()); + Utils.passMockSleep(); + assertEquals(2001, disconnectedPeers.take().getAddress().getPort()); + Utils.passMockSleep(); + assertEquals(2002, disconnectedPeers.take().getAddress().getPort()); + Utils.passMockSleep(); + assertEquals(2001, disconnectedPeers.take().getAddress().getPort()); + + // Peer 2 comes online + startPeerServer(2); + Utils.passMockSleep(); + handleConnectToPeer(2); + assertEquals(2002, connectedPeers.take().getAddress().getPort()); + + stopPeerServer(2); + assertEquals(2002, disconnectedPeers.take().getAddress().getPort()); // peer died + + // Peer 2 is tried twice before peer 1, since it has a lower backoff due to recent success + Utils.passMockSleep(); + assertEquals(2002, disconnectedPeers.take().getAddress().getPort()); + Utils.passMockSleep(); + assertEquals(2002, disconnectedPeers.take().getAddress().getPort()); + Utils.passMockSleep(); + assertEquals(2001, disconnectedPeers.take().getAddress().getPort()); + } } diff --git a/core/src/test/java/com/google/bitcoin/core/PeerTest.java b/core/src/test/java/com/google/bitcoin/core/PeerTest.java index 539f6941..627039fe 100644 --- a/core/src/test/java/com/google/bitcoin/core/PeerTest.java +++ b/core/src/test/java/com/google/bitcoin/core/PeerTest.java @@ -78,7 +78,8 @@ public class PeerTest extends TestWithNetworkConnections { memoryPool = new MemoryPool(); VersionMessage ver = new VersionMessage(unitTestParams, 100); - peer = new Peer(unitTestParams, ver, new InetSocketAddress("127.0.0.1", 4000), blockChain, memoryPool); + InetSocketAddress address = new InetSocketAddress("127.0.0.1", 4000); + peer = new Peer(unitTestParams, ver, new PeerAddress(address), blockChain, memoryPool); peer.addWallet(wallet); } @@ -265,7 +266,8 @@ public class PeerTest extends TestWithNetworkConnections { public void invDownloadTxMultiPeer() throws Exception { // Check co-ordination of which peer to download via the memory pool. VersionMessage ver = new VersionMessage(unitTestParams, 100); - Peer peer2 = new Peer(unitTestParams, ver, new InetSocketAddress("127.0.0.1", 4242), blockChain, memoryPool); + InetSocketAddress address = new InetSocketAddress("127.0.0.1", 4242); + Peer peer2 = new Peer(unitTestParams, ver, new PeerAddress(address), blockChain, memoryPool); peer2.addWallet(wallet); VersionMessage peerVersion = new VersionMessage(unitTestParams, OTHER_PEER_CHAIN_HEIGHT); peerVersion.clientVersion = 70001; @@ -276,7 +278,7 @@ public class PeerTest extends TestWithNetworkConnections { // Make a tx and advertise it to one of the peers. BigInteger value = Utils.toNanoCoins(1, 0); - Transaction tx = createFakeTx(unitTestParams, value, address); + Transaction tx = createFakeTx(unitTestParams, value, this.address); InventoryMessage inv = new InventoryMessage(unitTestParams); InventoryItem item = new InventoryItem(InventoryItem.Type.Transaction, tx.getHash()); inv.addItem(item); diff --git a/core/src/test/java/com/google/bitcoin/core/TestWithNetworkConnections.java b/core/src/test/java/com/google/bitcoin/core/TestWithNetworkConnections.java index 8b0f15fe..ec85853e 100644 --- a/core/src/test/java/com/google/bitcoin/core/TestWithNetworkConnections.java +++ b/core/src/test/java/com/google/bitcoin/core/TestWithNetworkConnections.java @@ -24,6 +24,7 @@ import com.google.bitcoin.utils.BriefLogFormatter; import com.google.bitcoin.utils.Threading; import com.google.common.util.concurrent.SettableFuture; +import java.io.IOException; import java.math.BigInteger; import java.net.InetAddress; import java.net.InetSocketAddress; @@ -40,6 +41,7 @@ import static org.junit.Assert.assertTrue; * Utility class that makes it easy to work with mock NetworkConnections. */ public class TestWithNetworkConnections { + public static final int PEER_SERVERS = 5; protected NetworkParameters unitTestParams; protected BlockStore blockStore; protected BlockChain blockChain; @@ -48,7 +50,7 @@ public class TestWithNetworkConnections { protected Address address; protected SocketAddress socketAddress; - private NioServer peerServer; + private NioServer peerServers[] = new NioServer[PEER_SERVERS]; private final ClientConnectionManager channels; protected final BlockingQueue newPeerWriteTargetQueue = new LinkedBlockingQueue(); @@ -85,29 +87,51 @@ public class TestWithNetworkConnections { wallet.addKey(key); blockChain = new BlockChain(unitTestParams, wallet, blockStore); - peerServer = new NioServer(new StreamParserFactory() { - @Nullable - @Override - public StreamParser getNewParser(InetAddress inetAddress, int port) { - return new InboundMessageQueuer(unitTestParams) { - @Override public void connectionClosed() { } - @Override - public void connectionOpened() { - newPeerWriteTargetQueue.offer(this); - } - }; - } - }, new InetSocketAddress("127.0.0.1", 2000)); - peerServer.startAndWait(); + startPeerServers(); if (clientType == ClientType.NIO_CLIENT_MANAGER || clientType == ClientType.BLOCKING_CLIENT_MANAGER) channels.startAndWait(); socketAddress = new InetSocketAddress("127.0.0.1", 1111); } + protected void startPeerServers() throws IOException { + for (int i = 0 ; i < PEER_SERVERS ; i++) { + startPeerServer(i); + } + } + + protected void startPeerServer(int i) throws IOException { + peerServers[i] = new NioServer(new StreamParserFactory() { + @Nullable + @Override + public StreamParser getNewParser(InetAddress inetAddress, int port) { + return new InboundMessageQueuer(unitTestParams) { + @Override + public void connectionClosed() { + } + + @Override + public void connectionOpened() { + newPeerWriteTargetQueue.offer(this); + } + }; + } + }, new InetSocketAddress("127.0.0.1", 2000 + i)); + peerServers[i].startAndWait(); + } + public void tearDown() throws Exception { Wallet.SendRequest.DEFAULT_FEE_PER_KB = Transaction.REFERENCE_DEFAULT_MIN_TX_FEE; - peerServer.stopAndWait(); + stopPeerServers(); + } + + protected void stopPeerServers() { + for (int i = 0 ; i < PEER_SERVERS ; i++) + stopPeerServer(i); + } + + protected void stopPeerServer(int i) { + peerServers[i].stopAndWait(); } protected InboundMessageQueuer connect(Peer peer, VersionMessage versionMessage) throws Exception { diff --git a/core/src/test/java/com/google/bitcoin/core/TestWithPeerGroup.java b/core/src/test/java/com/google/bitcoin/core/TestWithPeerGroup.java index 05d8b9cd..8675aec7 100644 --- a/core/src/test/java/com/google/bitcoin/core/TestWithPeerGroup.java +++ b/core/src/test/java/com/google/bitcoin/core/TestWithPeerGroup.java @@ -20,6 +20,8 @@ import com.google.bitcoin.params.UnitTestParams; import com.google.bitcoin.net.BlockingClientManager; import com.google.bitcoin.net.NioClientManager; import com.google.bitcoin.store.BlockStore; +import com.google.bitcoin.utils.ExponentialBackoff; +import com.google.common.base.Preconditions; import java.net.InetSocketAddress; @@ -61,10 +63,9 @@ public class TestWithPeerGroup extends TestWithNetworkConnections { } protected InboundMessageQueuer connectPeerWithoutVersionExchange(int id) throws Exception { - InetSocketAddress remoteAddress = new InetSocketAddress("127.0.0.1", 2000); + Preconditions.checkArgument(id < PEER_SERVERS); + InetSocketAddress remoteAddress = new InetSocketAddress("127.0.0.1", 2000 + id); Peer peer = peerGroup.connectTo(remoteAddress).getConnectionOpenFuture().get(); - // Claim we are connected to a different IP that what we really are, so tx confidence broadcastBy sets work - peer.remoteIp = new InetSocketAddress("127.0.0.1", 2000 + id); InboundMessageQueuer writeTarget = newPeerWriteTargetQueue.take(); writeTarget.peer = peer; return writeTarget; @@ -88,4 +89,25 @@ public class TestWithPeerGroup extends TestWithNetworkConnections { } return writeTarget; } + + // handle peer discovered by PeerGroup + protected InboundMessageQueuer handleConnectToPeer(int id) throws Exception { + return handleConnectToPeer(id, remoteVersionMessage); + } + + // handle peer discovered by PeerGroup + protected InboundMessageQueuer handleConnectToPeer(int id, VersionMessage versionMessage) throws Exception { + InboundMessageQueuer writeTarget = newPeerWriteTargetQueue.take(); + checkArgument(versionMessage.hasBlockChain()); + // Complete handshake with the peer - send/receive version(ack)s, receive bloom filter + writeTarget.sendMessage(versionMessage); + writeTarget.sendMessage(new VersionAck()); + assertTrue(writeTarget.nextMessageBlocking() instanceof VersionMessage); + assertTrue(writeTarget.nextMessageBlocking() instanceof VersionAck); + if (versionMessage.isBloomFilteringSupported()) { + assertTrue(writeTarget.nextMessageBlocking() instanceof BloomFilter); + assertTrue(writeTarget.nextMessageBlocking() instanceof MemoryPoolMessage); + } + return writeTarget; + } } diff --git a/core/src/test/java/com/google/bitcoin/core/utils/ExponentialBackoffTest.java b/core/src/test/java/com/google/bitcoin/core/utils/ExponentialBackoffTest.java new file mode 100644 index 00000000..cb83644a --- /dev/null +++ b/core/src/test/java/com/google/bitcoin/core/utils/ExponentialBackoffTest.java @@ -0,0 +1,82 @@ +/** + * Copyright 2013 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.bitcoin.core.utils; + +import com.google.bitcoin.core.Utils; +import com.google.bitcoin.utils.ExponentialBackoff; + +import org.junit.Before; +import org.junit.Test; + +import java.util.PriorityQueue; + +import static org.junit.Assert.*; + +/** + */ +public class ExponentialBackoffTest { + private ExponentialBackoff.Params params; + private ExponentialBackoff backoff; + + @Before + public void setUp() { + Utils.setMockClock(System.currentTimeMillis() / 1000); + params = new ExponentialBackoff.Params(); + backoff = new ExponentialBackoff(params); + } + + @Test + public void testSuccess() { + assertEquals(Utils.currentTimeMillis(), backoff.getRetryTime()); + + backoff.trackFailure(); + backoff.trackFailure(); + backoff.trackSuccess(); + + assertEquals(Utils.currentTimeMillis(), backoff.getRetryTime()); + } + + @Test + public void testFailure() { + assertEquals(Utils.currentTimeMillis(), backoff.getRetryTime()); + + backoff.trackFailure(); + backoff.trackFailure(); + backoff.trackFailure(); + + assertEquals(Utils.currentTimeMillis() + 121, backoff.getRetryTime()); + } + + @Test + public void testInQueue() { + PriorityQueue queue = new PriorityQueue(); + ExponentialBackoff backoff1 = new ExponentialBackoff(params); + backoff.trackFailure(); + backoff.trackFailure(); + backoff1.trackFailure(); + backoff1.trackFailure(); + backoff1.trackFailure(); + queue.offer(backoff); + queue.offer(backoff1); + + assertEquals(queue.poll(), backoff); // The one with soonest retry time + assertEquals(queue.peek(), backoff1); + + queue.offer(backoff); + assertEquals(queue.poll(), backoff); // Still the same one + } +} diff --git a/examples/src/main/java/com/google/bitcoin/examples/PrintPeers.java b/examples/src/main/java/com/google/bitcoin/examples/PrintPeers.java index e86ec7d1..6e7b5875 100644 --- a/examples/src/main/java/com/google/bitcoin/examples/PrintPeers.java +++ b/examples/src/main/java/com/google/bitcoin/examples/PrintPeers.java @@ -19,6 +19,7 @@ package com.google.bitcoin.examples; import com.google.bitcoin.core.AbstractPeerEventListener; import com.google.bitcoin.core.NetworkParameters; import com.google.bitcoin.core.Peer; +import com.google.bitcoin.core.PeerAddress; import com.google.bitcoin.core.VersionMessage; import com.google.bitcoin.net.discovery.DnsDiscovery; import com.google.bitcoin.net.discovery.PeerDiscoveryException; @@ -79,7 +80,8 @@ public class PrintPeers { List> futures = Lists.newArrayList(); NioClientManager clientManager = new NioClientManager(); for (final InetAddress addr : addrs) { - final Peer peer = new Peer(params, new VersionMessage(params, 0), null, new InetSocketAddress(addr, params.getPort())); + InetSocketAddress address = new InetSocketAddress(addr, params.getPort()); + final Peer peer = new Peer(params, new VersionMessage(params, 0), null, new PeerAddress(address)); final SettableFuture future = SettableFuture.create(); // Once the connection has completed version handshaking ... peer.addEventListener(new AbstractPeerEventListener() { @@ -110,7 +112,7 @@ public class PrintPeers { future.set(null); } }); - clientManager.openConnection(new InetSocketAddress(addr, params.getPort()), peer); + clientManager.openConnection(address, peer); futures.add(future); } // Wait for every tried connection to finish.