Add cycle detecting locks, fix Orchid deadlock 2

https://github.com/subgraph/Orchid/pull/10

Conflicts:
	orchid/src/com/subgraph/orchid/circuits/CircuitIO.java
This commit is contained in:
Devrandom
2014-08-23 10:30:14 -07:00
committed by Mike Hearn
parent 00d1821daa
commit 0464aa71c5
5 changed files with 162 additions and 36 deletions

View File

@@ -156,6 +156,11 @@
<version>4.11</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>16.0.1</version>
</dependency>
</dependencies>
</project>

View File

@@ -0,0 +1,45 @@
package com.subgraph.orchid;
import com.google.common.util.concurrent.CycleDetectingLockFactory;
import java.util.concurrent.locks.ReentrantLock;
/**
* Created by android on 8/22/14.
*/
public class Threading {
static {
// Default policy goes here. If you want to change this, use one of the static methods before
// instantiating any orchid objects. The policy change will take effect only on new objects
// from that point onwards.
throwOnLockCycles();
}
private static CycleDetectingLockFactory.Policy policy;
public static CycleDetectingLockFactory factory;
public static ReentrantLock lock(String name) {
return factory.newReentrantLock(name);
}
public static void warnOnLockCycles() {
setPolicy(CycleDetectingLockFactory.Policies.WARN);
}
public static void throwOnLockCycles() {
setPolicy(CycleDetectingLockFactory.Policies.THROW);
}
public static void ignoreLockCycles() {
setPolicy(CycleDetectingLockFactory.Policies.DISABLED);
}
public static void setPolicy(CycleDetectingLockFactory.Policy policy) {
Threading.policy = policy;
factory = CycleDetectingLockFactory.newInstance(policy);
}
public static CycleDetectingLockFactory.Policy getPolicy() {
return policy;
}
}

View File

