Enhance parallel #4369

Merged: 26 commits, Jan 8, 2025
Commits
20c2204
print less info about ghost data
jackzhhuang Dec 25, 2024
e7f9007
no delete in reset
jackzhhuang Dec 25, 2024
59edb6c
get the block diligently when calling the rpc method
jackzhhuang Dec 26, 2024
200845d
add sync the specific block
jackzhhuang Dec 26, 2024
eb1d9e9
break if succeed to execute the block in waiting list to restart the …
jackzhhuang Dec 26, 2024
d87a1c1
use heap for execution waiting list
jackzhhuang Dec 27, 2024
ee7e8ba
use compact ghostdata
jackzhhuang Dec 27, 2024
80043d7
add/remove some info
jackzhhuang Dec 27, 2024
21d1d43
save the block into the local for cache
jackzhhuang Dec 27, 2024
cc55d25
close sync
jackzhhuang Dec 27, 2024
e01aecd
no process for the future block
jackzhhuang Dec 27, 2024
437f3b3
wait 500ms for fetch the dag block
jackzhhuang Dec 27, 2024
1ae7dd3
add use get block
jackzhhuang Dec 27, 2024
7afed95
add fetch_blocks
jackzhhuang Dec 27, 2024
419c345
add fetch block
jackzhhuang Dec 27, 2024
6a87427
use ctx spawn to re execute the dag block
jackzhhuang Dec 31, 2024
a5c14c4
connect the specific block at the last step
jackzhhuang Dec 31, 2024
1e7a6a4
use BTreeSet
jackzhhuang Dec 31, 2024
f72b48b
fix fmt
jackzhhuang Jan 2, 2025
7907652
1, use event handle
jackzhhuang Jan 3, 2025
1b750bb
allow parallel sync
jackzhhuang Jan 3, 2025
1ab40fa
remove the dirty tips
jackzhhuang Jan 4, 2025
47078d4
check the dag data integrity
jackzhhuang Jan 7, 2025
a75c045
1, re insert the reachability data
jackzhhuang Jan 7, 2025
5753b5b
fix typo
jackzhhuang Jan 7, 2025
9528d37
remove unused if block
jackzhhuang Jan 7, 2025
6 changes: 0 additions & 6 deletions block-relayer/src/block_relayer.rs
@@ -341,12 +341,6 @@ impl EventHandler<Self, PeerCompactBlockMessage> for BlockRelayer {
if let Some(metrics) = self.metrics.as_ref() {
metrics.block_relay_time.observe(time_sec);
}
sl_info!(
"{action} {hash} {time_sec}",
time_sec = time_sec,
hash = compact_block_msg.message.compact_block.header.id().to_hex(),
action = "block_relay_time",
);
//TODO should filter too old block?

if let Err(e) = self.handle_block_event(compact_block_msg, ctx) {
11 changes: 10 additions & 1 deletion chain/src/chain.rs
@@ -384,6 +384,15 @@ impl BlockChain {
.ok_or_else(|| format_err!("Can not find block hash by number {}", number))
}

pub fn check_parents_ready(&self, header: &BlockHeader) -> bool {
header.parents_hash().into_iter().all(|parent| {
self.has_dag_block(parent).unwrap_or_else(|e| {
warn!("check_parents_ready error: {:?}", e);
false
})
})
}
Comment on lines +387 to +394

🛠️ Refactor suggestion
Add error recovery mechanism in parent readiness check.

The check_parents_ready method silently continues on errors by returning false. Consider either:

  1. Propagating the error up for better error handling
  2. Adding metrics to track these failures
 pub fn check_parents_ready(&self, header: &BlockHeader) -> bool {
     header.parents_hash().into_iter().all(|parent| {
         self.has_dag_block(parent).unwrap_or_else(|e| {
+            if let Some(metrics) = &self.metrics {
+                metrics.parent_check_errors.inc();
+            }
             warn!("check_parents_ready error: {:?}", e);
             false
         })
     })
 }

Committable suggestion skipped: line range outside the PR's diff.
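
The suggested diff above assumes a `parent_check_errors` counter on the chain's metrics struct; no such field exists in this PR, so the name and wiring below are hypothetical. A minimal sketch of registering such a counter with the prometheus crate could look like this:

```rust
// Hypothetical sketch: registering a parent_check_errors counter.
// The struct name, field name, and registry wiring are assumptions, not part of this PR.
use anyhow::Result;
use prometheus::{IntCounter, Registry};

pub struct ChainMetricsSketch {
    pub parent_check_errors: IntCounter,
}

impl ChainMetricsSketch {
    pub fn register(registry: &Registry) -> Result<Self> {
        let parent_check_errors = IntCounter::new(
            "parent_check_errors_total",
            "Errors hit while checking DAG parent readiness",
        )?;
        // Register the counter so it is exported with the rest of the node metrics.
        registry.register(Box::new(parent_check_errors.clone()))?;
        Ok(Self { parent_check_errors })
    }
}

fn main() -> Result<()> {
    let registry = Registry::new();
    let metrics = ChainMetricsSketch::register(&registry)?;
    // This is the call the warn! branch in the suggestion would make.
    metrics.parent_check_errors.inc();
    Ok(())
}
```

With something along these lines attached to the chain's metrics, the `metrics.parent_check_errors.inc()` call in the suggested diff would have a concrete counter to increment.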


fn check_exist_block(&self, block_id: HashValue, block_number: BlockNumber) -> Result<bool> {
Ok(self
.get_hash_by_number(block_number)?
@@ -1360,7 +1369,7 @@ impl ChainReader for BlockChain {
return Ok(false);
}

self.dag.has_dag_block(header.id())
self.dag.has_block_connected(&header)
}

fn check_chain_type(&self) -> Result<ChainType> {
117 changes: 107 additions & 10 deletions flexidag/src/blockdag.rs
@@ -19,6 +19,7 @@ use crate::process_key_already_error;
use crate::prune::pruning_point_manager::PruningPointManagerT;
use crate::reachability::ReachabilityError;
use anyhow::{bail, ensure, Ok};
use parking_lot::Mutex;
use rocksdb::WriteBatch;
use starcoin_config::temp_dir;
use starcoin_crypto::{HashValue as Hash, HashValue};
@@ -54,6 +55,7 @@ pub struct BlockDAG {
pub storage: FlexiDagStorage,
ghostdag_manager: DbGhostdagManager,
pruning_point_manager: PruningPointManager,
commit_lock: Arc<Mutex<FlexiDagStorage>>,
🛠️ Refactor suggestion

Potential Performance Bottleneck Due to commit_lock Mutex

The addition of commit_lock: Arc<Mutex<FlexiDagStorage>> introduces a mutex that locks the entire FlexiDagStorage during commit operations. This could lead to performance issues due to contention in concurrent environments.

Consider implementing more fine-grained locking mechanisms or using lock-free data structures to reduce contention:

-pub struct BlockDAG {
-    pub storage: FlexiDagStorage,
-    ghostdag_manager: DbGhostdagManager,
-    pruning_point_manager: PruningPointManager,
-    commit_lock: Arc<Mutex<FlexiDagStorage>>,
+pub struct BlockDAG {
+    pub storage: Arc<FlexiDagStorage>,
+    ghostdag_manager: DbGhostdagManager,
+    pruning_point_manager: PruningPointManager,
+    // Remove commit_lock and use internal synchronization mechanisms
 }

Committable suggestion skipped: line range outside the PR's diff.
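
One concrete reading of "internal synchronization mechanisms" is to keep the mutex inside the storage type and hold it only around the RocksDB batch write, rather than around the whole commit path. The following is an illustrative sketch only; the type name, field names, and the write_batch signature are assumptions, not starcoin's actual API:

```rust
// Illustrative sketch: a narrow write lock inside the storage type.
use std::sync::Arc;

use parking_lot::Mutex;
use rocksdb::{WriteBatch, DB};

#[derive(Clone)]
pub struct FlexiDagStorageSketch {
    db: Arc<DB>,
    // Held only while a batch is flushed; readers and batch construction
    // do not contend on this lock.
    write_lock: Arc<Mutex<()>>,
}

impl FlexiDagStorageSketch {
    pub fn write_batch(&self, batch: WriteBatch) -> Result<(), rocksdb::Error> {
        let _guard = self.write_lock.lock();
        self.db.write(batch)
    }
}
```

The point of the sketch is the scope of the critical section: callers still build their WriteBatch concurrently, and only the final flush is serialized.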

}

impl BlockDAG {
@@ -75,11 +77,12 @@ impl BlockDAG {
reachability_service.clone(),
);
let pruning_point_manager = PruningPointManager::new(reachability_service, ghostdag_store);

let commit_lock = Arc::new(Mutex::new(db.clone()));
Self {
ghostdag_manager,
storage: db,
pruning_point_manager,
commit_lock,
}
}

Expand All @@ -98,13 +101,101 @@ impl BlockDAG {
Ok(Self::new(k, dag_storage))
}

pub fn has_dag_block(&self, hash: Hash) -> anyhow::Result<bool> {
Ok(self.storage.header_store.has(hash)?)
pub fn has_block_connected(&self, block_header: &BlockHeader) -> anyhow::Result<bool> {
let _ghostdata = match self.storage.ghost_dag_store.get_data(block_header.id()) {
std::result::Result::Ok(data) => data,
Err(e) => {
warn!(
"failed to get ghostdata by hash: {:?}, the block should be re-executed",
e
);
return anyhow::Result::Ok(false);
}
};

let _dag_header = match self.storage.header_store.get_header(block_header.id()) {
std::result::Result::Ok(header) => header,
Err(e) => {
warn!(
"failed to get header by hash: {:?}, the block should be re-executed",
e
);
return anyhow::Result::Ok(false);
}
};

let parents = match self
.storage
.relations_store
.read()
.get_parents(block_header.id())
{
std::result::Result::Ok(parents) => parents,
Err(e) => {
warn!(
"failed to get parents by hash: {:?}, the block should be re-executed",
e
);
return anyhow::Result::Ok(false);
}
};

if !parents.iter().all(|parent| {
let children = match self.storage.relations_store.read().get_children(*parent) {
std::result::Result::Ok(children) => children,
Err(e) => {
warn!("failed to get children by hash: {:?}, the block should be re-executed", e);
return false;
}
};

if !children.contains(&block_header.id()) {
warn!("the parent: {:?} does not have the child: {:?}", parent, block_header.id());
return false;
}

match inquirer::is_dag_ancestor_of(&*self.storage.reachability_store.read(), *parent, block_header.id()) {
std::result::Result::Ok(pass) => {
if !pass {
warn!("failed to check ancestor, the block: {:?} is not the descendant of its parent: {:?}, the block should be re-executed", block_header.id(), *parent);
return false;
}
true
}
Err(e) => {
warn!("failed to check ancestor, the block: {:?} is not the descendant of its parent: {:?}, the block should be re-executed, error: {:?}", block_header.id(), *parent, e);
false
}
}
}) {
return anyhow::Result::Ok(false);
}

if block_header.pruning_point() == HashValue::zero() {
return anyhow::Result::Ok(true);
} else {
match inquirer::is_dag_ancestor_of(
&*self.storage.reachability_store.read(),
block_header.pruning_point(),
block_header.id(),
) {
std::result::Result::Ok(pass) => {
if !pass {
warn!("failed to check ancestor, the block: {:?} is not the descendant of the pruning: {:?}", block_header.id(), block_header.pruning_point());
return anyhow::Result::Ok(false);
}
}
Err(e) => {
warn!("failed to check ancestor, the block: {:?} is not the descendant of the pruning: {:?}, error: {:?}", block_header.id(), block_header.pruning_point(), e);
return anyhow::Result::Ok(false);
}
}
}

anyhow::Result::Ok(true)
}

pub fn check_ancestor_of(&self, ancestor: Hash, descendant: Hash) -> anyhow::Result<bool> {
// self.ghostdag_manager
// .check_ancestor_of(ancestor, descendant)
inquirer::is_dag_ancestor_of(
&*self.storage.reachability_store.read(),
ancestor,
@@ -239,11 +330,12 @@ impl BlockDAG {
);
}

info!("start to commit via batch, header id: {:?}", header.id());

// Create a DB batch writer
let mut batch = WriteBatch::default();

info!("start to commit via batch, header id: {:?}", header.id());
let lock_guard = self.commit_lock.lock();

Comment on lines +337 to +338

⚠️ Potential issue

Risk of Deadlock Due to Mutex Locking Order

In both commit_trusted_block and commit methods, the commit_lock mutex is acquired before other locks (e.g., reachability_store), but in other parts of the code, locks may be acquired in a different order. This inconsistency can lead to deadlocks.

Ensure that all locks are always acquired in the same order throughout the codebase. Review the locking strategy and consider combining locks or redesigning to avoid locking altogether if possible.

Also applies to: 477-479
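
A common mitigation is to funnel every commit through a single helper that acquires the locks in one fixed order, so no call site can invert it. A minimal sketch, using simplified stand-in types rather than the real store types:

```rust
// Sketch of enforcing one lock-acquisition order; the field types here are
// simplified stand-ins, not the actual FlexiDagStorage stores.
use std::sync::Arc;

use parking_lot::{Mutex, MutexGuard, RwLock, RwLockWriteGuard};

struct Dag {
    commit_lock: Arc<Mutex<()>>,
    reachability_store: Arc<RwLock<Vec<u8>>>,
}

impl Dag {
    /// Every writer goes through this helper, so commit_lock is always taken
    /// before reachability_store and deadlock by lock inversion cannot occur.
    fn lock_for_commit(&self) -> (MutexGuard<'_, ()>, RwLockWriteGuard<'_, Vec<u8>>) {
        let commit = self.commit_lock.lock();
        let reachability = self.reachability_store.write();
        (commit, reachability)
    }
}

fn main() {
    let dag = Dag {
        commit_lock: Arc::new(Mutex::new(())),
        reachability_store: Arc::new(RwLock::new(Vec::new())),
    };
    let (_commit, mut reachability) = dag.lock_for_commit();
    // Mutate while both guards are held; they are released together at scope end.
    reachability.push(1);
}
```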

// lock the dag data to write in batch
// the cache will be written at the same time
// when the batch is written before flush to the disk and
@@ -322,6 +414,7 @@ impl BlockDAG {
.write_batch(batch)
.expect("failed to write dag data in batch");

drop(lock_guard);
info!("finish writing the batch, head id: {:?}", header.id());

Ok(())
@@ -381,6 +474,9 @@ impl BlockDAG {
// Create a DB batch writer
let mut batch = WriteBatch::default();

info!("start to commit via batch, header id: {:?}", header.id());
let lock_guard = self.commit_lock.lock();

// lock the dag data to write in batch, read lock.
// the cache will be written at the same time
// when the batch is written before flush to the disk and
@@ -460,6 +556,7 @@ impl BlockDAG {
.write_batch(batch)
.expect("failed to write dag data in batch");

drop(lock_guard);
info!("finish writing the batch, head id: {:?}", header.id());

Ok(())
@@ -533,12 +630,12 @@ impl BlockDAG {
pruning_depth: u64,
pruning_finality: u64,
) -> anyhow::Result<MineNewDagBlockInfo> {
info!("start to calculate the mergeset and tips, previous pruning point: {:?}, previous ghostdata: {:?}", previous_pruning_point, previous_ghostdata);
info!("start to calculate the mergeset and tips, previous pruning point: {:?}, previous ghostdata: {:?} and its red block count: {:?}", previous_pruning_point, previous_ghostdata.to_compact(), previous_ghostdata.mergeset_reds.len());
let dag_state = self.get_dag_state(previous_pruning_point)?;
let next_ghostdata = self.ghostdata(&dag_state.tips)?;
info!(
"start to calculate the mergeset and tips for tips: {:?}, and last pruning point: {:?} and next ghostdata: {:?}",
dag_state.tips, previous_pruning_point, next_ghostdata,
"start to calculate the mergeset and tips for tips: {:?}, and last pruning point: {:?} and next ghostdata: {:?}, red block count: {:?}",
dag_state.tips, previous_pruning_point, next_ghostdata.to_compact(), next_ghostdata.mergeset_reds.len()
);
let next_pruning_point = self.pruning_point_manager().next_pruning_point(
previous_pruning_point,
24 changes: 23 additions & 1 deletion flexidag/src/reachability/inquirer.rs
@@ -74,8 +74,30 @@ fn add_dag_block(
mergeset_iterator: HashIterator,
) -> Result<()> {
// Update the future covering set for blocks in the mergeset
let mut insert_future_set_result: Vec<std::result::Result<(), ReachabilityError>> = Vec::new();
for merged_block in mergeset_iterator {
insert_to_future_covering_set(store, merged_block, new_block)?;
let result = insert_to_future_covering_set(store, merged_block, new_block);
if result.is_err() {
match result {
Err(ReachabilityError::DataInconsistency) => {
// This is a data inconsistency error, which means that the block is already in the future covering set
// of the merged block. This is a serious error, and we should propagate it.
insert_future_set_result.push(Err(ReachabilityError::DataInconsistency));
}
Err(ReachabilityError::HashesNotOrdered) => {
// This is a hashes not ordered error, which means that the merged block is not in the future covering set
// of the new block. This is a serious error, and we should propagate it.
return Err(ReachabilityError::HashesNotOrdered);
}
_ => {
// This is an unexpected error, and we should propagate it.
return result;
}
}
}
}
for result in insert_future_set_result.into_iter() {
result?;
}
Ok(())
}
2 changes: 1 addition & 1 deletion flexidag/tests/tests.rs
@@ -149,7 +149,7 @@ async fn test_with_spawn() {
std::result::Result::Ok(_) => break,
Err(e) => {
debug!("failed to commit error: {:?}, i: {:?}", e, i);
if dag_clone.has_dag_block(block_clone.id()).unwrap() {
if dag_clone.has_block_connected(&block_clone).unwrap() {
break;
}
count -= 1;
12 changes: 6 additions & 6 deletions network-p2p/src/service.rs
@@ -1308,12 +1308,12 @@ impl<T: BusinessLayerHandle + Send> Future for NetworkWorker<T> {
})) => {
if let Some(metrics) = this.metrics.as_ref() {
for (protocol, message) in &messages {
info!(
"[network-p2p] receive notification from {} {} {}",
remote,
protocol,
message.len()
);
// info!(
// "[network-p2p] receive notification from {} {} {}",
// remote,
// protocol,
// message.len()
// );
metrics
.notifications_sizes
.with_label_values(&["in", protocol])
20 changes: 10 additions & 10 deletions network/api/src/peer_provider.rs
@@ -276,18 +276,18 @@ impl PeerSelector {
peers
});
if best_peers.is_empty() || best_peers[0].total_difficulty() <= min_difficulty {
info!(
"best peer difficulty {:?} is smaller than min difficulty {:?}, return None",
best_peers[0].total_difficulty(),
min_difficulty
);
// info!(
// "best peer difficulty {:?} is smaller than min difficulty {:?}, return None",
// best_peers[0].total_difficulty(),
// min_difficulty
// );
None
} else {
info!(
"best peer difficulty {:?}, info: {:?} picked",
best_peers[0].total_difficulty(),
best_peers
);
// info!(
// "best peer difficulty {:?}, info: {:?} picked",
// best_peers[0].total_difficulty(),
// best_peers
// );
Some(best_peers)
}
}