Skip to content

Commit

Permalink
create ReorgEvent
Browse files Browse the repository at this point in the history
  • Loading branch information
weiihann committed Dec 19, 2024
1 parent 32ff715 commit 45c0dd6
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 19 deletions.
4 changes: 2 additions & 2 deletions rpc/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ type Handler struct {

version string
newHeads *feed.Feed[*core.Header]
reorgs *feed.Feed[*sync.ReorgData]
reorgs *feed.Feed[*sync.ReorgBlockRange]
pendingTxs *feed.Feed[[]core.Transaction]

idgen func() uint64
Expand Down Expand Up @@ -138,7 +138,7 @@ func New(bcReader blockchain.Reader, syncReader sync.Reader, virtualMachine vm.V
},
version: version,
newHeads: feed.New[*core.Header](),
reorgs: feed.New[*sync.ReorgData](),
reorgs: feed.New[*sync.ReorgBlockRange](),
pendingTxs: feed.New[[]core.Transaction](),
subscriptions: make(map[uint64]*subscription),

Expand Down
23 changes: 17 additions & 6 deletions rpc/subscriptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ func (h *Handler) processPendingTxs(
// filterTxs filters the transactions based on the getDetails flag.
// If getDetails is true, response will contain the transaction details.
// If getDetails is false, response will only contain the transaction hashes.
func (h *Handler) filterTxs(pendingTxs []core.Transaction, getDetails bool, senderAddr []felt.Felt) interface{} {
func (h *Handler) filterTxs(pendingTxs []core.Transaction, getDetails bool, senderAddr []felt.Felt) any {
if getDetails {
return h.filterTxDetails(pendingTxs, senderAddr)
}
Expand Down Expand Up @@ -386,8 +386,7 @@ func (h *Handler) resolveBlockRange(blockID *BlockID) (*core.Header, *core.Heade
// sendHistoricalHeaders sends a range of headers from the start header until the latest header
func (h *Handler) sendHistoricalHeaders(
ctx context.Context,
startHeader *core.Header,
latestHeader *core.Header,
startHeader, latestHeader *core.Header,
w jsonrpc.Conn,
id uint64,
) error {
Expand Down Expand Up @@ -448,7 +447,7 @@ func (h *Handler) sendHeader(w jsonrpc.Conn, header *core.Header, id uint64) err
return err
}

func (h *Handler) processReorgs(ctx context.Context, reorgSub *feed.Subscription[*sync.ReorgData], w jsonrpc.Conn, id uint64) {
func (h *Handler) processReorgs(ctx context.Context, reorgSub *feed.Subscription[*sync.ReorgBlockRange], w jsonrpc.Conn, id uint64) {
for {
select {
case <-ctx.Done():
Expand All @@ -462,13 +461,25 @@ func (h *Handler) processReorgs(ctx context.Context, reorgSub *feed.Subscription
}
}

func (h *Handler) sendReorg(w jsonrpc.Conn, reorg *sync.ReorgData, id uint64) error {
type ReorgEvent struct {
StartBlockHash *felt.Felt `json:"starting_block_hash"`
StartBlockNum uint64 `json:"starting_block_number"`
EndBlockHash *felt.Felt `json:"ending_block_hash"`
EndBlockNum uint64 `json:"ending_block_number"`
}

func (h *Handler) sendReorg(w jsonrpc.Conn, reorg *sync.ReorgBlockRange, id uint64) error {
resp, err := json.Marshal(jsonrpc.Request{
Version: "2.0",
Method: "starknet_subscriptionReorg",
Params: map[string]any{
"subscription_id": id,
"result": reorg,
"result": &ReorgEvent{
StartBlockHash: reorg.StartBlockHash,
StartBlockNum: reorg.StartBlockNum,
EndBlockHash: reorg.EndBlockHash,
EndBlockNum: reorg.EndBlockNum,
},
},
})
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions rpc/subscriptions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,14 +332,14 @@ func TestSubscribeEvents(t *testing.T) {

type fakeSyncer struct {
newHeads *feed.Feed[*core.Header]
reorgs *feed.Feed[*sync.ReorgData]
reorgs *feed.Feed[*sync.ReorgBlockRange]
pendingTxs *feed.Feed[[]core.Transaction]
}

func newFakeSyncer() *fakeSyncer {
return &fakeSyncer{
newHeads: feed.New[*core.Header](),
reorgs: feed.New[*sync.ReorgData](),
reorgs: feed.New[*sync.ReorgBlockRange](),
pendingTxs: feed.New[[]core.Transaction](),
}
}
Expand Down Expand Up @@ -623,7 +623,7 @@ func TestSubscriptionReorg(t *testing.T) {
}

// Simulate a reorg
syncer.reorgs.Send(&sync.ReorgData{
syncer.reorgs.Send(&sync.ReorgBlockRange{
StartBlockHash: utils.HexToFelt(t, "0x4e1f77f39545afe866ac151ac908bd1a347a2a8a7d58bef1276db4f06fdf2f6"),
StartBlockNum: 0,
EndBlockHash: utils.HexToFelt(t, "0x34e815552e42c5eb5233b99de2d3d7fd396e575df2719bf98e7ed2794494f86"),
Expand Down
16 changes: 8 additions & 8 deletions sync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type HeaderSubscription struct {
}

type ReorgSubscription struct {
*feed.Subscription[*ReorgData]
*feed.Subscription[*ReorgBlockRange]
}

type PendingTxSubscription struct {
Expand Down Expand Up @@ -77,15 +77,15 @@ func (n *NoopSynchronizer) SubscribeNewHeads() HeaderSubscription {
}

func (n *NoopSynchronizer) SubscribeReorg() ReorgSubscription {
return ReorgSubscription{feed.New[*ReorgData]().Subscribe()}
return ReorgSubscription{feed.New[*ReorgBlockRange]().Subscribe()}
}

func (n *NoopSynchronizer) SubscribePendingTxs() PendingTxSubscription {
return PendingTxSubscription{feed.New[[]core.Transaction]().Subscribe()}
}

// ReorgData represents data about reorganised blocks, starting and ending block number and hash
type ReorgData struct {
// ReorgBlockRange represents data about reorganised blocks, starting and ending block number and hash
type ReorgBlockRange struct {
// StartBlockHash is the hash of the first known block of the orphaned chain
StartBlockHash *felt.Felt `json:"starting_block_hash"`
// StartBlockNum is the number of the first known block of the orphaned chain
Expand Down Expand Up @@ -117,7 +117,7 @@ type Synchronizer struct {
startingBlockNumber *uint64
highestBlockHeader atomic.Pointer[core.Header]
newHeads *feed.Feed[*core.Header]
reorgFeed *feed.Feed[*ReorgData]
reorgFeed *feed.Feed[*ReorgBlockRange]
pendingTxsFeed *feed.Feed[[]core.Transaction]

log utils.SimpleLogger
Expand All @@ -128,7 +128,7 @@ type Synchronizer struct {
catchUpMode bool
plugin junoplugin.JunoPlugin

currReorg *ReorgData // If nil, no reorg is happening
currReorg *ReorgBlockRange // If nil, no reorg is happening
}

func New(bc *blockchain.Blockchain, starkNetData starknetdata.StarknetData, log utils.SimpleLogger,
Expand All @@ -140,7 +140,7 @@ func New(bc *blockchain.Blockchain, starkNetData starknetdata.StarknetData, log
starknetData: starkNetData,
log: log,
newHeads: feed.New[*core.Header](),
reorgFeed: feed.New[*ReorgData](),
reorgFeed: feed.New[*ReorgBlockRange](),
pendingTxsFeed: feed.New[[]core.Transaction](),
pendingPollInterval: pendingPollInterval,
listener: &SelectiveListener{},
Expand Down Expand Up @@ -446,7 +446,7 @@ func (s *Synchronizer) revertHead(forkBlock *core.Block) {
}

if s.currReorg == nil { // first block of the reorg
s.currReorg = &ReorgData{
s.currReorg = &ReorgBlockRange{
StartBlockHash: localHead,
StartBlockNum: head.Number,
EndBlockHash: localHead,
Expand Down

0 comments on commit 45c0dd6

Please sign in to comment.