From b049688e32b5b4d80dadaf003f26408b51126043 Mon Sep 17 00:00:00 2001 From: liaochuntao Date: Fri, 19 Jul 2024 23:06:40 +0800 Subject: [PATCH] =?UTF-8?q?fix:=E4=BF=AE=E5=A4=8D=E7=9B=91=E6=8E=A7?= =?UTF-8?q?=E6=95=B0=E6=8D=AE=E4=B8=8A=E6=8A=A5=E4=B8=A2=E5=A4=B1=E6=96=B9?= =?UTF-8?q?=E6=B3=95=20(#69)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- balancer.go | 9 ++- examples/quickstart/consumer/main.go | 34 +++++++--- examples/quickstart/consumer/polaris.yaml | 16 ++++- global.go | 17 ++++- go.mod | 4 +- go.sum | 4 +- logger.go | 75 +++++++++++++++++++---- resolver.go | 26 +++++--- 8 files changed, 149 insertions(+), 36 deletions(-) diff --git a/balancer.go b/balancer.go index 2b1ae20..25538c8 100644 --- a/balancer.go +++ b/balancer.go @@ -171,7 +171,7 @@ func (p *polarisNamingBalancer) createSubConnection(key string, addr resolver.Ad } // is a new address (not existing in b.subConns). sc, err := p.cc.NewSubConn( - []resolver.Address{addr}, balancer.NewSubConnOptions{HealthCheckEnabled: true}) + []resolver.Address{addr}, balancer.NewSubConnOptions{HealthCheckEnabled: false}) if err != nil { GetLogger().Error("[Polaris][Balancer] failed to create new SubConn: %v", err) return @@ -245,6 +245,8 @@ func (p *polarisNamingBalancer) ResolverError(err error) { // report an error. return } + p.rwMutex.RLock() + defer p.rwMutex.RUnlock() p.regeneratePicker(nil) p.cc.UpdateState(balancer.State{ ConnectivityState: p.state, @@ -308,8 +310,6 @@ func (p *polarisNamingBalancer) regeneratePicker(options *dialOptions) { return } readySCs := make(map[string]balancer.SubConn) - p.rwMutex.RLock() - defer p.rwMutex.RUnlock() // Filter out all ready SCs from full subConn map. for addr, sc := range p.subConns { if st, ok := p.scStates[sc]; ok && st == connectivity.Ready { @@ -436,6 +436,7 @@ func (pnp *polarisNamingPicker) Pick(info balancer.PickInfo) (balancer.PickResul subSc, ok := pnp.readySCs[addr] if ok { reporter := &resultReporter{ + method: info.FullMethodName, instance: targetInstance, consumerAPI: pnp.balancer.consumerAPI, startTime: time.Now(), @@ -543,6 +544,7 @@ func collectRouteLabels(routings []*traffic_manage.Route) []string { } type resultReporter struct { + method string instance model.Instance consumerAPI polaris.ConsumerAPI startTime time.Time @@ -559,6 +561,7 @@ func (r *resultReporter) report(info balancer.DoneInfo) { callResult.CalledInstance = r.instance callResult.RetStatus = retStatus callResult.SourceService = r.sourceService + callResult.SetMethod(r.method) callResult.SetDelay(time.Since(r.startTime)) callResult.SetRetCode(int32(code)) if err := r.consumerAPI.UpdateServiceCallResult(callResult); err != nil { diff --git a/examples/quickstart/consumer/main.go b/examples/quickstart/consumer/main.go index 3f430c6..78d79dc 100644 --- a/examples/quickstart/consumer/main.go +++ b/examples/quickstart/consumer/main.go @@ -22,24 +22,27 @@ import ( "fmt" "log" "net/http" + "os" + "os/signal" + "syscall" + "github.com/polarismesh/polaris-go/api" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/metadata" polaris "github.com/polarismesh/grpc-go-polaris" "github.com/polarismesh/grpc-go-polaris/examples/common/pb" - "github.com/polarismesh/polaris-go/api" ) const ( - listenPort = 16011 + listenPort = 18080 ) func main() { // grpc客户端连接获取 ctx, cancel := context.WithCancel(context.Background()) - defer cancel() + polaris.GetLogger().SetLevel(polaris.LogDebug) conn, err := polaris.DialContext(ctx, "polaris://QuickStartEchoServerGRPC", polaris.WithGRPCDialOptions(grpc.WithTransportCredentials(insecure.NewCredentials())), @@ -49,7 +52,6 @@ func main() { if err != nil { log.Fatal(err) } - defer conn.Close() echoClient := pb.NewEchoServerClient(conn) indexHandler := func(w http.ResponseWriter, r *http.Request) { @@ -84,8 +86,26 @@ func main() { _, _ = w.Write([]byte(resp.GetValue())) } http.HandleFunc("/echo", indexHandler) - if err := http.ListenAndServe(fmt.Sprintf(":%d", listenPort), nil); nil != err { - log.Fatal(err) - } + go func() { + if err := http.ListenAndServe(fmt.Sprintf(":%d", listenPort), nil); nil != err { + log.Fatal(err) + } + }() + runMainLoop(conn, cancel) +} +func runMainLoop(conn *grpc.ClientConn, cancel context.CancelFunc) { + ch := make(chan os.Signal, 1) + signal.Notify(ch, []os.Signal{ + syscall.SIGINT, syscall.SIGTERM, + syscall.SIGSEGV, + }...) + + for s := range ch { + log.Printf("catch signal(%+v), stop servers", s) + cancel() + conn.Close() + polaris.ClosePolarisContext() + return + } } diff --git a/examples/quickstart/consumer/polaris.yaml b/examples/quickstart/consumer/polaris.yaml index 3f0b683..0ed88e3 100644 --- a/examples/quickstart/consumer/polaris.yaml +++ b/examples/quickstart/consumer/polaris.yaml @@ -1,4 +1,18 @@ global: serverConnector: addresses: - - 127.0.0.1:8091 \ No newline at end of file + - 127.0.0.1:8091 + statReporter: + #描述:是否将统计信息上报至monitor + #类型:bool + enable: true + #描述:启用的统计上报插件类型 + #类型:list + #范围:已经注册的统计上报插件的名字 + chain: + - prometheus + plugin: + prometheus: + type: push + address: 127.0.0.1:9091 + interval: 10s diff --git a/global.go b/global.go index 6dee589..66c2229 100644 --- a/global.go +++ b/global.go @@ -42,7 +42,7 @@ var ( // DefaultNamespace default namespace when namespace is not set DefaultNamespace = "default" // DefaultTTL default ttl value when ttl is not set - DefaultTTL = 20 + DefaultTTL = 5 // DefaultGracefulStopMaxWaitDuration default stop max wait duration when not set DefaultGracefulStopMaxWaitDuration = 30 * time.Second // DefaultDelayStopWaitDuration default delay time before stop @@ -57,16 +57,31 @@ const ( ) var ( + ctxRef = 0 polarisContext api.SDKContext polarisConfig config.Configuration mutexPolarisContext sync.Mutex oncePolarisConfig sync.Once ) +func ClosePolarisContext() { + mutexPolarisContext.Lock() + defer mutexPolarisContext.Unlock() + if nil == polarisContext { + return + } + ctxRef-- + if ctxRef == 0 { + polarisContext.Destroy() + polarisContext = nil + } +} + // PolarisContext get or init the global polaris context func PolarisContext() (api.SDKContext, error) { mutexPolarisContext.Lock() defer mutexPolarisContext.Unlock() + ctxRef++ if nil != polarisContext { return polarisContext, nil } diff --git a/go.mod b/go.mod index 429125a..0b5f4f6 100644 --- a/go.mod +++ b/go.mod @@ -9,13 +9,13 @@ require ( github.com/google/uuid v1.6.0 github.com/hashicorp/errwrap v1.1.0 // indirect github.com/natefinch/lumberjack v2.0.0+incompatible - github.com/polarismesh/polaris-go v1.6.0-beta.5 + github.com/polarismesh/polaris-go v1.6.0-alpha.8 github.com/polarismesh/specification v1.5.1 github.com/prometheus/client_golang v1.19.1 // indirect github.com/prometheus/common v0.54.0 // indirect github.com/prometheus/procfs v0.15.1 // indirect go.uber.org/multierr v1.11.0 // indirect - go.uber.org/zap v1.27.0 // indirect + go.uber.org/zap v1.27.0 golang.org/x/net v0.26.0 google.golang.org/genproto/googleapis/rpc v0.0.0-20240617180043-68d350f18fd4 // indirect google.golang.org/grpc v1.64.0 diff --git a/go.sum b/go.sum index 9b3bd80..1b306cb 100644 --- a/go.sum +++ b/go.sum @@ -1695,8 +1695,8 @@ github.com/pkg/sftp v1.10.1/go.mod h1:lYOWFsE0bwd1+KfKJaKeuokY15vzFx25BLbzYYoAxZ github.com/pkg/sftp v1.13.1/go.mod h1:3HaPG6Dq1ILlpPZRO0HVMrsydcdLt6HRDccSgb87qRg= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/polarismesh/polaris-go v1.6.0-beta.5 h1:llucvfydWlFWTNeABHbbuVL2ijR7AITx8UG02tx0c/Y= -github.com/polarismesh/polaris-go v1.6.0-beta.5/go.mod h1:CuXO9bhHGjSoOIMWr4NXf3bJAkRBp5YoM7ibBzENC+c= +github.com/polarismesh/polaris-go v1.6.0-alpha.8 h1:KzANbn7gumZLfbJEA1KavDiFBqlDKxeMVS3eTxZXFR0= +github.com/polarismesh/polaris-go v1.6.0-alpha.8/go.mod h1:CuXO9bhHGjSoOIMWr4NXf3bJAkRBp5YoM7ibBzENC+c= github.com/polarismesh/specification v1.4.1/go.mod h1:rDvMMtl5qebPmqiBLNa5Ps0XtwkP31ZLirbH4kXA0YU= github.com/polarismesh/specification v1.5.1 h1:cJ2m0RBepdopGo/e3UpKdsab3NpDZnw5IsVTB1sFc5I= github.com/polarismesh/specification v1.5.1/go.mod h1:rDvMMtl5qebPmqiBLNa5Ps0XtwkP31ZLirbH4kXA0YU= diff --git a/logger.go b/logger.go index d53db74..7e86411 100644 --- a/logger.go +++ b/logger.go @@ -19,10 +19,13 @@ package grpcpolaris import ( "fmt" - "log" + "runtime" "sync/atomic" + "time" "github.com/natefinch/lumberjack" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" ) type LogLevel int @@ -35,6 +38,21 @@ const ( LogError ) +func (l LogLevel) String() string { + switch l { + case LogDebug: + return "[debug]" + case LogInfo: + return "[info]" + case LogWarn: + return "[warn]" + case LogError: + return "[error]" + default: + return "" + } +} + var _log Logger = newDefaultLogger() func SetLogger(logger Logger) { @@ -54,25 +72,35 @@ type Logger interface { } type defaultLogger struct { - writer *log.Logger + writer zapcore.Core levelRef atomic.Value } func newDefaultLogger() *defaultLogger { - lumberJackLogger := &lumberjack.Logger{ + encoderCfg := zapcore.EncoderConfig{ + MessageKey: "msg", + LevelKey: "level", + TimeKey: "time", + NameKey: "name", + CallerKey: "caller", + StacktraceKey: "stacktrace", + LineEnding: zapcore.DefaultLineEnding, + EncodeLevel: zapcore.LowercaseLevelEncoder, + EncodeTime: zapcore.ISO8601TimeEncoder, + EncodeDuration: zapcore.StringDurationEncoder, + EncodeCaller: zapcore.ShortCallerEncoder, + } + w := zapcore.AddSync(&lumberjack.Logger{ Filename: "./logs/grpc-go-polaris.log", // 文件位置 MaxSize: 100, // 进行切割之前,日志文件的最大大小(MB为单位) MaxAge: 7, // 保留旧文件的最大天数 MaxBackups: 100, // 保留旧文件的最大个数 Compress: true, // 是否压缩/归档旧文件 - } + }) - levelRef := atomic.Value{} - - levelRef.Store(LogInfo) + core := zapcore.NewCore(zapcore.NewConsoleEncoder(encoderCfg), w, zap.InfoLevel) return &defaultLogger{ - writer: log.New(lumberJackLogger, "", log.Lshortfile|log.Ldate|log.Ltime), - levelRef: levelRef, + writer: core, } } @@ -98,9 +126,32 @@ func (l *defaultLogger) Error(format string, args ...interface{}) { } func (l *defaultLogger) printf(expectLevel LogLevel, format string, args ...interface{}) { - curLevel := l.levelRef.Load().(LogLevel) - if curLevel > expectLevel { + zapL := func() zapcore.Level { + switch expectLevel { + case LogDebug: + return zapcore.DebugLevel + case LogInfo: + return zapcore.InfoLevel + case LogWarn: + return zapcore.WarnLevel + case LogError: + return zapcore.ErrorLevel + default: + return zapcore.InfoLevel + } + }() + + if !l.writer.Enabled(zapL) { return } - _ = l.writer.Output(3, fmt.Sprintf(format, args...)) + + msg := fmt.Sprintf(format, args...) + e := zapcore.Entry{ + Message: msg, + Level: zapL, + Time: time.Now(), + } + + e.Caller = zapcore.NewEntryCaller(runtime.Caller(2)) + _ = l.writer.Write(e, nil) } diff --git a/resolver.go b/resolver.go index c6fa8cc..7b5ed0d 100644 --- a/resolver.go +++ b/resolver.go @@ -55,7 +55,14 @@ func RegisterResolverInterceptor(i ResolverInterceptor) { resolverInterceptors = append(resolverInterceptors, i) } +func NewResolver(ctx api.SDKContext) *resolverBuilder { + return &resolverBuilder{ + sdkCtx: ctx, + } +} + type resolverBuilder struct { + sdkCtx api.SDKContext } // Scheme polaris scheme @@ -104,12 +111,14 @@ func (rb *resolverBuilder) Build( return nil, err } - sdkCtx, err := PolarisContext() - if nil != err { - return nil, err + if rb.sdkCtx == nil { + sdkCtx, err := PolarisContext() + if nil != err { + return nil, err + } + rb.sdkCtx = sdkCtx } - - options.SDKContext = sdkCtx + options.SDKContext = rb.sdkCtx ctx, cancel := context.WithCancel(context.Background()) d := &polarisNamingResolver{ @@ -120,7 +129,7 @@ func (rb *resolverBuilder) Build( options: options, host: host, port: port, - consumer: api.NewConsumerAPIByContext(sdkCtx), + consumer: api.NewConsumerAPIByContext(rb.sdkCtx), eventCh: make(chan struct{}, 1), } go d.watcher() @@ -247,13 +256,14 @@ func (pr *polarisNamingResolver) watcher() { for { select { case <-pr.ctx.Done(): - GetLogger().Info("[Polaris][Resolver] exist watch instance change event for namespace=%s service=%s: %v", + GetLogger().Info("[Polaris][Resolver] exit watch instance change event for namespace=%s service=%s: %v", pr.options.Namespace, pr.host) return case <-pr.eventCh: + pr.doRefresh() case <-ticker.C: + pr.doRefresh() } - pr.doRefresh() } }