@@ -9,6 +9,7 @@ import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -18,6 +19,7 @@ import com.subgraph.orchid.Connection;
import com.subgraph.orchid.ConnectionIOException;
import com.subgraph.orchid.RelayCell;
import com.subgraph.orchid.Stream;
import com.subgraph.orchid.Threading;
import com.subgraph.orchid.TorException;
import com.subgraph.orchid.circuits.cells.CellImpl;
import com.subgraph.orchid.circuits.cells.RelayCellImpl;
@@ -36,11 +38,10 @@ public class CircuitIO implements DashboardRenderable {
private final BlockingQueue<RelayCell> relayCellResponseQueue;
private final BlockingQueue<Cell> controlCellResponseQueue;
private final Map<Integer, StreamImpl> streamMap;
private final Object relaySendLock = new Object();
private final ReentrantLock streamLock = Threading.lock("stream");
private final ReentrantLock relaySendLock = Threading.lock("relaySend");
/** LOCKING: streamMap */
private boolean isMarkedForClose;
/** LOCKING: streamMap */
private boolean isClosed;
CircuitIO(CircuitImpl circuit, Connection connection, int circuitId) {
@@ -171,7 +172,8 @@ public class CircuitIO implements DashboardRenderable {
}
}
synchronized(streamMap) {
streamLock.lock();
try {
final StreamImpl stream = streamMap.get(cell.getStreamId());
// It's not unusual for the stream to not be found. For example, if a RELAY_CONNECTED arrives after
// the client has stopped waiting for it, the stream will never be tracked and eventually the edge node
@@ -179,6 +181,8 @@ public class CircuitIO implements DashboardRenderable {
if(stream != null) {
stream.addInputCell(cell);
}
} finally {
streamLock.unlock();
}
}
@@ -187,7 +191,8 @@ public class CircuitIO implements DashboardRenderable {
}
void sendRelayCellTo(RelayCell cell, CircuitNode targetNode) {
synchronized(relaySendLock) {
relaySendLock.lock();
try {
logRelayCell("Sending: ", cell);
cell.setLength();
targetNode.updateForwardDigest(cell);
@@ -200,6 +205,8 @@ public class CircuitIO implements DashboardRenderable {
targetNode.waitForSendWindowAndDecrement();
sendCell(cell);
} finally {
relaySendLock.unlock();
}
}
@@ -236,20 +243,26 @@ public class CircuitIO implements DashboardRenderable {
void markForClose() {
boolean shouldClose;
synchronized (streamMap) {
streamLock.lock();
try {
if(isMarkedForClose) {
return;
}
isMarkedForClose = true;
shouldClose = streamMap.isEmpty();
} finally {
streamLock.unlock();
}
if(shouldClose)
closeCircuit();
}
boolean isMarkedForClose() {
synchronized (streamMap) {
streamLock.lock();
try {
return isMarkedForClose;
} finally {
streamLock.unlock();
}
}
@@ -276,7 +289,8 @@ public class CircuitIO implements DashboardRenderable {
}
void destroyCircuit() {
synchronized(streamMap) {
streamLock.lock();
try {
if(isClosed) {
return;
}
@@ -287,31 +301,42 @@ public class CircuitIO implements DashboardRenderable {
s.close();
}
isClosed = true;
} finally {
streamLock.unlock();
}
}
StreamImpl createNewStream(boolean autoclose) {
synchronized(streamMap) {
streamLock.lock();
try {
final int streamId = circuit.getStatus().nextStreamId();
final StreamImpl stream = new StreamImpl(circuit, circuit.getFinalCircuitNode(), streamId, autoclose);
streamMap.put(streamId, stream);
return stream;
} finally {
streamLock.unlock();
}
}
void removeStream(StreamImpl stream) {
boolean shouldClose;
synchronized(streamMap) {
streamLock.lock();
try {
streamMap.remove(stream.getStreamId());
shouldClose = streamMap.isEmpty() && isMarkedForClose;
} finally {
streamLock.unlock();
}
if(shouldClose)
closeCircuit();
}
List<Stream> getActiveStreams() {
synchronized (streamMap) {
streamLock.lock();
try {
return new ArrayList<Stream>(streamMap.values());
} finally {
streamLock.unlock();
}
}

View File

@@ -13,6 +13,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.ReentrantLock;
import com.subgraph.orchid.Circuit;
import com.subgraph.orchid.CircuitBuildHandler;
@@ -29,6 +30,7 @@ import com.subgraph.orchid.OpenFailedException;
import com.subgraph.orchid.Router;
import com.subgraph.orchid.Stream;
import com.subgraph.orchid.StreamConnectFailedException;
import com.subgraph.orchid.Threading;
import com.subgraph.orchid.Tor;
import com.subgraph.orchid.TorConfig;
import com.subgraph.orchid.circuits.guards.EntryGuards;
@@ -62,6 +64,7 @@ public class CircuitManagerImpl implements CircuitManager, DashboardRenderable {
private final TorInitializationTracker initializationTracker;
private final CircuitPathChooser pathChooser;
private final HiddenServiceManager hiddenServiceManager;
private final ReentrantLock lock = Threading.lock("circuitManager");
public CircuitManagerImpl(TorConfig config, DirectoryDownloaderImpl directoryDownloader, Directory directory, ConnectionCache connectionCache, TorInitializationTracker initializationTracker) {
this.config = config;
@@ -87,13 +90,19 @@ public class CircuitManagerImpl implements CircuitManager, DashboardRenderable {
scheduledExecutor.scheduleAtFixedRate(circuitCreationTask, 0, 1000, TimeUnit.MILLISECONDS);
}
public synchronized void stopBuildingCircuits(boolean killCircuits) {
scheduledExecutor.shutdownNow();
if(killCircuits) {
List<CircuitImpl> circuits = new ArrayList<CircuitImpl>(activeCircuits);
for(CircuitImpl c: circuits) {
c.destroyCircuit();
public void stopBuildingCircuits(boolean killCircuits) {
lock.lock();
try {
scheduledExecutor.shutdownNow();
if (killCircuits) {
List<CircuitImpl> circuits = new ArrayList<CircuitImpl>(activeCircuits);
for (CircuitImpl c : circuits) {
c.destroyCircuit();
}
}
} finally {
lock.unlock();
}
}
@@ -114,8 +123,10 @@ public class CircuitManagerImpl implements CircuitManager, DashboardRenderable {
}
}
synchronized int getActiveCircuitCount() {
return activeCircuits.size();
int getActiveCircuitCount() {
synchronized (activeCircuits) {
return activeCircuits.size();
}
}
Set<Circuit> getPendingCircuits() {
@@ -126,17 +137,29 @@ public class CircuitManagerImpl implements CircuitManager, DashboardRenderable {
});
}
synchronized int getPendingCircuitCount() {
return getPendingCircuits().size();
int getPendingCircuitCount() {
lock.lock();
try {
return getPendingCircuits().size();
} finally {
lock.unlock();
}
}
Set<Circuit> getCircuitsByFilter(CircuitFilter filter) {
final Set<Circuit> result = new HashSet<Circuit>();
final Set<CircuitImpl> circuits = new HashSet<CircuitImpl>();
synchronized (activeCircuits) {
for(CircuitImpl c: activeCircuits) {
if(filter == null || filter.filter(c)) {
result.add(c);
}
// the filter might lock additional objects, causing a deadlock, so don't
// call it inside the monitor
circuits.addAll(activeCircuits);
}
for(CircuitImpl c: circuits) {
if(filter == null || filter.filter(c)) {
result.add(c);
}
}
return result;

View File

@@ -14,6 +14,7 @@ import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -27,6 +28,7 @@ import com.subgraph.orchid.ConnectionHandshakeException;
import com.subgraph.orchid.ConnectionIOException;
import com.subgraph.orchid.ConnectionTimeoutException;
import com.subgraph.orchid.Router;
import com.subgraph.orchid.Threading;
import com.subgraph.orchid.Tor;
import com.subgraph.orchid.TorConfig;
import com.subgraph.orchid.TorException;
@@ -61,9 +63,11 @@ public class ConnectionImpl implements Connection, DashboardRenderable {
private boolean isConnected;
private volatile boolean isClosed;
private final Thread readCellsThread;
private final Object connectLock = new Object();
private final ReentrantLock connectLock = Threading.lock("connect");
private final ReentrantLock circuitsLock = Threading.lock("circuits");
private final ReentrantLock outputLock = Threading.lock("output");
private final AtomicLong lastActivity = new AtomicLong();
public ConnectionImpl(TorConfig config, SSLSocket socket, Router router, TorInitializationTracker tracker, boolean isDirectoryConnection) {
this.config = config;
@@ -92,13 +96,16 @@ public class ConnectionImpl implements Connection, DashboardRenderable {
}
public int bindCircuit(Circuit circuit) {
synchronized(circuitMap) {
circuitsLock.lock();
try {
while(circuitMap.containsKey(currentId))
incrementNextId();
final int id = currentId;
incrementNextId();
circuitMap.put(id, circuit);
return id;
} finally {
circuitsLock.unlock();
}
}
@@ -109,7 +116,8 @@ public class ConnectionImpl implements Connection, DashboardRenderable {
}
void connect() throws ConnectionFailedException, ConnectionTimeoutException, ConnectionHandshakeException {
synchronized (connectLock) {
connectLock.lock();
try {
if(isConnected) {
return;
}
@@ -128,6 +136,8 @@ public class ConnectionImpl implements Connection, DashboardRenderable {
throw new ConnectionFailedException(e.getMessage());
}
isConnected = true;
} finally {
connectLock.unlock();
}
}
@@ -171,7 +181,8 @@ public class ConnectionImpl implements Connection, DashboardRenderable {
throw new ConnectionIOException("Cannot send cell because connection is not connected");
}
updateLastActivity();
synchronized(output) {
outputLock.lock();
try {
try {
output.write(cell.getCellBytes());
} catch (IOException e) {
@@ -179,6 +190,8 @@ public class ConnectionImpl implements Connection, DashboardRenderable {
closeSocket();
throw new ConnectionIOException(e.getClass().getName() + " : "+ e.getMessage());
}
} finally {
outputLock.unlock();
}
}
@@ -276,32 +289,41 @@ public class ConnectionImpl implements Connection, DashboardRenderable {
}
private void processRelayCell(Cell cell) {
synchronized(circuitMap) {
circuitsLock.lock();
try {
final Circuit circuit = circuitMap.get(cell.getCircuitId());
if(circuit == null) {
logger.warning("Could not deliver relay cell for circuit id = "+ cell.getCircuitId() +" on connection "+ this +". Circuit not found");
return;
}
circuit.deliverRelayCell(cell);
} finally {
circuitsLock.unlock();
}
}
private void processControlCell(Cell cell) {
synchronized(circuitMap) {
circuitsLock.lock();
try {
final Circuit circuit = circuitMap.get(cell.getCircuitId());
if(circuit != null) {
circuit.deliverControlCell(cell);
}
} finally {
circuitsLock.unlock();
}
}
void idleCloseCheck() {
synchronized (circuitMap) {
circuitsLock.lock();
try {
final boolean needClose = (!isClosed && circuitMap.isEmpty() && getIdleMilliseconds() > CONNECTION_IDLE_TIMEOUT);
if(needClose) {
logger.fine("Closing connection to "+ this +" on idle timeout");
closeSocket();
}
}
} finally {
circuitsLock.unlock();
}
}
@@ -317,8 +339,11 @@ public class ConnectionImpl implements Connection, DashboardRenderable {
}
public void removeCircuit(Circuit circuit) {
synchronized(circuitMap) {
circuitsLock.lock();
try {
circuitMap.remove(circuit.getCircuitId());
} finally {
circuitsLock.unlock();
}
}
@@ -328,8 +353,11 @@ public class ConnectionImpl implements Connection, DashboardRenderable {
public void dashboardRender(DashboardRenderer renderer, PrintWriter writer, int flags) throws IOException {
final int circuitCount;
synchronized (circuitMap) {
circuitsLock.lock();
try {
circuitCount = circuitMap.size();
} finally {
circuitsLock.unlock();
}
if(circuitCount == 0 && (flags & DASHBOARD_CONNECTIONS_VERBOSE) == 0) {
return;