diff --git a/internal/net/grpc/client.go b/internal/net/grpc/client.go index 78fc51448b..9c5f215ac8 100644 --- a/internal/net/grpc/client.go +++ b/internal/net/grpc/client.go @@ -249,7 +249,7 @@ func (g *gRPCClient) StartConnectionMonitor(ctx context.Context) (<-chan error, return ctx.Err() case <-prTick.C: if g.enablePoolRebalance { - err = g.rangeConns(func(addr string, p pool.Conn) bool { + err = g.rangeConns(ctx, func(addr string, p pool.Conn) bool { // if addr or pool is nil or empty the registration of conns is invalid let's disconnect them if addr == "" || p == nil { disconnectTargets = append(disconnectTargets, addr) @@ -286,7 +286,7 @@ func (g *gRPCClient) StartConnectionMonitor(ctx context.Context) (<-chan error, }) } case <-hcTick.C: - err = g.rangeConns(func(addr string, p pool.Conn) bool { + err = g.rangeConns(ctx, func(addr string, p pool.Conn) bool { // if addr or pool is nil or empty the registration of conns is invalid let's disconnect them if addr == "" || p == nil { disconnectTargets = append(disconnectTargets, addr) @@ -415,7 +415,7 @@ func (g *gRPCClient) Range( if g.conns.Len() == 0 { return errors.ErrGRPCClientConnNotFound("*") } - err = g.rangeConns(func(addr string, p pool.Conn) bool { + err = g.rangeConns(ctx, func(addr string, p pool.Conn) bool { ssctx, sspan := trace.StartSpan(sctx, apiName+"/Client.Range/"+addr) defer func() { if sspan != nil { @@ -478,7 +478,7 @@ func (g *gRPCClient) RangeConcurrent( if g.conns.Len() == 0 { return errors.ErrGRPCClientConnNotFound("*") } - err = g.rangeConns(func(addr string, p pool.Conn) bool { + err = g.rangeConns(ctx, func(addr string, p pool.Conn) bool { eg.Go(safety.RecoverFunc(func() (err error) { ssctx, sspan := trace.StartSpan(egctx, apiName+"/Client.RangeConcurrent/"+addr) defer func() { @@ -565,7 +565,7 @@ func (g *gRPCClient) OrderedRange( return nil default: p, ok := g.conns.Load(addr) - if !ok || p == nil { + if !ok || p == nil || !p.IsHealthy(sctx) { g.crl.Store(addr, true) log.Warnf("gRPCClient.OrderedRange operation failed, gRPC connection pool for %s is invalid,\terror: %v", addr, errors.ErrGRPCClientConnNotFound(addr)) continue @@ -634,7 +634,7 @@ func (g *gRPCClient) OrderedRangeConcurrent( addr := order eg.Go(safety.RecoverFunc(func() (err error) { p, ok := g.conns.Load(addr) - if !ok || p == nil { + if !ok || p == nil || !p.IsHealthy(sctx) { g.crl.Store(addr, true) log.Warnf("gRPCClient.OrderedRangeConcurrent operation failed, gRPC connection pool for %s is invalid,\terror: %v", addr, errors.ErrGRPCClientConnNotFound(addr)) return nil @@ -701,7 +701,7 @@ func (g *gRPCClient) RoundRobin( } do := func() (data any, err error) { - cerr := g.rangeConns(func(addr string, p pool.Conn) bool { + cerr := g.rangeConns(ctx, func(addr string, p pool.Conn) bool { select { case <-ctx.Done(): err = ctx.Err() @@ -1087,8 +1087,9 @@ func (g *gRPCClient) Disconnect(ctx context.Context, addr string) error { func (g *gRPCClient) ConnectedAddrs() (addrs []string) { addrs = make([]string, 0, g.conns.Len()) - err := g.rangeConns(func(addr string, p pool.Conn) bool { - if p != nil && p.IsHealthy(context.Background()) { + ctx := context.Background() + err := g.rangeConns(ctx, func(addr string, p pool.Conn) bool { + if p != nil && p.IsHealthy(ctx) { addrs = append(addrs, addr) } return true @@ -1113,9 +1114,20 @@ func (g *gRPCClient) Close(ctx context.Context) (err error) { return err } -func (g *gRPCClient) rangeConns(fn func(addr string, p pool.Conn) bool) error { +func (g *gRPCClient) rangeConns(ctx context.Context, fn func(addr string, p pool.Conn) bool) error { var cnt int g.conns.Range(func(addr string, p pool.Conn) bool { + if p == nil || !p.IsHealthy(ctx) { + pc, err := p.Connect(ctx) + if pc == nil || err != nil || !pc.IsHealthy(ctx) { + if pc != nil { + pc.Disconnect() + } + log.Debugf("Unhealthy connection detected for %s during gRPC Connection Range over Loop:\t%s", addr, p.String()) + return true + } + p = pc + } cnt++ return fn(addr, p) })