3
0
mirror of https://github.com/Qortal/altcoinj.git synced 2025-01-31 23:32:16 +00:00

Make Peer.memoryPool final and introduce a new c'tor for it.

This commit is contained in:
Mike Hearn 2013-03-06 15:52:54 +01:00
parent 83308c6ee1
commit 94670f3df0
4 changed files with 29 additions and 39 deletions

View File

@ -56,9 +56,8 @@ public class Peer {
private final NetworkParameters params; private final NetworkParameters params;
private final AbstractBlockChain blockChain; private final AbstractBlockChain blockChain;
private final AtomicReference<PeerAddress> address = new AtomicReference<PeerAddress>(); private final AtomicReference<PeerAddress> address = new AtomicReference<PeerAddress>();
// TODO: Make the types here explicit and remove synchronization on adders/removers. private final CopyOnWriteArrayList<PeerEventListener> eventListeners;
private List<PeerEventListener> eventListeners; private final CopyOnWriteArrayList<PeerLifecycleListener> lifecycleListeners;
private List<PeerLifecycleListener> lifecycleListeners;
// Whether to try and download blocks and transactions from this peer. Set to false by PeerGroup if not the // Whether to try and download blocks and transactions from this peer. Set to false by PeerGroup if not the
// primary peer. This is to avoid redundant work and concurrency problems with downloading the same chain // primary peer. This is to avoid redundant work and concurrency problems with downloading the same chain
// in parallel. // in parallel.
@ -69,13 +68,12 @@ public class Peer {
// How many block messages the peer has announced to us. Peers only announce blocks that attach to their best chain // How many block messages the peer has announced to us. Peers only announce blocks that attach to their best chain
// so we can use this to calculate the height of the peers chain, by adding it to the initial height in the version // so we can use this to calculate the height of the peers chain, by adding it to the initial height in the version
// message. This method can go wrong if the peer re-orgs onto a shorter (but harder) chain, however, this is rare. // message. This method can go wrong if the peer re-orgs onto a shorter (but harder) chain, however, this is rare.
private AtomicInteger blocksAnnounced = new AtomicInteger(); private final AtomicInteger blocksAnnounced = new AtomicInteger();
// A class that tracks recent transactions that have been broadcast across the network, counts how many // A class that tracks recent transactions that have been broadcast across the network, counts how many
// peers announced them and updates the transaction confidence data. It is passed to each Peer. // peers announced them and updates the transaction confidence data. It is passed to each Peer.
// TODO: Make this final and unsynchronized. private final MemoryPool memoryPool;
private MemoryPool memoryPool;
// Each wallet added to the peer will be notified of downloaded transaction data. // Each wallet added to the peer will be notified of downloaded transaction data.
private CopyOnWriteArrayList<Wallet> wallets; private final CopyOnWriteArrayList<Wallet> wallets;
// A time before which we only download block headers, after that point we download block bodies. // A time before which we only download block headers, after that point we download block bodies.
private long fastCatchupTimeSecs; private long fastCatchupTimeSecs;
// Whether we are currently downloading headers only or block bodies. Starts at true. If the fast catchup time is // Whether we are currently downloading headers only or block bodies. Starts at true. If the fast catchup time is
@ -92,7 +90,7 @@ public class Peer {
// //
// It is important to avoid a nasty edge case where we can end up with parallel chain downloads proceeding // It is important to avoid a nasty edge case where we can end up with parallel chain downloads proceeding
// simultaneously if we were to receive a newly solved block whilst parts of the chain are streaming to us. // simultaneously if we were to receive a newly solved block whilst parts of the chain are streaming to us.
private HashSet<Sha256Hash> pendingBlockDownloads = new HashSet<Sha256Hash>(); private final HashSet<Sha256Hash> pendingBlockDownloads = new HashSet<Sha256Hash>();
// The lowest version number we're willing to accept. Lower than this will result in an immediate disconnect. // The lowest version number we're willing to accept. Lower than this will result in an immediate disconnect.
private int minProtocolVersion = Pong.MIN_PROTOCOL_VERSION; private int minProtocolVersion = Pong.MIN_PROTOCOL_VERSION;
// When an API user explicitly requests a block or transaction from a peer, the InventoryItem is put here // When an API user explicitly requests a block or transaction from a peer, the InventoryItem is put here
@ -114,7 +112,7 @@ public class Peer {
private static final int PING_MOVING_AVERAGE_WINDOW = 20; private static final int PING_MOVING_AVERAGE_WINDOW = 20;
private Channel channel; private Channel channel;
private AtomicReference<VersionMessage> peerVersionMessage = new AtomicReference<VersionMessage>(); private final AtomicReference<VersionMessage> peerVersionMessage = new AtomicReference<VersionMessage>();
private boolean isAcked; private boolean isAcked;
private PeerHandler handler; private PeerHandler handler;
@ -122,6 +120,15 @@ public class Peer {
* Construct a peer that reads/writes from the given block chain. * Construct a peer that reads/writes from the given block chain.
*/ */
public Peer(NetworkParameters params, AbstractBlockChain chain, VersionMessage ver) { public Peer(NetworkParameters params, AbstractBlockChain chain, VersionMessage ver) {
this(params, chain, ver, null);
}
/**
* Construct a peer that reads/writes from the given block chain and memory pool. Transactions stored
* in a memory pool will have their confidence levels updated when a peer announces it, to reflect the greater
* likelyhood that the transaction is valid.
*/
public Peer(NetworkParameters params, AbstractBlockChain chain, VersionMessage ver, MemoryPool mempool) {
this.params = Preconditions.checkNotNull(params); this.params = Preconditions.checkNotNull(params);
this.versionMessage = Preconditions.checkNotNull(ver); this.versionMessage = Preconditions.checkNotNull(ver);
this.blockChain = chain; // Allowed to be null. this.blockChain = chain; // Allowed to be null.
@ -134,6 +141,7 @@ public class Peer {
this.handler = new PeerHandler(); this.handler = new PeerHandler();
this.pendingPings = new CopyOnWriteArrayList<PendingPing>(); this.pendingPings = new CopyOnWriteArrayList<PendingPing>();
this.wallets = new CopyOnWriteArrayList<Wallet>(); this.wallets = new CopyOnWriteArrayList<Wallet>();
this.memoryPool = mempool;
} }
/** /**
@ -146,34 +154,22 @@ public class Peer {
this.versionMessage.appendToSubVer(thisSoftwareName, thisSoftwareVersion, null); this.versionMessage.appendToSubVer(thisSoftwareName, thisSoftwareVersion, null);
} }
public synchronized void addEventListener(PeerEventListener listener) { public void addEventListener(PeerEventListener listener) {
eventListeners.add(listener); eventListeners.add(listener);
} }
public synchronized boolean removeEventListener(PeerEventListener listener) { public boolean removeEventListener(PeerEventListener listener) {
return eventListeners.remove(listener); return eventListeners.remove(listener);
} }
synchronized void addLifecycleListener(PeerLifecycleListener listener) { void addLifecycleListener(PeerLifecycleListener listener) {
lifecycleListeners.add(listener); lifecycleListeners.add(listener);
} }
synchronized boolean removeLifecycleListener(PeerLifecycleListener listener) { boolean removeLifecycleListener(PeerLifecycleListener listener) {
return lifecycleListeners.remove(listener); return lifecycleListeners.remove(listener);
} }
/**
* Tells the peer to insert received transactions/transaction announcements into the given {@link MemoryPool}.
* This is normally done for you by the {@link PeerGroup} so you don't have to think about it. Transactions stored
* in a memory pool will have their confidence levels updated when a peer announces it, to reflect the greater
* likelyhood that the transaction is valid.
*
* @param pool A new pool or null to unlink.
*/
public synchronized void setMemoryPool(MemoryPool pool) {
memoryPool = pool;
}
@Override @Override
public synchronized String toString() { public synchronized String toString() {
PeerAddress addr = address.get(); PeerAddress addr = address.get();
@ -1260,7 +1256,7 @@ public class Peer {
/** /**
* <p>Sets a Bloom filter on this connection. This will cause the given {@link BloomFilter} object to be sent to the * <p>Sets a Bloom filter on this connection. This will cause the given {@link BloomFilter} object to be sent to the
* remote peer and if either a memory pool has been set using {@link Peer#setMemoryPool(MemoryPool)} or the * remote peer and if either a memory pool has been set using the constructor or the
* downloadData property is true, a {@link MemoryPoolMessage} is sent as well to trigger downloading of any * downloadData property is true, a {@link MemoryPoolMessage} is sent as well to trigger downloading of any
* pending transactions that may be relevant.</p> * pending transactions that may be relevant.</p>
* *

View File

@ -256,7 +256,7 @@ public class PeerGroup extends AbstractIdleService {
ChannelPipeline p = Channels.pipeline(); ChannelPipeline p = Channels.pipeline();
Peer peer = new Peer(params, chain, ver); Peer peer = new Peer(params, chain, ver, memoryPool);
peer.addLifecycleListener(startupListener); peer.addLifecycleListener(startupListener);
pendingPeers.add(peer); pendingPeers.add(peer);
TCPNetworkConnection codec = new TCPNetworkConnection(params, peer.getVersionMessage()); TCPNetworkConnection codec = new TCPNetworkConnection(params, peer.getVersionMessage());
@ -726,7 +726,6 @@ public class PeerGroup extends AbstractIdleService {
if (bloomFilter != null) peer.setBloomFilter(bloomFilter); if (bloomFilter != null) peer.setBloomFilter(bloomFilter);
} catch (IOException e) { } // That was quick...already disconnected } catch (IOException e) { } // That was quick...already disconnected
// Link the peer to the memory pool so broadcast transactions have their confidence levels updated. // Link the peer to the memory pool so broadcast transactions have their confidence levels updated.
peer.setMemoryPool(memoryPool);
peer.setDownloadData(false); peer.setDownloadData(false);
// TODO: The peer should calculate the fast catchup time from the added wallets here. // TODO: The peer should calculate the fast catchup time from the added wallets here.
for (Wallet wallet : wallets) for (Wallet wallet : wallets)

View File

@ -47,11 +47,10 @@ public class PeerTest extends TestWithNetworkConnections {
public void setUp() throws Exception { public void setUp() throws Exception {
super.setUp(); super.setUp();
VersionMessage ver = new VersionMessage(unitTestParams, 100);
peer = new Peer(unitTestParams, blockChain, ver);
peer.addWallet(wallet);
memoryPool = new MemoryPool(); memoryPool = new MemoryPool();
peer.setMemoryPool(memoryPool); VersionMessage ver = new VersionMessage(unitTestParams, 100);
peer = new Peer(unitTestParams, blockChain, ver, memoryPool);
peer.addWallet(wallet);
handler = peer.getHandler(); handler = peer.getHandler();
event = new Capture<DownstreamMessageEvent>(CaptureType.ALL); event = new Capture<DownstreamMessageEvent>(CaptureType.ALL);
pipeline.sendDownstream(capture(event)); pipeline.sendDownstream(capture(event));
@ -272,14 +271,10 @@ public class PeerTest extends TestWithNetworkConnections {
control.replay(); control.replay();
// Check co-ordination of which peer to download via the memory pool. // Check co-ordination of which peer to download via the memory pool.
MemoryPool pool = new MemoryPool();
peer.setMemoryPool(pool);
MockNetworkConnection conn2 = createMockNetworkConnection(); MockNetworkConnection conn2 = createMockNetworkConnection();
VersionMessage ver = new VersionMessage(unitTestParams, 100); VersionMessage ver = new VersionMessage(unitTestParams, 100);
Peer peer2 = new Peer(unitTestParams, blockChain, ver); Peer peer2 = new Peer(unitTestParams, blockChain, ver, memoryPool);
peer2.addWallet(wallet); peer2.addWallet(wallet);
peer2.setMemoryPool(pool);
connect(); connect();
connect(peer2.getHandler(), channel2, ctx2); connect(peer2.getHandler(), channel2, ctx2);
@ -297,7 +292,7 @@ public class PeerTest extends TestWithNetworkConnections {
GetDataMessage message = (GetDataMessage)outbound(); GetDataMessage message = (GetDataMessage)outbound();
assertEquals(1, message.getItems().size()); assertEquals(1, message.getItems().size());
assertEquals(tx.getHash(), message.getItems().get(0).hash); assertEquals(tx.getHash(), message.getItems().get(0).hash);
assertTrue(pool.maybeWasSeen(tx.getHash())); assertTrue(memoryPool.maybeWasSeen(tx.getHash()));
// Advertising to peer2 results in no getdata message. // Advertising to peer2 results in no getdata message.
conn2.inbound(inv); conn2.inbound(inv);

View File

@ -51,7 +51,7 @@ public class TestWithPeerGroup extends TestWithNetworkConnections {
VersionMessage ver = new VersionMessage(unitTestParams, 1); VersionMessage ver = new VersionMessage(unitTestParams, 1);
ChannelPipeline p = Channels.pipeline(); ChannelPipeline p = Channels.pipeline();
Peer peer = new Peer(unitTestParams, blockChain, ver); Peer peer = new Peer(unitTestParams, blockChain, ver, peerGroup.getMemoryPool());
peer.addLifecycleListener(peerGroup.startupListener); peer.addLifecycleListener(peerGroup.startupListener);
p.addLast("peer", peer.getHandler()); p.addLast("peer", peer.getHandler());
return p; return p;