diff --git a/core/src/main/java/com/google/bitcoin/core/Peer.java b/core/src/main/java/com/google/bitcoin/core/Peer.java index 79de1a8b..3e8018ee 100644 --- a/core/src/main/java/com/google/bitcoin/core/Peer.java +++ b/core/src/main/java/com/google/bitcoin/core/Peer.java @@ -56,9 +56,8 @@ public class Peer { private final NetworkParameters params; private final AbstractBlockChain blockChain; private final AtomicReference address = new AtomicReference(); - // TODO: Make the types here explicit and remove synchronization on adders/removers. - private List eventListeners; - private List lifecycleListeners; + private final CopyOnWriteArrayList eventListeners; + private final CopyOnWriteArrayList lifecycleListeners; // Whether to try and download blocks and transactions from this peer. Set to false by PeerGroup if not the // primary peer. This is to avoid redundant work and concurrency problems with downloading the same chain // in parallel. @@ -69,13 +68,12 @@ public class Peer { // How many block messages the peer has announced to us. Peers only announce blocks that attach to their best chain // so we can use this to calculate the height of the peers chain, by adding it to the initial height in the version // message. This method can go wrong if the peer re-orgs onto a shorter (but harder) chain, however, this is rare. - private AtomicInteger blocksAnnounced = new AtomicInteger(); + private final AtomicInteger blocksAnnounced = new AtomicInteger(); // A class that tracks recent transactions that have been broadcast across the network, counts how many // peers announced them and updates the transaction confidence data. It is passed to each Peer. - // TODO: Make this final and unsynchronized. - private MemoryPool memoryPool; + private final MemoryPool memoryPool; // Each wallet added to the peer will be notified of downloaded transaction data. - private CopyOnWriteArrayList wallets; + private final CopyOnWriteArrayList wallets; // A time before which we only download block headers, after that point we download block bodies. private long fastCatchupTimeSecs; // Whether we are currently downloading headers only or block bodies. Starts at true. If the fast catchup time is @@ -92,7 +90,7 @@ public class Peer { // // It is important to avoid a nasty edge case where we can end up with parallel chain downloads proceeding // simultaneously if we were to receive a newly solved block whilst parts of the chain are streaming to us. - private HashSet pendingBlockDownloads = new HashSet(); + private final HashSet pendingBlockDownloads = new HashSet(); // The lowest version number we're willing to accept. Lower than this will result in an immediate disconnect. private int minProtocolVersion = Pong.MIN_PROTOCOL_VERSION; // When an API user explicitly requests a block or transaction from a peer, the InventoryItem is put here @@ -114,7 +112,7 @@ public class Peer { private static final int PING_MOVING_AVERAGE_WINDOW = 20; private Channel channel; - private AtomicReference peerVersionMessage = new AtomicReference(); + private final AtomicReference peerVersionMessage = new AtomicReference(); private boolean isAcked; private PeerHandler handler; @@ -122,6 +120,15 @@ public class Peer { * Construct a peer that reads/writes from the given block chain. */ public Peer(NetworkParameters params, AbstractBlockChain chain, VersionMessage ver) { + this(params, chain, ver, null); + } + + /** + * Construct a peer that reads/writes from the given block chain and memory pool. Transactions stored + * in a memory pool will have their confidence levels updated when a peer announces it, to reflect the greater + * likelyhood that the transaction is valid. + */ + public Peer(NetworkParameters params, AbstractBlockChain chain, VersionMessage ver, MemoryPool mempool) { this.params = Preconditions.checkNotNull(params); this.versionMessage = Preconditions.checkNotNull(ver); this.blockChain = chain; // Allowed to be null. @@ -134,6 +141,7 @@ public class Peer { this.handler = new PeerHandler(); this.pendingPings = new CopyOnWriteArrayList(); this.wallets = new CopyOnWriteArrayList(); + this.memoryPool = mempool; } /** @@ -146,34 +154,22 @@ public class Peer { this.versionMessage.appendToSubVer(thisSoftwareName, thisSoftwareVersion, null); } - public synchronized void addEventListener(PeerEventListener listener) { + public void addEventListener(PeerEventListener listener) { eventListeners.add(listener); } - public synchronized boolean removeEventListener(PeerEventListener listener) { + public boolean removeEventListener(PeerEventListener listener) { return eventListeners.remove(listener); } - synchronized void addLifecycleListener(PeerLifecycleListener listener) { + void addLifecycleListener(PeerLifecycleListener listener) { lifecycleListeners.add(listener); } - synchronized boolean removeLifecycleListener(PeerLifecycleListener listener) { + boolean removeLifecycleListener(PeerLifecycleListener listener) { return lifecycleListeners.remove(listener); } - /** - * Tells the peer to insert received transactions/transaction announcements into the given {@link MemoryPool}. - * This is normally done for you by the {@link PeerGroup} so you don't have to think about it. Transactions stored - * in a memory pool will have their confidence levels updated when a peer announces it, to reflect the greater - * likelyhood that the transaction is valid. - * - * @param pool A new pool or null to unlink. - */ - public synchronized void setMemoryPool(MemoryPool pool) { - memoryPool = pool; - } - @Override public synchronized String toString() { PeerAddress addr = address.get(); @@ -1260,7 +1256,7 @@ public class Peer { /** *

Sets a Bloom filter on this connection. This will cause the given {@link BloomFilter} object to be sent to the - * remote peer and if either a memory pool has been set using {@link Peer#setMemoryPool(MemoryPool)} or the + * remote peer and if either a memory pool has been set using the constructor or the * downloadData property is true, a {@link MemoryPoolMessage} is sent as well to trigger downloading of any * pending transactions that may be relevant.

* diff --git a/core/src/main/java/com/google/bitcoin/core/PeerGroup.java b/core/src/main/java/com/google/bitcoin/core/PeerGroup.java index 3c70d709..37611642 100644 --- a/core/src/main/java/com/google/bitcoin/core/PeerGroup.java +++ b/core/src/main/java/com/google/bitcoin/core/PeerGroup.java @@ -256,7 +256,7 @@ public class PeerGroup extends AbstractIdleService { ChannelPipeline p = Channels.pipeline(); - Peer peer = new Peer(params, chain, ver); + Peer peer = new Peer(params, chain, ver, memoryPool); peer.addLifecycleListener(startupListener); pendingPeers.add(peer); TCPNetworkConnection codec = new TCPNetworkConnection(params, peer.getVersionMessage()); @@ -726,7 +726,6 @@ public class PeerGroup extends AbstractIdleService { if (bloomFilter != null) peer.setBloomFilter(bloomFilter); } catch (IOException e) { } // That was quick...already disconnected // Link the peer to the memory pool so broadcast transactions have their confidence levels updated. - peer.setMemoryPool(memoryPool); peer.setDownloadData(false); // TODO: The peer should calculate the fast catchup time from the added wallets here. for (Wallet wallet : wallets) diff --git a/core/src/test/java/com/google/bitcoin/core/PeerTest.java b/core/src/test/java/com/google/bitcoin/core/PeerTest.java index 18debab1..5205050b 100644 --- a/core/src/test/java/com/google/bitcoin/core/PeerTest.java +++ b/core/src/test/java/com/google/bitcoin/core/PeerTest.java @@ -47,11 +47,10 @@ public class PeerTest extends TestWithNetworkConnections { public void setUp() throws Exception { super.setUp(); - VersionMessage ver = new VersionMessage(unitTestParams, 100); - peer = new Peer(unitTestParams, blockChain, ver); - peer.addWallet(wallet); memoryPool = new MemoryPool(); - peer.setMemoryPool(memoryPool); + VersionMessage ver = new VersionMessage(unitTestParams, 100); + peer = new Peer(unitTestParams, blockChain, ver, memoryPool); + peer.addWallet(wallet); handler = peer.getHandler(); event = new Capture(CaptureType.ALL); pipeline.sendDownstream(capture(event)); @@ -272,14 +271,10 @@ public class PeerTest extends TestWithNetworkConnections { control.replay(); // Check co-ordination of which peer to download via the memory pool. - MemoryPool pool = new MemoryPool(); - peer.setMemoryPool(pool); - MockNetworkConnection conn2 = createMockNetworkConnection(); VersionMessage ver = new VersionMessage(unitTestParams, 100); - Peer peer2 = new Peer(unitTestParams, blockChain, ver); + Peer peer2 = new Peer(unitTestParams, blockChain, ver, memoryPool); peer2.addWallet(wallet); - peer2.setMemoryPool(pool); connect(); connect(peer2.getHandler(), channel2, ctx2); @@ -297,7 +292,7 @@ public class PeerTest extends TestWithNetworkConnections { GetDataMessage message = (GetDataMessage)outbound(); assertEquals(1, message.getItems().size()); assertEquals(tx.getHash(), message.getItems().get(0).hash); - assertTrue(pool.maybeWasSeen(tx.getHash())); + assertTrue(memoryPool.maybeWasSeen(tx.getHash())); // Advertising to peer2 results in no getdata message. conn2.inbound(inv); diff --git a/core/src/test/java/com/google/bitcoin/core/TestWithPeerGroup.java b/core/src/test/java/com/google/bitcoin/core/TestWithPeerGroup.java index 7fce6624..4d20e313 100644 --- a/core/src/test/java/com/google/bitcoin/core/TestWithPeerGroup.java +++ b/core/src/test/java/com/google/bitcoin/core/TestWithPeerGroup.java @@ -51,7 +51,7 @@ public class TestWithPeerGroup extends TestWithNetworkConnections { VersionMessage ver = new VersionMessage(unitTestParams, 1); ChannelPipeline p = Channels.pipeline(); - Peer peer = new Peer(unitTestParams, blockChain, ver); + Peer peer = new Peer(unitTestParams, blockChain, ver, peerGroup.getMemoryPool()); peer.addLifecycleListener(peerGroup.startupListener); p.addLast("peer", peer.getHandler()); return p;