3
0
mirror of https://github.com/Qortal/altcoinj.git synced 2025-02-07 14:54:15 +00:00

Memory optimisations to avoid OOM when the user thread falls behind.

- Remove needless recalculations of the Bloom filter on any wallet change, instead of just when keys/scripts are added. This may fix one of the privacy leak issues too.
- Run fast catchup/filter recalculations on the dedicated PeerGroup thread instead of abusing the user thread. Resolves a TODO.
- Replace the user thread SingleThreadedExecutor with a custom class that blocks the submitting thread and logs a warning if the queue is saturated, to avoid building up a backlog of closures.
This commit is contained in:
Mike Hearn 2014-01-14 18:49:09 +01:00
parent e0b698a2e9
commit d7b3766c4b
3 changed files with 74 additions and 36 deletions

View File

@ -25,6 +25,7 @@ import com.google.bitcoin.script.Script;
import com.google.bitcoin.utils.ExponentialBackoff;
import com.google.bitcoin.utils.ListenerRegistration;
import com.google.bitcoin.utils.Threading;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.*;
@ -33,7 +34,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.math.BigInteger;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.*;
@ -132,14 +132,18 @@ public class PeerGroup extends AbstractExecutionThreadService implements Transac
};
private int minBroadcastConnections = 0;
private AbstractWalletEventListener walletEventListener = new AbstractWalletEventListener() {
private void onChanged() {
private Runnable recalculateRunnable = new Runnable() {
@Override public void run() {
recalculateFastCatchupAndFilter(false);
}
@Override public void onScriptsAdded(Wallet wallet, List<Script> scripts) { onChanged(); }
@Override public void onKeysAdded(Wallet wallet, List<ECKey> keys) { onChanged(); }
@Override public void onCoinsReceived(Wallet wallet, Transaction tx, BigInteger prevBalance, BigInteger newBalance) { onChanged(); }
@Override public void onCoinsSent(Wallet wallet, Transaction tx, BigInteger prevBalance, BigInteger newBalance) { onChanged(); }
};
private AbstractWalletEventListener walletEventListener = new AbstractWalletEventListener() {
@Override public void onScriptsAdded(Wallet wallet, List<Script> scripts) {
Uninterruptibles.putUninterruptibly(jobQueue, recalculateRunnable);
}
@Override public void onKeysAdded(Wallet wallet, List<ECKey> keys) {
Uninterruptibles.putUninterruptibly(jobQueue, recalculateRunnable);
}
};
// Exponential backoff for peers starts at 1 second and maxes at 10 minutes.
@ -147,7 +151,8 @@ public class PeerGroup extends AbstractExecutionThreadService implements Transac
// Tracks failures globally in case of a network failure
private ExponentialBackoff groupBackoff = new ExponentialBackoff(new ExponentialBackoff.Params(100, 1.1f, 30 * 1000));
private LinkedBlockingQueue<Object> morePeersMailbox = new LinkedBlockingQueue<Object>();
// Things for the dedicated PeerGroup management thread to do.
private LinkedBlockingQueue<Runnable> jobQueue = new LinkedBlockingQueue<Runnable>();
private class PeerStartupListener extends AbstractPeerEventListener {
@Override
@ -264,14 +269,31 @@ public class PeerGroup extends AbstractExecutionThreadService implements Transac
// We may now have too many or too few open connections. Add more or drop some to get to the right amount.
adjustment = maxConnections - channels.getConnectedClientCount();
if (adjustment > 0)
notifyServiceThread();
triggerConnections();
if (adjustment < 0)
channels.closeConnections(-adjustment);
}
private void notifyServiceThread() {
morePeersMailbox.offer(this); // Any non-null object will do.
private Runnable triggerConnectionsJob = new Runnable() {
@Override
public void run() {
// We have to test the condition at the end, because during startup we need to run this at least once
// when isRunning() can return false.
do {
try {
connectToAnyPeer();
} catch(PeerDiscoveryException e) {
groupBackoff.trackFailure();
}
} while (isRunning() && countConnectedAndPendingPeers() < getMaxConnections());
}
};
private void triggerConnections() {
// Run on a background thread due to the need to potentially retry and back off in the background.
Uninterruptibles.putUninterruptibly(jobQueue, triggerConnectionsJob);
}
/** The maximum number of connections that we will create to peers. */
@ -517,24 +539,31 @@ public class PeerGroup extends AbstractExecutionThreadService implements Transac
@Override
protected void run() throws Exception {
// Runs in a background thread dedicated to the PeerGroup. Jobs are for handling peer connections with backoff,
// and also recalculating filters.
while (isRunning()) {
int numPeers;
lock.lock();
try {
numPeers = peers.size() + pendingPeers.size();
} finally {
lock.unlock();
}
jobQueue.take().run();
}
}
if (numPeers < getMaxConnections()) {
try {
connectToAnyPeer();
} catch(PeerDiscoveryException e) {
groupBackoff.trackFailure();
}
@VisibleForTesting
void waitForJobQueue() {
final CountDownLatch latch = new CountDownLatch(1);
Uninterruptibles.putUninterruptibly(jobQueue, new Runnable() {
@Override
public void run() {
latch.countDown();
}
else
morePeersMailbox.take();
});
Uninterruptibles.awaitUninterruptibly(latch);
}
private int countConnectedAndPendingPeers() {
lock.lock();
try {
return peers.size() + pendingPeers.size();
} finally {
lock.unlock();
}
}
@ -589,6 +618,7 @@ public class PeerGroup extends AbstractExecutionThreadService implements Transac
// This is run in a background thread by the Service implementation.
vPingTimer = new Timer("Peer pinging thread", true);
channels.startAndWait();
triggerConnections();
}
@Override
@ -604,7 +634,11 @@ public class PeerGroup extends AbstractExecutionThreadService implements Transac
@Override
protected void triggerShutdown() {
notifyServiceThread();
// Force the thread to wake up.
Uninterruptibles.putUninterruptibly(jobQueue, new Runnable() {
public void run() {
}
});
}
/**
@ -630,7 +664,7 @@ public class PeerGroup extends AbstractExecutionThreadService implements Transac
checkState(!wallets.contains(wallet));
wallets.add(wallet);
wallet.setTransactionBroadcaster(this);
wallet.addEventListener(walletEventListener); // TODO: Run this in the current peer thread.
wallet.addEventListener(walletEventListener, Threading.SAME_THREAD);
addPeerFilterProvider(wallet);
} finally {
lock.unlock();
@ -1071,7 +1105,7 @@ public class PeerGroup extends AbstractExecutionThreadService implements Transac
inactives.offer(address);
if (numPeers < getMaxConnections()) {
notifyServiceThread();
triggerConnections();
}
} finally {
lock.unlock();

View File

@ -94,9 +94,6 @@ public class Threading {
@SuppressWarnings("InfiniteLoopStatement") @Override
public void run() {
while (true) {
if (tasks.remainingCapacity() < 2)
log.warn("User thread saturated, {} tasks queued. Review your event handlers to make sure they are not too slow",
tasks.size());
Runnable task = Uninterruptibles.takeUninterruptibly(tasks);
try {
task.run();
@ -112,7 +109,12 @@ public class Threading {
@Override
public void execute(Runnable command) {
// Will block if the event thread is saturated.
Uninterruptibles.putUninterruptibly(tasks, command);
if (!tasks.offer(command)) {
log.warn("User thread saturated, check for deadlocked or slow event handlers. Sample tasks:");
for (Object task : tasks.toArray()) log.warn(task.toString());
// Try again and wait this time.
Uninterruptibles.putUninterruptibly(tasks, command);
}
}
}

View File

@ -339,19 +339,21 @@ public class PeerGroupTest extends TestWithPeerGroup {
// The wallet was already added to the peer in setup.
final int WEEK = 86400 * 7;
final long now = Utils.currentTimeMillis() / 1000;
peerGroup.startAndWait();
assertTrue(peerGroup.getFastCatchupTimeSecs() > now - WEEK - 10000);
Wallet w2 = new Wallet(params);
ECKey key1 = new ECKey();
key1.setCreationTimeSeconds(now - 86400); // One day ago.
w2.addKey(key1);
peerGroup.addWallet(w2);
Threading.waitForUserCode();
peerGroup.waitForJobQueue();
assertEquals(peerGroup.getFastCatchupTimeSecs(), now - 86400 - WEEK);
// Adding a key to the wallet should update the fast catchup time.
// Adding a key to the wallet should update the fast catchup time, but asynchronously and in the background
// due to the need to avoid complicated lock inversions.
ECKey key2 = new ECKey();
key2.setCreationTimeSeconds(now - 100000);
w2.addKey(key2);
Threading.waitForUserCode();
peerGroup.waitForJobQueue();
assertEquals(peerGroup.getFastCatchupTimeSecs(), now - WEEK - 100000);
}