From 3545958578e2eab0ec7f74ca888a2a3f2ef9cd9c Mon Sep 17 00:00:00 2001 From: Aditya Kulkarni Date: Thu, 19 Sep 2019 13:17:46 -0700 Subject: [PATCH] Handle reorgs --- src/lightclient.rs | 83 +++++++++++++++++++++++++++++++++--------- src/lightwallet/mod.rs | 75 +++++++++++++++++++++++++++++++++----- 2 files changed, 130 insertions(+), 28 deletions(-) diff --git a/src/lightclient.rs b/src/lightclient.rs index 23d9d71..4814612 100644 --- a/src/lightclient.rs +++ b/src/lightclient.rs @@ -1,6 +1,6 @@ use crate::lightwallet::LightWallet; -use log::{info}; +use log::{info, warn, error}; use std::path::Path; use std::fs::File; @@ -9,7 +9,7 @@ use std::io::prelude::*; use std::io::{BufReader, BufWriter, Error, ErrorKind}; use std::sync::Arc; -use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering}; +use std::sync::atomic::{AtomicU64, AtomicI32, AtomicUsize, Ordering}; use json::{object, JsonValue}; @@ -529,45 +529,92 @@ impl LightClient { self.fetch_latest_block(move |block: BlockId| { lbh.store(block.height, Ordering::SeqCst); }); - let last_block = latest_block_height.load(Ordering::SeqCst); + let latest_block = latest_block_height.load(Ordering::SeqCst); - info!("Latest block is {}", last_block); + info!("Latest block is {}", latest_block); // Get the end height to scan to. - let mut end_height = std::cmp::min(last_scanned_height + 1000, last_block); + let mut end_height = std::cmp::min(last_scanned_height + 1000, latest_block); // If there's nothing to scan, just return - if last_scanned_height == last_block { + if last_scanned_height == latest_block { return "".to_string(); } // Count how many bytes we've downloaded let bytes_downloaded = Arc::new(AtomicUsize::new(0)); + let mut total_reorg = 0u64; + // Fetch CompactBlocks in increments loop { let local_light_wallet = self.wallet.clone(); let local_bytes_downloaded = bytes_downloaded.clone(); + let start_height = last_scanned_height + 1; + // Show updates only if we're syncing a lot of blocks - if print_updates && end_height - last_scanned_height > 100 { - print!("Syncing {}/{}\r", last_scanned_height, last_block); + if print_updates && end_height - start_height > 100 { + print!("Syncing {}/{}\r", start_height, latest_block); io::stdout().flush().ok().expect("Could not flush stdout"); } // Fetch compact blocks - info!("Fetching blocks {}-{}", last_scanned_height, end_height); - self.fetch_blocks(last_scanned_height, end_height, + info!("Fetching blocks {}-{}", start_height, end_height); + + let last_invalid_height = Arc::new(AtomicI32::new(0)); + let last_invalid_height_inner = last_invalid_height.clone(); + self.fetch_blocks(start_height, end_height, move |encoded_block: &[u8]| { - local_light_wallet.scan_block(encoded_block); + // Process the block only if there were no previous errors + if last_invalid_height_inner.load(Ordering::SeqCst) > 0 { + return; + } + + match local_light_wallet.scan_block(encoded_block) { + Ok(_) => {}, + Err(invalid_height) => { + // Block at this height seems to be invalid, so invalidate up till that point + last_invalid_height_inner.store(invalid_height, Ordering::SeqCst); + } + } + local_bytes_downloaded.fetch_add(encoded_block.len(), Ordering::SeqCst); }); + // Check if there was any invalid block, which means we might have to do a reorg + let invalid_height = last_invalid_height.load(Ordering::SeqCst); + if invalid_height > 0 { + total_reorg += self.wallet.invalidate_block(invalid_height); + + warn!("Invalidated block at height {}. Total reorg is now {}", invalid_height, total_reorg); + } + + // Make sure we're not re-orging too much! + if total_reorg > 99 { + error!("Reorg has now exceeded 100 blocks!"); + return "Reorg has exceeded 100 blocks. Aborting.".to_string(); + } + + if invalid_height > 0 { + // Reset the scanning heights + last_scanned_height = (invalid_height - 1) as u64; + end_height = std::cmp::min(last_scanned_height + 1000, latest_block); + + warn!("Reorg: reset scanning from {} to {}", last_scanned_height, end_height); + + continue; + } + + // If it got here, that means the blocks are scanning properly now. + // So, reset the total_reorg + total_reorg = 0; + // We'll also fetch all the txids that our transparent addresses are involved with // TODO: Use for all t addresses let address = self.wallet.address_from_sk(&self.wallet.tkeys[0]); let wallet = self.wallet.clone(); - self.fetch_transparent_txids(address, last_scanned_height, end_height, + self.fetch_transparent_txids(address, start_height, end_height, move |tx_bytes: &[u8], height: u64 | { let tx = Transaction::read(tx_bytes).unwrap(); @@ -576,13 +623,13 @@ impl LightClient { } ); - last_scanned_height = end_height + 1; + last_scanned_height = end_height; end_height = last_scanned_height + 1000 - 1; - if last_scanned_height > last_block { + if last_scanned_height >= latest_block { break; - } else if end_height > last_block { - end_height = last_block; + } else if end_height > latest_block { + end_height = latest_block; } } if print_updates{ @@ -591,8 +638,8 @@ impl LightClient { let mut responses = vec![]; - info!("Synced to {}, Downloaded {} kB", last_block, bytes_downloaded.load(Ordering::SeqCst) / 1024); - responses.push(format!("Synced to {}, Downloaded {} kB", last_block, bytes_downloaded.load(Ordering::SeqCst) / 1024)); + info!("Synced to {}, Downloaded {} kB", latest_block, bytes_downloaded.load(Ordering::SeqCst) / 1024); + responses.push(format!("Synced to {}, Downloaded {} kB", latest_block, bytes_downloaded.load(Ordering::SeqCst) / 1024)); // Get the Raw transaction for all the wallet transactions diff --git a/src/lightwallet/mod.rs b/src/lightwallet/mod.rs index d4919ce..36e6a2e 100644 --- a/src/lightwallet/mod.rs +++ b/src/lightwallet/mod.rs @@ -1,10 +1,10 @@ use std::time::SystemTime; use std::io::{self, Read, Write}; use std::cmp; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::sync::{Arc, RwLock}; -use log::{info, error}; +use log::{info, warn, error}; use protobuf::parse_from_bytes; @@ -615,12 +615,59 @@ impl LightWallet { } } - pub fn scan_block(&self, block: &[u8]) -> bool { + // Invalidate all blocks including and after "at_height". + // Returns the number of blocks invalidated + pub fn invalidate_block(&self, at_height: i32) -> u64 { + let mut num_invalidated = 0; + + // First remove the blocks + { + let mut blks = self.blocks.write().unwrap(); + + while blks.last().unwrap().height >= at_height { + blks.pop(); + num_invalidated += 1; + } + } + + // Next, remove transactions + { + let mut txs = self.txs.write().unwrap(); + let txids_to_remove = txs.values() + .filter_map(|wtx| if wtx.block >= at_height {Some(wtx.txid.clone())} else {None}) + .collect::>(); + + for txid in &txids_to_remove { + txs.remove(&txid); + } + + // We also need to update any sapling note data and utxos in existing transactions that + // were spent in any of the txids that were removed + txs.values_mut() + .for_each(|wtx| { + wtx.notes.iter_mut() + .for_each(|nd| { + if nd.spent.is_some() && txids_to_remove.contains(&nd.spent.unwrap()) { + nd.spent = None; + } + + if nd.unconfirmed_spent.is_some() && txids_to_remove.contains(&nd.spent.unwrap()) { + nd.unconfirmed_spent = None; + } + }) + }) + } + + num_invalidated + } + + // Scan a block. Will return an error with the block height that failed to scan + pub fn scan_block(&self, block: &[u8]) -> Result<(), i32> { let block: CompactBlock = match parse_from_bytes(block) { Ok(block) => block, Err(e) => { eprintln!("Could not parse CompactBlock from bytes: {}", e); - return false; + return Err(-1); } }; @@ -630,18 +677,26 @@ impl LightWallet { // If the last scanned block is rescanned, check it still matches. if let Some(hash) = self.blocks.read().unwrap().last().map(|block| block.hash) { if block.hash() != hash { - eprintln!("Block hash does not match for block {}. {} vs {}", height, block.hash(), hash); - return false; + warn!("Likely reorg. Block hash does not match for block {}. {} vs {}", height, block.hash(), hash); + return Err(height); } } - return true; + return Ok(()) } else if height != (self.last_scanned_height() + 1) { - eprintln!( + error!( "Block is not height-sequential (expected {}, found {})", self.last_scanned_height() + 1, height ); - return false; + return Err(self.last_scanned_height()); + } + + // Check to see that the previous block hash matches + if let Some(hash) = self.blocks.read().unwrap().last().map(|block| block.hash) { + if block.prev_hash() != hash { + warn!("Likely reorg. Prev block hash does not match for block {}. {} vs {}", height, block.prev_hash(), hash); + return Err(height-1); + } } // Get the most recent scanned data. @@ -770,7 +825,7 @@ impl LightWallet { } } - true + Ok(()) } pub fn send_to_address(