Skip to content

Commit

Permalink
fix(fork-network): fix handling of shard IDs (near#12469)
Browse files Browse the repository at this point in the history
When the `init` command is run, the fork-network command writes state
roots to the DB by key "FORK_TOOL_SHARD_ID:{shard_id}", and later reads
them back by iterating over that prefix. But there is inconsistent
treatment of these ShardIds sometimes as ShardIds and sometimes as
ShardIndexes. Firstly, the name of the key itself indicates we should
expect ShardIds to be there. But when we write them, we collect the
state roots in a Vec in order of ShardLayout::shard_ids(), and then
iter().enumerate() over that to write the (shard_id, state_root), which
is really a shard index. Then when we read them back, we index into that
array with `shard_uid.shard_id as usize`, which doesn't work when the
ShardIds are shuffled.

When running this on a localnet where shard IDs are shuffled, this gives
us this error:

```
thread '<unnamed>' panicked at tools/fork-network/src/cli.rs:732:22:
called `Result::unwrap()` on an `Err` value: StorageInconsistentState("Failed to find root node EEMN57dXxneVQHDZ5aZreA8SHF6Xtee95CKpGfeMEmGN in memtrie")
```

So fix it by actually writing a real ShardId to the
`FORK_TOOL_SHARD_ID:...` keys, and actually parsing the ShardId from
that key when iterating (instead of an enumerate() to get the shard ID).
And just be more careful about the ShardId <-> ShardIndex mapping
throughout.
  • Loading branch information
marcelo-gonzalez authored Nov 18, 2024
1 parent 8c180e0 commit 6dab9ee
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 30 deletions.
2 changes: 2 additions & 0 deletions core/primitives/src/shard_layout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,8 @@ impl fmt::Display for ShardLayoutError {
}
}

impl std::error::Error for ShardLayoutError {}

