mirror of
https://github.com/Qortal/piratewallet-light-cli.git
synced 2025-01-30 18:42:15 +00:00
Multi threaded sync (#27)
* Sync on a threadpool * Retry on broken connection * Update README.md * Update README.md * Optional Embedded params * Update (#24) * Update librustzcash dep * Update to latest librustzcash * Fetch blocks on a new thread * Save wallet deterministically * Sync on a threadpool * Fetch blocks on a new thread * Save wallet deterministically * At least 2 threads Co-authored-by: Aditya Kulkarni <adityapk@tr32.home.adityapk.com>
This commit is contained in:
parent
2a88501a4c
commit
5d2b85c03a
18
Cargo.lock
generated
18
Cargo.lock
generated
@ -1002,7 +1002,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "num_cpus"
|
||||
version = "1.11.1"
|
||||
version = "1.13.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
dependencies = [
|
||||
"hermit-abi 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
@ -1846,6 +1846,14 @@ dependencies = [
|
||||
"lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "threadpool"
|
||||
version = "1.8.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
dependencies = [
|
||||
"num_cpus 1.13.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "time"
|
||||
version = "0.1.42"
|
||||
@ -1885,7 +1893,7 @@ dependencies = [
|
||||
"mio 0.6.21 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"mio-named-pipes 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"mio-uds 0.6.7 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"num_cpus 1.11.1 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"num_cpus 1.13.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"pin-project-lite 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"signal-hook-registry 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"slab 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
@ -2527,6 +2535,7 @@ dependencies = [
|
||||
"libflate 0.1.27 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"log4rs 0.8.3 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"num_cpus 1.13.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"pairing 0.16.0 (git+https://github.com/adityapk00/librustzcash.git?rev=ff0ffc3d1b8bc36a0ad4b6b0f06aa3ca5900d3e4)",
|
||||
"prost 0.6.1 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"prost-types 0.6.1 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
@ -2538,7 +2547,9 @@ dependencies = [
|
||||
"secp256k1 0.15.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"sha2 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"sodiumoxide 0.2.5 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"subtle 2.2.2 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"tempdir 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"threadpool 1.8.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"tiny-bip39 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"tokio 0.2.21 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"tokio-rustls 0.13.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
@ -2673,7 +2684,7 @@ dependencies = [
|
||||
"checksum num-bigint 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "f9c3f34cdd24f334cb265d9bf8bfa8a241920d026916785747a92f0e55541a1a"
|
||||
"checksum num-integer 0.1.41 (registry+https://github.com/rust-lang/crates.io-index)" = "b85e541ef8255f6cf42bbfe4ef361305c6c135d10919ecc26126c4e5ae94bc09"
|
||||
"checksum num-traits 0.2.10 (registry+https://github.com/rust-lang/crates.io-index)" = "d4c81ffc11c212fa327657cb19dd85eb7419e163b5b076bede2bdb5c974c07e4"
|
||||
"checksum num_cpus 1.11.1 (registry+https://github.com/rust-lang/crates.io-index)" = "76dac5ed2a876980778b8b85f75a71b6cbf0db0b1232ee12f826bccb00d09d72"
|
||||
"checksum num_cpus 1.13.0 (registry+https://github.com/rust-lang/crates.io-index)" = "05499f3756671c15885fee9034446956fff3f243d6077b91e5767df161f766b3"
|
||||
"checksum once_cell 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "532c29a261168a45ce28948f9537ddd7a5dd272cc513b3017b1e82a88f962c37"
|
||||
"checksum opaque-debug 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "2839e79665f131bdb5782e51f2c6c9599c133c6098982a54c794358bf432529c"
|
||||
"checksum openssl-probe 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "77af24da69f9d9341038eba93a073b1fdaaa1b788221b00a69bce9e762cb32de"
|
||||
@ -2771,6 +2782,7 @@ dependencies = [
|
||||
"checksum textwrap 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)" = "d326610f408c7a4eb6f51c37c330e496b08506c9457c9d34287ecc38809fb060"
|
||||
"checksum thread-id 3.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "c7fbf4c9d56b320106cd64fd024dadfa0be7cb4706725fc44a7d7ce952d820c1"
|
||||
"checksum thread_local 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)" = "c6b53e329000edc2b34dbe8545fd20e55a333362d0a321909685a19bd28c3f1b"
|
||||
"checksum threadpool 1.8.0 (registry+https://github.com/rust-lang/crates.io-index)" = "e8dae184447c15d5a6916d973c642aec485105a13cd238192a6927ae3e077d66"
|
||||
"checksum time 0.1.42 (registry+https://github.com/rust-lang/crates.io-index)" = "db8dcfca086c1143c9270ac42a2bbd8a7ee477b78ac8e45b19abfb0cbede4b6f"
|
||||
"checksum tiny-bip39 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)" = "c1c5676413eaeb1ea35300a0224416f57abc3bd251657e0fafc12c47ff98c060"
|
||||
"checksum tokio 0.2.21 (registry+https://github.com/rust-lang/crates.io-index)" = "d099fa27b9702bed751524694adbe393e18b36b204da91eb1cbbbbb4a5ee2d58"
|
||||
|
@ -27,6 +27,9 @@ rand = "0.7.2"
|
||||
sodiumoxide = "0.2.5"
|
||||
ring = "0.16.9"
|
||||
libflate = "0.1"
|
||||
subtle = "2"
|
||||
threadpool = "1.8.0"
|
||||
num_cpus = "1.13.0"
|
||||
|
||||
tonic = { version = "0.2.1", features = ["tls", "tls-roots"] }
|
||||
bytes = "0.4"
|
||||
|
@ -1,12 +1,15 @@
|
||||
use log::{error};
|
||||
use zcash_primitives::transaction::{TxId};
|
||||
|
||||
use crate::grpc_client::{ChainSpec, BlockId, BlockRange, RawTransaction,
|
||||
use crate::grpc_client::{ChainSpec, BlockId, BlockRange, RawTransaction, CompactBlock,
|
||||
TransparentAddressBlockFilter, TxFilter, Empty, LightdInfo};
|
||||
use tonic::transport::{Channel, ClientTlsConfig};
|
||||
use tokio_rustls::{rustls::ClientConfig};
|
||||
use tonic::{Request};
|
||||
|
||||
use threadpool::ThreadPool;
|
||||
use std::sync::mpsc::channel;
|
||||
|
||||
use crate::PubCertificate;
|
||||
use crate::grpc_client::compact_tx_streamer_client::CompactTxStreamerClient;
|
||||
|
||||
@ -56,7 +59,7 @@ pub fn get_info(uri: &http::Uri) -> Result<LightdInfo, String> {
|
||||
}
|
||||
|
||||
|
||||
async fn get_block_range<F : 'static + std::marker::Send>(uri: &http::Uri, start_height: u64, end_height: u64, c: F)
|
||||
async fn get_block_range<F : 'static + std::marker::Send>(uri: &http::Uri, start_height: u64, end_height: u64, pool: ThreadPool, c: F)
|
||||
-> Result<(), Box<dyn std::error::Error>>
|
||||
where F : Fn(&[u8], u64) {
|
||||
let mut client = get_client(uri).await?;
|
||||
@ -66,19 +69,39 @@ where F : Fn(&[u8], u64) {
|
||||
|
||||
let request = Request::new(BlockRange{ start: Some(bs), end: Some(be) });
|
||||
|
||||
// Channel where the blocks are sent. A None signifies end of all blocks
|
||||
let (tx, rx) = channel::<Option<CompactBlock>>();
|
||||
|
||||
// Channel that the processor signals it is done, so the method can return
|
||||
let (ftx, frx) = channel();
|
||||
|
||||
// The processor runs on a different thread, so that the network calls don't
|
||||
// block on this
|
||||
pool.execute(move || {
|
||||
while let Some(block) = rx.recv().unwrap() {
|
||||
use prost::Message;
|
||||
let mut encoded_buf = vec![];
|
||||
|
||||
block.encode(&mut encoded_buf).unwrap();
|
||||
c(&encoded_buf, block.height);
|
||||
}
|
||||
|
||||
ftx.send(Ok(())).unwrap();
|
||||
});
|
||||
|
||||
let mut response = client.get_block_range(request).await?.into_inner();
|
||||
while let Some(block) = response.message().await? {
|
||||
use prost::Message;
|
||||
let mut encoded_buf = vec![];
|
||||
|
||||
block.encode(&mut encoded_buf).unwrap();
|
||||
c(&encoded_buf, block.height);
|
||||
tx.send(Some(block)).unwrap();
|
||||
}
|
||||
tx.send(None).unwrap();
|
||||
|
||||
// Wait for the processor to exit
|
||||
frx.iter().take(1).collect::<Result<Vec<()>, String>>()?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn fetch_blocks<F : 'static + std::marker::Send>(uri: &http::Uri, start_height: u64, end_height: u64, c: F) -> Result<(), String>
|
||||
pub fn fetch_blocks<F : 'static + std::marker::Send>(uri: &http::Uri, start_height: u64, end_height: u64, pool: ThreadPool, c: F) -> Result<(), String>
|
||||
where F : Fn(&[u8], u64) {
|
||||
|
||||
let mut rt = match tokio::runtime::Runtime::new() {
|
||||
@ -91,7 +114,7 @@ pub fn fetch_blocks<F : 'static + std::marker::Send>(uri: &http::Uri, start_heig
|
||||
}
|
||||
};
|
||||
|
||||
match rt.block_on(get_block_range(uri, start_height, end_height, c)) {
|
||||
match rt.block_on(get_block_range(uri, start_height, end_height, pool, c)) {
|
||||
Ok(o) => Ok(o),
|
||||
Err(e) => {
|
||||
let e = format!("Error fetching blocks {:?}", e);
|
||||
@ -162,26 +185,26 @@ async fn get_transaction(uri: &http::Uri, txid: TxId)
|
||||
Ok(response.into_inner())
|
||||
}
|
||||
|
||||
pub fn fetch_full_tx<F : 'static + std::marker::Send>(uri: &http::Uri, txid: TxId, c: F)
|
||||
where F : Fn(&[u8]) {
|
||||
pub fn fetch_full_tx(uri: &http::Uri, txid: TxId) -> Result<Vec<u8>, String> {
|
||||
let mut rt = match tokio::runtime::Runtime::new() {
|
||||
Ok(r) => r,
|
||||
Err(e) => {
|
||||
error!("Error creating runtime {}", e.to_string());
|
||||
eprintln!("{}", e);
|
||||
return;
|
||||
let errstr = format!("Error creating runtime {}", e.to_string());
|
||||
error!("{}", errstr);
|
||||
eprintln!("{}", errstr);
|
||||
return Err(errstr);
|
||||
}
|
||||
};
|
||||
|
||||
match rt.block_on(get_transaction(uri, txid)) {
|
||||
Ok(rawtx) => c(&rawtx.data),
|
||||
Ok(rawtx) => Ok(rawtx.data.to_vec()),
|
||||
Err(e) => {
|
||||
error!("Error in get_transaction runtime {}", e.to_string());
|
||||
eprintln!("{}", e);
|
||||
let errstr = format!("Error in get_transaction runtime {}", e.to_string());
|
||||
error!("{}", errstr);
|
||||
eprintln!("{}", errstr);
|
||||
Err(errstr)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
// send_transaction GRPC call
|
||||
@ -222,22 +245,19 @@ async fn get_latest_block(uri: &http::Uri) -> Result<BlockId, Box<dyn std::error
|
||||
Ok(response.into_inner())
|
||||
}
|
||||
|
||||
pub fn fetch_latest_block<F : 'static + std::marker::Send>(uri: &http::Uri, mut c : F)
|
||||
where F : FnMut(BlockId) {
|
||||
pub fn fetch_latest_block(uri: &http::Uri) -> Result<BlockId, String> {
|
||||
let mut rt = match tokio::runtime::Runtime::new() {
|
||||
Ok(r) => r,
|
||||
Err(e) => {
|
||||
error!("Error creating runtime {}", e.to_string());
|
||||
eprintln!("{}", e);
|
||||
return;
|
||||
let errstr = format!("Error creating runtime {}", e.to_string());
|
||||
eprintln!("{}", errstr);
|
||||
return Err(errstr);
|
||||
}
|
||||
};
|
||||
|
||||
match rt.block_on(get_latest_block(uri)) {
|
||||
Ok(b) => c(b),
|
||||
Err(e) => {
|
||||
error!("Error getting latest block {}", e.to_string());
|
||||
eprintln!("{}", e);
|
||||
}
|
||||
};
|
||||
rt.block_on(get_latest_block(uri)).map_err(|e| {
|
||||
let errstr = format!("Error getting latest block {}", e.to_string());
|
||||
eprintln!("{}", errstr);
|
||||
errstr
|
||||
})
|
||||
}
|
||||
|
@ -2,17 +2,20 @@ use crate::lightwallet::LightWallet;
|
||||
|
||||
use rand::{rngs::OsRng, seq::SliceRandom};
|
||||
|
||||
use std::sync::{Arc, RwLock, Mutex};
|
||||
use std::sync::atomic::{AtomicU64, AtomicI32, AtomicUsize, Ordering};
|
||||
use std::sync::{Arc, RwLock, Mutex, mpsc::channel};
|
||||
use std::sync::atomic::{AtomicI32, AtomicUsize, Ordering};
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::fs::File;
|
||||
use std::collections::HashMap;
|
||||
use std::cmp::{max, min};
|
||||
use std::io;
|
||||
use std::io::prelude::*;
|
||||
use std::io::{BufReader, Error, ErrorKind};
|
||||
|
||||
use protobuf::parse_from_bytes;
|
||||
|
||||
use threadpool::ThreadPool;
|
||||
|
||||
use json::{object, array, JsonValue};
|
||||
use zcash_primitives::transaction::{TxId, Transaction};
|
||||
use zcash_client_backend::{
|
||||
@ -30,7 +33,6 @@ use log4rs::append::rolling_file::policy::compound::{
|
||||
roll::fixed_window::FixedWindowRoller,
|
||||
};
|
||||
|
||||
use crate::grpc_client::{BlockId};
|
||||
use crate::grpcconnector::{self, *};
|
||||
use crate::ANCHOR_OFFSET;
|
||||
|
||||
@ -1059,15 +1061,8 @@ impl LightClient {
|
||||
let mut last_scanned_height = self.wallet.read().unwrap().last_scanned_height() as u64;
|
||||
|
||||
// This will hold the latest block fetched from the RPC
|
||||
let latest_block_height = Arc::new(AtomicU64::new(0));
|
||||
let lbh = latest_block_height.clone();
|
||||
fetch_latest_block(&self.get_server_uri(),
|
||||
move |block: BlockId| {
|
||||
lbh.store(block.height, Ordering::SeqCst);
|
||||
});
|
||||
let latest_block = latest_block_height.load(Ordering::SeqCst);
|
||||
let latest_block = fetch_latest_block(&self.get_server_uri())?.height;
|
||||
|
||||
|
||||
if latest_block < last_scanned_height {
|
||||
let w = format!("Server's latest block({}) is behind ours({})", latest_block, last_scanned_height);
|
||||
warn!("{}", w);
|
||||
@ -1103,6 +1098,9 @@ impl LightClient {
|
||||
// belong to us.
|
||||
let all_new_txs = Arc::new(RwLock::new(vec![]));
|
||||
|
||||
// Create a new threadpool (upto 8, atleast 2 threads) to scan with
|
||||
let pool = ThreadPool::new(max(2, min(8, num_cpus::get())));
|
||||
|
||||
// Fetch CompactBlocks in increments
|
||||
let mut pass = 0;
|
||||
loop {
|
||||
@ -1138,7 +1136,9 @@ impl LightClient {
|
||||
|
||||
let last_invalid_height = Arc::new(AtomicI32::new(0));
|
||||
let last_invalid_height_inner = last_invalid_height.clone();
|
||||
fetch_blocks(&self.get_server_uri(), start_height, end_height,
|
||||
|
||||
let tpool = pool.clone();
|
||||
fetch_blocks(&self.get_server_uri(), start_height, end_height, pool.clone(),
|
||||
move |encoded_block: &[u8], height: u64| {
|
||||
// Process the block only if there were no previous errors
|
||||
if last_invalid_height_inner.load(Ordering::SeqCst) > 0 {
|
||||
@ -1156,7 +1156,7 @@ impl LightClient {
|
||||
Err(_) => {}
|
||||
}
|
||||
|
||||
match local_light_wallet.read().unwrap().scan_block(encoded_block) {
|
||||
match local_light_wallet.read().unwrap().scan_block_with_pool(encoded_block, &tpool) {
|
||||
Ok(block_txns) => {
|
||||
// Add to global tx list
|
||||
all_txs.write().unwrap().extend_from_slice(&block_txns.iter().map(|txid| (txid.clone(), height as i32)).collect::<Vec<_>>()[..]);
|
||||
@ -1170,6 +1170,16 @@ impl LightClient {
|
||||
local_bytes_downloaded.fetch_add(encoded_block.len(), Ordering::SeqCst);
|
||||
})?;
|
||||
|
||||
{
|
||||
// println!("Total scan duration: {:?}", self.wallet.read().unwrap().total_scan_duration.read().unwrap().get(0).unwrap().as_millis());
|
||||
|
||||
let t = self.wallet.read().unwrap();
|
||||
let mut d = t.total_scan_duration.write().unwrap();
|
||||
d.clear();
|
||||
d.push(std::time::Duration::new(0, 0));
|
||||
}
|
||||
|
||||
|
||||
// Check if there was any invalid block, which means we might have to do a reorg
|
||||
let invalid_height = last_invalid_height.load(Ordering::SeqCst);
|
||||
if invalid_height > 0 {
|
||||
@ -1204,6 +1214,11 @@ impl LightClient {
|
||||
let addresses = self.wallet.read().unwrap()
|
||||
.taddresses.read().unwrap().iter().map(|a| a.clone())
|
||||
.collect::<Vec<String>>();
|
||||
|
||||
// Create a channel so the fetch_transparent_txids can send the results back
|
||||
let (ctx, crx) = channel();
|
||||
let num_addresses = addresses.len();
|
||||
|
||||
for address in addresses {
|
||||
let wallet = self.wallet.clone();
|
||||
let block_times_inner = block_times.clone();
|
||||
@ -1216,16 +1231,28 @@ impl LightClient {
|
||||
start_height
|
||||
};
|
||||
|
||||
fetch_transparent_txids(&self.get_server_uri(), address, transparent_start_height, end_height,
|
||||
move |tx_bytes: &[u8], height: u64| {
|
||||
let tx = Transaction::read(tx_bytes).unwrap();
|
||||
let pool = pool.clone();
|
||||
let server_uri = self.get_server_uri();
|
||||
let ctx = ctx.clone();
|
||||
|
||||
// Scan this Tx for transparent inputs and outputs
|
||||
let datetime = block_times_inner.read().unwrap().get(&height).map(|v| *v).unwrap_or(0);
|
||||
wallet.read().unwrap().scan_full_tx(&tx, height as i32, datetime as u64);
|
||||
}
|
||||
)?;
|
||||
pool.execute(move || {
|
||||
// Fetch the transparent transactions for this address, and send the results
|
||||
// via the channel
|
||||
let r = fetch_transparent_txids(&server_uri, address, transparent_start_height, end_height,
|
||||
move |tx_bytes: &[u8], height: u64| {
|
||||
let tx = Transaction::read(tx_bytes).unwrap();
|
||||
|
||||
// Scan this Tx for transparent inputs and outputs
|
||||
let datetime = block_times_inner.read().unwrap().get(&height).map(|v| *v).unwrap_or(0);
|
||||
wallet.read().unwrap().scan_full_tx(&tx, height as i32, datetime as u64);
|
||||
});
|
||||
ctx.send(r).unwrap();
|
||||
});
|
||||
}
|
||||
|
||||
// Collect all results from the transparent fetches, and make sure everything was OK.
|
||||
// If it was not, we return an error, which will go back to the retry
|
||||
crx.iter().take(num_addresses).collect::<Result<Vec<()>, String>>()?;
|
||||
}
|
||||
|
||||
// Do block height accounting
|
||||
@ -1268,24 +1295,43 @@ impl LightClient {
|
||||
let mut rng = OsRng;
|
||||
txids_to_fetch.shuffle(&mut rng);
|
||||
|
||||
let num_fetches = txids_to_fetch.len();
|
||||
let (ctx, crx) = channel();
|
||||
|
||||
// And go and fetch the txids, getting the full transaction, so we can
|
||||
// read the memos
|
||||
for (txid, height) in txids_to_fetch {
|
||||
let light_wallet_clone = self.wallet.clone();
|
||||
info!("Fetching full Tx: {}", txid);
|
||||
|
||||
fetch_full_tx(&self.get_server_uri(), txid,move |tx_bytes: &[u8] | {
|
||||
let tx = Transaction::read(tx_bytes).unwrap();
|
||||
let pool = pool.clone();
|
||||
let server_uri = self.get_server_uri();
|
||||
let ctx = ctx.clone();
|
||||
|
||||
pool.execute(move || {
|
||||
info!("Fetching full Tx: {}", txid);
|
||||
|
||||
light_wallet_clone.read().unwrap().scan_full_tx(&tx, height, 0);
|
||||
match fetch_full_tx(&server_uri, txid) {
|
||||
Ok(tx_bytes) => {
|
||||
let tx = Transaction::read(&tx_bytes[..]).unwrap();
|
||||
|
||||
light_wallet_clone.read().unwrap().scan_full_tx(&tx, height, 0);
|
||||
ctx.send(Ok(())).unwrap();
|
||||
},
|
||||
Err(e) => ctx.send(Err(e)).unwrap()
|
||||
};
|
||||
});
|
||||
};
|
||||
|
||||
Ok(object!{
|
||||
"result" => "success",
|
||||
"latest_block" => latest_block,
|
||||
"downloaded_bytes" => bytes_downloaded.load(Ordering::SeqCst)
|
||||
})
|
||||
// Wait for all the fetches to finish.
|
||||
let result = crx.iter().take(num_fetches).collect::<Result<Vec<()>, String>>();
|
||||
match result {
|
||||
Ok(_) => Ok(object!{
|
||||
"result" => "success",
|
||||
"latest_block" => latest_block,
|
||||
"downloaded_bytes" => bytes_downloaded.load(Ordering::SeqCst)
|
||||
}),
|
||||
Err(e) => Err(format!("Error fetching all txns for memos: {}", e))
|
||||
}
|
||||
}
|
||||
|
||||
pub fn do_send(&self, addrs: Vec<(&str, u64, Option<String>)>) -> Result<String, String> {
|
||||
|
@ -1,4 +1,4 @@
|
||||
use std::time::SystemTime;
|
||||
use std::time::{SystemTime, Duration};
|
||||
use std::io::{self, Read, Write};
|
||||
use std::cmp;
|
||||
use std::collections::{HashMap, HashSet};
|
||||
@ -6,8 +6,11 @@ use std::sync::{Arc, RwLock};
|
||||
use std::io::{Error, ErrorKind};
|
||||
use std::convert::TryFrom;
|
||||
|
||||
use rand::{Rng, rngs::OsRng};
|
||||
use threadpool::ThreadPool;
|
||||
use std::sync::mpsc::{channel};
|
||||
|
||||
use rand::{Rng, rngs::OsRng};
|
||||
use subtle::{ConditionallySelectable, ConstantTimeEq, CtOption};
|
||||
use log::{info, warn, error};
|
||||
|
||||
use protobuf::parse_from_bytes;
|
||||
@ -22,12 +25,13 @@ use sha2::{Sha256, Digest};
|
||||
|
||||
use zcash_client_backend::{
|
||||
encoding::{encode_payment_address, encode_extended_spending_key},
|
||||
proto::compact_formats::CompactBlock, welding_rig::scan_block,
|
||||
proto::compact_formats::{CompactBlock, CompactOutput},
|
||||
wallet::{WalletShieldedOutput, WalletShieldedSpend}
|
||||
};
|
||||
|
||||
use zcash_primitives::{
|
||||
jubjub::fs::Fs,
|
||||
block::BlockHash,
|
||||
merkle_tree::{CommitmentTree},
|
||||
serialize::{Vector},
|
||||
consensus::BranchId,
|
||||
transaction::{
|
||||
@ -35,8 +39,10 @@ use zcash_primitives::{
|
||||
components::{Amount, OutPoint, TxOut}, components::amount::DEFAULT_FEE,
|
||||
TxId, Transaction,
|
||||
},
|
||||
sapling::Node,
|
||||
merkle_tree::{CommitmentTree, IncrementalWitness},
|
||||
legacy::{Script, TransparentAddress},
|
||||
note_encryption::{Memo, try_sapling_note_decryption, try_sapling_output_recovery},
|
||||
note_encryption::{Memo, try_sapling_note_decryption, try_sapling_output_recovery, try_sapling_compact_note_decryption},
|
||||
zip32::{ExtendedFullViewingKey, ExtendedSpendingKey, ChildIndex},
|
||||
JUBJUB,
|
||||
primitives::{PaymentAddress},
|
||||
@ -131,6 +137,8 @@ pub struct LightWallet {
|
||||
|
||||
// Non-serialized fields
|
||||
config: LightClientConfig,
|
||||
|
||||
pub total_scan_duration: Arc<RwLock<Vec<Duration>>>,
|
||||
}
|
||||
|
||||
impl LightWallet {
|
||||
@ -230,6 +238,7 @@ impl LightWallet {
|
||||
mempool_txs: Arc::new(RwLock::new(HashMap::new())),
|
||||
config: config.clone(),
|
||||
birthday: latest_block,
|
||||
total_scan_duration: Arc::new(RwLock::new(vec![Duration::new(0, 0)]))
|
||||
};
|
||||
|
||||
// If restoring from seed, make sure we are creating 5 addresses for users
|
||||
@ -350,6 +359,7 @@ impl LightWallet {
|
||||
mempool_txs: Arc::new(RwLock::new(HashMap::new())),
|
||||
config: config.clone(),
|
||||
birthday,
|
||||
total_scan_duration: Arc::new(RwLock::new(vec![Duration::new(0, 0)])),
|
||||
})
|
||||
}
|
||||
|
||||
@ -399,12 +409,19 @@ impl LightWallet {
|
||||
|
||||
Vector::write(&mut writer, &self.blocks.read().unwrap(), |w, b| b.write(w))?;
|
||||
|
||||
// The hashmap, write as a set of tuples
|
||||
Vector::write(&mut writer, &self.txs.read().unwrap().iter().collect::<Vec<(&TxId, &WalletTx)>>(),
|
||||
|w, (k, v)| {
|
||||
w.write_all(&k.0)?;
|
||||
v.write(w)
|
||||
})?;
|
||||
// The hashmap, write as a set of tuples. Store them sorted so that wallets are
|
||||
// deterministically saved
|
||||
{
|
||||
let txlist = self.txs.read().unwrap();
|
||||
let mut txns = txlist.iter().collect::<Vec<(&TxId, &WalletTx)>>();
|
||||
txns.sort_by(|a, b| a.0.partial_cmp(b.0).unwrap());
|
||||
|
||||
Vector::write(&mut writer, &txns,
|
||||
|w, (k, v)| {
|
||||
w.write_all(&k.0)?;
|
||||
v.write(w)
|
||||
})?;
|
||||
}
|
||||
utils::write_string(&mut writer, &self.config.chain_name)?;
|
||||
|
||||
// While writing the birthday, get it from the fn so we recalculate it properly
|
||||
@ -1210,7 +1227,7 @@ impl LightWallet {
|
||||
// Trim all witnesses for the invalidated blocks
|
||||
for tx in txs.values_mut() {
|
||||
for nd in tx.notes.iter_mut() {
|
||||
nd.witnesses.split_off(nd.witnesses.len().saturating_sub(num_invalidated));
|
||||
let _discard = nd.witnesses.split_off(nd.witnesses.len().saturating_sub(num_invalidated));
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -1218,8 +1235,235 @@ impl LightWallet {
|
||||
num_invalidated as u64
|
||||
}
|
||||
|
||||
// Scan a block. Will return an error with the block height that failed to scan
|
||||
/// Scans a [`CompactOutput`] with a set of [`ExtendedFullViewingKey`]s.
|
||||
///
|
||||
/// Returns a [`WalletShieldedOutput`] and corresponding [`IncrementalWitness`] if this
|
||||
/// output belongs to any of the given [`ExtendedFullViewingKey`]s.
|
||||
///
|
||||
/// The given [`CommitmentTree`] and existing [`IncrementalWitness`]es are incremented
|
||||
/// with this output's commitment.
|
||||
fn scan_output_internal(
|
||||
&self,
|
||||
(index, output): (usize, CompactOutput),
|
||||
ivks: &[Fs],
|
||||
tree: &mut CommitmentTree<Node>,
|
||||
existing_witnesses: &mut [&mut IncrementalWitness<Node>],
|
||||
block_witnesses: &mut [&mut IncrementalWitness<Node>],
|
||||
new_witnesses: &mut [&mut IncrementalWitness<Node>],
|
||||
pool: &ThreadPool
|
||||
) -> Option<WalletShieldedOutput> {
|
||||
let cmu = output.cmu().ok()?;
|
||||
let epk = output.epk().ok()?;
|
||||
let ct = output.ciphertext;
|
||||
|
||||
let (tx, rx) = channel();
|
||||
ivks.iter().enumerate().for_each(|(account, ivk)| {
|
||||
// Clone all values for passing to the closure
|
||||
let ivk = ivk.clone();
|
||||
let epk = epk.clone();
|
||||
let ct = ct.clone();
|
||||
let tx = tx.clone();
|
||||
|
||||
pool.execute(move || {
|
||||
let m = try_sapling_compact_note_decryption(&ivk, &epk, &cmu, &ct);
|
||||
let r = match m {
|
||||
Some((note, to)) => {
|
||||
tx.send(Some(Some((note, to, account))))
|
||||
},
|
||||
None => {
|
||||
tx.send(Some(None))
|
||||
}
|
||||
};
|
||||
|
||||
match r {
|
||||
Ok(_) => {},
|
||||
Err(e) => println!("Send error {:?}", e)
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
// Increment tree and witnesses
|
||||
let node = Node::new(cmu.into());
|
||||
for witness in existing_witnesses {
|
||||
witness.append(node).unwrap();
|
||||
}
|
||||
for witness in block_witnesses {
|
||||
witness.append(node).unwrap();
|
||||
}
|
||||
for witness in new_witnesses {
|
||||
witness.append(node).unwrap();
|
||||
}
|
||||
tree.append(node).unwrap();
|
||||
|
||||
// Collect all the RXs and fine if there was a valid result somewhere
|
||||
let mut wsos = vec![];
|
||||
for _i in 0..ivks.len() {
|
||||
let n = rx.recv().unwrap();
|
||||
let epk = epk.clone();
|
||||
|
||||
let wso = match n {
|
||||
None => panic!("Got a none!"),
|
||||
Some(None) => None,
|
||||
Some(Some((note, to, account))) => {
|
||||
// A note is marked as "change" if the account that received it
|
||||
// also spent notes in the same transaction. This will catch,
|
||||
// for instance:
|
||||
// - Change created by spending fractions of notes.
|
||||
// - Notes created by consolidation transactions.
|
||||
// - Notes sent from one account to itself.
|
||||
//let is_change = spent_from_accounts.contains(&account);
|
||||
|
||||
Some(WalletShieldedOutput {
|
||||
index, cmu, epk, account, note, to, is_change: false,
|
||||
witness: IncrementalWitness::from_tree(tree),
|
||||
})
|
||||
}
|
||||
};
|
||||
wsos.push(wso);
|
||||
}
|
||||
|
||||
match wsos.into_iter().find(|wso| wso.is_some()) {
|
||||
Some(Some(wso)) => Some(wso),
|
||||
_ => None
|
||||
}
|
||||
}
|
||||
|
||||
/// Scans a [`CompactBlock`] with a set of [`ExtendedFullViewingKey`]s.
|
||||
///
|
||||
/// Returns a vector of [`WalletTx`]s belonging to any of the given
|
||||
/// [`ExtendedFullViewingKey`]s, and the corresponding new [`IncrementalWitness`]es.
|
||||
///
|
||||
/// The given [`CommitmentTree`] and existing [`IncrementalWitness`]es are
|
||||
/// incremented appropriately.
|
||||
pub fn scan_block_internal(
|
||||
&self,
|
||||
block: CompactBlock,
|
||||
extfvks: &[ExtendedFullViewingKey],
|
||||
nullifiers: Vec<(Vec<u8>, usize)>,
|
||||
tree: &mut CommitmentTree<Node>,
|
||||
existing_witnesses: &mut [&mut IncrementalWitness<Node>],
|
||||
pool: &ThreadPool
|
||||
) -> Vec<zcash_client_backend::wallet::WalletTx> {
|
||||
let mut wtxs: Vec<zcash_client_backend::wallet::WalletTx> = vec![];
|
||||
let ivks = extfvks.iter().map(|extfvk| extfvk.fvk.vk.ivk()).collect::<Vec<_>>();
|
||||
|
||||
for tx in block.vtx.into_iter() {
|
||||
let num_spends = tx.spends.len();
|
||||
let num_outputs = tx.outputs.len();
|
||||
|
||||
let (ctx, crx) = channel();
|
||||
{
|
||||
let nullifiers = nullifiers.clone();
|
||||
let tx = tx.clone();
|
||||
pool.execute(move || {
|
||||
// Check for spent notes
|
||||
// The only step that is not constant-time is the filter() at the end.
|
||||
let shielded_spends: Vec<_> = tx
|
||||
.spends
|
||||
.into_iter()
|
||||
.enumerate()
|
||||
.map(|(index, spend)| {
|
||||
// Find the first tracked nullifier that matches this spend, and produce
|
||||
// a WalletShieldedSpend if there is a match, in constant time.
|
||||
nullifiers
|
||||
.iter()
|
||||
.map(|(nf, account)| CtOption::new(*account as u64, nf.ct_eq(&spend.nf[..])))
|
||||
.fold(CtOption::new(0, 0.into()), |first, next| {
|
||||
CtOption::conditional_select(&next, &first, first.is_some())
|
||||
})
|
||||
.map(|account| WalletShieldedSpend {
|
||||
index,
|
||||
nf: spend.nf,
|
||||
account: account as usize,
|
||||
})
|
||||
})
|
||||
.filter(|spend| spend.is_some().into())
|
||||
.map(|spend| spend.unwrap())
|
||||
.collect();
|
||||
|
||||
// Collect the set of accounts that were spent from in this transaction
|
||||
let spent_from_accounts: HashSet<_> =
|
||||
shielded_spends.iter().map(|spend| spend.account).collect();
|
||||
|
||||
ctx.send((shielded_spends, spent_from_accounts)).unwrap();
|
||||
|
||||
drop(ctx);
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
// Check for incoming notes while incrementing tree and witnesses
|
||||
let mut shielded_outputs: Vec<WalletShieldedOutput> = vec![];
|
||||
{
|
||||
// Grab mutable references to new witnesses from previous transactions
|
||||
// in this block so that we can update them. Scoped so we don't hold
|
||||
// mutable references to wtxs for too long.
|
||||
let mut block_witnesses: Vec<_> = wtxs
|
||||
.iter_mut()
|
||||
.map(|tx| {
|
||||
tx.shielded_outputs
|
||||
.iter_mut()
|
||||
.map(|output| &mut output.witness)
|
||||
})
|
||||
.flatten()
|
||||
.collect();
|
||||
|
||||
for to_scan in tx.outputs.into_iter().enumerate() {
|
||||
// Grab mutable references to new witnesses from previous outputs
|
||||
// in this transaction so that we can update them. Scoped so we
|
||||
// don't hold mutable references to shielded_outputs for too long.
|
||||
let mut new_witnesses: Vec<_> = shielded_outputs
|
||||
.iter_mut()
|
||||
.map(|output| &mut output.witness)
|
||||
.collect();
|
||||
|
||||
if let Some(output) = self.scan_output_internal(
|
||||
to_scan,
|
||||
&ivks,
|
||||
tree,
|
||||
existing_witnesses,
|
||||
&mut block_witnesses,
|
||||
&mut new_witnesses,
|
||||
pool
|
||||
) {
|
||||
shielded_outputs.push(output);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let (shielded_spends, spent_from_accounts) = crx.recv().unwrap();
|
||||
|
||||
// Identify change outputs
|
||||
shielded_outputs.iter_mut().for_each(|output| {
|
||||
if spent_from_accounts.contains(&output.account) {
|
||||
output.is_change = true;
|
||||
}
|
||||
});
|
||||
|
||||
// Update wallet tx
|
||||
if !(shielded_spends.is_empty() && shielded_outputs.is_empty()) {
|
||||
let mut txid = TxId([0u8; 32]);
|
||||
txid.0.copy_from_slice(&tx.hash);
|
||||
wtxs.push(zcash_client_backend::wallet::WalletTx {
|
||||
txid,
|
||||
index: tx.index as usize,
|
||||
num_spends,
|
||||
num_outputs,
|
||||
shielded_spends,
|
||||
shielded_outputs,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
wtxs
|
||||
}
|
||||
|
||||
pub fn scan_block(&self, block_bytes: &[u8]) -> Result<Vec<TxId>, i32> {
|
||||
self.scan_block_with_pool(&block_bytes, &ThreadPool::new(1))
|
||||
}
|
||||
|
||||
// Scan a block. Will return an error with the block height that failed to scan
|
||||
pub fn scan_block_with_pool(&self, block_bytes: &[u8], pool: &ThreadPool) -> Result<Vec<TxId>, i32> {
|
||||
let block: CompactBlock = match parse_from_bytes(block_bytes) {
|
||||
Ok(block) => block,
|
||||
Err(e) => {
|
||||
@ -1310,7 +1554,7 @@ impl LightWallet {
|
||||
}
|
||||
|
||||
new_txs = {
|
||||
let nf_refs: Vec<_> = nfs.iter().map(|(nf, acc, _)| (&nf[..], *acc)).collect();
|
||||
let nf_refs = nfs.iter().map(|(nf, account, _)| (nf.to_vec(), *account)).collect::<Vec<_>>();
|
||||
|
||||
// Create a single mutable slice of all the newly-added witnesses.
|
||||
let mut witness_refs: Vec<_> = txs
|
||||
@ -1319,16 +1563,16 @@ impl LightWallet {
|
||||
.flatten()
|
||||
.collect();
|
||||
|
||||
scan_block(
|
||||
self.scan_block_internal(
|
||||
block.clone(),
|
||||
&self.extfvks.read().unwrap(),
|
||||
&nf_refs[..],
|
||||
nf_refs,
|
||||
&mut block_data.tree,
|
||||
&mut witness_refs[..],
|
||||
pool
|
||||
)
|
||||
};
|
||||
}
|
||||
|
||||
|
||||
// If this block had any new Txs, return the list of ALL txids in this block,
|
||||
// so the wallet can fetch them all as a decoy.
|
||||
|
Loading…
Reference in New Issue
Block a user