Handle reorgs

Aditya Kulkarni 2019-09-19 13:17:46 -07:00
parent 65f9655b40
commit 3545958578
2 changed files with 130 additions and 28 deletions


@@ -1,6 +1,6 @@
 use crate::lightwallet::LightWallet;
-use log::{info};
+use log::{info, warn, error};
 use std::path::Path;
 use std::fs::File;
@@ -9,7 +9,7 @@ use std::io::prelude::*;
 use std::io::{BufReader, BufWriter, Error, ErrorKind};
 use std::sync::Arc;
-use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
+use std::sync::atomic::{AtomicU64, AtomicI32, AtomicUsize, Ordering};
 use json::{object, JsonValue};
@@ -529,45 +529,92 @@ impl LightClient {
         self.fetch_latest_block(move |block: BlockId| {
             lbh.store(block.height, Ordering::SeqCst);
         });
-        let last_block = latest_block_height.load(Ordering::SeqCst);
+        let latest_block = latest_block_height.load(Ordering::SeqCst);
-        info!("Latest block is {}", last_block);
+        info!("Latest block is {}", latest_block);
         // Get the end height to scan to.
-        let mut end_height = std::cmp::min(last_scanned_height + 1000, last_block);
+        let mut end_height = std::cmp::min(last_scanned_height + 1000, latest_block);
         // If there's nothing to scan, just return
-        if last_scanned_height == last_block {
+        if last_scanned_height == latest_block {
             return "".to_string();
         }
         // Count how many bytes we've downloaded
         let bytes_downloaded = Arc::new(AtomicUsize::new(0));
+        let mut total_reorg = 0u64;
         // Fetch CompactBlocks in increments
         loop {
             let local_light_wallet = self.wallet.clone();
             let local_bytes_downloaded = bytes_downloaded.clone();
+            let start_height = last_scanned_height + 1;
             // Show updates only if we're syncing a lot of blocks
-            if print_updates && end_height - last_scanned_height > 100 {
-                print!("Syncing {}/{}\r", last_scanned_height, last_block);
+            if print_updates && end_height - start_height > 100 {
+                print!("Syncing {}/{}\r", start_height, latest_block);
                 io::stdout().flush().ok().expect("Could not flush stdout");
             }
             // Fetch compact blocks
-            info!("Fetching blocks {}-{}", last_scanned_height, end_height);
-            self.fetch_blocks(last_scanned_height, end_height,
+            info!("Fetching blocks {}-{}", start_height, end_height);
+            let last_invalid_height = Arc::new(AtomicI32::new(0));
+            let last_invalid_height_inner = last_invalid_height.clone();
+            self.fetch_blocks(start_height, end_height,
                 move |encoded_block: &[u8]| {
-                    local_light_wallet.scan_block(encoded_block);
+                    // Process the block only if there were no previous errors
+                    if last_invalid_height_inner.load(Ordering::SeqCst) > 0 {
+                        return;
+                    }
+                    match local_light_wallet.scan_block(encoded_block) {
+                        Ok(_) => {},
+                        Err(invalid_height) => {
+                            // Block at this height seems to be invalid, so invalidate up till that point
+                            last_invalid_height_inner.store(invalid_height, Ordering::SeqCst);
+                        }
+                    }
                     local_bytes_downloaded.fetch_add(encoded_block.len(), Ordering::SeqCst);
                 });
+            // Check if there was any invalid block, which means we might have to do a reorg
+            let invalid_height = last_invalid_height.load(Ordering::SeqCst);
+            if invalid_height > 0 {
+                total_reorg += self.wallet.invalidate_block(invalid_height);
+                warn!("Invalidated block at height {}. Total reorg is now {}", invalid_height, total_reorg);
+            }
+            // Make sure we're not re-orging too much!
+            if total_reorg > 99 {
+                error!("Reorg has now exceeded 100 blocks!");
+                return "Reorg has exceeded 100 blocks. Aborting.".to_string();
+            }
+            if invalid_height > 0 {
+                // Reset the scanning heights
+                last_scanned_height = (invalid_height - 1) as u64;
+                end_height = std::cmp::min(last_scanned_height + 1000, latest_block);
+                warn!("Reorg: reset scanning from {} to {}", last_scanned_height, end_height);
+                continue;
+            }
+            // If it got here, that means the blocks are scanning properly now.
+            // So, reset the total_reorg
+            total_reorg = 0;
             // We'll also fetch all the txids that our transparent addresses are involved with
             // TODO: Use for all t addresses
             let address = self.wallet.address_from_sk(&self.wallet.tkeys[0]);
             let wallet = self.wallet.clone();
-            self.fetch_transparent_txids(address, last_scanned_height, end_height,
+            self.fetch_transparent_txids(address, start_height, end_height,
                 move |tx_bytes: &[u8], height: u64 | {
                     let tx = Transaction::read(tx_bytes).unwrap();
@@ -576,13 +623,13 @@ impl LightClient {
                 }
             );
-            last_scanned_height = end_height + 1;
+            last_scanned_height = end_height;
             end_height = last_scanned_height + 1000 - 1;
-            if last_scanned_height > last_block {
+            if last_scanned_height >= latest_block {
                 break;
-            } else if end_height > last_block {
-                end_height = last_block;
+            } else if end_height > latest_block {
+                end_height = latest_block;
             }
         }
         if print_updates {
@@ -591,8 +638,8 @@ impl LightClient {
         let mut responses = vec![];
-        info!("Synced to {}, Downloaded {} kB", last_block, bytes_downloaded.load(Ordering::SeqCst) / 1024);
-        responses.push(format!("Synced to {}, Downloaded {} kB", last_block, bytes_downloaded.load(Ordering::SeqCst) / 1024));
+        info!("Synced to {}, Downloaded {} kB", latest_block, bytes_downloaded.load(Ordering::SeqCst) / 1024);
+        responses.push(format!("Synced to {}, Downloaded {} kB", latest_block, bytes_downloaded.load(Ordering::SeqCst) / 1024));
         // Get the Raw transaction for all the wallet transactions
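
For readability, here is a minimal, self-contained sketch of the retry pattern the sync loop above follows. The names sync, scan_range, BATCH, and MAX_REORG are hypothetical stand-ins introduced only for this sketch: scan_range collapses fetch_blocks plus LightWallet::scan_block into one callback, and the batch advance and reorg accounting are simplified (the real code adds the number of blocks invalidate_block removed, not 1 per retry).

// Hypothetical stand-in for fetch_blocks + scan_block: scans [start, end] and
// returns Err(height) if the block at `height` conflicts with earlier state.
fn sync(
    mut last_scanned: u64,
    latest: u64,
    mut scan_range: impl FnMut(u64, u64) -> Result<(), i32>,
) -> Result<(), String> {
    const BATCH: u64 = 1000;
    const MAX_REORG: u64 = 100;

    if last_scanned == latest {
        return Ok(()); // nothing to scan
    }

    let mut end = std::cmp::min(last_scanned + BATCH, latest);
    let mut total_reorg = 0u64;

    loop {
        if let Err(invalid_height) = scan_range(last_scanned + 1, end) {
            // Likely reorg: roll back to just below the invalid block and retry.
            // (The real loop adds the count returned by wallet.invalidate_block.)
            total_reorg += 1;
            if total_reorg >= MAX_REORG {
                return Err("Reorg has exceeded 100 blocks. Aborting.".to_string());
            }
            last_scanned = (invalid_height - 1) as u64;
            end = std::cmp::min(last_scanned + BATCH, latest);
            continue;
        }

        // Scanned cleanly; reset the reorg counter and advance the window.
        total_reorg = 0;
        last_scanned = end;
        if last_scanned >= latest {
            return Ok(());
        }
        end = std::cmp::min(last_scanned + BATCH, latest);
    }
}

fn main() {
    // Toy run: pretend block 1_042 is invalid once, then everything scans cleanly.
    let mut failed_once = false;
    let result = sync(0, 2_500, |start, end| {
        if !failed_once && (start..=end).contains(&1_042) {
            failed_once = true;
            return Err(1_042);
        }
        Ok(())
    });
    assert!(result.is_ok());
}

In the real loop, the rollback also calls self.wallet.invalidate_block(invalid_height) so the wallet's block list and transactions stay consistent with the rewound height, as shown in the second file below.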


@@ -1,10 +1,10 @@
 use std::time::SystemTime;
 use std::io::{self, Read, Write};
 use std::cmp;
-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet};
 use std::sync::{Arc, RwLock};
-use log::{info, error};
+use log::{info, warn, error};
 use protobuf::parse_from_bytes;
@@ -615,12 +615,59 @@ impl LightWallet {
         }
     }
-    pub fn scan_block(&self, block: &[u8]) -> bool {
+    // Invalidate all blocks including and after "at_height".
+    // Returns the number of blocks invalidated
+    pub fn invalidate_block(&self, at_height: i32) -> u64 {
+        let mut num_invalidated = 0;
+        // First remove the blocks
+        {
+            let mut blks = self.blocks.write().unwrap();
+            while blks.last().unwrap().height >= at_height {
+                blks.pop();
+                num_invalidated += 1;
+            }
+        }
+        // Next, remove transactions
+        {
+            let mut txs = self.txs.write().unwrap();
+            let txids_to_remove = txs.values()
+                .filter_map(|wtx| if wtx.block >= at_height {Some(wtx.txid.clone())} else {None})
+                .collect::<HashSet<TxId>>();
+            for txid in &txids_to_remove {
+                txs.remove(&txid);
+            }
+            // We also need to update any sapling note data and utxos in existing transactions that
+            // were spent in any of the txids that were removed
+            txs.values_mut()
+                .for_each(|wtx| {
+                    wtx.notes.iter_mut()
+                        .for_each(|nd| {
+                            if nd.spent.is_some() && txids_to_remove.contains(&nd.spent.unwrap()) {
+                                nd.spent = None;
+                            }
+                            if nd.unconfirmed_spent.is_some() && txids_to_remove.contains(&nd.unconfirmed_spent.unwrap()) {
+                                nd.unconfirmed_spent = None;
+                            }
+                        })
+                })
+        }
+        num_invalidated
+    }
+    // Scan a block. Will return an error with the block height that failed to scan
+    pub fn scan_block(&self, block: &[u8]) -> Result<(), i32> {
         let block: CompactBlock = match parse_from_bytes(block) {
             Ok(block) => block,
             Err(e) => {
                 eprintln!("Could not parse CompactBlock from bytes: {}", e);
-                return false;
+                return Err(-1);
             }
         };
@@ -630,18 +677,26 @@ impl LightWallet {
             // If the last scanned block is rescanned, check it still matches.
             if let Some(hash) = self.blocks.read().unwrap().last().map(|block| block.hash) {
                 if block.hash() != hash {
-                    eprintln!("Block hash does not match for block {}. {} vs {}", height, block.hash(), hash);
-                    return false;
+                    warn!("Likely reorg. Block hash does not match for block {}. {} vs {}", height, block.hash(), hash);
+                    return Err(height);
                 }
             }
-            return true;
+            return Ok(())
         } else if height != (self.last_scanned_height() + 1) {
-            eprintln!(
+            error!(
                 "Block is not height-sequential (expected {}, found {})",
                 self.last_scanned_height() + 1,
                 height
             );
-            return false;
+            return Err(self.last_scanned_height());
         }
+        // Check to see that the previous block hash matches
+        if let Some(hash) = self.blocks.read().unwrap().last().map(|block| block.hash) {
+            if block.prev_hash() != hash {
+                warn!("Likely reorg. Prev block hash does not match for block {}. {} vs {}", height, block.prev_hash(), hash);
+                return Err(height-1);
+            }
+        }
         // Get the most recent scanned data.
@@ -770,7 +825,7 @@ impl LightWallet {
             }
         }
-        true
+        Ok(())
     }
     pub fn send_to_address(
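
As a companion to the invalidate_block change above, this is a minimal, self-contained sketch of its transaction-cleanup step: drop every transaction mined at or above the reorged height, then clear any spent or unconfirmed_spent marker that pointed at a removed txid. The types TxId, Note, WalletTx and the helper invalidate_txs are simplified stand-ins invented for this sketch, not the wallet's real structs.

use std::collections::{HashMap, HashSet};

type TxId = [u8; 32]; // simplified stand-in for the wallet's TxId type

struct Note {
    spent: Option<TxId>,
    unconfirmed_spent: Option<TxId>,
}

struct WalletTx {
    block: i32,
    notes: Vec<Note>,
}

// Drop transactions mined at or above `at_height`, then clear spent markers
// that referenced the removed transactions. Returns how many txs were removed.
fn invalidate_txs(txs: &mut HashMap<TxId, WalletTx>, at_height: i32) -> usize {
    let removed: HashSet<TxId> = txs
        .iter()
        .filter(|(_, wtx)| wtx.block >= at_height)
        .map(|(txid, _)| *txid)
        .collect();

    txs.retain(|txid, _| !removed.contains(txid));

    for wtx in txs.values_mut() {
        for nd in wtx.notes.iter_mut() {
            if nd.spent.map_or(false, |s| removed.contains(&s)) {
                nd.spent = None; // the spending tx no longer exists on this chain
            }
            if nd.unconfirmed_spent.map_or(false, |s| removed.contains(&s)) {
                nd.unconfirmed_spent = None;
            }
        }
    }

    removed.len()
}

fn main() {
    let mut txs: HashMap<TxId, WalletTx> = HashMap::new();
    txs.insert([1u8; 32], WalletTx { block: 10, notes: vec![] });
    txs.insert(
        [2u8; 32],
        WalletTx {
            block: 5,
            notes: vec![Note { spent: Some([1u8; 32]), unconfirmed_spent: None }],
        },
    );

    // Reorg at height 10: tx [1; 32] is removed and the note it spent becomes unspent again.
    assert_eq!(invalidate_txs(&mut txs, 10), 1);
    assert!(txs[&[2u8; 32]].notes[0].spent.is_none());
    println!("ok");
}

The design point mirrored from the diff: blocks and transactions are rolled back together, and notes that were marked spent by a now-orphaned transaction revert to spendable so balances stay correct after the rewind.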