From f473267da2db627968d8f8adcbf36f2e136803b8 Mon Sep 17 00:00:00 2001 From: Mike Hearn Date: Tue, 9 Jul 2013 14:47:38 +0200 Subject: [PATCH] Exception handling: provide a global variable in Threading that receives all unhandled exceptions from all framework threads. Replaces the now removed PeerEventListener.onException() callback. --- .../core/AbstractPeerEventListener.java | 4 - .../java/com/google/bitcoin/core/Message.java | 2 - .../java/com/google/bitcoin/core/Peer.java | 155 ++++++++---------- .../bitcoin/core/PeerEventListener.java | 7 - .../java/com/google/bitcoin/core/Wallet.java | 3 + .../com/google/bitcoin/utils/Threading.java | 13 ++ .../com/google/bitcoin/core/PeerTest.java | 78 ++++++++- .../google/bitcoin/examples/PrintPeers.java | 2 +- 8 files changed, 162 insertions(+), 102 deletions(-) diff --git a/core/src/main/java/com/google/bitcoin/core/AbstractPeerEventListener.java b/core/src/main/java/com/google/bitcoin/core/AbstractPeerEventListener.java index f3332ac4..86ceff72 100644 --- a/core/src/main/java/com/google/bitcoin/core/AbstractPeerEventListener.java +++ b/core/src/main/java/com/google/bitcoin/core/AbstractPeerEventListener.java @@ -52,8 +52,4 @@ public class AbstractPeerEventListener implements PeerEventListener { public List getData(Peer peer, GetDataMessage m) { return null; } - - @Override - public void onException(Throwable throwable) { - } } diff --git a/core/src/main/java/com/google/bitcoin/core/Message.java b/core/src/main/java/com/google/bitcoin/core/Message.java index cc88cfb9..025844a7 100644 --- a/core/src/main/java/com/google/bitcoin/core/Message.java +++ b/core/src/main/java/com/google/bitcoin/core/Message.java @@ -378,8 +378,6 @@ public abstract class Message implements Serializable { /** * This method is a NOP for all classes except Block and Transaction. It is only declared in Message * so BitcoinSerializer can avoid 2 instanceof checks + a casting. - * - * @return */ public Sha256Hash getHash() { return null; diff --git a/core/src/main/java/com/google/bitcoin/core/Peer.java b/core/src/main/java/com/google/bitcoin/core/Peer.java index 2f84cea1..c09606d0 100644 --- a/core/src/main/java/com/google/bitcoin/core/Peer.java +++ b/core/src/main/java/com/google/bitcoin/core/Peer.java @@ -239,11 +239,15 @@ public class Peer { String s; PeerAddress addr = vAddress; s = addr == null ? "?" : addr.toString(); - if (e.getCause() instanceof ConnectException || e.getCause() instanceof IOException) { + final Throwable cause = e.getCause(); + if (cause instanceof ConnectException || cause instanceof IOException) { // Short message for network errors - log.info(s + " - " + e.getCause().getMessage()); + log.info(s + " - " + cause.getMessage()); } else { - log.warn(s + " - ", e.getCause()); + log.warn(s + " - ", cause); + Thread.UncaughtExceptionHandler handler = Threading.uncaughtExceptionHandler; + if (handler != null) + handler.uncaughtException(Thread.currentThread(), cause); } e.getChannel().close(); @@ -261,92 +265,75 @@ public class Peer { } } - private void processMessage(MessageEvent e, Message m) throws IOException, VerificationException, ProtocolException { - try { - // Allow event listeners to filter the message stream. Listeners are allowed to drop messages by - // returning null. - for (ListenerRegistration registration : eventListeners) { - // Skip any listeners that are supposed to run in another thread as we don't want to block waiting - // for it, which might cause circular deadlock. - if (registration.executor == Threading.SAME_THREAD) { - m = registration.listener.onPreMessageReceived(this, m); - if (m == null) break; - } + private void processMessage(MessageEvent e, Message m) throws Exception { + // Allow event listeners to filter the message stream. Listeners are allowed to drop messages by + // returning null. + for (ListenerRegistration registration : eventListeners) { + // Skip any listeners that are supposed to run in another thread as we don't want to block waiting + // for it, which might cause circular deadlock. + if (registration.executor == Threading.SAME_THREAD) { + m = registration.listener.onPreMessageReceived(this, m); + if (m == null) break; } - if (m == null) return; + } + if (m == null) return; - // If we are in the middle of receiving transactions as part of a filtered block push from the remote node, - // and we receive something that's not a transaction, then we're done. - if (currentFilteredBlock != null && !(m instanceof Transaction)) { - endFilteredBlock(currentFilteredBlock); - currentFilteredBlock = null; - } + // If we are in the middle of receiving transactions as part of a filtered block push from the remote node, + // and we receive something that's not a transaction, then we're done. + if (currentFilteredBlock != null && !(m instanceof Transaction)) { + endFilteredBlock(currentFilteredBlock); + currentFilteredBlock = null; + } - if (m instanceof NotFoundMessage) { - // This is sent to us when we did a getdata on some transactions that aren't in the peers memory pool. - // Because NotFoundMessage is a subclass of InventoryMessage, the test for it must come before the next. - processNotFoundMessage((NotFoundMessage) m); - } else if (m instanceof InventoryMessage) { - processInv((InventoryMessage) m); - } else if (m instanceof Block) { - processBlock((Block) m); - } else if (m instanceof FilteredBlock) { - startFilteredBlock((FilteredBlock) m); - } else if (m instanceof Transaction) { - processTransaction((Transaction) m); - } else if (m instanceof GetDataMessage) { - processGetData((GetDataMessage) m); - } else if (m instanceof AddressMessage) { - // We don't care about addresses of the network right now. But in future, - // we should save them in the wallet so we don't put too much load on the seed nodes and can - // properly explore the network. - } else if (m instanceof HeadersMessage) { - processHeaders((HeadersMessage) m); - } else if (m instanceof AlertMessage) { - processAlert((AlertMessage) m); - } else if (m instanceof VersionMessage) { - vPeerVersionMessage = (VersionMessage) m; - } else if (m instanceof VersionAck) { - if (vPeerVersionMessage == null) { - throw new ProtocolException("got a version ack before version"); - } - if (isAcked) { - throw new ProtocolException("got more than one version ack"); - } - isAcked = true; - for (PeerLifecycleListener listener : lifecycleListeners) - listener.onPeerConnected(this); - // We check min version after onPeerConnected as channel.close() will - // call onPeerDisconnected, and we should probably call onPeerConnected first. - final int version = vMinProtocolVersion; - if (vPeerVersionMessage.clientVersion < version) { - log.warn("Connected to a peer speaking protocol version {} but need {}, closing", - vPeerVersionMessage.clientVersion, version); - e.getChannel().close(); - } - } else if (m instanceof Ping) { - if (((Ping) m).hasNonce()) - sendMessage(new Pong(((Ping) m).getNonce())); - } else if (m instanceof Pong) { - processPong((Pong)m); - } else { - log.warn("Received unhandled message: {}", m); + if (m instanceof NotFoundMessage) { + // This is sent to us when we did a getdata on some transactions that aren't in the peers memory pool. + // Because NotFoundMessage is a subclass of InventoryMessage, the test for it must come before the next. + processNotFoundMessage((NotFoundMessage) m); + } else if (m instanceof InventoryMessage) { + processInv((InventoryMessage) m); + } else if (m instanceof Block) { + processBlock((Block) m); + } else if (m instanceof FilteredBlock) { + startFilteredBlock((FilteredBlock) m); + } else if (m instanceof Transaction) { + processTransaction((Transaction) m); + } else if (m instanceof GetDataMessage) { + processGetData((GetDataMessage) m); + } else if (m instanceof AddressMessage) { + // We don't care about addresses of the network right now. But in future, + // we should save them in the wallet so we don't put too much load on the seed nodes and can + // properly explore the network. + } else if (m instanceof HeadersMessage) { + processHeaders((HeadersMessage) m); + } else if (m instanceof AlertMessage) { + processAlert((AlertMessage) m); + } else if (m instanceof VersionMessage) { + vPeerVersionMessage = (VersionMessage) m; + } else if (m instanceof VersionAck) { + if (vPeerVersionMessage == null) { + throw new ProtocolException("got a version ack before version"); } - } catch (final Throwable throwable) { - log.warn("Caught exception in peer thread: {}", throwable.getMessage()); - throwable.printStackTrace(); - for (final ListenerRegistration registration : eventListeners) { - try { - registration.executor.execute(new Runnable() { - @Override - public void run() { - registration.listener.onException(throwable); - } - }); - } catch (Exception e1) { - e1.printStackTrace(); - } + if (isAcked) { + throw new ProtocolException("got more than one version ack"); } + isAcked = true; + for (PeerLifecycleListener listener : lifecycleListeners) + listener.onPeerConnected(this); + // We check min version after onPeerConnected as channel.close() will + // call onPeerDisconnected, and we should probably call onPeerConnected first. + final int version = vMinProtocolVersion; + if (vPeerVersionMessage.clientVersion < version) { + log.warn("Connected to a peer speaking protocol version {} but need {}, closing", + vPeerVersionMessage.clientVersion, version); + e.getChannel().close(); + } + } else if (m instanceof Ping) { + if (((Ping) m).hasNonce()) + sendMessage(new Pong(((Ping) m).getNonce())); + } else if (m instanceof Pong) { + processPong((Pong)m); + } else { + log.warn("Received unhandled message: {}", m); } } diff --git a/core/src/main/java/com/google/bitcoin/core/PeerEventListener.java b/core/src/main/java/com/google/bitcoin/core/PeerEventListener.java index 8895b08d..dc854e15 100644 --- a/core/src/main/java/com/google/bitcoin/core/PeerEventListener.java +++ b/core/src/main/java/com/google/bitcoin/core/PeerEventListener.java @@ -78,11 +78,4 @@ public interface PeerEventListener { * items as possible which appear in the {@link GetDataMessage}, or null if you're not interested in responding. */ public List getData(Peer peer, GetDataMessage m); - - /** - * Called if there is an exception thrown in a Netty worker thread whilst processing an inbound message. You - * can use this to report crashes of the peer threads back to your apps website, for instance. After this callback - * runs the peer will be disconnected. Any exceptions thrown by this method will be logged and ignored. - */ - public void onException(Throwable throwable); } diff --git a/core/src/main/java/com/google/bitcoin/core/Wallet.java b/core/src/main/java/com/google/bitcoin/core/Wallet.java index f02feadb..1e2bb0aa 100644 --- a/core/src/main/java/com/google/bitcoin/core/Wallet.java +++ b/core/src/main/java/com/google/bitcoin/core/Wallet.java @@ -461,6 +461,9 @@ public class Wallet implements Serializable, BlockChainListener { setDaemon(true); setName("Wallet auto save thread"); setPriority(Thread.MIN_PRIORITY); // Avoid competing with the UI. + Thread.UncaughtExceptionHandler handler = Threading.uncaughtExceptionHandler; + if (handler != null) + setUncaughtExceptionHandler(handler); } /** Returns the global instance that services all wallets. It never shuts down. */ diff --git a/core/src/main/java/com/google/bitcoin/utils/Threading.java b/core/src/main/java/com/google/bitcoin/utils/Threading.java index bae0b6c7..753d1fde 100644 --- a/core/src/main/java/com/google/bitcoin/utils/Threading.java +++ b/core/src/main/java/com/google/bitcoin/utils/Threading.java @@ -21,6 +21,7 @@ import com.google.common.util.concurrent.CycleDetectingLockFactory; import com.google.common.util.concurrent.Futures; import javax.annotation.Nonnull; +import javax.annotation.Nullable; import java.lang.ref.WeakReference; import java.util.concurrent.*; import java.util.concurrent.locks.ReentrantLock; @@ -67,6 +68,17 @@ public class Threading { Futures.getUnchecked(USER_THREAD.submit(Callables.returning(null))); } + /** + * An exception handler that will be invoked for any exceptions that occur in the user thread, and + * any unhandled exceptions that are caught whilst the framework is processing network traffic or doing other + * background tasks. The purpose of this is to allow you to report back unanticipated crashes from your users + * to a central collection center for analysis and debugging. You should configure this before any + * bitcoinj library code is run, setting it after you started network traffic and other forms of processing + * may result in the change not taking effect. + */ + @Nullable + public static volatile Thread.UncaughtExceptionHandler uncaughtExceptionHandler; + ///////////////////////////////////////////////////////////////////////////////////////////////////////////////// static { @@ -80,6 +92,7 @@ public class Threading { Thread t = new Thread(runnable); t.setName("bitcoinj user thread"); t.setDaemon(true); + t.setUncaughtExceptionHandler(uncaughtExceptionHandler); userThread = new WeakReference(t); return t; } diff --git a/core/src/test/java/com/google/bitcoin/core/PeerTest.java b/core/src/test/java/com/google/bitcoin/core/PeerTest.java index bb28832a..3dd4c678 100644 --- a/core/src/test/java/com/google/bitcoin/core/PeerTest.java +++ b/core/src/test/java/com/google/bitcoin/core/PeerTest.java @@ -17,20 +17,30 @@ package com.google.bitcoin.core; import com.google.bitcoin.core.Peer.PeerHandler; +import com.google.bitcoin.params.TestNet3Params; import com.google.bitcoin.utils.Threading; import com.google.common.collect.Lists; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; import org.easymock.Capture; import org.easymock.CaptureType; import org.jboss.netty.channel.*; import org.junit.Before; import org.junit.Test; +import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.io.OutputStream; import java.math.BigInteger; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.ServerSocket; +import java.net.Socket; import java.nio.charset.Charset; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import static com.google.bitcoin.core.TestUtils.*; @@ -782,12 +792,21 @@ public class PeerTest extends TestWithNetworkConnections { } }); final Throwable[] throwables = new Throwable[1]; - peer.addEventListener(new AbstractPeerEventListener() { + Threading.uncaughtExceptionHandler = new Thread.UncaughtExceptionHandler() { @Override - public void onException(Throwable throwable) { + public void uncaughtException(Thread thread, Throwable throwable) { throwables[0] = throwable; } - }, Threading.SAME_THREAD); + }; + // In real usage we're not really meant to adjust the uncaught exception handler after stuff started happening + // but in the unit test environment other tests have just run so the thread is probably still kicking around. + // Force it to crash so it'll be recreated with our new handler. + Threading.USER_THREAD.execute(new Runnable() { + @Override + public void run() { + throw new RuntimeException(); + } + }); control.replay(); connect(); Transaction t1 = new Transaction(unitTestParams); @@ -797,8 +816,59 @@ public class PeerTest extends TestWithNetworkConnections { t2.addInput(t1.getOutput(0)); t2.addOutput(Utils.toNanoCoins(1, 0), wallet.getChangeAddress()); inbound(peer, t2); - inbound(peer, new NotFoundMessage(unitTestParams, Lists.newArrayList(new InventoryItem(InventoryItem.Type.Transaction, t2.getInput(0).getHash())))); + final InventoryItem inventoryItem = new InventoryItem(InventoryItem.Type.Transaction, t2.getInput(0).getOutpoint().getHash()); + final NotFoundMessage nfm = new NotFoundMessage(unitTestParams, Lists.newArrayList(inventoryItem)); + inbound(peer, nfm); + Threading.waitForUserCode(); assertTrue(throwables[0] instanceof NullPointerException); + Threading.uncaughtExceptionHandler = null; + } + + @Test + public void badMessage() throws Exception { + // Bring up an actual network connection and feed it bogus data. + final SettableFuture result = SettableFuture.create(); + Threading.uncaughtExceptionHandler = new Thread.UncaughtExceptionHandler() { + @Override + public void uncaughtException(Thread thread, Throwable throwable) { + result.setException(throwable); + } + }; + ServerSocket server = new ServerSocket(0); + final NetworkParameters params = TestNet3Params.testNet(); + Peer peer = new Peer(params, blockChain, "test", "1.0"); + ListenableFuture future = TCPNetworkConnection.connectTo(TestNet3Params.get(), + new InetSocketAddress(InetAddress.getLocalHost(), server.getLocalPort()), 5000, peer); + Socket socket = server.accept(); + // Write out a verack+version. + BitcoinSerializer serializer = new BitcoinSerializer(params); + final VersionMessage ver = new VersionMessage(params, 1000); + ver.localServices = VersionMessage.NODE_NETWORK; + serializer.serialize(ver, socket.getOutputStream()); + serializer.serialize(new VersionAck(), socket.getOutputStream()); + // Now write some bogus truncated message. + serializer.serialize("inv", new InventoryMessage(params) { + @Override + public void bitcoinSerializeToStream(OutputStream stream) throws IOException { + // Add some hashes. + addItem(new InventoryItem(InventoryItem.Type.Transaction, Sha256Hash.create(new byte[] { 1 }))); + addItem(new InventoryItem(InventoryItem.Type.Transaction, Sha256Hash.create(new byte[] { 2 }))); + addItem(new InventoryItem(InventoryItem.Type.Transaction, Sha256Hash.create(new byte[] { 3 }))); + + // Write out a copy that's truncated in the middle. + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + super.bitcoinSerializeToStream(bos); + byte[] bits = bos.toByteArray(); + bits = Arrays.copyOf(bits, bits.length / 2); + stream.write(bits); + } + }.bitcoinSerialize(), socket.getOutputStream()); + try { + result.get(); + fail(); + } catch (ExecutionException e) { + assertTrue(e.getCause() instanceof ProtocolException); + } } // TODO: Use generics here to avoid unnecessary casting. diff --git a/examples/src/main/java/com/google/bitcoin/examples/PrintPeers.java b/examples/src/main/java/com/google/bitcoin/examples/PrintPeers.java index ec14d3d4..470257d4 100644 --- a/examples/src/main/java/com/google/bitcoin/examples/PrintPeers.java +++ b/examples/src/main/java/com/google/bitcoin/examples/PrintPeers.java @@ -77,7 +77,7 @@ public class PrintPeers { List> futures = Lists.newArrayList(); for (final InetAddress addr : addrs) { final ListenableFuture future = - TCPNetworkConnection.connectTo(params, new InetSocketAddress(addr, params.getPort()), 1000 /* timeout */); + TCPNetworkConnection.connectTo(params, new InetSocketAddress(addr, params.getPort()), 1000 /* timeout */, null); futures.add(future); // Once the connection has completed version handshaking ... Futures.addCallback(future, new FutureCallback() {