Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BUGFIX] add Health Check for Range over gRPC Connection Loop #2798

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 35 additions & 19 deletions internal/net/grpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@
GetCallOption() []CallOption
GetBackoff() backoff.Backoff
SetDisableResolveDNSAddr(addr string, disabled bool)
ConnectedAddrs() []string
ConnectedAddrs(context.Context) []string
Close(ctx context.Context) error
}

Expand Down Expand Up @@ -249,7 +249,7 @@
return ctx.Err()
case <-prTick.C:
if g.enablePoolRebalance {
err = g.rangeConns(func(addr string, p pool.Conn) bool {
err = g.rangeConns(ctx, func(addr string, p pool.Conn) bool {

Check warning on line 252 in internal/net/grpc/client.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/client.go#L252

Added line #L252 was not covered by tests
// if addr or pool is nil or empty the registration of conns is invalid let's disconnect them
if addr == "" || p == nil {
disconnectTargets = append(disconnectTargets, addr)
Expand Down Expand Up @@ -286,7 +286,7 @@
})
}
case <-hcTick.C:
err = g.rangeConns(func(addr string, p pool.Conn) bool {
err = g.rangeConns(ctx, func(addr string, p pool.Conn) bool {

Check warning on line 289 in internal/net/grpc/client.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/client.go#L289

Added line #L289 was not covered by tests
// if addr or pool is nil or empty the registration of conns is invalid let's disconnect them
if addr == "" || p == nil {
disconnectTargets = append(disconnectTargets, addr)
Expand Down Expand Up @@ -415,7 +415,7 @@
if g.conns.Len() == 0 {
return errors.ErrGRPCClientConnNotFound("*")
}
err = g.rangeConns(func(addr string, p pool.Conn) bool {
err = g.rangeConns(ctx, func(addr string, p pool.Conn) bool {

Check warning on line 418 in internal/net/grpc/client.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/client.go#L418

Added line #L418 was not covered by tests
ssctx, sspan := trace.StartSpan(sctx, apiName+"/Client.Range/"+addr)
defer func() {
if sspan != nil {
Expand Down Expand Up @@ -478,7 +478,7 @@
if g.conns.Len() == 0 {
return errors.ErrGRPCClientConnNotFound("*")
}
err = g.rangeConns(func(addr string, p pool.Conn) bool {
err = g.rangeConns(ctx, func(addr string, p pool.Conn) bool {

Check warning on line 481 in internal/net/grpc/client.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/client.go#L481

Added line #L481 was not covered by tests
eg.Go(safety.RecoverFunc(func() (err error) {
ssctx, sspan := trace.StartSpan(egctx, apiName+"/Client.RangeConcurrent/"+addr)
defer func() {
Expand Down Expand Up @@ -565,7 +565,7 @@
return nil
default:
p, ok := g.conns.Load(addr)
if !ok || p == nil {
if !ok || p == nil || !p.IsHealthy(sctx) {

Check warning on line 568 in internal/net/grpc/client.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/client.go#L568

Added line #L568 was not covered by tests
g.crl.Store(addr, true)
log.Warnf("gRPCClient.OrderedRange operation failed, gRPC connection pool for %s is invalid,\terror: %v", addr, errors.ErrGRPCClientConnNotFound(addr))
continue
Expand Down Expand Up @@ -634,7 +634,7 @@
addr := order
eg.Go(safety.RecoverFunc(func() (err error) {
p, ok := g.conns.Load(addr)
if !ok || p == nil {
if !ok || p == nil || !p.IsHealthy(sctx) {

Check warning on line 637 in internal/net/grpc/client.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/client.go#L637

Added line #L637 was not covered by tests
g.crl.Store(addr, true)
log.Warnf("gRPCClient.OrderedRangeConcurrent operation failed, gRPC connection pool for %s is invalid,\terror: %v", addr, errors.ErrGRPCClientConnNotFound(addr))
return nil
Expand Down Expand Up @@ -701,7 +701,7 @@
}

do := func() (data any, err error) {
cerr := g.rangeConns(func(addr string, p pool.Conn) bool {
cerr := g.rangeConns(ctx, func(addr string, p pool.Conn) bool {

Check warning on line 704 in internal/net/grpc/client.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/client.go#L704

Added line #L704 was not covered by tests
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codebase verification

Test coverage needed: RoundRobin operation tests are defined but not implemented

Found test stubs in internal/net/grpc/client_test.go that are currently commented out and marked as "NOT IMPLEMENTED":

  • Test_gRPCClient_RoundRobin
  • Test_gRPCClient_rangeConns

This confirms that this critical functionality, which is used throughout the codebase for connection management, lacks proper test coverage.

🔗 Analysis chain

Add test coverage for RoundRobin operation.

The RoundRobin implementation properly handles context and integrates with the circuit breaker. However, this critical path lacks test coverage.

Run the following script to check existing test patterns:

Would you like me to help generate test cases for the RoundRobin operation?

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check existing test patterns for RoundRobin operations
# Look for similar test patterns that we can follow

rg -A 5 "TestRoundRobin" --type go

Length of output: 34


Script:

#!/bin/bash
# Find test files in the same directory as client.go
fd "client_test.go" internal/net/grpc/

# Search for any test functions that might be testing round-robin functionality
rg -B 2 -A 5 "func Test.*" internal/net/grpc/client_test.go || true

# Look for any round-robin related code in test files
rg -i "round.*robin|rangeconns" internal/net/grpc/ --type go

Length of output: 5654

🧰 Tools
🪛 GitHub Check: codecov/patch

[warning] 704-704: internal/net/grpc/client.go#L704
Added line #L704 was not covered by tests

select {
case <-ctx.Done():
err = ctx.Err()
Expand Down Expand Up @@ -879,14 +879,14 @@
errors.Is(err, context.DeadlineExceeded) {
return nil, false, err
}
return nil, err != nil, err
return nil, p.IsHealthy(ctx), err

Check warning on line 882 in internal/net/grpc/client.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/client.go#L882

Added line #L882 was not covered by tests
}
status.Log(st.Code(), err)
switch st.Code() {
case codes.Internal,
codes.Unavailable,
codes.ResourceExhausted:
return nil, err != nil, err
return nil, p.IsHealthy(ctx), err

Check warning on line 889 in internal/net/grpc/client.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/client.go#L889

Added line #L889 was not covered by tests
}
return nil, false, err
}
Expand Down Expand Up @@ -1066,7 +1066,7 @@
atomic.AddUint64(&g.clientCount, ^uint64(0))
if p != nil {
log.Debugf("gRPC client connection pool addr = %s will disconnect soon...", addr)
return nil, p.Disconnect()
return nil, p.Disconnect(ctx)

Check warning on line 1069 in internal/net/grpc/client.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/client.go#L1069

Added line #L1069 was not covered by tests
}
return nil, nil
})
Expand All @@ -1085,10 +1085,10 @@
return nil
}

func (g *gRPCClient) ConnectedAddrs() (addrs []string) {
func (g *gRPCClient) ConnectedAddrs(ctx context.Context) (addrs []string) {

Check warning on line 1088 in internal/net/grpc/client.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/client.go#L1088

Added line #L1088 was not covered by tests
addrs = make([]string, 0, g.conns.Len())
err := g.rangeConns(func(addr string, p pool.Conn) bool {
if p != nil && p.IsHealthy(context.Background()) {
err := g.rangeConns(ctx, func(addr string, p pool.Conn) bool {
if p != nil && p.IsHealthy(ctx) {

Check warning on line 1091 in internal/net/grpc/client.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/client.go#L1090-L1091

Added lines #L1090 - L1091 were not covered by tests
addrs = append(addrs, addr)
}
return true
Expand All @@ -1104,18 +1104,34 @@
g.stopMonitor()
}
g.conns.Range(func(addr string, p pool.Conn) bool {
derr := g.Disconnect(ctx, addr)
if derr != nil && !errors.Is(derr, errors.ErrGRPCClientConnNotFound(addr)) {
err = errors.Join(err, derr)
select {
case <-ctx.Done():
return false
default:
derr := g.Disconnect(ctx, addr)
if derr != nil && !errors.Is(derr, errors.ErrGRPCClientConnNotFound(addr)) {
err = errors.Join(err, derr)
}
return true

Check warning on line 1115 in internal/net/grpc/client.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/client.go#L1107-L1115

Added lines #L1107 - L1115 were not covered by tests
}
return true
})
return err
}

func (g *gRPCClient) rangeConns(fn func(addr string, p pool.Conn) bool) error {
func (g *gRPCClient) rangeConns(ctx context.Context, fn func(addr string, p pool.Conn) bool) error {

Check warning on line 1121 in internal/net/grpc/client.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/client.go#L1121

Added line #L1121 was not covered by tests
var cnt int
g.conns.Range(func(addr string, p pool.Conn) bool {
if p == nil || !p.IsHealthy(ctx) {
pc, err := p.Connect(ctx)
if pc == nil || err != nil || !pc.IsHealthy(ctx) {
if pc != nil {
pc.Disconnect(ctx)
}
log.Debugf("Unhealthy connection detected for %s during gRPC Connection Range over Loop:\t%s", addr, p.String())
return true

Check warning on line 1131 in internal/net/grpc/client.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/client.go#L1124-L1131

Added lines #L1124 - L1131 were not covered by tests
}
p = pc

Check warning on line 1133 in internal/net/grpc/client.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/client.go#L1133

Added line #L1133 was not covered by tests
}
cnt++
return fn(addr, p)
})
Expand Down
3 changes: 2 additions & 1 deletion internal/net/grpc/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3107,7 +3107,7 @@ package grpc
// stopMonitor: test.fields.stopMonitor,
// }
//
// gotAddrs := g.ConnectedAddrs()
// gotAddrs := g.ConnectedAddrs(context.Background)
// if err := checkFunc(test.want, gotAddrs); err != nil {
// tt.Errorf("error = %v", err)
// }
Expand Down Expand Up @@ -3303,6 +3303,7 @@ package grpc
//
// func Test_gRPCClient_rangeConns(t *testing.T) {
// type args struct {
// ctx context.Context
// fn func(addr string, p pool.Conn) bool
// }
// type fields struct {
Expand Down
13 changes: 6 additions & 7 deletions internal/net/grpc/pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@

type Conn interface {
Connect(context.Context) (Conn, error)
Disconnect() error
Disconnect(context.Context) error
Do(ctx context.Context, f func(*ClientConn) error) error
Get(ctx context.Context) (conn *ClientConn, ok bool)
Get(context.Context) (conn *ClientConn, ok bool)
IsHealthy(context.Context) bool
IsIPConn() bool
Len() uint64
Expand Down Expand Up @@ -437,8 +437,7 @@
return p, nil
}

func (p *pool) Disconnect() (err error) {
ctx := context.Background()
func (p *pool) Disconnect(ctx context.Context) (err error) {

Check warning on line 440 in internal/net/grpc/pool/pool.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/pool/pool.go#L440

Added line #L440 was not covered by tests
p.closing.Store(true)
defer p.closing.Store(false)
emap := make(map[string]error, p.len())
Expand Down Expand Up @@ -618,7 +617,7 @@
if retry <= 0 || retry > math.MaxUint64-pl || pl <= 0 {
if p.isIP {
log.Warnf("failed to find gRPC IP connection pool for %s.\tlen(pool): %d,\tretried: %d,\tseems IP %s is unhealthy will going to disconnect...", p.addr, pl, cnt, p.addr)
if err := p.Disconnect(); err != nil {
if err := p.Disconnect(ctx); err != nil {

Check warning on line 620 in internal/net/grpc/pool/pool.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/pool/pool.go#L620

Added line #L620 was not covered by tests
log.Debugf("failed to disconnect gRPC IP direct connection for %s,\terr: %v", p.addr, err)
}
return 0, nil, false
Expand Down Expand Up @@ -757,8 +756,8 @@

func (pc *poolConn) Close(ctx context.Context, delay time.Duration) error {
tdelay := delay / 10
if tdelay < time.Millisecond*200 {
tdelay = time.Millisecond * 200
if tdelay < time.Millisecond*5 {
tdelay = time.Millisecond * 5

Check warning on line 760 in internal/net/grpc/pool/pool.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/pool/pool.go#L759-L760

Added lines #L759 - L760 were not covered by tests
} else if tdelay > time.Minute {
tdelay = time.Second * 5
}
Expand Down
2 changes: 1 addition & 1 deletion internal/net/grpc/pool/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2344,7 +2344,7 @@ package pool
// reconnectHash: test.fields.reconnectHash,
// }
//
// err := p.Disconnect()
// err := p.Disconnect(context.Background)
kpango marked this conversation as resolved.
Show resolved Hide resolved
// if err := checkFunc(test.want, err); err != nil {
// tt.Errorf("error = %v", err)
// }
Expand Down
2 changes: 1 addition & 1 deletion internal/test/mock/grpc/grpc_client_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
}

// ConnectedAddrs calls the ConnectedAddrsFunc object.
func (gc *GRPCClientMock) ConnectedAddrs() []string {
func (gc *GRPCClientMock) ConnectedAddrs(_ context.Context) []string {

Check warning on line 54 in internal/test/mock/grpc/grpc_client_mock.go

View check run for this annotation

Codecov / codecov/patch

internal/test/mock/grpc/grpc_client_mock.go#L54

Added line #L54 was not covered by tests
return gc.ConnectedAddrsFunc()
}

Expand Down
2 changes: 1 addition & 1 deletion internal/test/mock/grpc_testify_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@
return v
}

func (c *ClientInternal) ConnectedAddrs() []string {
func (c *ClientInternal) ConnectedAddrs(ctx context.Context) []string {

Check warning on line 202 in internal/test/mock/grpc_testify_mock.go

View check run for this annotation

Codecov / codecov/patch

internal/test/mock/grpc_testify_mock.go#L202

Added line #L202 was not covered by tests
args := c.Called()
v, ok := args.Get(0).([]string)
if !ok {
Expand Down
4 changes: 4 additions & 0 deletions pkg/gateway/lb/handler/grpc/aggregation.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@
target + " canceled: " + err.Error())...)
sspan.SetStatus(trace.StatusError, err.Error())
}
log.Debug(err)

Check warning on line 104 in pkg/gateway/lb/handler/grpc/aggregation.go

View check run for this annotation

Codecov / codecov/patch

pkg/gateway/lb/handler/grpc/aggregation.go#L104

Added line #L104 was not covered by tests
return nil
case errors.Is(err, context.DeadlineExceeded),
errors.Is(err, errors.ErrRPCCallFailed(target, context.DeadlineExceeded)):
Expand All @@ -112,6 +113,7 @@
target + " deadline_exceeded: " + err.Error())...)
sspan.SetStatus(trace.StatusError, err.Error())
}
log.Debug(err)

Check warning on line 116 in pkg/gateway/lb/handler/grpc/aggregation.go

View check run for this annotation

Codecov / codecov/patch

pkg/gateway/lb/handler/grpc/aggregation.go#L116

Added line #L116 was not covered by tests
return nil
default:
st, msg, err := status.ParseError(err, codes.Unknown, "failed to parse search gRPC error response",
Expand Down Expand Up @@ -168,6 +170,7 @@
target + " canceled: " + err.Error())...)
sspan.SetStatus(trace.StatusError, err.Error())
}
log.Debug(err)

Check warning on line 173 in pkg/gateway/lb/handler/grpc/aggregation.go

View check run for this annotation

Codecov / codecov/patch

pkg/gateway/lb/handler/grpc/aggregation.go#L173

Added line #L173 was not covered by tests
return nil
case errors.Is(err, context.DeadlineExceeded),
errors.Is(err, errors.ErrRPCCallFailed(target, context.DeadlineExceeded)):
Expand All @@ -179,6 +182,7 @@
target + " deadline_exceeded: " + err.Error())...)
sspan.SetStatus(trace.StatusError, err.Error())
}
log.Debug(err)

Check warning on line 185 in pkg/gateway/lb/handler/grpc/aggregation.go

View check run for this annotation

Codecov / codecov/patch

pkg/gateway/lb/handler/grpc/aggregation.go#L185

Added line #L185 was not covered by tests
return nil
default:
st, msg, err := status.ParseError(err, codes.Unknown, "failed to parse search gRPC error response",
Expand Down
2 changes: 1 addition & 1 deletion pkg/gateway/mirror/service/mirror.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@
}
}
}
log.Debugf("[mirror]: connected mirror gateway targets: %v", m.gateway.GRPCClient().ConnectedAddrs())
log.Debugf("[mirror]: connected mirror gateway targets: %v", m.gateway.GRPCClient().ConnectedAddrs(ctx))

Check warning on line 163 in pkg/gateway/mirror/service/mirror.go

View check run for this annotation

Codecov / codecov/patch

pkg/gateway/mirror/service/mirror.go#L163

Added line #L163 was not covered by tests
kpango marked this conversation as resolved.
Show resolved Hide resolved
}
}
})
Expand Down
Loading