diff --git a/core/src/main/java/com/google/bitcoin/core/PeerGroup.java b/core/src/main/java/com/google/bitcoin/core/PeerGroup.java index 341f654b..4c2447d9 100644 --- a/core/src/main/java/com/google/bitcoin/core/PeerGroup.java +++ b/core/src/main/java/com/google/bitcoin/core/PeerGroup.java @@ -20,6 +20,7 @@ package com.google.bitcoin.core; import com.google.bitcoin.core.Peer.PeerHandler; import com.google.bitcoin.discovery.PeerDiscovery; import com.google.bitcoin.discovery.PeerDiscoveryException; +import com.google.bitcoin.utils.ListenerRegistration; import com.google.bitcoin.utils.Threading; import com.google.common.base.Preconditions; import com.google.common.collect.Sets; @@ -87,7 +88,7 @@ public class PeerGroup extends AbstractIdleService implements TransactionBroadca // Callback for events related to chain download @GuardedBy("lock") private PeerEventListener downloadListener; // Callbacks for events related to peer connection/disconnection - private final CopyOnWriteArrayList peerEventListeners; + private final CopyOnWriteArrayList> peerEventListeners; // Peer discovery sources, will be polled occasionally if there aren't enough inactives. private CopyOnWriteArraySet peerDiscoverers; // The version message to use for new connections. @@ -228,7 +229,7 @@ public class PeerGroup extends AbstractIdleService implements TransactionBroadca pendingPeers = new ArrayList(); channels = new DefaultChannelGroup(); peerDiscoverers = new CopyOnWriteArraySet(); - peerEventListeners = new CopyOnWriteArrayList(); + peerEventListeners = new CopyOnWriteArrayList>(); } /** @@ -419,7 +420,7 @@ public class PeerGroup extends AbstractIdleService implements TransactionBroadca /** - *

Adds a listener that will be notified on a library controlled thread when:

+ *

Adds a listener that will be notified on the given executor when:

*
    *
  1. New peers are connected to.
  2. *
  3. Peers are disconnected from.
  4. @@ -428,16 +429,22 @@ public class PeerGroup extends AbstractIdleService implements TransactionBroadca *
  5. Blocks are downloaded by the download peer.
  6. * *
- *

The listener will be locked during callback execution, which in turn will cause network message processing - * to stop until the listener returns.

+ */ + public void addEventListener(PeerEventListener listener, Executor executor) { + peerEventListeners.add(new ListenerRegistration(checkNotNull(listener), executor)); + } + + /** + * Same as {@link PeerGroup#addEventListener(PeerEventListener, java.util.concurrent.Executor)} but defaults + * to running on the user thread. */ public void addEventListener(PeerEventListener listener) { - peerEventListeners.add(checkNotNull(listener)); + addEventListener(listener, Threading.userCode); } /** The given event listener will no longer be called with events. */ public boolean removeEventListener(PeerEventListener listener) { - return peerEventListeners.remove(checkNotNull(listener)); + return ListenerRegistration.removeFromList(listener, peerEventListeners); } /** @@ -816,15 +823,23 @@ public class PeerGroup extends AbstractIdleService implements TransactionBroadca // TODO: Find a way to balance the desire to propagate useful transactions against DoS attacks. announcePendingWalletTransactions(wallets, Collections.singletonList(peer)); // And set up event listeners for clients. This will allow them to find out about new transactions and blocks. - for (PeerEventListener listener : peerEventListeners) { - peer.addEventListener(listener); + for (ListenerRegistration registration : peerEventListeners) { + peer.addEventListener(registration.listener); } setupPingingForNewPeer(peer); } finally { lock.unlock(); } - for (PeerEventListener listener : peerEventListeners) - listener.onPeerConnected(peer, newSize); + + final int fNewSize = newSize; + for (final ListenerRegistration registration : peerEventListeners) { + registration.executor.execute(new Runnable() { + @Override + public void run() { + registration.listener.onPeerConnected(peer, fNewSize); + } + }); + } } private void setupPingingForNewPeer(final Peer peer) { @@ -1010,9 +1025,16 @@ public class PeerGroup extends AbstractIdleService implements TransactionBroadca for (Wallet wallet : wallets) { peer.removeWallet(wallet); } - for (PeerEventListener listener : peerEventListeners) { - listener.onPeerDisconnected(peer, numConnectedPeers); - peer.removeEventListener(listener); + + final int fNumConnectedPeers = numConnectedPeers; + for (final ListenerRegistration registration : peerEventListeners) { + registration.executor.execute(new Runnable() { + @Override + public void run() { + registration.listener.onPeerDisconnected(peer, fNumConnectedPeers); + } + }); + peer.removeEventListener(registration.listener); } } diff --git a/core/src/main/java/com/google/bitcoin/core/Wallet.java b/core/src/main/java/com/google/bitcoin/core/Wallet.java index 9e8364f0..1ca54cd5 100644 --- a/core/src/main/java/com/google/bitcoin/core/Wallet.java +++ b/core/src/main/java/com/google/bitcoin/core/Wallet.java @@ -1367,7 +1367,7 @@ public class Wallet implements Serializable, BlockChainListener { * like receiving money. The listener is executed by the given executor. */ public void addEventListener(WalletEventListener listener, Executor executor) { - eventListeners.add(new ListenerRegistration(listener, executor)); + eventListeners.add(new ListenerRegistration(listener, executor)); } /** @@ -1375,7 +1375,7 @@ public class Wallet implements Serializable, BlockChainListener { * was never added. */ public boolean removeEventListener(WalletEventListener listener) { - return eventListeners.remove(listener); + return ListenerRegistration.removeFromList(listener, eventListeners); } /** diff --git a/core/src/main/java/com/google/bitcoin/utils/ListenerRegistration.java b/core/src/main/java/com/google/bitcoin/utils/ListenerRegistration.java index fe99b784..f56c9ff0 100644 --- a/core/src/main/java/com/google/bitcoin/utils/ListenerRegistration.java +++ b/core/src/main/java/com/google/bitcoin/utils/ListenerRegistration.java @@ -1,11 +1,26 @@ +/** + * Copyright 2013 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package com.google.bitcoin.utils; -import com.google.bitcoin.core.WalletEventListener; - +import java.util.List; import java.util.concurrent.Executor; /** -* A simple wrapper around a listener and an executor. +* A simple wrapper around a listener and an executor, with some utility methods. */ public class ListenerRegistration { public T listener; @@ -15,4 +30,20 @@ public class ListenerRegistration { this.listener = listener; this.executor = executor; } + + public static boolean removeFromList(T listener, List> list) { + ListenerRegistration item = null; + for (ListenerRegistration registration : list) { + if (registration.listener == listener) { + item = registration; + break; + } + } + if (item != null) { + list.remove(item); + return true; + } else { + return false; + } + } }