Skip to content

Commit

Permalink
Merge branch 'main' into weiihann/2205-subscribeNewHeads
Browse files Browse the repository at this point in the history
  • Loading branch information
IronGauntlets committed Dec 23, 2024
2 parents 30e942c + 81262fd commit ec1da70
Show file tree
Hide file tree
Showing 6 changed files with 42 additions and 16 deletions.
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ go 1.23.4
require (
github.com/Masterminds/semver/v3 v3.3.1
github.com/NethermindEth/cairo-vm-go v0.0.0-20241022093807-167daddfd4a4
github.com/bits-and-blooms/bitset v1.19.1
github.com/bits-and-blooms/bitset v1.20.0
github.com/bits-and-blooms/bloom/v3 v3.7.0
github.com/cockroachdb/pebble v1.1.2
github.com/coder/websocket v1.8.12
Expand All @@ -33,8 +33,8 @@ require (
go.uber.org/mock v0.5.0
go.uber.org/zap v1.27.0
golang.org/x/crypto v0.31.0
google.golang.org/grpc v1.69.0
google.golang.org/protobuf v1.35.2
google.golang.org/grpc v1.69.2
google.golang.org/protobuf v1.36.0
gopkg.in/yaml.v3 v3.0.1
)

Expand Down
12 changes: 6 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/bits-and-blooms/bitset v1.10.0/go.mod h1:7hO7Gc7Pp1vODcmWvKMRA9BNmbv6a/7QIWpPxHddWR8=
github.com/bits-and-blooms/bitset v1.19.1 h1:mv2yVhy96D2CuskLPXnc58oJNMs5PCWjAZuyYU0p12M=
github.com/bits-and-blooms/bitset v1.19.1/go.mod h1:7hO7Gc7Pp1vODcmWvKMRA9BNmbv6a/7QIWpPxHddWR8=
github.com/bits-and-blooms/bitset v1.20.0 h1:2F+rfL86jE2d/bmw7OhqUg2Sj/1rURkBn3MdfoPyRVU=
github.com/bits-and-blooms/bitset v1.20.0/go.mod h1:7hO7Gc7Pp1vODcmWvKMRA9BNmbv6a/7QIWpPxHddWR8=
github.com/bits-and-blooms/bloom/v3 v3.7.0 h1:VfknkqV4xI+PsaDIsoHueyxVDZrfvMn56jeWUzvzdls=
github.com/bits-and-blooms/bloom/v3 v3.7.0/go.mod h1:VKlUSvp0lFIYqxJjzdnSsZEw4iHb1kOL2tfHTgyJBHg=
github.com/bradfitz/go-smtpd v0.0.0-20170404230938-deb6d6237625/go.mod h1:HYsPBTaaSFSlLx/70C2HPIMNZpVV8+vt/A+FMnYP11g=
Expand Down Expand Up @@ -809,8 +809,8 @@ google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyac
google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY=
google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk=
google.golang.org/grpc v1.33.2/go.mod h1:JMHMWHQWaTccqQQlmk3MJZS+GWXOdAesneDmEnv2fbc=
google.golang.org/grpc v1.69.0 h1:quSiOM1GJPmPH5XtU+BCoVXcDVJJAzNcoyfC2cCjGkI=
google.golang.org/grpc v1.69.0/go.mod h1:vyjdE6jLBI76dgpDojsFGNaHlxdjXN9ghpnd2o7JGZ4=
google.golang.org/grpc v1.69.2 h1:U3S9QEtbXC0bYNvRtcoklF3xGtLViumSYxWykJS+7AU=
google.golang.org/grpc v1.69.2/go.mod h1:vyjdE6jLBI76dgpDojsFGNaHlxdjXN9ghpnd2o7JGZ4=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
Expand All @@ -820,8 +820,8 @@ google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2
google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c=
google.golang.org/protobuf v1.35.2 h1:8Ar7bF+apOIoThw1EdZl0p1oWvMqTHmpA2fRTyZO8io=
google.golang.org/protobuf v1.35.2/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
google.golang.org/protobuf v1.36.0 h1:mjIs9gYtt56AzC4ZaffQuh88TZurBGhIJMBZGSxNerQ=
google.golang.org/protobuf v1.36.0/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
Expand Down
19 changes: 17 additions & 2 deletions jsonrpc/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,17 @@ type Websocket struct {
log utils.SimpleLogger
connParams *WebsocketConnParams
listener NewRequestListener

shutdown <-chan struct{}
}

func NewWebsocket(rpc *Server, log utils.SimpleLogger) *Websocket {
func NewWebsocket(rpc *Server, shutdown <-chan struct{}, log utils.SimpleLogger) *Websocket {
ws := &Websocket{
rpc: rpc,
log: log,
connParams: DefaultWebsocketConnParams(),
listener: &SelectiveListener{},
shutdown: shutdown,
}

return ws
Expand Down Expand Up @@ -54,7 +57,19 @@ func (ws *Websocket) ServeHTTP(w http.ResponseWriter, r *http.Request) {

// TODO include connection information, such as the remote address, in the logs.

wsc := newWebsocketConn(r.Context(), conn, ws.connParams)
ctx, cancel := context.WithCancel(r.Context())
defer cancel()
go func() {
select {
case <-ws.shutdown:
cancel()
case <-ctx.Done():
// in case websocket connection is closed and server is not in shutdown mode
// we need to release this goroutine from waiting
}
}()

wsc := newWebsocketConn(ctx, conn, ws.connParams)

for {
_, wsc.r, err = wsc.conn.Reader(wsc.ctx)
Expand Down
2 changes: 1 addition & 1 deletion jsonrpc/websocket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func testConnection(t *testing.T, ctx context.Context, method jsonrpc.Method, li
require.NoError(t, rpc.RegisterMethods(method))

// Server
srv := httptest.NewServer(jsonrpc.NewWebsocket(rpc, utils.NewNopZapLogger()))
srv := httptest.NewServer(jsonrpc.NewWebsocket(rpc, nil, utils.NewNopZapLogger()))

// Client
conn, resp, err := websocket.Dial(ctx, srv.URL, nil) //nolint:bodyclose // websocket package closes resp.Body for us.
Expand Down
15 changes: 13 additions & 2 deletions node/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ func (h *httpService) Run(ctx context.Context) error {
}
}

func (h *httpService) registerOnShutdown(f func()) {
h.srv.RegisterOnShutdown(f)
}

func makeHTTPService(host string, port uint16, handler http.Handler) *httpService {
portStr := strconv.FormatUint(uint64(port), 10)
return &httpService{
Expand Down Expand Up @@ -108,9 +112,11 @@ func makeRPCOverWebsocket(host string, port uint16, servers map[string]*jsonrpc.
listener = makeWSMetrics()
}

shutdown := make(chan struct{})

mux := http.NewServeMux()
for path, server := range servers {
wsHandler := jsonrpc.NewWebsocket(server, log)
wsHandler := jsonrpc.NewWebsocket(server, shutdown, log)
if listener != nil {
wsHandler = wsHandler.WithListener(listener)
}
Expand All @@ -124,7 +130,12 @@ func makeRPCOverWebsocket(host string, port uint16, servers map[string]*jsonrpc.
if corsEnabled {
handler = cors.Default().Handler(handler)
}
return makeHTTPService(host, port, handler)

httpServ := makeHTTPService(host, port, handler)
httpServ.registerOnShutdown(func() {
close(shutdown)
})
return httpServ
}

func makeMetrics(host string, port uint16) *httpService {
Expand Down
4 changes: 2 additions & 2 deletions rpc/subscriptions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -524,7 +524,7 @@ func TestMultipleSubscribeNewHeadsAndUnsubscribe(t *testing.T) {

mockChain.EXPECT().HeadsHeader().Return(&core.Header{}, nil).Times(2)

ws := jsonrpc.NewWebsocket(server, utils.NewNopZapLogger())
ws := jsonrpc.NewWebsocket(server, nil, utils.NewNopZapLogger())
httpSrv := httptest.NewServer(ws)

conn1, _, err := websocket.Dial(ctx, httpSrv.URL, nil) //nolint:bodyclose
Expand Down Expand Up @@ -778,7 +778,7 @@ func TestSubscribePendingTxs(t *testing.T) {
}

func createWsConn(t *testing.T, ctx context.Context, server *jsonrpc.Server) *websocket.Conn {
ws := jsonrpc.NewWebsocket(server, utils.NewNopZapLogger())
ws := jsonrpc.NewWebsocket(server, nil, utils.NewNopZapLogger())
httpSrv := httptest.NewServer(ws)

conn, _, err := websocket.Dial(ctx, httpSrv.URL, nil) //nolint:bodyclose
Expand Down

0 comments on commit ec1da70

Please sign in to comment.