From 7cfc631c1fcbe45681f5a90dcbc9522e5f24182b Mon Sep 17 00:00:00 2001 From: Scott Fairclough <70711990+hexoscott@users.noreply.github.com> Date: Mon, 4 Nov 2024 10:07:10 +0000 Subject: [PATCH] Regular info tree updates (#1399) * starting work on info tree updates during execution * info tree updater in sequencer loop * logging latest index for info tree updates --- cmd/utils/flags.go | 7 +- eth/backend.go | 7 +- eth/ethconfig/config_zkevm.go | 5 +- turbo/cli/default_flags.go | 1 + turbo/cli/flags_zkevm.go | 1 + turbo/stages/zk_stages.go | 10 +- zk/l1infotree/updater.go | 297 ++++++++++++++++++++++ zk/stages/stage_l1_info_tree.go | 210 +-------------- zk/stages/stage_l1_sequencer_sync.go | 41 --- zk/stages/stage_sequence_execute.go | 18 +- zk/stages/stage_sequence_execute_utils.go | 14 +- 11 files changed, 360 insertions(+), 251 deletions(-) create mode 100644 zk/l1infotree/updater.go diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 8bd7e1f7cb7..273a5a9b1d5 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -738,9 +738,14 @@ var ( Usage: "The file that contains the initial (injected) batch data.", Value: "", } + InfoTreeUpdateInterval = cli.DurationFlag{ + Name: "zkevm.info-tree-update-interval", + Usage: "The interval at which the sequencer checks the L1 for new GER information", + Value: 1 * time.Minute, + } ACLPrintHistory = cli.IntFlag{ Name: "acl.print-history", - Usage: "Number of entries to print from the ACL history on node startup", + Usage: "Number of entries to print from the ACL history on node start up", Value: 10, } DebugTimers = cli.BoolFlag{ diff --git a/eth/backend.go b/eth/backend.go index a07a2f89d5c..7299e75844f 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -139,6 +139,7 @@ import ( "github.com/ledgerwatch/erigon/zk/utils" "github.com/ledgerwatch/erigon/zk/witness" "github.com/ledgerwatch/erigon/zkevm/etherman" + "github.com/ledgerwatch/erigon/zk/l1infotree" ) // Config contains the configuration options of the ETH protocol. @@ -1097,6 +1098,8 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger cfg.L1HighestBlockType, ) + l1InfoTreeUpdater := l1infotree.NewUpdater(cfg.Zk, l1InfoTreeSyncer) + if isSequencer { // if we are sequencing transactions, we do the sequencing loop... 
witnessGenerator := witness.NewGenerator( @@ -1167,11 +1170,11 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger backend.dataStream, backend.l1Syncer, seqVerSyncer, - l1InfoTreeSyncer, l1BlockSyncer, backend.txPool2, backend.txPool2DB, verifier, + l1InfoTreeUpdater, ) backend.syncUnwindOrder = zkStages.ZkSequencerUnwindOrder @@ -1205,9 +1208,9 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger backend.forkValidator, backend.engine, backend.l1Syncer, - l1InfoTreeSyncer, streamClient, backend.dataStream, + l1InfoTreeUpdater, ) backend.syncUnwindOrder = zkStages.ZkUnwindOrder diff --git a/eth/ethconfig/config_zkevm.go b/eth/ethconfig/config_zkevm.go index c52e6f6021d..490aaa62613 100644 --- a/eth/ethconfig/config_zkevm.go +++ b/eth/ethconfig/config_zkevm.go @@ -87,8 +87,9 @@ type Zk struct { TxPoolRejectSmartContractDeployments bool - InitialBatchCfgFile string - ACLPrintHistory int + InitialBatchCfgFile string + ACLPrintHistory int + InfoTreeUpdateInterval time.Duration } var DefaultZkConfig = &Zk{} diff --git a/turbo/cli/default_flags.go b/turbo/cli/default_flags.go index 93d00265e43..b3cd3537cd6 100644 --- a/turbo/cli/default_flags.go +++ b/turbo/cli/default_flags.go @@ -284,4 +284,5 @@ var DefaultFlags = []cli.Flag{ &utils.InitialBatchCfgFile, &utils.ACLPrintHistory, + &utils.InfoTreeUpdateInterval, } diff --git a/turbo/cli/flags_zkevm.go b/turbo/cli/flags_zkevm.go index 4b9eeee85ba..2f459516a9e 100644 --- a/turbo/cli/flags_zkevm.go +++ b/turbo/cli/flags_zkevm.go @@ -186,6 +186,7 @@ func ApplyFlagsForZkConfig(ctx *cli.Context, cfg *ethconfig.Config) { VirtualCountersSmtReduction: ctx.Float64(utils.VirtualCountersSmtReduction.Name), InitialBatchCfgFile: ctx.String(utils.InitialBatchCfgFile.Name), ACLPrintHistory: ctx.Int(utils.ACLPrintHistory.Name), + InfoTreeUpdateInterval: ctx.Duration(utils.InfoTreeUpdateInterval.Name), } utils2.EnableTimer(cfg.DebugTimers) diff --git a/turbo/stages/zk_stages.go b/turbo/stages/zk_stages.go index 05d45326c58..1a796e37250 100644 --- a/turbo/stages/zk_stages.go +++ b/turbo/stages/zk_stages.go @@ -20,6 +20,7 @@ import ( zkStages "github.com/ledgerwatch/erigon/zk/stages" "github.com/ledgerwatch/erigon/zk/syncer" "github.com/ledgerwatch/erigon/zk/txpool" + "github.com/ledgerwatch/erigon/zk/l1infotree" ) // NewDefaultZkStages creates stages for zk syncer (RPC mode) @@ -34,9 +35,9 @@ func NewDefaultZkStages(ctx context.Context, forkValidator *engine_helpers.ForkValidator, engine consensus.Engine, l1Syncer *syncer.L1Syncer, - l1InfoTreeSyncer *syncer.L1Syncer, datastreamClient zkStages.DatastreamClient, datastreamServer *datastreamer.StreamServer, + infoTreeUpdater *l1infotree.Updater, ) []*stagedsync.Stage { dirs := cfg.Dirs blockWriter := blockio.NewBlockWriter(cfg.HistoryV3) @@ -51,7 +52,7 @@ func NewDefaultZkStages(ctx context.Context, return zkStages.DefaultZkStages(ctx, zkStages.StageL1SyncerCfg(db, l1Syncer, cfg.Zk), - zkStages.StageL1InfoTreeCfg(db, cfg.Zk, l1InfoTreeSyncer), + zkStages.StageL1InfoTreeCfg(db, cfg.Zk, infoTreeUpdater), zkStages.StageBatchesCfg(db, datastreamClient, cfg.Zk, controlServer.ChainConfig, &cfg.Miner), zkStages.StageDataStreamCatchupCfg(datastreamServer, db, cfg.Genesis.Config.ChainID.Uint64(), cfg.DatastreamVersion, cfg.HasExecutors()), stagedsync.StageBlockHashesCfg(db, dirs.Tmp, controlServer.ChainConfig, blockWriter), @@ -101,11 +102,11 @@ func NewSequencerZkStages(ctx context.Context, datastreamServer *datastreamer.StreamServer, sequencerStageSyncer 
*syncer.L1Syncer, l1Syncer *syncer.L1Syncer, - l1InfoTreeSyncer *syncer.L1Syncer, l1BlockSyncer *syncer.L1Syncer, txPool *txpool.TxPool, txPoolDb kv.RwDB, verifier *legacy_executor_verifier.LegacyExecutorVerifier, + infoTreeUpdater *l1infotree.Updater, ) []*stagedsync.Stage { dirs := cfg.Dirs blockReader := freezeblocks.NewBlockReader(snapshots, nil) @@ -117,7 +118,7 @@ func NewSequencerZkStages(ctx context.Context, return zkStages.SequencerZkStages(ctx, zkStages.StageL1SyncerCfg(db, l1Syncer, cfg.Zk), zkStages.StageL1SequencerSyncCfg(db, cfg.Zk, sequencerStageSyncer), - zkStages.StageL1InfoTreeCfg(db, cfg.Zk, l1InfoTreeSyncer), + zkStages.StageL1InfoTreeCfg(db, cfg.Zk, infoTreeUpdater), zkStages.StageSequencerL1BlockSyncCfg(db, cfg.Zk, l1BlockSyncer), zkStages.StageDataStreamCatchupCfg(datastreamServer, db, cfg.Genesis.Config.ChainID.Uint64(), cfg.DatastreamVersion, cfg.HasExecutors()), zkStages.StageSequenceBlocksCfg( @@ -144,6 +145,7 @@ func NewSequencerZkStages(ctx context.Context, txPoolDb, verifier, uint16(cfg.YieldSize), + infoTreeUpdater, ), stagedsync.StageHashStateCfg(db, dirs, cfg.HistoryV3, agg), zkStages.StageZkInterHashesCfg(db, true, true, false, dirs.Tmp, blockReader, controlServer.Hd, cfg.HistoryV3, agg, cfg.Zk), diff --git a/zk/l1infotree/updater.go b/zk/l1infotree/updater.go new file mode 100644 index 00000000000..4f51c861312 --- /dev/null +++ b/zk/l1infotree/updater.go @@ -0,0 +1,297 @@ +package l1infotree + +import ( + "github.com/ledgerwatch/erigon-lib/kv" + "github.com/ledgerwatch/erigon/eth/ethconfig" + "github.com/ledgerwatch/erigon/zk/hermez_db" + "github.com/ledgerwatch/erigon/eth/stagedsync/stages" + zkTypes "github.com/ledgerwatch/erigon/zk/types" + "github.com/ledgerwatch/erigon/core/types" + "time" + "github.com/ledgerwatch/erigon/zk/contracts" + "github.com/ledgerwatch/log/v3" + "fmt" + "sort" + "github.com/ledgerwatch/erigon-lib/common" + "github.com/iden3/go-iden3-crypto/keccak256" + "errors" +) + +type Syncer interface { + IsSyncStarted() bool + RunQueryBlocks(lastCheckedBlock uint64) + GetLogsChan() chan []types.Log + GetProgressMessageChan() chan string + IsDownloading() bool + GetHeader(blockNumber uint64) (*types.Header, error) + L1QueryHeaders(logs []types.Log) (map[uint64]*types.Header, error) + StopQueryBlocks() + ConsumeQueryBlocks() + WaitQueryBlocksToFinish() +} + +type Updater struct { + cfg *ethconfig.Zk + syncer Syncer + progress uint64 + latestUpdate *zkTypes.L1InfoTreeUpdate +} + +func NewUpdater(cfg *ethconfig.Zk, syncer Syncer) *Updater { + return &Updater{ + cfg: cfg, + syncer: syncer, + } +} + +func (u *Updater) GetProgress() uint64 { + return u.progress +} + +func (u *Updater) GetLatestUpdate() *zkTypes.L1InfoTreeUpdate { + return u.latestUpdate +} + +func (u *Updater) WarmUp(tx kv.RwTx) (err error) { + defer func() { + if err != nil { + u.syncer.StopQueryBlocks() + u.syncer.ConsumeQueryBlocks() + u.syncer.WaitQueryBlocksToFinish() + } + }() + + hermezDb := hermez_db.NewHermezDb(tx) + + progress, err := stages.GetStageProgress(tx, stages.L1InfoTree) + if err != nil { + return err + } + if progress == 0 { + progress = u.cfg.L1FirstBlock - 1 + } + + u.progress = progress + + latestUpdate, _, err := hermezDb.GetLatestL1InfoTreeUpdate() + if err != nil { + return err + } + + u.latestUpdate = latestUpdate + + if !u.syncer.IsSyncStarted() { + u.syncer.RunQueryBlocks(u.progress) + } + + return nil +} + +func (u *Updater) CheckForInfoTreeUpdates(logPrefix string, tx kv.RwTx) (allLogs []types.Log, err error) { + defer func() { + if err != nil { 
+ u.syncer.StopQueryBlocks() + u.syncer.ConsumeQueryBlocks() + u.syncer.WaitQueryBlocksToFinish() + } + }() + + hermezDb := hermez_db.NewHermezDb(tx) + logChan := u.syncer.GetLogsChan() + progressChan := u.syncer.GetProgressMessageChan() + + // first get all the logs we need to process +LOOP: + for { + select { + case logs := <-logChan: + allLogs = append(allLogs, logs...) + case msg := <-progressChan: + log.Info(fmt.Sprintf("[%s] %s", logPrefix, msg)) + default: + if !u.syncer.IsDownloading() { + break LOOP + } + time.Sleep(10 * time.Millisecond) + } + } + + // sort the logs by block number - it is important that we process them in order to get the index correct + sort.Slice(allLogs, func(i, j int) bool { + l1 := allLogs[i] + l2 := allLogs[j] + // first sort by block number and if equal then by tx index + if l1.BlockNumber != l2.BlockNumber { + return l1.BlockNumber < l2.BlockNumber + } + if l1.TxIndex != l2.TxIndex { + return l1.TxIndex < l2.TxIndex + } + return l1.Index < l2.Index + }) + + // chunk the logs into batches, so we don't overload the RPC endpoints too much at once + chunks := chunkLogs(allLogs, 50) + + ticker := time.NewTicker(10 * time.Second) + defer ticker.Stop() + processed := 0 + + tree, err := initialiseL1InfoTree(hermezDb) + if err != nil { + return nil, err + } + + // process the logs in chunks + for _, chunk := range chunks { + select { + case <-ticker.C: + log.Info(fmt.Sprintf("[%s] Processed %d/%d logs, %d%% complete", logPrefix, processed, len(allLogs), processed*100/len(allLogs))) + default: + } + + headersMap, err := u.syncer.L1QueryHeaders(chunk) + if err != nil { + return nil, err + } + + for _, l := range chunk { + switch l.Topics[0] { + case contracts.UpdateL1InfoTreeTopic: + header := headersMap[l.BlockNumber] + if header == nil { + header, err = u.syncer.GetHeader(l.BlockNumber) + if err != nil { + return nil, err + } + } + + tmpUpdate, err := createL1InfoTreeUpdate(l, header) + if err != nil { + return nil, err + } + + leafHash := HashLeafData(tmpUpdate.GER, tmpUpdate.ParentHash, tmpUpdate.Timestamp) + if tree.LeafExists(leafHash) { + log.Warn("Skipping log as L1 Info Tree leaf already exists", "hash", leafHash) + continue + } + + if u.latestUpdate != nil { + tmpUpdate.Index = u.latestUpdate.Index + 1 + } // if latestUpdate is nil then Index = 0 which is the default value so no need to set it + u.latestUpdate = tmpUpdate + + newRoot, err := tree.AddLeaf(uint32(u.latestUpdate.Index), leafHash) + if err != nil { + return nil, err + } + log.Debug("New L1 Index", + "index", u.latestUpdate.Index, + "root", newRoot.String(), + "mainnet", u.latestUpdate.MainnetExitRoot.String(), + "rollup", u.latestUpdate.RollupExitRoot.String(), + "ger", u.latestUpdate.GER.String(), + "parent", u.latestUpdate.ParentHash.String(), + ) + + if err = handleL1InfoTreeUpdate(hermezDb, u.latestUpdate); err != nil { + return nil, err + } + if err = hermezDb.WriteL1InfoTreeLeaf(u.latestUpdate.Index, leafHash); err != nil { + return nil, err + } + if err = hermezDb.WriteL1InfoTreeRoot(common.BytesToHash(newRoot[:]), u.latestUpdate.Index); err != nil { + return nil, err + } + + processed++ + default: + log.Warn("received unexpected topic from l1 info tree stage", "topic", l.Topics[0]) + } + } + } + + // save the progress - we add one here so that we don't cause overlap on the next run. 
We don't want to duplicate an info tree update in the db + if len(allLogs) > 0 { + u.progress = allLogs[len(allLogs)-1].BlockNumber + 1 + } + if err = stages.SaveStageProgress(tx, stages.L1InfoTree, u.progress); err != nil { + return nil, err + } + + return allLogs, nil +} + +func chunkLogs(slice []types.Log, chunkSize int) [][]types.Log { + var chunks [][]types.Log + for i := 0; i < len(slice); i += chunkSize { + end := i + chunkSize + + // If end is greater than the length of the slice, reassign it to the length of the slice + if end > len(slice) { + end = len(slice) + } + + chunks = append(chunks, slice[i:end]) + } + return chunks +} + +func initialiseL1InfoTree(hermezDb *hermez_db.HermezDb) (*L1InfoTree, error) { + leaves, err := hermezDb.GetAllL1InfoTreeLeaves() + if err != nil { + return nil, err + } + + allLeaves := make([][32]byte, len(leaves)) + for i, l := range leaves { + allLeaves[i] = l + } + + tree, err := NewL1InfoTree(32, allLeaves) + if err != nil { + return nil, err + } + + return tree, nil +} + +func createL1InfoTreeUpdate(l types.Log, header *types.Header) (*zkTypes.L1InfoTreeUpdate, error) { + if len(l.Topics) != 3 { + return nil, errors.New("received log for info tree that did not have 3 topics") + } + + if l.BlockNumber != header.Number.Uint64() { + return nil, errors.New("received log for info tree that did not match the block number") + } + + mainnetExitRoot := l.Topics[1] + rollupExitRoot := l.Topics[2] + combined := append(mainnetExitRoot.Bytes(), rollupExitRoot.Bytes()...) + ger := keccak256.Hash(combined) + update := &zkTypes.L1InfoTreeUpdate{ + GER: common.BytesToHash(ger), + MainnetExitRoot: mainnetExitRoot, + RollupExitRoot: rollupExitRoot, + BlockNumber: l.BlockNumber, + Timestamp: header.Time, + ParentHash: header.ParentHash, + } + + return update, nil +} + +func handleL1InfoTreeUpdate( + hermezDb *hermez_db.HermezDb, + update *zkTypes.L1InfoTreeUpdate, +) error { + var err error + if err = hermezDb.WriteL1InfoTreeUpdate(update); err != nil { + return err + } + if err = hermezDb.WriteL1InfoTreeUpdateToGer(update); err != nil { + return err + } + return nil +} diff --git a/zk/stages/stage_l1_info_tree.go b/zk/stages/stage_l1_info_tree.go index c277f23ed40..19c2202ad35 100644 --- a/zk/stages/stage_l1_info_tree.go +++ b/zk/stages/stage_l1_info_tree.go @@ -3,32 +3,24 @@ package stages import ( "context" "fmt" - "sort" - "time" - - "github.com/ledgerwatch/erigon-lib/common" "github.com/ledgerwatch/erigon-lib/kv" - "github.com/ledgerwatch/erigon/core/types" "github.com/ledgerwatch/erigon/eth/ethconfig" "github.com/ledgerwatch/erigon/eth/stagedsync" - "github.com/ledgerwatch/erigon/eth/stagedsync/stages" - "github.com/ledgerwatch/erigon/zk/contracts" - "github.com/ledgerwatch/erigon/zk/hermez_db" "github.com/ledgerwatch/erigon/zk/l1infotree" "github.com/ledgerwatch/log/v3" ) type L1InfoTreeCfg struct { - db kv.RwDB - zkCfg *ethconfig.Zk - syncer IL1Syncer + db kv.RwDB + zkCfg *ethconfig.Zk + updater *l1infotree.Updater } -func StageL1InfoTreeCfg(db kv.RwDB, zkCfg *ethconfig.Zk, sync IL1Syncer) L1InfoTreeCfg { +func StageL1InfoTreeCfg(db kv.RwDB, zkCfg *ethconfig.Zk, updater *l1infotree.Updater) L1InfoTreeCfg { return L1InfoTreeCfg{ - db: db, - zkCfg: zkCfg, - syncer: sync, + db: db, + zkCfg: zkCfg, + updater: updater, } } @@ -54,161 +46,21 @@ func SpawnL1InfoTreeStage( defer tx.Rollback() } - hermezDb := hermez_db.NewHermezDb(tx) - - progress, err := stages.GetStageProgress(tx, stages.L1InfoTree) - if err != nil { + if err := cfg.updater.WarmUp(tx); err != nil { 
return err } - if progress == 0 { - progress = cfg.zkCfg.L1FirstBlock - 1 - } - latestUpdate, _, err := hermezDb.GetLatestL1InfoTreeUpdate() + allLogs, err := cfg.updater.CheckForInfoTreeUpdates(logPrefix, tx) if err != nil { return err } - if !cfg.syncer.IsSyncStarted() { - cfg.syncer.RunQueryBlocks(progress) - defer func() { - if funcErr != nil { - cfg.syncer.StopQueryBlocks() - cfg.syncer.ConsumeQueryBlocks() - cfg.syncer.WaitQueryBlocksToFinish() - } - }() - } - - logChan := cfg.syncer.GetLogsChan() - progressChan := cfg.syncer.GetProgressMessageChan() - - // first get all the logs we need to process - var allLogs []types.Log -LOOP: - for { - select { - case logs := <-logChan: - allLogs = append(allLogs, logs...) - case msg := <-progressChan: - log.Info(fmt.Sprintf("[%s] %s", logPrefix, msg)) - default: - if !cfg.syncer.IsDownloading() { - break LOOP - } - time.Sleep(10 * time.Millisecond) - } - } - - // sort the logs by block number - it is important that we process them in order to get the index correct - sort.Slice(allLogs, func(i, j int) bool { - l1 := allLogs[i] - l2 := allLogs[j] - // first sort by block number and if equal then by tx index - if l1.BlockNumber != l2.BlockNumber { - return l1.BlockNumber < l2.BlockNumber - } - if l1.TxIndex != l2.TxIndex { - return l1.TxIndex < l2.TxIndex - } - return l1.Index < l2.Index - }) - - // chunk the logs into batches, so we don't overload the RPC endpoints too much at once - chunks := chunkLogs(allLogs, 50) - - ticker := time.NewTicker(10 * time.Second) - defer ticker.Stop() - processed := 0 - - tree, err := initialiseL1InfoTree(hermezDb) - if err != nil { - funcErr = err - return funcErr + var latestIndex uint64 + latestUpdate := cfg.updater.GetLatestUpdate() + if latestUpdate != nil { + latestIndex = latestUpdate.Index } - - // process the logs in chunks - for _, chunk := range chunks { - select { - case <-ticker.C: - log.Info(fmt.Sprintf("[%s] Processed %d/%d logs, %d%% complete", logPrefix, processed, len(allLogs), processed*100/len(allLogs))) - default: - } - - headersMap, err := cfg.syncer.L1QueryHeaders(chunk) - if err != nil { - funcErr = err - return funcErr - } - - for _, l := range chunk { - switch l.Topics[0] { - case contracts.UpdateL1InfoTreeTopic: - header := headersMap[l.BlockNumber] - if header == nil { - header, funcErr = cfg.syncer.GetHeader(l.BlockNumber) - if funcErr != nil { - return funcErr - } - } - - tmpUpdate, err := CreateL1InfoTreeUpdate(l, header) - if err != nil { - funcErr = err - return funcErr - } - - leafHash := l1infotree.HashLeafData(tmpUpdate.GER, tmpUpdate.ParentHash, tmpUpdate.Timestamp) - if tree.LeafExists(leafHash) { - log.Warn("Skipping log as L1 Info Tree leaf already exists", "hash", leafHash) - continue - } - - if latestUpdate != nil { - tmpUpdate.Index = latestUpdate.Index + 1 - } // if latestUpdate is nil then Index = 0 which is the default value so no need to set it - latestUpdate = tmpUpdate - - newRoot, err := tree.AddLeaf(uint32(latestUpdate.Index), leafHash) - if err != nil { - funcErr = err - return funcErr - } - log.Debug("New L1 Index", - "index", latestUpdate.Index, - "root", newRoot.String(), - "mainnet", latestUpdate.MainnetExitRoot.String(), - "rollup", latestUpdate.RollupExitRoot.String(), - "ger", latestUpdate.GER.String(), - "parent", latestUpdate.ParentHash.String(), - ) - - if funcErr = HandleL1InfoTreeUpdate(hermezDb, latestUpdate); funcErr != nil { - return funcErr - } - if funcErr = hermezDb.WriteL1InfoTreeLeaf(latestUpdate.Index, leafHash); funcErr != nil { - return 
funcErr - } - if funcErr = hermezDb.WriteL1InfoTreeRoot(common.BytesToHash(newRoot[:]), latestUpdate.Index); funcErr != nil { - return funcErr - } - - processed++ - default: - log.Warn("received unexpected topic from l1 info tree stage", "topic", l.Topics[0]) - } - } - } - - // save the progress - we add one here so that we don't cause overlap on the next run. We don't want to duplicate an info tree update in the db - if len(allLogs) > 0 { - progress = allLogs[len(allLogs)-1].BlockNumber + 1 - } - if funcErr = stages.SaveStageProgress(tx, stages.L1InfoTree, progress); funcErr != nil { - return funcErr - } - - log.Info(fmt.Sprintf("[%s] Info tree updates", logPrefix), "count", len(allLogs)) + log.Info(fmt.Sprintf("[%s] Info tree updates", logPrefix), "count", len(allLogs), "latestIndex", latestIndex) if freshTx { if funcErr = tx.Commit(); funcErr != nil { @@ -219,40 +71,6 @@ LOOP: return nil } -func chunkLogs(slice []types.Log, chunkSize int) [][]types.Log { - var chunks [][]types.Log - for i := 0; i < len(slice); i += chunkSize { - end := i + chunkSize - - // If end is greater than the length of the slice, reassign it to the length of the slice - if end > len(slice) { - end = len(slice) - } - - chunks = append(chunks, slice[i:end]) - } - return chunks -} - -func initialiseL1InfoTree(hermezDb *hermez_db.HermezDb) (*l1infotree.L1InfoTree, error) { - leaves, err := hermezDb.GetAllL1InfoTreeLeaves() - if err != nil { - return nil, err - } - - allLeaves := make([][32]byte, len(leaves)) - for i, l := range leaves { - allLeaves[i] = l - } - - tree, err := l1infotree.NewL1InfoTree(32, allLeaves) - if err != nil { - return nil, err - } - - return tree, nil -} - func UnwindL1InfoTreeStage(u *stagedsync.UnwindState, tx kv.RwTx, cfg L1InfoTreeCfg, ctx context.Context) error { return nil } diff --git a/zk/stages/stage_l1_sequencer_sync.go b/zk/stages/stage_l1_sequencer_sync.go index b0b20de4295..ee2e12f83ca 100644 --- a/zk/stages/stage_l1_sequencer_sync.go +++ b/zk/stages/stage_l1_sequencer_sync.go @@ -2,12 +2,10 @@ package stages import ( "context" - "errors" "fmt" "math/big" "time" - "github.com/iden3/go-iden3-crypto/keccak256" "github.com/ledgerwatch/erigon-lib/common" "github.com/ledgerwatch/erigon-lib/kv" ethTypes "github.com/ledgerwatch/erigon/core/types" @@ -200,45 +198,6 @@ Loop: return nil } -func CreateL1InfoTreeUpdate(l ethTypes.Log, header *ethTypes.Header) (*types.L1InfoTreeUpdate, error) { - if len(l.Topics) != 3 { - return nil, errors.New("received log for info tree that did not have 3 topics") - } - - if l.BlockNumber != header.Number.Uint64() { - return nil, errors.New("received log for info tree that did not match the block number") - } - - mainnetExitRoot := l.Topics[1] - rollupExitRoot := l.Topics[2] - combined := append(mainnetExitRoot.Bytes(), rollupExitRoot.Bytes()...) 
- ger := keccak256.Hash(combined) - update := &types.L1InfoTreeUpdate{ - GER: common.BytesToHash(ger), - MainnetExitRoot: mainnetExitRoot, - RollupExitRoot: rollupExitRoot, - BlockNumber: l.BlockNumber, - Timestamp: header.Time, - ParentHash: header.ParentHash, - } - - return update, nil -} - -func HandleL1InfoTreeUpdate( - hermezDb *hermez_db.HermezDb, - update *types.L1InfoTreeUpdate, -) error { - var err error - if err = hermezDb.WriteL1InfoTreeUpdate(update); err != nil { - return err - } - if err = hermezDb.WriteL1InfoTreeUpdateToGer(update); err != nil { - return err - } - return nil -} - const ( injectedBatchLogTransactionStartByte = 128 injectedBatchLastGerStartByte = 31 diff --git a/zk/stages/stage_sequence_execute.go b/zk/stages/stage_sequence_execute.go index 5ebc6a89382..82b397ee8df 100644 --- a/zk/stages/stage_sequence_execute.go +++ b/zk/stages/stage_sequence_execute.go @@ -81,6 +81,10 @@ func sequencingBatchStep( } defer sdb.tx.Rollback() + if err = cfg.infoTreeUpdater.WarmUp(sdb.tx); err != nil { + return err + } + executionAt, err := s.ExecutionAt(sdb.tx) if err != nil { return err @@ -196,10 +200,11 @@ func sequencingBatchStep( } } - batchTicker, logTicker, blockTicker := prepareTickers(batchContext.cfg) + batchTicker, logTicker, blockTicker, infoTreeTicker := prepareTickers(batchContext.cfg) defer batchTicker.Stop() defer logTicker.Stop() defer blockTicker.Stop() + defer infoTreeTicker.Stop() log.Info(fmt.Sprintf("[%s] Starting batch %d...", logPrefix, batchState.batchNumber)) @@ -302,6 +307,17 @@ func sequencingBatchStep( log.Debug(fmt.Sprintf("[%s] Batch timeout reached", logPrefix)) batchTimedOut = true } + case <-infoTreeTicker.C: + newLogs, err := cfg.infoTreeUpdater.CheckForInfoTreeUpdates(logPrefix, sdb.tx) + if err != nil { + return err + } + var latestIndex uint64 + latest := cfg.infoTreeUpdater.GetLatestUpdate() + if latest != nil { + latestIndex = latest.Index + } + log.Info(fmt.Sprintf("[%s] Info tree updates", logPrefix), "count", len(newLogs), "latestIndex", latestIndex) default: if batchState.isLimboRecovery() { batchState.blockState.transactionsForInclusion, err = getLimboTransaction(ctx, cfg, batchState.limboRecoveryData.limboTxHash) diff --git a/zk/stages/stage_sequence_execute_utils.go b/zk/stages/stage_sequence_execute_utils.go index c699d553483..52045d2e03d 100644 --- a/zk/stages/stage_sequence_execute_utils.go +++ b/zk/stages/stage_sequence_execute_utils.go @@ -39,6 +39,7 @@ import ( zktypes "github.com/ledgerwatch/erigon/zk/types" "github.com/ledgerwatch/erigon/zk/utils" "github.com/ledgerwatch/log/v3" + "github.com/ledgerwatch/erigon/zk/l1infotree" ) const ( @@ -84,6 +85,8 @@ type SequenceBlockCfg struct { legacyVerifier *verifier.LegacyExecutorVerifier yieldSize uint16 + + infoTreeUpdater *l1infotree.Updater } func StageSequenceBlocksCfg( @@ -112,6 +115,7 @@ func StageSequenceBlocksCfg( txPoolDb kv.RwDB, legacyVerifier *verifier.LegacyExecutorVerifier, yieldSize uint16, + infoTreeUpdater *l1infotree.Updater, ) SequenceBlockCfg { return SequenceBlockCfg{ @@ -139,6 +143,7 @@ func StageSequenceBlocksCfg( txPoolDb: txPoolDb, legacyVerifier: legacyVerifier, yieldSize: yieldSize, + infoTreeUpdater: infoTreeUpdater, } } @@ -168,10 +173,10 @@ func (sCfg *SequenceBlockCfg) toErigonExecuteBlockCfg() stagedsync.ExecuteBlockC func validateIfDatastreamIsAheadOfExecution( s *stagedsync.StageState, - // u stagedsync.Unwinder, +// u stagedsync.Unwinder, ctx context.Context, cfg SequenceBlockCfg, - // historyCfg stagedsync.HistoryCfg, +// historyCfg 
stagedsync.HistoryCfg, ) error { roTx, err := cfg.db.BeginRo(ctx) if err != nil { @@ -336,12 +341,13 @@ func prepareL1AndInfoTreeRelatedStuff(sdb *stageDb, batchState *BatchState, prop return } -func prepareTickers(cfg *SequenceBlockCfg) (*time.Ticker, *time.Ticker, *time.Ticker) { +func prepareTickers(cfg *SequenceBlockCfg) (*time.Ticker, *time.Ticker, *time.Ticker, *time.Ticker) { batchTicker := time.NewTicker(cfg.zk.SequencerBatchSealTime) logTicker := time.NewTicker(10 * time.Second) blockTicker := time.NewTicker(cfg.zk.SequencerBlockSealTime) + infoTreeTicker := time.NewTicker(cfg.zk.InfoTreeUpdateInterval) - return batchTicker, logTicker, blockTicker + return batchTicker, logTicker, blockTicker, infoTreeTicker } // will be called at the start of every new block created within a batch to figure out if there is a new GER
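
Taken together, the new l1infotree.Updater is driven in two places: once per run of the L1 info tree stage, and periodically from the sequencer loop via the new infoTreeTicker. The snippet below is a minimal sketch, not part of the patch, of that shared call sequence (WarmUp, then CheckForInfoTreeUpdates, then GetLatestUpdate), assuming only the Updater API introduced in zk/l1infotree/updater.go above; the package name, the checkInfoTree helper, and the way db and updater are obtained are illustrative.

package example

import (
	"context"
	"fmt"

	"github.com/ledgerwatch/erigon-lib/kv"
	"github.com/ledgerwatch/erigon/zk/l1infotree"
	"github.com/ledgerwatch/log/v3"
)

// checkInfoTree is a hypothetical helper (not part of the patch) showing the
// call sequence that both SpawnL1InfoTreeStage and the sequencer's
// infoTreeTicker branch follow: warm up the updater, scan for new
// UpdateL1InfoTree events, then report the latest index.
func checkInfoTree(ctx context.Context, db kv.RwDB, updater *l1infotree.Updater, logPrefix string) error {
	tx, err := db.BeginRw(ctx)
	if err != nil {
		return err
	}
	defer tx.Rollback()

	// Loads stage progress and the latest stored update, and starts the L1
	// block query loop if it is not already running.
	if err := updater.WarmUp(tx); err != nil {
		return err
	}

	// Drains the syncer's log channel, sorts the logs by block and tx index,
	// appends new leaves to the in-memory tree and persists them.
	logs, err := updater.CheckForInfoTreeUpdates(logPrefix, tx)
	if err != nil {
		return err
	}

	var latestIndex uint64
	if latest := updater.GetLatestUpdate(); latest != nil {
		latestIndex = latest.Index
	}
	log.Info(fmt.Sprintf("[%s] Info tree updates", logPrefix), "count", len(logs), "latestIndex", latestIndex)

	return tx.Commit()
}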
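
Usage note: the check interval is controlled by the new zkevm.info-tree-update-interval flag (default 1 minute). The value flows from cmd/utils/flags.go through ApplyFlagsForZkConfig into ethconfig.Zk.InfoTreeUpdateInterval and then into prepareTickers, so the sequencer polls L1 for new GER / info tree events at that cadence in addition to the regular L1 info tree stage. Operators can tune it with a Go duration string, for example --zkevm.info-tree-update-interval=30s.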