diff --git a/docs/docs/faq.md b/docs/docs/faq.md index 78828677ce..1ae6c6a56f 100644 --- a/docs/docs/faq.md +++ b/docs/docs/faq.md @@ -74,7 +74,7 @@ docker logs -f juno
How can I get real-time updates of new blocks? -The [WebSocket](websocket#subscribe-to-newly-created-blocks) interface provides a `juno_subscribeNewHeads` method that emits an event when new blocks are added to the blockchain. +The [WebSocket](websocket#subscribe-to-newly-created-blocks) interface provides a `starknet_subscribeNewHeads` method that emits an event when new blocks are added to the blockchain.
diff --git a/docs/docs/websocket.md b/docs/docs/websocket.md index ba55e24db8..4614caedc0 100644 --- a/docs/docs/websocket.md +++ b/docs/docs/websocket.md @@ -96,7 +96,7 @@ Get the most recent accepted block hash and number with the `starknet_blockHashA ## Subscribe to newly created blocks -The WebSocket server provides a `juno_subscribeNewHeads` method that emits an event when new blocks are added to the blockchain: +The WebSocket server provides a `starknet_subscribeNewHeads` method that emits an event when new blocks are added to the blockchain: @@ -104,8 +104,7 @@ The WebSocket server provides a `juno_subscribeNewHeads` method that emits an ev ```json { "jsonrpc": "2.0", - "method": "juno_subscribeNewHeads", - "params": [], + "method": "starknet_subscribeNewHeads", "id": 1 } ``` @@ -129,7 +128,7 @@ When a new block is added, you will receive a message like this: ```json { "jsonrpc": "2.0", - "method": "juno_subscribeNewHeads", + "method": "starknet_subscriptionNewHeads", "params": { "result": { "block_hash": "0x840660a07a17ae6a55d39fb6d366698ecda11e02280ca3e9ca4b4f1bad741c", @@ -149,7 +148,7 @@ When a new block is added, you will receive a message like this: "l1_da_mode": "BLOB", "starknet_version": "0.13.1.1" }, - "subscription": 16570962336122680234 + "subscription_id": 16570962336122680234 } } ``` diff --git a/jsonrpc/server.go b/jsonrpc/server.go index c63f15c849..863b1594f5 100644 --- a/jsonrpc/server.go +++ b/jsonrpc/server.go @@ -422,8 +422,17 @@ func isBatch(reader *bufio.Reader) bool { return false } -func isNil(i any) bool { - return i == nil || reflect.ValueOf(i).IsNil() +func isNilOrEmpty(i any) (bool, error) { + if utils.IsNil(i) { + return true, nil + } + + switch reflect.TypeOf(i).Kind() { + case reflect.Slice, reflect.Array, reflect.Map: + return reflect.ValueOf(i).Len() == 0, nil + default: + return false, fmt.Errorf("impossible param type: check request.isSane") + } } func (s *Server) handleRequest(ctx context.Context, req *Request) (*response, http.Header, error) { @@ -471,7 +480,7 @@ func (s *Server) handleRequest(ctx context.Context, req *Request) (*response, ht header = (tuple[1].Interface()).(http.Header) } - if errAny := tuple[errorIndex].Interface(); !isNil(errAny) { + if errAny := tuple[errorIndex].Interface(); !utils.IsNil(errAny) { res.Error = errAny.(*Error) if res.Error.Code == InternalError { s.listener.OnRequestFailed(req.Method, res.Error) @@ -486,6 +495,7 @@ func (s *Server) handleRequest(ctx context.Context, req *Request) (*response, ht return res, header, nil } +//nolint:gocyclo func (s *Server) buildArguments(ctx context.Context, params any, method Method) ([]reflect.Value, error) { handlerType := reflect.TypeOf(method.Handler) @@ -498,7 +508,12 @@ func (s *Server) buildArguments(ctx context.Context, params any, method Method) addContext = 1 } - if isNil(params) { + isNilOrEmpty, err := isNilOrEmpty(params) + if err != nil { + return nil, err + } + + if isNilOrEmpty { allParamsAreOptional := utils.All(method.Params, func(p Parameter) bool { return p.Optional }) diff --git a/jsonrpc/server_test.go b/jsonrpc/server_test.go index 603db85915..71218a429d 100644 --- a/jsonrpc/server_test.go +++ b/jsonrpc/server_test.go @@ -174,6 +174,13 @@ func TestHandle(t *testing.T) { return 0, jsonrpc.Err(jsonrpc.InternalError, nil) }, }, + { + Name: "singleOptionalParam", + Params: []jsonrpc.Parameter{{Name: "param", Optional: true}}, + Handler: func(param *int) (int, *jsonrpc.Error) { + return 0, nil + }, + }, } listener := CountingEventListener{} @@ -475,6 +482,14 @@ func TestHandle(t *testing.T) { res: `{"jsonrpc":"2.0","error":{"code":-32603,"message":"Internal error"},"id":1}`, checkFailedEvent: true, }, + "empty optional param": { + req: `{"jsonrpc": "2.0", "method": "singleOptionalParam", "params": {}, "id": 1}`, + res: `{"jsonrpc":"2.0","result":0,"id":1}`, + }, + "null optional param": { + req: `{"jsonrpc": "2.0", "method": "singleOptionalParam", "id": 1}`, + res: `{"jsonrpc":"2.0","result":0,"id":1}`, + }, } for desc, test := range tests { diff --git a/mocks/mock_synchronizer.go b/mocks/mock_synchronizer.go index 910e5007e6..d04a733db0 100644 --- a/mocks/mock_synchronizer.go +++ b/mocks/mock_synchronizer.go @@ -127,3 +127,31 @@ func (mr *MockSyncReaderMockRecorder) SubscribeNewHeads() *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SubscribeNewHeads", reflect.TypeOf((*MockSyncReader)(nil).SubscribeNewHeads)) } + +// SubscribePendingTxs mocks base method. +func (m *MockSyncReader) SubscribePendingTxs() sync.PendingTxSubscription { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SubscribePendingTxs") + ret0, _ := ret[0].(sync.PendingTxSubscription) + return ret0 +} + +// SubscribePendingTxs indicates an expected call of SubscribePendingTxs. +func (mr *MockSyncReaderMockRecorder) SubscribePendingTxs() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SubscribePendingTxs", reflect.TypeOf((*MockSyncReader)(nil).SubscribePendingTxs)) +} + +// SubscribeReorg mocks base method. +func (m *MockSyncReader) SubscribeReorg() sync.ReorgSubscription { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SubscribeReorg") + ret0, _ := ret[0].(sync.ReorgSubscription) + return ret0 +} + +// SubscribeReorg indicates an expected call of SubscribeReorg. +func (mr *MockSyncReaderMockRecorder) SubscribeReorg() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SubscribeReorg", reflect.TypeOf((*MockSyncReader)(nil).SubscribeReorg)) +} diff --git a/rpc/events.go b/rpc/events.go index a7298486f8..0f403c546f 100644 --- a/rpc/events.go +++ b/rpc/events.go @@ -1,9 +1,6 @@ package rpc import ( - "context" - "encoding/json" - "github.com/NethermindEth/juno/blockchain" "github.com/NethermindEth/juno/core/felt" "github.com/NethermindEth/juno/jsonrpc" @@ -51,71 +48,6 @@ type SubscriptionID struct { /**************************************************** Events Handlers *****************************************************/ - -func (h *Handler) SubscribeNewHeads(ctx context.Context) (uint64, *jsonrpc.Error) { - w, ok := jsonrpc.ConnFromContext(ctx) - if !ok { - return 0, jsonrpc.Err(jsonrpc.MethodNotFound, nil) - } - - id := h.idgen() - subscriptionCtx, subscriptionCtxCancel := context.WithCancel(ctx) - sub := &subscription{ - cancel: subscriptionCtxCancel, - conn: w, - } - h.mu.Lock() - h.subscriptions[id] = sub - h.mu.Unlock() - headerSub := h.newHeads.Subscribe() - sub.wg.Go(func() { - defer func() { - headerSub.Unsubscribe() - h.unsubscribe(sub, id) - }() - for { - select { - case <-subscriptionCtx.Done(): - return - case header := <-headerSub.Recv(): - resp, err := json.Marshal(SubscriptionResponse{ - Version: "2.0", - Method: "juno_subscribeNewHeads", - Params: map[string]any{ - "result": adaptBlockHeader(header), - "subscription": id, - }, - }) - if err != nil { - h.log.Warnw("Error marshalling a subscription reply", "err", err) - return - } - if _, err = w.Write(resp); err != nil { - h.log.Warnw("Error writing a subscription reply", "err", err) - return - } - } - } - }) - return id, nil -} - -func (h *Handler) Unsubscribe(ctx context.Context, id uint64) (bool, *jsonrpc.Error) { - w, ok := jsonrpc.ConnFromContext(ctx) - if !ok { - return false, jsonrpc.Err(jsonrpc.MethodNotFound, nil) - } - h.mu.Lock() - sub, ok := h.subscriptions[id] - h.mu.Unlock() // Don't defer since h.unsubscribe acquires the lock. - if !ok || !sub.conn.Equal(w) { - return false, ErrSubscriptionNotFound - } - sub.cancel() - sub.wg.Wait() // Let the subscription finish before responding. - return true, nil -} - // Events gets the events matching a filter // // It follows the specification defined here: diff --git a/rpc/events_test.go b/rpc/events_test.go index 6655b6166e..e54765de2d 100644 --- a/rpc/events_test.go +++ b/rpc/events_test.go @@ -2,24 +2,16 @@ package rpc_test import ( "context" - "fmt" - "io" - "net" - "net/http/httptest" "testing" - "time" "github.com/NethermindEth/juno/blockchain" "github.com/NethermindEth/juno/clients/feeder" "github.com/NethermindEth/juno/core" "github.com/NethermindEth/juno/core/felt" "github.com/NethermindEth/juno/db/pebble" - "github.com/NethermindEth/juno/jsonrpc" "github.com/NethermindEth/juno/rpc" adaptfeeder "github.com/NethermindEth/juno/starknetdata/feeder" - "github.com/NethermindEth/juno/sync" "github.com/NethermindEth/juno/utils" - "github.com/coder/websocket" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -214,188 +206,3 @@ func TestEvents(t *testing.T) { assert.Equal(t, utils.HexToFelt(t, "0x785c2ada3f53fbc66078d47715c27718f92e6e48b96372b36e5197de69b82b5"), events.Events[0].TransactionHash) }) } - -type fakeConn struct { - w io.Writer -} - -func (fc *fakeConn) Write(p []byte) (int, error) { - return fc.w.Write(p) -} - -func (fc *fakeConn) Equal(other jsonrpc.Conn) bool { - fc2, ok := other.(*fakeConn) - if !ok { - return false - } - return fc.w == fc2.w -} - -func TestSubscribeNewHeadsAndUnsubscribe(t *testing.T) { - t.Parallel() - log := utils.NewNopZapLogger() - n := utils.Ptr(utils.Mainnet) - client := feeder.NewTestClient(t, n) - gw := adaptfeeder.New(client) - ctx, cancel := context.WithCancel(context.Background()) - t.Cleanup(cancel) - testDB := pebble.NewMemTest(t) - chain := blockchain.New(pebble.NewMemTest(t), n, nil) - syncer := sync.New(chain, gw, log, 0, false, testDB) - handler := rpc.New(chain, syncer, nil, "", log) - - go func() { - require.NoError(t, handler.Run(ctx)) - }() - // Technically, there's a race between goroutine above and the SubscribeNewHeads call down below. - // Sleep for a moment just in case. - time.Sleep(50 * time.Millisecond) - - serverConn, clientConn := net.Pipe() - t.Cleanup(func() { - require.NoError(t, serverConn.Close()) - require.NoError(t, clientConn.Close()) - }) - - // Subscribe without setting the connection on the context. - id, rpcErr := handler.SubscribeNewHeads(ctx) - require.Zero(t, id) - require.Equal(t, jsonrpc.MethodNotFound, rpcErr.Code) - - // Sync blocks and then revert head. - // This is a super hacky way to deterministically receive a single block on the subscription. - // It would be nicer if we could tell the synchronizer to exit after a certain block height, but, alas, we can't do that. - syncCtx, syncCancel := context.WithTimeout(context.Background(), time.Second) - require.NoError(t, syncer.Run(syncCtx)) - syncCancel() - // This is technically an unsafe thing to do. We're modifying the synchronizer's blockchain while it is owned by the synchronizer. - // But it works. - require.NoError(t, chain.RevertHead()) - - // Subscribe. - subCtx := context.WithValue(ctx, jsonrpc.ConnKey{}, &fakeConn{w: serverConn}) - id, rpcErr = handler.SubscribeNewHeads(subCtx) - require.Nil(t, rpcErr) - - // Sync the block we reverted above. - syncCtx, syncCancel = context.WithTimeout(context.Background(), 250*time.Millisecond) - require.NoError(t, syncer.Run(syncCtx)) - syncCancel() - - // Receive a block header. - want := `{"jsonrpc":"2.0","method":"juno_subscribeNewHeads","params":{"result":{"block_hash":"0x4e1f77f39545afe866ac151ac908bd1a347a2a8a7d58bef1276db4f06fdf2f6","parent_hash":"0x2a70fb03fe363a2d6be843343a1d81ce6abeda1e9bd5cc6ad8fa9f45e30fdeb","block_number":2,"new_root":"0x3ceee867d50b5926bb88c0ec7e0b9c20ae6b537e74aac44b8fcf6bb6da138d9","timestamp":1637084470,"sequencer_address":"0x0","l1_gas_price":{"price_in_fri":"0x0","price_in_wei":"0x0"},"l1_data_gas_price":{"price_in_fri":"0x0","price_in_wei":"0x0"},"l1_da_mode":"CALLDATA","starknet_version":""},"subscription":%d}}` - want = fmt.Sprintf(want, id) - got := make([]byte, len(want)) - _, err := clientConn.Read(got) - require.NoError(t, err) - require.Equal(t, want, string(got)) - - // Unsubscribe without setting the connection on the context. - ok, rpcErr := handler.Unsubscribe(ctx, id) - require.Equal(t, jsonrpc.MethodNotFound, rpcErr.Code) - require.False(t, ok) - - // Unsubscribe on correct connection with the incorrect id. - ok, rpcErr = handler.Unsubscribe(subCtx, id+1) - require.Equal(t, rpc.ErrSubscriptionNotFound, rpcErr) - require.False(t, ok) - - // Unsubscribe on incorrect connection with the correct id. - subCtx = context.WithValue(context.Background(), jsonrpc.ConnKey{}, &fakeConn{}) - ok, rpcErr = handler.Unsubscribe(subCtx, id) - require.Equal(t, rpc.ErrSubscriptionNotFound, rpcErr) - require.False(t, ok) - - // Unsubscribe on correct connection with the correct id. - subCtx = context.WithValue(context.Background(), jsonrpc.ConnKey{}, &fakeConn{w: serverConn}) - ok, rpcErr = handler.Unsubscribe(subCtx, id) - require.Nil(t, rpcErr) - require.True(t, ok) -} - -func TestMultipleSubscribeNewHeadsAndUnsubscribe(t *testing.T) { - t.Parallel() - log := utils.NewNopZapLogger() - n := utils.Ptr(utils.Mainnet) - feederClient := feeder.NewTestClient(t, n) - gw := adaptfeeder.New(feederClient) - ctx, cancel := context.WithCancel(context.Background()) - t.Cleanup(cancel) - testDB := pebble.NewMemTest(t) - chain := blockchain.New(testDB, n, nil) - syncer := sync.New(chain, gw, log, 0, false, testDB) - handler := rpc.New(chain, syncer, nil, "", log) - go func() { - require.NoError(t, handler.Run(ctx)) - }() - // Technically, there's a race between goroutine above and the SubscribeNewHeads call down below. - // Sleep for a moment just in case. - time.Sleep(50 * time.Millisecond) - - // Sync blocks and then revert head. - // This is a super hacky way to deterministically receive a single block on the subscription. - // It would be nicer if we could tell the synchronizer to exit after a certain block height, but, alas, we can't do that. - syncCtx, syncCancel := context.WithTimeout(context.Background(), time.Second) - require.NoError(t, syncer.Run(syncCtx)) - syncCancel() - // This is technically an unsafe thing to do. We're modifying the synchronizer's blockchain while it is owned by the synchronizer. - // But it works. - require.NoError(t, chain.RevertHead()) - - server := jsonrpc.NewServer(1, log) - require.NoError(t, server.RegisterMethods(jsonrpc.Method{ - Name: "juno_subscribeNewHeads", - Handler: handler.SubscribeNewHeads, - }, jsonrpc.Method{ - Name: "juno_unsubscribe", - Params: []jsonrpc.Parameter{{Name: "id"}}, - Handler: handler.Unsubscribe, - })) - ws := jsonrpc.NewWebsocket(server, nil, log) - httpSrv := httptest.NewServer(ws) - conn1, _, err := websocket.Dial(ctx, httpSrv.URL, nil) - require.NoError(t, err) - conn2, _, err := websocket.Dial(ctx, httpSrv.URL, nil) - require.NoError(t, err) - - subscribeMsg := []byte(`{"jsonrpc":"2.0","id":1,"method":"juno_subscribeNewHeads"}`) - - firstID := uint64(1) - secondID := uint64(2) - handler.WithIDGen(func() uint64 { return firstID }) - require.NoError(t, conn1.Write(ctx, websocket.MessageText, subscribeMsg)) - - want := `{"jsonrpc":"2.0","result":%d,"id":1}` - firstWant := fmt.Sprintf(want, firstID) - _, firstGot, err := conn1.Read(ctx) - require.NoError(t, err) - require.Equal(t, firstWant, string(firstGot)) - - handler.WithIDGen(func() uint64 { return secondID }) - require.NoError(t, conn2.Write(ctx, websocket.MessageText, subscribeMsg)) - secondWant := fmt.Sprintf(want, secondID) - _, secondGot, err := conn2.Read(ctx) - require.NoError(t, err) - require.Equal(t, secondWant, string(secondGot)) - - // Now we're subscribed. Sync the block we reverted above. - syncCtx, syncCancel = context.WithTimeout(context.Background(), 250*time.Millisecond) - require.NoError(t, syncer.Run(syncCtx)) - syncCancel() - - // Receive a block header. - want = `{"jsonrpc":"2.0","method":"juno_subscribeNewHeads","params":{"result":{"block_hash":"0x4e1f77f39545afe866ac151ac908bd1a347a2a8a7d58bef1276db4f06fdf2f6","parent_hash":"0x2a70fb03fe363a2d6be843343a1d81ce6abeda1e9bd5cc6ad8fa9f45e30fdeb","block_number":2,"new_root":"0x3ceee867d50b5926bb88c0ec7e0b9c20ae6b537e74aac44b8fcf6bb6da138d9","timestamp":1637084470,"sequencer_address":"0x0","l1_gas_price":{"price_in_fri":"0x0","price_in_wei":"0x0"},"l1_data_gas_price":{"price_in_fri":"0x0","price_in_wei":"0x0"},"l1_da_mode":"CALLDATA","starknet_version":""},"subscription":%d}}` - firstWant = fmt.Sprintf(want, firstID) - _, firstGot, err = conn1.Read(ctx) - require.NoError(t, err) - require.Equal(t, firstWant, string(firstGot)) - secondWant = fmt.Sprintf(want, secondID) - _, secondGot, err = conn2.Read(ctx) - require.NoError(t, err) - require.Equal(t, secondWant, string(secondGot)) - - // Unsubscribe - unsubMsg := `{"jsonrpc":"2.0","id":1,"method":"juno_unsubscribe","params":[%d]}` - require.NoError(t, conn1.Write(ctx, websocket.MessageBinary, []byte(fmt.Sprintf(unsubMsg, firstID)))) - require.NoError(t, conn2.Write(ctx, websocket.MessageBinary, []byte(fmt.Sprintf(unsubMsg, secondID)))) -} diff --git a/rpc/handlers.go b/rpc/handlers.go index 1cf96b0c21..7657d021a8 100644 --- a/rpc/handlers.go +++ b/rpc/handlers.go @@ -66,6 +66,7 @@ var ( ErrUnsupportedTxVersion = &jsonrpc.Error{Code: 61, Message: "the transaction version is not supported"} ErrUnsupportedContractClassVersion = &jsonrpc.Error{Code: 62, Message: "the contract class version is not supported"} ErrUnexpectedError = &jsonrpc.Error{Code: 63, Message: "An unexpected error occurred"} + ErrTooManyAddressesInFilter = &jsonrpc.Error{Code: 67, Message: "Too many addresses in filter sender_address filter"} ErrTooManyBlocksBack = &jsonrpc.Error{Code: 68, Message: fmt.Sprintf("Cannot go back more than %v blocks", maxBlocksBack)} ErrCallOnPending = &jsonrpc.Error{Code: 69, Message: "This method does not support being called on the pending block"} @@ -93,8 +94,10 @@ type Handler struct { vm vm.VM log utils.Logger - version string - newHeads *feed.Feed[*core.Header] + version string + newHeads *feed.Feed[*core.Header] + reorgs *feed.Feed[*sync.ReorgBlockRange] + pendingTxs *feed.Feed[[]core.Transaction] idgen func() uint64 mu stdsync.Mutex // protects subscriptions. @@ -135,6 +138,8 @@ func New(bcReader blockchain.Reader, syncReader sync.Reader, virtualMachine vm.V }, version: version, newHeads: feed.New[*core.Header](), + reorgs: feed.New[*sync.ReorgBlockRange](), + pendingTxs: feed.New[[]core.Transaction](), subscriptions: make(map[uint64]*subscription), blockTraceCache: lru.NewCache[traceCacheKey, []TracedBlockTransaction](traceCacheSize), @@ -176,8 +181,15 @@ func (h *Handler) WithGateway(gatewayClient Gateway) *Handler { func (h *Handler) Run(ctx context.Context) error { newHeadsSub := h.syncReader.SubscribeNewHeads().Subscription + reorgsSub := h.syncReader.SubscribeReorg().Subscription + pendingTxsSub := h.syncReader.SubscribePendingTxs().Subscription defer newHeadsSub.Unsubscribe() - feed.Tee[*core.Header](newHeadsSub, h.newHeads) + defer reorgsSub.Unsubscribe() + defer pendingTxsSub.Unsubscribe() + feed.Tee(newHeadsSub, h.newHeads) + feed.Tee(reorgsSub, h.reorgs) + feed.Tee(pendingTxsSub, h.pendingTxs) + <-ctx.Done() for _, sub := range h.subscriptions { sub.wg.Wait() @@ -344,9 +356,15 @@ func (h *Handler) Methods() ([]jsonrpc.Method, string) { //nolint: funlen Handler: h.SubscribeEvents, }, { - Name: "juno_subscribeNewHeads", + Name: "starknet_subscribeNewHeads", + Params: []jsonrpc.Parameter{{Name: "block", Optional: true}}, Handler: h.SubscribeNewHeads, }, + { + Name: "starknet_subscribePendingTransactions", + Params: []jsonrpc.Parameter{{Name: "transaction_details", Optional: true}, {Name: "sender_address", Optional: true}}, + Handler: h.SubscribePendingTxs, + }, { Name: "juno_unsubscribe", Params: []jsonrpc.Parameter{{Name: "id"}}, @@ -512,7 +530,8 @@ func (h *Handler) MethodsV0_7() ([]jsonrpc.Method, string) { //nolint: funlen Handler: h.SpecVersionV0_7, }, { - Name: "juno_subscribeNewHeads", + Name: "starknet_subscribeNewHeads", + Params: []jsonrpc.Parameter{{Name: "block", Optional: true}}, Handler: h.SubscribeNewHeads, }, { diff --git a/rpc/subscriptions.go b/rpc/subscriptions.go index 5edf1bcfbb..371219f165 100644 --- a/rpc/subscriptions.go +++ b/rpc/subscriptions.go @@ -3,12 +3,14 @@ package rpc import ( "context" "encoding/json" - "sync" "github.com/NethermindEth/juno/blockchain" "github.com/NethermindEth/juno/core" "github.com/NethermindEth/juno/core/felt" + "github.com/NethermindEth/juno/feed" "github.com/NethermindEth/juno/jsonrpc" + "github.com/NethermindEth/juno/sync" + "github.com/sourcegraph/conc" ) const subscribeEventsChunkSize = 1024 @@ -19,6 +21,7 @@ type SubscriptionResponse struct { Params any `json:"params"` } +// SubscribeEvents creates a WebSocket stream which will fire events for new Starknet events with applied filters func (h *Handler) SubscribeEvents(ctx context.Context, fromAddr *felt.Felt, keys [][]felt.Felt, blockID *BlockID, ) (*SubscriptionID, *jsonrpc.Error) { @@ -35,28 +38,9 @@ func (h *Handler) SubscribeEvents(ctx context.Context, fromAddr *felt.Felt, keys return nil, ErrTooManyKeysInFilter } - var requestedHeader *core.Header - headHeader, err := h.bcReader.HeadsHeader() - if err != nil { - return nil, ErrInternal.CloneWithData(err.Error()) - } - - if blockID == nil { - requestedHeader = headHeader - } else { - if blockID.Pending { - return nil, ErrCallOnPending - } - - var rpcErr *jsonrpc.Error - requestedHeader, rpcErr = h.blockHeaderByID(blockID) - if rpcErr != nil { - return nil, rpcErr - } - - if headHeader.Number >= maxBlocksBack && requestedHeader.Number <= headHeader.Number-maxBlocksBack { - return nil, ErrTooManyBlocksBack - } + requestedHeader, headHeader, rpcErr := h.resolveBlockRange(blockID) + if rpcErr != nil { + return nil, rpcErr } id := h.idgen() @@ -70,21 +54,18 @@ func (h *Handler) SubscribeEvents(ctx context.Context, fromAddr *felt.Felt, keys h.mu.Unlock() headerSub := h.newHeads.Subscribe() + reorgSub := h.reorgs.Subscribe() // as per the spec, reorgs are also sent in the events subscription sub.wg.Go(func() { defer func() { h.unsubscribe(sub, id) headerSub.Unsubscribe() + reorgSub.Unsubscribe() }() // The specification doesn't enforce ordering of events therefore events from new blocks can be sent before // old blocks. - // Todo: see if sub's wg can be used? - wg := sync.WaitGroup{} - wg.Add(1) - - go func() { - defer wg.Done() - + var wg conc.WaitGroup + wg.Go(func() { for { select { case <-subscriptionCtx.Done(): @@ -93,9 +74,15 @@ func (h *Handler) SubscribeEvents(ctx context.Context, fromAddr *felt.Felt, keys h.processEvents(subscriptionCtx, w, id, header.Number, header.Number, fromAddr, keys) } } - }() + }) - h.processEvents(subscriptionCtx, w, id, requestedHeader.Number, headHeader.Number, fromAddr, keys) + wg.Go(func() { + h.processReorgs(subscriptionCtx, reorgSub, w, id) + }) + + wg.Go(func() { + h.processEvents(subscriptionCtx, w, id, requestedHeader.Number, headHeader.Number, fromAddr, keys) + }) wg.Wait() }) @@ -181,3 +168,336 @@ func sendEvents(ctx context.Context, w jsonrpc.Conn, events []*blockchain.Filter } return nil } + +// SubscribeNewHeads creates a WebSocket stream which will fire events when a new block header is added. +func (h *Handler) SubscribeNewHeads(ctx context.Context, blockID *BlockID) (*SubscriptionID, *jsonrpc.Error) { + w, ok := jsonrpc.ConnFromContext(ctx) + if !ok { + return nil, jsonrpc.Err(jsonrpc.MethodNotFound, nil) + } + + startHeader, latestHeader, rpcErr := h.resolveBlockRange(blockID) + if rpcErr != nil { + return nil, rpcErr + } + + id := h.idgen() + subscriptionCtx, subscriptionCtxCancel := context.WithCancel(ctx) + sub := &subscription{ + cancel: subscriptionCtxCancel, + conn: w, + } + h.mu.Lock() + h.subscriptions[id] = sub + h.mu.Unlock() + + headerSub := h.newHeads.Subscribe() + reorgSub := h.reorgs.Subscribe() // as per the spec, reorgs are also sent in the new heads subscription + sub.wg.Go(func() { + defer func() { + h.unsubscribe(sub, id) + headerSub.Unsubscribe() + reorgSub.Unsubscribe() + }() + + var wg conc.WaitGroup + + wg.Go(func() { + if err := h.sendHistoricalHeaders(subscriptionCtx, startHeader, latestHeader, w, id); err != nil { + h.log.Errorw("Error sending old headers", "err", err) + return + } + }) + + wg.Go(func() { + h.processReorgs(subscriptionCtx, reorgSub, w, id) + }) + + wg.Go(func() { + h.processNewHeaders(subscriptionCtx, headerSub, w, id) + }) + + wg.Wait() + }) + + return &SubscriptionID{ID: id}, nil +} + +// SubscribePendingTxs creates a WebSocket stream which will fire events when a new pending transaction is added. +// The getDetails flag controls if the response will contain the transaction details or just the transaction hashes. +// The senderAddr flag is used to filter the transactions by sender address. +func (h *Handler) SubscribePendingTxs(ctx context.Context, getDetails *bool, senderAddr []felt.Felt) (*SubscriptionID, *jsonrpc.Error) { + w, ok := jsonrpc.ConnFromContext(ctx) + if !ok { + return nil, jsonrpc.Err(jsonrpc.MethodNotFound, nil) + } + + if len(senderAddr) > maxEventFilterKeys { + return nil, ErrTooManyAddressesInFilter + } + + id := h.idgen() + subscriptionCtx, subscriptionCtxCancel := context.WithCancel(ctx) + sub := &subscription{ + cancel: subscriptionCtxCancel, + conn: w, + } + h.mu.Lock() + h.subscriptions[id] = sub + h.mu.Unlock() + + pendingTxsSub := h.pendingTxs.Subscribe() + sub.wg.Go(func() { + defer func() { + h.unsubscribe(sub, id) + pendingTxsSub.Unsubscribe() + }() + + h.processPendingTxs(subscriptionCtx, getDetails != nil && *getDetails, senderAddr, pendingTxsSub, w, id) + }) + + return &SubscriptionID{ID: id}, nil +} + +func (h *Handler) processPendingTxs(ctx context.Context, getDetails bool, senderAddr []felt.Felt, + pendingTxsSub *feed.Subscription[[]core.Transaction], + w jsonrpc.Conn, + id uint64, +) { + for { + select { + case <-ctx.Done(): + return + case pendingTxs := <-pendingTxsSub.Recv(): + filteredTxs := h.filterTxs(pendingTxs, getDetails, senderAddr) + if err := h.sendPendingTxs(w, filteredTxs, id); err != nil { + h.log.Warnw("Error sending pending transactions", "err", err) + return + } + } + } +} + +// filterTxs filters the transactions based on the getDetails flag. +// If getDetails is true, response will contain the transaction details. +// If getDetails is false, response will only contain the transaction hashes. +func (h *Handler) filterTxs(pendingTxs []core.Transaction, getDetails bool, senderAddr []felt.Felt) any { + if getDetails { + return h.filterTxDetails(pendingTxs, senderAddr) + } + return h.filterTxHashes(pendingTxs, senderAddr) +} + +func (h *Handler) filterTxDetails(pendingTxs []core.Transaction, senderAddr []felt.Felt) []*Transaction { + filteredTxs := make([]*Transaction, 0, len(pendingTxs)) + for _, txn := range pendingTxs { + if h.filterTxBySender(txn, senderAddr) { + filteredTxs = append(filteredTxs, AdaptTransaction(txn)) + } + } + return filteredTxs +} + +func (h *Handler) filterTxHashes(pendingTxs []core.Transaction, senderAddr []felt.Felt) []felt.Felt { + filteredTxHashes := make([]felt.Felt, 0, len(pendingTxs)) + for _, txn := range pendingTxs { + if h.filterTxBySender(txn, senderAddr) { + filteredTxHashes = append(filteredTxHashes, *txn.Hash()) + } + } + return filteredTxHashes +} + +// filterTxBySender checks if the transaction is included in the sender address list. +// If the sender address list is empty, it will return true by default. +// If the sender address list is not empty, it will check if the transaction is an Invoke or Declare transaction +// and if the sender address is in the list. For other transaction types, it will by default return false. +func (h *Handler) filterTxBySender(txn core.Transaction, senderAddr []felt.Felt) bool { + if len(senderAddr) == 0 { + return true + } + + switch t := txn.(type) { + case *core.InvokeTransaction: + for _, addr := range senderAddr { + if t.SenderAddress.Equal(&addr) { + return true + } + } + case *core.DeclareTransaction: + for _, addr := range senderAddr { + if t.SenderAddress.Equal(&addr) { + return true + } + } + } + + return false +} + +func (h *Handler) sendPendingTxs(w jsonrpc.Conn, result any, id uint64) error { + resp, err := json.Marshal(SubscriptionResponse{ + Version: "2.0", + Method: "starknet_subscriptionPendingTransactions", + Params: map[string]any{ + "subscription_id": id, + "result": result, + }, + }) + if err != nil { + return err + } + + _, err = w.Write(resp) + return err +} + +// resolveBlockRange returns the start and latest headers based on the blockID. +// It will also do some sanity checks and return errors if the blockID is invalid. +func (h *Handler) resolveBlockRange(blockID *BlockID) (*core.Header, *core.Header, *jsonrpc.Error) { + latestHeader, err := h.bcReader.HeadsHeader() + if err != nil { + return nil, nil, ErrInternal.CloneWithData(err.Error()) + } + + if blockID == nil || blockID.Latest { + return latestHeader, latestHeader, nil + } + + if blockID.Pending { + return nil, nil, ErrCallOnPending + } + + startHeader, rpcErr := h.blockHeaderByID(blockID) + if rpcErr != nil { + return nil, nil, rpcErr + } + + if latestHeader.Number >= maxBlocksBack && startHeader.Number <= latestHeader.Number-maxBlocksBack { + return nil, nil, ErrTooManyBlocksBack + } + + return startHeader, latestHeader, nil +} + +// sendHistoricalHeaders sends a range of headers from the start header until the latest header +func (h *Handler) sendHistoricalHeaders( + ctx context.Context, + startHeader, latestHeader *core.Header, + w jsonrpc.Conn, + id uint64, +) error { + var ( + err error + curHeader = startHeader + ) + + for { + select { + case <-ctx.Done(): + return ctx.Err() + default: + if err := h.sendHeader(w, curHeader, id); err != nil { + return err + } + + if curHeader.Number == latestHeader.Number { + return nil + } + + curHeader, err = h.bcReader.BlockHeaderByNumber(curHeader.Number + 1) + if err != nil { + return err + } + } + } +} + +func (h *Handler) processNewHeaders(ctx context.Context, headerSub *feed.Subscription[*core.Header], w jsonrpc.Conn, id uint64) { + for { + select { + case <-ctx.Done(): + return + case header := <-headerSub.Recv(): + if err := h.sendHeader(w, header, id); err != nil { + h.log.Warnw("Error sending header", "err", err) + return + } + } + } +} + +// sendHeader creates a request and sends it to the client +func (h *Handler) sendHeader(w jsonrpc.Conn, header *core.Header, id uint64) error { + resp, err := json.Marshal(SubscriptionResponse{ + Version: "2.0", + Method: "starknet_subscriptionNewHeads", + Params: map[string]any{ + "subscription_id": id, + "result": adaptBlockHeader(header), + }, + }) + if err != nil { + return err + } + _, err = w.Write(resp) + return err +} + +func (h *Handler) processReorgs(ctx context.Context, reorgSub *feed.Subscription[*sync.ReorgBlockRange], w jsonrpc.Conn, id uint64) { + for { + select { + case <-ctx.Done(): + return + case reorg := <-reorgSub.Recv(): + if err := h.sendReorg(w, reorg, id); err != nil { + h.log.Warnw("Error sending reorg", "err", err) + return + } + } + } +} + +type ReorgEvent struct { + StartBlockHash *felt.Felt `json:"starting_block_hash"` + StartBlockNum uint64 `json:"starting_block_number"` + EndBlockHash *felt.Felt `json:"ending_block_hash"` + EndBlockNum uint64 `json:"ending_block_number"` +} + +func (h *Handler) sendReorg(w jsonrpc.Conn, reorg *sync.ReorgBlockRange, id uint64) error { + resp, err := json.Marshal(jsonrpc.Request{ + Version: "2.0", + Method: "starknet_subscriptionReorg", + Params: map[string]any{ + "subscription_id": id, + "result": &ReorgEvent{ + StartBlockHash: reorg.StartBlockHash, + StartBlockNum: reorg.StartBlockNum, + EndBlockHash: reorg.EndBlockHash, + EndBlockNum: reorg.EndBlockNum, + }, + }, + }) + if err != nil { + return err + } + _, err = w.Write(resp) + return err +} + +func (h *Handler) Unsubscribe(ctx context.Context, id uint64) (bool, *jsonrpc.Error) { + w, ok := jsonrpc.ConnFromContext(ctx) + if !ok { + return false, jsonrpc.Err(jsonrpc.MethodNotFound, nil) + } + h.mu.Lock() + sub, ok := h.subscriptions[id] + h.mu.Unlock() // Don't defer since h.unsubscribe acquires the lock. + if !ok || !sub.conn.Equal(w) { + return false, ErrSubscriptionNotFound + } + sub.cancel() + sub.wg.Wait() // Let the subscription finish before responding. + return true, nil +} diff --git a/rpc/subscriptions_test.go b/rpc/subscriptions_test.go index a3ab61fa7c..9ca3f2d4fb 100644 --- a/rpc/subscriptions_test.go +++ b/rpc/subscriptions_test.go @@ -3,8 +3,10 @@ package rpc import ( "context" "encoding/json" + "fmt" "io" "net" + "net/http/httptest" "testing" "time" @@ -12,16 +14,21 @@ import ( "github.com/NethermindEth/juno/clients/feeder" "github.com/NethermindEth/juno/core" "github.com/NethermindEth/juno/core/felt" + "github.com/NethermindEth/juno/db/pebble" "github.com/NethermindEth/juno/feed" "github.com/NethermindEth/juno/jsonrpc" "github.com/NethermindEth/juno/mocks" adaptfeeder "github.com/NethermindEth/juno/starknetdata/feeder" + "github.com/NethermindEth/juno/sync" "github.com/NethermindEth/juno/utils" + "github.com/coder/websocket" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/mock/gomock" ) +var emptyCommitments = core.BlockCommitments{} + // Due to the difference in how some test files in rpc use "package rpc" vs "package rpc_test" it was easiest to copy // the fakeConn here. // Todo: move all the subscription related test here @@ -55,10 +62,9 @@ func TestSubscribeEvents(t *testing.T) { keys := make([][]felt.Felt, 1024+1) fromAddr := new(felt.Felt).SetBytes([]byte("from_address")) - serverConn, clientConn := net.Pipe() + serverConn, _ := net.Pipe() t.Cleanup(func() { require.NoError(t, serverConn.Close()) - require.NoError(t, clientConn.Close()) }) subCtx := context.WithValue(context.Background(), jsonrpc.ConnKey{}, &fakeConn{w: serverConn}) @@ -80,10 +86,9 @@ func TestSubscribeEvents(t *testing.T) { fromAddr := new(felt.Felt).SetBytes([]byte("from_address")) blockID := &BlockID{Pending: true} - serverConn, clientConn := net.Pipe() + serverConn, _ := net.Pipe() t.Cleanup(func() { require.NoError(t, serverConn.Close()) - require.NoError(t, clientConn.Close()) }) subCtx := context.WithValue(context.Background(), jsonrpc.ConnKey{}, &fakeConn{w: serverConn}) @@ -107,10 +112,9 @@ func TestSubscribeEvents(t *testing.T) { fromAddr := new(felt.Felt).SetBytes([]byte("from_address")) blockID := &BlockID{Number: 0} - serverConn, clientConn := net.Pipe() + serverConn, _ := net.Pipe() t.Cleanup(func() { require.NoError(t, serverConn.Close()) - require.NoError(t, clientConn.Close()) }) subCtx := context.WithValue(context.Background(), jsonrpc.ConnKey{}, &fakeConn{w: serverConn}) @@ -206,7 +210,7 @@ func TestSubscribeEvents(t *testing.T) { var marshalledResponses [][]byte for _, e := range emittedEvents { - resp, err := marshalSubscriptionResponse(e, id.ID) + resp, err := marshalSubEventsResp(e, id.ID) require.NoError(t, err) marshalledResponses = append(marshalledResponses, resp) } @@ -254,7 +258,7 @@ func TestSubscribeEvents(t *testing.T) { var marshalledResponses [][]byte for _, e := range emittedEvents { - resp, err := marshalSubscriptionResponse(e, id.ID) + resp, err := marshalSubEventsResp(e, id.ID) require.NoError(t, err) marshalledResponses = append(marshalledResponses, resp) } @@ -298,7 +302,7 @@ func TestSubscribeEvents(t *testing.T) { id, rpcErr := handler.SubscribeEvents(subCtx, fromAddr, keys, nil) require.Nil(t, rpcErr) - resp, err := marshalSubscriptionResponse(emittedEvents[0], id.ID) + resp, err := marshalSubEventsResp(emittedEvents[0], id.ID) require.NoError(t, err) got := make([]byte, len(resp)) @@ -313,7 +317,7 @@ func TestSubscribeEvents(t *testing.T) { headerFeed.Send(&core.Header{Number: b1.Number + 1}) - resp, err = marshalSubscriptionResponse(emittedEvents[1], id.ID) + resp, err = marshalSubEventsResp(emittedEvents[1], id.ID) require.NoError(t, err) got = make([]byte, len(resp)) @@ -326,7 +330,533 @@ func TestSubscribeEvents(t *testing.T) { }) } -func marshalSubscriptionResponse(e *EmittedEvent, id uint64) ([]byte, error) { +type fakeSyncer struct { + newHeads *feed.Feed[*core.Header] + reorgs *feed.Feed[*sync.ReorgBlockRange] + pendingTxs *feed.Feed[[]core.Transaction] +} + +func newFakeSyncer() *fakeSyncer { + return &fakeSyncer{ + newHeads: feed.New[*core.Header](), + reorgs: feed.New[*sync.ReorgBlockRange](), + pendingTxs: feed.New[[]core.Transaction](), + } +} + +func (fs *fakeSyncer) SubscribeNewHeads() sync.HeaderSubscription { + return sync.HeaderSubscription{Subscription: fs.newHeads.Subscribe()} +} + +func (fs *fakeSyncer) SubscribeReorg() sync.ReorgSubscription { + return sync.ReorgSubscription{Subscription: fs.reorgs.Subscribe()} +} + +func (fs *fakeSyncer) SubscribePendingTxs() sync.PendingTxSubscription { + return sync.PendingTxSubscription{Subscription: fs.pendingTxs.Subscribe()} +} + +func (fs *fakeSyncer) StartingBlockNumber() (uint64, error) { + return 0, nil +} + +func (fs *fakeSyncer) HighestBlockHeader() *core.Header { + return nil +} + +func (fs *fakeSyncer) Pending() (*sync.Pending, error) { return nil, nil } +func (fs *fakeSyncer) PendingBlock() *core.Block { return nil } +func (fs *fakeSyncer) PendingState() (core.StateReader, func() error, error) { return nil, nil, nil } + +func TestSubscribeNewHeads(t *testing.T) { + log := utils.NewNopZapLogger() + + t.Run("Return error if called on pending block", func(t *testing.T) { + mockCtrl := gomock.NewController(t) + t.Cleanup(mockCtrl.Finish) + + mockChain := mocks.NewMockReader(mockCtrl) + mockSyncer := mocks.NewMockSyncReader(mockCtrl) + handler := New(mockChain, mockSyncer, nil, "", log) + + mockChain.EXPECT().HeadsHeader().Return(&core.Header{}, nil) + + serverConn, _ := net.Pipe() + t.Cleanup(func() { + require.NoError(t, serverConn.Close()) + }) + + subCtx := context.WithValue(context.Background(), jsonrpc.ConnKey{}, &fakeConn{w: serverConn}) + + id, rpcErr := handler.SubscribeNewHeads(subCtx, &BlockID{Pending: true}) + assert.Zero(t, id) + assert.Equal(t, ErrCallOnPending, rpcErr) + }) + + t.Run("Return error if block is too far back", func(t *testing.T) { + mockCtrl := gomock.NewController(t) + t.Cleanup(mockCtrl.Finish) + + mockChain := mocks.NewMockReader(mockCtrl) + mockSyncer := mocks.NewMockSyncReader(mockCtrl) + handler := New(mockChain, mockSyncer, nil, "", log) + + blockID := &BlockID{Number: 0} + + serverConn, _ := net.Pipe() + t.Cleanup(func() { + require.NoError(t, serverConn.Close()) + }) + + subCtx := context.WithValue(context.Background(), jsonrpc.ConnKey{}, &fakeConn{w: serverConn}) + + t.Run("head is 1024", func(t *testing.T) { + mockChain.EXPECT().HeadsHeader().Return(&core.Header{Number: 1024}, nil) + mockChain.EXPECT().BlockHeaderByNumber(blockID.Number).Return(&core.Header{Number: 0}, nil) + + id, rpcErr := handler.SubscribeNewHeads(subCtx, blockID) + assert.Zero(t, id) + assert.Equal(t, ErrTooManyBlocksBack, rpcErr) + }) + + t.Run("head is more than 1024", func(t *testing.T) { + mockChain.EXPECT().HeadsHeader().Return(&core.Header{Number: 2024}, nil) + mockChain.EXPECT().BlockHeaderByNumber(blockID.Number).Return(&core.Header{Number: 0}, nil) + + id, rpcErr := handler.SubscribeNewHeads(subCtx, blockID) + assert.Zero(t, id) + assert.Equal(t, ErrTooManyBlocksBack, rpcErr) + }) + }) + + t.Run("new block is received", func(t *testing.T) { + mockCtrl := gomock.NewController(t) + t.Cleanup(mockCtrl.Finish) + + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + mockChain := mocks.NewMockReader(mockCtrl) + syncer := newFakeSyncer() + handler, server := setupRPC(t, ctx, mockChain, syncer) + + mockChain.EXPECT().HeadsHeader().Return(&core.Header{}, nil) + + conn := createWsConn(t, ctx, server) + + id := uint64(1) + handler.WithIDGen(func() uint64 { return id }) + + got := sendWsMessage(t, ctx, conn, subMsg("starknet_subscribeNewHeads")) + require.Equal(t, subResp(id), got) + + // Ignore the first mock header + _, _, err := conn.Read(ctx) + require.NoError(t, err) + + // Simulate a new block + syncer.newHeads.Send(testHeader(t)) + + // Receive a block header. + _, headerGot, err := conn.Read(ctx) + require.NoError(t, err) + require.Equal(t, newHeadsResponse(id), string(headerGot)) + }) +} + +func TestSubscribeNewHeadsHistorical(t *testing.T) { + client := feeder.NewTestClient(t, &utils.Mainnet) + gw := adaptfeeder.New(client) + + block0, err := gw.BlockByNumber(context.Background(), 0) + require.NoError(t, err) + + stateUpdate0, err := gw.StateUpdate(context.Background(), 0) + require.NoError(t, err) + + testDB := pebble.NewMemTest(t) + chain := blockchain.New(testDB, &utils.Mainnet, nil) + assert.NoError(t, chain.Store(block0, &emptyCommitments, stateUpdate0, nil)) + + chain = blockchain.New(testDB, &utils.Mainnet, nil) + syncer := newFakeSyncer() + + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + handler, server := setupRPC(t, ctx, chain, syncer) + + conn := createWsConn(t, ctx, server) + + id := uint64(1) + handler.WithIDGen(func() uint64 { return id }) + + subMsg := `{"jsonrpc":"2.0","id":1,"method":"starknet_subscribeNewHeads", "params":{"block":{"block_number":0}}}` + got := sendWsMessage(t, ctx, conn, subMsg) + require.Equal(t, subResp(id), got) + + // Check block 0 content + want := `{"jsonrpc":"2.0","method":"starknet_subscriptionNewHeads","params":{"result":{"block_hash":"0x47c3637b57c2b079b93c61539950c17e868a28f46cdef28f88521067f21e943","parent_hash":"0x0","block_number":0,"new_root":"0x21870ba80540e7831fb21c591ee93481f5ae1bb71ff85a86ddd465be4eddee6","timestamp":1637069048,"sequencer_address":"0x0","l1_gas_price":{"price_in_fri":"0x0","price_in_wei":"0x0"},"l1_data_gas_price":{"price_in_fri":"0x0","price_in_wei":"0x0"},"l1_da_mode":"CALLDATA","starknet_version":""},"subscription_id":%d}}` + want = fmt.Sprintf(want, id) + _, block0Got, err := conn.Read(ctx) + require.NoError(t, err) + require.Equal(t, want, string(block0Got)) + + // Simulate a new block + syncer.newHeads.Send(testHeader(t)) + + // Check new block content + _, newBlockGot, err := conn.Read(ctx) + require.NoError(t, err) + require.Equal(t, newHeadsResponse(id), string(newBlockGot)) +} + +func TestMultipleSubscribeNewHeadsAndUnsubscribe(t *testing.T) { + mockCtrl := gomock.NewController(t) + t.Cleanup(mockCtrl.Finish) + + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + mockChain := mocks.NewMockReader(mockCtrl) + syncer := newFakeSyncer() + handler, server := setupRPC(t, ctx, mockChain, syncer) + + mockChain.EXPECT().HeadsHeader().Return(&core.Header{}, nil).Times(2) + + ws := jsonrpc.NewWebsocket(server, nil, utils.NewNopZapLogger()) + httpSrv := httptest.NewServer(ws) + + conn1, _, err := websocket.Dial(ctx, httpSrv.URL, nil) //nolint:bodyclose + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, conn1.Close(websocket.StatusNormalClosure, "")) + }) + + conn2, _, err := websocket.Dial(ctx, httpSrv.URL, nil) //nolint:bodyclose + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, conn2.Close(websocket.StatusNormalClosure, "")) + }) + + firstID := uint64(1) + secondID := uint64(2) + + handler.WithIDGen(func() uint64 { return firstID }) + firstGot := sendWsMessage(t, ctx, conn1, subMsg("starknet_subscribeNewHeads")) + require.NoError(t, err) + require.Equal(t, subResp(firstID), firstGot) + + handler.WithIDGen(func() uint64 { return secondID }) + secondGot := sendWsMessage(t, ctx, conn2, subMsg("starknet_subscribeNewHeads")) + require.NoError(t, err) + require.Equal(t, subResp(secondID), secondGot) + + // Ignore the first mock header + _, _, err = conn1.Read(ctx) + require.NoError(t, err) + _, _, err = conn2.Read(ctx) + require.NoError(t, err) + + // Simulate a new block + syncer.newHeads.Send(testHeader(t)) + + // Receive a block header. + _, firstHeaderGot, err := conn1.Read(ctx) + require.NoError(t, err) + require.Equal(t, newHeadsResponse(firstID), string(firstHeaderGot)) + + _, secondHeaderGot, err := conn2.Read(ctx) + require.NoError(t, err) + require.Equal(t, newHeadsResponse(secondID), string(secondHeaderGot)) + + // Unsubscribe + unsubMsg := `{"jsonrpc":"2.0","id":1,"method":"juno_unsubscribe","params":[%d]}` + require.NoError(t, conn1.Write(ctx, websocket.MessageBinary, []byte(fmt.Sprintf(unsubMsg, firstID)))) + require.NoError(t, conn2.Write(ctx, websocket.MessageBinary, []byte(fmt.Sprintf(unsubMsg, secondID)))) +} + +func TestSubscriptionReorg(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + mockCtrl := gomock.NewController(t) + t.Cleanup(mockCtrl.Finish) + + mockChain := mocks.NewMockReader(mockCtrl) + syncer := newFakeSyncer() + handler, server := setupRPC(t, ctx, mockChain, syncer) + + testCases := []struct { + name string + subscribeMethod string + ignoreFirst bool + }{ + { + name: "reorg event in starknet_subscribeNewHeads", + subscribeMethod: "starknet_subscribeNewHeads", + ignoreFirst: true, + }, + { + name: "reorg event in starknet_subscribeEvents", + subscribeMethod: "starknet_subscribeEvents", + ignoreFirst: false, + }, + // TODO: test reorg event in TransactionStatus + } + + mockChain.EXPECT().HeadsHeader().Return(&core.Header{}, nil).Times(len(testCases)) + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + conn := createWsConn(t, ctx, server) + + id := uint64(1) + handler.WithIDGen(func() uint64 { return id }) + + got := sendWsMessage(t, ctx, conn, subMsg(tc.subscribeMethod)) + require.Equal(t, subResp(id), got) + + if tc.ignoreFirst { + _, _, err := conn.Read(ctx) + require.NoError(t, err) + } + + // Simulate a reorg + syncer.reorgs.Send(&sync.ReorgBlockRange{ + StartBlockHash: utils.HexToFelt(t, "0x4e1f77f39545afe866ac151ac908bd1a347a2a8a7d58bef1276db4f06fdf2f6"), + StartBlockNum: 0, + EndBlockHash: utils.HexToFelt(t, "0x34e815552e42c5eb5233b99de2d3d7fd396e575df2719bf98e7ed2794494f86"), + EndBlockNum: 2, + }) + + // Receive reorg event + expectedRes := `{"jsonrpc":"2.0","method":"starknet_subscriptionReorg","params":{"result":{"starting_block_hash":"0x4e1f77f39545afe866ac151ac908bd1a347a2a8a7d58bef1276db4f06fdf2f6","starting_block_number":0,"ending_block_hash":"0x34e815552e42c5eb5233b99de2d3d7fd396e575df2719bf98e7ed2794494f86","ending_block_number":2},"subscription_id":%d}}` + want := fmt.Sprintf(expectedRes, id) + _, reorgGot, err := conn.Read(ctx) + require.NoError(t, err) + require.Equal(t, want, string(reorgGot)) + }) + } +} + +func TestSubscribePendingTxs(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + mockCtrl := gomock.NewController(t) + t.Cleanup(mockCtrl.Finish) + + mockChain := mocks.NewMockReader(mockCtrl) + syncer := newFakeSyncer() + handler, server := setupRPC(t, ctx, mockChain, syncer) + + t.Run("Basic subscription", func(t *testing.T) { + conn := createWsConn(t, ctx, server) + + subMsg := `{"jsonrpc":"2.0","id":1,"method":"starknet_subscribePendingTransactions"}` + id := uint64(1) + handler.WithIDGen(func() uint64 { return id }) + got := sendWsMessage(t, ctx, conn, subMsg) + require.Equal(t, subResp(id), got) + + hash1 := new(felt.Felt).SetUint64(1) + addr1 := new(felt.Felt).SetUint64(11) + + hash2 := new(felt.Felt).SetUint64(2) + addr2 := new(felt.Felt).SetUint64(22) + + hash3 := new(felt.Felt).SetUint64(3) + hash4 := new(felt.Felt).SetUint64(4) + hash5 := new(felt.Felt).SetUint64(5) + + syncer.pendingTxs.Send([]core.Transaction{ + &core.InvokeTransaction{TransactionHash: hash1, SenderAddress: addr1}, + &core.DeclareTransaction{TransactionHash: hash2, SenderAddress: addr2}, + &core.DeployTransaction{TransactionHash: hash3}, + &core.DeployAccountTransaction{DeployTransaction: core.DeployTransaction{TransactionHash: hash4}}, + &core.L1HandlerTransaction{TransactionHash: hash5}, + }) + + want := `{"jsonrpc":"2.0","method":"starknet_subscriptionPendingTransactions","params":{"result":["0x1","0x2","0x3","0x4","0x5"],"subscription_id":%d}}` + want = fmt.Sprintf(want, id) + _, pendingTxsGot, err := conn.Read(ctx) + require.NoError(t, err) + require.Equal(t, want, string(pendingTxsGot)) + }) + + t.Run("Filtered subscription", func(t *testing.T) { + conn := createWsConn(t, ctx, server) + + subMsg := `{"jsonrpc":"2.0","id":1,"method":"starknet_subscribePendingTransactions", "params":{"sender_address":["0xb", "0x16"]}}` + id := uint64(1) + handler.WithIDGen(func() uint64 { return id }) + got := sendWsMessage(t, ctx, conn, subMsg) + require.Equal(t, subResp(id), got) + + hash1 := new(felt.Felt).SetUint64(1) + addr1 := new(felt.Felt).SetUint64(11) + + hash2 := new(felt.Felt).SetUint64(2) + addr2 := new(felt.Felt).SetUint64(22) + + hash3 := new(felt.Felt).SetUint64(3) + hash4 := new(felt.Felt).SetUint64(4) + hash5 := new(felt.Felt).SetUint64(5) + + hash6 := new(felt.Felt).SetUint64(6) + addr6 := new(felt.Felt).SetUint64(66) + + hash7 := new(felt.Felt).SetUint64(7) + addr7 := new(felt.Felt).SetUint64(77) + + syncer.pendingTxs.Send([]core.Transaction{ + &core.InvokeTransaction{TransactionHash: hash1, SenderAddress: addr1}, + &core.DeclareTransaction{TransactionHash: hash2, SenderAddress: addr2}, + &core.DeployTransaction{TransactionHash: hash3}, + &core.DeployAccountTransaction{DeployTransaction: core.DeployTransaction{TransactionHash: hash4}}, + &core.L1HandlerTransaction{TransactionHash: hash5}, + &core.InvokeTransaction{TransactionHash: hash6, SenderAddress: addr6}, + &core.DeclareTransaction{TransactionHash: hash7, SenderAddress: addr7}, + }) + + want := `{"jsonrpc":"2.0","method":"starknet_subscriptionPendingTransactions","params":{"result":["0x1","0x2"],"subscription_id":%d}}` + want = fmt.Sprintf(want, id) + _, pendingTxsGot, err := conn.Read(ctx) + require.NoError(t, err) + require.Equal(t, want, string(pendingTxsGot)) + }) + + t.Run("Full details subscription", func(t *testing.T) { + conn := createWsConn(t, ctx, server) + + subMsg := `{"jsonrpc":"2.0","id":1,"method":"starknet_subscribePendingTransactions", "params":{"transaction_details": true}}` + id := uint64(1) + handler.WithIDGen(func() uint64 { return id }) + got := sendWsMessage(t, ctx, conn, subMsg) + require.Equal(t, subResp(id), got) + + syncer.pendingTxs.Send([]core.Transaction{ + &core.InvokeTransaction{ + TransactionHash: new(felt.Felt).SetUint64(1), + CallData: []*felt.Felt{new(felt.Felt).SetUint64(2)}, + TransactionSignature: []*felt.Felt{new(felt.Felt).SetUint64(3)}, + MaxFee: new(felt.Felt).SetUint64(4), + ContractAddress: new(felt.Felt).SetUint64(5), + Version: new(core.TransactionVersion).SetUint64(3), + EntryPointSelector: new(felt.Felt).SetUint64(6), + Nonce: new(felt.Felt).SetUint64(7), + SenderAddress: new(felt.Felt).SetUint64(8), + ResourceBounds: map[core.Resource]core.ResourceBounds{}, + Tip: 9, + PaymasterData: []*felt.Felt{new(felt.Felt).SetUint64(10)}, + AccountDeploymentData: []*felt.Felt{new(felt.Felt).SetUint64(11)}, + }, + }) + + want := `{"jsonrpc":"2.0","method":"starknet_subscriptionPendingTransactions","params":{"result":[{"transaction_hash":"0x1","type":"INVOKE","version":"0x3","nonce":"0x7","max_fee":"0x4","contract_address":"0x5","sender_address":"0x8","signature":["0x3"],"calldata":["0x2"],"entry_point_selector":"0x6","resource_bounds":{},"tip":"0x9","paymaster_data":["0xa"],"account_deployment_data":["0xb"],"nonce_data_availability_mode":"L1","fee_data_availability_mode":"L1"}],"subscription_id":%d}}` + want = fmt.Sprintf(want, id) + _, pendingTxsGot, err := conn.Read(ctx) + require.NoError(t, err) + require.Equal(t, want, string(pendingTxsGot)) + }) + + t.Run("Return error if too many addresses in filter", func(t *testing.T) { + addresses := make([]felt.Felt, 1024+1) + + serverConn, _ := net.Pipe() + t.Cleanup(func() { + require.NoError(t, serverConn.Close()) + }) + + subCtx := context.WithValue(context.Background(), jsonrpc.ConnKey{}, &fakeConn{w: serverConn}) + + id, rpcErr := handler.SubscribePendingTxs(subCtx, nil, addresses) + assert.Zero(t, id) + assert.Equal(t, ErrTooManyAddressesInFilter, rpcErr) + }) +} + +func createWsConn(t *testing.T, ctx context.Context, server *jsonrpc.Server) *websocket.Conn { + ws := jsonrpc.NewWebsocket(server, nil, utils.NewNopZapLogger()) + httpSrv := httptest.NewServer(ws) + + conn, _, err := websocket.Dial(ctx, httpSrv.URL, nil) //nolint:bodyclose + require.NoError(t, err) + + t.Cleanup(func() { + require.NoError(t, conn.Close(websocket.StatusNormalClosure, "")) + }) + + return conn +} + +func subResp(id uint64) string { + return fmt.Sprintf(`{"jsonrpc":"2.0","result":{"subscription_id":%d},"id":1}`, id) +} + +func subMsg(method string) string { + return fmt.Sprintf(`{"jsonrpc":"2.0","id":1,"method":%q}`, method) +} + +func testHeader(t *testing.T) *core.Header { + t.Helper() + + header := &core.Header{ + Hash: utils.HexToFelt(t, "0x4e1f77f39545afe866ac151ac908bd1a347a2a8a7d58bef1276db4f06fdf2f6"), + ParentHash: utils.HexToFelt(t, "0x2a70fb03fe363a2d6be843343a1d81ce6abeda1e9bd5cc6ad8fa9f45e30fdeb"), + Number: 2, + GlobalStateRoot: utils.HexToFelt(t, "0x3ceee867d50b5926bb88c0ec7e0b9c20ae6b537e74aac44b8fcf6bb6da138d9"), + Timestamp: 1637084470, + SequencerAddress: utils.HexToFelt(t, "0x0"), + L1DataGasPrice: &core.GasPrice{ + PriceInFri: utils.HexToFelt(t, "0x0"), + PriceInWei: utils.HexToFelt(t, "0x0"), + }, + GasPrice: utils.HexToFelt(t, "0x0"), + GasPriceSTRK: utils.HexToFelt(t, "0x0"), + L1DAMode: core.Calldata, + ProtocolVersion: "", + } + return header +} + +func newHeadsResponse(id uint64) string { + return fmt.Sprintf(`{"jsonrpc":"2.0","method":"starknet_subscriptionNewHeads","params":{"result":{"block_hash":"0x4e1f77f39545afe866ac151ac908bd1a347a2a8a7d58bef1276db4f06fdf2f6","parent_hash":"0x2a70fb03fe363a2d6be843343a1d81ce6abeda1e9bd5cc6ad8fa9f45e30fdeb","block_number":2,"new_root":"0x3ceee867d50b5926bb88c0ec7e0b9c20ae6b537e74aac44b8fcf6bb6da138d9","timestamp":1637084470,"sequencer_address":"0x0","l1_gas_price":{"price_in_fri":"0x0","price_in_wei":"0x0"},"l1_data_gas_price":{"price_in_fri":"0x0","price_in_wei":"0x0"},"l1_da_mode":"CALLDATA","starknet_version":""},"subscription_id":%d}}`, id) +} + +// setupRPC creates a RPC handler that runs in a goroutine and a JSONRPC server that can be used to test subscriptions +func setupRPC(t *testing.T, ctx context.Context, chain blockchain.Reader, syncer sync.Reader) (*Handler, *jsonrpc.Server) { + t.Helper() + + log := utils.NewNopZapLogger() + handler := New(chain, syncer, nil, "", log) + + go func() { + require.NoError(t, handler.Run(ctx)) + }() + time.Sleep(50 * time.Millisecond) + + server := jsonrpc.NewServer(1, log) + methods, _ := handler.Methods() + require.NoError(t, server.RegisterMethods(methods...)) + + return handler, server +} + +// sendWsMessage sends a message to a websocket connection and returns the response +func sendWsMessage(t *testing.T, ctx context.Context, conn *websocket.Conn, message string) string { + t.Helper() + + err := conn.Write(ctx, websocket.MessageText, []byte(message)) + require.NoError(t, err) + + _, response, err := conn.Read(ctx) + require.NoError(t, err) + return string(response) +} + +func marshalSubEventsResp(e *EmittedEvent, id uint64) ([]byte, error) { return json.Marshal(SubscriptionResponse{ Version: "2.0", Method: "starknet_subscriptionEvents", diff --git a/sync/sync.go b/sync/sync.go index c268ffa7e3..499bd396ba 100644 --- a/sync/sync.go +++ b/sync/sync.go @@ -38,6 +38,26 @@ type HeaderSubscription struct { *feed.Subscription[*core.Header] } +type ReorgSubscription struct { + *feed.Subscription[*ReorgBlockRange] +} + +type PendingTxSubscription struct { + *feed.Subscription[[]core.Transaction] +} + +// ReorgBlockRange represents data about reorganised blocks, starting and ending block number and hash +type ReorgBlockRange struct { + // StartBlockHash is the hash of the first known block of the orphaned chain + StartBlockHash *felt.Felt + // StartBlockNum is the number of the first known block of the orphaned chain + StartBlockNum uint64 + // The last known block of the orphaned chain + EndBlockHash *felt.Felt + // Number of the last known block of the orphaned chain + EndBlockNum uint64 +} + // Todo: Since this is also going to be implemented by p2p package we should move this interface to node package // //go:generate mockgen -destination=../mocks/mock_synchronizer.go -package=mocks -mock_names Reader=MockSyncReader github.com/NethermindEth/juno/sync Reader @@ -45,6 +65,8 @@ type Reader interface { StartingBlockNumber() (uint64, error) HighestBlockHeader() *core.Header SubscribeNewHeads() HeaderSubscription + SubscribeReorg() ReorgSubscription + SubscribePendingTxs() PendingTxSubscription Pending() (*Pending, error) PendingBlock() *core.Block @@ -66,6 +88,14 @@ func (n *NoopSynchronizer) SubscribeNewHeads() HeaderSubscription { return HeaderSubscription{feed.New[*core.Header]().Subscribe()} } +func (n *NoopSynchronizer) SubscribeReorg() ReorgSubscription { + return ReorgSubscription{feed.New[*ReorgBlockRange]().Subscribe()} +} + +func (n *NoopSynchronizer) SubscribePendingTxs() PendingTxSubscription { + return PendingTxSubscription{feed.New[[]core.Transaction]().Subscribe()} +} + func (n *NoopSynchronizer) PendingBlock() *core.Block { return nil } @@ -87,6 +117,8 @@ type Synchronizer struct { startingBlockNumber *uint64 highestBlockHeader atomic.Pointer[core.Header] newHeads *feed.Feed[*core.Header] + reorgFeed *feed.Feed[*ReorgBlockRange] + pendingTxsFeed *feed.Feed[[]core.Transaction] log utils.SimpleLogger listener EventListener @@ -95,6 +127,8 @@ type Synchronizer struct { pendingPollInterval time.Duration catchUpMode bool plugin junoplugin.JunoPlugin + + currReorg *ReorgBlockRange // If nil, no reorg is happening } func New(bc *blockchain.Blockchain, starkNetData starknetdata.StarknetData, log utils.SimpleLogger, @@ -106,6 +140,8 @@ func New(bc *blockchain.Blockchain, starkNetData starknetdata.StarknetData, log starknetData: starkNetData, log: log, newHeads: feed.New[*core.Header](), + reorgFeed: feed.New[*ReorgBlockRange](), + pendingTxsFeed: feed.New[[]core.Transaction](), pendingPollInterval: pendingPollInterval, listener: &SelectiveListener{}, readOnlyBlockchain: readOnlyBlockchain, @@ -304,6 +340,11 @@ func (s *Synchronizer) verifierTask(ctx context.Context, block *core.Block, stat s.highestBlockHeader.CompareAndSwap(highestBlockHeader, block.Header) } + if s.currReorg != nil { + s.reorgFeed.Send(s.currReorg) + s.currReorg = nil // reset the reorg data + } + s.newHeads.Send(block.Header) s.log.Infow("Stored Block", "number", block.Number, "hash", block.Hash.ShortString(), "root", block.GlobalStateRoot.ShortString()) @@ -403,6 +444,19 @@ func (s *Synchronizer) revertHead(forkBlock *core.Block) { } else { s.log.Infow("Reverted HEAD", "reverted", localHead) } + + if s.currReorg == nil { // first block of the reorg + s.currReorg = &ReorgBlockRange{ + StartBlockHash: localHead, + StartBlockNum: head.Number, + EndBlockHash: localHead, + EndBlockNum: head.Number, + } + } else { // not the first block of the reorg, adjust the starting block + s.currReorg.StartBlockHash = localHead + s.currReorg.StartBlockNum = head.Number + } + s.listener.OnReorg(head.Number) } @@ -519,6 +573,18 @@ func (s *Synchronizer) SubscribeNewHeads() HeaderSubscription { } } +func (s *Synchronizer) SubscribeReorg() ReorgSubscription { + return ReorgSubscription{ + Subscription: s.reorgFeed.Subscribe(), + } +} + +func (s *Synchronizer) SubscribePendingTxs() PendingTxSubscription { + return PendingTxSubscription{ + Subscription: s.pendingTxsFeed.Subscribe(), + } +} + // StorePending stores a pending block given that it is for the next height func (s *Synchronizer) StorePending(p *Pending) error { err := blockchain.CheckBlockVersion(p.Block.ProtocolVersion) @@ -548,6 +614,9 @@ func (s *Synchronizer) StorePending(p *Pending) error { } s.pending.Store(p) + // send the pending transactions to the feed + s.pendingTxsFeed.Send(p.Block.Transactions) + return nil } diff --git a/sync/sync_test.go b/sync/sync_test.go index ab97fc322b..2b6d514e88 100644 --- a/sync/sync_test.go +++ b/sync/sync_test.go @@ -160,8 +160,12 @@ func TestReorg(t *testing.T) { head, err := bc.HeadsHeader() require.NoError(t, err) require.Equal(t, utils.HexToFelt(t, "0x34e815552e42c5eb5233b99de2d3d7fd396e575df2719bf98e7ed2794494f86"), head.Hash) + integEnd := head + integStart, err := bc.BlockHeaderByNumber(0) + require.NoError(t, err) synchronizer = sync.New(bc, mainGw, utils.NewNopZapLogger(), 0, false, testDB) + sub := synchronizer.SubscribeReorg() ctx, cancel = context.WithTimeout(context.Background(), timeout) require.NoError(t, synchronizer.Run(ctx)) cancel() @@ -170,54 +174,15 @@ func TestReorg(t *testing.T) { head, err = bc.HeadsHeader() require.NoError(t, err) require.Equal(t, utils.HexToFelt(t, "0x4e1f77f39545afe866ac151ac908bd1a347a2a8a7d58bef1276db4f06fdf2f6"), head.Hash) - }) -} - -func TestSubscribeNewHeads(t *testing.T) { - t.Parallel() - testDB := pebble.NewMemTest(t) - log := utils.NewNopZapLogger() - integration := utils.Integration - chain := blockchain.New(testDB, &integration, nil) - integrationClient := feeder.NewTestClient(t, &integration) - gw := adaptfeeder.New(integrationClient) - syncer := sync.New(chain, gw, log, 0, false, testDB) - - sub := syncer.SubscribeNewHeads() - - // Receive on new block. - ctx, cancel := context.WithTimeout(context.Background(), timeout) - require.NoError(t, syncer.Run(ctx)) - cancel() - got, ok := <-sub.Recv() - require.True(t, ok) - want, err := gw.BlockByNumber(context.Background(), 0) - require.NoError(t, err) - require.Equal(t, want.Header, got) - sub.Unsubscribe() -} - -func TestPendingSync(t *testing.T) { - t.Parallel() - - client := feeder.NewTestClient(t, &utils.Mainnet) - gw := adaptfeeder.New(client) - - var synchronizer *sync.Synchronizer - testDB := pebble.NewMemTest(t) - log := utils.NewNopZapLogger() - bc := blockchain.New(testDB, &utils.Mainnet, synchronizer.PendingBlock) - synchronizer = sync.New(bc, gw, log, time.Millisecond*100, false, testDB) - ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) - require.NoError(t, synchronizer.Run(ctx)) - cancel() - - head, err := bc.HeadsHeader() - require.NoError(t, err) - pending, err := synchronizer.Pending() - require.NoError(t, err) - assert.Equal(t, head.Hash, pending.Block.ParentHash) + // Validate reorg event + got, ok := <-sub.Recv() + require.True(t, ok) + assert.Equal(t, integEnd.Hash, got.EndBlockHash) + assert.Equal(t, integEnd.Number, got.EndBlockNum) + assert.Equal(t, integStart.Hash, got.StartBlockHash) + assert.Equal(t, integStart.Number, got.StartBlockNum) + }) } func TestPending(t *testing.T) { @@ -296,3 +261,52 @@ func TestPending(t *testing.T) { require.NoError(t, pErr) }) } + +func TestSubscribeNewHeads(t *testing.T) { + t.Parallel() + testDB := pebble.NewMemTest(t) + log := utils.NewNopZapLogger() + integration := utils.Integration + chain := blockchain.New(testDB, &integration, nil) + integrationClient := feeder.NewTestClient(t, &integration) + gw := adaptfeeder.New(integrationClient) + syncer := sync.New(chain, gw, log, 0, false, testDB) + + sub := syncer.SubscribeNewHeads() + + // Receive on new block. + ctx, cancel := context.WithTimeout(context.Background(), timeout) + require.NoError(t, syncer.Run(ctx)) + cancel() + got, ok := <-sub.Recv() + require.True(t, ok) + want, err := gw.BlockByNumber(context.Background(), 0) + require.NoError(t, err) + require.Equal(t, want.Header, got) + sub.Unsubscribe() +} + +func TestSubscribePendingTxs(t *testing.T) { + t.Parallel() + + client := feeder.NewTestClient(t, &utils.Mainnet) + gw := adaptfeeder.New(client) + + testDB := pebble.NewMemTest(t) + log := utils.NewNopZapLogger() + bc := blockchain.New(testDB, &utils.Mainnet, nil) + synchronizer := sync.New(bc, gw, log, time.Millisecond*100, false, testDB) + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + + sub := synchronizer.SubscribePendingTxs() + + require.NoError(t, synchronizer.Run(ctx)) + cancel() + + pending, err := synchronizer.Pending() + require.NoError(t, err) + pendingTxs, ok := <-sub.Recv() + require.True(t, ok) + require.Equal(t, pending.Block.Transactions, pendingTxs) + sub.Unsubscribe() +}