Commit

fixed build error
welbon committed Feb 20, 2024
1 parent 0a2fd42 commit 985d02d
Showing 5 changed files with 230 additions and 92 deletions.
46 changes: 30 additions & 16 deletions cmd/db-exporter/src/command_decode_payload.rs
@@ -3,8 +3,8 @@

use crate::command_progress::{
ParallelCommand, ParallelCommandFilter, ParallelCommandObserver, ParallelCommandProgress,
ParallelCommandReadBlockFromDB,
};
use crate::init_db_obj;
use anyhow::Result;
use chrono::{TimeZone, Utc};
use clap::Parser;
@@ -13,6 +13,8 @@ use move_binary_format::errors::{Location, PartialVMError};
use serde::Serialize;
use starcoin_abi_decoder;
use starcoin_abi_decoder::DecodedTransactionPayload;
use starcoin_config::BuiltinNetworkID::Barnard;
use starcoin_config::ChainNetwork;
use starcoin_crypto::{hash::CryptoHash, HashValue};
use starcoin_statedb::ChainStateDB;
use starcoin_storage::Storage;
@@ -31,13 +33,19 @@ const DECODE_PAYLOAD_COMMAND_NAME: &str = "decode_payload_command";
)]
pub struct DecodePayloadCommandOptions {
#[clap(long, short = 'i', parse(from_os_str))]
/// input file, like accounts.csv
/// Db path
pub input_path: PathBuf,

#[clap(long, short = 'o', parse(from_os_str))]
/// output file, like accounts.csv
pub output_path: PathBuf,

#[clap(long)]
pub start_height: Option<u64>,

#[clap(long)]
pub end_height: Option<u64>,

#[clap(long, short = 's')]
/// Signer filter
pub signer: Option<String>,
@@ -111,7 +119,7 @@ impl ParallelCommandObserver for CommandDecodePayload {
}
}

impl ParallelCommand<CommandDecodePayload, Block, DecodePayloadCommandError> for Block {
impl ParallelCommand<CommandDecodePayload, DecodePayloadCommandError> for Block {
fn execute(&self, command: &CommandDecodePayload) -> (usize, Vec<DecodePayloadCommandError>) {
// let errors = vec![];
// let mut success_module_size = 0;
Expand Down Expand Up @@ -184,35 +192,41 @@ impl ParallelCommand<CommandDecodePayload, Block, DecodePayloadCommandError> for
pub fn decode_payload(
input_path: PathBuf,
out_path: PathBuf,
db_path: PathBuf,
start_height: Option<u64>,
end_height: Option<u64>,
filter: Option<ParallelCommandFilter>,
) -> Result<()> {
let file = WriterBuilder::new().from_path(out_path.clone())?;
let writer_mutex = Mutex::new(file);

let (dbreader, storage) = ParallelCommandReadBlockFromDB::new(
input_path,
ChainNetwork::from(Barnard),
start_height.unwrap_or(0),
end_height.unwrap_or(0),
)?;
let command = Arc::new(CommandDecodePayload {
writer_mutex,
storage: init_db_obj(db_path)?,
storage,
});

ParallelCommandProgress::new(
String::from(DECODE_PAYLOAD_COMMAND_NAME),
input_path,
num_cpus::get(),
Arc::new(dbreader),
filter,
Some(command.clone() as Arc<dyn ParallelCommandObserver>),
)
.progress::<CommandDecodePayload, Block, DecodePayloadCommandError>(&command)
.progress::<CommandDecodePayload, DecodePayloadCommandError>(&command)
}

#[test]
pub fn test_decode_payload() {
let mut workspace = PathBuf::from("/Users/bobong/Downloads/STC-DB-mainnet");
let mut input = workspace.clone();
input.push("grep-'LocalPool'.json");

let mut output = workspace.clone();
output.push("output.csv");

decode_payload(input, output, PathBuf::from(""), None)?;
pub fn test_decode_payload() -> Result<()> {
decode_payload(
PathBuf::from("~/.starcoin/barnard"),
PathBuf::from("/Users/bobong/Downloads/STC-DB-mainnet/output.csv"),
None,
None,
None,
)
}
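
For reference, a hedged usage sketch of the updated decode_payload signature shown above (input db path, output csv, optional start/end heights, optional filter). This is not part of the commit; the paths and heights are placeholders.

use std::path::PathBuf;

// Hypothetical caller: decode payloads for a 1,000-block window of a local
// Barnard db into a CSV report. All literal values below are placeholders.
fn run_decode_example() -> anyhow::Result<()> {
    decode_payload(
        PathBuf::from("/path/to/.starcoin/barnard"), // input_path: node db directory
        PathBuf::from("/tmp/decoded_payloads.csv"),  // output_path: csv report
        Some(1_000_000),                             // start_height
        Some(1_001_000),                             // end_height
        None,                                        // no ParallelCommandFilter
    )
}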
222 changes: 178 additions & 44 deletions cmd/db-exporter/src/command_progress.rs
@@ -1,14 +1,18 @@
use anyhow::{bail, Result};
use anyhow::{bail, format_err, Result};
use indicatif::{ProgressBar, ProgressStyle};
use rayon::prelude::*;
use starcoin_chain::{BlockChain, ChainReader};
use starcoin_config::ChainNetwork;
use starcoin_genesis::Genesis;
use starcoin_storage::cache_storage::CacheStorage;
use starcoin_storage::db_storage::DBStorage;
use starcoin_storage::storage::StorageInstance;
use starcoin_storage::{Storage, StorageVersion};
use starcoin_types::block::{Block, BlockNumber};
use starcoin_vm_types::language_storage::TypeTag;
use std::sync::Arc;
use std::{
fs::File,
io::{BufRead, BufReader},
path::PathBuf,
time::SystemTime,
};
use std::{fs::File, io::{BufRead, BufReader}, path::PathBuf, time::SystemTime};
use std::io::{Seek, SeekFrom};

struct CommandResult {
succeed: usize,
@@ -23,9 +27,11 @@ impl CommandResult {

pub struct ParallelCommandFilter {
signer: Option<String>,
func_name: Option<String>, // function name for filter, none for all
ty_args: Option<Vec<String>>, // template parameter for filter, none for all
args: Option<Vec<String>>, // arguments type for filter, none for all
func_name: Option<String>,
// function name for filter, none for all
ty_args: Option<Vec<String>>,
// template parameter for filter, none for all
args: Option<Vec<String>>, // arguments type for filter, none for all
}

impl ParallelCommandFilter {
@@ -66,83 +72,201 @@ impl ParallelCommandFilter {
}
}

pub trait ParallelCommandObserver {
fn before_progress(&self) -> Result<()>;
fn after_progress(&self) -> Result<()>;
pub struct ParallelCommandReadBodyFromExportLine {
file: File,
line_count: u64,
}

impl ParallelCommandReadBodyFromExportLine {

fn count_lines(reader: &mut BufReader<File>) -> Result<u64> {
let line_count = reader.lines().count();
reader.seek(SeekFrom::Start(0))?;
Ok(line_count as u64)
}

pub fn new(input_path: PathBuf) -> Result<Self> {
let file = File::open(input_path.display().to_string())?;
let line_count = ParallelCommandReadBodyFromExportLine::count_lines(&mut BufReader::new(file.try_clone()?))?;
Ok(Self {
file,
line_count,
})
}
}

impl ParallelCommandBlockReader for ParallelCommandReadBodyFromExportLine {
fn get_progress_interval(&self) -> u64 {
self.line_count
}

fn read(&self) -> Result<Vec<Block>> {
let reader = BufReader::new(self.file.try_clone()?);
let lines = reader.lines().collect::<Result<Vec<_>, _>>()?;
Ok(lines
.par_iter()
.map(|line| Ok(serde_json::from_str::<Block>(line.as_str()))?)
.collect::<Result<Vec<Block>, _>>()?)
}
}

pub struct ParallelCommandReadBlockFromDB {
start_num: u64,
end_num: u64,
chain: Arc<BlockChain>,
}

const BLOCK_GAP: u64 = 1000;

impl ParallelCommandReadBlockFromDB {
pub fn new(
input_path: PathBuf,
net: ChainNetwork,
start: u64,
end: u64,
) -> Result<(Self, Arc<Storage>)> {
let storage = Self::init_db_obj(input_path.clone()).expect("Failed to initialize db");
let (chain_info, _) =
Genesis::init_and_check_storage(&net, storage.clone(), input_path.as_ref())
.expect("Failed init_and_check_storage");
let chain = BlockChain::new(net.time_service(), chain_info.head().id(), storage.clone(), None)
.expect("Failed to initialize block chain");

let cur_num = chain.status().head().number();

let (start_num, end_num) = if start != 0 && end == 0 {
(0, cur_num)
} else {
let end = if cur_num > end + BLOCK_GAP {
end
} else if cur_num > BLOCK_GAP {
cur_num - BLOCK_GAP
} else {
end
};
(start, end)
};

if start > cur_num || start > end {
return Err(format_err!(
"cur_num {} start {} end {} illegal",
cur_num,
start,
end
));
};

Ok((
Self {
start_num,
end_num,
chain: Arc::new(chain),
},
storage,
))
}
fn init_db_obj(db_path: PathBuf) -> Result<Arc<Storage>> {
let db_storage = DBStorage::open_with_cfs(
db_path.join("starcoindb/db/starcoindb"),
StorageVersion::current_version()
.get_column_family_names()
.to_vec(),
true,
Default::default(),
None,
)?;
Ok(Arc::new(Storage::new(
StorageInstance::new_cache_and_db_instance(CacheStorage::new(None), db_storage),
)?))
}
}

impl ParallelCommandBlockReader for ParallelCommandReadBlockFromDB {
fn get_progress_interval(&self) -> u64 {
self.end_num - self.start_num
}

fn read(&self) -> Result<Vec<Block>> {
let ret = (self.start_num..=self.end_num)
.collect::<Vec<BlockNumber>>()
.into_iter()
.map(|num| {
// progress_bar.set_message(format!("load block {}", num));
// progress_bar.inc(1);
self.chain.get_block_by_number(num).ok()?
})
.filter(|block| block.is_some())
.map(|block| block.unwrap())
.collect();
Ok(ret)
}
}

pub struct ParallelCommandProgress {
name: String,
file_path: PathBuf,
parallel_level: usize,
block_reader: Arc<dyn ParallelCommandBlockReader>,
filter: Option<ParallelCommandFilter>,
obs: Option<Arc<dyn ParallelCommandObserver>>,
}

impl ParallelCommandProgress {
pub fn new(
name: String,
file_path: PathBuf,
parallel_level: usize,
reader: Arc<dyn ParallelCommandBlockReader>,
filter: Option<ParallelCommandFilter>,
obs: Option<Arc<dyn ParallelCommandObserver>>,
) -> ParallelCommandProgress {
Self {
file_path,
name,
block_reader: reader.clone(),
parallel_level,
filter,
obs,
}
}

pub fn progress<CommandT: Sync + Send, BodyT, ErrorT>(self, command: &CommandT) -> Result<()>
pub fn progress<CommandT: Sync + Send, ErrorT>(self, command: &CommandT) -> Result<()>
where
BodyT: ParallelCommand<CommandT, BodyT, ErrorT>
+ Send
+ Sync
+ Clone
+ serde::Serialize
+ for<'a> serde::Deserialize<'a>
+ 'static,
Block: ParallelCommand<CommandT, ErrorT>,
{
println!("Start progress task, batch_size: {:?}", self.parallel_level);

let mut start_time = SystemTime::now();
let file_name = self.file_path.display().to_string();
let reader = BufReader::new(File::open(file_name)?);
//let file_name = self.file_path.display().to_string();
//let reader = BufReader::new(File::open(file_name)?);
println!(
"Reading file process expire mini seconds time: {:?}",
SystemTime::now().duration_since(start_time)?.as_micros()
);

start_time = SystemTime::now();
let lines = reader.lines().collect::<Result<Vec<_>, _>>()?;
// let lines = reader.lines().collect::<Result<Vec<_>, _>>()?;

// let all_items = lines
// .par_iter()
// .map(|line| Ok(serde_json::from_str::<BodyT>(line.as_str()))?)
// .filter(|item| match item {
// Ok(i) => i.matched(&self.filter),
// Err(_e) => false,
// })
// .collect::<Result<Vec<BodyT>, _>>()?;
if let Some(observer) = &self.obs {
observer.before_progress()?;
}

let all_items = lines
.par_iter()
.map(|line| Ok(serde_json::from_str::<BodyT>(line.as_str()))?)
.filter(|item| match item {
Ok(i) => {
if i.matched(&self.filter) {
true
} else {
false
}
}
Err(_e) => false,
})
.collect::<Result<Vec<BodyT>, _>>()?;

let progress_bar = ProgressBar::new(all_items.len() as u64).with_style(
let progress_bar = ProgressBar::new(self.block_reader.get_progress_interval()).with_style(
ProgressStyle::default_bar()
.template("[{elapsed_precise}] {bar:100.cyan/blue} {percent}% {msg}"),
);

let all_items = self.block_reader.read()?;
// .iter()
// .filter(|b| (*b).matched(&self.filter))
// .map(|b| *b)
// .collect();

println!(
"Reading lines from file expire time: {:?}",
SystemTime::now().duration_since(start_time)?.as_secs()
@@ -194,8 +318,18 @@ impl ParallelCommandProgress {
}
}

pub trait ParallelCommand<CommandT, BodyT, ErrorT> {
pub trait ParallelCommand<CommandT, ErrorT> {
fn execute(&self, cmd: &CommandT) -> (usize, Vec<ErrorT>);

fn matched(&self, filter: &Option<ParallelCommandFilter>) -> bool;
}

pub trait ParallelCommandObserver {
fn before_progress(&self) -> Result<()>;
fn after_progress(&self) -> Result<()>;
}

pub trait ParallelCommandBlockReader {
fn get_progress_interval(&self) -> u64;
fn read(&self) -> Result<Vec<Block>>;
}
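
As an aside, a minimal sketch (not part of this commit) of how the new ParallelCommandBlockReader trait introduced above can be implemented, for example in tests; the InMemoryBlockReader type is hypothetical.

use anyhow::Result;
use starcoin_types::block::Block;

// Hypothetical in-memory reader: serves a fixed list of blocks, so the
// progress interval is the list length and read() just clones the list.
struct InMemoryBlockReader {
    blocks: Vec<Block>,
}

impl ParallelCommandBlockReader for InMemoryBlockReader {
    fn get_progress_interval(&self) -> u64 {
        self.blocks.len() as u64
    }

    fn read(&self) -> Result<Vec<Block>> {
        Ok(self.blocks.clone())
    }
}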