From 45c0dd60e27a91032472e4ea49436d8bf6003a9f Mon Sep 17 00:00:00 2001 From: weiihann Date: Wed, 18 Dec 2024 19:03:15 +0800 Subject: [PATCH] create ReorgEvent --- rpc/handlers.go | 4 ++-- rpc/subscriptions.go | 23 +++++++++++++++++------ rpc/subscriptions_test.go | 6 +++--- sync/sync.go | 16 ++++++++-------- 4 files changed, 30 insertions(+), 19 deletions(-) diff --git a/rpc/handlers.go b/rpc/handlers.go index 80ec88279f..7657d021a8 100644 --- a/rpc/handlers.go +++ b/rpc/handlers.go @@ -96,7 +96,7 @@ type Handler struct { version string newHeads *feed.Feed[*core.Header] - reorgs *feed.Feed[*sync.ReorgData] + reorgs *feed.Feed[*sync.ReorgBlockRange] pendingTxs *feed.Feed[[]core.Transaction] idgen func() uint64 @@ -138,7 +138,7 @@ func New(bcReader blockchain.Reader, syncReader sync.Reader, virtualMachine vm.V }, version: version, newHeads: feed.New[*core.Header](), - reorgs: feed.New[*sync.ReorgData](), + reorgs: feed.New[*sync.ReorgBlockRange](), pendingTxs: feed.New[[]core.Transaction](), subscriptions: make(map[uint64]*subscription), diff --git a/rpc/subscriptions.go b/rpc/subscriptions.go index 5f50500499..0b3fa84eb3 100644 --- a/rpc/subscriptions.go +++ b/rpc/subscriptions.go @@ -284,7 +284,7 @@ func (h *Handler) processPendingTxs( // filterTxs filters the transactions based on the getDetails flag. // If getDetails is true, response will contain the transaction details. // If getDetails is false, response will only contain the transaction hashes. -func (h *Handler) filterTxs(pendingTxs []core.Transaction, getDetails bool, senderAddr []felt.Felt) interface{} { +func (h *Handler) filterTxs(pendingTxs []core.Transaction, getDetails bool, senderAddr []felt.Felt) any { if getDetails { return h.filterTxDetails(pendingTxs, senderAddr) } @@ -386,8 +386,7 @@ func (h *Handler) resolveBlockRange(blockID *BlockID) (*core.Header, *core.Heade // sendHistoricalHeaders sends a range of headers from the start header until the latest header func (h *Handler) sendHistoricalHeaders( ctx context.Context, - startHeader *core.Header, - latestHeader *core.Header, + startHeader, latestHeader *core.Header, w jsonrpc.Conn, id uint64, ) error { @@ -448,7 +447,7 @@ func (h *Handler) sendHeader(w jsonrpc.Conn, header *core.Header, id uint64) err return err } -func (h *Handler) processReorgs(ctx context.Context, reorgSub *feed.Subscription[*sync.ReorgData], w jsonrpc.Conn, id uint64) { +func (h *Handler) processReorgs(ctx context.Context, reorgSub *feed.Subscription[*sync.ReorgBlockRange], w jsonrpc.Conn, id uint64) { for { select { case <-ctx.Done(): @@ -462,13 +461,25 @@ func (h *Handler) processReorgs(ctx context.Context, reorgSub *feed.Subscription } } -func (h *Handler) sendReorg(w jsonrpc.Conn, reorg *sync.ReorgData, id uint64) error { +type ReorgEvent struct { + StartBlockHash *felt.Felt `json:"starting_block_hash"` + StartBlockNum uint64 `json:"starting_block_number"` + EndBlockHash *felt.Felt `json:"ending_block_hash"` + EndBlockNum uint64 `json:"ending_block_number"` +} + +func (h *Handler) sendReorg(w jsonrpc.Conn, reorg *sync.ReorgBlockRange, id uint64) error { resp, err := json.Marshal(jsonrpc.Request{ Version: "2.0", Method: "starknet_subscriptionReorg", Params: map[string]any{ "subscription_id": id, - "result": reorg, + "result": &ReorgEvent{ + StartBlockHash: reorg.StartBlockHash, + StartBlockNum: reorg.StartBlockNum, + EndBlockHash: reorg.EndBlockHash, + EndBlockNum: reorg.EndBlockNum, + }, }, }) if err != nil { diff --git a/rpc/subscriptions_test.go b/rpc/subscriptions_test.go index a58b68bd72..c739709661 100644 --- a/rpc/subscriptions_test.go +++ b/rpc/subscriptions_test.go @@ -332,14 +332,14 @@ func TestSubscribeEvents(t *testing.T) { type fakeSyncer struct { newHeads *feed.Feed[*core.Header] - reorgs *feed.Feed[*sync.ReorgData] + reorgs *feed.Feed[*sync.ReorgBlockRange] pendingTxs *feed.Feed[[]core.Transaction] } func newFakeSyncer() *fakeSyncer { return &fakeSyncer{ newHeads: feed.New[*core.Header](), - reorgs: feed.New[*sync.ReorgData](), + reorgs: feed.New[*sync.ReorgBlockRange](), pendingTxs: feed.New[[]core.Transaction](), } } @@ -623,7 +623,7 @@ func TestSubscriptionReorg(t *testing.T) { } // Simulate a reorg - syncer.reorgs.Send(&sync.ReorgData{ + syncer.reorgs.Send(&sync.ReorgBlockRange{ StartBlockHash: utils.HexToFelt(t, "0x4e1f77f39545afe866ac151ac908bd1a347a2a8a7d58bef1276db4f06fdf2f6"), StartBlockNum: 0, EndBlockHash: utils.HexToFelt(t, "0x34e815552e42c5eb5233b99de2d3d7fd396e575df2719bf98e7ed2794494f86"), diff --git a/sync/sync.go b/sync/sync.go index ae9617429d..cd9fe2c3a0 100644 --- a/sync/sync.go +++ b/sync/sync.go @@ -39,7 +39,7 @@ type HeaderSubscription struct { } type ReorgSubscription struct { - *feed.Subscription[*ReorgData] + *feed.Subscription[*ReorgBlockRange] } type PendingTxSubscription struct { @@ -77,15 +77,15 @@ func (n *NoopSynchronizer) SubscribeNewHeads() HeaderSubscription { } func (n *NoopSynchronizer) SubscribeReorg() ReorgSubscription { - return ReorgSubscription{feed.New[*ReorgData]().Subscribe()} + return ReorgSubscription{feed.New[*ReorgBlockRange]().Subscribe()} } func (n *NoopSynchronizer) SubscribePendingTxs() PendingTxSubscription { return PendingTxSubscription{feed.New[[]core.Transaction]().Subscribe()} } -// ReorgData represents data about reorganised blocks, starting and ending block number and hash -type ReorgData struct { +// ReorgBlockRange represents data about reorganised blocks, starting and ending block number and hash +type ReorgBlockRange struct { // StartBlockHash is the hash of the first known block of the orphaned chain StartBlockHash *felt.Felt `json:"starting_block_hash"` // StartBlockNum is the number of the first known block of the orphaned chain @@ -117,7 +117,7 @@ type Synchronizer struct { startingBlockNumber *uint64 highestBlockHeader atomic.Pointer[core.Header] newHeads *feed.Feed[*core.Header] - reorgFeed *feed.Feed[*ReorgData] + reorgFeed *feed.Feed[*ReorgBlockRange] pendingTxsFeed *feed.Feed[[]core.Transaction] log utils.SimpleLogger @@ -128,7 +128,7 @@ type Synchronizer struct { catchUpMode bool plugin junoplugin.JunoPlugin - currReorg *ReorgData // If nil, no reorg is happening + currReorg *ReorgBlockRange // If nil, no reorg is happening } func New(bc *blockchain.Blockchain, starkNetData starknetdata.StarknetData, log utils.SimpleLogger, @@ -140,7 +140,7 @@ func New(bc *blockchain.Blockchain, starkNetData starknetdata.StarknetData, log starknetData: starkNetData, log: log, newHeads: feed.New[*core.Header](), - reorgFeed: feed.New[*ReorgData](), + reorgFeed: feed.New[*ReorgBlockRange](), pendingTxsFeed: feed.New[[]core.Transaction](), pendingPollInterval: pendingPollInterval, listener: &SelectiveListener{}, @@ -446,7 +446,7 @@ func (s *Synchronizer) revertHead(forkBlock *core.Block) { } if s.currReorg == nil { // first block of the reorg - s.currReorg = &ReorgData{ + s.currReorg = &ReorgBlockRange{ StartBlockHash: localHead, StartBlockNum: head.Number, EndBlockHash: localHead,