Fixes/improvements to networking

Reworked networking execute-produce-consume threading.
Some networking task were wrongly performed during 'produce' phase,
and some producing was happening in 'consume' phase (also corrected).

Peer connection tasks are rate-limited to 1 per second to reduce CPU thrashing.

Show P2P listen port in logs on startup.

Tests for general purpose ExecuteProduceConsume class to cover both
random task scenario and mass-ping scenario.
This commit is contained in:
catbref 2019-07-12 09:40:43 +01:00
parent f8b496ff3c
commit 964e0a02ca
5 changed files with 552 additions and 206 deletions

View File

@ -235,7 +235,7 @@ public class Controller extends Thread {
LOGGER.info("Starting controller");
Controller.getInstance().start();
LOGGER.info("Starting networking");
LOGGER.info("Starting networking on port " + Settings.getInstance().getListenPort());
try {
Network network = Network.getInstance();
network.start();
@ -323,6 +323,13 @@ public class Controller extends Thread {
ntpNagTimestamp += NTP_NAG_PERIOD;
ntpNag();
}
// Prune stuck/slow/old peers
try {
Network.getInstance().prunePeers();
} catch (DataException e) {
LOGGER.warn(String.format("Repository issue when trying to prune peers: %s", e.getMessage()));
}
}
} catch (InterruptedException e) {
// Fall-through to exit

View File

@ -23,8 +23,6 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
@ -55,6 +53,7 @@ import org.qora.repository.DataException;
import org.qora.repository.Repository;
import org.qora.repository.RepositoryManager;
import org.qora.settings.Settings;
import org.qora.utils.ExecuteProduceConsume;
// For managing peers
public class Network extends Thread {
@ -97,24 +96,22 @@ public class Network extends Thread {
public static final int PEER_ID_LENGTH = 128;
private final byte[] ourPeerId;
private volatile boolean isStopping = false;
private List<Peer> connectedPeers;
private List<PeerAddress> selfPeers;
private ExecutorService networkingExecutor;
private static Selector channelSelector;
private static ServerSocketChannel serverChannel;
private static AtomicBoolean isIterationInProgress = new AtomicBoolean(false);
private static Iterator<SelectionKey> channelIterator = null;
private static volatile boolean hasThreadPending = false;
private static AtomicInteger activeThreads = new AtomicInteger(0);
private static AtomicBoolean generalTaskLock = new AtomicBoolean(false);
private ExecuteProduceConsume networkEPC;
private Selector channelSelector;
private ServerSocketChannel serverChannel;
private Iterator<SelectionKey> channelIterator = null;
private int minOutboundPeers;
private int maxPeers;
private long nextConnectTaskTimestamp;
private ExecutorService broadcastExecutor;
/** Timestamp (ms) for next general info broadcast to all connected peers. Based on <tt>System.currentTimeMillis()</tt>. */
private long nextBroadcast;
private long nextBroadcastTimestamp;
private Lock mergePeersLock;
// Constructors
@ -153,14 +150,16 @@ public class Network extends Thread {
minOutboundPeers = Settings.getInstance().getMinOutboundPeers();
maxPeers = Settings.getInstance().getMaxPeers();
nextConnectTaskTimestamp = System.currentTimeMillis();
broadcastExecutor = Executors.newCachedThreadPool();
nextBroadcast = System.currentTimeMillis();
nextBroadcastTimestamp = System.currentTimeMillis();
mergePeersLock = new ReentrantLock();
// Start up first networking thread
networkingExecutor = Executors.newCachedThreadPool();
networkingExecutor.execute(new NetworkProcessor());
networkEPC = new NetworkProcessor();
networkEPC.start();
}
// Getters / setters
@ -269,33 +268,99 @@ public class Network extends Thread {
// Main thread
class NetworkProcessor implements Runnable {
class NetworkProcessor extends ExecuteProduceConsume {
@Override
public void run() {
Thread.currentThread().setName("Network");
protected Task produceTask(boolean canBlock) throws InterruptedException {
Task task;
activeThreads.incrementAndGet();
LOGGER.trace(() -> String.format("Network thread %s, hasThreadPending: %s, activeThreads now: %d", Thread.currentThread().getId(), (hasThreadPending ? "yes" : "no"), activeThreads.get()));
hasThreadPending = false;
task = maybeProduceChannelTask(canBlock);
if (task != null)
return task;
// Maintain long-term connections to various peers' API applications
try {
while (!isStopping) {
if (!isIterationInProgress.compareAndSet(false, true)) {
LOGGER.trace(() -> String.format("Network thread %s NOT producing (some other thread is) - exiting", Thread.currentThread().getId()));
break;
task = maybeProducePeerMessageTask();
if (task != null)
return task;
task = maybeProducePeerPingTask();
if (task != null)
return task;
task = maybeProduceConnectPeerTask();
if (task != null)
return task;
task = maybeProduceBroadcastTask();
if (task != null)
return task;
// Really nothing to do
return null;
}
LOGGER.trace(() -> String.format("Network thread %s is producing...", Thread.currentThread().getId()));
class ChannelTask implements ExecuteProduceConsume.Task {
private final SelectionKey selectionKey;
final SelectionKey nextSelectionKey;
public ChannelTask(SelectionKey selectionKey) {
this.selectionKey = selectionKey;
}
@Override
public void perform() throws InterruptedException {
try {
LOGGER.trace(() -> String.format("Thread %d has pending channel: %s, with ops %d",
Thread.currentThread().getId(), selectionKey.channel(), selectionKey.readyOps()));
// process pending channel task
if (selectionKey.isReadable()) {
connectionRead((SocketChannel) selectionKey.channel());
} else if (selectionKey.isAcceptable()) {
acceptConnection((ServerSocketChannel) selectionKey.channel());
}
LOGGER.trace(() -> String.format("Thread %d processed channel: %s", Thread.currentThread().getId(), selectionKey.channel()));
} catch (CancelledKeyException e) {
LOGGER.trace(() -> String.format("Thread %s encountered cancelled channel: %s", Thread.currentThread().getId(), selectionKey.channel()));
}
}
private void connectionRead(SocketChannel socketChannel) {
Peer peer = getPeerFromChannel(socketChannel);
if (peer == null)
return;
try {
peer.readChannel();
} catch (IOException e) {
if (e.getMessage() != null && e.getMessage().toLowerCase().contains("onnection reset")) {
peer.disconnect("Connection reset");
return;
}
LOGGER.trace(() -> String.format("Network thread %s encountered I/O error: %s", Thread.currentThread().getId(), e.getMessage()), e);
peer.disconnect("I/O error");
return;
}
}
}
private Task maybeProduceChannelTask(boolean canBlock) throws InterruptedException {
final SelectionKey nextSelectionKey;
// anything to do?
if (channelIterator == null) {
try {
if (canBlock)
channelSelector.select(1000L);
else
channelSelector.selectNow();
} catch (IOException e) {
LOGGER.warn(String.format("Channel selection threw IOException: %s", e.getMessage()));
return null;
}
if (Thread.currentThread().isInterrupted())
break;
throw new InterruptedException();
channelIterator = channelSelector.selectedKeys().iterator();
}
@ -308,79 +373,74 @@ public class Network extends Thread {
channelIterator = null; // Nothing to do so reset iterator to cause new select
}
LOGGER.trace(() -> String.format("Network thread %s produced %s, iterator now %s",
Thread.currentThread().getId(),
(nextSelectionKey == null ? "null" : nextSelectionKey.channel()),
(channelIterator == null ? "null" : channelIterator.toString())));
LOGGER.trace(() -> String.format("Thread %d, nextSelectionKey %s, channelIterator now %s",
Thread.currentThread().getId(), nextSelectionKey, channelIterator));
// Spawn another thread in case we need help
if (!hasThreadPending) {
hasThreadPending = true;
LOGGER.trace(() -> String.format("Network thread %s spawning", Thread.currentThread().getId()));
networkingExecutor.execute(this);
}
} finally {
LOGGER.trace(() -> String.format("Network thread %s done producing", Thread.currentThread().getId()));
isIterationInProgress.set(false);
if (nextSelectionKey == null)
return null;
return new ChannelTask(nextSelectionKey);
}
// process
if (nextSelectionKey == null) {
// no pending tasks, but we're last remaining thread so maybe connect a new peer or do a broadcast
LOGGER.trace(() -> String.format("Network thread %s has no pending tasks", Thread.currentThread().getId()));
if (!generalTaskLock.compareAndSet(false, true))
continue;
try {
LOGGER.trace(() -> String.format("Network thread %s performing general tasks", Thread.currentThread().getId()));
pingPeers();
prunePeers();
createConnection();
if (System.currentTimeMillis() >= nextBroadcast) {
nextBroadcast = System.currentTimeMillis() + BROADCAST_INTERVAL;
// Controller can decide what to broadcast
Controller.getInstance().doNetworkBroadcast();
}
} finally {
LOGGER.trace(() -> String.format("Network thread %s finished general tasks", Thread.currentThread().getId()));
generalTaskLock.set(false);
}
} else {
try {
LOGGER.trace(() -> String.format("Network thread %s has pending channel: %s, with ops %d",
Thread.currentThread().getId(), nextSelectionKey.channel(), nextSelectionKey.readyOps()));
// process pending channel task
if (nextSelectionKey.isReadable()) {
connectionRead((SocketChannel) nextSelectionKey.channel());
} else if (nextSelectionKey.isAcceptable()) {
acceptConnection((ServerSocketChannel) nextSelectionKey.channel());
private Task maybeProducePeerMessageTask() {
for (Peer peer : getConnectedPeers()) {
Task peerTask = peer.getMessageTask();
if (peerTask != null)
return peerTask;
}
LOGGER.trace(() -> String.format("Network thread %s processed channel: %s", Thread.currentThread().getId(), nextSelectionKey.channel()));
} catch (CancelledKeyException e) {
LOGGER.trace(() -> String.format("Network thread %s encountered cancelled channel: %s", Thread.currentThread().getId(), nextSelectionKey.channel()));
return null;
}
private Task maybeProducePeerPingTask() {
// Ask connected peers whether they need a ping
for (Peer peer : getConnectedPeers()) {
Task peerTask = peer.getPingTask();
if (peerTask != null)
return peerTask;
}
return null;
}
class PeerConnectTask implements ExecuteProduceConsume.Task {
private final Peer peer;
public PeerConnectTask(Peer peer) {
this.peer = peer;
}
@Override
public void perform() throws InterruptedException {
connectPeer(peer);
}
}
private Task maybeProduceConnectPeerTask() throws InterruptedException {
if (getOutboundHandshakedPeers().size() >= minOutboundPeers)
return null;
final long now = System.currentTimeMillis();
if (now < nextConnectTaskTimestamp)
return null;
nextConnectTaskTimestamp = now + 1000L;
Peer targetPeer = getConnectablePeer();
if (targetPeer == null)
return null;
// Create connection task
return new PeerConnectTask(targetPeer);
}
} catch (InterruptedException e) {
// Fall-through to shutdown
} catch (DataException e) {
LOGGER.warn("Repository issue while running network", e);
// Fall-through to shutdown
} catch (IOException e) {
// Fall-through to shutdown
} finally {
activeThreads.decrementAndGet();
LOGGER.trace(() -> String.format("Network thread %s ending, activeThreads now: %d", Thread.currentThread().getId(), activeThreads.get()));
Thread.currentThread().setName("Network (dormant)");
}
private Task maybeProduceBroadcastTask() {
final long now = System.currentTimeMillis();
if (now < nextBroadcastTimestamp)
return null;
nextBroadcastTimestamp = now + BROADCAST_INTERVAL;
return () -> Controller.getInstance().doNetworkBroadcast();
}
}
@ -438,12 +498,7 @@ public class Network extends Thread {
this.onPeerReady(newPeer);
}
private void pingPeers() {
for (Peer peer : this.getConnectedPeers())
peer.pingCheck();
}
private void prunePeers() throws InterruptedException, DataException {
public void prunePeers() throws InterruptedException, DataException {
final long now = System.currentTimeMillis();
// Disconnect peers that are stuck during handshake
@ -495,12 +550,7 @@ public class Network extends Thread {
}
}
private void createConnection() throws InterruptedException, DataException {
if (this.getOutboundHandshakedPeers().size() >= minOutboundPeers)
return;
Peer newPeer;
private Peer getConnectablePeer() throws InterruptedException {
try (final Repository repository = RepositoryManager.getRepository()) {
// Find an address to connect to
List<PeerData> peers = repository.getNetworkRepository().getAllPeers();
@ -546,22 +596,29 @@ public class Network extends Thread {
// Any left?
if (peers.isEmpty())
return;
return null;
// Pick random peer
int peerIndex = new SecureRandom().nextInt(peers.size());
// Pick candidate
PeerData peerData = peers.get(peerIndex);
newPeer = new Peer(peerData);
Peer newPeer = new Peer(peerData);
// Update connection attempt info
repository.discardChanges();
peerData.setLastAttempted(System.currentTimeMillis());
repository.getNetworkRepository().save(peerData);
repository.saveChanges();
return newPeer;
} catch (DataException e) {
LOGGER.warn(String.format("Repository issue while finding a connectable peer: %s", e.getMessage()));
return null;
}
}
private void connectPeer(Peer newPeer) throws InterruptedException {
SocketChannel socketChannel = newPeer.connect();
if (socketChannel == null)
return;
@ -585,20 +642,6 @@ public class Network extends Thread {
this.onPeerReady(newPeer);
}
private void connectionRead(SocketChannel socketChannel) {
Peer peer = getPeerFromChannel(socketChannel);
if (peer == null)
return;
try {
peer.readMessages();
} catch (IOException e) {
LOGGER.trace(() -> String.format("Network thread %s encountered I/O error: %s", Thread.currentThread().getId(), e.getMessage()), e);
peer.disconnect("I/O error");
return;
}
}
private Peer getPeerFromChannel(SocketChannel socketChannel) {
synchronized (this.connectedPeers) {
for (Peer peer : this.connectedPeers)
@ -638,7 +681,7 @@ public class Network extends Thread {
/** Called when a new message arrives for a peer. message can be null if called after connection */
public void onMessage(Peer peer, Message message) {
if (message != null)
LOGGER.trace(String.format("Received %s message from %s", message.getType().name(), peer));
LOGGER.trace(() -> String.format("Processing %s message with ID %d from peer %s", message.getType().name(), message.getId(), peer));
Handshake handshakeStatus = peer.getHandshakeStatus();
if (handshakeStatus != Handshake.COMPLETED) {
@ -1065,20 +1108,17 @@ public class Network extends Thread {
// Shutdown
public void shutdown() {
this.isStopping = true;
// Close listen socket to prevent more incoming connections
if (serverChannel.isOpen())
if (this.serverChannel.isOpen())
try {
serverChannel.close();
this.serverChannel.close();
} catch (IOException e) {
// Not important
}
// Stop processing threads
this.networkingExecutor.shutdownNow();
try {
if (!this.networkingExecutor.awaitTermination(5000, TimeUnit.MILLISECONDS))
if (!this.networkEPC.shutdown(5000))
LOGGER.debug("Network threads failed to terminate");
} catch (InterruptedException e) {
LOGGER.debug("Interrupted while waiting for networking threads to terminate");

View File

@ -15,6 +15,7 @@ import java.util.Map;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
@ -26,6 +27,7 @@ import org.qora.network.message.Message;
import org.qora.network.message.Message.MessageException;
import org.qora.network.message.Message.MessageType;
import org.qora.settings.Settings;
import org.qora.utils.ExecuteProduceConsume;
import org.qora.network.message.PingMessage;
import org.qora.network.message.VersionMessage;
@ -58,6 +60,7 @@ public class Peer {
private boolean isLocal;
private ByteBuffer byteBuffer;
private Map<Integer, BlockingQueue<Message>> replyQueues;
private LinkedBlockingQueue<Message> pendingMessages;
/** True if we created connection to peer, false if we accepted incoming connection from peer. */
private final boolean isOutbound;
@ -84,7 +87,6 @@ public class Peer {
private Long lastPing = null;
/** When last PING message was sent, or null if pings not started yet. */
private Long lastPingSent;
private final ReentrantLock pingLock = new ReentrantLock();
/** Latest block height as reported by peer. */
private Integer lastHeight;
@ -271,6 +273,7 @@ public class Peer {
this.socketChannel.configureBlocking(false);
this.byteBuffer = ByteBuffer.allocate(Network.MAXIMUM_MESSAGE_SIZE);
this.replyQueues = Collections.synchronizedMap(new HashMap<Integer, BlockingQueue<Message>>());
this.pendingMessages = new LinkedBlockingQueue<Message>();
}
public SocketChannel connect() {
@ -299,18 +302,14 @@ public class Peer {
}
/**
* Attempt to read Message from peer.
* Attempt to buffer bytes from socketChannel.
*
* @return message, or null if no message or there was a problem
* @throws IOException
*/
public void readMessages() throws IOException {
while(true) {
Message message;
synchronized (this) {
/* package */ void readChannel() throws IOException {
synchronized (this.byteBuffer) {
if (!this.socketChannel.isOpen() || this.socketChannel.socket().isClosed())
break;
return;
int bytesRead = this.socketChannel.read(this.byteBuffer);
if (bytesRead == -1) {
@ -318,7 +317,14 @@ public class Peer {
return;
}
LOGGER.trace(() -> String.format("Receiving message from peer %s", this));
if (bytesRead == 0)
// No room in buffer, or no more bytes to read
return;
LOGGER.trace(() -> String.format("Received %d bytes from peer %s", bytesRead, this));
while (true) {
final Message message;
// Can we build a message from buffer now?
try {
@ -328,7 +334,6 @@ public class Peer {
this.disconnect(e.getMessage());
return;
}
}
if (message == null)
return;
@ -343,10 +348,26 @@ public class Peer {
continue;
}
// No thread waiting for message so pass up to network layer
Network.getInstance().onMessage(this, message);
// No thread waiting for message so we need to pass it up to network layer
// Add message to pending queue
if (!this.pendingMessages.offer(message)) {
LOGGER.info(String.format("No room to queue message from peer %s - discarding", this));
return;
}
}
}
}
/* package */ ExecuteProduceConsume.Task getMessageTask() {
final Message nextMessage = this.pendingMessages.poll();
if (nextMessage == null)
return null;
// Return a task to process message in queue
return () -> Network.getInstance().onMessage(this, nextMessage);
}
/**
* Attempt to send Message to peer.
@ -427,30 +448,27 @@ public class Peer {
}
}
public void startPings() {
/* package */ void startPings() {
// Replacing initial null value allows pingCheck() to start sending pings.
LOGGER.trace(() -> String.format("Enabling pings for peer %s", this));
this.lastPingSent = 0L; //System.currentTimeMillis();
this.lastPingSent = System.currentTimeMillis();
}
/* package */ void pingCheck() {
LOGGER.trace(() -> String.format("Ping check for peer %s", this));
if (!this.pingLock.tryLock())
return; // Some other thread is already checking ping status for this peer
try {
/* package */ ExecuteProduceConsume.Task getPingTask() {
// Pings not enabled yet?
if (this.lastPingSent == null)
return;
return null;
final long now = System.currentTimeMillis();
// Time to send another ping?
if (now < this.lastPingSent + PING_INTERVAL)
return; // Not yet
return null; // Not yet
// Not strictly true, but prevents this peer from being immediately chosen again
this.lastPingSent = now;
return () -> {
PingMessage pingMessage = new PingMessage();
Message message = this.getResponse(pingMessage);
final long after = System.currentTimeMillis();
@ -461,11 +479,7 @@ public class Peer {
}
this.setLastPing(after - now);
} catch (InterruptedException e) {
// Shutdown situation
} finally {
this.pingLock.unlock();
}
};
}
public void disconnect(String reason) {

View File

@ -0,0 +1,149 @@
package org.qora.utils;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public abstract class ExecuteProduceConsume implements Runnable {
private final String className;
private final Logger logger;
private ExecutorService executor;
private int activeThreadCount = 0;
private int greatestActiveThreadCount = 0;
private int consumerCount = 0;
private boolean hasThreadPending = false;
public ExecuteProduceConsume(ExecutorService executor) {
className = this.getClass().getSimpleName();
logger = LogManager.getLogger(this.getClass());
this.executor = executor;
}
public ExecuteProduceConsume() {
this(Executors.newCachedThreadPool());
}
public void start() {
executor.execute(this);
}
public void shutdown() {
executor.shutdownNow();
}
public boolean shutdown(long timeout) throws InterruptedException {
executor.shutdownNow();
return executor.awaitTermination(timeout, TimeUnit.MILLISECONDS);
}
public int getActiveThreadCount() {
synchronized (this) {
return activeThreadCount;
}
}
public int getGreatestActiveThreadCount() {
synchronized (this) {
return greatestActiveThreadCount;
}
}
/**
* Returns a Task to be performed, possibly blocking.
*
* @param canBlock
* @return task to be performed, or null if no task pending.
* @throws InterruptedException
*/
protected abstract Task produceTask(boolean canBlock) throws InterruptedException;
@FunctionalInterface
public interface Task {
public abstract void perform() throws InterruptedException;
}
@Override
public void run() {
Thread.currentThread().setName(className + "-" + Thread.currentThread().getId());
synchronized (this) {
++activeThreadCount;
if (activeThreadCount > greatestActiveThreadCount)
greatestActiveThreadCount = activeThreadCount;
logger.trace(() -> String.format("[%d] started, hasThreadPending was: %b, activeThreadCount now: %d",
Thread.currentThread().getId(), hasThreadPending, activeThreadCount));
hasThreadPending = false;
}
try {
boolean canBlock = false;
while (true) {
final Task task;
logger.trace(() -> String.format("[%d] waiting to produce...", Thread.currentThread().getId()));
synchronized (this) {
final boolean lambdaCanIdle = canBlock;
logger.trace(() -> String.format("[%d] producing, canBlock is %b...", Thread.currentThread().getId(), lambdaCanIdle));
task = produceTask(canBlock);
}
if (task == null)
synchronized (this) {
logger.trace(() -> String.format("[%d] no task, activeThreadCount: %d, consumerCount: %d",
Thread.currentThread().getId(), activeThreadCount, consumerCount));
if (activeThreadCount > consumerCount + 1) {
--activeThreadCount;
logger.trace(() -> String.format("[%d] ending, activeThreadCount now: %d", Thread.currentThread().getId(), activeThreadCount));
break;
}
// We're the last surviving thread - producer can afford to block next round
canBlock = true;
continue;
}
// We have a task
synchronized (this) {
++consumerCount;
if (!hasThreadPending) {
logger.trace(() -> String.format("[%d] spawning another thread", Thread.currentThread().getId()));
hasThreadPending = true;
executor.execute(this); // Same object, different thread
}
}
logger.trace(() -> String.format("[%d] performing task...", Thread.currentThread().getId()));
task.perform(); // This can block for a while
logger.trace(() -> String.format("[%d] finished task", Thread.currentThread().getId()));
synchronized (this) {
--consumerCount;
// Quicker, non-blocking produce next round
canBlock = false;
}
}
} catch (InterruptedException | RejectedExecutionException e) {
// We're in shutdown situation so exit
} finally {
Thread.currentThread().setName(className + "-dormant");
}
}
}

View File

@ -0,0 +1,136 @@
package org.qora.test;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import org.junit.Test;
import org.qora.utils.ExecuteProduceConsume;
public class ThreadTests {
private void testEPC(ExecuteProduceConsume testEPC) throws InterruptedException {
testEPC.start();
// Let it run for a minute
for (int s = 1; s <= 60; ++s) {
Thread.sleep(1000);
System.out.println(String.format("After %d second%s, active threads: %d, greatest thread count: %d", s, (s != 1 ? "s" : "") , testEPC.getActiveThreadCount(), testEPC.getGreatestActiveThreadCount()));
}
final long before = System.currentTimeMillis();
testEPC.shutdown(30 * 1000);
final long after = System.currentTimeMillis();
System.out.println(String.format("Shutdown took %d milliseconds", after - before));
System.out.println(String.format("Greatest thread count: %d", testEPC.getGreatestActiveThreadCount()));
}
@Test
public void testRandomEPC() throws InterruptedException {
final int TASK_PERCENT = 25; // Produce a task this % of the time
final int PAUSE_PERCENT = 80; // Pause for new work this % of the time
class RandomEPC extends ExecuteProduceConsume {
@Override
protected Task produceTask(boolean canIdle) throws InterruptedException {
Random random = new Random();
final int percent = random.nextInt(100);
// Sometimes produce a task
if (percent < TASK_PERCENT) {
return new Task() {
@Override
public void perform() throws InterruptedException {
Thread.sleep(random.nextInt(500) + 100);
}
};
} else {
// If we don't produce a task, then maybe simulate a pause until work arrives
if (canIdle && percent < PAUSE_PERCENT)
Thread.sleep(random.nextInt(100));
return null;
}
}
}
testEPC(new RandomEPC());
}
/**
* Test ping scenario with many peers requiring pings.
* <p>
* Specifically, if:
* <ul>
* <li>the idling EPC thread sleeps for 1 second</li>
* <li>pings are required every P seconds</li>
* <li>there are way more than P peers</li>
* </ul>
* then we need to make sure EPC threads are not
* delayed such that some peers (>P) don't get a
* chance to be pinged.
*/
@Test
public void testPingEPC() throws InterruptedException {
final long PRODUCER_SLEEP_TIME = 1000; // ms
final long PING_INTERVAL = PRODUCER_SLEEP_TIME * 8; // ms
final long PING_ROUND_TRIP_TIME = PRODUCER_SLEEP_TIME * 5; // ms
final int MAX_PEERS = 20;
final List<Long> lastPings = new ArrayList<>(Collections.nCopies(MAX_PEERS, System.currentTimeMillis()));
class PingTask implements ExecuteProduceConsume.Task {
private final int peerIndex;
public PingTask(int peerIndex) {
this.peerIndex = peerIndex;
}
@Override
public void perform() throws InterruptedException {
System.out.println("Pinging peer " + peerIndex);
// At least half the worst case ping round-trip
Random random = new Random();
int halfTime = (int) PING_ROUND_TRIP_TIME / 2;
long sleep = random.nextInt(halfTime) + halfTime;
Thread.sleep(sleep);
}
}
class PingEPC extends ExecuteProduceConsume {
@Override
protected Task produceTask(boolean canIdle) throws InterruptedException {
// If we can idle, then we do, to simulate worst case
if (canIdle)
Thread.sleep(PRODUCER_SLEEP_TIME);
// Is there a peer that needs a ping?
final long now = System.currentTimeMillis();
synchronized (lastPings) {
for (int peerIndex = 0; peerIndex < lastPings.size(); ++peerIndex) {
long lastPing = lastPings.get(peerIndex);
if (lastPing < now - PING_INTERVAL - PING_ROUND_TRIP_TIME - PRODUCER_SLEEP_TIME)
throw new RuntimeException("excessive peer ping interval for peer " + peerIndex);
if (lastPing < now - PING_INTERVAL) {
lastPings.set(peerIndex, System.currentTimeMillis());
return new PingTask(peerIndex);
}
}
}
// No work to do
return null;
}
}
testEPC(new PingEPC());
}
}