3
0
mirror of https://github.com/Qortal/altcoinj.git synced 2025-02-07 14:54:15 +00:00

Exception handling: provide a global variable in Threading that receives all unhandled exceptions from all framework threads.

Replaces the now removed PeerEventListener.onException() callback.
This commit is contained in:
Mike Hearn 2013-07-09 14:47:38 +02:00
parent ea19d3164a
commit f473267da2
8 changed files with 162 additions and 102 deletions

View File

@ -52,8 +52,4 @@ public class AbstractPeerEventListener implements PeerEventListener {
public List<Message> getData(Peer peer, GetDataMessage m) {
return null;
}
@Override
public void onException(Throwable throwable) {
}
}

View File

@ -378,8 +378,6 @@ public abstract class Message implements Serializable {
/**
* This method is a NOP for all classes except Block and Transaction. It is only declared in Message
* so BitcoinSerializer can avoid 2 instanceof checks + a casting.
*
* @return
*/
public Sha256Hash getHash() {
return null;

View File

@ -239,11 +239,15 @@ public class Peer {
String s;
PeerAddress addr = vAddress;
s = addr == null ? "?" : addr.toString();
if (e.getCause() instanceof ConnectException || e.getCause() instanceof IOException) {
final Throwable cause = e.getCause();
if (cause instanceof ConnectException || cause instanceof IOException) {
// Short message for network errors
log.info(s + " - " + e.getCause().getMessage());
log.info(s + " - " + cause.getMessage());
} else {
log.warn(s + " - ", e.getCause());
log.warn(s + " - ", cause);
Thread.UncaughtExceptionHandler handler = Threading.uncaughtExceptionHandler;
if (handler != null)
handler.uncaughtException(Thread.currentThread(), cause);
}
e.getChannel().close();
@ -261,92 +265,75 @@ public class Peer {
}
}
private void processMessage(MessageEvent e, Message m) throws IOException, VerificationException, ProtocolException {
try {
// Allow event listeners to filter the message stream. Listeners are allowed to drop messages by
// returning null.
for (ListenerRegistration<PeerEventListener> registration : eventListeners) {
// Skip any listeners that are supposed to run in another thread as we don't want to block waiting
// for it, which might cause circular deadlock.
if (registration.executor == Threading.SAME_THREAD) {
m = registration.listener.onPreMessageReceived(this, m);
if (m == null) break;
}
private void processMessage(MessageEvent e, Message m) throws Exception {
// Allow event listeners to filter the message stream. Listeners are allowed to drop messages by
// returning null.
for (ListenerRegistration<PeerEventListener> registration : eventListeners) {
// Skip any listeners that are supposed to run in another thread as we don't want to block waiting
// for it, which might cause circular deadlock.
if (registration.executor == Threading.SAME_THREAD) {
m = registration.listener.onPreMessageReceived(this, m);
if (m == null) break;
}
if (m == null) return;
}
if (m == null) return;
// If we are in the middle of receiving transactions as part of a filtered block push from the remote node,
// and we receive something that's not a transaction, then we're done.
if (currentFilteredBlock != null && !(m instanceof Transaction)) {
endFilteredBlock(currentFilteredBlock);
currentFilteredBlock = null;
}
// If we are in the middle of receiving transactions as part of a filtered block push from the remote node,
// and we receive something that's not a transaction, then we're done.
if (currentFilteredBlock != null && !(m instanceof Transaction)) {
endFilteredBlock(currentFilteredBlock);
currentFilteredBlock = null;
}
if (m instanceof NotFoundMessage) {
// This is sent to us when we did a getdata on some transactions that aren't in the peers memory pool.
// Because NotFoundMessage is a subclass of InventoryMessage, the test for it must come before the next.
processNotFoundMessage((NotFoundMessage) m);
} else if (m instanceof InventoryMessage) {
processInv((InventoryMessage) m);
} else if (m instanceof Block) {
processBlock((Block) m);
} else if (m instanceof FilteredBlock) {
startFilteredBlock((FilteredBlock) m);
} else if (m instanceof Transaction) {
processTransaction((Transaction) m);
} else if (m instanceof GetDataMessage) {
processGetData((GetDataMessage) m);
} else if (m instanceof AddressMessage) {
// We don't care about addresses of the network right now. But in future,
// we should save them in the wallet so we don't put too much load on the seed nodes and can
// properly explore the network.
} else if (m instanceof HeadersMessage) {
processHeaders((HeadersMessage) m);
} else if (m instanceof AlertMessage) {
processAlert((AlertMessage) m);
} else if (m instanceof VersionMessage) {
vPeerVersionMessage = (VersionMessage) m;
} else if (m instanceof VersionAck) {
if (vPeerVersionMessage == null) {
throw new ProtocolException("got a version ack before version");
}
if (isAcked) {
throw new ProtocolException("got more than one version ack");
}
isAcked = true;
for (PeerLifecycleListener listener : lifecycleListeners)
listener.onPeerConnected(this);
// We check min version after onPeerConnected as channel.close() will
// call onPeerDisconnected, and we should probably call onPeerConnected first.
final int version = vMinProtocolVersion;
if (vPeerVersionMessage.clientVersion < version) {
log.warn("Connected to a peer speaking protocol version {} but need {}, closing",
vPeerVersionMessage.clientVersion, version);
e.getChannel().close();
}
} else if (m instanceof Ping) {
if (((Ping) m).hasNonce())
sendMessage(new Pong(((Ping) m).getNonce()));
} else if (m instanceof Pong) {
processPong((Pong)m);
} else {
log.warn("Received unhandled message: {}", m);
if (m instanceof NotFoundMessage) {
// This is sent to us when we did a getdata on some transactions that aren't in the peers memory pool.
// Because NotFoundMessage is a subclass of InventoryMessage, the test for it must come before the next.
processNotFoundMessage((NotFoundMessage) m);
} else if (m instanceof InventoryMessage) {
processInv((InventoryMessage) m);
} else if (m instanceof Block) {
processBlock((Block) m);
} else if (m instanceof FilteredBlock) {
startFilteredBlock((FilteredBlock) m);
} else if (m instanceof Transaction) {
processTransaction((Transaction) m);
} else if (m instanceof GetDataMessage) {
processGetData((GetDataMessage) m);
} else if (m instanceof AddressMessage) {
// We don't care about addresses of the network right now. But in future,
// we should save them in the wallet so we don't put too much load on the seed nodes and can
// properly explore the network.
} else if (m instanceof HeadersMessage) {
processHeaders((HeadersMessage) m);
} else if (m instanceof AlertMessage) {
processAlert((AlertMessage) m);
} else if (m instanceof VersionMessage) {
vPeerVersionMessage = (VersionMessage) m;
} else if (m instanceof VersionAck) {
if (vPeerVersionMessage == null) {
throw new ProtocolException("got a version ack before version");
}
} catch (final Throwable throwable) {
log.warn("Caught exception in peer thread: {}", throwable.getMessage());
throwable.printStackTrace();
for (final ListenerRegistration<PeerEventListener> registration : eventListeners) {
try {
registration.executor.execute(new Runnable() {
@Override
public void run() {
registration.listener.onException(throwable);
}
});
} catch (Exception e1) {
e1.printStackTrace();
}
if (isAcked) {
throw new ProtocolException("got more than one version ack");
}
isAcked = true;
for (PeerLifecycleListener listener : lifecycleListeners)
listener.onPeerConnected(this);
// We check min version after onPeerConnected as channel.close() will
// call onPeerDisconnected, and we should probably call onPeerConnected first.
final int version = vMinProtocolVersion;
if (vPeerVersionMessage.clientVersion < version) {
log.warn("Connected to a peer speaking protocol version {} but need {}, closing",
vPeerVersionMessage.clientVersion, version);
e.getChannel().close();
}
} else if (m instanceof Ping) {
if (((Ping) m).hasNonce())
sendMessage(new Pong(((Ping) m).getNonce()));
} else if (m instanceof Pong) {
processPong((Pong)m);
} else {
log.warn("Received unhandled message: {}", m);
}
}

View File

@ -78,11 +78,4 @@ public interface PeerEventListener {
* items as possible which appear in the {@link GetDataMessage}, or null if you're not interested in responding.
*/
public List<Message> getData(Peer peer, GetDataMessage m);
/**
* Called if there is an exception thrown in a Netty worker thread whilst processing an inbound message. You
* can use this to report crashes of the peer threads back to your apps website, for instance. After this callback
* runs the peer will be disconnected. Any exceptions thrown by this method will be logged and ignored.
*/
public void onException(Throwable throwable);
}

View File

@ -461,6 +461,9 @@ public class Wallet implements Serializable, BlockChainListener {
setDaemon(true);
setName("Wallet auto save thread");
setPriority(Thread.MIN_PRIORITY); // Avoid competing with the UI.
Thread.UncaughtExceptionHandler handler = Threading.uncaughtExceptionHandler;
if (handler != null)
setUncaughtExceptionHandler(handler);
}
/** Returns the global instance that services all wallets. It never shuts down. */

View File

@ -21,6 +21,7 @@ import com.google.common.util.concurrent.CycleDetectingLockFactory;
import com.google.common.util.concurrent.Futures;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.lang.ref.WeakReference;
import java.util.concurrent.*;
import java.util.concurrent.locks.ReentrantLock;
@ -67,6 +68,17 @@ public class Threading {
Futures.getUnchecked(USER_THREAD.submit(Callables.returning(null)));
}
/**
* An exception handler that will be invoked for any exceptions that occur in the user thread, and
* any unhandled exceptions that are caught whilst the framework is processing network traffic or doing other
* background tasks. The purpose of this is to allow you to report back unanticipated crashes from your users
* to a central collection center for analysis and debugging. You should configure this <b>before</b> any
* bitcoinj library code is run, setting it after you started network traffic and other forms of processing
* may result in the change not taking effect.
*/
@Nullable
public static volatile Thread.UncaughtExceptionHandler uncaughtExceptionHandler;
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////
static {
@ -80,6 +92,7 @@ public class Threading {
Thread t = new Thread(runnable);
t.setName("bitcoinj user thread");
t.setDaemon(true);
t.setUncaughtExceptionHandler(uncaughtExceptionHandler);
userThread = new WeakReference<Thread>(t);
return t;
}

View File

@ -17,20 +17,30 @@
package com.google.bitcoin.core;
import com.google.bitcoin.core.Peer.PeerHandler;
import com.google.bitcoin.params.TestNet3Params;
import com.google.bitcoin.utils.Threading;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import org.easymock.Capture;
import org.easymock.CaptureType;
import org.jboss.netty.channel.*;
import org.junit.Before;
import org.junit.Test;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.math.BigInteger;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import static com.google.bitcoin.core.TestUtils.*;
@ -782,12 +792,21 @@ public class PeerTest extends TestWithNetworkConnections {
}
});
final Throwable[] throwables = new Throwable[1];
peer.addEventListener(new AbstractPeerEventListener() {
Threading.uncaughtExceptionHandler = new Thread.UncaughtExceptionHandler() {
@Override
public void onException(Throwable throwable) {
public void uncaughtException(Thread thread, Throwable throwable) {
throwables[0] = throwable;
}
}, Threading.SAME_THREAD);
};
// In real usage we're not really meant to adjust the uncaught exception handler after stuff started happening
// but in the unit test environment other tests have just run so the thread is probably still kicking around.
// Force it to crash so it'll be recreated with our new handler.
Threading.USER_THREAD.execute(new Runnable() {
@Override
public void run() {
throw new RuntimeException();
}
});
control.replay();
connect();
Transaction t1 = new Transaction(unitTestParams);
@ -797,8 +816,59 @@ public class PeerTest extends TestWithNetworkConnections {
t2.addInput(t1.getOutput(0));
t2.addOutput(Utils.toNanoCoins(1, 0), wallet.getChangeAddress());
inbound(peer, t2);
inbound(peer, new NotFoundMessage(unitTestParams, Lists.newArrayList(new InventoryItem(InventoryItem.Type.Transaction, t2.getInput(0).getHash()))));
final InventoryItem inventoryItem = new InventoryItem(InventoryItem.Type.Transaction, t2.getInput(0).getOutpoint().getHash());
final NotFoundMessage nfm = new NotFoundMessage(unitTestParams, Lists.newArrayList(inventoryItem));
inbound(peer, nfm);
Threading.waitForUserCode();
assertTrue(throwables[0] instanceof NullPointerException);
Threading.uncaughtExceptionHandler = null;
}
@Test
public void badMessage() throws Exception {
// Bring up an actual network connection and feed it bogus data.
final SettableFuture<Void> result = SettableFuture.create();
Threading.uncaughtExceptionHandler = new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread thread, Throwable throwable) {
result.setException(throwable);
}
};
ServerSocket server = new ServerSocket(0);
final NetworkParameters params = TestNet3Params.testNet();
Peer peer = new Peer(params, blockChain, "test", "1.0");
ListenableFuture<TCPNetworkConnection> future = TCPNetworkConnection.connectTo(TestNet3Params.get(),
new InetSocketAddress(InetAddress.getLocalHost(), server.getLocalPort()), 5000, peer);
Socket socket = server.accept();
// Write out a verack+version.
BitcoinSerializer serializer = new BitcoinSerializer(params);
final VersionMessage ver = new VersionMessage(params, 1000);
ver.localServices = VersionMessage.NODE_NETWORK;
serializer.serialize(ver, socket.getOutputStream());
serializer.serialize(new VersionAck(), socket.getOutputStream());
// Now write some bogus truncated message.
serializer.serialize("inv", new InventoryMessage(params) {
@Override
public void bitcoinSerializeToStream(OutputStream stream) throws IOException {
// Add some hashes.
addItem(new InventoryItem(InventoryItem.Type.Transaction, Sha256Hash.create(new byte[] { 1 })));
addItem(new InventoryItem(InventoryItem.Type.Transaction, Sha256Hash.create(new byte[] { 2 })));
addItem(new InventoryItem(InventoryItem.Type.Transaction, Sha256Hash.create(new byte[] { 3 })));
// Write out a copy that's truncated in the middle.
ByteArrayOutputStream bos = new ByteArrayOutputStream();
super.bitcoinSerializeToStream(bos);
byte[] bits = bos.toByteArray();
bits = Arrays.copyOf(bits, bits.length / 2);
stream.write(bits);
}
}.bitcoinSerialize(), socket.getOutputStream());
try {
result.get();
fail();
} catch (ExecutionException e) {
assertTrue(e.getCause() instanceof ProtocolException);
}
}
// TODO: Use generics here to avoid unnecessary casting.

View File

@ -77,7 +77,7 @@ public class PrintPeers {
List<ListenableFuture<TCPNetworkConnection>> futures = Lists.newArrayList();
for (final InetAddress addr : addrs) {
final ListenableFuture<TCPNetworkConnection> future =
TCPNetworkConnection.connectTo(params, new InetSocketAddress(addr, params.getPort()), 1000 /* timeout */);
TCPNetworkConnection.connectTo(params, new InetSocketAddress(addr, params.getPort()), 1000 /* timeout */, null);
futures.add(future);
// Once the connection has completed version handshaking ...
Futures.addCallback(future, new FutureCallback<TCPNetworkConnection>() {