diff --git a/common/backoff/exponential.go b/common/backoff/exponential.go new file mode 100644 index 000000000000..4e43c4e13346 --- /dev/null +++ b/common/backoff/exponential.go @@ -0,0 +1,50 @@ +package backoff + +import ( + "math" + "math/rand" + "time" +) + +type Exponential struct { + attempt int + + maxJitter time.Duration + + min time.Duration + max time.Duration +} + +func NewExponential(minimum, maximum, maxJitter time.Duration) *Exponential { + return &Exponential{ + min: minimum, + max: maximum, + maxJitter: maxJitter, + } +} + +func (e *Exponential) NextDuration() time.Duration { + var jitter time.Duration + if e.maxJitter > 0 { + jitter = time.Duration(rand.Int63n(e.maxJitter.Nanoseconds())) + } + + minFloat := float64(e.min) + duration := math.Pow(2, float64(e.attempt)) * minFloat + + // limit at configured maximum + if duration > float64(e.max) { + duration = float64(e.max) + } + + e.attempt++ + return time.Duration(duration) + jitter +} + +func (e *Exponential) Reset() { + e.attempt = 0 +} + +func (e *Exponential) Attempt() int { + return e.attempt +} diff --git a/common/backoff/exponential_test.go b/common/backoff/exponential_test.go new file mode 100644 index 000000000000..ff659337a2b0 --- /dev/null +++ b/common/backoff/exponential_test.go @@ -0,0 +1,39 @@ +package backoff + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestExponentialBackoff(t *testing.T) { + t.Run("Multiple attempts", func(t *testing.T) { + e := NewExponential(100*time.Millisecond, 10*time.Second, 0) + expectedDurations := []time.Duration{ + 100 * time.Millisecond, + 200 * time.Millisecond, + 400 * time.Millisecond, + 800 * time.Millisecond, + 1600 * time.Millisecond, + 3200 * time.Millisecond, + 6400 * time.Millisecond, + 10 * time.Second, // capped at max + } + for i, expected := range expectedDurations { + require.Equal(t, expected, e.NextDuration(), "attempt %d", i) + } + }) + + t.Run("Jitter added", func(t *testing.T) { + e := NewExponential(1*time.Second, 10*time.Second, 1*time.Second) + duration := e.NextDuration() + require.GreaterOrEqual(t, duration, 1*time.Second) + require.Less(t, duration, 2*time.Second) + }) + + t.Run("Edge case: min > max", func(t *testing.T) { + e := NewExponential(10*time.Second, 5*time.Second, 0) + require.Equal(t, 5*time.Second, e.NextDuration()) + }) +} diff --git a/core/blockchain.go b/core/blockchain.go index 54a4573e9132..e6ec46756dcb 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -1803,25 +1803,49 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er return it.index, err } -// PreprocessBlock processes block on top of the chain to calculate receipts, bloom and state root -func (bc *BlockChain) PreprocessBlock(block *types.Block) (common.Hash, types.Bloom, common.Hash, uint64, error) { - // Retrieve the parent block and it's state to execute on top - parent := bc.CurrentBlock().Header() - if parent == nil { - parent = bc.GetHeader(block.ParentHash(), block.NumberU64()-1) +func (bc *BlockChain) BuildAndWriteBlock(parentBlock *types.Block, header *types.Header, txs types.Transactions) (WriteStatus, error) { + if !bc.chainmu.TryLock() { + return NonStatTy, errInsertionInterrupted } - statedb, err := state.New(parent.Root, bc.stateCache, bc.snaps) + defer bc.chainmu.Unlock() + + statedb, err := state.New(parentBlock.Root(), bc.stateCache, bc.snaps) if err != nil { - return common.Hash{}, types.Bloom{}, common.Hash{}, 0, err + return NonStatTy, err } - receipts, _, usedGas, err := bc.processor.Process(block, statedb, bc.vmConfig) + + header.ParentHash = parentBlock.Hash() + + tempBlock := types.NewBlockWithHeader(header).WithBody(txs, nil) + receipts, logs, gasUsed, err := bc.processor.Process(tempBlock, statedb, bc.vmConfig) if err != nil { - return common.Hash{}, types.Bloom{}, common.Hash{}, 0, err + return NonStatTy, err } - receiptSha := types.DeriveSha(receipts, trie.NewStackTrie(nil)) - bloom := types.CreateBloom(receipts) - stateRoot := statedb.GetRootHash() - return receiptSha, bloom, stateRoot, usedGas, nil + + header.GasUsed = gasUsed + header.Root = statedb.GetRootHash() + // Since we're using Clique consensus, we don't have uncles + header.UncleHash = types.EmptyUncleHash + + fullBlock := types.NewBlock(header, txs, nil, receipts, trie.NewStackTrie(nil)) + + blockHash := fullBlock.Hash() + // manually replace the block hash in the receipts + for i, receipt := range receipts { + // add block location fields + receipt.BlockHash = blockHash + receipt.BlockNumber = tempBlock.Number() + receipt.TransactionIndex = uint(i) + + for _, l := range receipt.Logs { + l.BlockHash = blockHash + } + } + for _, l := range logs { + l.BlockHash = blockHash + } + + return bc.writeBlockWithState(fullBlock, receipts, logs, statedb, false) } // insertSideChain is called when an import batch hits upon a pruned ancestor diff --git a/eth/backend.go b/eth/backend.go index 03c8144a2d30..62d2cb6b9657 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -221,6 +221,20 @@ func New(stack *node.Node, config *ethconfig.Config, l1Client sync_service.EthCl } eth.txPool = core.NewTxPool(config.TxPool, chainConfig, eth.blockchain) + // Initialize and start DA syncing pipeline before SyncService as SyncService is blocking until all L1 messages are loaded. + // We need SyncService to load the L1 messages for DA syncing, but since both sync from last known L1 state, we can + // simply let them run simultaneously. If messages are missing in DA syncing, it will be handled by the syncing pipeline + // by waiting and retrying. + if config.EnableDASyncing { + // TODO: set proper default for data dir and enable setting via flag + config.DA.AdditionalDataDir = stack.Config().DataDir + eth.syncingPipeline, err = da_syncer.NewSyncingPipeline(context.Background(), eth.blockchain, chainConfig, eth.chainDb, l1Client, stack.Config().L1DeploymentBlock, config.DA) + if err != nil { + return nil, fmt.Errorf("cannot initialize da syncer: %w", err) + } + eth.syncingPipeline.Start() + } + // initialize and start L1 message sync service eth.syncService, err = sync_service.NewSyncService(context.Background(), chainConfig, stack.Config(), eth.chainDb, l1Client) if err != nil { @@ -237,14 +251,6 @@ func New(stack *node.Node, config *ethconfig.Config, l1Client sync_service.EthCl eth.rollupSyncService.Start() } - if config.EnableDASyncing { - eth.syncingPipeline, err = da_syncer.NewSyncingPipeline(context.Background(), eth.blockchain, chainConfig, eth.chainDb, l1Client, stack.Config().L1DeploymentBlock, config.DA) - if err != nil { - return nil, fmt.Errorf("cannot initialize da syncer: %w", err) - } - eth.syncingPipeline.Start() - } - // Permit the downloader to use the trie cache allowance during fast sync cacheLimit := cacheConfig.TrieCleanLimit + cacheConfig.TrieDirtyLimit + cacheConfig.SnapshotLimit checkpoint := config.Checkpoint diff --git a/go.mod b/go.mod index aea7d762976c..ab4dd89db856 100644 --- a/go.mod +++ b/go.mod @@ -50,7 +50,7 @@ require ( github.com/prometheus/tsdb v0.7.1 github.com/rjeczalik/notify v0.9.1 github.com/rs/cors v1.7.0 - github.com/scroll-tech/da-codec v0.1.1-0.20240708084945-cb02d638c45f + github.com/scroll-tech/da-codec v0.1.1-0.20240725030910-15d6aa443140 github.com/scroll-tech/zktrie v0.8.4 github.com/shirou/gopsutil v3.21.11+incompatible github.com/status-im/keycard-go v0.0.0-20190316090335-8537d3370df4 diff --git a/go.sum b/go.sum index 0b5044dffb3e..2b352d04001e 100644 --- a/go.sum +++ b/go.sum @@ -394,8 +394,8 @@ github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncj github.com/rs/cors v1.7.0 h1:+88SsELBHx5r+hZ8TCkggzSstaWNbDvThkVK8H6f9ik= github.com/rs/cors v1.7.0/go.mod h1:gFx+x8UowdsKA9AchylcLynDq+nNFfI8FkUZdN/jGCU= github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= -github.com/scroll-tech/da-codec v0.1.1-0.20240708084945-cb02d638c45f h1:ZKPhn674+2AgBdIn2ZLGePsUZdM2823m2tJp+JlQf/Y= -github.com/scroll-tech/da-codec v0.1.1-0.20240708084945-cb02d638c45f/go.mod h1:O9jsbQGNnTEfyfZg7idevq6jGGSQshX70elX+TRH8vU= +github.com/scroll-tech/da-codec v0.1.1-0.20240725030910-15d6aa443140 h1:zoFmDxrK984L9GUu/jsnk5mqKG3S4QItzFUmtUYhMdk= +github.com/scroll-tech/da-codec v0.1.1-0.20240725030910-15d6aa443140/go.mod h1:O9jsbQGNnTEfyfZg7idevq6jGGSQshX70elX+TRH8vU= github.com/scroll-tech/zktrie v0.8.4 h1:UagmnZ4Z3ITCk+aUq9NQZJNAwnWl4gSxsLb2Nl7IgRE= github.com/scroll-tech/zktrie v0.8.4/go.mod h1:XvNo7vAk8yxNyTjBDj5WIiFzYW4bx/gJ78+NK6Zn6Uk= github.com/segmentio/kafka-go v0.1.0/go.mod h1:X6itGqS9L4jDletMsxZ7Dz+JFWxM6JHfPOCvTvk+EJo= diff --git a/params/config.go b/params/config.go index 3c3ef43b9bca..87042d48c445 100644 --- a/params/config.go +++ b/params/config.go @@ -24,6 +24,7 @@ import ( "golang.org/x/crypto/sha3" "github.com/scroll-tech/go-ethereum/common" + "github.com/scroll-tech/go-ethereum/rollup/missing_header_fields" "github.com/scroll-tech/go-ethereum/rollup/rcfg" ) @@ -337,7 +338,9 @@ var ( ScrollChainAddress: common.HexToAddress("0x2D567EcE699Eabe5afCd141eDB7A4f2D0D6ce8a0"), }, DAConfig: &DAConfig{ - BlobScanAPIEndpoint: "https://api.sepolia.blobscan.com/blobs/", + BlobScanAPIEndpoint: "https://api.sepolia.blobscan.com/blobs/", + MissingHeaderFieldsURL: "", // TODO: Add missing header fields URL and correct checksum + MissingHeaderFieldsSHA256: missing_header_fields.SHA256ChecksumFromHex("0xdcdae1c92c59c307edae24216eb06c4566f512739aec39dca1abd53c597102c7"), }, }, } @@ -379,8 +382,10 @@ var ( ScrollChainAddress: common.HexToAddress("0xa13BAF47339d63B743e7Da8741db5456DAc1E556"), }, DAConfig: &DAConfig{ - BlobScanAPIEndpoint: "https://api.blobscan.com/blobs/", - BlockNativeAPIEndpoint: "https://api.ethernow.xyz/v1/blob/", + BlobScanAPIEndpoint: "https://api.blobscan.com/blobs/", + BlockNativeAPIEndpoint: "https://api.ethernow.xyz/v1/blob/", + MissingHeaderFieldsURL: "", // TODO: Add missing header fields URL and correct checksum + MissingHeaderFieldsSHA256: missing_header_fields.SHA256ChecksumFromHex("0xdcdae1c92c59c307edae24216eb06c4566f512739aec39dca1abd53c597102c7"), }, }, } @@ -675,6 +680,10 @@ type L1Config struct { type DAConfig struct { BlobScanAPIEndpoint string `json:"blobScanApiEndpoint,omitempty"` BlockNativeAPIEndpoint string `json:"blockNativeApiEndpoint,omitempty"` + // MissingHeaderFieldsURL is the URL to fetch the historical missing header fields to a file. + MissingHeaderFieldsURL string `json:"missingHeaderFieldsURL,omitempty"` + // MissingHeaderFieldsSHA256 is the SHA256 hash of the file containing the historical missing header fields. + MissingHeaderFieldsSHA256 missing_header_fields.SHA256Checksum `json:"missingHeaderFieldsSHA256,omitempty"` } func (c *L1Config) String() string { diff --git a/rollup/da_syncer/batch_queue.go b/rollup/da_syncer/batch_queue.go index b1ae23d04e16..aa38d8a64871 100644 --- a/rollup/da_syncer/batch_queue.go +++ b/rollup/da_syncer/batch_queue.go @@ -7,11 +7,12 @@ import ( "github.com/scroll-tech/go-ethereum/core/rawdb" "github.com/scroll-tech/go-ethereum/ethdb" + "github.com/scroll-tech/go-ethereum/rollup/da_syncer/da" ) type BatchQueue struct { // batches is map from batchIndex to batch blocks - batches map[uint64]DAEntry + batches map[uint64]da.Entry DAQueue *DAQueue db ethdb.Database lastFinalizedBatchIndex uint64 @@ -19,7 +20,7 @@ type BatchQueue struct { func NewBatchQueue(DAQueue *DAQueue, db ethdb.Database) *BatchQueue { return &BatchQueue{ - batches: make(map[uint64]DAEntry), + batches: make(map[uint64]da.Entry), DAQueue: DAQueue, db: db, lastFinalizedBatchIndex: 0, @@ -27,7 +28,7 @@ func NewBatchQueue(DAQueue *DAQueue, db ethdb.Database) *BatchQueue { } // NextBatch finds next finalized batch and returns data, that was committed in that batch -func (bq *BatchQueue) NextBatch(ctx context.Context) (DAEntry, error) { +func (bq *BatchQueue) NextBatch(ctx context.Context) (da.Entry, error) { if batch, ok := bq.getFinalizedBatch(); ok { return batch, nil } @@ -36,18 +37,14 @@ func (bq *BatchQueue) NextBatch(ctx context.Context) (DAEntry, error) { if err != nil { return nil, err } - switch daEntry := daEntry.(type) { - case *CommitBatchDAV0: - bq.batches[daEntry.BatchIndex] = daEntry - case *CommitBatchDAV1: - bq.batches[daEntry.BatchIndex] = daEntry - case *CommitBatchDAV2: - bq.batches[daEntry.BatchIndex] = daEntry - case *RevertBatchDA: - bq.deleteBatch(daEntry.BatchIndex) - case *FinalizeBatchDA: - if daEntry.BatchIndex > bq.lastFinalizedBatchIndex { - bq.lastFinalizedBatchIndex = daEntry.BatchIndex + switch daEntry.Type() { + case da.CommitBatchV0Type, da.CommitBatchV1Type, da.CommitBatchV2Type: + bq.batches[daEntry.BatchIndex()] = daEntry + case da.RevertBatchType: + bq.deleteBatch(daEntry.BatchIndex()) + case da.FinalizeBatchType: + if daEntry.BatchIndex() > bq.lastFinalizedBatchIndex { + bq.lastFinalizedBatchIndex = daEntry.BatchIndex() } ret, ok := bq.getFinalizedBatch() if ok { @@ -62,7 +59,7 @@ func (bq *BatchQueue) NextBatch(ctx context.Context) (DAEntry, error) { } // getFinalizedBatch returns next finalized batch if there is available -func (bq *BatchQueue) getFinalizedBatch() (DAEntry, bool) { +func (bq *BatchQueue) getFinalizedBatch() (da.Entry, bool) { if len(bq.batches) == 0 { return nil, false } @@ -93,7 +90,7 @@ func (bq *BatchQueue) deleteBatch(batchIndex uint64) { if !ok { return } - curBatchL1Height := batch.GetL1BlockNumber() + curBatchL1Height := batch.L1BlockNumber() delete(bq.batches, batchIndex) if len(bq.batches) == 0 { rawdb.WriteDASyncedL1BlockNumber(bq.db, curBatchL1Height) @@ -102,10 +99,10 @@ func (bq *BatchQueue) deleteBatch(batchIndex uint64) { // we store here min height of currently loaded batches to be able to start syncing from the same place in case of restart var minBatchL1Height uint64 = math.MaxUint64 for _, val := range bq.batches { - if val.GetL1BlockNumber() < minBatchL1Height { - minBatchL1Height = val.GetL1BlockNumber() + if val.L1BlockNumber() < minBatchL1Height { + minBatchL1Height = val.L1BlockNumber() } } - rawdb.WriteDASyncedL1BlockNumber(bq.db, minBatchL1Height-1) + rawdb.WriteDASyncedL1BlockNumber(bq.db, minBatchL1Height-1) } diff --git a/rollup/da_syncer/block_queue.go b/rollup/da_syncer/block_queue.go index 6340bc33db18..68954da69e2a 100644 --- a/rollup/da_syncer/block_queue.go +++ b/rollup/da_syncer/block_queue.go @@ -3,24 +3,23 @@ package da_syncer import ( "context" "fmt" - "math/big" - "github.com/scroll-tech/go-ethereum/core/types" + "github.com/scroll-tech/go-ethereum/rollup/da_syncer/da" ) type BlockQueue struct { batchQueue *BatchQueue - blocks []*types.Block + blocks []*da.PartialBlock } func NewBlockQueue(batchQueue *BatchQueue) *BlockQueue { return &BlockQueue{ batchQueue: batchQueue, - blocks: []*types.Block{}, + blocks: make([]*da.PartialBlock, 0), } } -func (bq *BlockQueue) NextBlock(ctx context.Context) (*types.Block, error) { +func (bq *BlockQueue) NextBlock(ctx context.Context) (*da.PartialBlock, error) { for len(bq.blocks) == 0 { err := bq.getBlocksFromBatch(ctx) if err != nil { @@ -37,120 +36,17 @@ func (bq *BlockQueue) getBlocksFromBatch(ctx context.Context) error { if err != nil { return err } - switch daEntry := daEntry.(type) { - case *CommitBatchDAV0: - bq.blocks, err = bq.processDAV0ToBlocks(daEntry) - if err != nil { - return err - } - case *CommitBatchDAV1: - bq.blocks, err = bq.processDAV1ToBlocks(daEntry) - if err != nil { - return err - } - case *CommitBatchDAV2: - bq.blocks, err = bq.processDAV2ToBlocks(daEntry) - if err != nil { - return err - } - default: - return fmt.Errorf("unexpected type of daEntry: %T", daEntry) - } - return nil -} -func (bq *BlockQueue) processDAV0ToBlocks(daEntry *CommitBatchDAV0) ([]*types.Block, error) { - var blocks []*types.Block - l1TxPointer := 0 - var curL1TxIndex uint64 = daEntry.ParentTotalL1MessagePopped - for _, chunk := range daEntry.Chunks { - for blockId, daBlock := range chunk.Blocks { - // create header - header := types.Header{ - Number: big.NewInt(0).SetUint64(daBlock.BlockNumber), - Time: daBlock.Timestamp, - BaseFee: daBlock.BaseFee, - GasLimit: daBlock.GasLimit, - } - // create txs - // var txs types.Transactions - txs := make(types.Transactions, 0, daBlock.NumTransactions) - // insert l1 msgs - for l1TxPointer < len(daEntry.L1Txs) && daEntry.L1Txs[l1TxPointer].QueueIndex < curL1TxIndex+uint64(daBlock.NumL1Messages) { - l1Tx := types.NewTx(daEntry.L1Txs[l1TxPointer]) - txs = append(txs, l1Tx) - l1TxPointer++ - } - curL1TxIndex += uint64(daBlock.NumL1Messages) - // insert l2 txs - txs = append(txs, chunk.Transactions[blockId]...) - block := types.NewBlockWithHeader(&header).WithBody(txs, make([]*types.Header, 0)) - blocks = append(blocks, block) - } + entryWithBlocks, ok := daEntry.(da.EntryWithBlocks) + // this should never happen because we only receive CommitBatch entries + if !ok { + return fmt.Errorf("unexpected type of daEntry: %T", daEntry) } - return blocks, nil -} -func (bq *BlockQueue) processDAV1ToBlocks(daEntry *CommitBatchDAV1) ([]*types.Block, error) { - var blocks []*types.Block - l1TxPointer := 0 - var curL1TxIndex uint64 = daEntry.ParentTotalL1MessagePopped - for _, chunk := range daEntry.Chunks { - for blockId, daBlock := range chunk.Blocks { - // create header - header := types.Header{ - Number: big.NewInt(0).SetUint64(daBlock.BlockNumber), - Time: daBlock.Timestamp, - BaseFee: daBlock.BaseFee, - GasLimit: daBlock.GasLimit, - } - // create txs - // var txs types.Transactions - txs := make(types.Transactions, 0, daBlock.NumTransactions) - // insert l1 msgs - for l1TxPointer < len(daEntry.L1Txs) && daEntry.L1Txs[l1TxPointer].QueueIndex < curL1TxIndex+uint64(daBlock.NumL1Messages) { - l1Tx := types.NewTx(daEntry.L1Txs[l1TxPointer]) - txs = append(txs, l1Tx) - l1TxPointer++ - } - curL1TxIndex += uint64(daBlock.NumL1Messages) - // insert l2 txs - txs = append(txs, chunk.Transactions[blockId]...) - block := types.NewBlockWithHeader(&header).WithBody(txs, make([]*types.Header, 0)) - blocks = append(blocks, block) - } + bq.blocks, err = entryWithBlocks.Blocks() + if err != nil { + return fmt.Errorf("failed to get blocks from daEntry: %w", err) } - return blocks, nil -} -func (bq *BlockQueue) processDAV2ToBlocks(daEntry *CommitBatchDAV2) ([]*types.Block, error) { - var blocks []*types.Block - l1TxPointer := 0 - var curL1TxIndex uint64 = daEntry.ParentTotalL1MessagePopped - for _, chunk := range daEntry.Chunks { - for blockId, daBlock := range chunk.Blocks { - // create header - header := types.Header{ - Number: big.NewInt(0).SetUint64(daBlock.BlockNumber), - Time: daBlock.Timestamp, - BaseFee: daBlock.BaseFee, - GasLimit: daBlock.GasLimit, - } - // create txs - // var txs types.Transactions - txs := make(types.Transactions, 0, daBlock.NumTransactions) - // insert l1 msgs - for l1TxPointer < len(daEntry.L1Txs) && daEntry.L1Txs[l1TxPointer].QueueIndex < curL1TxIndex+uint64(daBlock.NumL1Messages) { - l1Tx := types.NewTx(daEntry.L1Txs[l1TxPointer]) - txs = append(txs, l1Tx) - l1TxPointer++ - } - curL1TxIndex += uint64(daBlock.NumL1Messages) - // insert l2 txs - txs = append(txs, chunk.Transactions[blockId]...) - block := types.NewBlockWithHeader(&header).WithBody(txs, make([]*types.Header, 0)) - blocks = append(blocks, block) - } - } - return blocks, nil + return nil } diff --git a/rollup/da_syncer/calldata_blob_source.go b/rollup/da_syncer/calldata_blob_source.go deleted file mode 100644 index 94487166772c..000000000000 --- a/rollup/da_syncer/calldata_blob_source.go +++ /dev/null @@ -1,377 +0,0 @@ -package da_syncer - -import ( - "context" - "crypto/sha256" - "encoding/binary" - "fmt" - - "github.com/scroll-tech/da-codec/encoding" - "github.com/scroll-tech/da-codec/encoding/codecv0" - "github.com/scroll-tech/da-codec/encoding/codecv1" - "github.com/scroll-tech/da-codec/encoding/codecv2" - - "github.com/scroll-tech/go-ethereum/accounts/abi" - "github.com/scroll-tech/go-ethereum/common" - "github.com/scroll-tech/go-ethereum/core/rawdb" - "github.com/scroll-tech/go-ethereum/core/types" - "github.com/scroll-tech/go-ethereum/crypto/kzg4844" - "github.com/scroll-tech/go-ethereum/ethdb" - "github.com/scroll-tech/go-ethereum/log" - - "github.com/scroll-tech/go-ethereum/rollup/da_syncer/blob_client" - "github.com/scroll-tech/go-ethereum/rollup/rollup_sync_service" -) - -var ( - callDataBlobSourceFetchBlockRange uint64 = 500 - commitBatchEventName = "CommitBatch" - revertBatchEventName = "RevertBatch" - finalizeBatchEventName = "FinalizeBatch" - commitBatchMethodName = "commitBatch" - commitBatchWithBlobProofMethodName = "commitBatchWithBlobProof" - - // the length og method ID at the beginning of transaction data - methodIDLength = 4 -) - -type CalldataBlobSource struct { - ctx context.Context - l1Client *rollup_sync_service.L1Client - blobClient blob_client.BlobClient - l1height uint64 - scrollChainABI *abi.ABI - l1CommitBatchEventSignature common.Hash - l1RevertBatchEventSignature common.Hash - l1FinalizeBatchEventSignature common.Hash - db ethdb.Database -} - -func NewCalldataBlobSource(ctx context.Context, l1height uint64, l1Client *rollup_sync_service.L1Client, blobClient blob_client.BlobClient, db ethdb.Database) (DataSource, error) { - scrollChainABI, err := rollup_sync_service.ScrollChainMetaData.GetAbi() - if err != nil { - return nil, fmt.Errorf("failed to get scroll chain abi: %w", err) - } - return &CalldataBlobSource{ - ctx: ctx, - l1Client: l1Client, - blobClient: blobClient, - l1height: l1height, - scrollChainABI: scrollChainABI, - l1CommitBatchEventSignature: scrollChainABI.Events[commitBatchEventName].ID, - l1RevertBatchEventSignature: scrollChainABI.Events[revertBatchEventName].ID, - l1FinalizeBatchEventSignature: scrollChainABI.Events[finalizeBatchEventName].ID, - db: db, - }, nil -} - -func (ds *CalldataBlobSource) NextData() (DA, error) { - to := ds.l1height + callDataBlobSourceFetchBlockRange - l1Finalized, err := ds.l1Client.GetLatestFinalizedBlockNumber() - if err != nil { - return nil, fmt.Errorf("cannot get l1height, error: %v", err) - } - if to > l1Finalized { - to = l1Finalized - } - if ds.l1height > to { - return nil, errSourceExhausted - } - logs, err := ds.l1Client.FetchRollupEventsInRange(ds.l1height, to) - if err != nil { - return nil, fmt.Errorf("cannot get events, l1height: %d, error: %v", ds.l1height, err) - } - da, err := ds.processLogsToDA(logs) - if err == nil { - ds.l1height = to + 1 - } - return da, err -} - -func (ds *CalldataBlobSource) L1Height() uint64 { - return ds.l1height -} - -func (ds *CalldataBlobSource) processLogsToDA(logs []types.Log) (DA, error) { - var da DA - for _, vLog := range logs { - switch vLog.Topics[0] { - case ds.l1CommitBatchEventSignature: - event := &rollup_sync_service.L1CommitBatchEvent{} - if err := rollup_sync_service.UnpackLog(ds.scrollChainABI, event, "CommitBatch", vLog); err != nil { - return nil, fmt.Errorf("failed to unpack commit rollup event log, err: %w", err) - } - batchIndex := event.BatchIndex.Uint64() - log.Trace("found new CommitBatch event", "batch index", batchIndex) - - daEntry, err := ds.getCommitBatchDa(batchIndex, &vLog) - if err != nil { - return nil, fmt.Errorf("failed to get commit batch da: %v, err: %w", batchIndex, err) - } - da = append(da, daEntry) - - case ds.l1RevertBatchEventSignature: - event := &rollup_sync_service.L1RevertBatchEvent{} - if err := rollup_sync_service.UnpackLog(ds.scrollChainABI, event, "RevertBatch", vLog); err != nil { - return nil, fmt.Errorf("failed to unpack revert rollup event log, err: %w", err) - } - batchIndex := event.BatchIndex.Uint64() - log.Trace("found new RevertBatch event", "batch index", batchIndex) - da = append(da, NewRevertBatchDA(batchIndex)) - - case ds.l1FinalizeBatchEventSignature: - event := &rollup_sync_service.L1FinalizeBatchEvent{} - if err := rollup_sync_service.UnpackLog(ds.scrollChainABI, event, "FinalizeBatch", vLog); err != nil { - return nil, fmt.Errorf("failed to unpack finalized rollup event log, err: %w", err) - } - batchIndex := event.BatchIndex.Uint64() - log.Trace("found new FinalizeBatch event", "batch index", batchIndex) - - da = append(da, NewFinalizeBatchDA(batchIndex)) - - default: - return nil, fmt.Errorf("unknown event, topic: %v, tx hash: %v", vLog.Topics[0].Hex(), vLog.TxHash.Hex()) - } - } - return da, nil -} - -type commitBatchArgs struct { - Version uint8 - ParentBatchHeader []byte - Chunks [][]byte - SkippedL1MessageBitmap []byte -} - -func newCommitBatchArgs(method *abi.Method, values []interface{}) (*commitBatchArgs, error) { - var args commitBatchArgs - err := method.Inputs.Copy(&args, values) - return &args, err -} - -func newCommitBatchArgsFromCommitBatchWithProof(method *abi.Method, values []interface{}) (*commitBatchArgs, error) { - var args commitBatchWithBlobProofArgs - err := method.Inputs.Copy(&args, values) - if err != nil { - return nil, err - } - return &commitBatchArgs{ - Version: args.Version, - ParentBatchHeader: args.ParentBatchHeader, - Chunks: args.Chunks, - SkippedL1MessageBitmap: args.SkippedL1MessageBitmap, - }, nil -} - -type commitBatchWithBlobProofArgs struct { - Version uint8 - ParentBatchHeader []byte - Chunks [][]byte - SkippedL1MessageBitmap []byte - BlobDataProof []byte -} - -func (ds *CalldataBlobSource) getCommitBatchDa(batchIndex uint64, vLog *types.Log) (DAEntry, error) { - if batchIndex == 0 { - return NewCommitBatchDAV0(0, batchIndex, 0, []byte{}, []*codecv0.DAChunkRawTx{}, []*types.L1MessageTx{}, 0), nil - } - - txData, err := ds.l1Client.FetchTxData(vLog) - if err != nil { - return nil, err - } - if len(txData) < methodIDLength { - return nil, fmt.Errorf("transaction data is too short, length of tx data: %v, minimum length required: %v", len(txData), methodIDLength) - } - - method, err := ds.scrollChainABI.MethodById(txData[:methodIDLength]) - if err != nil { - return nil, fmt.Errorf("failed to get method by ID, ID: %v, err: %w", txData[:methodIDLength], err) - } - values, err := method.Inputs.Unpack(txData[methodIDLength:]) - if err != nil { - return nil, fmt.Errorf("failed to unpack transaction data using ABI, tx data: %v, err: %w", txData, err) - } - - if method.Name == commitBatchMethodName { - args, err := newCommitBatchArgs(method, values) - if err != nil { - return nil, fmt.Errorf("failed to decode calldata into commitBatch args, values: %+v, err: %w", values, err) - } - switch args.Version { - case 0: - return ds.decodeDAV0(batchIndex, vLog, args) - case 1: - return ds.decodeDAV1(batchIndex, vLog, args) - case 2: - return ds.decodeDAV2(batchIndex, vLog, args) - default: - return nil, fmt.Errorf("failed to decode DA, codec version is unknown: codec version: %d", args.Version) - } - } else if method.Name == commitBatchWithBlobProofMethodName { - args, err := newCommitBatchArgsFromCommitBatchWithProof(method, values) - if err != nil { - return nil, fmt.Errorf("failed to decode calldata into commitBatch args, values: %+v, err: %w", values, err) - } - return ds.decodeDAV2(batchIndex, vLog, args) - } - return nil, fmt.Errorf("unknown method name: %s", method.Name) -} - -func (ds *CalldataBlobSource) decodeDAV0(batchIndex uint64, vLog *types.Log, args *commitBatchArgs) (DAEntry, error) { - var chunks []*codecv0.DAChunkRawTx - var l1Txs []*types.L1MessageTx - chunks, err := codecv0.DecodeDAChunksRawTx(args.Chunks) - if err != nil { - return nil, fmt.Errorf("failed to unpack chunks: %v, err: %w", batchIndex, err) - } - - parentTotalL1MessagePopped := getBatchTotalL1MessagePopped(args.ParentBatchHeader) - totalL1MessagePopped := 0 - for _, chunk := range chunks { - for _, block := range chunk.Blocks { - totalL1MessagePopped += int(block.NumL1Messages) - } - } - skippedBitmap, err := encoding.DecodeBitmap(args.SkippedL1MessageBitmap, totalL1MessagePopped) - if err != nil { - return nil, fmt.Errorf("failed to decode bitmap: %v, err: %w", batchIndex, err) - } - // get all necessary l1msgs without skipped - currentIndex := parentTotalL1MessagePopped - for index := 0; index < totalL1MessagePopped; index++ { - if encoding.IsL1MessageSkipped(skippedBitmap, currentIndex-parentTotalL1MessagePopped) { - currentIndex++ - continue - } - l1Tx := rawdb.ReadL1Message(ds.db, currentIndex) - if l1Tx == nil { - return nil, fmt.Errorf("failed to read L1 message from db, l1 message index: %v", currentIndex) - } - l1Txs = append(l1Txs, l1Tx) - currentIndex++ - } - da := NewCommitBatchDAV0(args.Version, batchIndex, parentTotalL1MessagePopped, args.SkippedL1MessageBitmap, chunks, l1Txs, vLog.BlockNumber) - return da, nil -} - -func (ds *CalldataBlobSource) decodeDAV1(batchIndex uint64, vLog *types.Log, args *commitBatchArgs) (DAEntry, error) { - var chunks []*codecv1.DAChunkRawTx - var l1Txs []*types.L1MessageTx - chunks, err := codecv1.DecodeDAChunksRawTx(args.Chunks) - if err != nil { - return nil, fmt.Errorf("failed to unpack chunks: %v, err: %w", batchIndex, err) - } - - versionedHash, err := ds.l1Client.FetchTxBlobHash(vLog) - if err != nil { - return nil, fmt.Errorf("failed to fetch blob hash, err: %w", err) - } - blob, err := ds.blobClient.GetBlobByVersionedHash(ds.ctx, versionedHash) - if err != nil { - return nil, fmt.Errorf("failed to fetch blob from blob client, err: %w", err) - } - // compute blob versioned hash and compare with one from tx - c, err := kzg4844.BlobToCommitment(blob) - if err != nil { - return nil, fmt.Errorf("failed to create blob commitment") - } - blobVersionedHash := common.Hash(kzg4844.CalcBlobHashV1(sha256.New(), &c)) - if blobVersionedHash != versionedHash { - return nil, fmt.Errorf("blobVersionedHash from blob source is not equal to versionedHash from tx, correct versioned hash: %s, fetched blob hash: %s", versionedHash.String(), blobVersionedHash.String()) - } - // decode txs from blob - err = codecv1.DecodeTxsFromBlob(blob, chunks) - if err != nil { - return nil, fmt.Errorf("failed to decode txs from blob: %w", err) - } - parentTotalL1MessagePopped := getBatchTotalL1MessagePopped(args.ParentBatchHeader) - totalL1MessagePopped := 0 - for _, chunk := range chunks { - for _, block := range chunk.Blocks { - totalL1MessagePopped += int(block.NumL1Messages) - } - } - skippedBitmap, err := encoding.DecodeBitmap(args.SkippedL1MessageBitmap, totalL1MessagePopped) - if err != nil { - return nil, fmt.Errorf("failed to decode bitmap: %v, err: %w", batchIndex, err) - } - // get all necessary l1msgs without skipped - currentIndex := parentTotalL1MessagePopped - for index := 0; index < totalL1MessagePopped; index++ { - for encoding.IsL1MessageSkipped(skippedBitmap, currentIndex-parentTotalL1MessagePopped) { - currentIndex++ - } - l1Tx := rawdb.ReadL1Message(ds.db, currentIndex) - if l1Tx == nil { - return nil, fmt.Errorf("failed to read L1 message from db, l1 message index: %v", currentIndex) - } - l1Txs = append(l1Txs, l1Tx) - currentIndex++ - } - da := NewCommitBatchDAV1(args.Version, batchIndex, parentTotalL1MessagePopped, args.SkippedL1MessageBitmap, chunks, l1Txs, vLog.BlockNumber) - return da, nil -} - -func (ds *CalldataBlobSource) decodeDAV2(batchIndex uint64, vLog *types.Log, args *commitBatchArgs) (DAEntry, error) { - var chunks []*codecv2.DAChunkRawTx - var l1Txs []*types.L1MessageTx - chunks, err := codecv2.DecodeDAChunksRawTx(args.Chunks) - if err != nil { - return nil, fmt.Errorf("failed to unpack chunks: %v, err: %w", batchIndex, err) - } - - versionedHash, err := ds.l1Client.FetchTxBlobHash(vLog) - if err != nil { - return nil, fmt.Errorf("failed to fetch blob hash, err: %w", err) - } - blob, err := ds.blobClient.GetBlobByVersionedHash(ds.ctx, versionedHash) - if err != nil { - return nil, fmt.Errorf("failed to fetch blob from blob client, err: %w", err) - } - // compute blob versioned hash and compare with one from tx - c, err := kzg4844.BlobToCommitment(blob) - if err != nil { - return nil, fmt.Errorf("failed to create blob commitment") - } - blobVersionedHash := common.Hash(kzg4844.CalcBlobHashV1(sha256.New(), &c)) - if blobVersionedHash != versionedHash { - return nil, fmt.Errorf("blobVersionedHash from blob source is not equal to versionedHash from tx, correct versioned hash: %s, fetched blob hash: %s", versionedHash.String(), blobVersionedHash.String()) - } - // decode txs from blob - err = codecv2.DecodeTxsFromBlob(blob, chunks) - if err != nil { - return nil, fmt.Errorf("failed to decode txs from blob: %w", err) - } - parentTotalL1MessagePopped := getBatchTotalL1MessagePopped(args.ParentBatchHeader) - totalL1MessagePopped := 0 - for _, chunk := range chunks { - for _, block := range chunk.Blocks { - totalL1MessagePopped += int(block.NumL1Messages) - } - } - skippedBitmap, err := encoding.DecodeBitmap(args.SkippedL1MessageBitmap, totalL1MessagePopped) - if err != nil { - return nil, fmt.Errorf("failed to decode bitmap: %v, err: %w", batchIndex, err) - } - // get all necessary l1msgs without skipped - currentIndex := parentTotalL1MessagePopped - for index := 0; index < totalL1MessagePopped; index++ { - for encoding.IsL1MessageSkipped(skippedBitmap, currentIndex-parentTotalL1MessagePopped) { - currentIndex++ - } - l1Tx := rawdb.ReadL1Message(ds.db, currentIndex) - if l1Tx == nil { - return nil, fmt.Errorf("failed to read L1 message from db, l1 message index: %v", currentIndex) - } - l1Txs = append(l1Txs, l1Tx) - currentIndex++ - } - da := NewCommitBatchDAV2(args.Version, batchIndex, parentTotalL1MessagePopped, args.SkippedL1MessageBitmap, chunks, l1Txs, vLog.BlockNumber) - return da, nil -} - -func getBatchTotalL1MessagePopped(data []byte) uint64 { - // total l1 message popped stored in bytes from 17 to 24, accordingly to codec spec - return binary.BigEndian.Uint64(data[17:25]) -} diff --git a/rollup/da_syncer/da.go b/rollup/da_syncer/da.go deleted file mode 100644 index f2f8bdd9b903..000000000000 --- a/rollup/da_syncer/da.go +++ /dev/null @@ -1,186 +0,0 @@ -package da_syncer - -import ( - "github.com/scroll-tech/da-codec/encoding/codecv0" - "github.com/scroll-tech/da-codec/encoding/codecv1" - "github.com/scroll-tech/da-codec/encoding/codecv2" - - "github.com/scroll-tech/go-ethereum/core/types" -) - -type DAType int - -const ( - // CommitBatchV0 contains data of event of CommitBatchV0 - CommitBatchV0 DAType = iota - // CommitBatchV1 contains data of event of CommitBatchV1 - CommitBatchV1 - // CommitBatchV2 contains data of event of CommitBatchV2 - CommitBatchV2 - // RevertBatch contains data of event of RevertBatch - RevertBatch - // FinalizeBatch contains data of event of FinalizeBatch - FinalizeBatch - // FinalizeBatchV3 contains data of event of FinalizeBatch v3 - FinalizeBatchV3 -) - -type DAEntry interface { - DAType() DAType - GetL1BlockNumber() uint64 -} - -type DA []DAEntry - -type CommitBatchDAV0 struct { - Version uint8 - BatchIndex uint64 - ParentTotalL1MessagePopped uint64 - SkippedL1MessageBitmap []byte - Chunks []*codecv0.DAChunkRawTx - L1Txs []*types.L1MessageTx - - L1BlockNumber uint64 -} - -func NewCommitBatchDAV0(version uint8, batchIndex uint64, parentTotalL1MessagePopped uint64, skippedL1MessageBitmap []byte, chunks []*codecv0.DAChunkRawTx, l1Txs []*types.L1MessageTx, l1BlockNumber uint64) DAEntry { - return &CommitBatchDAV0{ - Version: version, - BatchIndex: batchIndex, - ParentTotalL1MessagePopped: parentTotalL1MessagePopped, - SkippedL1MessageBitmap: skippedL1MessageBitmap, - Chunks: chunks, - L1Txs: l1Txs, - L1BlockNumber: l1BlockNumber, - } -} - -func (f *CommitBatchDAV0) DAType() DAType { - return CommitBatchV0 -} - -func (f *CommitBatchDAV0) GetL1BlockNumber() uint64 { - return f.L1BlockNumber -} - -type CommitBatchDAV1 struct { - Version uint8 - BatchIndex uint64 - ParentTotalL1MessagePopped uint64 - SkippedL1MessageBitmap []byte - Chunks []*codecv1.DAChunkRawTx - L1Txs []*types.L1MessageTx - - L1BlockNumber uint64 -} - -func NewCommitBatchDAV1(version uint8, batchIndex uint64, parentTotalL1MessagePopped uint64, skippedL1MessageBitmap []byte, chunks []*codecv1.DAChunkRawTx, l1Txs []*types.L1MessageTx, l1BlockNumber uint64) DAEntry { - return &CommitBatchDAV1{ - Version: version, - BatchIndex: batchIndex, - ParentTotalL1MessagePopped: parentTotalL1MessagePopped, - SkippedL1MessageBitmap: skippedL1MessageBitmap, - Chunks: chunks, - L1Txs: l1Txs, - L1BlockNumber: l1BlockNumber, - } -} - -func (f *CommitBatchDAV1) DAType() DAType { - return CommitBatchV1 -} - -func (f *CommitBatchDAV1) GetL1BlockNumber() uint64 { - return f.L1BlockNumber -} - -type CommitBatchDAV2 struct { - Version uint8 - BatchIndex uint64 - ParentTotalL1MessagePopped uint64 - SkippedL1MessageBitmap []byte - Chunks []*codecv2.DAChunkRawTx - L1Txs []*types.L1MessageTx - - L1BlockNumber uint64 -} - -func NewCommitBatchDAV2(version uint8, batchIndex uint64, parentTotalL1MessagePopped uint64, skippedL1MessageBitmap []byte, chunks []*codecv2.DAChunkRawTx, l1Txs []*types.L1MessageTx, l1BlockNumber uint64) DAEntry { - return &CommitBatchDAV2{ - Version: version, - BatchIndex: batchIndex, - ParentTotalL1MessagePopped: parentTotalL1MessagePopped, - SkippedL1MessageBitmap: skippedL1MessageBitmap, - Chunks: chunks, - L1Txs: l1Txs, - L1BlockNumber: l1BlockNumber, - } -} - -func (f *CommitBatchDAV2) DAType() DAType { - return CommitBatchV2 -} - -func (f *CommitBatchDAV2) GetL1BlockNumber() uint64 { - return f.L1BlockNumber -} - -type RevertBatchDA struct { - BatchIndex uint64 - - L1BlockNumber uint64 -} - -func NewRevertBatchDA(batchIndex uint64) DAEntry { - return &RevertBatchDA{ - BatchIndex: batchIndex, - } -} - -func (f *RevertBatchDA) DAType() DAType { - return RevertBatch -} - -func (f *RevertBatchDA) GetL1BlockNumber() uint64 { - return f.L1BlockNumber -} - -type FinalizeBatchDA struct { - BatchIndex uint64 - - L1BlockNumber uint64 -} - -func NewFinalizeBatchDA(batchIndex uint64) DAEntry { - return &FinalizeBatchDA{ - BatchIndex: batchIndex, - } -} - -func (f *FinalizeBatchDA) DAType() DAType { - return FinalizeBatch -} - -func (f *FinalizeBatchDA) GetL1BlockNumber() uint64 { - return f.L1BlockNumber -} - -type FinalizeBatchDAV3 struct { - BatchIndex uint64 - - L1BlockNumber uint64 -} - -func NewFinalizeBatchDAV3(batchIndex uint64) DAEntry { - return &FinalizeBatchDAV3{ - BatchIndex: batchIndex, - } -} - -func (f *FinalizeBatchDAV3) DAType() DAType { - return FinalizeBatchV3 -} - -func (f *FinalizeBatchDAV3) GetL1BlockNumber() uint64 { - return f.L1BlockNumber -} diff --git a/rollup/da_syncer/da/calldata_blob_source.go b/rollup/da_syncer/da/calldata_blob_source.go new file mode 100644 index 000000000000..94f788a07c85 --- /dev/null +++ b/rollup/da_syncer/da/calldata_blob_source.go @@ -0,0 +1,224 @@ +package da + +import ( + "context" + "errors" + "fmt" + + "github.com/scroll-tech/go-ethereum/accounts/abi" + "github.com/scroll-tech/go-ethereum/common" + "github.com/scroll-tech/go-ethereum/core/types" + "github.com/scroll-tech/go-ethereum/ethdb" + "github.com/scroll-tech/go-ethereum/log" + "github.com/scroll-tech/go-ethereum/rollup/da_syncer/blob_client" + "github.com/scroll-tech/go-ethereum/rollup/missing_header_fields" + "github.com/scroll-tech/go-ethereum/rollup/rollup_sync_service" +) + +const ( + callDataBlobSourceFetchBlockRange uint64 = 500 + commitBatchEventName = "CommitBatch" + revertBatchEventName = "RevertBatch" + finalizeBatchEventName = "FinalizeBatch" + commitBatchMethodName = "commitBatch" + commitBatchWithBlobProofMethodName = "commitBatchWithBlobProof" + + // the length og method ID at the beginning of transaction data + methodIDLength = 4 +) + +var ( + ErrSourceExhausted = errors.New("data source has been exhausted") +) + +type CalldataBlobSource struct { + ctx context.Context + l1Client *rollup_sync_service.L1Client + blobClient blob_client.BlobClient + l1height uint64 + scrollChainABI *abi.ABI + l1CommitBatchEventSignature common.Hash + l1RevertBatchEventSignature common.Hash + l1FinalizeBatchEventSignature common.Hash + db ethdb.Database + missingHeaderFieldsManager *missing_header_fields.Manager +} + +func NewCalldataBlobSource(ctx context.Context, l1height uint64, l1Client *rollup_sync_service.L1Client, blobClient blob_client.BlobClient, db ethdb.Database, missingHeaderFieldsManager *missing_header_fields.Manager) (*CalldataBlobSource, error) { + scrollChainABI, err := rollup_sync_service.ScrollChainMetaData.GetAbi() + if err != nil { + return nil, fmt.Errorf("failed to get scroll chain abi: %w", err) + } + + return &CalldataBlobSource{ + ctx: ctx, + l1Client: l1Client, + blobClient: blobClient, + l1height: l1height, + scrollChainABI: scrollChainABI, + l1CommitBatchEventSignature: scrollChainABI.Events[commitBatchEventName].ID, + l1RevertBatchEventSignature: scrollChainABI.Events[revertBatchEventName].ID, + l1FinalizeBatchEventSignature: scrollChainABI.Events[finalizeBatchEventName].ID, + db: db, + missingHeaderFieldsManager: missingHeaderFieldsManager, + }, nil +} + +func (ds *CalldataBlobSource) NextData() (Entries, error) { + to := ds.l1height + callDataBlobSourceFetchBlockRange + l1Finalized, err := ds.l1Client.GetLatestFinalizedBlockNumber() + if err != nil { + return nil, fmt.Errorf("cannot get l1height, error: %v", err) + } + if to > l1Finalized { + to = l1Finalized + } + if ds.l1height > to { + return nil, ErrSourceExhausted + } + logs, err := ds.l1Client.FetchRollupEventsInRange(ds.l1height, to) + if err != nil { + return nil, fmt.Errorf("cannot get events, l1height: %d, error: %v", ds.l1height, err) + } + da, err := ds.processLogsToDA(logs) + if err == nil { + ds.l1height = to + 1 + } + return da, err +} + +func (ds *CalldataBlobSource) L1Height() uint64 { + return ds.l1height +} + +func (ds *CalldataBlobSource) processLogsToDA(logs []types.Log) (Entries, error) { + var entries Entries + var entry Entry + var err error + + for _, vLog := range logs { + switch vLog.Topics[0] { + case ds.l1CommitBatchEventSignature: + event := &rollup_sync_service.L1CommitBatchEvent{} + if err = rollup_sync_service.UnpackLog(ds.scrollChainABI, event, commitBatchEventName, vLog); err != nil { + return nil, fmt.Errorf("failed to unpack commit rollup event log, err: %w", err) + } + + batchIndex := event.BatchIndex.Uint64() + log.Trace("found new CommitBatch event", "batch index", batchIndex) + + if entry, err = ds.getCommitBatchDA(batchIndex, &vLog); err != nil { + return nil, fmt.Errorf("failed to get commit batch da: %v, err: %w", batchIndex, err) + } + + case ds.l1RevertBatchEventSignature: + event := &rollup_sync_service.L1RevertBatchEvent{} + if err := rollup_sync_service.UnpackLog(ds.scrollChainABI, event, revertBatchEventName, vLog); err != nil { + return nil, fmt.Errorf("failed to unpack revert rollup event log, err: %w", err) + } + + batchIndex := event.BatchIndex.Uint64() + log.Trace("found new RevertBatchType event", "batch index", batchIndex) + entry = NewRevertBatch(batchIndex) + + case ds.l1FinalizeBatchEventSignature: + event := &rollup_sync_service.L1FinalizeBatchEvent{} + if err := rollup_sync_service.UnpackLog(ds.scrollChainABI, event, finalizeBatchEventName, vLog); err != nil { + return nil, fmt.Errorf("failed to unpack finalized rollup event log, err: %w", err) + } + + batchIndex := event.BatchIndex.Uint64() + log.Trace("found new FinalizeBatchType event", "batch index", event.BatchIndex.Uint64()) + entry = NewFinalizeBatch(batchIndex) + + default: + return nil, fmt.Errorf("unknown event, topic: %v, tx hash: %v", vLog.Topics[0].Hex(), vLog.TxHash.Hex()) + } + + entries = append(entries, entry) + } + return entries, nil +} + +type commitBatchArgs struct { + Version uint8 + ParentBatchHeader []byte + Chunks [][]byte + SkippedL1MessageBitmap []byte +} + +func newCommitBatchArgs(method *abi.Method, values []interface{}) (*commitBatchArgs, error) { + var args commitBatchArgs + err := method.Inputs.Copy(&args, values) + return &args, err +} + +func newCommitBatchArgsFromCommitBatchWithProof(method *abi.Method, values []interface{}) (*commitBatchArgs, error) { + var args commitBatchWithBlobProofArgs + err := method.Inputs.Copy(&args, values) + if err != nil { + return nil, err + } + return &commitBatchArgs{ + Version: args.Version, + ParentBatchHeader: args.ParentBatchHeader, + Chunks: args.Chunks, + SkippedL1MessageBitmap: args.SkippedL1MessageBitmap, + }, nil +} + +type commitBatchWithBlobProofArgs struct { + Version uint8 + ParentBatchHeader []byte + Chunks [][]byte + SkippedL1MessageBitmap []byte + BlobDataProof []byte +} + +func (ds *CalldataBlobSource) getCommitBatchDA(batchIndex uint64, vLog *types.Log) (Entry, error) { + if batchIndex == 0 { + return NewCommitBatchDAV0Empty(), nil + } + + txData, err := ds.l1Client.FetchTxData(vLog) + if err != nil { + return nil, err + } + if len(txData) < methodIDLength { + return nil, fmt.Errorf("transaction data is too short, length of tx data: %v, minimum length required: %v", len(txData), methodIDLength) + } + + method, err := ds.scrollChainABI.MethodById(txData[:methodIDLength]) + if err != nil { + return nil, fmt.Errorf("failed to get method by ID, ID: %v, err: %w", txData[:methodIDLength], err) + } + values, err := method.Inputs.Unpack(txData[methodIDLength:]) + if err != nil { + return nil, fmt.Errorf("failed to unpack transaction data using ABI, tx data: %v, err: %w", txData, err) + } + + if method.Name == commitBatchMethodName { + args, err := newCommitBatchArgs(method, values) + if err != nil { + return nil, fmt.Errorf("failed to decode calldata into commitBatch args, values: %+v, err: %w", values, err) + } + switch args.Version { + case 0: + return NewCommitBatchDAV0(ds.missingHeaderFieldsManager, ds.db, args.Version, batchIndex, args.ParentBatchHeader, args.Chunks, args.SkippedL1MessageBitmap, vLog.BlockNumber) + case 1: + return NewCommitBatchDAV1(ds.ctx, ds.missingHeaderFieldsManager, ds.db, ds.l1Client, ds.blobClient, vLog, args.Version, batchIndex, args.ParentBatchHeader, args.Chunks, args.SkippedL1MessageBitmap) + case 2: + return NewCommitBatchDAV2(ds.ctx, ds.missingHeaderFieldsManager, ds.db, ds.l1Client, ds.blobClient, vLog, args.Version, batchIndex, args.ParentBatchHeader, args.Chunks, args.SkippedL1MessageBitmap) + default: + return nil, fmt.Errorf("failed to decode DA, codec version is unknown: codec version: %d", args.Version) + } + } else if method.Name == commitBatchWithBlobProofMethodName { + args, err := newCommitBatchArgsFromCommitBatchWithProof(method, values) + if err != nil { + return nil, fmt.Errorf("failed to decode calldata into commitBatch args, values: %+v, err: %w", values, err) + } + return NewCommitBatchDAV2(ds.ctx, ds.missingHeaderFieldsManager, ds.db, ds.l1Client, ds.blobClient, vLog, args.Version, batchIndex, args.ParentBatchHeader, args.Chunks, args.SkippedL1MessageBitmap) + } + + return nil, fmt.Errorf("unknown method name: %s", method.Name) +} diff --git a/rollup/da_syncer/da/commitV0.go b/rollup/da_syncer/da/commitV0.go new file mode 100644 index 000000000000..fe8a09f92fb6 --- /dev/null +++ b/rollup/da_syncer/da/commitV0.go @@ -0,0 +1,181 @@ +package da + +import ( + "encoding/binary" + "fmt" + "io" + "math/big" + + "github.com/scroll-tech/da-codec/encoding" + "github.com/scroll-tech/da-codec/encoding/codecv0" + + "github.com/scroll-tech/go-ethereum/core/rawdb" + "github.com/scroll-tech/go-ethereum/core/types" + "github.com/scroll-tech/go-ethereum/ethdb" + "github.com/scroll-tech/go-ethereum/rollup/missing_header_fields" +) + +type CommitBatchDAV0 struct { + missingHeaderFieldsManager *missing_header_fields.Manager + + version uint8 + batchIndex uint64 + parentTotalL1MessagePopped uint64 + skippedL1MessageBitmap []byte + chunks []*codecv0.DAChunkRawTx + l1Txs []*types.L1MessageTx + + l1BlockNumber uint64 +} + +func NewCommitBatchDAV0( + missingHeaderFieldsManager *missing_header_fields.Manager, + db ethdb.Database, + version uint8, + batchIndex uint64, + parentBatchHeader []byte, + chunks [][]byte, + skippedL1MessageBitmap []byte, + l1BlockNumber uint64, +) (*CommitBatchDAV0, error) { + decodedChunks, err := codecv0.DecodeDAChunksRawTx(chunks) + if err != nil { + return nil, fmt.Errorf("failed to unpack chunks: %d, err: %w", batchIndex, err) + } + + return NewCommitBatchDAV0WithChunks(missingHeaderFieldsManager, db, version, batchIndex, parentBatchHeader, decodedChunks, skippedL1MessageBitmap, l1BlockNumber) +} + +func NewCommitBatchDAV0WithChunks( + missingHeaderFieldsManager *missing_header_fields.Manager, + db ethdb.Database, + version uint8, + batchIndex uint64, + parentBatchHeader []byte, + decodedChunks []*codecv0.DAChunkRawTx, + skippedL1MessageBitmap []byte, + l1BlockNumber uint64, +) (*CommitBatchDAV0, error) { + parentTotalL1MessagePopped := getBatchTotalL1MessagePopped(parentBatchHeader) + l1Txs, err := getL1Messages(db, parentTotalL1MessagePopped, skippedL1MessageBitmap, getTotalMessagesPoppedFromChunks(decodedChunks)) + if err != nil { + return nil, fmt.Errorf("failed to get L1 messages for v0 batch %d: %w", batchIndex, err) + } + + return &CommitBatchDAV0{ + missingHeaderFieldsManager: missingHeaderFieldsManager, + version: version, + batchIndex: batchIndex, + parentTotalL1MessagePopped: parentTotalL1MessagePopped, + skippedL1MessageBitmap: skippedL1MessageBitmap, + chunks: decodedChunks, + l1Txs: l1Txs, + l1BlockNumber: l1BlockNumber, + }, nil +} + +func NewCommitBatchDAV0Empty() *CommitBatchDAV0 { + return &CommitBatchDAV0{ + batchIndex: 0, + } +} + +func (c *CommitBatchDAV0) Type() Type { + return CommitBatchV0Type +} + +func (c *CommitBatchDAV0) L1BlockNumber() uint64 { + return c.l1BlockNumber +} + +func (c *CommitBatchDAV0) BatchIndex() uint64 { + return c.batchIndex +} + +func (c *CommitBatchDAV0) Blocks() ([]*PartialBlock, error) { + var blocks []*PartialBlock + l1TxPointer := 0 + + curL1TxIndex := c.parentTotalL1MessagePopped + for _, chunk := range c.chunks { + for blockId, daBlock := range chunk.Blocks { + // create txs + // var txs types.Transactions + txs := make(types.Transactions, 0, daBlock.NumTransactions) + // insert l1 msgs + for l1TxPointer < len(c.l1Txs) && c.l1Txs[l1TxPointer].QueueIndex < curL1TxIndex+uint64(daBlock.NumL1Messages) { + l1Tx := types.NewTx(c.l1Txs[l1TxPointer]) + txs = append(txs, l1Tx) + l1TxPointer++ + } + curL1TxIndex += uint64(daBlock.NumL1Messages) + // insert l2 txs + txs = append(txs, chunk.Transactions[blockId]...) + + difficulty, extraData, err := c.missingHeaderFieldsManager.GetMissingHeaderFields(daBlock.BlockNumber) + if err != nil { + return nil, fmt.Errorf("failed to get missing header fields, block number: %d, error: %v", daBlock.BlockNumber, err) + } + + var baseFee *big.Int + if daBlock.BaseFee.Uint64() != 0 { + baseFee = daBlock.BaseFee + } + + block := NewPartialBlock( + &PartialHeader{ + Number: daBlock.BlockNumber, + Time: daBlock.Timestamp, + BaseFee: baseFee, + GasLimit: daBlock.GasLimit, + Difficulty: difficulty, + ExtraData: extraData, + }, + txs) + blocks = append(blocks, block) + } + } + return blocks, nil +} + +func getTotalMessagesPoppedFromChunks(decodedChunks []*codecv0.DAChunkRawTx) int { + totalL1MessagePopped := 0 + for _, chunk := range decodedChunks { + for _, block := range chunk.Blocks { + totalL1MessagePopped += int(block.NumL1Messages) + } + } + return totalL1MessagePopped +} + +func getL1Messages(db ethdb.Database, parentTotalL1MessagePopped uint64, skippedBitmap []byte, totalL1MessagePopped int) ([]*types.L1MessageTx, error) { + var txs []*types.L1MessageTx + + decodedSkippedBitmap, err := encoding.DecodeBitmap(skippedBitmap, totalL1MessagePopped) + if err != nil { + return nil, fmt.Errorf("failed to decode skipped message bitmap: err: %w", err) + } + + // get all necessary l1 messages without skipped + currentIndex := parentTotalL1MessagePopped + for index := 0; index < totalL1MessagePopped; index++ { + if encoding.IsL1MessageSkipped(decodedSkippedBitmap, currentIndex-parentTotalL1MessagePopped) { + currentIndex++ + continue + } + l1Tx := rawdb.ReadL1Message(db, currentIndex) + if l1Tx == nil { + // TODO: returning io.EOF is not the best way to handle this + return nil, io.EOF + } + txs = append(txs, l1Tx) + currentIndex++ + } + + return txs, nil +} + +func getBatchTotalL1MessagePopped(data []byte) uint64 { + // total l1 message popped stored in bytes from 17 to 24, accordingly to codec spec + return binary.BigEndian.Uint64(data[17:25]) +} diff --git a/rollup/da_syncer/da/commitV1.go b/rollup/da_syncer/da/commitV1.go new file mode 100644 index 000000000000..f3a596d3c105 --- /dev/null +++ b/rollup/da_syncer/da/commitV1.go @@ -0,0 +1,77 @@ +package da + +import ( + "context" + "crypto/sha256" + "fmt" + + "github.com/scroll-tech/da-codec/encoding/codecv1" + + "github.com/scroll-tech/go-ethereum/rollup/da_syncer/blob_client" + "github.com/scroll-tech/go-ethereum/rollup/missing_header_fields" + "github.com/scroll-tech/go-ethereum/rollup/rollup_sync_service" + + "github.com/scroll-tech/go-ethereum/common" + "github.com/scroll-tech/go-ethereum/core/types" + "github.com/scroll-tech/go-ethereum/crypto/kzg4844" + "github.com/scroll-tech/go-ethereum/ethdb" +) + +type CommitBatchDAV1 struct { + *CommitBatchDAV0 +} + +func NewCommitBatchDAV1(ctx context.Context, + missingHeaderFieldsManager *missing_header_fields.Manager, + db ethdb.Database, + l1Client *rollup_sync_service.L1Client, + blobClient blob_client.BlobClient, + vLog *types.Log, + version uint8, + batchIndex uint64, + parentBatchHeader []byte, + chunks [][]byte, + skippedL1MessageBitmap []byte, +) (*CommitBatchDAV1, error) { + decodedChunks, err := codecv1.DecodeDAChunksRawTx(chunks) + if err != nil { + return nil, fmt.Errorf("failed to unpack chunks: %v, err: %w", batchIndex, err) + } + + versionedHash, err := l1Client.FetchTxBlobHash(vLog) + if err != nil { + return nil, fmt.Errorf("failed to fetch blob hash, err: %w", err) + } + + blob, err := blobClient.GetBlobByVersionedHash(ctx, versionedHash) + if err != nil { + return nil, fmt.Errorf("failed to fetch blob from blob client, err: %w", err) + } + + // compute blob versioned hash and compare with one from tx + c, err := kzg4844.BlobToCommitment(blob) + if err != nil { + return nil, fmt.Errorf("failed to create blob commitment") + } + blobVersionedHash := common.Hash(kzg4844.CalcBlobHashV1(sha256.New(), &c)) + if blobVersionedHash != versionedHash { + return nil, fmt.Errorf("blobVersionedHash from blob source is not equal to versionedHash from tx, correct versioned hash: %s, fetched blob hash: %s", versionedHash.String(), blobVersionedHash.String()) + } + + // decode txs from blob + err = codecv1.DecodeTxsFromBlob(blob, decodedChunks) + if err != nil { + return nil, fmt.Errorf("failed to decode txs from blob: %w", err) + } + + v0, err := NewCommitBatchDAV0WithChunks(missingHeaderFieldsManager, db, version, batchIndex, parentBatchHeader, decodedChunks, skippedL1MessageBitmap, vLog.BlockNumber) + if err != nil { + return nil, err + } + + return &CommitBatchDAV1{v0}, nil +} + +func (c *CommitBatchDAV1) Type() Type { + return CommitBatchV1Type +} diff --git a/rollup/da_syncer/da/commitV2.go b/rollup/da_syncer/da/commitV2.go new file mode 100644 index 000000000000..444b7e4fb801 --- /dev/null +++ b/rollup/da_syncer/da/commitV2.go @@ -0,0 +1,41 @@ +package da + +import ( + "context" + + "github.com/scroll-tech/go-ethereum/ethdb" + "github.com/scroll-tech/go-ethereum/rollup/da_syncer/blob_client" + "github.com/scroll-tech/go-ethereum/rollup/missing_header_fields" + "github.com/scroll-tech/go-ethereum/rollup/rollup_sync_service" + + "github.com/scroll-tech/go-ethereum/core/types" +) + +type CommitBatchDAV2 struct { + *CommitBatchDAV1 +} + +func NewCommitBatchDAV2(ctx context.Context, + missingHeaderFieldsManager *missing_header_fields.Manager, + db ethdb.Database, + l1Client *rollup_sync_service.L1Client, + blobClient blob_client.BlobClient, + vLog *types.Log, + version uint8, + batchIndex uint64, + parentBatchHeader []byte, + chunks [][]byte, + skippedL1MessageBitmap []byte, +) (*CommitBatchDAV2, error) { + + v1, err := NewCommitBatchDAV1(ctx, missingHeaderFieldsManager, db, l1Client, blobClient, vLog, version, batchIndex, parentBatchHeader, chunks, skippedL1MessageBitmap) + if err != nil { + return nil, err + } + + return &CommitBatchDAV2{v1}, nil +} + +func (c *CommitBatchDAV2) Type() Type { + return CommitBatchV2Type +} diff --git a/rollup/da_syncer/da/da.go b/rollup/da_syncer/da/da.go new file mode 100644 index 000000000000..fbaa60962758 --- /dev/null +++ b/rollup/da_syncer/da/da.go @@ -0,0 +1,69 @@ +package da + +import ( + "math/big" + + "github.com/scroll-tech/go-ethereum/core/types" +) + +type Type int + +const ( + // CommitBatchV0Type contains data of event of CommitBatchV0Type + CommitBatchV0Type Type = iota + // CommitBatchV1Type contains data of event of CommitBatchV1Type + CommitBatchV1Type + // CommitBatchV2Type contains data of event of CommitBatchV2Type + CommitBatchV2Type + // RevertBatchType contains data of event of RevertBatchType + RevertBatchType + // FinalizeBatchType contains data of event of FinalizeBatchType + FinalizeBatchType + // FinalizeBatchV3Type contains data of event of FinalizeBatchType v3 + FinalizeBatchV3Type +) + +type Entry interface { + Type() Type + BatchIndex() uint64 + L1BlockNumber() uint64 +} + +type EntryWithBlocks interface { + Entry + Blocks() ([]*PartialBlock, error) +} + +type Entries []Entry + +type PartialHeader struct { + Number uint64 + Time uint64 + BaseFee *big.Int + GasLimit uint64 + Difficulty uint64 + ExtraData []byte +} + +func (h *PartialHeader) ToHeader() *types.Header { + return &types.Header{ + Number: big.NewInt(0).SetUint64(h.Number), + Time: h.Time, + BaseFee: h.BaseFee, + GasLimit: h.GasLimit, + Difficulty: new(big.Int).SetUint64(h.Difficulty), + Extra: h.ExtraData, + } +} + +type PartialBlock struct { + PartialHeader *PartialHeader + Transactions types.Transactions +} + +func NewPartialBlock(partialHeader *PartialHeader, txs types.Transactions) *PartialBlock { + return &PartialBlock{ + PartialHeader: partialHeader, + Transactions: txs, + } +} diff --git a/rollup/da_syncer/da/finalize.go b/rollup/da_syncer/da/finalize.go new file mode 100644 index 000000000000..9864233fe933 --- /dev/null +++ b/rollup/da_syncer/da/finalize.go @@ -0,0 +1,45 @@ +package da + +type FinalizeBatch struct { + batchIndex uint64 + + l1BlockNumber uint64 +} + +func NewFinalizeBatch(batchIndex uint64) *FinalizeBatch { + return &FinalizeBatch{ + batchIndex: batchIndex, + } +} + +func (f *FinalizeBatch) Type() Type { + return FinalizeBatchType +} + +func (f *FinalizeBatch) L1BlockNumber() uint64 { + return f.l1BlockNumber +} + +func (f *FinalizeBatch) BatchIndex() uint64 { + return f.batchIndex +} + +type FinalizeBatchDAV3 struct { + BatchIndex uint64 + + L1BlockNumber uint64 +} + +func NewFinalizeBatchDAV3(batchIndex uint64) *FinalizeBatchDAV3 { + return &FinalizeBatchDAV3{ + BatchIndex: batchIndex, + } +} + +func (f *FinalizeBatchDAV3) DAType() Type { + return FinalizeBatchV3Type +} + +func (f *FinalizeBatchDAV3) GetL1BlockNumber() uint64 { + return f.L1BlockNumber +} diff --git a/rollup/da_syncer/da/revert.go b/rollup/da_syncer/da/revert.go new file mode 100644 index 000000000000..f02c264039b7 --- /dev/null +++ b/rollup/da_syncer/da/revert.go @@ -0,0 +1,24 @@ +package da + +type RevertBatch struct { + batchIndex uint64 + + l1BlockNumber uint64 +} + +func NewRevertBatch(batchIndex uint64) *RevertBatch { + return &RevertBatch{ + batchIndex: batchIndex, + } +} + +func (r *RevertBatch) Type() Type { + return RevertBatchType +} + +func (r *RevertBatch) L1BlockNumber() uint64 { + return r.l1BlockNumber +} +func (r *RevertBatch) BatchIndex() uint64 { + return r.batchIndex +} diff --git a/rollup/da_syncer/da_queue.go b/rollup/da_syncer/da_queue.go index b1d3d79b8b22..bdbbbb5428bd 100644 --- a/rollup/da_syncer/da_queue.go +++ b/rollup/da_syncer/da_queue.go @@ -2,13 +2,15 @@ package da_syncer import ( "context" + + "github.com/scroll-tech/go-ethereum/rollup/da_syncer/da" ) type DAQueue struct { l1height uint64 dataSourceFactory *DataSourceFactory dataSource DataSource - da DA + da da.Entries } func NewDAQueue(l1height uint64, dataSourceFactory *DataSourceFactory) *DAQueue { @@ -16,11 +18,11 @@ func NewDAQueue(l1height uint64, dataSourceFactory *DataSourceFactory) *DAQueue l1height: l1height, dataSourceFactory: dataSourceFactory, dataSource: nil, - da: []DAEntry{}, + da: make(da.Entries, 0), } } -func (dq *DAQueue) NextDA(ctx context.Context) (DAEntry, error) { +func (dq *DAQueue) NextDA(ctx context.Context) (da.Entry, error) { for len(dq.da) == 0 { err := dq.getNextData(ctx) if err != nil { @@ -42,7 +44,7 @@ func (dq *DAQueue) getNextData(ctx context.Context) error { } dq.da, err = dq.dataSource.NextData() // previous dataSource has been exhausted, create new - if err == errSourceExhausted { + if err == da.ErrSourceExhausted { dq.l1height = dq.dataSource.L1Height() dq.dataSource = nil return dq.getNextData(ctx) diff --git a/rollup/da_syncer/da_syncer.go b/rollup/da_syncer/da_syncer.go index e8dbf7859e84..b1adaa7ea994 100644 --- a/rollup/da_syncer/da_syncer.go +++ b/rollup/da_syncer/da_syncer.go @@ -2,13 +2,10 @@ package da_syncer import ( "fmt" - "math/big" - "github.com/scroll-tech/go-ethereum/common" "github.com/scroll-tech/go-ethereum/core" - "github.com/scroll-tech/go-ethereum/core/types" "github.com/scroll-tech/go-ethereum/log" - "github.com/scroll-tech/go-ethereum/trie" + "github.com/scroll-tech/go-ethereum/rollup/da_syncer/da" ) type DASyncer struct { @@ -21,33 +18,19 @@ func NewDASyncer(blockchain *core.BlockChain) *DASyncer { } } -func (s *DASyncer) SyncOneBlock(block *types.Block) error { - prevHash := s.blockchain.CurrentBlock().Hash() - if big.NewInt(0).Add(s.blockchain.CurrentBlock().Number(), common.Big1).Cmp(block.Number()) != 0 { - return fmt.Errorf("not consecutive block, number: %d", block.Number()) +func (s *DASyncer) SyncOneBlock(block *da.PartialBlock) error { + parentBlock := s.blockchain.CurrentBlock() + if parentBlock.NumberU64()+1 != block.PartialHeader.Number { + return fmt.Errorf("not consecutive block, number: %d, chain height: %d", block.PartialHeader.Number, parentBlock.NumberU64()) } - header := block.Header() - txs := block.Transactions() - - // fill header with all necessary fields - var err error - header.ReceiptHash, header.Bloom, header.Root, header.GasUsed, err = s.blockchain.PreprocessBlock(block) - if err != nil { - return fmt.Errorf("block preprocessing failed, block number: %d, error: %v", block.Number(), err) - } - header.UncleHash = common.HexToHash("0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347") - header.Difficulty = common.Big1 - header.BaseFee = nil - header.TxHash = types.DeriveSha(txs, trie.NewStackTrie(nil)) - header.ParentHash = prevHash - - fullBlock := types.NewBlockWithHeader(header).WithBody(txs, make([]*types.Header, 0)) - if _, err := s.blockchain.InsertChainWithoutSealVerification(fullBlock); err != nil { - return fmt.Errorf("cannot insert block, number: %d, error: %v", block.Number(), err) + if _, err := s.blockchain.BuildAndWriteBlock(parentBlock, block.PartialHeader.ToHeader(), block.Transactions); err != nil { + return fmt.Errorf("failed building and writing block, number: %d, error: %v", block.PartialHeader.Number, err) } + if s.blockchain.CurrentBlock().Header().Number.Uint64()%100 == 0 { - log.Info("inserted block", "blockhain height", s.blockchain.CurrentBlock().Header().Number, "block hash", s.blockchain.CurrentBlock().Header().Hash()) + log.Info("inserted block", "blockhain height", s.blockchain.CurrentBlock().Header().Number, "block hash", s.blockchain.CurrentBlock().Header().Hash(), "root", s.blockchain.CurrentBlock().Header().Root) } + return nil } diff --git a/rollup/da_syncer/data_source.go b/rollup/da_syncer/data_source.go index 017796352b8c..2f498897b1b7 100644 --- a/rollup/da_syncer/data_source.go +++ b/rollup/da_syncer/data_source.go @@ -3,44 +3,51 @@ package da_syncer import ( "context" "errors" + "path/filepath" - "github.com/scroll-tech/go-ethereum/core" "github.com/scroll-tech/go-ethereum/ethdb" "github.com/scroll-tech/go-ethereum/params" "github.com/scroll-tech/go-ethereum/rollup/da_syncer/blob_client" + "github.com/scroll-tech/go-ethereum/rollup/da_syncer/da" + "github.com/scroll-tech/go-ethereum/rollup/missing_header_fields" "github.com/scroll-tech/go-ethereum/rollup/rollup_sync_service" ) -var ( - errSourceExhausted = errors.New("data source has been exhausted") -) - type DataSource interface { - NextData() (DA, error) + NextData() (da.Entries, error) L1Height() uint64 } type DataSourceFactory struct { - config Config - genesisConfig *params.ChainConfig - l1Client *rollup_sync_service.L1Client - blobClient blob_client.BlobClient - db ethdb.Database + ctx context.Context + genesisConfig *params.ChainConfig + config Config + l1Client *rollup_sync_service.L1Client + blobClient blob_client.BlobClient + db ethdb.Database + missingHeaderFieldsManager *missing_header_fields.Manager } -func NewDataSourceFactory(blockchain *core.BlockChain, genesisConfig *params.ChainConfig, config Config, l1Client *rollup_sync_service.L1Client, blobClient blob_client.BlobClient, db ethdb.Database) *DataSourceFactory { +func NewDataSourceFactory(ctx context.Context, genesisConfig *params.ChainConfig, config Config, l1Client *rollup_sync_service.L1Client, blobClient blob_client.BlobClient, db ethdb.Database) *DataSourceFactory { + missingHeaderFieldsManager := missing_header_fields.NewManager(ctx, + filepath.Join(config.AdditionalDataDir, missing_header_fields.DefaultFileName), + genesisConfig.Scroll.DAConfig.MissingHeaderFieldsURL, + genesisConfig.Scroll.DAConfig.MissingHeaderFieldsSHA256, + ) + return &DataSourceFactory{ - config: config, - genesisConfig: genesisConfig, - l1Client: l1Client, - blobClient: blobClient, - db: db, + genesisConfig: genesisConfig, + config: config, + l1Client: l1Client, + blobClient: blobClient, + db: db, + missingHeaderFieldsManager: missingHeaderFieldsManager, } } func (ds *DataSourceFactory) OpenDataSource(ctx context.Context, l1height uint64) (DataSource, error) { if ds.config.FetcherMode == L1RPC { - return NewCalldataBlobSource(ctx, l1height, ds.l1Client, ds.blobClient, ds.db) + return da.NewCalldataBlobSource(ctx, l1height, ds.l1Client, ds.blobClient, ds.db, ds.missingHeaderFieldsManager) } else { return nil, errors.New("snapshot_data_source: not implemented") } diff --git a/rollup/da_syncer/syncing_pipeline.go b/rollup/da_syncer/syncing_pipeline.go index 5c5e56300886..91af494b89ef 100644 --- a/rollup/da_syncer/syncing_pipeline.go +++ b/rollup/da_syncer/syncing_pipeline.go @@ -2,12 +2,15 @@ package da_syncer import ( "context" + "errors" "fmt" + "io" "strings" + "sync" "time" + "github.com/scroll-tech/go-ethereum/common/backoff" "github.com/scroll-tech/go-ethereum/core" - "github.com/scroll-tech/go-ethereum/core/rawdb" "github.com/scroll-tech/go-ethereum/ethdb" "github.com/scroll-tech/go-ethereum/log" "github.com/scroll-tech/go-ethereum/params" @@ -18,17 +21,18 @@ import ( // Config is the configuration parameters of data availability syncing. type Config struct { - FetcherMode FetcherMode // mode of fetcher - SnapshotFilePath string // path to snapshot file - BlobSource blob_client.BlobSource // blob source + FetcherMode FetcherMode // mode of fetcher + SnapshotFilePath string // path to snapshot file + BlobSource blob_client.BlobSource // blob source + AdditionalDataDir string // additional data directory } -// defaultSyncInterval is the frequency at which we query for new rollup event. -const defaultSyncInterval = 1 * time.Millisecond - type SyncingPipeline struct { ctx context.Context cancel context.CancelFunc + wg sync.WaitGroup + expBackoff *backoff.Exponential + db ethdb.Database blockchain *core.BlockChain blockQueue *BlockQueue @@ -36,19 +40,18 @@ type SyncingPipeline struct { } func NewSyncingPipeline(ctx context.Context, blockchain *core.BlockChain, genesisConfig *params.ChainConfig, db ethdb.Database, ethClient sync_service.EthClient, l1DeploymentBlock uint64, config Config) (*SyncingPipeline, error) { - ctx, cancel := context.WithCancel(ctx) - scrollChainABI, err := rollup_sync_service.ScrollChainMetaData.GetAbi() if err != nil { - cancel() return nil, fmt.Errorf("failed to get scroll chain abi: %w", err) } + ctx, cancel := context.WithCancel(ctx) l1Client, err := rollup_sync_service.NewL1Client(ctx, ethClient, genesisConfig.Scroll.L1Config.L1ChainId, genesisConfig.Scroll.L1Config.ScrollChainAddress, scrollChainABI) if err != nil { cancel() return nil, err } + var blobClient blob_client.BlobClient switch config.BlobSource { case blob_client.BlobScan: @@ -60,20 +63,22 @@ func NewSyncingPipeline(ctx context.Context, blockchain *core.BlockChain, genesi return nil, fmt.Errorf("unknown blob scan client: %d", config.BlobSource) } - dataSourceFactory := NewDataSourceFactory(blockchain, genesisConfig, config, l1Client, blobClient, db) + dataSourceFactory := NewDataSourceFactory(ctx, genesisConfig, config, l1Client, blobClient, db) syncedL1Height := l1DeploymentBlock - 1 from := rawdb.ReadDASyncedL1BlockNumber(db) if from != nil { syncedL1Height = *from } - DAQueue := NewDAQueue(syncedL1Height, dataSourceFactory) - batchQueue := NewBatchQueue(DAQueue, db) + + daQueue := NewDAQueue(syncedL1Height, dataSourceFactory) + batchQueue := NewBatchQueue(daQueue, db) blockQueue := NewBlockQueue(batchQueue) daSyncer := NewDASyncer(blockchain) return &SyncingPipeline{ ctx: ctx, cancel: cancel, + expBackoff: backoff.NewExponential(100*time.Millisecond, 10*time.Second, 100*time.Millisecond), db: db, blockchain: blockchain, blockQueue: blockQueue, @@ -81,47 +86,92 @@ func NewSyncingPipeline(ctx context.Context, blockchain *core.BlockChain, genesi }, nil } -func (sp *SyncingPipeline) Step() error { - block, err := sp.blockQueue.NextBlock(sp.ctx) +func (s *SyncingPipeline) Step() error { + block, err := s.blockQueue.NextBlock(s.ctx) if err != nil { return err } - err = sp.daSyncer.SyncOneBlock(block) + err = s.daSyncer.SyncOneBlock(block) return err } -func (sp *SyncingPipeline) Start() { +func (s *SyncingPipeline) Start() { log.Info("Starting SyncingPipeline") + s.wg.Add(1) go func() { - syncTicker := time.NewTicker(defaultSyncInterval) - defer syncTicker.Stop() - - for { - err := sp.Step() - if err != nil { - if strings.HasPrefix(err.Error(), "not consecutive block") { - log.Warn("syncing pipeline step failed, probably because of restart", "err", err) - } else { - log.Crit("syncing pipeline step failed", "err", err) - } + s.mainLoop() + s.wg.Done() + }() +} + +func (s *SyncingPipeline) mainLoop() { + stepCh := make(chan struct{}, 1) + var delayedStepCh <-chan time.Time + + // reqStep is a helper function to request a step to be executed. + // If delay is true, it will request a delayed step with exponential backoff, otherwise it will request an immediate step. + reqStep := func(delay bool) { + if delay { + if delayedStepCh == nil { + delayDur := s.expBackoff.NextDuration() + delayedStepCh = time.After(delayDur) + log.Debug("requesting delayed step", "delay", delayDur, "attempt", s.expBackoff.Attempt()) + } else { + log.Debug("ignoring step request because of ongoing delayed step", "attempt", s.expBackoff.Attempt()) } + } else { select { - case <-sp.ctx.Done(): - return - case <-syncTicker.C: - select { - case <-sp.ctx.Done(): - return - default: - } + case stepCh <- struct{}{}: + default: + } + } + } + + // start pipeline + reqStep(false) + + for { + select { + case <-s.ctx.Done(): + return + default: + } + + select { + case <-s.ctx.Done(): + return + case <-delayedStepCh: + delayedStepCh = nil + reqStep(false) + case <-stepCh: + err := s.Step() + if err == nil { + reqStep(false) + s.expBackoff.Reset() + continue + } + + if errors.Is(err, io.EOF) { + reqStep(true) continue } + if errors.Is(err, context.Canceled) { + return + } + + if strings.HasPrefix(err.Error(), "not consecutive block") { + log.Warn("syncing pipeline step failed, probably because of restart", "err", err) + } else { + log.Crit("syncing pipeline step failed", "err", err) + } } - }() + } } -func (sp *SyncingPipeline) Stop() { - log.Info("Stopping DaSyncer") - sp.cancel() +func (s *SyncingPipeline) Stop() { + log.Info("Stopping DaSyncer...") + s.cancel() + s.wg.Wait() + log.Info("Stopped DaSyncer... Done") } diff --git a/rollup/missing_header_fields/manager.go b/rollup/missing_header_fields/manager.go new file mode 100644 index 000000000000..81e578b212eb --- /dev/null +++ b/rollup/missing_header_fields/manager.go @@ -0,0 +1,186 @@ +package missing_header_fields + +import ( + "bytes" + "context" + "crypto/sha256" + "errors" + "fmt" + "io" + "net/http" + "os" + "time" + + "github.com/scroll-tech/go-ethereum/log" +) + +const timeoutDownload = 10 * time.Minute +const DefaultFileName = "missing_header_fields.bin" + +// Manager is responsible for managing the missing header fields file. +// It lazily downloads the file if it doesn't exist, verifies its checksum and provides the missing header fields. +type Manager struct { + ctx context.Context + filePath string + downloadURL string + checksum [sha256.Size]byte + + reader *Reader +} + +func NewManager(ctx context.Context, filePath string, downloadURL string, checksum [sha256.Size]byte) *Manager { + return &Manager{ + ctx: ctx, + filePath: filePath, + downloadURL: downloadURL, + checksum: checksum, + } +} + +func (m *Manager) GetMissingHeaderFields(headerNum uint64) (difficulty uint64, extraData []byte, err error) { + // lazy initialization: if the reader is not initialized this is the first time we read from the file + if m.reader == nil { + if err = m.initialize(); err != nil { + return 0, nil, fmt.Errorf("failed to initialize missing header reader: %v", err) + } + } + + return m.reader.Read(headerNum) +} + +func (m *Manager) initialize() error { + // if the file doesn't exist, download it + if _, err := os.Stat(m.filePath); errors.Is(err, os.ErrNotExist) { + if err = m.downloadFile(); err != nil { + return fmt.Errorf("failed to download file: %v", err) + } + } + + // verify the checksum + f, err := os.Open(m.filePath) + if err != nil { + return fmt.Errorf("failed to open file: %v", err) + } + + h := sha256.New() + if _, err = io.Copy(h, f); err != nil { + return fmt.Errorf("failed to copy file: %v", err) + } + if err = f.Close(); err != nil { + return fmt.Errorf("failed to close file: %v", err) + } + + if !bytes.Equal(h.Sum(nil), m.checksum[:]) { + return fmt.Errorf("checksum mismatch") + } + + // finally initialize the reader + reader, err := NewReader(m.filePath) + if err != nil { + return err + } + + m.reader = reader + return nil +} +func (m *Manager) Close() error { + if m.reader != nil { + return m.reader.Close() + } + return nil +} + +func (m *Manager) downloadFile() error { + log.Info("Downloading missing header fields. This might take a while...", "url", m.downloadURL) + + downloadCtx, downloadCtxCancel := context.WithTimeout(m.ctx, timeoutDownload) + defer downloadCtxCancel() + + req, err := http.NewRequestWithContext(downloadCtx, http.MethodGet, m.downloadURL, nil) + if err != nil { + return fmt.Errorf("failed to create download request: %v", err) + } + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return fmt.Errorf("failed to download file: %v", err) + } + + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("server returned status code %d", resp.StatusCode) + } + + // create a temporary file + tmpFilePath := m.filePath + ".tmp" // append .tmp to the file path + tmpFile, err := os.Create(tmpFilePath) + if err != nil { + return fmt.Errorf("failed to create temporary file: %v", err) + } + var ok bool + defer func() { + if !ok { + _ = os.Remove(tmpFilePath) + } + }() + + // copy the response body to the temporary file and print progress + writeCounter := NewWriteCounter(m.ctx, uint64(resp.ContentLength)) + if _, err = io.Copy(tmpFile, io.TeeReader(resp.Body, writeCounter)); err != nil { + return fmt.Errorf("failed to copy response body: %v", err) + } + + if err = tmpFile.Close(); err != nil { + return fmt.Errorf("failed to close temporary file: %v", err) + } + + // rename the temporary file to the final file path + if err = os.Rename(tmpFilePath, m.filePath); err != nil { + return fmt.Errorf("failed to rename temporary file: %v", err) + } + + ok = true + return nil +} + +type WriteCounter struct { + ctx context.Context + total uint64 + written uint64 + lastProgressPrinted time.Time +} + +func NewWriteCounter(ctx context.Context, total uint64) *WriteCounter { + return &WriteCounter{ + ctx: ctx, + total: total, + } +} + +func (wc *WriteCounter) Write(p []byte) (int, error) { + n := len(p) + wc.written += uint64(n) + + // check if the context is done and return early + select { + case <-wc.ctx.Done(): + return n, wc.ctx.Err() + default: + } + + wc.printProgress() + + return n, nil +} + +func (wc *WriteCounter) printProgress() { + if time.Since(wc.lastProgressPrinted) < 5*time.Second { + return + } + wc.lastProgressPrinted = time.Now() + + log.Info(fmt.Sprintf("Downloading missing header fields... %d MB / %d MB", toMB(wc.written), toMB(wc.total))) +} + +func toMB(bytes uint64) uint64 { + return bytes / 1024 / 1024 +} diff --git a/rollup/missing_header_fields/manager_test.go b/rollup/missing_header_fields/manager_test.go new file mode 100644 index 000000000000..c032c2e5fd63 --- /dev/null +++ b/rollup/missing_header_fields/manager_test.go @@ -0,0 +1,59 @@ +package missing_header_fields + +import ( + "context" + "os" + "path/filepath" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/scroll-tech/go-ethereum/common" + "github.com/scroll-tech/go-ethereum/log" +) + +func TestManagerDownload(t *testing.T) { + t.Skip("skipping test due to long runtime/downloading file") + log.Root().SetHandler(log.StdoutHandler) + + // TODO: replace with actual sha256 hash and downloadURL + sha256 := [32]byte(common.FromHex("0x575858a53b8cdde8d63a2cc1a5b90f1bbf0c2243b292a66a1ab2931d571eb260")) + downloadURL := "https://ftp.halifax.rwth-aachen.de/ubuntu-releases/24.04/ubuntu-24.04-netboot-amd64.tar.gz" + filePath := filepath.Join(t.TempDir(), "test_file_path") + manager := NewManager(context.Background(), filePath, downloadURL, sha256) + + _, _, err := manager.GetMissingHeaderFields(0) + require.NoError(t, err) + + // Check if the file was downloaded and tmp file was removed + _, err = os.Stat(filePath) + require.NoError(t, err) + _, err = os.Stat(filePath + ".tmp") + require.Error(t, err) +} + +func TestManagerChecksum(t *testing.T) { + // Checksum doesn't match + { + sha256 := [32]byte(common.FromHex("0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa")) + downloadURL := "" // since the file exists we don't need to download it + filePath := "testdata/missing_headers_1.dedup" + manager := NewManager(context.Background(), filePath, downloadURL, sha256) + + _, _, err := manager.GetMissingHeaderFields(0) + require.ErrorContains(t, err, "checksum mismatch") + } + + // Checksum matches + { + sha256 := [32]byte(common.FromHex("0x5dee238e74c350c7116868bfe6c5218d440be3613f47f8c052bd5cef46f4ae04")) + downloadURL := "" // since the file exists we don't need to download it + filePath := "testdata/missing_headers_1.dedup" + manager := NewManager(context.Background(), filePath, downloadURL, sha256) + + difficulty, extra, err := manager.GetMissingHeaderFields(0) + require.NoError(t, err) + require.Equal(t, expectedMissingHeaders1[0].difficulty, difficulty) + require.Equal(t, expectedMissingHeaders1[0].extra, extra) + } +} diff --git a/rollup/missing_header_fields/reader.go b/rollup/missing_header_fields/reader.go new file mode 100644 index 000000000000..c534b4e140fc --- /dev/null +++ b/rollup/missing_header_fields/reader.go @@ -0,0 +1,152 @@ +package missing_header_fields + +import ( + "bufio" + "bytes" + "fmt" + "io" + "os" +) + +type missingHeader struct { + headerNum uint64 + difficulty uint64 + extraData []byte +} + +type Reader struct { + file *os.File + reader *bufio.Reader + sortedVanities map[int][32]byte + lastReadHeader *missingHeader +} + +func NewReader(filePath string) (*Reader, error) { + f, err := os.Open(filePath) + if err != nil { + return nil, fmt.Errorf("failed to open file: %v", err) + } + + r := &Reader{ + file: f, + reader: bufio.NewReader(f), + } + + // read the count of unique vanities + vanityCount, err := r.reader.ReadByte() + if err != nil { + return nil, err + } + + // read the unique vanities + r.sortedVanities = make(map[int][32]byte) + for i := uint8(0); i < vanityCount; i++ { + var vanity [32]byte + if _, err = r.reader.Read(vanity[:]); err != nil { + return nil, err + } + r.sortedVanities[int(i)] = vanity + } + + return r, nil +} + +func (r *Reader) Read(headerNum uint64) (difficulty uint64, extraData []byte, err error) { + if r.lastReadHeader == nil { + if _, _, err = r.ReadNext(); err != nil { + return 0, nil, err + } + } + + if headerNum > r.lastReadHeader.headerNum { + // skip the headers until the requested header number + for i := r.lastReadHeader.headerNum; i < headerNum; i++ { + if _, _, err = r.ReadNext(); err != nil { + return 0, nil, err + } + } + } + + if headerNum == r.lastReadHeader.headerNum { + return r.lastReadHeader.difficulty, r.lastReadHeader.extraData, nil + } + + // headerNum < r.lastReadHeader.headerNum is not supported + return 0, nil, fmt.Errorf("requested header %d below last read header number %d", headerNum, r.lastReadHeader.headerNum) +} + +func (r *Reader) ReadNext() (difficulty uint64, extraData []byte, err error) { + // read the bitmask + bitmask, err := r.reader.ReadByte() + if err != nil { + return 0, nil, fmt.Errorf("failed to read bitmask: %v", err) + } + + bits := newBitMask(bitmask) + + seal := make([]byte, bits.sealLen()) + if _, err = io.ReadFull(r.reader, seal); err != nil { + return 0, nil, fmt.Errorf("failed to read seal: %v", err) + } + + // construct the extraData field + vanity := r.sortedVanities[bits.vanityIndex()] + var b bytes.Buffer + b.Write(vanity[:]) + b.Write(seal) + + // we don't have the header number, so we'll just increment the last read header number + // we assume that the headers are written in order, starting from 0 + if r.lastReadHeader == nil { + r.lastReadHeader = &missingHeader{ + headerNum: 0, + difficulty: uint64(bits.difficulty()), + extraData: b.Bytes(), + } + } else { + r.lastReadHeader.headerNum++ + r.lastReadHeader.difficulty = uint64(bits.difficulty()) + r.lastReadHeader.extraData = b.Bytes() + } + + return difficulty, b.Bytes(), nil +} + +func (r *Reader) Close() error { + return r.file.Close() +} + +// bitMask is a bitmask that encodes the following information: +// +// bit 0-5: index of the vanity in the sorted vanities list +// bit 6: 0 if difficulty is 2, 1 if difficulty is 1 +// bit 7: 0 if seal length is 65, 1 if seal length is 85 +type bitMask struct { + b uint8 +} + +func newBitMask(b uint8) bitMask { + return bitMask{b} +} + +func (b bitMask) vanityIndex() int { + return int(b.b & 0b00111111) +} + +func (b bitMask) difficulty() int { + val := (b.b >> 6) & 0x01 + if val == 0 { + return 2 + } else { + return 1 + } +} + +func (b bitMask) sealLen() int { + val := (b.b >> 7) & 0x01 + if val == 0 { + return 65 + } else { + return 85 + } +} diff --git a/rollup/missing_header_fields/reader_test.go b/rollup/missing_header_fields/reader_test.go new file mode 100644 index 000000000000..bab9f59d40f9 --- /dev/null +++ b/rollup/missing_header_fields/reader_test.go @@ -0,0 +1,83 @@ +package missing_header_fields + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/scroll-tech/go-ethereum/common" +) + +type header struct { + number uint64 + difficulty uint64 + extra []byte +} + +var expectedMissingHeaders1 = []header{ + {0, 2, common.FromHex("0x0000000000000000000000000000000000000000000000000000000000000001970727fcd1de96f749d4cc9c42cba2c0693dd650acf2f23dcf7d4607ea046dbd901284ae68c3754d6c10ea39d2c30e8dcf682071d0c08c96d5ed1d6d868e347510")}, + {1, 1, common.FromHex("0x0000000000000000000000000000000000000000000000000000000000000002f347fafeee461e99075bbcb4ed3bbf3da6094bb3bdec839a78e8bca37e05af9667a54ec59b99e44491857dadbd467d8f3ffb50d5863e21267466930fb832dc6072")}, + {2, 2, common.FromHex("0x0000000000000000000000000000000000000000000000000000000000000002f358c9b2c41a94c47029ee61ecf52da080ab1026c384f2608e9b4e6d9355408702c95d30b49e0d34944efd811f06b2d4acd55d6b2792219d63dbc89450484166fd7a935859db38dc0a1927a881ab1b77d0bea1d0c6")}, + {3, 1, common.FromHex("0x0000000000000000000000000000000000000000000000000000000000000008c71d25c9c4401f0d0a33b125b4a73d04c87c74f6d0fa17396bd29c5fa10310e3adce56dd1183375e4bf6a89a652f8037ad53c0c178d17c16264979f7012ef05303a5c5f659e9060e268432fe3fe6511e1936126fbc")}, + {4, 2, common.FromHex("0x00000000000000000000000000000000000000000000000000000000000000017c0aebb5813ac87c25b0420add7067a5c4105307b461831922af9e7bbfa828efc053bb75611d660d9002776c3e7d1c1c7aa2f3265214d1831bdf1d4b45c7d32d0a")}, + {5, 2, common.FromHex("0x00000000000000000000000000000000000000000000000000000000000000018a706ae7ae04beab64a74c2d9c85bfb0597c3fdf59033ad758f19859aaa6317b6c2efe887d13f05fee70f7dacccf7dad39d3d62178b5901372ceb6d94da2f1a47b")}, + {6, 2, common.FromHex("0x00000000000000000000000000000000000000000000000000000000000000015f14949a5eaf4e56194b84435397c39450e397bbe2708065722cc74312b2d7f309330ae752e06f81a958b0cb3c15a07d17ed9f907d533827f593fbc272e5246438")}, + {7, 2, common.FromHex("0x00000000000000000000000000000000000000000000000000000000000000015c340fb761e273aeae5b8dc5d12b3dce95069ddb8d8dfdd354e32f1c7e8c590fbed5ff520968a74648132dddb1bf31e215a5203af77143fb7170fe813cc1617c4c")}, + {8, 2, common.FromHex("0x00000000000000000000000000000000000000000000000000000000000000013fc20fe02529338255b6cfb595bf4c959bcea92d6af2d78f411afec367e9c2155b52177633c2a9e94601746b48abe44adcb514624c59e67b0eaf8787915c1d74bc")}, + {9, 2, common.FromHex("0x000000000000000000000000000000000000000000000000000000000000000183346f100df02e19f5aaf907d6eff4272b976b683bbda82f50524bff099e3ac57ccaf28f7ee8ce75f7bfde8b5af88f51ecd4ca48e6eca56cf34cc43869839563cd")}, + {10, 2, common.FromHex("0x000000000000000000000000000000000000000000000000000000000000000150154d779ea6f3a74d07cd572b5c951f705b269d72a30b913369a302d4d16fb4f4b27fad875761ce2b5fff09ab3f6e3316cec19c679f2e0a161cf32ffaf5330810")}, + {11, 2, common.FromHex("0x000000000000000000000000000000000000000000000000000000000000000194590bbf6f291ad4e4aeaf2cf7fc6712824f586bcb070e0964220ee3864dc679bf1789afca4816b0e68a40b05ca7bfc533a02894df78e6f4c2c77d71e0a7e3da30")}, + {12, 2, common.FromHex("0x0000000000000000000000000000000000000000000000000000000000000001c7c1c37841da26fd9a2fa796b4972faab58de1b14a255b9b6331342bf511351bebaf5ceba901257a6341bf29d880fc2dba3d41b700f42295f50ea9b28f8ea4bd53")}, +} + +func TestReader_Read(t *testing.T) { + expectedVanities := map[int][32]byte{ + 0: {0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01}, + 1: {0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02}, + 2: {0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x08}, + } + + reader, err := NewReader("testdata/missing_headers_1.dedup") + require.NoError(t, err) + + require.Len(t, reader.sortedVanities, len(expectedVanities)) + for i, expectedVanity := range expectedVanities { + require.Equal(t, expectedVanity, reader.sortedVanities[i]) + } + + readAndAssertHeader(t, reader, expectedMissingHeaders1, 0) + readAndAssertHeader(t, reader, expectedMissingHeaders1, 0) + readAndAssertHeader(t, reader, expectedMissingHeaders1, 1) + readAndAssertHeader(t, reader, expectedMissingHeaders1, 6) + + // we don't allow reading previous headers + _, _, err = reader.Read(5) + require.Error(t, err) + + readAndAssertHeader(t, reader, expectedMissingHeaders1, 8) + readAndAssertHeader(t, reader, expectedMissingHeaders1, 8) + + // we don't allow reading previous headers + _, _, err = reader.Read(5) + require.Error(t, err) + + // we don't allow reading previous headers + _, _, err = reader.Read(6) + require.Error(t, err) + + readAndAssertHeader(t, reader, expectedMissingHeaders1, 9) + readAndAssertHeader(t, reader, expectedMissingHeaders1, 10) + readAndAssertHeader(t, reader, expectedMissingHeaders1, 11) + readAndAssertHeader(t, reader, expectedMissingHeaders1, 12) + + // no data anymore + _, _, err = reader.Read(13) + require.Error(t, err) +} + +func readAndAssertHeader(t *testing.T, reader *Reader, expectedHeaders []header, headerNum uint64) { + difficulty, extra, err := reader.Read(headerNum) + require.NoError(t, err) + require.Equalf(t, expectedHeaders[headerNum].difficulty, difficulty, "expected difficulty %d, got %d", expectedHeaders[headerNum].difficulty, difficulty) + require.Equal(t, expectedHeaders[headerNum].extra, extra) +} diff --git a/rollup/missing_header_fields/sha256checksum.go b/rollup/missing_header_fields/sha256checksum.go new file mode 100644 index 000000000000..4cef10c5177d --- /dev/null +++ b/rollup/missing_header_fields/sha256checksum.go @@ -0,0 +1,25 @@ +package missing_header_fields + +import ( + "crypto/sha256" + "reflect" + + "github.com/scroll-tech/go-ethereum/common" + "github.com/scroll-tech/go-ethereum/common/hexutil" +) + +type SHA256Checksum [sha256.Size]byte + +func SHA256ChecksumFromHex(s string) SHA256Checksum { + return SHA256Checksum(common.FromHex(s)) +} + +// UnmarshalJSON parses a hash in hex syntax. +func (s *SHA256Checksum) UnmarshalJSON(input []byte) error { + return hexutil.UnmarshalFixedJSON(reflect.TypeOf(SHA256Checksum{}), input, s[:]) +} + +// MarshalText returns the hex representation of a. +func (s *SHA256Checksum) MarshalText() ([]byte, error) { + return hexutil.Bytes(s[:]).MarshalText() +}