From 5e07076ade433ad0f64574e82abedb84ce15b90a Mon Sep 17 00:00:00 2001
From: Trey Del Bonis
Date: Thu, 23 Jan 2025 15:49:26 -0500
Subject: [PATCH 01/11] primitives: added block/epoch commitment types

---
 crates/primitives/src/epoch.rs    | 109 ++++++++++++++++++++++++++++++
 crates/primitives/src/l1/block.rs |  34 ++++++++++
 crates/primitives/src/l2.rs       |  35 ++++++++++
 crates/primitives/src/lib.rs      |   1 +
 4 files changed, 179 insertions(+)
 create mode 100644 crates/primitives/src/epoch.rs

diff --git a/crates/primitives/src/epoch.rs b/crates/primitives/src/epoch.rs
new file mode 100644
index 000000000..a57bd6980
--- /dev/null
+++ b/crates/primitives/src/epoch.rs
@@ -0,0 +1,109 @@
+//! Types relating to epoch bookkeeping.
+//!
+//! An epoch is a range of sequential blocks, defined by the terminal block of
+//! the epoch going back to (but not including) the terminal block of the
+//! previous epoch. This indirectly but uniquely identifies the epoch's final
+//! state, although in theory it's possible for conflicting epochs with
+//! different terminal blocks to exist, depending on the consensus algorithm.
+//!
+//! Epochs are *usually* the same number of slots, but we're not guaranteeing
+//! this yet, so we always include both the epoch number and slot number of
+//! the terminal block.
+//!
+//! We also have a sentinel "null" epoch used to refer to the "finalized
+//! epoch" as of the genesis block.
+
+use arbitrary::Arbitrary;
+use borsh::{BorshDeserialize, BorshSerialize};
+use serde::{Deserialize, Serialize};
+
+use crate::{
+    buf::Buf32,
+    l2::{L2BlockCommitment, L2BlockId},
+};
+
+/// Commits to a particular epoch by the last block and slot.
+#[derive(
+    Copy,
+    Clone,
+    Debug,
+    Eq,
+    PartialEq,
+    Ord,
+    PartialOrd,
+    Hash,
+    Arbitrary,
+    BorshDeserialize,
+    BorshSerialize,
+    Deserialize,
+    Serialize,
+)]
+pub struct EpochCommitment {
+    epoch: u64,
+    last_slot: u64,
+    last_blkid: L2BlockId,
+}
+
+impl EpochCommitment {
+    pub fn new(epoch: u64, last_slot: u64, last_blkid: L2BlockId) -> Self {
+        Self {
+            epoch,
+            last_slot,
+            last_blkid,
+        }
+    }
+
+    /// Creates a "null" epoch with a zero epoch number, slot, and terminal
+    /// blkid, referring to the "finalized epoch" as of the genesis block.
+    pub fn null() -> Self {
+        Self::new(0, 0, L2BlockId::from(Buf32::zero()))
+    }
+
+    pub fn epoch(&self) -> u64 {
+        self.epoch
+    }
+
+    pub fn last_slot(&self) -> u64 {
+        self.last_slot
+    }
+
+    pub fn last_blkid(&self) -> &L2BlockId {
+        &self.last_blkid
+    }
+
+    /// Returns a [`L2BlockCommitment`] for the final block of the epoch.
+    pub fn to_block_commitment(&self) -> L2BlockCommitment {
+        L2BlockCommitment::new(self.last_slot, self.last_blkid)
+    }
+
+    /// Returns whether the terminal blkid is zero.
+    pub fn is_null(&self) -> bool {
+        Buf32::from(self.last_blkid).is_zero()
+    }
+
+    /// Checks if the epoch is sane.
+    ///
+    /// I.e. if the terminal blkid is zero, then the last slot and epoch
+    /// number are also zero.
+ fn sanity_check(&self) -> bool { + if self.is_null() { + self.last_slot == 0 && self.epoch == 0 + } else { + self.last_slot != 0 + } + } +} + +#[cfg(test)] +mod tests { + use super::EpochCommitment; + + #[test] + fn test_epoch_sanity() { + // TODO write test + } + + #[test] + fn test_epoch_insanity() { + // TODO write test + } +} diff --git a/crates/primitives/src/l1/block.rs b/crates/primitives/src/l1/block.rs index 3178ab8a5..b0d0089e0 100644 --- a/crates/primitives/src/l1/block.rs +++ b/crates/primitives/src/l1/block.rs @@ -45,6 +45,39 @@ impl From for BlockHash { } } +#[derive( + Copy, + Clone, + Eq, + PartialEq, + Ord, + PartialOrd, + Hash, + Arbitrary, + BorshDeserialize, + BorshSerialize, + Deserialize, + Serialize, +)] +pub struct L1BlockCommitment { + height: u64, + blkid: L1BlockId, +} + +impl L1BlockCommitment { + pub fn new(height: u64, blkid: L1BlockId) -> Self { + Self { height, blkid } + } + + pub fn height(&self) -> u64 { + self.height + } + + pub fn blkid(&self) -> &L1BlockId { + &self.blkid + } +} + /// Reference to a transaction in a block. This is the block index and the /// position of the transaction in the block. #[derive( @@ -68,6 +101,7 @@ impl L1TxRef { pub fn blk_idx(&self) -> u64 { self.0 } + pub fn position(&self) -> u32 { self.1 } diff --git a/crates/primitives/src/l2.rs b/crates/primitives/src/l2.rs index 526353ce3..0dc20b104 100644 --- a/crates/primitives/src/l2.rs +++ b/crates/primitives/src/l2.rs @@ -23,3 +23,38 @@ use crate::{buf::Buf32, impl_buf_wrapper}; pub struct L2BlockId(Buf32); impl_buf_wrapper!(L2BlockId, Buf32, 32); + +/// Commits to a specific block at some slot. +#[derive( + Copy, + Clone, + Debug, + Eq, + PartialEq, + Ord, + PartialOrd, + Hash, + Arbitrary, + BorshDeserialize, + BorshSerialize, + Deserialize, + Serialize, +)] +pub struct L2BlockCommitment { + slot: u64, + blkid: L2BlockId, +} + +impl L2BlockCommitment { + pub fn new(slot: u64, blkid: L2BlockId) -> Self { + Self { slot, blkid } + } + + pub fn slot(&self) -> u64 { + self.slot + } + + pub fn blkid(&self) -> &L2Block { + &self.blkid + } +} diff --git a/crates/primitives/src/lib.rs b/crates/primitives/src/lib.rs index 5daa31bfb..3a43a6e50 100644 --- a/crates/primitives/src/lib.rs +++ b/crates/primitives/src/lib.rs @@ -14,6 +14,7 @@ pub mod l1; pub mod l2; #[macro_use] mod macros; +pub mod epoch; pub mod keys; pub mod operator; pub mod params; From b0979a4327bcf99bbfe04d2f028d87f1e94b6887 Mon Sep 17 00:00:00 2001 From: Trey Del Bonis Date: Thu, 23 Jan 2025 16:00:07 -0500 Subject: [PATCH 02/11] primitives: fixed typo --- crates/primitives/src/l2.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/primitives/src/l2.rs b/crates/primitives/src/l2.rs index 0dc20b104..5a0d25873 100644 --- a/crates/primitives/src/l2.rs +++ b/crates/primitives/src/l2.rs @@ -54,7 +54,7 @@ impl L2BlockCommitment { self.slot } - pub fn blkid(&self) -> &L2Block { + pub fn blkid(&self) -> &L2BlockId { &self.blkid } } From 40c2143e9907e11e05b8ac6a42394cd01e41c435 Mon Sep 17 00:00:00 2001 From: Trey Del Bonis Date: Thu, 23 Jan 2025 16:19:23 -0500 Subject: [PATCH 03/11] state: reworked chainstate `WriteBatch` and `StateCache` types to directly contain new state --- crates/chaintsn/src/transition.rs | 7 +- crates/state/src/state_op.rs | 355 +++++++++++++----------------- 2 files changed, 158 insertions(+), 204 deletions(-) diff --git a/crates/chaintsn/src/transition.rs b/crates/chaintsn/src/transition.rs index e8948b8b8..181fa2936 100644 --- a/crates/chaintsn/src/transition.rs +++ 
b/crates/chaintsn/src/transition.rs
@@ -33,9 +33,10 @@ use crate::{
 /// plays out all the updates a block makes to the chain, but it will abort if
 /// there are any semantic issues that don't make sense.
 ///
-/// This operates on a state cache that's expected to be empty, panics
-/// otherwise. Does not check the `state_root` in the header for correctness,
-/// so that can be unset so it can be use during block assembly.
+/// This operates on a state cache that's expected to be empty, and may panic
+/// if changes have been made, although this is not guaranteed. Does not check
+/// the `state_root` in the header for correctness, so it can be left unset
+/// and used during block assembly.
 pub fn process_block(
     state: &mut StateCache,
     header: &impl L2Header,
diff --git a/crates/state/src/state_op.rs b/crates/state/src/state_op.rs
index a5ec40f9a..e1914b33b 100644
--- a/crates/state/src/state_op.rs
+++ b/crates/state/src/state_op.rs
@@ -15,60 +15,40 @@ use crate::{
     bridge_state::{DepositState, DispatchCommand, DispatchedState},
     chain_state::Chainstate,
     header::L2Header,
-    id::L2BlockId,
-    l1::{self, L1MaturationEntry},
+    l1::L1MaturationEntry,
     tx::ProtocolOperation::Deposit,
 };
 
 #[derive(Clone, Debug, PartialEq, BorshDeserialize, BorshSerialize)]
+#[repr(u8)] // needed to give the enum a stable representation
 pub enum StateOp {
-    /// Replace the chain state with something completely different.
-    Replace(Box<Chainstate>),
-
-    /// Sets the current slot.
-    SetSlotAndTipBlock(u64, L2BlockId),
-
-    /// Reverts L1 accepted height back to a previous height, rolling back any
-    /// blocks that were there.
-    RevertL1Height(u64),
-
-    /// Accepts a new L1 block into the maturation queue.
-    AcceptL1Block(l1::L1MaturationEntry),
-
-    /// Matures the next L1 block, whose idx must match the one specified here
-    /// as a sanity check.
-    MatureL1Block(u64),
-
-    /// Remove deposit Intent
-    ConsumeDepositIntent(u64),
-
-    /// Creates an operator
-    CreateOperator(Buf32, Buf32),
-
-    /// Assigns an assignee a deposit and withdrawal dispatch command to play out.
-    DispatchWithdrawal(u32, OperatorIdx, DispatchCommand, BitcoinBlockHeight),
-
-    /// Resets the assignee and block height for a deposit.
-    ResetDepositAssignee(u32, OperatorIdx, BitcoinBlockHeight),
+    /// Does nothing, successfully.
+    Noop,
 }
 
 /// Collection of writes we're making to the state.
 #[derive(Clone, Debug, PartialEq, BorshDeserialize, BorshSerialize)]
 pub struct WriteBatch {
+    /// Full "toplevel" state.
+    new_toplevel_state: Chainstate,
+
+    /// Ops applied to the "bulk state", which doesn't exist yet.
     ops: Vec<StateOp>,
 }
 
 impl WriteBatch {
-    pub fn new(ops: Vec<StateOp>) -> Self {
-        Self { ops }
+    /// Creates a new instance from the toplevel state and a list of ops.
+    pub fn new(new_toplevel_state: Chainstate, ops: Vec<StateOp>) -> Self {
+        Self {
+            new_toplevel_state,
+            ops,
+        }
     }
 
+    /// Creates a new instance from the new toplevel state and assumes no
+    /// changes to the bulk state.
     pub fn new_replace(new_state: Chainstate) -> Self {
-        Self::new(vec![StateOp::Replace(Box::new(new_state))])
-    }
-
-    pub fn new_empty() -> Self {
-        Self::new(Vec::new())
+        Self::new(new_state, Vec::new())
     }
 }
 
@@ -77,129 +57,16 @@ impl WriteBatch {
 
 /// On a given in-memory chainstate, applies a write batch.
 ///
 /// This must succeed. Panics if it does not.
-pub fn apply_write_batch_to_chainstate( - mut chainstate: Chainstate, - batch: &WriteBatch, -) -> Chainstate { - for op in &batch.ops { - apply_op_to_chainstate(op, &mut chainstate); - } - - chainstate -} - -fn apply_op_to_chainstate(op: &StateOp, state: &mut Chainstate) { - match op { - StateOp::Replace(new_state) => *state = new_state.as_ref().clone(), - - StateOp::SetSlotAndTipBlock(slot, last_block) => { - state.slot = *slot; - state.last_block = *last_block; - } - - StateOp::RevertL1Height(to_height) => { - debug!(%to_height, "Obtained RevertL1Height Operation"); - let mqueue = &mut state.l1_state.maturation_queue; - let back_idx = mqueue.back_idx().expect("stateop: maturation queue empty"); - - // Do some bookkeeping to make sure it's safe to do this. - if *to_height > back_idx { - panic!("stateop: revert to above tip block"); - } - - let n_drop = back_idx - to_height; - if n_drop > mqueue.len() as u64 { - panic!("stateop: revert matured block"); - } - - // Now that it's safe to do the revert, we can just do it. - for _ in 0..n_drop { - // This expect should never trigger. - mqueue.pop_back().expect("stateop: unable to revert more"); - } - } - - StateOp::AcceptL1Block(entry) => { - let mqueue = &mut state.l1_state.maturation_queue; - mqueue.push_back(entry.clone()); - } - - StateOp::MatureL1Block(maturing_idx) => { - let operators: Vec<_> = state.operator_table().indices().collect(); - let mqueue = &mut state.l1_state.maturation_queue; - let deposits = state.exec_env_state.pending_deposits_mut(); - - // Checks. - assert!(mqueue.len() > 1); // make sure we'll still have blocks in the queue - let front_idx = mqueue.front_idx().unwrap(); - assert_eq!(front_idx, *maturing_idx); - - // Actually take the block out so we can do something with it. - let matured_block = mqueue.pop_front().unwrap(); - - // TODO add it to the MMR so we can reference it in the future - let (header_record, deposit_txs, _) = matured_block.into_parts(); - for tx in deposit_txs { - if let Deposit(deposit_info) = tx.tx().protocol_operation() { - trace!("we got some deposit_txs"); - let amt = deposit_info.amt; - let deposit_intent = DepositIntent::new(amt, &deposit_info.address); - deposits.push_back(deposit_intent); - state - .deposits_table - .add_deposits(&deposit_info.outpoint, &operators, amt) - } - } - state.l1_state.safe_block = header_record; - } - - StateOp::ConsumeDepositIntent(to_drop_idx) => { - let deposits = state.exec_env_state.pending_deposits_mut(); - - let front_idx = deposits - .front_idx() - .expect("stateop: empty deposit intent queue"); - - // deposit intent indices processed sequentially, without any gaps - let to_drop_count = to_drop_idx - .checked_sub(front_idx) // ensures to_drop_idx >= front_idx - .expect("stateop: unable to consume deposit intent") - + 1; - - deposits - .pop_front_n_vec(to_drop_count as usize) // ensures to_drop_idx < front_idx + len - .expect("stateop: unable to consume deposit intent"); - } - - StateOp::CreateOperator(spk, wpk) => { - state.operator_table.insert(*spk, *wpk); - } - - StateOp::DispatchWithdrawal(deposit_idx, op_idx, cmd, exec_height) => { - let deposit_ent = state - .deposits_table_mut() - .get_deposit_mut(*deposit_idx) - .expect("stateop: missing deposit idx"); - - let state = - DepositState::Dispatched(DispatchedState::new(cmd.clone(), *op_idx, *exec_height)); - deposit_ent.set_state(state); - } - - StateOp::ResetDepositAssignee(deposit_idx, op_idx, exec_height) => { - let deposit_ent = state - .deposits_table_mut() - .get_deposit_mut(*deposit_idx) - 
.expect("stateop: missing deposit idx"); - - if let DepositState::Dispatched(dstate) = deposit_ent.deposit_state_mut() { - dstate.set_assignee(*op_idx); - dstate.set_exec_deadline(*exec_height); - } else { - panic!("stateop: unexpected deposit state"); - }; - } - } +pub fn apply_write_batch_to_chainstate(chainstate: Chainstate, batch: &WriteBatch) -> Chainstate { + // This replaces the whole toplevel state. This probably makes you think + // it doesn't make sense to take the chainstate arg at all. But this will + // probably make more sense in the future when we make the state structure + // more sophisticated, splitting apart the epoch state from the per-slot + // state more, and also the bulk state. + // + // Since the only state op possible is `Noop`, we can just ignore them all + // without even iterating over them. + batch.new_toplevel_state.clone() } /// Cache that writes to state and remembers the series of operations made to it @@ -209,8 +76,13 @@ fn apply_op_to_chainstate(op: &StateOp, state: &mut Chainstate) { /// be made generic over a state provider that exposes access to that and then /// the `WriteBatch` will include writes that can be made to that. pub struct StateCache { + /// Original toplevel state that we started from, in case we need to reference it. original_state: Chainstate, - state: Chainstate, + + /// New state that we're modifying. + new_state: Chainstate, + + /// Write operations we're making to the bulk state, if there are any. write_ops: Vec, } @@ -218,76 +90,147 @@ impl StateCache { pub fn new(state: Chainstate) -> Self { Self { original_state: state.clone(), - state, + new_state: state, write_ops: Vec::new(), } } + // Basic accessors. + pub fn state(&self) -> &Chainstate { - &self.state + &self.new_state + } + + fn state_mut(&mut self) -> &mut Chainstate { + &mut self.new_state } pub fn original_state(&self) -> &Chainstate { &self.original_state } + /// Returns if there's no write ops. + /// + /// Note that this does not guarantee that no changes have been made to the + /// chainstate from wherever it was derived from before the instance was + /// constructed. This is a minimal safety measure. + pub fn is_empty(&self) -> bool { + self.write_ops.is_empty() + } + /// Finalizes the changes made to the state, exporting it and a write batch /// that can be applied to the previous state to produce it. + // TODO remove extra `Chainstate` return value pub fn finalize(self) -> (Chainstate, WriteBatch) { - (self.state, WriteBatch::new(self.write_ops)) + ( + self.new_state.clone(), + WriteBatch::new(self.new_state, self.write_ops), + ) } - /// Returns if the state cache is empty, meaning that no writes have been - /// performed. - pub fn is_empty(&self) -> bool { - self.write_ops.is_empty() - } + // Primitive manipulation functions. - /// Applies some operations to the state, including them in the write ops - /// list. - fn merge_ops(&mut self, ops: impl Iterator) { - for op in ops { - apply_op_to_chainstate(&op, &mut self.state); - self.write_ops.push(op); - } + /// Pushes a new state op onto the write ops list. + /// + /// This currently is meaningless since we don't have write ops that do anything anymore. + pub fn push_op(&mut self, op: StateOp) { + self.write_ops.push(op); } - /// Like `merge_ops`, but only for a single op, for convenience. - fn merge_op(&mut self, op: StateOp) { - self.merge_ops([op].into_iter()); - } + // Semantic manipulation functions. 
+ // TODO rework a lot of these to make them lower-level and focus more on + // just keeping the core invariants consistent /// Sets the current slot in the state. pub fn set_cur_header(&mut self, header: &impl L2Header) { - self.merge_op(StateOp::SetSlotAndTipBlock( - header.blockidx(), - header.get_blockid(), - )); + // TODO rework this to use L2BlockCommitment + let state = self.state_mut(); + state.slot = header.blockidx(); + state.last_block = header.get_blockid(); } /// remove a deposit intent from the pending deposits queue. pub fn consume_deposit_intent(&mut self, idx: u64) { - self.merge_op(StateOp::ConsumeDepositIntent(idx)); + let deposits = self.state_mut().exec_env_state.pending_deposits_mut(); + + let front_idx = deposits + .front_idx() + .expect("stateop: empty deposit intent queue"); + + // deposit intent indices processed sequentially, without any gaps + let to_drop_count = idx + .checked_sub(front_idx) // ensures to_drop_idx >= front_idx + .expect("stateop: unable to consume deposit intent") + + 1; + + deposits + .pop_front_n_vec(to_drop_count as usize) // ensures to_drop_idx < front_idx + len + .expect("stateop: unable to consume deposit intent"); } /// Inserts a new operator with the specified pubkeys into the operator table. pub fn insert_operator(&mut self, signing_pk: Buf32, wallet_pk: Buf32) { - self.merge_op(StateOp::CreateOperator(signing_pk, wallet_pk)); + let state = self.state_mut(); + state.operator_table.insert(signing_pk, wallet_pk); } /// L1 revert - pub fn revert_l1_view_to(&mut self, height: u64) { - self.merge_op(StateOp::RevertL1Height(height)); + pub fn revert_l1_view_to(&mut self, to_height: u64) { + let mqueue = &mut self.state_mut().l1_state.maturation_queue; + let back_idx = mqueue.back_idx().expect("stateop: maturation queue empty"); + + // Do some bookkeeping to make sure it's safe to do this. + if to_height > back_idx { + panic!("stateop: revert to above tip block"); + } + + let n_drop = back_idx - to_height; + if n_drop > mqueue.len() as u64 { + panic!("stateop: revert matured block"); + } + + // Now that it's safe to do the revert, we can just do it. + for _ in 0..n_drop { + // This expect should never trigger. + mqueue.pop_back().expect("stateop: unable to revert more"); + } } /// add l1 block to maturation entry pub fn apply_l1_block_entry(&mut self, ent: L1MaturationEntry) { - self.merge_op(StateOp::AcceptL1Block(ent)); + let mqueue = &mut self.state_mut().l1_state.maturation_queue; + mqueue.push_back(ent); } /// remove matured block from maturation entry pub fn mature_l1_block(&mut self, idx: u64) { - self.merge_op(StateOp::MatureL1Block(idx)); + let operators: Vec<_> = self.state().operator_table().indices().collect(); + let deposits = self.new_state.exec_env_state.pending_deposits_mut(); + let mqueue = &mut self.new_state.l1_state.maturation_queue; + + // Checks. + assert!(mqueue.len() > 1); // make sure we'll still have blocks in the queue + let front_idx = mqueue.front_idx().unwrap(); + assert_eq!(front_idx, idx); + + // Actually take the block out so we can do something with it. 
+ let matured_block = mqueue.pop_front().unwrap(); + + // TODO add it to the MMR so we can reference it in the future + let (header_record, deposit_txs, _) = matured_block.into_parts(); + for tx in deposit_txs { + if let Deposit(deposit_info) = tx.tx().protocol_operation() { + trace!("we got some deposit_txs"); + let amt = deposit_info.amt; + let deposit_intent = DepositIntent::new(amt, &deposit_info.address); + deposits.push_back(deposit_intent); + self.new_state + .deposits_table + .add_deposits(&deposit_info.outpoint, &operators, amt) + } + } + + self.state_mut().l1_state.safe_block = header_record; } pub fn assign_withdrawal_command( @@ -297,12 +240,15 @@ impl StateCache { cmd: DispatchCommand, exec_height: BitcoinBlockHeight, ) { - self.merge_op(StateOp::DispatchWithdrawal( - deposit_idx, - operator_idx, - cmd, - exec_height, - )); + let deposit_ent = self + .state_mut() + .deposits_table_mut() + .get_deposit_mut(deposit_idx) + .expect("stateop: missing deposit idx"); + + let state = + DepositState::Dispatched(DispatchedState::new(cmd.clone(), operator_idx, exec_height)); + deposit_ent.set_state(state); } pub fn reset_deposit_assignee( @@ -311,11 +257,18 @@ impl StateCache { operator_idx: OperatorIdx, new_exec_height: BitcoinBlockHeight, ) { - self.merge_op(StateOp::ResetDepositAssignee( - deposit_idx, - operator_idx, - new_exec_height, - )); + let deposit_ent = self + .state_mut() + .deposits_table_mut() + .get_deposit_mut(deposit_idx) + .expect("stateop: missing deposit idx"); + + if let DepositState::Dispatched(dstate) = deposit_ent.deposit_state_mut() { + dstate.set_assignee(operator_idx); + dstate.set_exec_deadline(new_exec_height); + } else { + panic!("stateop: unexpected deposit state"); + }; } // TODO add more manipulator functions From 417e5017f83a69e6278e5d83efc59c64bbb453bb Mon Sep 17 00:00:00 2001 From: Trey Del Bonis Date: Thu, 23 Jan 2025 17:35:52 -0500 Subject: [PATCH 04/11] storage: added `ChainstateOps` and other adjustments --- crates/db/src/traits.rs | 26 ++++++++++++++-------- crates/rocksdb-store/src/chain_state/db.rs | 13 +++++------ crates/storage/src/ops/chainstate.rs | 21 +++++++++++++++++ crates/storage/src/ops/mod.rs | 1 + 4 files changed, 44 insertions(+), 17 deletions(-) create mode 100644 crates/storage/src/ops/chainstate.rs diff --git a/crates/db/src/traits.rs b/crates/db/src/traits.rs index 4c011a2c9..ad7c3cf6d 100644 --- a/crates/db/src/traits.rs +++ b/crates/db/src/traits.rs @@ -194,20 +194,27 @@ pub enum BlockStatus { Invalid, } -/// Db trait for the (consensus layer) chain state database. For now we only -/// have a modestly sized "toplevel" chain state and no "large" state like the -/// EL does. This trait is designed to permit a change to storing larger state -/// like that in the future without *too* much extra effort. We decide new -/// states by providing the database with a generic "write batch" and offloading -/// the effort of deciding how to compute that write batch to the database impl. +/// Low-level Strata chainstate database. This provides the basic interface for +/// storing and fetching write batches and toplevel states on disk. +/// +/// Currently we do not have a "bulk" state that we would want to avoid storing +/// in memory all at once. In the future, we expect that this interface would +/// be extended to expose a "finalized" state that's fully materialized, along +/// with functions to walk the finalized state forwards and backwards. 
We can +/// use the unmerged write batches to construct a view of more recent states +/// than the fully materialized state in-memory. +/// +/// For now, the full state is just the "toplevel" state that can always be +/// expected to be of moderate size in memory. +// TODO maybe rewrite this around storing write batches according to blkid? pub trait ChainstateDatabase { /// Writes the genesis chainstate at index 0. - fn write_genesis_state(&self, toplevel: &Chainstate) -> DbResult<()>; + fn write_genesis_state(&self, toplevel: Chainstate) -> DbResult<()>; /// Stores a write batch in the database, possibly computing that state /// under the hood from the writes. Will not overwrite existing data, /// previous writes must be purged first in order to be replaced. - fn write_state_update(&self, idx: u64, batch: &WriteBatch) -> DbResult<()>; + fn write_state_update(&self, idx: u64, batch: WriteBatch) -> DbResult<()>; /// Tells the database to purge state before a certain block index (height). fn purge_historical_state_before(&self, before_idx: u64) -> DbResult<()>; @@ -225,7 +232,8 @@ pub trait ChainstateDatabase { /// Gets the write batch stored to compute a height. fn get_writes_at(&self, idx: u64) -> DbResult>; - /// Gets the toplevel chain state at a particular block index (height). + /// Gets the toplevel chain state at a particular block slot, if it can be + /// retrieved. fn get_toplevel_state(&self, idx: u64) -> DbResult>; } diff --git a/crates/rocksdb-store/src/chain_state/db.rs b/crates/rocksdb-store/src/chain_state/db.rs index 60e6ede43..2a1d39942 100644 --- a/crates/rocksdb-store/src/chain_state/db.rs +++ b/crates/rocksdb-store/src/chain_state/db.rs @@ -56,22 +56,19 @@ impl ChainstateDatabase for ChainstateDb { Ok(self.db.get::(&idx)?) } - fn write_genesis_state( - &self, - toplevel: &strata_state::chain_state::Chainstate, - ) -> DbResult<()> { + fn write_genesis_state(&self, toplevel: strata_state::chain_state::Chainstate) -> DbResult<()> { let genesis_key = 0; if self.get_first_idx()?.is_some() || self.get_last_idx()?.is_some() { return Err(DbError::OverwriteStateUpdate(genesis_key)); } - self.db.put::(&genesis_key, toplevel)?; + self.db.put::(&genesis_key, &toplevel)?; Ok(()) } fn write_state_update( &self, idx: u64, - batch: &strata_state::state_op::WriteBatch, + batch: strata_state::state_op::WriteBatch, ) -> DbResult<()> { if self.db.get::(&idx)?.is_some() { return Err(DbError::OverwriteStateUpdate(idx)); @@ -82,10 +79,10 @@ impl ChainstateDatabase for ChainstateDb { Some(state) => state, None => return Err(DbError::OooInsert("Chainstate", idx)), }; - let post_state = state_op::apply_write_batch_to_chainstate(pre_state, batch); + let post_state = state_op::apply_write_batch_to_chainstate(pre_state, &batch); let mut write_batch = SchemaBatch::new(); - write_batch.put::(&idx, batch)?; + write_batch.put::(&idx, &batch)?; write_batch.put::(&idx, &post_state)?; self.db.write_schemas(write_batch)?; diff --git a/crates/storage/src/ops/chainstate.rs b/crates/storage/src/ops/chainstate.rs new file mode 100644 index 000000000..43f117b33 --- /dev/null +++ b/crates/storage/src/ops/chainstate.rs @@ -0,0 +1,21 @@ +//! Low level database ops for chainstate database. + +use std::sync::Arc; + +use strata_db::traits::*; +use strata_state::{chain_state::Chainstate, state_op::WriteBatch}; + +use crate::exec::*; + +inst_ops_simple! 
 {
+    (<D: ChainstateDatabase> => ChainstateOps) {
+        write_genesis_state(toplevel: Chainstate) => ();
+        write_state_update(idx: u64, batch: WriteBatch) => ();
+        purge_historical_state_before(before_idx: u64) => ();
+        rollback_writes_to(new_tip_idx: u64) => ();
+        get_last_state_idx() => u64;
+        get_earliest_state_idx() => u64;
+        get_writes_at(idx: u64) => Option<WriteBatch>;
+        get_toplevel_state(idx: u64) => Option<Chainstate>;
+    }
+}
diff --git a/crates/storage/src/ops/mod.rs b/crates/storage/src/ops/mod.rs
index 72c6283f3..8c5a358f0 100644
--- a/crates/storage/src/ops/mod.rs
+++ b/crates/storage/src/ops/mod.rs
@@ -2,6 +2,7 @@ pub mod bridge;
 pub mod bridge_duty;
 pub mod bridge_duty_index;
 pub mod bridge_relay;
+pub mod chainstate;
 pub mod checkpoint;
 pub mod envelope;
 pub mod l1;

From 21263ab87db541fe6d0057e55da10a8ec3bd8c4c Mon Sep 17 00:00:00 2001
From: Trey Del Bonis
Date: Thu, 23 Jan 2025 18:25:36 -0500
Subject: [PATCH 05/11] storage: added most of chainstate manager

---
 crates/state/src/state_op.rs              |   5 +
 crates/storage/src/cache.rs               |  36 +++++++
 crates/storage/src/lib.rs                 |  20 +++-
 crates/storage/src/managers/chainstate.rs | 123 ++++++++++++++++++++++
 crates/storage/src/managers/mod.rs        |   1 +
 5 files changed, 182 insertions(+), 3 deletions(-)
 create mode 100644 crates/storage/src/managers/chainstate.rs

diff --git a/crates/state/src/state_op.rs b/crates/state/src/state_op.rs
index e1914b33b..8dde04039 100644
--- a/crates/state/src/state_op.rs
+++ b/crates/state/src/state_op.rs
@@ -50,6 +50,11 @@ impl WriteBatch {
     pub fn new_replace(new_state: Chainstate) -> Self {
         Self::new(new_state, Vec::new())
     }
+
+    /// Extracts the toplevel state, discarding the write ops.
+    pub fn into_toplevel(self) -> Chainstate {
+        self.new_toplevel_state
+    }
 }
 
 // TODO reversibility stuff?
diff --git a/crates/storage/src/cache.rs b/crates/storage/src/cache.rs
index 514b474fc..88761eea2 100644
--- a/crates/storage/src/cache.rs
+++ b/crates/storage/src/cache.rs
@@ -98,6 +98,42 @@ impl CacheTable {
         cache.pop(k);
     }
 
+    /// Removes all entries for which the predicate passes. Returns the number
+    /// of entries removed.
+    ///
+    /// This unfortunately has to clone as many keys from the cache as pass the
+    /// predicate, but that number is capped at the maximum size of the cache,
+    /// so it's not *so* bad.
+    ///
+    /// This might remove slots that are in the process of being filled. Those
+    /// operations will complete, but we won't retain those values.
+    pub fn purge_if(&self, mut pred: impl FnMut(&K) -> bool) -> usize {
+        let mut cache = self.cache.lock();
+        let keys_to_remove = cache
+            .iter()
+            .map(|(k, _v)| k)
+            .filter(|k| pred(k)) // `filter` yields `&&K`, so `pred` can't be passed directly
+            .cloned()
+            .collect::<Vec<_>>();
+        keys_to_remove.iter().for_each(|k| {
+            cache.pop(k);
+        });
+        keys_to_remove.len()
+    }
+
+    /// Removes all entries from the cache. Returns the number of entries
+    /// removed.
+    ///
+    /// This might remove slots that are in the process of being filled. Those
+    /// operations will complete, but we won't retain those values.
+    #[allow(dead_code)]
+    pub fn clear(&self) -> usize {
+        let mut cache = self.cache.lock();
+        let len = cache.len();
+        cache.clear();
+        len
+    }
+
     /// Inserts an entry into the table, dropping the previous value.
#[allow(dead_code)] pub fn insert(&self, k: K, v: V) { diff --git a/crates/storage/src/lib.rs b/crates/storage/src/lib.rs index b1b8f7370..653ee3953 100644 --- a/crates/storage/src/lib.rs +++ b/crates/storage/src/lib.rs @@ -5,7 +5,10 @@ pub mod ops; use std::sync::Arc; -pub use managers::{checkpoint::CheckpointDbManager, l1::L1BlockManager, l2::L2BlockManager}; +pub use managers::{ + chainstate::ChainstateManager, checkpoint::CheckpointDbManager, l1::L1BlockManager, + l2::L2BlockManager, +}; pub use ops::l1tx_broadcast::BroadcastDbOps; use strata_db::traits::Database; @@ -13,13 +16,21 @@ use strata_db::traits::Database; #[derive(Clone)] pub struct NodeStorage { l2_block_manager: Arc, + chainstate_manager: Arc, + + // TODO maybe move this into a different one? checkpoint_manager: Arc, } + impl NodeStorage { pub fn l2(&self) -> &Arc { &self.l2_block_manager } + pub fn chainstate(&self) -> &Arc { + &self.chainstate_manager + } + pub fn checkpoint(&self) -> &Arc { &self.checkpoint_manager } @@ -29,10 +40,13 @@ pub fn create_node_storage(db: Arc, pool: threadpool::ThreadPool) -> NodeS where D: Database + Sync + Send + 'static, { - let checkpoint_manager = Arc::new(CheckpointDbManager::new(pool.clone(), db.clone())); let l2_block_manager = Arc::new(L2BlockManager::new(pool.clone(), db.clone())); + let chainstate_manager = Arc::new(ChainstateManager::new(pool.clone(), db.clone())); + let checkpoint_manager = Arc::new(CheckpointDbManager::new(pool.clone(), db.clone())); + NodeStorage { - checkpoint_manager, l2_block_manager, + chainstate_manager, + checkpoint_manager, } } diff --git a/crates/storage/src/managers/chainstate.rs b/crates/storage/src/managers/chainstate.rs new file mode 100644 index 000000000..d32a171d0 --- /dev/null +++ b/crates/storage/src/managers/chainstate.rs @@ -0,0 +1,123 @@ +//! High-level chainstate interface. + +use std::sync::Arc; + +use strata_db::{traits::*, DbResult}; +use strata_state::{chain_state::Chainstate, state_op::WriteBatch}; +use threadpool::ThreadPool; + +use crate::{cache, ops}; + +pub struct ChainstateManager { + ops: ops::chainstate::ChainstateOps, + wb_cache: cache::CacheTable>, +} + +impl ChainstateManager { + pub fn new(pool: ThreadPool, db: Arc) -> Self { + let ops = ops::chainstate::Context::new(db.chain_state_db().clone()).into_ops(pool); + let wb_cache = cache::CacheTable::new(64.try_into().unwrap()); + Self { ops, wb_cache } + } + + // Basic functions that map directly onto database operations. + + /// Writes the genesis state. This only exists in blocking form because + /// that's all we need. + pub fn write_genesis_state(&self, toplevel: Chainstate) -> DbResult<()> { + self.ops.write_genesis_state_blocking(toplevel) + } + + pub async fn put_write_batch_async(&self, idx: u64, wb: WriteBatch) -> DbResult<()> { + self.ops.write_state_update_async(idx, wb).await?; + self.wb_cache.purge(&idx); + Ok(()) + } + + pub fn put_write_batch_blocking(&self, idx: u64, wb: WriteBatch) -> DbResult<()> { + self.ops.write_state_update_blocking(idx, wb)?; + self.wb_cache.purge(&idx); + Ok(()) + } + + /// Gets the writes stored for an index. + pub async fn get_writes_at_async(&self, idx: u64) -> DbResult> { + self.wb_cache + .get_or_fetch(&idx, || self.ops.get_writes_at_chan(idx)) + .await? + } + + /// Gets the writes stored for an index. 
+ pub fn get_writes_at_blocking(&self, idx: u64) -> DbResult> { + self.wb_cache + .get_or_fetch_blocking(&idx, || self.ops.get_writes_at_blocking(idx)) + } + + pub async fn purge_state_before_async(&self, before_idx: u64) -> DbResult<()> { + self.ops + .purge_historical_state_before_async(before_idx) + .await?; + self.wb_cache.purge_if(|k| *k < before_idx); + Ok(()) + } + + pub fn purge_state_before_blocking(&self, before_idx: u64) -> DbResult<()> { + self.ops + .purge_historical_state_before_blocking(before_idx)?; + self.wb_cache.purge_if(|k| *k < before_idx); + Ok(()) + } + + /// Rolls back writes after a given new tip index, making it the newest tip. + pub async fn rollback_writes_to_async(&self, new_tip_idx: u64) -> DbResult<()> { + self.ops.rollback_writes_to_async(new_tip_idx).await?; + self.wb_cache.purge_if(|k| *k > new_tip_idx); + Ok(()) + } + + /// Rolls back writes after a given new tip index, making it the newest tip. + pub fn rollback_writes_to_blocking(&self, new_tip_idx: u64) -> DbResult<()> { + self.ops.rollback_writes_to_blocking(new_tip_idx)?; + self.wb_cache.purge_if(|k| *k > new_tip_idx); + Ok(()) + } + + pub async fn get_first_state_idx_async(&self) -> DbResult { + // TODO convert to keep this cached in memory so we don't need both variants + self.ops.get_earliest_state_idx_async().await + } + + pub fn get_first_state_idx_blocking(&self) -> DbResult { + // TODO convert to keep this cached in memory so we don't need both variants + self.ops.get_earliest_state_idx_blocking() + } + + pub async fn get_last_state_idx_async(&self) -> DbResult { + // TODO convert to keep this cached in memory so we don't need both variants + self.ops.get_last_state_idx_async().await + } + + pub fn get_last_state_idx_blocking(&self) -> DbResult { + // TODO convert to keep this cached in memory so we don't need both variants + self.ops.get_last_state_idx_blocking() + } + + // Nontrivial functions that aren't just 1:1. + + /// Convenience function just for extracting the toplevel chainstate from + /// the write batch at an index. + pub async fn get_toplevel_chainstate_async(&self, idx: u64) -> DbResult> { + Ok(self + .get_writes_at_async(idx) + .await? + .map(|wb| wb.into_toplevel())) + } + + /// Convenience function just for extracting the toplevel chainstate from + /// the write batch at an index. + pub fn get_toplevel_chainstate_blocking(&self, idx: u64) -> DbResult> { + Ok(self + .get_writes_at_blocking(idx)? + .map(|wb| wb.into_toplevel())) + } +} diff --git a/crates/storage/src/managers/mod.rs b/crates/storage/src/managers/mod.rs index 9c015a9d7..237abcb6c 100644 --- a/crates/storage/src/managers/mod.rs +++ b/crates/storage/src/managers/mod.rs @@ -1,3 +1,4 @@ +pub mod chainstate; pub mod checkpoint; pub mod l1; pub mod l2; From 9661f16c7c781d700d59cebfaf9408b015645723 Mon Sep 17 00:00:00 2001 From: Trey Del Bonis Date: Fri, 24 Jan 2025 11:54:12 -0500 Subject: [PATCH 06/11] primitives: removed sanity concept from `EpochCommitment` --- crates/primitives/src/epoch.rs | 27 --------------------------- 1 file changed, 27 deletions(-) diff --git a/crates/primitives/src/epoch.rs b/crates/primitives/src/epoch.rs index a57bd6980..62cbf8450 100644 --- a/crates/primitives/src/epoch.rs +++ b/crates/primitives/src/epoch.rs @@ -79,31 +79,4 @@ impl EpochCommitment { pub fn is_null(&self) -> bool { Buf32::from(self.last_blkid).is_zero() } - - /// Checks if the epoch is sane. - /// - /// Ie. if the terminal blkid is zero then the last slot and epoch number - /// are also zero. 
- fn sanity_check(&self) -> bool { - if self.is_null() { - self.last_slot == 0 && self.epoch == 0 - } else { - self.last_slot != 0 - } - } -} - -#[cfg(test)] -mod tests { - use super::EpochCommitment; - - #[test] - fn test_epoch_sanity() { - // TODO write test - } - - #[test] - fn test_epoch_insanity() { - // TODO write test - } } From f3863849718ff008806850ef2692c022d95a2149 Mon Sep 17 00:00:00 2001 From: Trey Del Bonis Date: Fri, 24 Jan 2025 11:55:50 -0500 Subject: [PATCH 07/11] primitives: added epoch and l2 modules to prelude exports --- crates/primitives/src/lib.rs | 9 +++++---- crates/primitives/src/prelude.rs | 2 +- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/crates/primitives/src/lib.rs b/crates/primitives/src/lib.rs index 3a43a6e50..de1703db0 100644 --- a/crates/primitives/src/lib.rs +++ b/crates/primitives/src/lib.rs @@ -3,19 +3,20 @@ // TODO import address types // TODO import generic account types +#[macro_use] +mod macros; + pub mod block_credential; pub mod bridge; pub mod buf; pub mod constants; +pub mod epoch; pub mod errors; pub mod evm_exec; pub mod hash; +pub mod keys; pub mod l1; pub mod l2; -#[macro_use] -mod macros; -pub mod epoch; -pub mod keys; pub mod operator; pub mod params; pub mod prelude; diff --git a/crates/primitives/src/prelude.rs b/crates/primitives/src/prelude.rs index 5b3946ca4..68471bd67 100644 --- a/crates/primitives/src/prelude.rs +++ b/crates/primitives/src/prelude.rs @@ -1,2 +1,2 @@ // Reexports from elsewhere in the crate. -pub use crate::{buf::*, l1::*, params::*}; +pub use crate::{buf::*, epoch::*, l1::*, l2::*, params::*}; From 6e4877dde965a866e701a53d9b8249a37ba8391c Mon Sep 17 00:00:00 2001 From: Trey Del Bonis Date: Fri, 24 Jan 2025 14:29:01 -0500 Subject: [PATCH 08/11] misc: refactoring to make use of new chainstate interface in more places --- bin/strata-client/src/main.rs | 42 +++-- bin/strata-client/src/rpc_server.rs | 127 ++++++------- .../src/csm/client_transition.rs | 139 +++++++++----- .../consensus-logic/src/csm/state_tracker.rs | 7 +- crates/consensus-logic/src/csm/worker.rs | 16 +- .../src/duty/block_assembly.rs | 14 +- crates/consensus-logic/src/duty/extractor.rs | 21 ++- crates/consensus-logic/src/duty/worker.rs | 39 ++-- .../src/fork_choice_manager.rs | 46 ++--- crates/consensus-logic/src/genesis.rs | 21 +-- crates/consensus-logic/src/sync_manager.rs | 8 +- crates/db/src/stubs/chain_state.rs | 27 +-- crates/db/src/traits.rs | 21 +-- crates/rocksdb-store/src/chain_state/db.rs | 169 +++++++----------- crates/state/src/state_op.rs | 2 +- crates/storage/src/lib.rs | 7 + crates/storage/src/managers/chainstate.rs | 47 +++-- crates/storage/src/ops/chainstate.rs | 11 +- 18 files changed, 386 insertions(+), 378 deletions(-) diff --git a/bin/strata-client/src/main.rs b/bin/strata-client/src/main.rs index 08e4d621e..43d97b00e 100644 --- a/bin/strata-client/src/main.rs +++ b/bin/strata-client/src/main.rs @@ -100,7 +100,7 @@ fn main_inner(args: Args) -> anyhow::Result<()> { // Initialize core databases let database = init_core_dbs(rbdb.clone(), ops_config); - let manager = create_node_storage(database.clone(), pool.clone()); + let storage = Arc::new(create_node_storage(database.clone(), pool.clone())); // Set up bridge messaging stuff. 
// TODO move all of this into relayer task init @@ -108,7 +108,7 @@ fn main_inner(args: Args) -> anyhow::Result<()> { let bridge_msg_ctx = strata_storage::ops::bridge_relay::Context::new(bridge_msg_db); let bridge_msg_ops = Arc::new(bridge_msg_ctx.into_ops(pool.clone())); - let checkpoint_handle: Arc<_> = CheckpointHandle::new(manager.checkpoint().clone()).into(); + let checkpoint_handle: Arc<_> = CheckpointHandle::new(storage.checkpoint().clone()).into(); let bitcoin_client = create_bitcoin_rpc_client(&config)?; // Check if we have to do genesis. @@ -125,7 +125,7 @@ fn main_inner(args: Args) -> anyhow::Result<()> { &config, params.clone(), database, - &manager, + storage.clone(), bridge_msg_ops, bitcoin_client, )?; @@ -164,7 +164,7 @@ fn main_inner(args: Args) -> anyhow::Result<()> { let sync_peer = RpcSyncPeer::new(rpc_client, 10); let l2_sync_context = L2SyncContext::new( sync_peer, - ctx.l2_block_manager.clone(), + ctx.storage.l2().clone(), ctx.sync_manager.clone(), ); // NOTE: this might block for some time during first run with empty db until genesis @@ -224,10 +224,10 @@ fn init_logging(rt: &Handle) { #[derive(Clone)] pub struct CoreContext { pub database: Arc, + pub storage: Arc, pub pool: threadpool::ThreadPool, pub params: Arc, pub sync_manager: Arc, - pub l2_block_manager: Arc, pub status_channel: StatusChannel, pub engine: Arc>, pub relayer_handle: Arc, @@ -236,12 +236,12 @@ pub struct CoreContext { fn do_startup_checks( database: &impl Database, + storage: &NodeStorage, engine: &impl ExecEngineCtl, bitcoin_client: &impl ReaderRpc, handle: &Handle, ) -> anyhow::Result<()> { - let chain_state_db = database.chain_state_db(); - let last_state_idx = match chain_state_db.get_last_state_idx() { + let last_state_idx = match storage.chainstate().get_last_write_idx_blocking() { Ok(idx) => idx, Err(DbError::NotBootstrapped) => { // genesis is not done @@ -250,8 +250,12 @@ fn do_startup_checks( } err => err?, }; - let Some(last_chain_state) = chain_state_db.get_toplevel_state(last_state_idx)? else { - anyhow::bail!(format!("Missing chain state idx: {}", last_state_idx)); + + let Some(last_chain_state) = storage + .chainstate() + .get_toplevel_chainstate_blocking(last_state_idx)? + else { + anyhow::bail!("Missing chain state idx: {last_state_idx}"); }; // Check that we can connect to bitcoin client and block we believe to be matured in L1 is @@ -300,7 +304,7 @@ fn start_core_tasks( config: &Config, params: Arc, database: Arc, - storage: &NodeStorage, + storage: Arc, bridge_msg_ops: Arc, bitcoin_client: Arc, ) -> anyhow::Result { @@ -318,6 +322,7 @@ fn start_core_tasks( // do startup checks do_startup_checks( database.as_ref(), + storage.as_ref(), engine.as_ref(), bitcoin_client.as_ref(), executor.handle(), @@ -327,7 +332,7 @@ fn start_core_tasks( let sync_manager: Arc<_> = sync_manager::start_sync_tasks( executor, database.clone(), - storage, + storage.clone(), engine.clone(), pool.clone(), params.clone(), @@ -356,10 +361,10 @@ fn start_core_tasks( Ok(CoreContext { database, + storage, pool, params, sync_manager, - l2_block_manager: storage.l2().clone(), status_channel, engine, relayer_handle, @@ -380,10 +385,10 @@ fn start_sequencer_tasks( ) -> anyhow::Result<()> { let CoreContext { database, + storage, pool, params, sync_manager, - l2_block_manager, status_channel, engine, bitcoin_client, @@ -433,7 +438,7 @@ fn start_sequencer_tasks( // Spawn duty tasks. 
let cupdate_rx = sync_manager.create_cstate_subscription(); - let t_l2_block_manager = l2_block_manager.clone(); + let t_storage = storage.clone(); let t_params = params.clone(); let t_database = database.clone(); executor.spawn_critical("duty_worker::duty_tracker_task", move |shutdown| { @@ -443,7 +448,7 @@ fn start_sequencer_tasks( duties_tx, idata.ident, t_database, - t_l2_block_manager, + t_storage, t_params, ) .map_err(Into::into) @@ -458,6 +463,7 @@ fn start_sequencer_tasks( idata.key, sync_manager, database, + storage, engine, envelope_handle, pool, @@ -496,8 +502,8 @@ async fn start_rpc( ) -> anyhow::Result<()> { let CoreContext { database, + storage, sync_manager, - l2_block_manager, status_channel, relayer_handle, .. @@ -510,7 +516,7 @@ async fn start_rpc( status_channel.clone(), database.clone(), sync_manager, - l2_block_manager.clone(), + storage.clone(), checkpoint_handle, relayer_handle, ); @@ -519,7 +525,7 @@ async fn start_rpc( let admin_rpc = rpc_server::AdminServerImpl::new(stop_tx); methods.merge(admin_rpc.into_rpc())?; - let debug_rpc = rpc_server::StrataDebugRpcImpl::new(l2_block_manager, database); + let debug_rpc = rpc_server::StrataDebugRpcImpl::new(storage.clone(), database); methods.merge(debug_rpc.into_rpc())?; let rpc_host = config.client.rpc_host; diff --git a/bin/strata-client/src/rpc_server.rs b/bin/strata-client/src/rpc_server.rs index fb2e8ff72..c6edfc985 100644 --- a/bin/strata-client/src/rpc_server.rs +++ b/bin/strata-client/src/rpc_server.rs @@ -8,7 +8,7 @@ use bitcoin::{ secp256k1::{PublicKey, XOnlyPublicKey}, Transaction as BTransaction, Txid, }; -use futures::TryFutureExt; +use futures::{FutureExt, TryFutureExt}; use jsonrpsee::core::RpcResult; use strata_bridge_relay::relayer::RelayerHandle; use strata_btcio::{broadcaster::L1BroadcastHandle, writer::EnvelopeHandle}; @@ -51,7 +51,7 @@ use strata_state::{ sync_event::SyncEvent, }; use strata_status::StatusChannel; -use strata_storage::L2BlockManager; +use strata_storage::NodeStorage; use strata_zkvm::ProofReceipt; use tokio::sync::{oneshot, Mutex}; use tracing::*; @@ -72,7 +72,7 @@ pub struct StrataRpcImpl { status_channel: StatusChannel, database: Arc, sync_manager: Arc, - l2_block_manager: Arc, + storage: Arc, checkpoint_handle: Arc, relayer_handle: Arc, } @@ -83,7 +83,7 @@ impl StrataRpcImpl { status_channel: StatusChannel, database: Arc, sync_manager: Arc, - l2_block_manager: Arc, + storage: Arc, checkpoint_handle: Arc, relayer_handle: Arc, ) -> Self { @@ -91,7 +91,7 @@ impl StrataRpcImpl { status_channel, database, sync_manager, - l2_block_manager, + storage, checkpoint_handle, relayer_handle, } @@ -102,6 +102,8 @@ impl StrataRpcImpl { self.sync_manager.status_channel().client_state() } + // TODO make these not return Arc + /// Gets a clone of the current client state and fetches the chainstate that /// of the L2 block that it considers the tip state. async fn get_cur_states(&self) -> Result<(ClientState, Option>), Error> { @@ -124,19 +126,14 @@ impl StrataRpcImpl { }; // in current implementation, chainstate idx == l2 block idx - let (_, chainstate_idx) = last_checkpoint.batch_info.l2_range; - - let db = self.database.clone(); + let (_, end_slot) = last_checkpoint.batch_info.l2_range; - wait_blocking("load_checkpoint_chainstate", move || { - let chainstate_db = db.chain_state_db(); - let chainstate = chainstate_db - .get_toplevel_state(chainstate_idx)? 
- .ok_or(Error::MissingChainstate(chainstate_idx))?; - - Ok(Some(Arc::new(chainstate))) - }) - .await + Ok(self + .storage + .chainstate() + .get_toplevel_chainstate_async(end_slot) + .await? + .map(Arc::new)) } } @@ -155,8 +152,9 @@ fn conv_blk_header_to_rpc(blk_header: &impl L2Header) -> RpcBlockHeader { #[async_trait] impl StrataApiServer for StrataRpcImpl { async fn get_blocks_at_idx(&self, idx: u64) -> RpcResult> { - let l2_block_manager = self.l2_block_manager.clone(); - let l2_blocks = l2_block_manager + let l2_blocks = self + .storage + .l2() .get_blocks_at_height_async(idx) .await .map_err(Error::Db)?; @@ -188,18 +186,13 @@ impl StrataApiServer for StrataRpcImpl { } async fn get_l1_block_hash(&self, height: u64) -> RpcResult> { - let db = self.database.clone(); - let blk_manifest = wait_blocking("l1_block_manifest", move || { - db.l1_db() - .get_block_manifest(height) - .map_err(|_| Error::MissingL1BlockManifest(height)) - }) - .await?; - - match blk_manifest { - Some(blk) => Ok(Some(blk.block_hash().to_string())), - None => Ok(None), - } + let blk_manifest = self + .storage + .l1() + .get_block_manifest_async(height) + .map_err(Error::Db) + .await?; + Ok(blk_manifest.map(|mf| mf.block_hash().to_string())) } async fn get_client_status(&self) -> RpcResult { @@ -213,25 +206,24 @@ impl StrataApiServer for StrataRpcImpl { }); // Copy these out of the sync state, if they're there. - let (chain_tip, finalized_blkid) = sync_state + let (chain_tip_blkid, finalized_blkid) = sync_state .map(|ss| (*ss.chain_tip_blkid(), *ss.finalized_blkid())) .unwrap_or_default(); // FIXME make this load from cache, and put the data we actually want // here in the client state // FIXME error handling - let db = self.database.clone(); - let slot: u64 = wait_blocking("load_cur_block", move || { - let l2_db = db.l2_db(); - l2_db - .get_block_data(chain_tip) - .map(|b| b.map(|b| b.header().blockidx()).unwrap_or(u64::MAX)) - .map_err(Error::from) - }) - .await?; + let slot: u64 = self + .storage + .l2() + .get_block_data_async(&chain_tip_blkid) + .map_err(Error::Db) + .await? + .map(|b| b.header().blockidx()) + .unwrap_or(u64::MAX); Ok(RpcClientStatus { - chain_tip: *chain_tip.as_ref(), + chain_tip: *chain_tip_blkid.as_ref(), chain_tip_slot: slot, finalized_blkid: *finalized_blkid.as_ref(), last_l1_block: *last_l1.as_ref(), @@ -371,15 +363,13 @@ impl StrataApiServer for StrataRpcImpl { let prev_slot = l2_blk_bundle.block().header().header().blockidx() - 1; let chain_state_db = self.database.clone(); - let chain_state = wait_blocking("l2_chain_state", move || { - let chs_db = chain_state_db.chain_state_db(); - - chs_db - .get_toplevel_state(prev_slot) - .map_err(Error::Db)? - .ok_or(Error::MissingChainstate(prev_slot)) - }) - .await?; + let chain_state = self + .storage + .chainstate() + .get_toplevel_chainstate_async(prev_slot) + .map_err(Error::Db) + .await? 
+            .ok_or(Error::MissingChainstate(prev_slot))?;
 
         let cl_block_witness = (chain_state, l2_blk_bundle.block());
         let raw_cl_block_witness = borsh::to_vec(&cl_block_witness)
@@ -422,7 +412,7 @@ impl StrataApiServer for StrataRpcImpl {
     async fn get_raw_bundles(&self, start_height: u64, end_height: u64) -> RpcResult {
         let block_ids = futures::future::join_all(
             (start_height..=end_height)
-                .map(|height| self.l2_block_manager.get_blocks_at_height_async(height)),
+                .map(|height| self.storage.l2().get_blocks_at_height_async(height)),
         )
         .await;
@@ -435,7 +425,7 @@ impl StrataApiServer for StrataRpcImpl {
         let blocks = futures::future::join_all(
             block_ids
                 .iter()
-                .map(|blkid| self.l2_block_manager.get_block_data_async(blkid)),
+                .map(|blkid| self.storage.l2().get_block_data_async(blkid)),
         )
         .await;
@@ -451,7 +441,8 @@ impl StrataApiServer for StrataRpcImpl {
     async fn get_raw_bundle_by_id(&self, block_id: L2BlockId) -> RpcResult> {
         let block = self
-            .l2_block_manager
+            .storage
+            .l2()
             .get_block_data_async(&block_id)
             .await
             .map_err(|e| Error::Other(e.to_string()))?
@@ -823,38 +814,36 @@ impl StrataSequencerApiServer for SequencerServerImpl {
 }
 
 pub struct StrataDebugRpcImpl {
-    l2_block_manager: Arc<L2BlockManager>,
+    storage: Arc<NodeStorage>,
     database: Arc,
 }
 
 impl StrataDebugRpcImpl {
-    pub fn new(l2_block_manager: Arc<L2BlockManager>, database: Arc) -> Self {
-        Self {
-            l2_block_manager,
-            database,
-        }
+    pub fn new(storage: Arc<NodeStorage>, database: Arc) -> Self {
+        Self { storage, database }
     }
 }
 
 #[async_trait]
 impl StrataDebugApiServer for StrataDebugRpcImpl {
     async fn get_block_by_id(&self, block_id: L2BlockId) -> RpcResult<Option<L2Block>> {
-        let l2_block_manager = &self.l2_block_manager;
-        let l2_block = l2_block_manager
+        let l2_block = self
+            .storage
+            .l2()
             .get_block_data_async(&block_id)
             .await
             .map_err(Error::Db)?
             .map(|b| b.block().clone());
         Ok(l2_block)
     }
+
     async fn get_chainstate_at_idx(&self, idx: u64) -> RpcResult<Option<RpcChainState>> {
-        let db = self.database.clone();
-        let chain_state = wait_blocking("chain_state_at_idx", move || {
-            db.chain_state_db()
-                .get_toplevel_state(idx)
-                .map_err(Error::Db)
-        })
-        .await?;
+        let chain_state = self
+            .storage
+            .chainstate()
+            .get_toplevel_chainstate_async(idx)
+            .map_err(Error::Db)
+            .await?;
 
         match chain_state {
             Some(cs) => Ok(Some(RpcChainState {
                 tip_blkid: cs.chain_tip_blockid(),
diff --git a/crates/consensus-logic/src/csm/client_transition.rs b/crates/consensus-logic/src/csm/client_transition.rs
index 8d352dca6..3daa98061 100644
--- a/crates/consensus-logic/src/csm/client_transition.rs
+++ b/crates/consensus-logic/src/csm/client_transition.rs
@@ -8,7 +8,8 @@ use strata_db::traits::{ChainstateDatabase, Database, L1Database, L2BlockDatabas
 use strata_primitives::prelude::*;
 use strata_state::{
     batch::{BatchCheckpoint, BatchCheckpointWithCommitment, BatchInfo},
-    block,
+    block::{self, L2BlockBundle},
+    chain_state::Chainstate,
     client_state::*,
     header::L2Header,
     id::L2BlockId,
@@ -16,16 +17,85 @@ use strata_state::{
     operation::*,
     sync_event::SyncEvent,
 };
+use strata_storage::NodeStorage;
 use tracing::*;
 
 use crate::{errors::*, genesis::make_genesis_block, l1_handler::verify_proof};
 
+/// Interface for external context necessary specifically for event validation.
+pub trait EventContext {
+    fn get_l1_block_manifest(&self, height: u64) -> Result<L1BlockManifest, Error>;
+    fn get_l2_block_data(&self, blkid: &L2BlockId) -> Result<L2BlockBundle, Error>;
+    fn get_toplevel_chainstate(&self, slot: u64) -> Result<Chainstate, Error>;
+}
+
+/// Event context using the main node storage interface.
+pub struct StorageEventContext<'c> {
+    storage: &'c NodeStorage,
+}
+
+impl<'c> StorageEventContext<'c> {
+    pub fn new(storage: &'c NodeStorage) -> Self {
+        Self { storage }
+    }
+}
+
+impl<'c> EventContext for StorageEventContext<'c> {
+    fn get_l1_block_manifest(&self, height: u64) -> Result<L1BlockManifest, Error> {
+        self.storage
+            .l1()
+            .get_block_manifest(height)?
+            .ok_or(Error::MissingL1BlockHeight(height))
+    }
+
+    fn get_l2_block_data(&self, blkid: &L2BlockId) -> Result<L2BlockBundle, Error> {
+        self.storage
+            .l2()
+            .get_block_data_blocking(blkid)?
+            .ok_or(Error::MissingL2Block(*blkid))
+    }
+
+    fn get_toplevel_chainstate(&self, slot: u64) -> Result<Chainstate, Error> {
+        self.storage
+            .chainstate()
+            .get_toplevel_chainstate_blocking(slot)?
+            .ok_or(Error::MissingIdxChainstate(slot))
+    }
+}
+
+#[cfg(test)]
+pub struct DummyEventContext {
+    // maybe something?
+}
+
+#[cfg(test)]
+impl DummyEventContext {
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+#[cfg(test)]
+impl EventContext for DummyEventContext {
+    fn get_l1_block_manifest(&self, height: u64) -> Result<L1BlockManifest, Error> {
+        Err(Error::MissingL1BlockHeight(height))
+    }
+
+    fn get_l2_block_data(&self, blkid: &L2BlockId) -> Result<L2BlockBundle, Error> {
+        Err(Error::MissingL2Block(*blkid))
+    }
+
+    fn get_toplevel_chainstate(&self, slot: u64) -> Result<Chainstate, Error> {
+        Err(Error::MissingIdxChainstate(slot))
+    }
+}
+
 /// Processes the event given the current consensus state, producing some
 /// output. This can return database errors.
-pub fn process_event<D: Database>(
+pub fn process_event(
     state: &ClientState,
     ev: &SyncEvent,
-    database: &D,
+    context: &impl EventContext,
     params: &Params,
 ) -> Result<ClientUpdateOutput, Error> {
     let mut writes = Vec::new();
@@ -44,10 +114,7 @@ pub fn process_event(
 
             // FIXME this doesn't do any SPV checks to make sure we only go to
             // a longer chain, it just does it unconditionally
-            let l1_db = database.l1_db();
-            let block_mf = l1_db
-                .get_block_manifest(*height)?
-                .ok_or(Error::MissingL1BlockHeight(*height))?;
+            let block_mf = context.get_l1_block_manifest(*height)?;
 
             let l1v = state.l1_view();
 
             let l1_vs = state.l1_view().tip_verification_state();
@@ -57,9 +124,7 @@ pub fn process_event(
             let l1_vs_height = l1_vs.last_verified_block_num as u64;
             let mut updated_l1vs = l1_vs.clone();
             for height in (l1_vs_height + 1..l1v.tip_height()) {
-                let block_mf = l1_db
-                    .get_block_manifest(height)?
-                    .ok_or(Error::MissingL1BlockHeight(height))?;
+                let block_mf = context.get_l1_block_manifest(height)?;
                 let header: Header = bitcoin::consensus::deserialize(block_mf.header()).unwrap();
                 updated_l1vs =
@@ -74,9 +139,7 @@ pub fn process_event(
             if next_exp_height > params.rollup().horizon_l1_height {
                 // TODO check that the new block we're trying to add has the same parent as the tip
                 // block
-                let cur_tip_block = l1_db
-                    .get_block_manifest(cur_seen_tip_height)?
- .ok_or(Error::MissingL1BlockHeight(cur_seen_tip_height))?; + let cur_tip_block = context.get_l1_block_manifest(cur_seen_tip_height)?; } if *height == next_exp_height { @@ -92,7 +155,7 @@ pub fn process_event( let maturable_height = next_exp_height.saturating_sub(safe_depth); if maturable_height > params.rollup().horizon_l1_height && state.is_chain_active() { - let (wrs, acts) = handle_mature_l1_height(maturable_height, state, database); + let (wrs, acts) = handle_mature_l1_height(maturable_height, state, context); writes.extend(wrs); actions.extend(acts); } @@ -136,7 +199,8 @@ pub fn process_event( } SyncEvent::L1Revert(to_height) => { - let l1_db = database.l1_db(); + // TODO not sure why this was here + //let l1_db = database.l1_db(); let buried = state.l1_view().buried_l1_height(); if *to_height < buried { @@ -153,7 +217,8 @@ pub fn process_event( if let Some(ss) = state.sync() { // TODO load it up and figure out what's there, see if we have to // load the state updates from L1 or something - let l2_db = database.l2_db(); + // TODO not sure why this was here + //let l2_db = database.l2_db(); let proof_verified_checkpoints = filter_verified_checkpoints(state, checkpoints, params.rollup()); @@ -189,18 +254,13 @@ pub fn process_event( } SyncEvent::NewTipBlock(blkid) => { + // TODO remove ^this sync event type and all associated fields debug!(?blkid, "Received NewTipBlock"); - let l2_db = database.l2_db(); - let block = l2_db - .get_block_data(*blkid)? - .ok_or(Error::MissingL2Block(*blkid))?; + let block = context.get_l2_block_data(blkid)?; // TODO: get chainstate idx from blkid OR pass correct idx in sync event - let block_idx = block.header().blockidx(); - let chainstate_db = database.chain_state_db(); - let chainstate = chainstate_db - .get_toplevel_state(block_idx)? - .ok_or(Error::MissingIdxChainstate(block_idx))?; + let slot = block.header().blockidx(); + let chainstate = context.get_toplevel_chainstate(slot)?; debug!(?chainstate, "Chainstate for new tip block"); // height of last matured L1 block in chain state @@ -227,7 +287,7 @@ pub fn process_event( )); actions.push(SyncAction::UpdateTip(*blkid)); - let (wrs, acts) = handle_checkpoint_finalization(state, blkid, params, database); + let (wrs, acts) = handle_checkpoint_finalization(state, blkid, params, context); writes.extend(wrs); actions.extend(acts); } @@ -260,7 +320,7 @@ pub fn process_event( fn handle_mature_l1_height( maturable_height: u64, state: &ClientState, - database: &impl Database, + context: &impl EventContext, ) -> (Vec, Vec) { let mut writes = Vec::new(); let mut actions = Vec::new(); @@ -279,23 +339,18 @@ fn handle_mature_l1_height( // If l2 blocks is not in db then finalization will happen when // l2Block is fetched from the network and the corresponding //checkpoint is already finalized. 
-        let l2_blockid = checkpt.batch_info.l2_blockid;
+        let l2_blkid = checkpt.batch_info.l2_blockid;

-        match database.l2_db().get_block_data(l2_blockid) {
-            Ok(Some(_)) => {
+        match context.get_l2_block_data(&l2_blkid) {
+            Ok(_) => {
                 debug!(%maturable_height, "Writing CheckpointFinalized");
                 writes.push(ClientStateWrite::CheckpointFinalized(maturable_height));

                 // Emit sync action for finalizing a l2 block
-                info!(%maturable_height, %l2_blockid, "l2 block found in db, push FinalizeBlock SyncAction");
-                actions.push(SyncAction::FinalizeBlock(l2_blockid));
-            }
-            Ok(None) => {
-                warn!(
-                    %maturable_height,%l2_blockid, "l2 block not in db yet, skipping finalize"
-                );
+                info!(%maturable_height, %l2_blkid, "l2 block found in db, push FinalizeBlock SyncAction");
+                actions.push(SyncAction::FinalizeBlock(l2_blkid));
             }
             Err(e) => {
-                error!(%e, "error while fetching block data from l2_db");
+                error!(%maturable_height, %l2_blkid, %e, "error while fetching block data");
             }
         }
     } else {
@@ -333,7 +388,7 @@ fn handle_checkpoint_finalization(
     state: &ClientState,
     blkid: &L2BlockId,
     params: &Params,
-    database: &impl Database,
+    context: &impl EventContext,
 ) -> (Vec<ClientStateWrite>, Vec<SyncAction>) {
     let mut writes = Vec::new();
     let mut actions = Vec::new();
@@ -351,7 +406,7 @@ fn handle_checkpoint_finalization(

     // The l1 height should be handled only if it is less than maturable height
     if l1_height < maturable_height {
-        let (wrs, acts) = handle_mature_l1_height(l1_height, state, database);
+        let (wrs, acts) = handle_mature_l1_height(l1_height, state, context);
         writes.extend(wrs);
         actions.extend(acts);
     }
@@ -489,11 +544,13 @@ mod tests {
         database: &impl Database,
         params: &Params,
     ) {
+        let context = DummyEventContext::new();
+
         for case in test_cases {
             println!("Running test case: {}", case.description);
             let mut outputs = Vec::new();
             for (i, test_event) in case.events.iter().enumerate() {
-                let output = process_event(state, &test_event.event, database, params).unwrap();
+                let output = process_event(state, &test_event.event, &context, params).unwrap();
                 outputs.push(output.clone());
                 assert_eq!(
                     output.writes(),
diff --git a/crates/consensus-logic/src/csm/state_tracker.rs b/crates/consensus-logic/src/csm/state_tracker.rs
index f2b232516..ea0530f19 100644
--- a/crates/consensus-logic/src/csm/state_tracker.rs
+++ b/crates/consensus-logic/src/csm/state_tracker.rs
@@ -11,6 +11,7 @@ use strata_state::{
     client_state::ClientState,
     operation::{self, ClientUpdateOutput},
 };
+use strata_storage::NodeStorage;
 use tracing::*;

 use super::client_transition;
@@ -19,6 +20,7 @@ use crate::errors::Error;

 pub struct StateTracker<D: Database> {
     params: Arc<Params>,
     database: Arc<D>,
+    storage: Arc<NodeStorage>,

     cur_state_idx: u64,

@@ -29,12 +31,14 @@ impl<D: Database> StateTracker<D> {
     pub fn new(
         params: Arc<Params>,
         database: Arc<D>,
+        storage: Arc<NodeStorage>,
         cur_state_idx: u64,
         cur_state: Arc<ClientState>,
     ) -> Self {
         Self {
             params,
             database,
+            storage,
             cur_state_idx,
             cur_state,
         }
@@ -73,7 +77,8 @@ impl<D: Database> StateTracker<D> {
         check_bail_trigger(BAIL_SYNC_EVENT);

         // Compute the state transition.
-        let outp = client_transition::process_event(&self.cur_state, &ev, db, &self.params)?;
+        let context = client_transition::StorageEventContext::new(&self.storage);
+        let outp = client_transition::process_event(&self.cur_state, &ev, &context, &self.params)?;

         // Clone the state and apply the operations to it.
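The clone-and-apply step this comment introduces follows a simple shape: clone the `Arc`'d state, apply the update's writes to the copy, then swap a new `Arc` in. A rough sketch under simplified types (`State` and `Write` are hypothetical stand-ins for `ClientState` and `ClientStateWrite`):

```rust
use std::sync::Arc;

#[derive(Clone, Debug, PartialEq)]
struct State {
    last_idx: u64,
    finalized: Vec<u64>,
}

enum Write {
    SetLastIdx(u64),
    Finalize(u64),
}

// Apply each write in order to the mutable copy.
fn apply_writes(state: &mut State, writes: &[Write]) {
    for w in writes {
        match w {
            Write::SetLastIdx(i) => state.last_idx = *i,
            Write::Finalize(h) => state.finalized.push(*h),
        }
    }
}

fn main() {
    let cur = Arc::new(State { last_idx: 7, finalized: vec![] });

    // Clone out of the Arc, mutate the copy, then swap it back in.
    let mut next = cur.as_ref().clone();
    apply_writes(&mut next, &[Write::SetLastIdx(8), Write::Finalize(3)]);
    let cur = Arc::new(next);

    assert_eq!(cur.last_idx, 8);
    assert_eq!(cur.finalized, vec![3]);
}
```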
let mut new_state = self.cur_state.as_ref().clone(); diff --git a/crates/consensus-logic/src/csm/worker.rs b/crates/consensus-logic/src/csm/worker.rs index be3a1c709..1968f7b55 100644 --- a/crates/consensus-logic/src/csm/worker.rs +++ b/crates/consensus-logic/src/csm/worker.rs @@ -12,7 +12,7 @@ use strata_eectl::engine::ExecEngineCtl; use strata_primitives::prelude::*; use strata_state::{client_state::ClientState, csm_status::CsmStatus, operation::SyncAction}; use strata_status::StatusChannel; -use strata_storage::{CheckpointDbManager, L2BlockManager}; +use strata_storage::{CheckpointDbManager, L2BlockManager, NodeStorage}; use strata_tasks::ShutdownGuard; use tokio::{ sync::{broadcast, mpsc}, @@ -44,7 +44,7 @@ pub struct WorkerState { database: Arc, /// L2 block manager. - l2_block_manager: Arc, + storage: Arc, /// Checkpoint manager. checkpoint_manager: Arc, @@ -62,7 +62,7 @@ impl WorkerState { pub fn open( params: Arc, database: Arc, - l2_block_manager: Arc, + storage: Arc, cupdate_tx: broadcast::Sender>, checkpoint_manager: Arc, ) -> anyhow::Result { @@ -71,6 +71,7 @@ impl WorkerState { let state_tracker = state_tracker::StateTracker::new( params.clone(), database.clone(), + storage.clone(), cur_state_idx, Arc::new(cur_state), ); @@ -87,7 +88,7 @@ impl WorkerState { params, config, database, - l2_block_manager, + storage, state_tracker, cupdate_tx, checkpoint_manager, @@ -281,7 +282,8 @@ fn apply_action( // TODO not sure what this should entail yet warn!(?blkid, "marking block invalid!"); state - .l2_block_manager + .storage + .l2() .set_block_status_blocking(&blkid, BlockStatus::Invalid)?; } @@ -299,8 +301,8 @@ fn apply_action( // TODO: use l1blkid during chain state genesis ? - let chstate = genesis::init_genesis_chainstate(&state.params, state.database.as_ref()) - .map_err(|err| { + let chstate = + genesis::init_genesis_chainstate(&state.params, &state.storage).map_err(|err| { error!(err = %err, "failed to compute chain genesis"); Error::GenesisFailed(err.to_string()) })?; diff --git a/crates/consensus-logic/src/duty/block_assembly.rs b/crates/consensus-logic/src/duty/block_assembly.rs index 30cc0435b..ccf138817 100644 --- a/crates/consensus-logic/src/duty/block_assembly.rs +++ b/crates/consensus-logic/src/duty/block_assembly.rs @@ -31,6 +31,7 @@ use strata_state::{ state_op::*, tx::ProtocolOperation::Deposit, }; +use strata_storage::NodeStorage; use tracing::*; use super::types::*; @@ -44,20 +45,21 @@ const MAX_L1_ENTRIES_PER_BLOCK: usize = 100; /// Signs and stores a block in the database. Does not submit it to the /// forkchoice manager. // TODO pass in the CSM state we're using to assemble this -pub(super) fn sign_and_store_block( +pub(super) fn sign_and_store_block( slot: u64, prev_block_id: L2BlockId, l1_state: &LocalL1State, ik: &IdentityKey, - database: &D, - engine: &E, + database: &impl Database, + storage: &NodeStorage, + engine: &impl ExecEngineCtl, params: &Arc, ) -> Result, Error> { debug!("preparing block"); let l1_db = database.l1_db(); let l2_db = database.l2_db(); let cs_db = database.client_state_db(); - let chs_db = database.chain_state_db(); + let chsman = storage.chainstate(); // Check the block we were supposed to build isn't already in the database, // if so then just republish that. 
This checks that there just if we have a @@ -96,8 +98,8 @@ pub(super) fn sign_and_store_block( // TODO make this get the prev block slot from somewhere more reliable in // case we skip slots let prev_slot = prev_block.header().blockidx(); - let prev_chstate = chs_db - .get_toplevel_state(prev_slot)? + let prev_chstate = chsman + .get_toplevel_chainstate_blocking(prev_slot)? .ok_or(Error::MissingBlockChainstate(prev_block_id))?; // Figure out the save L1 blkid. diff --git a/crates/consensus-logic/src/duty/extractor.rs b/crates/consensus-logic/src/duty/extractor.rs index 1fec0d68f..d94e56f3a 100644 --- a/crates/consensus-logic/src/duty/extractor.rs +++ b/crates/consensus-logic/src/duty/extractor.rs @@ -1,6 +1,7 @@ use strata_db::traits::ChainstateDatabase; use strata_primitives::{buf::Buf32, params::Params}; use strata_state::{batch::BatchInfo, client_state::ClientState, id::L2BlockId}; +use strata_storage::NodeStorage; use tracing::*; use super::types::{BlockSigningDuty, Duty, Identity}; @@ -11,7 +12,7 @@ pub fn extract_duties( state: &ClientState, _ident: &Identity, _params: &Params, - chs_db: &impl ChainstateDatabase, + storage: &NodeStorage, rollup_params_commitment: Buf32, ) -> Result, Error> { // If a sync state isn't present then we probably don't have anything we @@ -32,7 +33,7 @@ pub fn extract_duties( state, tip_height, tip_blkid, - chs_db, + storage, rollup_params_commitment, )?); @@ -43,9 +44,11 @@ fn extract_batch_duties( state: &ClientState, tip_height: u64, tip_id: L2BlockId, - chs_db: &impl ChainstateDatabase, + storage: &NodeStorage, rollup_params_commitment: Buf32, ) -> Result, Error> { + let chsman = storage.chainstate(); + if !state.is_chain_active() { debug!("chain not active, no duties created"); // There are no duties if the chain is not yet active @@ -88,13 +91,13 @@ fn extract_batch_duties( // Start from first non-genesis l2 block height let l2_range = (1, tip_height); - let initial_chain_state = chs_db - .get_toplevel_state(0)? + let initial_chain_state = chsman + .get_toplevel_chainstate_blocking(0)? .ok_or(Error::MissingIdxChainstate(0))?; let initial_chain_state_root = initial_chain_state.compute_state_root(); - let current_chain_state = chs_db - .get_toplevel_state(tip_height)? + let current_chain_state = chsman + .get_toplevel_chainstate_blocking(tip_height)? .ok_or(Error::MissingIdxChainstate(0))?; let current_chain_state_root = current_chain_state.compute_state_root(); let l2_transition = (initial_chain_state_root, current_chain_state_root); @@ -131,8 +134,8 @@ fn extract_batch_duties( // Also, rather than tip heights, we might need to limit the max range a prover will be // proving let l2_range = (checkpoint.l2_range.1 + 1, tip_height); - let current_chain_state = chs_db - .get_toplevel_state(tip_height)? + let current_chain_state = chsman + .get_toplevel_chainstate_blocking(tip_height)? 
.ok_or(Error::MissingIdxChainstate(0))?; let current_chain_state_root = current_chain_state.compute_state_root(); let l2_transition = (checkpoint.l2_transition.1, current_chain_state_root); diff --git a/crates/consensus-logic/src/duty/worker.rs b/crates/consensus-logic/src/duty/worker.rs index 0d16c65f6..4b7502b1b 100644 --- a/crates/consensus-logic/src/duty/worker.rs +++ b/crates/consensus-logic/src/duty/worker.rs @@ -18,7 +18,7 @@ use strata_primitives::{ params::Params, }; use strata_state::{batch::SignedBatchCheckpoint, client_state::ClientState, prelude::*}; -use strata_storage::L2BlockManager; +use strata_storage::{L2BlockManager, NodeStorage}; use strata_tasks::{ShutdownGuard, TaskExecutor}; use tokio::sync::broadcast; use tracing::*; @@ -41,7 +41,7 @@ pub fn duty_tracker_task( batch_queue: broadcast::Sender, ident: Identity, database: Arc, - l2_block_manager: Arc, + storage: Arc, params: Arc, ) -> Result<(), Error> { let db = database.as_ref(); @@ -51,7 +51,7 @@ pub fn duty_tracker_task( batch_queue, ident, db, - l2_block_manager.as_ref(), + storage.as_ref(), params.as_ref(), ) } @@ -62,7 +62,7 @@ fn duty_tracker_task_inner( batch_queue: broadcast::Sender, ident: Identity, database: &impl Database, - l2_block_manager: &L2BlockManager, + storage: &NodeStorage, params: &Params, ) -> Result<(), Error> { let mut duties_tracker = types::DutyTracker::new_empty(); @@ -105,9 +105,8 @@ fn duty_tracker_task_inner( &mut duties_tracker, new_state, &ident, - l2_block_manager, + storage, params, - &**database.chain_state_db(), rollup_params_commitment, ) { error!(err = %e, "failed to update duties tracker"); @@ -129,9 +128,8 @@ fn update_tracker( tracker: &mut types::DutyTracker, state: &ClientState, ident: &Identity, - l2_block_manager: &L2BlockManager, + storage: &NodeStorage, params: &Params, - chs_db: &impl ChainstateDatabase, rollup_params_commitment: Buf32, ) -> Result<(), Error> { let Some(ss) = state.sync() else { @@ -139,14 +137,15 @@ fn update_tracker( }; let new_duties = - extractor::extract_duties(state, ident, params, chs_db, rollup_params_commitment)?; + extractor::extract_duties(state, ident, params, storage, rollup_params_commitment)?; info!(new_duties = ?new_duties, "new duties"); // Figure out the block slot from the tip blockid. // TODO include the block slot in the consensus state let tip_blkid = *ss.chain_tip_blkid(); - let block = l2_block_manager + let block = storage + .l2() .get_block_data_blocking(&tip_blkid)? 
.ok_or(Error::MissingL2Block(tip_blkid))?; let block_idx = block.header().blockidx(); @@ -154,11 +153,8 @@ fn update_tracker( // Figure out which blocks were finalized let new_finalized = state.sync().map(|sync| *sync.finalized_blkid()); - let newly_finalized_blocks: Vec = get_finalized_blocks( - tracker.get_finalized_block(), - l2_block_manager, - new_finalized, - )?; + let newly_finalized_blocks: Vec = + get_finalized_blocks(tracker.get_finalized_block(), storage.l2(), new_finalized)?; let latest_finalized_batch = state .l1_view() @@ -225,6 +221,7 @@ pub fn duty_dispatch_task< identity_key: IdentityKey, sync_manager: Arc, database: Arc, + storage: Arc, engine: Arc, envelope_handle: Arc, pool: threadpool::ThreadPool, @@ -291,6 +288,7 @@ pub fn duty_dispatch_task< let identiy_key = identity_key.clone(); let sync_manager = sync_manager.clone(); let database = database.clone(); + let storage = storage.clone(); let engine = engine.clone(); let envelope_handle = envelope_handle.clone(); let params = params.clone(); @@ -303,6 +301,7 @@ pub fn duty_dispatch_task< identiy_key, sync_manager, database, + storage, engine, envelope_handle, params, @@ -331,6 +330,7 @@ fn duty_exec_task( identity_key: IdentityKey, sync_manager: Arc, database: Arc, + storage: Arc, engine: Arc, envelope_handle: Arc, params: Arc, @@ -343,6 +343,7 @@ fn duty_exec_task( &identity_key, &sync_manager, database.as_ref(), + storage.as_ref(), engine.as_ref(), envelope_handle.as_ref(), ¶ms, @@ -361,12 +362,13 @@ fn duty_exec_task( } #[allow(clippy::too_many_arguments)] -fn perform_duty( +fn perform_duty( duty: &Duty, identity_key: &IdentityKey, sync_manager: &SyncManager, - database: &D, - engine: &E, + database: &impl Database, + storage: &NodeStorage, + engine: &impl ExecEngineCtl, envelope_handle: &EnvelopeHandle, params: &Arc, checkpoint_handle: Arc, @@ -394,6 +396,7 @@ fn perform_duty( &l1_view, identity_key, database, + storage, engine, params, )? diff --git a/crates/consensus-logic/src/fork_choice_manager.rs b/crates/consensus-logic/src/fork_choice_manager.rs index 2c88217c8..35b208653 100644 --- a/crates/consensus-logic/src/fork_choice_manager.rs +++ b/crates/consensus-logic/src/fork_choice_manager.rs @@ -16,7 +16,7 @@ use strata_state::{ client_state::ClientState, prelude::*, state_op::StateCache, sync_event::SyncEvent, }; use strata_status::StatusChannel; -use strata_storage::L2BlockManager; +use strata_storage::{L2BlockManager, NodeStorage}; use strata_tasks::ShutdownGuard; use tokio::{ runtime::Handle, @@ -37,10 +37,11 @@ pub struct ForkChoiceManager { params: Arc, /// Underlying state database. + // TODO remove database: Arc, - /// L2 block manager. - l2_block_manager: Arc, + /// Common node storage interface. + storage: Arc, /// Current CSM state, as of the last time we were updated about it. 
cur_csm_state: Arc, @@ -61,7 +62,7 @@ impl ForkChoiceManager { pub fn new( params: Arc, database: Arc, - l2_block_manager: Arc, + storage: Arc, cur_csm_state: Arc, chain_tracker: unfinalized_tracker::UnfinalizedBlockTracker, cur_best_block: L2BlockId, @@ -70,7 +71,7 @@ impl ForkChoiceManager { Self { params, database, - l2_block_manager, + storage, cur_csm_state, chain_tracker, cur_best_block, @@ -83,17 +84,16 @@ impl ForkChoiceManager { } fn set_block_status(&self, id: &L2BlockId, status: BlockStatus) -> Result<(), DbError> { - self.l2_block_manager - .set_block_status_blocking(id, status)?; + self.storage.l2().set_block_status_blocking(id, status)?; Ok(()) } fn get_block_status(&self, id: &L2BlockId) -> Result, DbError> { - self.l2_block_manager.get_block_status_blocking(id) + self.storage.l2().get_block_status_blocking(id) } fn get_block_data(&self, id: &L2BlockId) -> Result, DbError> { - self.l2_block_manager.get_block_data_blocking(id) + self.storage.l2().get_block_data_blocking(id) } fn get_block_index(&self, blkid: &L2BlockId) -> anyhow::Result { @@ -114,7 +114,7 @@ impl ForkChoiceManager { /// Creates the forkchoice manager state from a database and rollup params. pub fn init_forkchoice_manager( database: &Arc, - l2_block_manager: &Arc, + storage: &Arc, params: &Arc, init_csm_state: Arc, ) -> anyhow::Result> { @@ -124,7 +124,8 @@ pub fn init_forkchoice_manager( let chain_tip_height = sync_state.chain_tip_height(); let finalized_blockid = *sync_state.finalized_blkid(); - let finalized_block = l2_block_manager + let finalized_block = storage + .l2() .get_block_data_blocking(&finalized_blockid)? .ok_or(Error::MissingL2Block(finalized_blockid))?; let finalized_height = finalized_block.header().blockidx(); @@ -137,17 +138,16 @@ pub fn init_forkchoice_manager( chain_tracker.load_unfinalized_blocks( finalized_height, chain_tip_height, - l2_block_manager.as_ref(), + storage.l2().as_ref(), )?; - let (cur_tip_blkid, cur_tip_index) = - determine_start_tip(&chain_tracker, l2_block_manager.as_ref())?; + let (cur_tip_blkid, cur_tip_index) = determine_start_tip(&chain_tracker, storage.l2())?; // Actually assemble the forkchoice manager state. let fcm = ForkChoiceManager::new( params.clone(), database.clone(), - l2_block_manager.clone(), + storage.clone(), init_csm_state, chain_tracker, cur_tip_blkid, @@ -197,7 +197,7 @@ pub fn tracker_task( shutdown: ShutdownGuard, handle: Handle, database: Arc, - l2_block_manager: Arc, + storage: Arc, engine: Arc, fcm_rx: mpsc::Receiver, csm_ctl: Arc, @@ -223,7 +223,7 @@ pub fn tracker_task( // Now that we have the database state in order, we can actually init the // FCM. - let fcm = match init_forkchoice_manager(&database, &l2_block_manager, ¶ms, init_state) { + let fcm = match init_forkchoice_manager(&database, &storage, ¶ms, init_state) { Ok(fcm) => fcm, Err(e) => { error!(err = %e, "failed to init forkchoice manager!"); @@ -374,7 +374,7 @@ fn process_fc_message( let best_block = pick_best_block( &cur_tip, fcm_state.chain_tracker.chain_tips_iter(), - &fcm_state.l2_block_manager, + fcm_state.storage.l2(), )?; // Figure out what our job is now. @@ -536,7 +536,7 @@ fn apply_tip_update( reorg: &reorg::Reorg, fc_manager: &mut ForkChoiceManager, ) -> anyhow::Result { - let chs_db = fc_manager.database.chain_state_db(); + let chsman = fc_manager.storage.as_ref().chainstate(); // See if we need to roll back recent changes. 
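The lines that follow implement a validate-first reorg: load the pivot's post-state, recompute every block past it, and only then roll back and commit. A condensed model of that ordering (the `StateStore` trait and `VecStore` are hypothetical; the real code goes through the chainstate manager):

```rust
// Hypothetical minimal store; the real code talks to ChainstateManager.
trait StateStore {
    fn rollback_to(&mut self, idx: u64);
    fn put_batch(&mut self, idx: u64, batch: String);
}

struct VecStore {
    batches: Vec<(u64, String)>,
}

impl StateStore for VecStore {
    fn rollback_to(&mut self, idx: u64) {
        // Drop everything past the pivot.
        self.batches.retain(|(i, _)| *i <= idx);
    }
    fn put_batch(&mut self, idx: u64, batch: String) {
        self.batches.push((idx, batch));
    }
}

/// Applies a reorg: `updates` is assumed to be fully validated already,
/// so nothing is rolled back until the new segment is known to be good.
fn apply_reorg(
    store: &mut impl StateStore,
    pivot_idx: u64,
    cur_idx: u64,
    updates: Vec<(u64, String)>,
) {
    if pivot_idx < cur_idx {
        store.rollback_to(pivot_idx);
    }
    for (idx, batch) in updates {
        store.put_batch(idx, batch);
    }
}

fn main() {
    let mut store = VecStore {
        batches: (0..=5).map(|i| (i, format!("old{i}"))).collect(),
    };
    apply_reorg(&mut store, 3, 5, vec![(4, "new4".into()), (5, "new5".into())]);
    assert_eq!(store.batches.len(), 6);
    assert_eq!(store.batches[4].1, "new4");
}
```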
let pivot_blkid = reorg.pivot(); @@ -544,8 +544,8 @@ fn apply_tip_update( // Load the post-state of the pivot block as the block to start computing // blocks going forwards with. - let mut pre_state = chs_db - .get_toplevel_state(pivot_idx)? + let mut pre_state = chsman + .get_toplevel_chainstate_blocking(pivot_idx)? .ok_or(Error::MissingIdxChainstate(pivot_idx))?; let mut updates = Vec::new(); @@ -586,14 +586,14 @@ fn apply_tip_update( // compute new states. if pivot_idx < fc_manager.cur_index { debug!(?pivot_blkid, %pivot_idx, "rolling back chainstate"); - chs_db.rollback_writes_to(pivot_idx)?; + chsman.rollback_writes_to_blocking(pivot_idx)?; } // Now that we've verified the new chain is really valid, we can go and // apply the changes to commit to the new chain. for (idx, blkid, writes) in updates { debug!(?blkid, "applying CL state update"); - chs_db.write_state_update(idx, &writes)?; + chsman.put_write_batch_blocking(idx, writes)?; fc_manager.cur_best_block = *blkid; fc_manager.cur_index = idx; } diff --git a/crates/consensus-logic/src/genesis.rs b/crates/consensus-logic/src/genesis.rs index 13f077880..46c854529 100644 --- a/crates/consensus-logic/src/genesis.rs +++ b/crates/consensus-logic/src/genesis.rs @@ -17,6 +17,7 @@ use strata_state::{ l1::{L1HeaderRecord, L1ViewState}, prelude::*, }; +use strata_storage::{L1BlockManager, L2BlockManager, NodeStorage}; use tracing::*; use crate::errors::Error; @@ -47,7 +48,7 @@ pub fn init_client_state(params: &Params, database: &impl Database) -> anyhow::R /// that. pub fn init_genesis_chainstate( params: &Params, - database: &impl Database, + storage: &NodeStorage, ) -> anyhow::Result { debug!("preparing database genesis chainstate!"); @@ -55,7 +56,7 @@ pub fn init_genesis_chainstate( let genesis_blk_height = params.rollup.genesis_l1_height; // Query the pre-genesis blocks we need before we do anything else. - let l1_db = database.l1_db(); + let l1_db = storage.l1(); let pregenesis_mfs = load_pre_genesis_l1_manifests(l1_db.as_ref(), horizon_blk_height, genesis_blk_height)?; @@ -64,10 +65,8 @@ pub fn init_genesis_chainstate( let gchstate = make_genesis_chainstate(&gblock, pregenesis_mfs, params); // Now insert things into the database. - let chs_db = database.chain_state_db(); - let l2_db = database.l2_db(); - chs_db.write_genesis_state(&gchstate)?; - l2_db.put_block_data(gblock)?; + storage.chainstate().write_genesis_state(gchstate.clone())?; + storage.l2().put_block_data_blocking(gblock)?; // TODO make ^this be atomic so we can't accidentally not write both, or // make it so we can overwrite the genesis chainstate if there's no other @@ -84,13 +83,13 @@ pub fn construct_operator_table(opconfig: &OperatorConfig) -> OperatorTable { } fn load_pre_genesis_l1_manifests( - l1_db: &impl L1Database, + l1man: &L1BlockManager, horizon_height: u64, genesis_height: u64, ) -> anyhow::Result> { let mut manifests = Vec::new(); for height in horizon_height..=genesis_height { - let Some(mf) = l1_db.get_block_manifest(height)? else { + let Some(mf) = l1man.get_block_manifest(height)? else { return Err(Error::MissingL1BlockHeight(height).into()); }; @@ -176,11 +175,9 @@ pub fn check_needs_client_init(database: &impl Database) -> anyhow::Result Ok(false) } -pub fn check_needs_genesis(database: &impl Database) -> anyhow::Result { - let l2_db = database.l2_db(); - +pub fn check_needs_genesis(l2man: &L2BlockManager) -> anyhow::Result { // Check if there's any genesis block written. 
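The check that follows treats two outcomes as "genesis needed": height 0 has no blocks, or the database is not bootstrapped at all. A minimal sketch of that decision, with a stand-in lookup in place of `get_blocks_at_height_blocking`:

```rust
#[derive(Debug)]
enum DbError {
    NotBootstrapped,
    Other(String),
}

/// Stand-in for querying blocks at a height; the real call is
/// `l2man.get_blocks_at_height_blocking(0)`.
fn get_blocks_at_height(height: u64, bootstrapped: bool) -> Result<Vec<u64>, DbError> {
    if !bootstrapped {
        return Err(DbError::NotBootstrapped);
    }
    Ok(if height == 0 { vec![42] } else { vec![] })
}

fn check_needs_genesis(bootstrapped: bool) -> Result<bool, DbError> {
    match get_blocks_at_height(0, bootstrapped) {
        // No genesis block yet: we still need to run genesis.
        Ok(blkids) => Ok(blkids.is_empty()),
        // An uninitialized database also means genesis hasn't happened.
        Err(DbError::NotBootstrapped) => Ok(true),
        Err(e) => Err(e),
    }
}

fn main() {
    assert!(check_needs_genesis(false).unwrap());
    assert!(!check_needs_genesis(true).unwrap());
}
```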
- match l2_db.get_blocks_at_height(0) { + match l2man.get_blocks_at_height_blocking(0) { Ok(blkids) => Ok(blkids.is_empty()), Err(DbError::NotBootstrapped) => Ok(true), diff --git a/crates/consensus-logic/src/sync_manager.rs b/crates/consensus-logic/src/sync_manager.rs index 0d07c0430..6a76d17fc 100644 --- a/crates/consensus-logic/src/sync_manager.rs +++ b/crates/consensus-logic/src/sync_manager.rs @@ -79,7 +79,7 @@ pub fn start_sync_tasks< >( executor: &TaskExecutor, database: Arc, - storage: &NodeStorage, + storage: Arc, engine: Arc, pool: threadpool::ThreadPool, params: Arc, @@ -97,7 +97,7 @@ pub fn start_sync_tasks< // Start the fork choice manager thread. If we haven't done genesis yet // this will just wait until the CSM says we have. let fcm_database = database.clone(); - let fcm_l2_block_manager = storage.l2().clone(); + let fcm_storage = storage.clone(); let fcm_engine = engine.clone(); let fcm_csm_controller = csm_controller.clone(); let fcm_params = params.clone(); @@ -109,7 +109,7 @@ pub fn start_sync_tasks< shutdown, handle, fcm_database, - fcm_l2_block_manager, + fcm_storage, fcm_engine, fcm_rx, fcm_csm_controller, @@ -122,7 +122,7 @@ pub fn start_sync_tasks< let client_worker_state = worker::WorkerState::open( params.clone(), database, - storage.l2().clone(), + storage.clone(), cupdate_tx, storage.checkpoint().clone(), )?; diff --git a/crates/db/src/stubs/chain_state.rs b/crates/db/src/stubs/chain_state.rs index ef5042d56..8c73516a9 100644 --- a/crates/db/src/stubs/chain_state.rs +++ b/crates/db/src/stubs/chain_state.rs @@ -1,7 +1,7 @@ use std::collections::*; use parking_lot::Mutex; -use strata_state::{chain_state::Chainstate, state_op, state_op::WriteBatch}; +use strata_state::{chain_state::Chainstate, state_op::WriteBatch}; use tracing::*; use crate::{errors::DbError, traits::*, DbResult}; @@ -46,13 +46,13 @@ impl StubChainstateDb { } impl ChainstateDatabase for StubChainstateDb { - fn write_genesis_state(&self, toplevel: &Chainstate) -> DbResult<()> { + fn write_genesis_state(&self, toplevel: Chainstate) -> DbResult<()> { let mut st = self.state.lock(); st.toplevels.insert(0, toplevel.clone()); Ok(()) } - fn write_state_update(&self, idx: u64, batch: &WriteBatch) -> DbResult<()> { + fn put_write_batch(&self, idx: u64, batch: WriteBatch) -> DbResult<()> { let mut st = self.state.lock(); let last_idx = st.find_last_write_batch(); @@ -60,21 +60,20 @@ impl ChainstateDatabase for StubChainstateDb { return Err(DbError::OooInsert("chainstate", idx)); } - let toplevel = st + let _toplevel = st .toplevels .get(&last_idx) .cloned() .expect("chainstatedb: nonsense"); // Compute new state and insert things. 
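With the reworked `WriteBatch`, the stub below no longer replays operations against the previous toplevel state; the batch itself carries the post-state, so the new toplevel is just `batch.into_toplevel()`. A sketch of that semantics with a pared-down `WriteBatch` (the real type may carry more fields):

```rust
#[derive(Clone, Debug, PartialEq)]
struct Chainstate {
    slot: u64,
}

/// Simplified: the new toplevel state is stored directly in the batch
/// rather than derived by applying a list of write operations.
#[derive(Clone)]
struct WriteBatch {
    new_toplevel: Chainstate,
}

impl WriteBatch {
    fn new_replace(new_toplevel: Chainstate) -> Self {
        Self { new_toplevel }
    }

    fn into_toplevel(self) -> Chainstate {
        self.new_toplevel
    }
}

fn main() {
    let batch = WriteBatch::new_replace(Chainstate { slot: 9 });
    // Deriving the post-state no longer needs the pre-state at all.
    let post = batch.clone().into_toplevel();
    assert_eq!(post, Chainstate { slot: 9 });
}
```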
- let new_state = state_op::apply_write_batch_to_chainstate(toplevel, batch); - st.toplevels.insert(idx, new_state); st.write_batches.insert(idx, batch.clone()); + st.toplevels.insert(idx, batch.into_toplevel()); Ok(()) } - fn purge_historical_state_before(&self, before_idx: u64) -> DbResult<()> { + fn purge_entries_before(&self, before_idx: u64) -> DbResult<()> { let mut st = self.state.lock(); if !st.toplevels.contains_key(&before_idx) { @@ -133,12 +132,12 @@ impl ChainstateDatabase for StubChainstateDb { Ok(()) } - fn get_last_state_idx(&self) -> DbResult { + fn get_last_write_idx(&self) -> DbResult { let st = self.state.lock(); Ok(st.find_last_write_batch()) } - fn get_earliest_state_idx(&self) -> DbResult { + fn get_earliest_write_idx(&self) -> DbResult { let st = self.state.lock(); let idx = st .toplevels @@ -148,16 +147,8 @@ impl ChainstateDatabase for StubChainstateDb { Ok(idx) } - fn get_writes_at(&self, idx: u64) -> DbResult> { + fn get_write_batch(&self, idx: u64) -> DbResult> { let st = self.state.lock(); Ok(st.write_batches.get(&idx).cloned()) } - - fn get_toplevel_state( - &self, - idx: u64, - ) -> DbResult> { - let st = self.state.lock(); - Ok(st.toplevels.get(&idx).cloned()) - } } diff --git a/crates/db/src/traits.rs b/crates/db/src/traits.rs index ad7c3cf6d..4d24743ac 100644 --- a/crates/db/src/traits.rs +++ b/crates/db/src/traits.rs @@ -12,8 +12,7 @@ use strata_primitives::{ }; use strata_state::{ block::L2BlockBundle, bridge_duties::BridgeDutyStatus, chain_state::Chainstate, - client_state::ClientState, l1::L1Tx, operation::*, prelude::*, state_op::WriteBatch, - sync_event::SyncEvent, + client_state::ClientState, l1::L1Tx, operation::*, state_op::WriteBatch, sync_event::SyncEvent, }; use strata_zkvm::ProofReceipt; @@ -214,27 +213,23 @@ pub trait ChainstateDatabase { /// Stores a write batch in the database, possibly computing that state /// under the hood from the writes. Will not overwrite existing data, /// previous writes must be purged first in order to be replaced. - fn write_state_update(&self, idx: u64, batch: WriteBatch) -> DbResult<()>; + fn put_write_batch(&self, idx: u64, batch: WriteBatch) -> DbResult<()>; + + /// Gets the write batch stored to compute a height. + fn get_write_batch(&self, idx: u64) -> DbResult>; /// Tells the database to purge state before a certain block index (height). - fn purge_historical_state_before(&self, before_idx: u64) -> DbResult<()>; + fn purge_entries_before(&self, before_idx: u64) -> DbResult<()>; /// Rolls back any writes and state checkpoints after a specified block. fn rollback_writes_to(&self, new_tip_idx: u64) -> DbResult<()>; /// Gets the last written state. - fn get_last_state_idx(&self) -> DbResult; + fn get_last_write_idx(&self) -> DbResult; /// Gets the earliest written state. This corresponds to calls to /// `purge_historical_state_before`. - fn get_earliest_state_idx(&self) -> DbResult; - - /// Gets the write batch stored to compute a height. - fn get_writes_at(&self, idx: u64) -> DbResult>; - - /// Gets the toplevel chain state at a particular block slot, if it can be - /// retrieved. 
- fn get_toplevel_state(&self, idx: u64) -> DbResult>; + fn get_earliest_write_idx(&self) -> DbResult; } /// Db trait for Checkpoint data diff --git a/crates/rocksdb-store/src/chain_state/db.rs b/crates/rocksdb-store/src/chain_state/db.rs index 2a1d39942..5325947d4 100644 --- a/crates/rocksdb-store/src/chain_state/db.rs +++ b/crates/rocksdb-store/src/chain_state/db.rs @@ -2,7 +2,6 @@ use std::sync::Arc; use rockbound::{OptimisticTransactionDB, SchemaBatch, SchemaDBOperationsExt}; use strata_db::{errors::DbError, traits::*, DbResult}; -use strata_state::state_op; use super::schemas::{ChainstateSchema, WriteBatchSchema}; use crate::{ @@ -30,32 +29,6 @@ impl ChainstateDb { } impl ChainstateDatabase for ChainstateDb { - fn get_earliest_state_idx(&self) -> DbResult { - match self.get_first_idx()? { - Some(idx) => Ok(idx), - None => Err(DbError::NotBootstrapped), - } - } - - fn get_last_state_idx(&self) -> DbResult { - match self.get_last_idx()? { - Some(idx) => Ok(idx), - None => Err(DbError::NotBootstrapped), - } - } - - fn get_writes_at(&self, idx: u64) -> DbResult> { - Ok(self.db.get::(&idx)?) - } - - // TODO: define what toplevel means more clearly - fn get_toplevel_state( - &self, - idx: u64, - ) -> DbResult> { - Ok(self.db.get::(&idx)?) - } - fn write_genesis_state(&self, toplevel: strata_state::chain_state::Chainstate) -> DbResult<()> { let genesis_key = 0; if self.get_first_idx()?.is_some() || self.get_last_idx()?.is_some() { @@ -65,31 +38,25 @@ impl ChainstateDatabase for ChainstateDb { Ok(()) } - fn write_state_update( - &self, - idx: u64, - batch: strata_state::state_op::WriteBatch, - ) -> DbResult<()> { + fn put_write_batch(&self, idx: u64, batch: strata_state::state_op::WriteBatch) -> DbResult<()> { if self.db.get::(&idx)?.is_some() { return Err(DbError::OverwriteStateUpdate(idx)); } - let pre_state_idx = idx - 1; - let pre_state = match self.db.get::(&pre_state_idx)? { - Some(state) => state, - None => return Err(DbError::OooInsert("Chainstate", idx)), - }; - let post_state = state_op::apply_write_batch_to_chainstate(pre_state, &batch); - let mut write_batch = SchemaBatch::new(); write_batch.put::(&idx, &batch)?; + let post_state = batch.into_toplevel(); write_batch.put::(&idx, &post_state)?; self.db.write_schemas(write_batch)?; Ok(()) } - fn purge_historical_state_before(&self, before_idx: u64) -> DbResult<()> { + fn get_write_batch(&self, idx: u64) -> DbResult> { + Ok(self.db.get::(&idx)?) + } + + fn purge_entries_before(&self, before_idx: u64) -> DbResult<()> { let first_idx = match self.get_first_idx()? 
{ Some(idx) => idx, None => return Err(DbError::NotBootstrapped), @@ -135,13 +102,20 @@ impl ChainstateDatabase for ChainstateDb { self.db.write_schemas(del_batch)?; Ok(()) } + + fn get_earliest_write_idx(&self) -> DbResult { + self.get_first_idx()?.ok_or(DbError::NotBootstrapped) + } + + fn get_last_write_idx(&self) -> DbResult { + self.get_last_idx()?.ok_or(DbError::NotBootstrapped) + } } #[cfg(feature = "test_utils")] #[cfg(test)] mod tests { - use state_op::WriteBatch; - use strata_state::chain_state::Chainstate; + use strata_state::{chain_state::Chainstate, state_op::WriteBatch}; use strata_test_utils::ArbitraryGenerator; use super::*; @@ -157,74 +131,60 @@ mod tests { let genesis_state: Chainstate = ArbitraryGenerator::new().generate(); let db = setup_db(); - let res = db.get_earliest_state_idx(); + let res = db.get_earliest_write_idx(); assert!(res.is_err_and(|x| matches!(x, DbError::NotBootstrapped))); - let res = db.get_last_state_idx(); + let res = db.get_last_write_idx(); assert!(res.is_err_and(|x| matches!(x, DbError::NotBootstrapped))); - let res = db.write_genesis_state(&genesis_state); + let res = db.write_genesis_state(genesis_state.clone()); assert!(res.is_ok()); - let res = db.get_earliest_state_idx(); + let res = db.get_earliest_write_idx(); assert!(res.is_ok_and(|x| matches!(x, 0))); - let res = db.get_last_state_idx(); + let res = db.get_last_write_idx(); assert!(res.is_ok_and(|x| matches!(x, 0))); - let res = db.write_genesis_state(&genesis_state); + let res = db.write_genesis_state(genesis_state); assert!(res.is_err_and(|x| matches!(x, DbError::OverwriteStateUpdate(0)))); } #[test] fn test_write_state_update() { let db = setup_db(); - let batch = WriteBatch::new_empty(); + let genesis_state: Chainstate = ArbitraryGenerator::new().generate(); + let batch = WriteBatch::new_replace(genesis_state.clone()); - let res = db.write_state_update(1, &batch); + let res = db.put_write_batch(1, batch.clone()); assert!(res.is_err_and(|x| matches!(x, DbError::OooInsert("Chainstate", 1)))); - let genesis_state: Chainstate = ArbitraryGenerator::new().generate(); - db.write_genesis_state(&genesis_state).unwrap(); + db.write_genesis_state(genesis_state).unwrap(); - let res = db.write_state_update(1, &batch); + let res = db.put_write_batch(1, batch.clone()); assert!(res.is_ok()); - let res = db.write_state_update(2, &batch); + let res = db.put_write_batch(2, batch.clone()); assert!(res.is_ok()); - let res = db.write_state_update(2, &batch); + let res = db.put_write_batch(2, batch.clone()); assert!(res.is_err_and(|x| matches!(x, DbError::OverwriteStateUpdate(2)))); - let res = db.write_state_update(4, &batch); + let res = db.put_write_batch(4, batch.clone()); assert!(res.is_err_and(|x| matches!(x, DbError::OooInsert("Chainstate", 4)))); } - #[test] - fn test_get_toplevel_state() { - let db = setup_db(); - let genesis_state: Chainstate = ArbitraryGenerator::new().generate(); - let batch = WriteBatch::new_empty(); - - db.write_genesis_state(&genesis_state).unwrap(); - for i in 1..=5 { - assert!(db.get_toplevel_state(i).unwrap().is_none()); - db.write_state_update(i, &batch).unwrap(); - assert!(db.get_toplevel_state(i).unwrap().is_some()); - } - } - #[test] fn test_get_earliest_and_last_state_idx() { let db = setup_db(); let genesis_state: Chainstate = ArbitraryGenerator::new().generate(); - let batch = WriteBatch::new_empty(); + let batch = WriteBatch::new_replace(genesis_state.clone()); - db.write_genesis_state(&genesis_state).unwrap(); + db.write_genesis_state(genesis_state).unwrap(); for 
i in 1..=5 { - assert_eq!(db.get_earliest_state_idx().unwrap(), 0); - db.write_state_update(i, &batch).unwrap(); - assert_eq!(db.get_last_state_idx().unwrap(), i); + assert_eq!(db.get_earliest_write_idx().unwrap(), 0); + db.put_write_batch(i, batch.clone()).unwrap(); + assert_eq!(db.get_last_write_idx().unwrap(), i); } } @@ -232,36 +192,34 @@ mod tests { fn test_purge() { let db = setup_db(); let genesis_state: Chainstate = ArbitraryGenerator::new().generate(); - let batch = WriteBatch::new_empty(); + let batch = WriteBatch::new_replace(genesis_state.clone()); - db.write_genesis_state(&genesis_state).unwrap(); + db.write_genesis_state(genesis_state).unwrap(); for i in 1..=5 { - assert_eq!(db.get_earliest_state_idx().unwrap(), 0); - db.write_state_update(i, &batch).unwrap(); - assert_eq!(db.get_last_state_idx().unwrap(), i); + assert_eq!(db.get_earliest_write_idx().unwrap(), 0); + db.put_write_batch(i, batch.clone()).unwrap(); + assert_eq!(db.get_last_write_idx().unwrap(), i); } - db.purge_historical_state_before(3).unwrap(); + db.purge_entries_before(3).unwrap(); // Ensure that calling the purge again does not fail - db.purge_historical_state_before(3).unwrap(); + db.purge_entries_before(3).unwrap(); - assert_eq!(db.get_earliest_state_idx().unwrap(), 3); - assert_eq!(db.get_last_state_idx().unwrap(), 5); + assert_eq!(db.get_earliest_write_idx().unwrap(), 3); + assert_eq!(db.get_last_write_idx().unwrap(), 5); for i in 0..3 { - assert!(db.get_writes_at(i).unwrap().is_none()); - assert!(db.get_toplevel_state(i).unwrap().is_none()); + assert!(db.get_write_batch(i).unwrap().is_none()); } for i in 3..=5 { - assert!(db.get_writes_at(i).unwrap().is_some()); - assert!(db.get_toplevel_state(i).unwrap().is_some()); + assert!(db.get_write_batch(i).unwrap().is_some()); } - let res = db.purge_historical_state_before(2); + let res = db.purge_entries_before(2); assert!(res.is_err_and(|x| matches!(x, DbError::MissingL2State(2)))); - let res = db.purge_historical_state_before(1); + let res = db.purge_entries_before(1); assert!(res.is_err_and(|x| matches!(x, DbError::MissingL2State(1)))); } @@ -269,11 +227,11 @@ mod tests { fn test_rollback() { let db = setup_db(); let genesis_state: Chainstate = ArbitraryGenerator::new().generate(); - let batch = WriteBatch::new_empty(); + let batch = WriteBatch::new_replace(genesis_state.clone()); - db.write_genesis_state(&genesis_state).unwrap(); + db.write_genesis_state(genesis_state).unwrap(); for i in 1..=5 { - db.write_state_update(i, &batch).unwrap(); + db.put_write_batch(i, batch.clone()).unwrap(); } db.rollback_writes_to(3).unwrap(); @@ -281,21 +239,16 @@ mod tests { db.rollback_writes_to(3).unwrap(); for i in 4..=5 { - assert!(db.get_writes_at(i).unwrap().is_none()); - assert!(db.get_toplevel_state(i).unwrap().is_none()); - } - - for i in 0..=3 { - assert!(db.get_toplevel_state(i).unwrap().is_some()); + assert!(db.get_write_batch(i).unwrap().is_none()); } // For genesis there is no BatchWrites for i in 1..=3 { - assert!(db.get_writes_at(i).unwrap().is_some()); + assert!(db.get_write_batch(i).unwrap().is_some()); } - assert_eq!(db.get_earliest_state_idx().unwrap(), 0); - assert_eq!(db.get_last_state_idx().unwrap(), 3); + assert_eq!(db.get_earliest_write_idx().unwrap(), 0); + assert_eq!(db.get_last_write_idx().unwrap(), 3); let res = db.rollback_writes_to(5); assert!(res.is_err_and(|x| matches!(x, DbError::RevertAboveCurrent(5, 3)))); @@ -307,22 +260,22 @@ mod tests { assert!(res.is_ok()); db.rollback_writes_to(2).unwrap(); - 
assert_eq!(db.get_earliest_state_idx().unwrap(), 0); - assert_eq!(db.get_last_state_idx().unwrap(), 2); + assert_eq!(db.get_earliest_write_idx().unwrap(), 0); + assert_eq!(db.get_last_write_idx().unwrap(), 2); } #[test] fn test_purge_and_rollback() { let db = setup_db(); let genesis_state: Chainstate = ArbitraryGenerator::new().generate(); - let batch = WriteBatch::new_empty(); + let batch = WriteBatch::new_replace(genesis_state.clone()); - db.write_genesis_state(&genesis_state).unwrap(); + db.write_genesis_state(genesis_state).unwrap(); for i in 1..=5 { - db.write_state_update(i, &batch).unwrap(); + db.put_write_batch(i, batch.clone()).unwrap(); } - db.purge_historical_state_before(3).unwrap(); + db.purge_entries_before(3).unwrap(); let res = db.rollback_writes_to(3); assert!(res.is_ok()); diff --git a/crates/state/src/state_op.rs b/crates/state/src/state_op.rs index 8dde04039..93eb32540 100644 --- a/crates/state/src/state_op.rs +++ b/crates/state/src/state_op.rs @@ -62,7 +62,7 @@ impl WriteBatch { /// On a given in-memory chainstate, applies a write batch. /// /// This must succeed. Pancis if it does not. -pub fn apply_write_batch_to_chainstate(chainstate: Chainstate, batch: &WriteBatch) -> Chainstate { +pub fn apply_write_batch_to_chainstate(_chainstate: Chainstate, batch: &WriteBatch) -> Chainstate { // This replaces the whole toplevel state. This probably makes you think // it doesn't make sense to take the chainstate arg at all. But this will // probably make more sense in the future when we make the state structure diff --git a/crates/storage/src/lib.rs b/crates/storage/src/lib.rs index 653ee3953..1169535df 100644 --- a/crates/storage/src/lib.rs +++ b/crates/storage/src/lib.rs @@ -15,6 +15,7 @@ use strata_db::traits::Database; /// A consolidation of database managers. #[derive(Clone)] pub struct NodeStorage { + l1_block_manager: Arc, l2_block_manager: Arc, chainstate_manager: Arc, @@ -23,6 +24,10 @@ pub struct NodeStorage { } impl NodeStorage { + pub fn l1(&self) -> &Arc { + &self.l1_block_manager + } + pub fn l2(&self) -> &Arc { &self.l2_block_manager } @@ -40,11 +45,13 @@ pub fn create_node_storage(db: Arc, pool: threadpool::ThreadPool) -> NodeS where D: Database + Sync + Send + 'static, { + let l1_block_manager = Arc::new(L1BlockManager::new(pool.clone(), db.clone())); let l2_block_manager = Arc::new(L2BlockManager::new(pool.clone(), db.clone())); let chainstate_manager = Arc::new(ChainstateManager::new(pool.clone(), db.clone())); let checkpoint_manager = Arc::new(CheckpointDbManager::new(pool.clone(), db.clone())); NodeStorage { + l1_block_manager, l2_block_manager, chainstate_manager, checkpoint_manager, diff --git a/crates/storage/src/managers/chainstate.rs b/crates/storage/src/managers/chainstate.rs index d32a171d0..f6168364c 100644 --- a/crates/storage/src/managers/chainstate.rs +++ b/crates/storage/src/managers/chainstate.rs @@ -28,42 +28,41 @@ impl ChainstateManager { self.ops.write_genesis_state_blocking(toplevel) } + /// Stores a new write batch at a particular index. pub async fn put_write_batch_async(&self, idx: u64, wb: WriteBatch) -> DbResult<()> { - self.ops.write_state_update_async(idx, wb).await?; + self.ops.put_write_batch_async(idx, wb).await?; self.wb_cache.purge(&idx); Ok(()) } + /// Stores a new write batch at a particular index. 
pub fn put_write_batch_blocking(&self, idx: u64, wb: WriteBatch) -> DbResult<()> { - self.ops.write_state_update_blocking(idx, wb)?; + self.ops.put_write_batch_blocking(idx, wb)?; self.wb_cache.purge(&idx); Ok(()) } /// Gets the writes stored for an index. - pub async fn get_writes_at_async(&self, idx: u64) -> DbResult> { + pub async fn get_write_batch_async(&self, idx: u64) -> DbResult> { self.wb_cache - .get_or_fetch(&idx, || self.ops.get_writes_at_chan(idx)) - .await? + .get_or_fetch(&idx, || self.ops.get_write_batch_chan(idx)) + .await } /// Gets the writes stored for an index. - pub fn get_writes_at_blocking(&self, idx: u64) -> DbResult> { + pub fn get_write_batch_blocking(&self, idx: u64) -> DbResult> { self.wb_cache - .get_or_fetch_blocking(&idx, || self.ops.get_writes_at_blocking(idx)) + .get_or_fetch_blocking(&idx, || self.ops.get_write_batch_blocking(idx)) } - pub async fn purge_state_before_async(&self, before_idx: u64) -> DbResult<()> { - self.ops - .purge_historical_state_before_async(before_idx) - .await?; + pub async fn purge_entries_before_async(&self, before_idx: u64) -> DbResult<()> { + self.ops.purge_entries_before_async(before_idx).await?; self.wb_cache.purge_if(|k| *k < before_idx); Ok(()) } - pub fn purge_state_before_blocking(&self, before_idx: u64) -> DbResult<()> { - self.ops - .purge_historical_state_before_blocking(before_idx)?; + pub fn purge_entries_before_blocking(&self, before_idx: u64) -> DbResult<()> { + self.ops.purge_entries_before_blocking(before_idx)?; self.wb_cache.purge_if(|k| *k < before_idx); Ok(()) } @@ -82,24 +81,24 @@ impl ChainstateManager { Ok(()) } - pub async fn get_first_state_idx_async(&self) -> DbResult { + pub async fn get_earliest_write_idx_async(&self) -> DbResult { // TODO convert to keep this cached in memory so we don't need both variants - self.ops.get_earliest_state_idx_async().await + self.ops.get_earliest_write_idx_async().await } - pub fn get_first_state_idx_blocking(&self) -> DbResult { + pub fn get_earliest_write_idx_blocking(&self) -> DbResult { // TODO convert to keep this cached in memory so we don't need both variants - self.ops.get_earliest_state_idx_blocking() + self.ops.get_earliest_write_idx_blocking() } - pub async fn get_last_state_idx_async(&self) -> DbResult { + pub async fn get_last_write_idx_async(&self) -> DbResult { // TODO convert to keep this cached in memory so we don't need both variants - self.ops.get_last_state_idx_async().await + self.ops.get_last_write_idx_async().await } - pub fn get_last_state_idx_blocking(&self) -> DbResult { + pub fn get_last_write_idx_blocking(&self) -> DbResult { // TODO convert to keep this cached in memory so we don't need both variants - self.ops.get_last_state_idx_blocking() + self.ops.get_last_write_idx_blocking() } // Nontrivial functions that aren't just 1:1. @@ -108,7 +107,7 @@ impl ChainstateManager { /// the write batch at an index. pub async fn get_toplevel_chainstate_async(&self, idx: u64) -> DbResult> { Ok(self - .get_writes_at_async(idx) + .get_write_batch_async(idx) .await? .map(|wb| wb.into_toplevel())) } @@ -117,7 +116,7 @@ impl ChainstateManager { /// the write batch at an index. pub fn get_toplevel_chainstate_blocking(&self, idx: u64) -> DbResult> { Ok(self - .get_writes_at_blocking(idx)? + .get_write_batch_blocking(idx)? 
.map(|wb| wb.into_toplevel())) } } diff --git a/crates/storage/src/ops/chainstate.rs b/crates/storage/src/ops/chainstate.rs index 43f117b33..99d5d8af8 100644 --- a/crates/storage/src/ops/chainstate.rs +++ b/crates/storage/src/ops/chainstate.rs @@ -10,12 +10,11 @@ use crate::exec::*; inst_ops_simple! { ( => ChainstateOps) { write_genesis_state(toplevel: Chainstate) => (); - write_state_update(idx: u64, batch: WriteBatch) => (); - purge_historical_state_before(before_idx: u64) => (); + put_write_batch(idx: u64, batch: WriteBatch) => (); + get_write_batch(idx: u64) => Option; + purge_entries_before(before_idx: u64) => (); rollback_writes_to(new_tip_idx: u64) => (); - get_last_state_idx() => u64; - get_earliest_state_idx() => u64; - get_writes_at(idx: u64) => Option; - get_toplevel_state(idx: u64) => Option; + get_last_write_idx() => u64; + get_earliest_write_idx() => u64; } } From 57cf761e5f3ef962bf260f061b01717a6eb8d743 Mon Sep 17 00:00:00 2001 From: Trey Del Bonis Date: Fri, 24 Jan 2025 14:33:22 -0500 Subject: [PATCH 09/11] consensus-logic: fixed CSM tests --- .../src/csm/client_transition.rs | 51 +++++++++---------- 1 file changed, 24 insertions(+), 27 deletions(-) diff --git a/crates/consensus-logic/src/csm/client_transition.rs b/crates/consensus-logic/src/csm/client_transition.rs index 3daa98061..0ad6a0771 100644 --- a/crates/consensus-logic/src/csm/client_transition.rs +++ b/crates/consensus-logic/src/csm/client_transition.rs @@ -63,33 +63,6 @@ impl<'c> EventContext for StorageEventContext<'c> { } } -#[cfg(test)] -pub struct DummyEventContext { - // maybe something? -} - -#[cfg(test)] -impl DummyEventContext { - pub fn new() -> Self { - Self {} - } -} - -#[cfg(test)] -impl EventContext for DummyEventContext { - fn get_l1_block_manifest(&self, height: u64) -> Result { - Err(Error::MissingL1BlockHeight(height)) - } - - fn get_l2_block_data(&self, blkid: &L2BlockId) -> Result { - Err(Error::MissingL2Block(*blkid)) - } - - fn get_toplevel_chainstate(&self, slot: u64) -> Result { - Err(Error::MissingIdxChainstate(slot)) - } -} - /// Processes the event given the current consensus state, producing some /// output. This can return database errors. 
pub fn process_event( @@ -526,6 +499,30 @@ mod tests { use super::*; use crate::genesis; + pub struct DummyEventContext { + // nothing + } + + impl DummyEventContext { + pub fn new() -> Self { + Self {} + } + } + + impl EventContext for DummyEventContext { + fn get_l1_block_manifest(&self, height: u64) -> Result { + Ok(ArbitraryGenerator::new().generate()) + } + + fn get_l2_block_data(&self, blkid: &L2BlockId) -> Result { + Err(Error::MissingL2Block(*blkid)) + } + + fn get_toplevel_chainstate(&self, slot: u64) -> Result { + Err(Error::MissingIdxChainstate(slot)) + } + } + struct TestEvent<'a> { event: SyncEvent, expected_writes: &'a [ClientStateWrite], From 22acb0c13b57d116d34a9f548932facfd13b37ce Mon Sep 17 00:00:00 2001 From: Trey Del Bonis Date: Fri, 24 Jan 2025 14:33:31 -0500 Subject: [PATCH 10/11] consensus-logic: fix unused imports --- crates/consensus-logic/src/csm/worker.rs | 2 +- crates/consensus-logic/src/duty/extractor.rs | 1 - crates/consensus-logic/src/fork_choice_manager.rs | 2 +- 3 files changed, 2 insertions(+), 3 deletions(-) diff --git a/crates/consensus-logic/src/csm/worker.rs b/crates/consensus-logic/src/csm/worker.rs index 1968f7b55..d09693de5 100644 --- a/crates/consensus-logic/src/csm/worker.rs +++ b/crates/consensus-logic/src/csm/worker.rs @@ -12,7 +12,7 @@ use strata_eectl::engine::ExecEngineCtl; use strata_primitives::prelude::*; use strata_state::{client_state::ClientState, csm_status::CsmStatus, operation::SyncAction}; use strata_status::StatusChannel; -use strata_storage::{CheckpointDbManager, L2BlockManager, NodeStorage}; +use strata_storage::{CheckpointDbManager, NodeStorage}; use strata_tasks::ShutdownGuard; use tokio::{ sync::{broadcast, mpsc}, diff --git a/crates/consensus-logic/src/duty/extractor.rs b/crates/consensus-logic/src/duty/extractor.rs index d94e56f3a..972b187e9 100644 --- a/crates/consensus-logic/src/duty/extractor.rs +++ b/crates/consensus-logic/src/duty/extractor.rs @@ -1,4 +1,3 @@ -use strata_db::traits::ChainstateDatabase; use strata_primitives::{buf::Buf32, params::Params}; use strata_state::{batch::BatchInfo, client_state::ClientState, id::L2BlockId}; use strata_storage::NodeStorage; diff --git a/crates/consensus-logic/src/fork_choice_manager.rs b/crates/consensus-logic/src/fork_choice_manager.rs index 35b208653..4007dc936 100644 --- a/crates/consensus-logic/src/fork_choice_manager.rs +++ b/crates/consensus-logic/src/fork_choice_manager.rs @@ -7,7 +7,7 @@ use strata_chaintsn::transition::process_block; use strata_common::bail_manager::{check_bail_trigger, BAIL_ADVANCE_CONSENSUS_STATE}; use strata_db::{ errors::DbError, - traits::{BlockStatus, ChainstateDatabase, Database}, + traits::{BlockStatus, Database}, }; use strata_eectl::{engine::ExecEngineCtl, messages::ExecPayloadData}; use strata_primitives::params::Params; From 164930d99bfad858069446fe7454784f311d123a Mon Sep 17 00:00:00 2001 From: Trey Del Bonis Date: Fri, 24 Jan 2025 15:36:07 -0500 Subject: [PATCH 11/11] misc: updated `Chainstate` to start to use new block commitment types --- bin/strata-client/src/main.rs | 2 +- bin/strata-client/src/rpc_server.rs | 4 +- crates/btcio/src/reader/query.rs | 2 +- crates/chaintsn/src/transition.rs | 2 +- crates/proof-impl/cl-stf/src/lib.rs | 4 +- crates/state/src/chain_state.rs | 62 +++++++++++++---------------- crates/state/src/l1/view.rs | 5 +++ crates/state/src/state_op.rs | 15 +++++-- crates/status/src/status_manager.rs | 6 ++- crates/test-utils/src/lib.rs | 2 +- 10 files changed, 57 insertions(+), 47 deletions(-) diff --git 
a/bin/strata-client/src/main.rs b/bin/strata-client/src/main.rs index 43d97b00e..c21969bfa 100644 --- a/bin/strata-client/src/main.rs +++ b/bin/strata-client/src/main.rs @@ -276,7 +276,7 @@ fn do_startup_checks( } // Check that tip L2 block exists (and engine can be connected to) - let chain_tip = last_chain_state.chain_tip_blockid(); + let chain_tip = last_chain_state.chain_tip_blkid(); match engine.check_block_exists(chain_tip) { Ok(true) => { info!("startup: last l2 block is synced") diff --git a/bin/strata-client/src/rpc_server.rs b/bin/strata-client/src/rpc_server.rs index c6edfc985..82457fda6 100644 --- a/bin/strata-client/src/rpc_server.rs +++ b/bin/strata-client/src/rpc_server.rs @@ -846,9 +846,9 @@ impl StrataDebugApiServer for StrataDebugRp .await?; match chain_state { Some(cs) => Ok(Some(RpcChainState { - tip_blkid: cs.chain_tip_blockid(), + tip_blkid: cs.chain_tip_blkid(), tip_slot: cs.chain_tip_slot(), - cur_epoch: cs.epoch(), + cur_epoch: cs.cur_epoch(), })), None => Ok(None), } diff --git a/crates/btcio/src/reader/query.rs b/crates/btcio/src/reader/query.rs index 3e0436257..82c6aacbc 100644 --- a/crates/btcio/src/reader/query.rs +++ b/crates/btcio/src/reader/query.rs @@ -493,7 +493,7 @@ mod test { let (event_tx, _event_rx) = mpsc::channel::(10); let mut chstate: Chainstate = ArbitraryGenerator::new().generate(); let clstate: ClientState = ArbitraryGenerator::new().generate(); - let curr_epoch = chstate.epoch(); + let curr_epoch = chstate.cur_epoch(); let ctx = get_reader_ctx(event_tx, chstate.clone(), clstate); let mut state = get_reader_state(&ctx); diff --git a/crates/chaintsn/src/transition.rs b/crates/chaintsn/src/transition.rs index 181fa2936..1ad115830 100644 --- a/crates/chaintsn/src/transition.rs +++ b/crates/chaintsn/src/transition.rs @@ -69,7 +69,7 @@ pub fn process_block( /// let's not get ahead of ourselves. fn compute_init_slot_rng(state: &StateCache) -> SlotRng { // Just take the last block's slot. - let blkid_buf = *state.state().chain_tip_blockid().as_ref(); + let blkid_buf = *state.state().chain_tip_blkid().as_ref(); SlotRng::from_seed(blkid_buf) } diff --git a/crates/proof-impl/cl-stf/src/lib.rs b/crates/proof-impl/cl-stf/src/lib.rs index f6a822fa6..63d56fa7e 100644 --- a/crates/proof-impl/cl-stf/src/lib.rs +++ b/crates/proof-impl/cl-stf/src/lib.rs @@ -98,13 +98,13 @@ fn process_cl_stf( let initial_snapshot = ChainStateSnapshot { hash: prev_state.compute_state_root(), slot: prev_state.chain_tip_slot(), - l2_blockid: prev_state.chain_tip_blockid(), + l2_blockid: *prev_state.chain_tip_blkid(), }; let final_snapshot = ChainStateSnapshot { hash: new_state.compute_state_root(), slot: new_state.chain_tip_slot(), - l2_blockid: new_state.chain_tip_blockid(), + l2_blockid: *new_state.chain_tip_blkid(), }; L2BatchProofOutput { diff --git a/crates/state/src/chain_state.rs b/crates/state/src/chain_state.rs index e46b3cd8d..f7356f086 100644 --- a/crates/state/src/chain_state.rs +++ b/crates/state/src/chain_state.rs @@ -1,6 +1,6 @@ use arbitrary::Arbitrary; use borsh::{BorshDeserialize, BorshSerialize}; -use strata_primitives::{buf::Buf32, hash::compute_borsh_hash}; +use strata_primitives::{buf::Buf32, hash::compute_borsh_hash, l2::L2BlockCommitment}; use crate::{ bridge_ops, @@ -18,17 +18,14 @@ use crate::{ #[derive(Clone, Debug, Eq, PartialEq, BorshSerialize, BorshDeserialize)] pub struct Chainstate { /// Most recent seen block. - pub(crate) last_block: L2BlockId, - - /// The slot of the last produced block. 
- pub(crate) slot: u64, + pub(crate) last_block: L2BlockCommitment, /// The checkpoint epoch period we're currently in, and so the index we /// expect the next checkpoint to be for. /// /// Immediately after genesis, this is 0, so the first checkpoint batch is /// checkpoint 0, moving us into checkpoint period 1. - pub(crate) epoch: u64, + pub(crate) cur_epoch: u64, /// Rollup's view of L1 state. pub(crate) l1_state: l1::L1ViewState, @@ -47,29 +44,12 @@ pub struct Chainstate { pub(crate) deposits_table: bridge_state::DepositsTable, } -/// Hashed Chain State. This is used to compute the state root of the [`Chainstate`] -// TODO: FIXME: Note that this is used as a temporary solution for the state root calculation -// It should be replaced once we swap out Chainstate's type definitions with SSZ type definitions -// which defines all of this more rigorously -#[derive(BorshSerialize, BorshDeserialize, Clone, Copy)] -pub struct HashedChainState { - pub last_block: Buf32, - pub slot: u64, - pub epoch: u64, - pub l1_state_hash: Buf32, - pub pending_withdraws_hash: Buf32, - pub exec_env_hash: Buf32, - pub operators_hash: Buf32, - pub deposits_hash: Buf32, -} - impl Chainstate { // TODO remove genesis blkid since apparently we don't need it anymore pub fn from_genesis(gdata: &GenesisStateData) -> Self { Self { - last_block: gdata.genesis_blkid(), - slot: 0, - epoch: 0, + last_block: L2BlockCommitment::new(0, gdata.genesis_blkid()), + cur_epoch: 0, l1_state: gdata.l1_state().clone(), pending_withdraws: StateQueue::new_empty(), exec_env_state: gdata.exec_state().clone(), @@ -80,31 +60,30 @@ impl Chainstate { /// Returns the slot last processed on the chainstate. pub fn chain_tip_slot(&self) -> u64 { - self.slot + self.last_block.slot() } /// Returns the blockid of the last processed block, which was used to /// construct this chainstate (unless we're currently in the process of /// modifying this chainstate copy). - pub fn chain_tip_blockid(&self) -> L2BlockId { - self.last_block + pub fn chain_tip_blkid(&self) -> &L2BlockId { + self.last_block.blkid() } pub fn l1_view(&self) -> &L1ViewState { &self.l1_state } - pub fn epoch(&self) -> u64 { - self.epoch + pub fn cur_epoch(&self) -> u64 { + self.cur_epoch } /// Computes a commitment to a the chainstate. This is super expensive /// because it does a bunch of hashing. pub fn compute_state_root(&self) -> Buf32 { let hashed_state = HashedChainState { - last_block: self.last_block.into(), - slot: self.slot, - epoch: self.epoch, + last_block: compute_borsh_hash(&self.last_block), + cur_epoch: self.cur_epoch, l1_state_hash: compute_borsh_hash(&self.l1_state), pending_withdraws_hash: compute_borsh_hash(&self.pending_withdraws), exec_env_hash: compute_borsh_hash(&self.exec_env_state), @@ -131,13 +110,28 @@ impl Chainstate { } } +/// Hashed Chain State. This is used to compute the state root of the [`Chainstate`] +// TODO: FIXME: Note that this is used as a temporary solution for the state root calculation +// It should be replaced once we swap out Chainstate's type definitions with SSZ type definitions +// which defines all of this more rigorously +#[derive(BorshSerialize, BorshDeserialize, Clone, Copy)] +pub struct HashedChainState { + pub last_block: Buf32, + pub cur_epoch: u64, + pub l1_state_hash: Buf32, + pub pending_withdraws_hash: Buf32, + pub exec_env_hash: Buf32, + pub operators_hash: Buf32, + pub deposits_hash: Buf32, +} + // NOTE: This is a helper setter that is supposed to be used only in tests. 
// This is being used in `strata_btcio::reader` to test the reader's behaviour when the epoch // changes. #[cfg(any(test, feature = "test_utils"))] impl Chainstate { pub fn set_epoch(&mut self, ep: u64) { - self.epoch = ep; + self.cur_epoch = ep; } } diff --git a/crates/state/src/l1/view.rs b/crates/state/src/l1/view.rs index 294206028..57ce91538 100644 --- a/crates/state/src/l1/view.rs +++ b/crates/state/src/l1/view.rs @@ -1,5 +1,6 @@ use arbitrary::Arbitrary; use borsh::{BorshDeserialize, BorshSerialize}; +use strata_primitives::l1::L1BlockId; use super::{L1HeaderRecord, L1MaturationEntry}; use crate::prelude::StateQueue; @@ -45,6 +46,10 @@ impl L1ViewState { &self.safe_block } + pub fn safe_blkid(&self) -> &L1BlockId { + &self.safe_block.blkid + } + pub fn safe_height(&self) -> u64 { self.maturation_queue.base_idx() } diff --git a/crates/state/src/state_op.rs b/crates/state/src/state_op.rs index 93eb32540..431022fbe 100644 --- a/crates/state/src/state_op.rs +++ b/crates/state/src/state_op.rs @@ -7,6 +7,7 @@ use borsh::{BorshDeserialize, BorshSerialize}; use strata_primitives::{ bridge::{BitcoinBlockHeight, OperatorIdx}, buf::Buf32, + l2::L2BlockCommitment, }; use tracing::*; @@ -146,12 +147,18 @@ impl StateCache { // TODO rework a lot of these to make them lower-level and focus more on // just keeping the core invariants consistent - /// Sets the current slot in the state. + /// Sets the last block commitment, derived from a header. pub fn set_cur_header(&mut self, header: &impl L2Header) { - // TODO rework this to use L2BlockCommitment + self.set_last_block(L2BlockCommitment::new( + header.blockidx(), + header.get_blockid(), + )); + } + + /// Sets the last block commitment. + pub fn set_last_block(&mut self, block: L2BlockCommitment) { let state = self.state_mut(); - state.slot = header.blockidx(); - state.last_block = header.get_blockid(); + state.last_block = block; } /// remove a deposit intent from the pending deposits queue. diff --git a/crates/status/src/status_manager.rs b/crates/status/src/status_manager.rs index 88bb7f76d..e424dcb4f 100644 --- a/crates/status/src/status_manager.rs +++ b/crates/status/src/status_manager.rs @@ -110,7 +110,11 @@ impl StatusChannel { /// Gets the latest epoch pub fn epoch(&self) -> Option { - self.receiver.chs.borrow().to_owned().map(|ch| ch.epoch()) + self.receiver + .chs + .borrow() + .to_owned() + .map(|ch| ch.cur_epoch()) } pub fn chain_state(&self) -> Option { diff --git a/crates/test-utils/src/lib.rs b/crates/test-utils/src/lib.rs index d902f1d1c..5e324792c 100644 --- a/crates/test-utils/src/lib.rs +++ b/crates/test-utils/src/lib.rs @@ -7,7 +7,7 @@ pub mod evm_ee; pub mod l2; /// The default buffer size for the `ArbitraryGenerator`. -const ARB_GEN_LEN: usize = 1024; +const ARB_GEN_LEN: usize = 16384; pub struct ArbitraryGenerator { buf: Vec, // Persistent buffer
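One consequence of the `Chainstate` changes above is worth spelling out: `last_block` now enters `HashedChainState` as the borsh hash of a slot-plus-blkid commitment rather than as the raw 32-byte block id, so state roots computed before and after this change differ even for identical chains. A minimal sketch of the field-wise hashing scheme, assuming `borsh` (1.x) and `sha2` as dependencies and SHA-256 as a stand-in for the crate's `compute_borsh_hash`:

```rust
use borsh::{to_vec, BorshSerialize};
use sha2::{Digest, Sha256};

// Stand-in for the crate's compute_borsh_hash: hash the borsh encoding.
fn compute_borsh_hash<T: BorshSerialize>(v: &T) -> [u8; 32] {
    Sha256::digest(to_vec(v).expect("borsh serialize")).into()
}

#[derive(BorshSerialize)]
struct L2BlockCommitment {
    slot: u64,
    blkid: [u8; 32],
}

#[derive(BorshSerialize)]
struct HashedChainState {
    last_block: [u8; 32],
    cur_epoch: u64,
    // ...the real struct also hashes the L1 view, queues, and tables.
}

fn main() {
    let last_block = L2BlockCommitment { slot: 10, blkid: [1u8; 32] };
    let hashed = HashedChainState {
        // The commitment (slot + blkid) is hashed, not the raw blkid,
        // so roots differ from the pre-change scheme even for the same id.
        last_block: compute_borsh_hash(&last_block),
        cur_epoch: 0,
    };
    let root = compute_borsh_hash(&hashed);
    println!("state root: {:02x?}", root);
}
```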