impl ShardLayout {
/// Handy constructor for a single-shard layout, mostly for test purposes
pub fn single_shard() -> Self {
Expand Down
88 changes: 58 additions & 30 deletions tools/fork-network/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use near_primitives::state_record::StateRecord;
use near_primitives::trie_key::col;
use near_primitives::trie_key::trie_key_parsers::parse_account_id_from_account_key;
use near_primitives::types::{
AccountId, AccountInfo, Balance, BlockHeight, EpochId, NumBlocks, StateRoot,
AccountId, AccountInfo, Balance, BlockHeight, EpochId, NumBlocks, ShardId, StateRoot,
};
use near_primitives::version::{ProtocolVersion, PROTOCOL_VERSION};
use near_store::adapter::StoreAdapter;
Expand All @@ -40,6 +40,7 @@ use std::collections::HashSet;
use std::fs::File;
use std::io::BufReader;
use std::path::{Path, PathBuf};
use std::str::FromStr;
use std::sync::Arc;
use strum::IntoEnumIterator;

Expand Down Expand Up @@ -123,6 +124,20 @@ struct SetValidatorsCmd {
pub protocol_version: Option<ProtocolVersion>,
}

const FORKED_ROOTS_KEY_PREFIX: &str = "FORK_TOOL_SHARD_ID:";

fn parse_state_roots_key(key: &[u8]) -> anyhow::Result<ShardId> {
let key = std::str::from_utf8(key)?;
// Sanity check assertion since we should be iterating based on this prefix
assert!(key.starts_with(FORKED_ROOTS_KEY_PREFIX));
let int_part = &key[FORKED_ROOTS_KEY_PREFIX.len()..];
ShardId::from_str(int_part).with_context(|| format!("Failed parsing ShardId from {}", int_part))
}

fn make_state_roots_key(shard_id: ShardId) -> Vec<u8> {
format!("{FORKED_ROOTS_KEY_PREFIX}{shard_id}").into_bytes()
}

#[derive(clap::Parser)]
struct ResetCmd;

Expand Down Expand Up @@ -276,7 +291,7 @@ impl ForkNetworkCommand {

// Advance flat heads to the same (max) block height to ensure
// consistency of state across the shards.
let state_roots: Vec<StateRoot> = shard_layout
let state_roots: Vec<(ShardId, StateRoot)> = shard_layout
.shard_ids()
.map(|shard_id| {
let shard_uid = epoch_manager.shard_id_to_uid(shard_id, epoch_id).unwrap();
Expand All @@ -287,7 +302,7 @@ impl ForkNetworkCommand {
let chunk_extra = chain.get_chunk_extra(&desired_block_hash, &shard_uid).unwrap();
let state_root = chunk_extra.state_root();
tracing::info!(?shard_id, ?epoch_id, ?state_root);
*state_root
(shard_id, *state_root)
})
.collect();

Expand All @@ -305,12 +320,8 @@ impl ForkNetworkCommand {
store_update.set_ser(DBCol::Misc, b"FORK_TOOL_EPOCH_ID", epoch_id)?;
store_update.set_ser(DBCol::Misc, b"FORK_TOOL_BLOCK_HASH", &desired_block_hash)?;
store_update.set(DBCol::Misc, b"FORK_TOOL_BLOCK_HEIGHT", &block_height.to_le_bytes());
for (shard_id, state_root) in state_roots.iter().enumerate() {
store_update.set_ser(
DBCol::Misc,
format!("FORK_TOOL_SHARD_ID:{shard_id}").as_bytes(),
state_root,
)?;
for (shard_id, state_root) in state_roots.iter() {
store_update.set_ser(DBCol::Misc, &make_state_roots_key(*shard_id), state_root)?;
}
store_update.commit()?;
Ok(())
Expand All @@ -329,17 +340,19 @@ impl ForkNetworkCommand {
let storage = open_storage(&home_dir, near_config).unwrap();
let store = storage.get_hot_store();

let (prev_state_roots, prev_hash, epoch_id, block_height) =
self.get_state_roots_and_hash(store.clone())?;
tracing::info!(?prev_state_roots, ?epoch_id, ?prev_hash);

let epoch_manager = EpochManager::new_arc_handle(
store.clone(),
&near_config.genesis.config,
Some(home_dir),
);
let (prev_state_roots, prev_hash, epoch_id, block_height) =
self.get_state_roots_and_hash(epoch_manager.as_ref(), store.clone())?;
tracing::info!(?prev_state_roots, ?epoch_id, ?prev_hash);

let shard_layout = epoch_manager.get_shard_layout(&epoch_id)?;
let all_shard_uids = shard_layout.shard_uids().collect::<Vec<_>>();
assert_eq!(all_shard_uids.len(), prev_state_roots.len());

let runtime =
NightshadeRuntime::from_config(home_dir, store.clone(), &near_config, epoch_manager)
.context("could not create the transaction runtime")?;
Expand All @@ -350,9 +363,17 @@ impl ForkNetworkCommand {
SingleShardStorageMutator::new(&runtime.clone(), prev_state_root)
});

let prev_state_roots = prev_state_roots
.into_iter()
.enumerate()
.map(|(index, root)| {
let shard_id = shard_layout.get_shard_id(index).unwrap();
let shard_uid = ShardUId::from_shard_id_and_layout(shard_id, &shard_layout);
(shard_uid, root)
})
.collect::<Vec<_>>();
let new_state_roots = self.prepare_state(
batch_size,
&all_shard_uids,
store,
&prev_state_roots,
block_height,
Expand Down Expand Up @@ -380,15 +401,15 @@ impl ForkNetworkCommand {
let storage = open_storage(&home_dir, near_config).unwrap();
let store = storage.get_hot_store();

let (prev_state_roots, _prev_hash, epoch_id, block_height) =
self.get_state_roots_and_hash(store.clone())?;

let epoch_manager = EpochManager::new_arc_handle(
store.clone(),
&near_config.genesis.config,
Some(home_dir),
);

let (prev_state_roots, _prev_hash, epoch_id, block_height) =
self.get_state_roots_and_hash(epoch_manager.as_ref(), store.clone())?;

let runtime =
NightshadeRuntime::from_config(home_dir, store, &near_config, epoch_manager.clone())
.context("could not create the transaction runtime")?;
Expand Down Expand Up @@ -440,24 +461,32 @@ impl ForkNetworkCommand {
Ok(())
}

// The Vec<StateRoot> returned is in ShardIndex order
fn get_state_roots_and_hash(
&self,
epoch_manager: &EpochManagerHandle,
store: Store,
) -> anyhow::Result<(Vec<StateRoot>, CryptoHash, EpochId, BlockHeight)> {
let epoch_id = EpochId(store.get_ser(DBCol::Misc, b"FORK_TOOL_EPOCH_ID")?.unwrap());
let block_hash = store.get_ser(DBCol::Misc, b"FORK_TOOL_BLOCK_HASH")?.unwrap();
let block_height = store.get(DBCol::Misc, b"FORK_TOOL_BLOCK_HEIGHT")?.unwrap();
let block_height = u64::from_le_bytes(block_height.as_slice().try_into().unwrap());
let mut state_roots = vec![];
for (shard_id, item) in
store.iter_prefix(DBCol::Misc, "FORK_TOOL_SHARD_ID:".as_bytes()).enumerate()
{
let shard_layout = epoch_manager
.get_shard_layout(&epoch_id)
.with_context(|| format!("Failed getting shard layout for epoch {}", &epoch_id.0))?;
let mut state_roots = vec![None; shard_layout.shard_ids().count()];
for item in store.iter_prefix(DBCol::Misc, FORKED_ROOTS_KEY_PREFIX.as_bytes()) {
let (key, value) = item?;
let key = String::from_utf8(key.to_vec())?;
let state_root = borsh::from_slice(&value)?;
assert_eq!(key, format!("FORK_TOOL_SHARD_ID:{shard_id}"));
state_roots.push(state_root);
let shard_id = parse_state_roots_key(&key)?;
let shard_index = shard_layout
.get_shard_index(shard_id)
.with_context(|| format!("Failed finding shard index for {}", shard_id))?;
let state_root: StateRoot = borsh::from_slice(&value)?;

assert!(state_roots[shard_index].is_none());
state_roots[shard_index] = Some(state_root);
}
let state_roots = state_roots.into_iter().map(|s| s.unwrap()).collect();
tracing::info!(?state_roots, ?block_hash, ?epoch_id, block_height);
Ok((state_roots, block_hash, epoch_id, block_height))
}
Expand Down Expand Up @@ -711,21 +740,20 @@ impl ForkNetworkCommand {
fn prepare_state(
&self,
batch_size: u64,
all_shard_uids: &[ShardUId],
store: Store,
prev_state_roots: &[StateRoot],
prev_state_roots: &[(ShardUId, StateRoot)],
block_height: BlockHeight,
make_storage_mutator: MakeSingleShardStorageMutatorFn,
) -> anyhow::Result<Vec<StateRoot>> {
let state_roots = all_shard_uids
let state_roots = prev_state_roots
.into_par_iter()
.map(|shard_uid| {
.map(|(shard_uid, state_root)| {
let state_root = self
.prepare_shard_state(
batch_size,
*shard_uid,
store.clone(),
prev_state_roots[shard_uid.shard_id as usize],
*state_root,
block_height,
make_storage_mutator.clone(),
)
Expand Down

0 comments on commit 6dab9ee

Please sign in to comment.