mirror of
https://github.com/Qortal/altcoinj.git
synced 2025-02-07 06:44:16 +00:00
Download transactions from whichever peer announced them first.
This commit is contained in:
parent
2023e05d7e
commit
513a75b4ba
@ -234,4 +234,29 @@ public class MemoryPool {
|
||||
log.info("{}: Announced new transaction [1] {}", byPeer, hash);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the {@link Transaction} for the given hash if we have downloaded it, or null if that hash is unknown or
|
||||
* we only saw advertisements for it yet or it has been downloaded but garbage collected due to nowhere else
|
||||
* holding a reference to it.
|
||||
*/
|
||||
public synchronized Transaction get(Sha256Hash hash) {
|
||||
Entry entry = memoryPool.get(hash);
|
||||
if (entry == null) return null; // Unknown.
|
||||
if (entry.tx == null) return null; // Seen but only in advertisements.
|
||||
if (entry.tx.get() == null) return null; // Was downloaded but garbage collected.
|
||||
Transaction tx = entry.tx.get();
|
||||
Preconditions.checkNotNull(tx);
|
||||
return tx;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns true if the TX identified by hash has been seen before (ie, in an inv). Note that a transaction that
|
||||
* was broadcast, downloaded and nothing kept a reference to it will eventually be cleared out by the garbage
|
||||
* collector and wasSeen() will return false - it does not keep a permanent record of every hash ever broadcast.
|
||||
*/
|
||||
public synchronized boolean maybeWasSeen(Sha256Hash hash) {
|
||||
Entry entry = memoryPool.get(hash);
|
||||
return entry != null;
|
||||
}
|
||||
}
|
||||
|
@ -25,6 +25,8 @@ import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Iterator;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.*;
|
||||
|
||||
@ -372,39 +374,61 @@ public class Peer {
|
||||
private void processInv(InventoryMessage inv) throws IOException {
|
||||
// This should be called in the network loop thread for this peer.
|
||||
List<InventoryItem> items = inv.getItems();
|
||||
if (memoryPool != null)
|
||||
updateTransactionConfidenceLevels(items);
|
||||
|
||||
// If this peer isn't responsible for downloading stuff, don't go further.
|
||||
if (!downloadData)
|
||||
return;
|
||||
// Separate out the blocks and transactions, we'll handle them differently
|
||||
List<InventoryItem> transactions = new LinkedList<InventoryItem>();
|
||||
List<InventoryItem> blocks = new LinkedList<InventoryItem>();
|
||||
|
||||
// The peer told us about some blocks or transactions they have.
|
||||
Block topBlock = blockChain.getUnconnectedBlock();
|
||||
Sha256Hash topHash = (topBlock != null ? topBlock.getHash() : null);
|
||||
if (isNewBlockTickle(topHash, items)) {
|
||||
// An inv with a single hash containing our most recent unconnected block is a special inv,
|
||||
// it's kind of like a tickle from the peer telling us that it's time to download more blocks to catch up to
|
||||
// the block chain. We could just ignore this and treat it as a regular inv but then we'd download the head
|
||||
// block over and over again after each batch of 500 blocks, which is wasteful.
|
||||
blockChainDownload(topHash);
|
||||
return;
|
||||
for (InventoryItem item : items) {
|
||||
switch (item.type) {
|
||||
case Transaction: transactions.add(item); break;
|
||||
case Block: blocks.add(item); break;
|
||||
}
|
||||
}
|
||||
// Just copy the message contents across - request whatever we're told about.
|
||||
// TODO: Don't re-request items that were already fetched.
|
||||
|
||||
GetDataMessage getdata = new GetDataMessage(params);
|
||||
for (InventoryItem item : items) {
|
||||
getdata.addItem(item);
|
||||
}
|
||||
// This will cause us to receive a bunch of block or tx messages.
|
||||
conn.writeMessage(getdata);
|
||||
}
|
||||
|
||||
private void updateTransactionConfidenceLevels(List<InventoryItem> items) {
|
||||
Preconditions.checkNotNull(memoryPool);
|
||||
for (InventoryItem item : items) {
|
||||
if (item.type != InventoryItem.Type.Transaction) continue;
|
||||
memoryPool.seen(item.hash, this.getAddress());
|
||||
Iterator<InventoryItem> it = transactions.iterator();
|
||||
while (it.hasNext()) {
|
||||
InventoryItem item = it.next();
|
||||
if (memoryPool == null && downloadData) {
|
||||
// If there's no memory pool only download transactions if we're configured to.
|
||||
getdata.addItem(item);
|
||||
} else {
|
||||
// Only download the transaction if we are the first peer that saw it be advertised. Other peers will also
|
||||
// see it be advertised in inv packets asynchronously, they co-ordinate via the memory pool. We could
|
||||
// potentially download transactions faster by always asking every peer for a tx when advertised, as remote
|
||||
// peers run at different speeds. However to conserve bandwidth on mobile devices we try to only download a
|
||||
// transaction once. This means we can miss broadcasts if the peer disconnects between sending us an inv and
|
||||
// sending us the transaction: currently we'll never try to re-fetch after a timeout.
|
||||
if (memoryPool.maybeWasSeen(item.hash)) {
|
||||
// Some other peer already announced this so don't download.
|
||||
it.remove();
|
||||
} else {
|
||||
getdata.addItem(item);
|
||||
}
|
||||
memoryPool.seen(item.hash, this.getAddress());
|
||||
}
|
||||
}
|
||||
|
||||
if (blocks.size() > 0 && downloadData) {
|
||||
Block topBlock = blockChain.getUnconnectedBlock();
|
||||
Sha256Hash topHash = (topBlock != null ? topBlock.getHash() : null);
|
||||
if (isNewBlockTickle(topHash, blocks)) {
|
||||
// An inv with a single hash containing our most recent unconnected block is a special inv,
|
||||
// it's kind of like a tickle from the peer telling us that it's time to download more blocks to catch up to
|
||||
// the block chain. We could just ignore this and treat it as a regular inv but then we'd download the head
|
||||
// block over and over again after each batch of 500 blocks, which is wasteful.
|
||||
blockChainDownload(topHash);
|
||||
return;
|
||||
}
|
||||
// Request the advertised blocks only if we're the download peer.
|
||||
for (InventoryItem item : blocks) getdata.addItem(item);
|
||||
}
|
||||
|
||||
if (!getdata.getItems().isEmpty()) {
|
||||
// This will cause us to receive a bunch of block or tx messages.
|
||||
conn.writeMessage(getdata);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -82,7 +82,7 @@ public class PeerGroup {
|
||||
private VersionMessage versionMessage;
|
||||
// A class that tracks recent transactions that have been broadcast across the network, counts how many
|
||||
// peers announced them and updates the transaction confidence data. It is passed to each Peer.
|
||||
private MemoryPool memoryPool;
|
||||
private final MemoryPool memoryPool;
|
||||
|
||||
private NetworkParameters params;
|
||||
private BlockChain chain;
|
||||
@ -142,7 +142,8 @@ public class PeerGroup {
|
||||
}
|
||||
|
||||
private synchronized List<Message> handleGetData(GetDataMessage m) {
|
||||
// Scans the wallets for transactions in the getdata message and returns them. Invoked in parallel on peer threads.
|
||||
// Scans the wallets for transactions in the getdata message and returns them. Invoked in parallel
|
||||
// on peer threads.
|
||||
HashMap<Sha256Hash, Message> transactions = new HashMap<Sha256Hash, Message>();
|
||||
for (Wallet w : wallets) {
|
||||
synchronized (w) {
|
||||
@ -715,6 +716,17 @@ public class PeerGroup {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the {@link MemoryPool} created by this peer group to synchronize its peers. The pool tracks advertised
|
||||
* and downloaded transactions so their confidence can be measured as a proportion of how many peers announced it.
|
||||
* With an un-tampered with internet connection, the more peers announce a transaction the more confidence you can
|
||||
* have that it's really valid.
|
||||
*/
|
||||
public MemoryPool getMemoryPool() {
|
||||
// Locking unneeded as memoryPool is final.
|
||||
return memoryPool;
|
||||
}
|
||||
|
||||
/**
|
||||
* Tells the PeerGroup to download only block headers before a certain time and bodies after that. See
|
||||
* {@link Peer#setFastCatchupTime(long)} for further explanation. Call this before starting block chain download.
|
||||
|
@ -23,6 +23,7 @@ import org.junit.Test;
|
||||
import java.net.InetAddress;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
public class MemoryPoolTest {
|
||||
private NetworkParameters params = NetworkParameters.unitTests();
|
||||
@ -52,6 +53,7 @@ public class MemoryPoolTest {
|
||||
assertEquals(tx1, pool.seen(tx2, address2));
|
||||
assertEquals(2, tx1.getConfidence().numBroadcastPeers());
|
||||
assertEquals(2, pool.numBroadcastPeers(tx1.getHash()));
|
||||
assertEquals(tx1, pool.get(tx1.getHash()));
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -61,6 +63,7 @@ public class MemoryPoolTest {
|
||||
assertEquals(0, pool.numBroadcastPeers(tx1.getHash()));
|
||||
pool.seen(tx1.getHash(), address1);
|
||||
assertEquals(1, pool.numBroadcastPeers(tx1.getHash()));
|
||||
assertTrue(pool.maybeWasSeen(tx1.getHash()));
|
||||
pool.seen(tx1.getHash(), address2);
|
||||
assertEquals(2, pool.numBroadcastPeers(tx1.getHash()));
|
||||
Transaction t = pool.seen(tx1, address1);
|
||||
|
@ -209,7 +209,7 @@ public class PeerGroupTest extends TestWithNetworkConnections {
|
||||
@Test
|
||||
public void transactionConfidence() throws Exception {
|
||||
// Checks that we correctly count how many peers broadcast a transaction, so we can establish some measure of
|
||||
// its trustworthyness assuming an untampered with internet connection.
|
||||
// its trustworthyness assuming an untampered with internet connection. This is done via the MemoryPool class.
|
||||
MockNetworkConnection n1 = createMockNetworkConnection();
|
||||
Peer p1 = new Peer(params, blockChain, n1);
|
||||
MockNetworkConnection n2 = createMockNetworkConnection();
|
||||
@ -231,13 +231,13 @@ public class PeerGroupTest extends TestWithNetworkConnections {
|
||||
}
|
||||
});
|
||||
|
||||
// Peer 2 advertises the tx but does not download it.
|
||||
assertNull(n2.exchange(inv));
|
||||
assertEquals(0, tx.getConfidence().numBroadcastPeers());
|
||||
// Peer 2 advertises the tx and requests a download of it, because it came first.
|
||||
assertTrue(n2.exchange(inv) instanceof GetDataMessage);
|
||||
assertTrue(peerGroup.getMemoryPool().maybeWasSeen(tx.getHash()));
|
||||
assertEquals(null, event[0]);
|
||||
// Peer 1 (the download peer) advertises the tx, we download it.
|
||||
n1.exchange(inv); // returns getdata
|
||||
n1.exchange(tx); // returns nothing after a queue drain.
|
||||
// Peer 1 advertises the tx, we don't do anything as it's already been requested.
|
||||
assertNull(n1.exchange(inv));
|
||||
assertNull(n2.exchange(tx));
|
||||
// Two peers saw this tx hash.
|
||||
assertEquals(2, tx.getConfidence().numBroadcastPeers());
|
||||
assertEquals(tx, event[0]);
|
||||
|
@ -150,7 +150,7 @@ public class PeerTest extends TestWithNetworkConnections {
|
||||
@Test
|
||||
public void invDownloadTx() throws Exception {
|
||||
peer.setDownloadData(true);
|
||||
// Make a transaction and tell the peer we have it.;
|
||||
// Make a transaction and tell the peer we have it.
|
||||
BigInteger value = Utils.toNanoCoins(1, 0);
|
||||
Transaction tx = TestUtils.createFakeTx(unitTestParams, value, address);
|
||||
InventoryMessage inv = new InventoryMessage(unitTestParams);
|
||||
@ -169,6 +169,40 @@ public class PeerTest extends TestWithNetworkConnections {
|
||||
assertEquals(value, wallet.getBalance(Wallet.BalanceType.ESTIMATED));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void invDownloadTxMultiPeer() throws Exception {
|
||||
// Check co-ordination of which peer to download via the memory pool.
|
||||
MemoryPool pool = new MemoryPool();
|
||||
peer.setMemoryPool(pool);
|
||||
|
||||
MockNetworkConnection conn2 = createMockNetworkConnection();
|
||||
Peer peer2 = new Peer(unitTestParams, blockChain, conn2);
|
||||
peer2.addWallet(wallet);
|
||||
peer2.setMemoryPool(pool);
|
||||
|
||||
// Make a tx and advertise it to one of the peers.
|
||||
BigInteger value = Utils.toNanoCoins(1, 0);
|
||||
Transaction tx = TestUtils.createFakeTx(unitTestParams, value, address);
|
||||
InventoryMessage inv = new InventoryMessage(unitTestParams);
|
||||
InventoryItem item = new InventoryItem(InventoryItem.Type.Transaction, tx.getHash());
|
||||
inv.addItem(item);
|
||||
|
||||
conn.inbound(inv);
|
||||
runPeer(peer, conn);
|
||||
conn.popInbound(); // Remove the disconnect marker.
|
||||
|
||||
// We got a getdata message.
|
||||
GetDataMessage message = (GetDataMessage) conn.popOutbound();
|
||||
assertEquals(1, message.getItems().size());
|
||||
assertEquals(tx.getHash(), message.getItems().get(0).hash);
|
||||
assertTrue(pool.maybeWasSeen(tx.getHash()));
|
||||
|
||||
// Advertising to peer2 results in no getdata message.
|
||||
conn2.inbound(inv);
|
||||
runPeer(peer2, conn2);
|
||||
assertNull(conn.popOutbound());
|
||||
}
|
||||
|
||||
// Check that inventory message containing blocks we want is processed correctly.
|
||||
@Test
|
||||
public void newBlock() throws Exception {
|
||||
|
Loading…
Reference in New Issue
Block a user