Skip to content

Commit

Permalink
Implement starknet_subscriptionReorg
Browse files Browse the repository at this point in the history
  • Loading branch information
weiihann committed Oct 15, 2024
1 parent 35671a1 commit fcd5e8f
Show file tree
Hide file tree
Showing 6 changed files with 193 additions and 12 deletions.
14 changes: 14 additions & 0 deletions mocks/mock_synchronizer.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

49 changes: 46 additions & 3 deletions rpc/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"github.com/NethermindEth/juno/core/felt"
"github.com/NethermindEth/juno/feed"
"github.com/NethermindEth/juno/jsonrpc"
"github.com/NethermindEth/juno/sync"
"github.com/sourcegraph/conc"
)

const (
Expand Down Expand Up @@ -80,15 +82,18 @@ func (h *Handler) SubscribeNewHeads(ctx context.Context, blockID *BlockID) (*Sub
h.mu.Unlock()

headerSub := h.newHeads.Subscribe()
reorgSub := h.reorgs.Subscribe() // as per the spec, reorgs are also sent in the new heads subscription
sub.wg.Go(func() {
defer func() {
h.unsubscribe(sub, id)
headerSub.Unsubscribe()
reorgSub.Unsubscribe()
}()

newHeadersChan := make(chan *core.Header, MaxBlocksBack)
var wg conc.WaitGroup

sub.wg.Go(func() {
newHeadersChan := make(chan *core.Header, MaxBlocksBack)
wg.Go(func() {
h.bufferNewHeaders(subscriptionCtx, headerSub, newHeadersChan)
})

Expand All @@ -97,7 +102,15 @@ func (h *Handler) SubscribeNewHeads(ctx context.Context, blockID *BlockID) (*Sub
return

Check warning on line 102 in rpc/events.go

View check run for this annotation

Codecov / codecov/patch

rpc/events.go#L101-L102

Added lines #L101 - L102 were not covered by tests
}

h.processNewHeaders(subscriptionCtx, newHeadersChan, w, id)
wg.Go(func() {
h.processNewHeaders(subscriptionCtx, newHeadersChan, w, id)
})

wg.Go(func() {
h.processReorgs(subscriptionCtx, reorgSub, w, id)
})

wg.Wait()
})

return &SubscriptionID{ID: id}, nil
Expand Down Expand Up @@ -204,6 +217,36 @@ func (h *Handler) sendHeader(w jsonrpc.Conn, header *core.Header, id uint64) err
return err
}

func (h *Handler) processReorgs(ctx context.Context, reorgSub *feed.Subscription[*sync.ReorgData], w jsonrpc.Conn, id uint64) {
for {
select {
case <-ctx.Done():
return
case reorg := <-reorgSub.Recv():
if err := h.sendReorg(w, reorg, id); err != nil {
h.log.Warnw("Error sending reorg", "err", err)

Check warning on line 227 in rpc/events.go

View check run for this annotation

Codecov / codecov/patch

rpc/events.go#L227

Added line #L227 was not covered by tests
return
}
}
}
}

func (h *Handler) sendReorg(w jsonrpc.Conn, reorg *sync.ReorgData, id uint64) error {
resp, err := json.Marshal(jsonrpc.Request{
Version: "2.0",
Method: "starknet_subscriptionReorg",
Params: map[string]any{
"subscription_id": id,
"result": reorg,
},
})
if err != nil {
return err

Check warning on line 244 in rpc/events.go

View check run for this annotation

Codecov / codecov/patch

rpc/events.go#L244

Added line #L244 was not covered by tests
}
_, err = w.Write(resp)
return err
}

func (h *Handler) Unsubscribe(ctx context.Context, id uint64) (bool, *jsonrpc.Error) {
w, ok := jsonrpc.ConnFromContext(ctx)
if !ok {
Expand Down
73 changes: 65 additions & 8 deletions rpc/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
var emptyCommitments = core.BlockCommitments{}

const (
testResponse = `{"jsonrpc":"2.0","method":"starknet_subscriptionNewHeads","params":{"result":{"block_hash":"0x4e1f77f39545afe866ac151ac908bd1a347a2a8a7d58bef1276db4f06fdf2f6","parent_hash":"0x2a70fb03fe363a2d6be843343a1d81ce6abeda1e9bd5cc6ad8fa9f45e30fdeb","block_number":2,"new_root":"0x3ceee867d50b5926bb88c0ec7e0b9c20ae6b537e74aac44b8fcf6bb6da138d9","timestamp":1637084470,"sequencer_address":"0x0","l1_gas_price":{"price_in_fri":"0x0","price_in_wei":"0x0"},"l1_data_gas_price":{"price_in_fri":"0x0","price_in_wei":"0x0"},"l1_da_mode":"CALLDATA","starknet_version":""},"subscription_id":%d}}`
newHeadsResponse = `{"jsonrpc":"2.0","method":"starknet_subscriptionNewHeads","params":{"result":{"block_hash":"0x4e1f77f39545afe866ac151ac908bd1a347a2a8a7d58bef1276db4f06fdf2f6","parent_hash":"0x2a70fb03fe363a2d6be843343a1d81ce6abeda1e9bd5cc6ad8fa9f45e30fdeb","block_number":2,"new_root":"0x3ceee867d50b5926bb88c0ec7e0b9c20ae6b537e74aac44b8fcf6bb6da138d9","timestamp":1637084470,"sequencer_address":"0x0","l1_gas_price":{"price_in_fri":"0x0","price_in_wei":"0x0"},"l1_data_gas_price":{"price_in_fri":"0x0","price_in_wei":"0x0"},"l1_da_mode":"CALLDATA","starknet_version":""},"subscription_id":%d}}`
)

func TestEvents(t *testing.T) {
Expand Down Expand Up @@ -238,12 +238,24 @@ func (fc *fakeConn) Equal(other jsonrpc.Conn) bool {

type fakeSyncer struct {
newHeads *feed.Feed[*core.Header]
reorgs *feed.Feed[*sync.ReorgData]
}

func newFakeSyncer() *fakeSyncer {
return &fakeSyncer{
newHeads: feed.New[*core.Header](),
reorgs: feed.New[*sync.ReorgData](),
}
}

func (fs *fakeSyncer) SubscribeNewHeads() sync.HeaderSubscription {
return sync.HeaderSubscription{Subscription: fs.newHeads.Subscribe()}
}

func (fs *fakeSyncer) SubscribeReorg() sync.ReorgSubscription {
return sync.ReorgSubscription{Subscription: fs.reorgs.Subscribe()}
}

func (fs *fakeSyncer) StartingBlockNumber() (uint64, error) {
return 0, nil
}
Expand All @@ -256,7 +268,7 @@ func TestSubscribeNewHeadsAndUnsubscribe(t *testing.T) {
t.Parallel()

chain := blockchain.New(pebble.NewMemTest(t), &utils.Mainnet)
syncer := &fakeSyncer{newHeads: feed.New[*core.Header]()}
syncer := newFakeSyncer()
handler := rpc.New(chain, syncer, nil, "", utils.NewNopZapLogger())

ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -289,7 +301,7 @@ func TestSubscribeNewHeadsAndUnsubscribe(t *testing.T) {
syncer.newHeads.Send(testHeader(t))

// Receive a block header.
want := fmt.Sprintf(testResponse, id.ID)
want := fmt.Sprintf(newHeadsResponse, id.ID)
got := make([]byte, len(want))
_, err := clientConn.Read(got)
require.NoError(t, err)
Expand Down Expand Up @@ -323,7 +335,7 @@ func TestMultipleSubscribeNewHeadsAndUnsubscribe(t *testing.T) {

log := utils.NewNopZapLogger()
chain := blockchain.New(pebble.NewMemTest(t), &utils.Mainnet)
syncer := &fakeSyncer{newHeads: feed.New[*core.Header]()}
syncer := newFakeSyncer()
handler := rpc.New(chain, syncer, nil, "", log)

ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -377,11 +389,11 @@ func TestMultipleSubscribeNewHeadsAndUnsubscribe(t *testing.T) {
syncer.newHeads.Send(testHeader(t))

// Receive a block header.
firstWant = fmt.Sprintf(testResponse, firstID)
firstWant = fmt.Sprintf(newHeadsResponse, firstID)
_, firstGot, err = conn1.Read(ctx)
require.NoError(t, err)
require.Equal(t, firstWant, string(firstGot))
secondWant = fmt.Sprintf(testResponse, secondID)
secondWant = fmt.Sprintf(newHeadsResponse, secondID)
_, secondGot, err = conn2.Read(ctx)
require.NoError(t, err)
require.Equal(t, secondWant, string(secondGot))
Expand All @@ -407,7 +419,7 @@ func TestSubscribeNewHeadsHistorical(t *testing.T) {
assert.NoError(t, chain.Store(block0, &emptyCommitments, stateUpdate0, nil))

chain = blockchain.New(testDB, &utils.Mainnet)
syncer := &fakeSyncer{newHeads: feed.New[*core.Header]()}
syncer := newFakeSyncer()
handler := rpc.New(chain, syncer, nil, "", utils.NewNopZapLogger())

ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -450,7 +462,7 @@ func TestSubscribeNewHeadsHistorical(t *testing.T) {
syncer.newHeads.Send(testHeader(t))

// Check new block content
want = fmt.Sprintf(testResponse, id.ID)
want = fmt.Sprintf(newHeadsResponse, id.ID)
got = make([]byte, len(want))
_, err = clientConn.Read(got)
require.NoError(t, err)
Expand Down Expand Up @@ -478,3 +490,48 @@ func testHeader(t *testing.T) *core.Header {
}
return header
}

func TestSubscriptionReorg(t *testing.T) {
t.Parallel()

chain := blockchain.New(pebble.NewMemTest(t), &utils.Mainnet)
syncer := newFakeSyncer()
handler := rpc.New(chain, syncer, nil, "", utils.NewNopZapLogger())

ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)

go func() {
require.NoError(t, handler.Run(ctx))
}()
time.Sleep(50 * time.Millisecond)

serverConn, clientConn := net.Pipe()
t.Cleanup(func() {
require.NoError(t, serverConn.Close())
require.NoError(t, clientConn.Close())
})

subCtx := context.WithValue(ctx, jsonrpc.ConnKey{}, &fakeConn{w: serverConn})

// Subscribe to new heads which will send a
id, rpcErr := handler.SubscribeNewHeads(subCtx, nil)
require.Nil(t, rpcErr)
require.NotZero(t, id)

// Simulate a reorg
syncer.reorgs.Send(&sync.ReorgData{
StartBlockHash: utils.HexToFelt(t, "0x4e1f77f39545afe866ac151ac908bd1a347a2a8a7d58bef1276db4f06fdf2f6"),
StartBlockNum: 0,
EndBlockHash: utils.HexToFelt(t, "0x34e815552e42c5eb5233b99de2d3d7fd396e575df2719bf98e7ed2794494f86"),
EndBlockNum: 2,
})

// Receive reorg event
want := `{"jsonrpc":"2.0","method":"starknet_subscriptionReorg","params":{"result":{"starting_block_hash":"0x4e1f77f39545afe866ac151ac908bd1a347a2a8a7d58bef1276db4f06fdf2f6","starting_block_number":0,"ending_block_hash":"0x34e815552e42c5eb5233b99de2d3d7fd396e575df2719bf98e7ed2794494f86","ending_block_number":2},"subscription_id":%d}}`
want = fmt.Sprintf(want, id.ID)
got := make([]byte, len(want))
_, err := clientConn.Read(got)
require.NoError(t, err)
require.Equal(t, want, string(got))
}
8 changes: 7 additions & 1 deletion rpc/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ type Handler struct {

version string
newHeads *feed.Feed[*core.Header]
reorgs *feed.Feed[*sync.ReorgData]

idgen func() uint64
mu stdsync.Mutex // protects subscriptions.
Expand Down Expand Up @@ -117,6 +118,7 @@ func New(bcReader blockchain.Reader, syncReader sync.Reader, virtualMachine vm.V
},
version: version,
newHeads: feed.New[*core.Header](),
reorgs: feed.New[*sync.ReorgData](),
subscriptions: make(map[uint64]*subscription),

blockTraceCache: lru.NewCache[traceCacheKey, []TracedBlockTransaction](traceCacheSize),
Expand Down Expand Up @@ -152,8 +154,12 @@ func (h *Handler) WithGateway(gatewayClient Gateway) *Handler {

func (h *Handler) Run(ctx context.Context) error {
newHeadsSub := h.syncReader.SubscribeNewHeads().Subscription
reorgsSub := h.syncReader.SubscribeReorg().Subscription
defer newHeadsSub.Unsubscribe()
feed.Tee[*core.Header](newHeadsSub, h.newHeads)
defer reorgsSub.Unsubscribe()
feed.Tee(newHeadsSub, h.newHeads)
feed.Tee(reorgsSub, h.reorgs)

<-ctx.Done()
for _, sub := range h.subscriptions {
sub.wg.Wait()
Expand Down
49 changes: 49 additions & 0 deletions sync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,18 @@ type HeaderSubscription struct {
*feed.Subscription[*core.Header]
}

type ReorgSubscription struct {
*feed.Subscription[*ReorgData]
}

// Todo: Since this is also going to be implemented by p2p package we should move this interface to node package
//
//go:generate mockgen -destination=../mocks/mock_synchronizer.go -package=mocks -mock_names Reader=MockSyncReader github.com/NethermindEth/juno/sync Reader
type Reader interface {
StartingBlockNumber() (uint64, error)
HighestBlockHeader() *core.Header
SubscribeNewHeads() HeaderSubscription
SubscribeReorg() ReorgSubscription
}

// This is temporary and will be removed once the p2p synchronizer implements this interface.
Expand All @@ -58,6 +63,22 @@ func (n *NoopSynchronizer) SubscribeNewHeads() HeaderSubscription {
return HeaderSubscription{feed.New[*core.Header]().Subscribe()}
}

func (n *NoopSynchronizer) SubscribeReorg() ReorgSubscription {
return ReorgSubscription{feed.New[*ReorgData]().Subscribe()}

Check warning on line 67 in sync/sync.go

View check run for this annotation

Codecov / codecov/patch

sync/sync.go#L66-L67

Added lines #L66 - L67 were not covered by tests
}

// ReorgData represents data about reorganised blocks, starting and ending block number and hash
type ReorgData struct {
// StartBlockHash is the hash of the first known block of the orphaned chain
StartBlockHash *felt.Felt `json:"starting_block_hash"`
// StartBlockNum is the number of the first known block of the orphaned chain
StartBlockNum uint64 `json:"starting_block_number"`
// The last known block of the orphaned chain
EndBlockHash *felt.Felt `json:"ending_block_hash"`
// Number of the last known block of the orphaned chain
EndBlockNum uint64 `json:"ending_block_number"`
}

// Synchronizer manages a list of StarknetData to fetch the latest blockchain updates
type Synchronizer struct {
blockchain *blockchain.Blockchain
Expand All @@ -66,12 +87,15 @@ type Synchronizer struct {
startingBlockNumber *uint64
highestBlockHeader atomic.Pointer[core.Header]
newHeads *feed.Feed[*core.Header]
reorgFeed *feed.Feed[*ReorgData]

log utils.SimpleLogger
listener EventListener

pendingPollInterval time.Duration
catchUpMode bool

currReorg *ReorgData // If nil, no reorg is happening
}

func New(bc *blockchain.Blockchain, starkNetData starknetdata.StarknetData,
Expand All @@ -82,6 +106,7 @@ func New(bc *blockchain.Blockchain, starkNetData starknetdata.StarknetData,
starknetData: starkNetData,
log: log,
newHeads: feed.New[*core.Header](),
reorgFeed: feed.New[*ReorgData](),
pendingPollInterval: pendingPollInterval,
listener: &SelectiveListener{},
readOnlyBlockchain: readOnlyBlockchain,
Expand Down Expand Up @@ -228,6 +253,11 @@ func (s *Synchronizer) verifierTask(ctx context.Context, block *core.Block, stat
s.highestBlockHeader.CompareAndSwap(highestBlockHeader, block.Header)
}

if s.currReorg != nil {
s.reorgFeed.Send(s.currReorg)
s.currReorg = nil // reset the reorg data
}

s.newHeads.Send(block.Header)
s.log.Infow("Stored Block", "number", block.Number, "hash",
block.Hash.ShortString(), "root", block.GlobalStateRoot.ShortString())
Expand Down Expand Up @@ -324,6 +354,19 @@ func (s *Synchronizer) revertHead(forkBlock *core.Block) {
} else {
s.log.Infow("Reverted HEAD", "reverted", localHead)
}

if s.currReorg == nil { // first block of the reorg
s.currReorg = &ReorgData{
StartBlockHash: localHead,
StartBlockNum: head.Number,
EndBlockHash: localHead,
EndBlockNum: head.Number,
}
} else { // not the first block of the reorg, adjust the starting block
s.currReorg.StartBlockHash = localHead
s.currReorg.StartBlockNum = head.Number
}

s.listener.OnReorg(head.Number)
}

Expand Down Expand Up @@ -439,3 +482,9 @@ func (s *Synchronizer) SubscribeNewHeads() HeaderSubscription {
Subscription: s.newHeads.Subscribe(),
}
}

func (s *Synchronizer) SubscribeReorg() ReorgSubscription {
return ReorgSubscription{
Subscription: s.reorgFeed.Subscribe(),
}
}
Loading

0 comments on commit fcd5e8f

Please sign in to comment.