diff --git a/cmd/layotto/main.go b/cmd/layotto/main.go index 5b9a50fc72..6bd1259e9b 100644 --- a/cmd/layotto/main.go +++ b/cmd/layotto/main.go @@ -88,6 +88,7 @@ import ( // Configuration "mosn.io/layotto/components/configstores" "mosn.io/layotto/components/configstores/apollo" + store_inmemory "mosn.io/layotto/components/configstores/in-memory" // Pub/Sub dapr_comp_pubsub "github.com/dapr/components-contrib/pubsub" @@ -297,6 +298,7 @@ func NewRuntimeGrpcServer(data json.RawMessage, opts ...grpc.ServerOption) (mgrp configstores.NewStoreFactory("apollo", apollo.NewStore), configstores.NewStoreFactory("etcd", etcdv3.NewStore), configstores.NewStoreFactory("nacos", nacos.NewStore), + configstores.NewStoreFactory("in-memory", store_inmemory.NewStore), ), // RPC diff --git a/cmd/layotto_multiple_api/main.go b/cmd/layotto_multiple_api/main.go index e63fd987f6..3effb91f6e 100644 --- a/cmd/layotto_multiple_api/main.go +++ b/cmd/layotto_multiple_api/main.go @@ -94,6 +94,7 @@ import ( // Configuration "mosn.io/layotto/components/configstores" "mosn.io/layotto/components/configstores/apollo" + store_inmemory "mosn.io/layotto/components/configstores/in-memory" "mosn.io/layotto/components/configstores/nacos" // Pub/Sub @@ -303,6 +304,7 @@ func NewRuntimeGrpcServer(data json.RawMessage, opts ...grpc.ServerOption) (mgrp configstores.NewStoreFactory("apollo", apollo.NewStore), configstores.NewStoreFactory("etcd", etcdv3.NewStore), configstores.NewStoreFactory("nacos", nacos.NewStore), + configstores.NewStoreFactory("in-memory", store_inmemory.NewStore), ), // RPC diff --git a/cmd/layotto_without_xds/main.go b/cmd/layotto_without_xds/main.go index 2c209d8d35..de791a541f 100644 --- a/cmd/layotto_without_xds/main.go +++ b/cmd/layotto_without_xds/main.go @@ -85,6 +85,7 @@ import ( // Configuration "mosn.io/layotto/components/configstores" "mosn.io/layotto/components/configstores/apollo" + store_inmemory "mosn.io/layotto/components/configstores/in-memory" // Pub/Sub dapr_comp_pubsub "github.com/dapr/components-contrib/pubsub" @@ -276,6 +277,7 @@ func NewRuntimeGrpcServer(data json.RawMessage, opts ...grpc.ServerOption) (mgrp configstores.NewStoreFactory("apollo", apollo.NewStore), configstores.NewStoreFactory("etcd", etcdv3.NewStore), configstores.NewStoreFactory("nacos", nacos.NewStore), + configstores.NewStoreFactory("in-memory", store_inmemory.NewStore), ), // RPC diff --git a/components/configstores/in-memory/configstore.go b/components/configstores/in-memory/configstore.go new file mode 100644 index 0000000000..7e3fbed3c9 --- /dev/null +++ b/components/configstores/in-memory/configstore.go @@ -0,0 +1,181 @@ +// Copyright 2021 Layotto Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package in_memory + +import ( + "context" + "fmt" + "sync" + + "mosn.io/layotto/components/configstores" + "mosn.io/layotto/components/pkg/actuators" + "mosn.io/layotto/components/trace" +) + +var ( + once sync.Once + readinessIndicator *actuators.HealthIndicator + livenessIndicator *actuators.HealthIndicator +) + +const ( + componentName = "configstore-memory" + defaultGroup = "default" + defaultLabel = "default" +) + +func init() { + readinessIndicator = actuators.NewHealthIndicator() + livenessIndicator = actuators.NewHealthIndicator() +} + +type InMemoryConfigStore struct { + data *sync.Map + listener *sync.Map + storeName string + appId string +} + +func NewStore() configstores.Store { + once.Do(func() { + indicators := &actuators.ComponentsIndicator{ReadinessIndicator: readinessIndicator, LivenessIndicator: livenessIndicator} + actuators.SetComponentsIndicator(componentName, indicators) + }) + return &InMemoryConfigStore{ + data: &sync.Map{}, + listener: &sync.Map{}, + } +} + +func (m *InMemoryConfigStore) Init(config *configstores.StoreConfig) error { + m.appId = config.AppId + m.storeName = config.StoreName + readinessIndicator.SetStarted() + livenessIndicator.SetStarted() + return nil +} + +// Get gets configuration from configuration store. +func (m *InMemoryConfigStore) Get(ctx context.Context, req *configstores.GetRequest) ([]*configstores.ConfigurationItem, error) { + + res := make([]*configstores.ConfigurationItem, 0, len(req.Keys)) + + for _, key := range req.Keys { + value, ok := m.data.Load(key) + if ok { + config := &configstores.ConfigurationItem{ + Content: value.(string), + Key: key, + Group: req.Group, + } + res = append(res, config) + } + } + trace.SetExtraComponentInfo(ctx, fmt.Sprintf("method: %+v, store: %+v", "Get", "memory")) + return res, nil +} + +// Set saves configuration into configuration store. +func (m *InMemoryConfigStore) Set(ctx context.Context, req *configstores.SetRequest) error { + if len(req.Items) == 0 { + return fmt.Errorf("params illegal:item is empty") + } + for _, item := range req.Items { + m.data.Store(item.Key, item.Content) + m.notifyChanged(item) + } + return nil +} + +// Delete deletes configuration from configuration store. +func (m *InMemoryConfigStore) Delete(ctx context.Context, req *configstores.DeleteRequest) error { + for _, key := range req.Keys { + m.data.Delete(key) + } + return nil +} + +// Subscribe gets configuration from configuration store and subscribe the updates. +func (m *InMemoryConfigStore) Subscribe(request *configstores.SubscribeReq, ch chan *configstores.SubscribeResp) error { + if request.Group == "" && len(request.Keys) > 0 { + request.Group = defaultGroup + } + + ctx := context.Background() + req := &configstores.GetRequest{ + AppId: request.AppId, + Group: request.Group, + Label: request.Label, + Keys: request.Keys, + Metadata: request.Metadata, + } + + for _, key := range req.Keys { + m.listener.Store(key, m.subscribeOnChange(ch)) + } + + items, err := m.Get(ctx, req) + if err != nil { + return err + } + + for _, item := range items { + m.notifyChanged(item) + } + + return nil +} + +func (m *InMemoryConfigStore) notifyChanged(item *configstores.ConfigurationItem) { + f, ok := m.listener.Load(item.Key) + if ok { + f.(OnChangeFunc)(item.Group, item.Key, item.Content) + } +} + +type OnChangeFunc func(group, dataId, data string) + +func (m *InMemoryConfigStore) subscribeOnChange(ch chan *configstores.SubscribeResp) OnChangeFunc { + return func(group, dataId, data string) { + resp := &configstores.SubscribeResp{ + StoreName: m.storeName, + AppId: m.appId, + Items: []*configstores.ConfigurationItem{ + { + Key: dataId, + Content: data, + Group: group, + }, + }, + } + + ch <- resp + } +} + +func (m *InMemoryConfigStore) StopSubscribe() { + // stop listening all subscribed configs + m.listener.Range(func(key, value any) bool { + m.listener.Delete(key) + return true + }) +} + +func (m *InMemoryConfigStore) GetDefaultGroup() string { + return defaultGroup +} + +func (m *InMemoryConfigStore) GetDefaultLabel() string { + return defaultLabel +} diff --git a/components/configstores/in-memory/configstore_test.go b/components/configstores/in-memory/configstore_test.go new file mode 100644 index 0000000000..0577fde248 --- /dev/null +++ b/components/configstores/in-memory/configstore_test.go @@ -0,0 +1,281 @@ +// Copyright 2021 Layotto Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package in_memory + +import ( + "context" + "fmt" + "sync" + "testing" + + "mosn.io/layotto/components/configstores" + + "github.com/stretchr/testify/assert" +) + +func TestImMemoryConfigStore_Set(t *testing.T) { + t.Run("set success", func(t *testing.T) { + params := &configstores.SetRequest{ + AppId: "test-set-app", + Items: []*configstores.ConfigurationItem{ + { + Group: "test-set-group", + Content: "content", + Key: "test-set-key", + }, + }, + } + + store := NewStore() + err := store.Set(context.Background(), params) + assert.Nil(t, err) + }) + + t.Run("set with empty items", func(t *testing.T) { + store := NewStore() + params := &configstores.SetRequest{ + AppId: "test-set-app", + Items: []*configstores.ConfigurationItem{}, + } + err := store.Set(context.Background(), params) + assert.EqualError(t, fmt.Errorf("params illegal:item is empty"), err.Error()) + }) + + t.Run("test notify listener success", func(t *testing.T) { + store := &InMemoryConfigStore{ + data: &sync.Map{}, + listener: &sync.Map{}, + } + ch := make(chan *configstores.SubscribeResp, 2) + + config := &configstores.StoreConfig{ + AppId: "test-app", + StoreName: "test-store", + } + + err := store.Init(config) + assert.Nil(t, err) + + subReqParams := &configstores.SubscribeReq{ + AppId: "test-app", + Group: "group", + Keys: []string{"data_id"}, + } + + setReqParams := &configstores.SetRequest{ + AppId: "test-app", + StoreName: "test-store", + Items: []*configstores.ConfigurationItem{ + { + Group: "group", + Content: "content", + Key: "data_id", + }, + }, + } + + err = store.Subscribe(subReqParams, ch) + assert.Nil(t, err) + + expected := &configstores.SubscribeResp{ + StoreName: store.storeName, + AppId: store.appId, + Items: []*configstores.ConfigurationItem{ + { + Key: "data_id", + Content: "content", + Group: "group", + }, + }, + } + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + i := 0 + for v := range ch { + i++ + assert.EqualValues(t, expected, v) + } + assert.EqualValues(t, i, 3) + }() + err = store.Set(context.Background(), setReqParams) + assert.Nil(t, err) + + err = store.Set(context.Background(), setReqParams) + assert.Nil(t, err) + + err = store.Set(context.Background(), setReqParams) + assert.Nil(t, err) + close(ch) + wg.Wait() + }) + +} + +func TestInMemoryConfigStore_Get(t *testing.T) { + + t.Run("get success", func(t *testing.T) { + store := NewStore() + + params := &configstores.SetRequest{ + AppId: "test-set-app", + Items: []*configstores.ConfigurationItem{ + { + Group: "test-group", + Content: "content", + Key: "test-key", + }, + }, + } + err := store.Set(context.Background(), params) + assert.Nil(t, err) + + getRequestParams := &configstores.GetRequest{ + Group: "test-group", + Keys: []string{"test-key"}, + } + + v, err := store.Get(context.Background(), getRequestParams) + assert.Nil(t, err) + expect := []*configstores.ConfigurationItem{ + { + Key: getRequestParams.Keys[0], + Group: getRequestParams.Group, + Content: "content", + }, + } + assert.EqualValues(t, expect, v) + }) + +} + +func TestInMemoryConfigStore_Subscribe(t *testing.T) { + t.Run("test subscribe success", func(t *testing.T) { + store := &InMemoryConfigStore{ + data: &sync.Map{}, + listener: &sync.Map{}, + } + ch := make(chan *configstores.SubscribeResp, 2) + + config := &configstores.StoreConfig{ + AppId: "test-app", + StoreName: "test-store", + } + + err := store.Init(config) + assert.Nil(t, err) + + subReqParams := &configstores.SubscribeReq{ + AppId: "test-app", + Group: "group", + Keys: []string{"data_id"}, + } + + err = store.Subscribe(subReqParams, ch) + assert.Nil(t, err) + f, ok := store.listener.Load("data_id") + assert.True(t, ok) + assert.NotNil(t, f) + + expected := &configstores.SubscribeResp{ + StoreName: store.storeName, + AppId: store.appId, + Items: []*configstores.ConfigurationItem{ + { + Key: "data_id", + Content: "content", + Group: "group", + }, + }, + } + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + i := 0 + for v := range ch { + i++ + assert.EqualValues(t, expected, v) + } + assert.EqualValues(t, i, 3) + }() + f.(OnChangeFunc)("group", "data_id", "content") + f.(OnChangeFunc)("group", "data_id", "content") + f.(OnChangeFunc)("group", "data_id", "content") + close(ch) + wg.Wait() + }) +} + +func TestInMemoryConfigStore_StopSubscribe(t *testing.T) { + t.Run("test stop subscribe success", func(t *testing.T) { + store := &InMemoryConfigStore{ + data: &sync.Map{}, + listener: &sync.Map{}, + } + ch := make(chan *configstores.SubscribeResp, 2) + + config := &configstores.StoreConfig{ + AppId: "test-app", + StoreName: "test-store", + } + + err := store.Init(config) + assert.Nil(t, err) + + subReqParams := &configstores.SubscribeReq{ + AppId: "test-app", + Group: "group", + Keys: []string{"data_id"}, + } + + err = store.Subscribe(subReqParams, ch) + assert.Nil(t, err) + + store.StopSubscribe() + f, ok := store.listener.Load("data_id") + assert.False(t, ok) + assert.Nil(t, f) + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + i := 0 + for range ch { + i++ + } + assert.EqualValues(t, 0, i) + }() + + setReqParams := &configstores.SetRequest{ + AppId: "test-app", + StoreName: "test-store", + Items: []*configstores.ConfigurationItem{ + { + Group: "group", + Content: "content", + Key: "data_id", + }, + }, + } + err = store.Set(context.Background(), setReqParams) + assert.Nil(t, err) + close(ch) + wg.Wait() + }) +} diff --git a/components/rpc/callback/callback.go b/components/rpc/callback/callback.go index 4f4a7f492a..e0fe39d1fe 100644 --- a/components/rpc/callback/callback.go +++ b/components/rpc/callback/callback.go @@ -19,7 +19,7 @@ package callback import ( "encoding/json" - "mosn.io/layotto/kit/logger" + "mosn.io/pkg/log" "mosn.io/layotto/components/rpc" ) @@ -63,32 +63,23 @@ var ( // NewCallback is created Callback func NewCallback() rpc.Callback { - cb := &callback{ - logger: logger.NewLayottoLogger("rpc-callback"), - } - logger.NewLayottoLogger("rpc-callback") - return cb -} - -func (c *callback) OnLogLevelChanged(level logger.LogLevel) { - c.logger.SetLogLevel(level) + return &callback{} } type callback struct { beforeInvoke []func(*rpc.RPCRequest) (*rpc.RPCRequest, error) afterInvoke []func(*rpc.RPCResponse) (*rpc.RPCResponse, error) - logger logger.Logger } // AddBeforeInvoke is add beforeInvoke into callback.beforeInvoke func (c *callback) AddBeforeInvoke(conf rpc.CallbackFunc) { f, ok := beforeInvokeRegistry[conf.Name] if !ok { - c.logger.Errorf("[runtime][rpc]can't find before filter %s", conf.Name) + log.DefaultLogger.Errorf("[runtime][rpc]can't find before filter %s", conf.Name) return } if err := f.Init(conf.Config); err != nil { - c.logger.Errorf("[runtime][rpc]init before filter err %s", err.Error()) + log.DefaultLogger.Errorf("[runtime][rpc]init before filter err %s", err.Error()) return } c.beforeInvoke = append(c.beforeInvoke, f.Create()) @@ -98,11 +89,11 @@ func (c *callback) AddBeforeInvoke(conf rpc.CallbackFunc) { func (c *callback) AddAfterInvoke(conf rpc.CallbackFunc) { f, ok := afterInvokeRegistry[conf.Name] if !ok { - c.logger.Errorf("[runtime][rpc]can't find after filter %s", conf.Name) + log.DefaultLogger.Errorf("[runtime][rpc]can't find after filter %s", conf.Name) return } if err := f.Init(conf.Config); err != nil { - c.logger.Errorf("[runtime][rpc]init after filter err %s", err.Error()) + log.DefaultLogger.Errorf("[runtime][rpc]init after filter err %s", err.Error()) return } c.afterInvoke = append(c.afterInvoke, f.Create()) diff --git a/components/rpc/invoker/mosn/channel/connpool.go b/components/rpc/invoker/mosn/channel/connpool.go index 7ff26cffae..fe99390719 100644 --- a/components/rpc/invoker/mosn/channel/connpool.go +++ b/components/rpc/invoker/mosn/channel/connpool.go @@ -26,10 +26,9 @@ import ( "sync/atomic" "mosn.io/pkg/buffer" + "mosn.io/pkg/log" "mosn.io/pkg/utils" - "mosn.io/layotto/kit/logger" - common "mosn.io/layotto/components/pkg/common" ) @@ -76,8 +75,7 @@ func newConnPool( // handle data onDataFunc func(*wrapConn) error, // clean connected - cleanupFunc func(*wrapConn, error), - logger logger.Logger) *connPool { + cleanupFunc func(*wrapConn, error)) *connPool { p := &connPool{ maxActive: maxActive, @@ -87,7 +85,6 @@ func newConnPool( cleanupFunc: cleanupFunc, sema: make(chan struct{}, maxActive), free: list.New(), - logger: logger, } return p } @@ -100,10 +97,9 @@ type connPool struct { onDataFunc func(*wrapConn) error cleanupFunc func(*wrapConn, error) - sema chan struct{} - mu sync.Mutex - free *list.List - logger logger.Logger + sema chan struct{} + mu sync.Mutex + free *list.List } // Get is get wrapConn by context.Context @@ -181,9 +177,9 @@ func (p *connPool) readloop(c *wrapConn) { if readErr != nil { err = readErr if readErr == io.EOF { - p.logger.Debugf("[runtime][rpc]connpool readloop err: %s", readErr.Error()) + log.DefaultLogger.Debugf("[runtime][rpc]connpool readloop err: %s", readErr.Error()) } else { - p.logger.Errorf("[runtime][rpc]connpool readloop err: %s", readErr.Error()) + log.DefaultLogger.Errorf("[runtime][rpc]connpool readloop err: %s", readErr.Error()) } } @@ -192,7 +188,7 @@ func (p *connPool) readloop(c *wrapConn) { // it will delegate to hstate if it's constructed by httpchannel if onDataErr := p.onDataFunc(c); onDataErr != nil { err = onDataErr - p.logger.Errorf("[runtime][rpc]connpool onData err: %s", onDataErr.Error()) + log.DefaultLogger.Errorf("[runtime][rpc]connpool onData err: %s", onDataErr.Error()) } } diff --git a/components/rpc/invoker/mosn/channel/connpool_test.go b/components/rpc/invoker/mosn/channel/connpool_test.go index c1410dd9f2..1d5714b222 100644 --- a/components/rpc/invoker/mosn/channel/connpool_test.go +++ b/components/rpc/invoker/mosn/channel/connpool_test.go @@ -23,8 +23,6 @@ import ( "testing" "time" - "mosn.io/layotto/kit/logger" - "github.com/stretchr/testify/assert" ) @@ -42,7 +40,6 @@ func TestGetPut(t *testing.T) { return nil }, nil, - logger.NewLayottoLogger("test"), ) ctx, cancel := context.WithTimeout(context.TODO(), time.Second) @@ -102,7 +99,6 @@ func TestDeadconnRenew(t *testing.T) { }, func(conn *wrapConn) error { return nil }, nil, - logger.NewLayottoLogger("test"), ) c1, err := p.Get(context.TODO()) @@ -135,7 +131,6 @@ func TestPoolConcurrent(t *testing.T) { }, func(conn *wrapConn) error { return <-ch }, nil, - logger.NewLayottoLogger("test"), ) actions := []string{ diff --git a/components/rpc/invoker/mosn/channel/httpchannel.go b/components/rpc/invoker/mosn/channel/httpchannel.go index 6ae5431476..28e5b00887 100644 --- a/components/rpc/invoker/mosn/channel/httpchannel.go +++ b/components/rpc/invoker/mosn/channel/httpchannel.go @@ -23,8 +23,6 @@ import ( "net/http" "time" - "mosn.io/layotto/kit/logger" - "mosn.io/pkg/buffer" "github.com/valyala/fasthttp" @@ -64,16 +62,12 @@ func (h *hstate) close() { // httpChannel is Channel implement type httpChannel struct { - pool *connPool - logger logger.Logger + pool *connPool } // newHttpChannel is used to create rpc.Channel according to ChannelConfig func newHttpChannel(config ChannelConfig) (rpc.Channel, error) { - hc := &httpChannel{ - logger: logger.NewLayottoLogger("httpChannel/" + config.Protocol), - } - logger.RegisterComponentLoggerListener("httpChannel/"+config.Protocol, hc) + hc := &httpChannel{} hc.pool = newConnPool( config.Size, // dialFunc @@ -105,15 +99,10 @@ func newHttpChannel(config ChannelConfig) (rpc.Channel, error) { }, hc.onData, hc.cleanup, - hc.logger, ) return hc, nil } -func (h *httpChannel) OnLogLevelChanged(level logger.LogLevel) { - h.logger.SetLogLevel(level) -} - // Do is used to handle RPCRequest and return RPCResponse func (h *httpChannel) Do(req *rpc.RPCRequest) (*rpc.RPCResponse, error) { // 1. context.WithTimeout diff --git a/components/rpc/invoker/mosn/channel/xchannel.go b/components/rpc/invoker/mosn/channel/xchannel.go index 86fedddca7..4d3d782e60 100644 --- a/components/rpc/invoker/mosn/channel/xchannel.go +++ b/components/rpc/invoker/mosn/channel/xchannel.go @@ -27,8 +27,7 @@ import ( "time" "mosn.io/pkg/buffer" - - "mosn.io/layotto/kit/logger" + "mosn.io/pkg/log" "mosn.io/api" @@ -53,11 +52,7 @@ func newXChannel(config ChannelConfig) (rpc.Channel, error) { if err := proto.Init(config.Ext); err != nil { return nil, err } - m := &xChannel{ - proto: proto, - logger: logger.NewLayottoLogger("xChannel/" + config.Protocol), - } - logger.RegisterComponentLoggerListener("xChannel/"+config.Protocol, m) + m := &xChannel{proto: proto} m.pool = newConnPool( config.Size, // dialFunc @@ -87,15 +82,10 @@ func newXChannel(config ChannelConfig) (rpc.Channel, error) { }, m.onData, m.cleanup, - m.logger, ) return m, nil } -func (m *xChannel) OnLogLevelChanged(level logger.LogLevel) { - m.logger.SetLogLevel(level) -} - // xstate is record state type xstate struct { reqid uint32 @@ -110,9 +100,8 @@ type call struct { // xChannel is Channel implement type xChannel struct { - proto transport_protocol.TransportProtocol - pool *connPool - logger logger.Logger + proto transport_protocol.TransportProtocol + pool *connPool } // InvokeWithTargetAddress send request to specific provider address @@ -160,9 +149,9 @@ func (m *xChannel) InvokeWithTargetAddress(req *rpc.RPCRequest) (*rpc.RPCRespons if readErr != nil { err = readErr if readErr == io.EOF { - m.logger.Debugf("[runtime][rpc]direct conn read-loop err: %s", readErr.Error()) + log.DefaultLogger.Debugf("[runtime][rpc]direct conn read-loop err: %s", readErr.Error()) } else { - m.logger.Errorf("[runtime][rpc]direct conn read-loop err: %s", readErr.Error()) + log.DefaultLogger.Errorf("[runtime][rpc]direct conn read-loop err: %s", readErr.Error()) } } @@ -170,7 +159,7 @@ func (m *xChannel) InvokeWithTargetAddress(req *rpc.RPCRequest) (*rpc.RPCRespons iframe, decodeErr := m.proto.Decode(context.TODO(), wc.buf) if decodeErr != nil { err = decodeErr - m.logger.Errorf("[runtime][rpc]direct conn decode frame err: %s", err) + log.DefaultLogger.Errorf("[runtime][rpc]direct conn decode frame err: %s", err) break } frame, ok := iframe.(api.XRespFrame) @@ -179,7 +168,7 @@ func (m *xChannel) InvokeWithTargetAddress(req *rpc.RPCRequest) (*rpc.RPCRespons } if !ok { err = errors.New("[runtime][rpc]xchannel type not XRespFrame") - m.logger.Errorf("[runtime][rpc]direct conn decode frame err: %s", err) + log.DefaultLogger.Errorf("[runtime][rpc]direct conn decode frame err: %s", err) break } callChan <- call{resp: frame} diff --git a/components/rpc/invoker/mosn/mosninvoker.go b/components/rpc/invoker/mosn/mosninvoker.go index 1d4dae6ba3..211b708c3e 100644 --- a/components/rpc/invoker/mosn/mosninvoker.go +++ b/components/rpc/invoker/mosn/mosninvoker.go @@ -25,8 +25,7 @@ import ( // bridge to mosn _ "mosn.io/mosn/pkg/filter/network/proxy" - - "mosn.io/layotto/kit/logger" + "mosn.io/pkg/log" "mosn.io/layotto/components/rpc" "mosn.io/layotto/components/rpc/callback" @@ -41,7 +40,6 @@ const ( type mosnInvoker struct { channel rpc.Channel cb rpc.Callback - logger logger.Logger } // mosnConfig is mosn config @@ -53,18 +51,10 @@ type mosnConfig struct { // NewMosnInvoker is init mosnInvoker func NewMosnInvoker() rpc.Invoker { - invoker := &mosnInvoker{ - cb: callback.NewCallback(), - logger: logger.NewLayottoLogger("mosnInvoker"), - } - logger.RegisterComponentLoggerListener("mosnInvoker", invoker) + invoker := &mosnInvoker{cb: callback.NewCallback()} return invoker } -func (m *mosnInvoker) OnLogLevelChanged(level logger.LogLevel) { - m.logger.SetLogLevel(level) -} - // Init is init mosn RpcConfig func (m *mosnInvoker) Init(conf rpc.RpcConfig) error { var config mosnConfig @@ -98,7 +88,7 @@ func (m *mosnInvoker) Invoke(ctx context.Context, req *rpc.RPCRequest) (resp *rp defer func() { if r := recover(); r != nil { err = fmt.Errorf("[runtime][rpc]mosn invoker panic: %v", r) - m.logger.Errorf("%v", err) + log.DefaultLogger.Errorf("%v", err) } }() @@ -113,24 +103,24 @@ func (m *mosnInvoker) Invoke(ctx context.Context, req *rpc.RPCRequest) (resp *rp } } req.Ctx = ctx - m.logger.Debugf("[runtime][rpc]request %+v", req) + log.DefaultLogger.Debugf("[runtime][rpc]request %+v", req) // 2. beforeInvoke callback req, err = m.cb.BeforeInvoke(req) if err != nil { - m.logger.Errorf("[runtime][rpc]before filter error %s", err.Error()) + log.DefaultLogger.Errorf("[runtime][rpc]before filter error %s", err.Error()) return nil, err } // 3. do invocation resp, err = m.channel.Do(req) if err != nil { - m.logger.Errorf("[runtime][rpc]error %s", err.Error()) + log.DefaultLogger.Errorf("[runtime][rpc]error %s", err.Error()) return nil, err } resp.Ctx = req.Ctx // 4. afterInvoke callback resp, err = m.cb.AfterInvoke(resp) if err != nil { - m.logger.Errorf("[runtime][rpc]after filter error %s", err.Error()) + log.DefaultLogger.Errorf("[runtime][rpc]after filter error %s", err.Error()) return nil, err } return resp, nil diff --git a/kit/logger/logger.go b/kit/logger/logger.go index 928685ba45..a97832bb21 100644 --- a/kit/logger/logger.go +++ b/kit/logger/logger.go @@ -193,7 +193,7 @@ func NewLayottoLogger(name string) Logger { } if err != nil { ll.loggers[logKeyDebug] = log.DefaultLogger - log.DefaultLogger.Fatalf("Failed to create mosn logger: %v", err) + log.DefaultLogger.Errorf("Failed to create mosn logger: %v", err) } else { dLogger.SetLogLevel(toMosnLoggerLevel(defaultLoggerLevel)) ll.loggers[logKeyDebug] = dLogger @@ -207,7 +207,7 @@ func NewLayottoLogger(name string) Logger { } if err != nil { ll.loggers[logKeyAccess] = log.DefaultLogger - log.DefaultLogger.Fatalf("Failed to create mosn logger: %v", err) + log.DefaultLogger.Errorf("Failed to create mosn logger: %v", err) } else { aLogger.SetLogLevel(toMosnLoggerLevel(defaultLoggerLevel)) ll.loggers[logKeyAccess] = aLogger @@ -221,7 +221,7 @@ func NewLayottoLogger(name string) Logger { } if err != nil { ll.loggers[logKeyError] = log.DefaultLogger - log.DefaultLogger.Fatalf("Failed to create mosn logger: %v", err) + log.DefaultLogger.Errorf("Failed to create mosn logger: %v", err) } else { eLogger.SetLogLevel(toMosnLoggerLevel(defaultLoggerLevel)) ll.loggers[logKeyError] = eLogger diff --git a/pkg/actuator/actuator.go b/pkg/actuator/actuator.go index 8b2082bedc..309b0c4559 100644 --- a/pkg/actuator/actuator.go +++ b/pkg/actuator/actuator.go @@ -17,27 +17,20 @@ package actuator import ( - "mosn.io/layotto/kit/logger" + "mosn.io/pkg/log" "mosn.io/layotto/pkg/filter/stream/common/http" ) type Actuator struct { endpointRegistry map[string]http.Endpoint - Logger logger.Logger } // New init an Actuator. func New() *Actuator { - a := &Actuator{ + return &Actuator{ endpointRegistry: make(map[string]http.Endpoint), - Logger: logger.NewLayottoLogger("actuator"), } - return a -} - -func (act *Actuator) OnLogLevelChanged(level logger.LogLevel) { - act.Logger.SetLogLevel(level) } // GetEndpoint get an Endpoint from Actuator with name. @@ -49,7 +42,7 @@ func (act *Actuator) GetEndpoint(name string) (endpoint http.Endpoint, ok bool) // AddEndpoint add an Endpoint to Actuator。 func (act *Actuator) AddEndpoint(name string, ep http.Endpoint) { if _, ok := act.endpointRegistry[name]; ok { - act.Logger.Warnf("Duplicate Endpoint name: %v !", name) + log.DefaultLogger.Warnf("Duplicate Endpoint name: %v !", name) } act.endpointRegistry[name] = ep } diff --git a/pkg/actuator/info/endpoint.go b/pkg/actuator/info/endpoint.go index fb455ea7e2..d39dc969c0 100644 --- a/pkg/actuator/info/endpoint.go +++ b/pkg/actuator/info/endpoint.go @@ -19,7 +19,7 @@ package info import ( "context" - "mosn.io/layotto/kit/logger" + "mosn.io/pkg/log" "mosn.io/layotto/pkg/actuator" "mosn.io/layotto/pkg/filter/stream/common/http" @@ -27,20 +27,16 @@ import ( // init info Endpoint. func init() { - a := actuator.GetDefault() - a.AddEndpoint("info", NewEndpoint(a.Logger)) + actuator.GetDefault().AddEndpoint("info", NewEndpoint()) } var infoContributors = make(map[string]Contributor) type Endpoint struct { - logger logger.Logger } -func NewEndpoint(logger logger.Logger) *Endpoint { - return &Endpoint{ - logger: logger, - } +func NewEndpoint() *Endpoint { + return &Endpoint{} } func (e *Endpoint) Handle(ctx context.Context, params http.ParamsScanner) (map[string]interface{}, error) { @@ -50,7 +46,7 @@ func (e *Endpoint) Handle(ctx context.Context, params http.ParamsScanner) (map[s for k, c := range infoContributors { cinfo, err := c.GetInfo() if err != nil { - e.logger.Errorf("[actuator][info] Error when GetInfo.Contributor:%v,error:%v", k, err) + log.DefaultLogger.Errorf("[actuator][info] Error when GetInfo.Contributor:%v,error:%v", k, err) result[k] = err.Error() resultErr = err } else { diff --git a/pkg/actuator/info/endpoint_test.go b/pkg/actuator/info/endpoint_test.go index 0fbbb66eeb..3f66b43997 100644 --- a/pkg/actuator/info/endpoint_test.go +++ b/pkg/actuator/info/endpoint_test.go @@ -20,8 +20,6 @@ import ( "context" "testing" - "mosn.io/layotto/kit/logger" - "github.com/stretchr/testify/assert" ) @@ -36,7 +34,7 @@ func (m MockContributor) GetInfo() (info interface{}, err error) { } func TestEndpoint_Handle(t *testing.T) { - ep := NewEndpoint(logger.NewLayottoLogger("test")) + ep := NewEndpoint() handle, err := ep.Handle(context.Background(), nil) assert.True(t, err == nil) assert.True(t, len(handle) == 0) diff --git a/pkg/grpc/dapr/dapr_api.go b/pkg/grpc/dapr/dapr_api.go index 96b7927651..df7ddbe122 100644 --- a/pkg/grpc/dapr/dapr_api.go +++ b/pkg/grpc/dapr/dapr_api.go @@ -31,8 +31,7 @@ import ( "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" "google.golang.org/protobuf/types/known/anypb" - - "mosn.io/layotto/kit/logger" + "mosn.io/pkg/log" "mosn.io/layotto/components/configstores" "mosn.io/layotto/components/file" @@ -70,8 +69,7 @@ type daprGrpcAPI struct { AppCallbackConn *grpc.ClientConn topicPerComponent map[string]TopicSubscriptions // json - json jsoniter.API - logger logger.Logger + json jsoniter.API } func (d *daprGrpcAPI) Init(conn *grpc.ClientConn) error { @@ -154,7 +152,7 @@ func (d *daprGrpcAPI) InvokeBinding(ctx context.Context, in *dapr_v1pb.InvokeBin resp, err := d.sendToOutputBindingFn(in.Name, req) if err != nil { err = status.Errorf(codes.Internal, messages.ErrInvokeOutputBinding, in.Name, err.Error()) - d.logger.Errorf("call out binding fail, err:%+v", err) + log.DefaultLogger.Errorf("call out binding fail, err:%+v", err) return r, err } @@ -170,10 +168,6 @@ func (d *daprGrpcAPI) isSecretAllowed(storeName string, key string) bool { return true } -func (d *daprGrpcAPI) OnLogLevelChanged(level logger.LogLevel) { - d.logger.SetLogLevel(level) -} - // NewDaprAPI_Alpha construct a grpc_api.GrpcAPI which implements DaprServer. // Currently it only support Dapr's InvokeService and InvokeBinding API. // Note: this feature is still in Alpha state and we don't recommend that you use it in your production environment. @@ -206,7 +200,7 @@ func NewDaprServer( secretStores map[string]secretstores.SecretStore, ) DaprGrpcAPI { // construct - dAPI := &daprGrpcAPI{ + return &daprGrpcAPI{ appId: appId, hellos: hellos, configStores: configStores, @@ -220,8 +214,5 @@ func NewDaprServer( sendToOutputBindingFn: sendToOutputBindingFn, json: jsoniter.ConfigFastest, secretStores: secretStores, - logger: logger.NewLayottoLogger("dapr"), } - logger.RegisterComponentLoggerListener("dapr", dAPI) - return dAPI } diff --git a/pkg/grpc/dapr/dapr_api_pubsub.go b/pkg/grpc/dapr/dapr_api_pubsub.go index c27c6c2faf..a8c40cf113 100644 --- a/pkg/grpc/dapr/dapr_api_pubsub.go +++ b/pkg/grpc/dapr/dapr_api_pubsub.go @@ -28,6 +28,7 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "google.golang.org/protobuf/types/known/emptypb" + "mosn.io/pkg/log" l8_comp_pubsub "mosn.io/layotto/components/pubsub" dapr_v1pb "mosn.io/layotto/pkg/grpc/dapr/proto/runtime/v1" @@ -50,7 +51,7 @@ func (d *daprGrpcAPI) PublishEvent(ctx context.Context, in *dapr_v1pb.PublishEve // 1. validate result, err := d.doPublishEvent(ctx, in.PubsubName, in.Topic, in.Data, in.DataContentType, in.Metadata) if err != nil { - d.logger.Errorf("[runtime] [grpc.PublishEvent] %v", err) + log.DefaultLogger.Errorf("[runtime] [grpc.PublishEvent] %v", err) } return result, err } @@ -151,7 +152,7 @@ func (d *daprGrpcAPI) getInterestedTopics() (map[string]TopicSubscriptions, erro // 2. handle app subscriptions client := dapr_v1pb.NewAppCallbackClient(d.AppCallbackConn) - subscriptions = d.listTopicSubscriptions(client) + subscriptions = listTopicSubscriptions(client, log.DefaultLogger) // TODO handle declarative subscriptions // 3. prepare result @@ -172,7 +173,7 @@ func (d *daprGrpcAPI) getInterestedTopics() (map[string]TopicSubscriptions, erro for topic := range v.topic2Details { topics = append(topics, topic) } - d.logger.Infof("[runtime][getInterestedTopics]app is subscribed to the following topics: %v through pubsub=%s", topics, pubsubName) + log.DefaultLogger.Infof("[runtime][getInterestedTopics]app is subscribed to the following topics: %v through pubsub=%s", topics, pubsubName) } } @@ -191,7 +192,7 @@ func (d *daprGrpcAPI) beginPubSub(pubsubName string, ps pubsub.PubSub, topicRout // 2. loop subscribing every for topic, route := range v.topic2Details { // TODO limit topic scope - d.logger.Debugf("[runtime][beginPubSub]subscribing to topic=%s on pubsub=%s", topic, pubsubName) + log.DefaultLogger.Debugf("[runtime][beginPubSub]subscribing to topic=%s on pubsub=%s", topic, pubsubName) // ask component to subscribe if err := ps.Subscribe(pubsub.SubscribeRequest{ Topic: topic, @@ -203,7 +204,7 @@ func (d *daprGrpcAPI) beginPubSub(pubsubName string, ps pubsub.PubSub, topicRout msg.Metadata[Metadata_key_pubsubName] = pubsubName return d.publishMessageGRPC(ctx, msg) }); err != nil { - d.logger.Warnf("[runtime][beginPubSub]failed to subscribe to topic %s: %s", topic, err) + log.DefaultLogger.Warnf("[runtime][beginPubSub]failed to subscribe to topic %s: %s", topic, err) return err } } @@ -215,13 +216,13 @@ func (d *daprGrpcAPI) publishMessageGRPC(ctx context.Context, msg *pubsub.NewMes var cloudEvent map[string]interface{} err := d.json.Unmarshal(msg.Data, &cloudEvent) if err != nil { - d.logger.Debugf("[runtime]error deserializing cloud events proto: %s", err) + log.DefaultLogger.Debugf("[runtime]error deserializing cloud events proto: %s", err) return err } // 2. drop msg if the current cloud event has expired if pubsub.HasExpired(cloudEvent) { - d.logger.Warnf("[runtime]dropping expired pub/sub event %v as of %v", cloudEvent[pubsub.IDField].(string), cloudEvent[pubsub.ExpirationField].(string)) + log.DefaultLogger.Warnf("[runtime]dropping expired pub/sub event %v as of %v", cloudEvent[pubsub.IDField].(string), cloudEvent[pubsub.ExpirationField].(string)) return nil } @@ -240,7 +241,7 @@ func (d *daprGrpcAPI) publishMessageGRPC(ctx context.Context, msg *pubsub.NewMes if data, ok := cloudEvent[pubsub.DataBase64Field]; ok && data != nil { decoded, decodeErr := base64.StdEncoding.DecodeString(data.(string)) if decodeErr != nil { - d.logger.Debugf("unable to base64 decode cloudEvent field data_base64: %s", decodeErr) + log.DefaultLogger.Debugf("unable to base64 decode cloudEvent field data_base64: %s", decodeErr) return err } @@ -260,20 +261,20 @@ func (d *daprGrpcAPI) publishMessageGRPC(ctx context.Context, msg *pubsub.NewMes res, err := clientV1.OnTopicEvent(ctx, envelope) // 5. check result - return d.retryStrategy(err, res, cloudEvent) + return retryStrategy(err, res, cloudEvent) } -func (d *daprGrpcAPI) retryStrategy(err error, res *dapr_v1pb.TopicEventResponse, cloudEvent map[string]interface{}) error { +func retryStrategy(err error, res *dapr_v1pb.TopicEventResponse, cloudEvent map[string]interface{}) error { if err != nil { errStatus, hasErrStatus := status.FromError(err) if hasErrStatus && (errStatus.Code() == codes.Unimplemented) { // DROP - d.logger.Warnf("[runtime]non-retriable error returned from app while processing pub/sub event %v: %s", cloudEvent[pubsub.IDField].(string), err) + log.DefaultLogger.Warnf("[runtime]non-retriable error returned from app while processing pub/sub event %v: %s", cloudEvent[pubsub.IDField].(string), err) return nil } err = fmt.Errorf("error returned from app while processing pub/sub event %v: %s", cloudEvent[pubsub.IDField].(string), err) - d.logger.Debugf("%s", err) + log.DefaultLogger.Debugf("%s", err) // on error from application, return error for redelivery of event return err } @@ -286,17 +287,17 @@ func (d *daprGrpcAPI) retryStrategy(err error, res *dapr_v1pb.TopicEventResponse case dapr_v1pb.TopicEventResponse_RETRY: return fmt.Errorf("RETRY status returned from app while processing pub/sub event %v", cloudEvent[pubsub.IDField].(string)) case dapr_v1pb.TopicEventResponse_DROP: - d.logger.Warnf("[runtime]DROP status returned from app while processing pub/sub event %v", cloudEvent[pubsub.IDField].(string)) + log.DefaultLogger.Warnf("[runtime]DROP status returned from app while processing pub/sub event %v", cloudEvent[pubsub.IDField].(string)) return nil } // Consider unknown status field as error and retry return fmt.Errorf("unknown status returned from app while processing pub/sub event %v: %v", cloudEvent[pubsub.IDField].(string), res.GetStatus()) } -func (d *daprGrpcAPI) listTopicSubscriptions(client dapr_v1pb.AppCallbackClient) []*dapr_v1pb.TopicSubscription { +func listTopicSubscriptions(client dapr_v1pb.AppCallbackClient, log log.ErrorLogger) []*dapr_v1pb.TopicSubscription { resp, err := client.ListTopicSubscriptions(context.Background(), &emptypb.Empty{}) if err != nil { - d.logger.Errorf("[runtime][listTopicSubscriptions]error after callback: %s", err) + log.Errorf("[runtime][listTopicSubscriptions]error after callback: %s", err) return make([]*dapr_v1pb.TopicSubscription, 0) } diff --git a/pkg/grpc/dapr/dapr_api_pubsub_test.go b/pkg/grpc/dapr/dapr_api_pubsub_test.go index 18fc244e7c..5d17d106de 100644 --- a/pkg/grpc/dapr/dapr_api_pubsub_test.go +++ b/pkg/grpc/dapr/dapr_api_pubsub_test.go @@ -31,6 +31,7 @@ import ( "google.golang.org/grpc" rawGRPC "google.golang.org/grpc" "google.golang.org/grpc/test/bufconn" + "mosn.io/pkg/log" dapr_common_v1pb "mosn.io/layotto/pkg/grpc/dapr/proto/common/v1" dapr_v1pb "mosn.io/layotto/pkg/grpc/dapr/proto/runtime/v1" @@ -212,10 +213,6 @@ func (m *mockClient) OnTopicEvent(ctx context.Context, in *dapr_v1pb.TopicEventR } func Test_listTopicSubscriptions(t *testing.T) { - a := NewDaprServer("", nil, nil, nil, nil, - nil, nil, nil, nil, nil, nil, nil) - - var apiForTest = a.(*daprGrpcAPI) - topics := apiForTest.listTopicSubscriptions(&mockClient{}) + topics := listTopicSubscriptions(&mockClient{}, log.DefaultLogger) assert.True(t, topics != nil && len(topics) == 0) } diff --git a/pkg/grpc/dapr/dapr_api_secret.go b/pkg/grpc/dapr/dapr_api_secret.go index 159c0d51af..d4e6f05bf5 100644 --- a/pkg/grpc/dapr/dapr_api_secret.go +++ b/pkg/grpc/dapr/dapr_api_secret.go @@ -22,6 +22,7 @@ import ( "github.com/dapr/components-contrib/secretstores" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" + "mosn.io/pkg/log" "mosn.io/layotto/pkg/grpc/dapr/proto/runtime/v1" "mosn.io/layotto/pkg/messages" @@ -31,14 +32,14 @@ func (d *daprGrpcAPI) GetSecret(ctx context.Context, request *runtime.GetSecretR // 1. check parameters if d.secretStores == nil || len(d.secretStores) == 0 { err := status.Error(codes.FailedPrecondition, messages.ErrSecretStoreNotConfigured) - d.logger.Errorf("GetSecret fail,not configured err:%+v", err) + log.DefaultLogger.Errorf("GetSecret fail,not configured err:%+v", err) return &runtime.GetSecretResponse{}, err } secretStoreName := request.StoreName if d.secretStores[secretStoreName] == nil { err := status.Errorf(codes.InvalidArgument, messages.ErrSecretStoreNotFound, secretStoreName) - d.logger.Errorf("GetSecret fail,not find err:%+v", err) + log.DefaultLogger.Errorf("GetSecret fail,not find err:%+v", err) return &runtime.GetSecretResponse{}, err } @@ -57,7 +58,7 @@ func (d *daprGrpcAPI) GetSecret(ctx context.Context, request *runtime.GetSecretR // 4. parse result if err != nil { err = status.Errorf(codes.Internal, messages.ErrSecretGet, req.Name, secretStoreName, err.Error()) - d.logger.Errorf("GetSecret fail,get secret err:%+v", err) + log.DefaultLogger.Errorf("GetSecret fail,get secret err:%+v", err) return &runtime.GetSecretResponse{}, err } @@ -72,14 +73,14 @@ func (d *daprGrpcAPI) GetBulkSecret(ctx context.Context, in *runtime.GetBulkSecr // 1. check parameters if d.secretStores == nil || len(d.secretStores) == 0 { err := status.Error(codes.FailedPrecondition, messages.ErrSecretStoreNotConfigured) - d.logger.Errorf("GetBulkSecret fail,not configured err:%+v", err) + log.DefaultLogger.Errorf("GetBulkSecret fail,not configured err:%+v", err) return &runtime.GetBulkSecretResponse{}, err } secretStoreName := in.StoreName if d.secretStores[secretStoreName] == nil { err := status.Errorf(codes.InvalidArgument, messages.ErrSecretStoreNotFound, secretStoreName) - d.logger.Errorf("GetBulkSecret fail,not find err:%+v", err) + log.DefaultLogger.Errorf("GetBulkSecret fail,not find err:%+v", err) return &runtime.GetBulkSecretResponse{}, err } // 2. delegate to components @@ -90,7 +91,7 @@ func (d *daprGrpcAPI) GetBulkSecret(ctx context.Context, in *runtime.GetBulkSecr // 3. parse result if err != nil { err = status.Errorf(codes.Internal, messages.ErrBulkSecretGet, secretStoreName, err.Error()) - d.logger.Errorf("GetBulkSecret fail,bulk secret err:%+v", err) + log.DefaultLogger.Errorf("GetBulkSecret fail,bulk secret err:%+v", err) return &runtime.GetBulkSecretResponse{}, err } @@ -101,7 +102,7 @@ func (d *daprGrpcAPI) GetBulkSecret(ctx context.Context, in *runtime.GetBulkSecr if d.isSecretAllowed(secretStoreName, key) { filteredSecrets[key] = v } else { - d.logger.Debugf(messages.ErrPermissionDenied, key, in.StoreName) + log.DefaultLogger.Debugf(messages.ErrPermissionDenied, key, in.StoreName) } } response := &runtime.GetBulkSecretResponse{} diff --git a/pkg/grpc/dapr/dapr_api_state.go b/pkg/grpc/dapr/dapr_api_state.go index 7c924f0a0c..f8ec1f1374 100644 --- a/pkg/grpc/dapr/dapr_api_state.go +++ b/pkg/grpc/dapr/dapr_api_state.go @@ -26,6 +26,7 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "google.golang.org/protobuf/types/known/emptypb" + "mosn.io/pkg/log" "mosn.io/layotto/pkg/common" dapr_common_v1pb "mosn.io/layotto/pkg/grpc/dapr/proto/common/v1" @@ -38,7 +39,7 @@ func (d *daprGrpcAPI) SaveState(ctx context.Context, in *dapr_v1pb.SaveStateRequ // 1. get store store, err := d.getStateStore(in.StoreName) if err != nil { - d.logger.Errorf("[runtime] [grpc.SaveState] error: %v", err) + log.DefaultLogger.Errorf("[runtime] [grpc.SaveState] error: %v", err) return &emptypb.Empty{}, err } // 2. convert requests @@ -55,7 +56,7 @@ func (d *daprGrpcAPI) SaveState(ctx context.Context, in *dapr_v1pb.SaveStateRequ // 4. check result if err != nil { err = d.wrapDaprComponentError(err, messages.ErrStateSave, in.StoreName, err.Error()) - d.logger.Errorf("[runtime] [grpc.SaveState] error: %v", err) + log.DefaultLogger.Errorf("[runtime] [grpc.SaveState] error: %v", err) return &emptypb.Empty{}, err } return &emptypb.Empty{}, nil @@ -66,7 +67,7 @@ func (d *daprGrpcAPI) GetState(ctx context.Context, request *dapr_v1pb.GetStateR // 1. get store store, err := d.getStateStore(request.StoreName) if err != nil { - d.logger.Errorf("[runtime] [grpc.GetState] error: %v", err) + log.DefaultLogger.Errorf("[runtime] [grpc.GetState] error: %v", err) return nil, err } // 2. generate the actual key @@ -86,7 +87,7 @@ func (d *daprGrpcAPI) GetState(ctx context.Context, request *dapr_v1pb.GetStateR // 4. check result if err != nil { err = status.Errorf(codes.Internal, messages.ErrStateGet, request.Key, request.StoreName, err.Error()) - d.logger.Errorf("[runtime] [grpc.GetState] %v", err) + log.DefaultLogger.Errorf("[runtime] [grpc.GetState] %v", err) return &dapr_v1pb.GetStateResponse{}, err } return GetResponse2GetStateResponse(compResp), nil @@ -96,7 +97,7 @@ func (d *daprGrpcAPI) GetBulkState(ctx context.Context, request *dapr_v1pb.GetBu // 1. get store store, err := d.getStateStore(request.StoreName) if err != nil { - d.logger.Errorf("[runtime] [grpc.GetBulkState] error: %v", err) + log.DefaultLogger.Errorf("[runtime] [grpc.GetBulkState] error: %v", err) return &dapr_v1pb.GetBulkStateResponse{}, err } @@ -137,7 +138,7 @@ func (d *daprGrpcAPI) GetBulkState(ctx context.Context, request *dapr_v1pb.GetBu pool := workerpool.New(int(request.Parallelism)) resultCh := make(chan *dapr_v1pb.BulkStateItem, n) for i := 0; i < n; i++ { - pool.Submit(d.generateGetStateTask(store, &reqs[i], resultCh)) + pool.Submit(generateGetStateTask(store, &reqs[i], resultCh)) } pool.StopWait() for { @@ -159,7 +160,7 @@ func (d *daprGrpcAPI) QueryStateAlpha1(ctx context.Context, request *dapr_v1pb.Q // 1. get state store component store, err := d.getStateStore(request.StoreName) if err != nil { - d.logger.Errorf("[runtime] [grpc.QueryStateAlpha1] error: %v", err) + log.DefaultLogger.Errorf("[runtime] [grpc.QueryStateAlpha1] error: %v", err) return ret, err } @@ -167,7 +168,7 @@ func (d *daprGrpcAPI) QueryStateAlpha1(ctx context.Context, request *dapr_v1pb.Q querier, ok := store.(state.Querier) if !ok { err = status.Errorf(codes.Unimplemented, messages.ErrNotFound, "Query") - d.logger.Errorf("[runtime] [grpc.QueryStateAlpha1] error: %v", err) + log.DefaultLogger.Errorf("[runtime] [grpc.QueryStateAlpha1] error: %v", err) return ret, err } @@ -175,7 +176,7 @@ func (d *daprGrpcAPI) QueryStateAlpha1(ctx context.Context, request *dapr_v1pb.Q var req state.QueryRequest if err = jsoniter.Unmarshal([]byte(request.GetQuery()), &req.Query); err != nil { err = status.Errorf(codes.InvalidArgument, messages.ErrMalformedRequest, err.Error()) - d.logger.Errorf("[runtime] [grpc.QueryStateAlpha1] error: %v", err) + log.DefaultLogger.Errorf("[runtime] [grpc.QueryStateAlpha1] error: %v", err) return ret, err } req.Metadata = request.GetMetadata() @@ -185,7 +186,7 @@ func (d *daprGrpcAPI) QueryStateAlpha1(ctx context.Context, request *dapr_v1pb.Q // 5. convert response if err != nil { err = status.Errorf(codes.Internal, messages.ErrStateQuery, request.GetStoreName(), err.Error()) - d.logger.Errorf("[runtime] [grpc.QueryStateAlpha1] error: %v", err) + log.DefaultLogger.Errorf("[runtime] [grpc.QueryStateAlpha1] error: %v", err) return ret, err } if resp == nil || len(resp.Results) == 0 { @@ -209,7 +210,7 @@ func (d *daprGrpcAPI) DeleteState(ctx context.Context, request *dapr_v1pb.Delete // 1. get store store, err := d.getStateStore(request.StoreName) if err != nil { - d.logger.Errorf("[runtime] [grpc.DeleteState] error: %v", err) + log.DefaultLogger.Errorf("[runtime] [grpc.DeleteState] error: %v", err) return &emptypb.Empty{}, err } // 2. generate the actual key @@ -222,7 +223,7 @@ func (d *daprGrpcAPI) DeleteState(ctx context.Context, request *dapr_v1pb.Delete // 4. check result if err != nil { err = d.wrapDaprComponentError(err, messages.ErrStateDelete, request.Key, err.Error()) - d.logger.Errorf("[runtime] [grpc.DeleteState] error: %v", err) + log.DefaultLogger.Errorf("[runtime] [grpc.DeleteState] error: %v", err) return &empty.Empty{}, err } return &empty.Empty{}, nil @@ -232,7 +233,7 @@ func (d *daprGrpcAPI) DeleteBulkState(ctx context.Context, request *dapr_v1pb.De // 1. get store store, err := d.getStateStore(request.StoreName) if err != nil { - d.logger.Errorf("[runtime] [grpc.DeleteBulkState] error: %v", err) + log.DefaultLogger.Errorf("[runtime] [grpc.DeleteBulkState] error: %v", err) return &empty.Empty{}, err } // 2. convert request @@ -248,7 +249,7 @@ func (d *daprGrpcAPI) DeleteBulkState(ctx context.Context, request *dapr_v1pb.De err = store.BulkDelete(reqs) // 4. check result if err != nil { - d.logger.Errorf("[runtime] [grpc.DeleteBulkState] error: %v", err) + log.DefaultLogger.Errorf("[runtime] [grpc.DeleteBulkState] error: %v", err) return &emptypb.Empty{}, err } return &emptypb.Empty{}, nil @@ -258,20 +259,20 @@ func (d *daprGrpcAPI) ExecuteStateTransaction(ctx context.Context, request *dapr // 1. check params if d.stateStores == nil || len(d.stateStores) == 0 { err := status.Error(codes.FailedPrecondition, messages.ErrStateStoresNotConfigured) - d.logger.Errorf("[runtime] [grpc.ExecuteStateTransaction] error: %v", err) + log.DefaultLogger.Errorf("[runtime] [grpc.ExecuteStateTransaction] error: %v", err) return &emptypb.Empty{}, err } storeName := request.StoreName if d.stateStores[storeName] == nil { err := status.Errorf(codes.InvalidArgument, messages.ErrStateStoreNotFound, storeName) - d.logger.Errorf("[runtime] [grpc.ExecuteStateTransaction] error: %v", err) + log.DefaultLogger.Errorf("[runtime] [grpc.ExecuteStateTransaction] error: %v", err) return &emptypb.Empty{}, err } // 2. find store store, ok := d.transactionalStateStores[storeName] if !ok { err := status.Errorf(codes.Unimplemented, messages.ErrStateStoreNotSupported, storeName) - d.logger.Errorf("[runtime] [grpc.ExecuteStateTransaction] error: %v", err) + log.DefaultLogger.Errorf("[runtime] [grpc.ExecuteStateTransaction] error: %v", err) return &emptypb.Empty{}, err } // 3. convert request @@ -282,7 +283,7 @@ func (d *daprGrpcAPI) ExecuteStateTransaction(ctx context.Context, request *dapr var req = op.Request // tolerant npe if req == nil { - d.logger.Warnf("[runtime] [grpc.ExecuteStateTransaction] one of TransactionalStateOperation.Request is nil") + log.DefaultLogger.Warnf("[runtime] [grpc.ExecuteStateTransaction] one of TransactionalStateOperation.Request is nil") continue } key, err := state2.GetModifiedStateKey(req.Key, request.StoreName, d.appId) @@ -303,7 +304,7 @@ func (d *daprGrpcAPI) ExecuteStateTransaction(ctx context.Context, request *dapr } default: err := status.Errorf(codes.Unimplemented, messages.ErrNotSupportedStateOperation, op.OperationType) - d.logger.Errorf("[runtime] [grpc.ExecuteStateTransaction] error: %v", err) + log.DefaultLogger.Errorf("[runtime] [grpc.ExecuteStateTransaction] error: %v", err) return &emptypb.Empty{}, err } operations = append(operations, operation) @@ -316,7 +317,7 @@ func (d *daprGrpcAPI) ExecuteStateTransaction(ctx context.Context, request *dapr // 5. check result if err != nil { err = status.Errorf(codes.Internal, messages.ErrStateTransaction, err.Error()) - d.logger.Errorf("[runtime] [grpc.ExecuteStateTransaction] error: %v", err) + log.DefaultLogger.Errorf("[runtime] [grpc.ExecuteStateTransaction] error: %v", err) return &emptypb.Empty{}, err } return &emptypb.Empty{}, nil @@ -412,7 +413,7 @@ func (d *daprGrpcAPI) wrapDaprComponentError(err error, format string, args ...i return status.Errorf(codes.Internal, format, args...) } -func (d *daprGrpcAPI) generateGetStateTask(store state.Store, req *state.GetRequest, resultCh chan *dapr_v1pb.BulkStateItem) func() { +func generateGetStateTask(store state.Store, req *state.GetRequest, resultCh chan *dapr_v1pb.BulkStateItem) func() { return func() { // get r, err := store.Get(req) @@ -431,7 +432,7 @@ func (d *daprGrpcAPI) generateGetStateTask(store state.Store, req *state.GetRequ case resultCh <- item: default: //never happen - d.logger.Errorf("[api.generateGetStateTask] can not push result to the resultCh. item: %+v", item) + log.DefaultLogger.Errorf("[api.generateGetStateTask] can not push result to the resultCh. item: %+v", item) } } } diff --git a/pkg/grpc/default_api/api.go b/pkg/grpc/default_api/api.go index 4f91a2ef60..9e1d26632d 100644 --- a/pkg/grpc/default_api/api.go +++ b/pkg/grpc/default_api/api.go @@ -28,8 +28,7 @@ import ( "github.com/dapr/components-contrib/state" jsoniter "github.com/json-iterator/go" "google.golang.org/grpc" - - "mosn.io/layotto/kit/logger" + "mosn.io/pkg/log" "mosn.io/layotto/components/configstores" "mosn.io/layotto/components/file" @@ -89,8 +88,7 @@ type api struct { topicPerComponent map[string]TopicSubscriptions streamer *streamer // json - json jsoniter.API - logger logger.Logger + json jsoniter.API } func (a *api) Init(conn *grpc.ClientConn) error { @@ -135,8 +133,7 @@ func NewAPI( stateStores, transactionalStateStores, files, lockStores, sequencers, sendToOutputBindingFn, secretStores) // construct - - a := &api{ + return &api{ daprAPI: dAPI, appId: appId, hellos: hellos, @@ -151,20 +148,14 @@ func NewAPI( sendToOutputBindingFn: sendToOutputBindingFn, secretStores: secretStores, json: jsoniter.ConfigFastest, - logger: logger.NewLayottoLogger("runtime"), } - logger.RegisterComponentLoggerListener("grpc", a) - return a -} -func (a *api) OnLogLevelChanged(outputLevel logger.LogLevel) { - a.logger.SetLogLevel(outputLevel) } func (a *api) SayHello(ctx context.Context, in *runtimev1pb.SayHelloRequest) (*runtimev1pb.SayHelloResponse, error) { h, err := a.getHello(in.ServiceName) if err != nil { - a.logger.Errorf("[runtime] [grpc.say_hello] get hello error: %v", err) + log.DefaultLogger.Errorf("[runtime] [grpc.say_hello] get hello error: %v", err) return nil, err } // create hello request based on pb.go struct @@ -173,7 +164,7 @@ func (a *api) SayHello(ctx context.Context, in *runtimev1pb.SayHelloRequest) (*r } resp, err := h.Hello(ctx, req) if err != nil { - a.logger.Errorf("[runtime] [grpc.say_hello] request hello error: %v", err) + log.DefaultLogger.Errorf("[runtime] [grpc.say_hello] request hello error: %v", err) return nil, err } // create response base on hello.Response diff --git a/pkg/grpc/default_api/api_configuration.go b/pkg/grpc/default_api/api_configuration.go index 933129374d..fd98306571 100644 --- a/pkg/grpc/default_api/api_configuration.go +++ b/pkg/grpc/default_api/api_configuration.go @@ -23,6 +23,7 @@ import ( "sync" "google.golang.org/protobuf/types/known/emptypb" + "mosn.io/pkg/log" "mosn.io/pkg/utils" "mosn.io/layotto/components/configstores" @@ -110,7 +111,7 @@ func (a *api) SubscribeConfiguration(sub runtimev1pb.Runtime_SubscribeConfigurat req, err := sub.Recv() // 1.2. if an error happens,stop all the subscribers if err != nil { - a.logger.Errorf("occur error in subscribe, err: %+v", err) + log.DefaultLogger.Errorf("occur error in subscribe, err: %+v", err) // stop all the subscribers for _, store := range subscribedStore { // TODO this method will stop subscribers created by other connections.Should be refactored @@ -125,7 +126,7 @@ func (a *api) SubscribeConfiguration(sub runtimev1pb.Runtime_SubscribeConfigurat store, ok := a.configStores[req.StoreName] // 1.3.1. stop if StoreName is not supported if !ok { - a.logger.Errorf("configure store [%+v] don't support now", req.StoreName) + log.DefaultLogger.Errorf("configure store [%+v] don't support now", req.StoreName) // stop all the subscribers for _, store := range subscribedStore { store.StopSubscribe() @@ -170,6 +171,6 @@ func (a *api) SubscribeConfiguration(sub runtimev1pb.Runtime_SubscribeConfigurat } }, nil) wg.Wait() - a.logger.Warnf("subscribe gorountine exit") + log.DefaultLogger.Warnf("subscribe gorountine exit") return subErr } diff --git a/pkg/grpc/default_api/api_file.go b/pkg/grpc/default_api/api_file.go index 03259f06da..0d2d0f2af9 100644 --- a/pkg/grpc/default_api/api_file.go +++ b/pkg/grpc/default_api/api_file.go @@ -20,8 +20,6 @@ import ( "context" "io" - "mosn.io/layotto/kit/logger" - "github.com/golang/protobuf/ptypes/empty" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -29,6 +27,8 @@ import ( "mosn.io/layotto/components/file" + "mosn.io/pkg/log" + runtimev1pb "mosn.io/layotto/spec/proto/runtime/v1" ) @@ -59,7 +59,7 @@ func (a *api) GetFile(req *runtimev1pb.GetFileRequest, stream runtimev1pb.Runtim for { length, err := data.Read(buf) if err != nil && err != io.EOF { - a.logger.Warnf("get file fail, err: %+v", err) + log.DefaultLogger.Warnf("get file fail, err: %+v", err) return status.Errorf(codes.Internal, "get file fail,err: %+v", err) } if err == nil || (err == io.EOF && length != 0) { @@ -77,7 +77,6 @@ func (a *api) GetFile(req *runtimev1pb.GetFileRequest, stream runtimev1pb.Runtim type putObjectStreamReader struct { data []byte server runtimev1pb.Runtime_PutFileServer - logger logger.Logger } func newPutObjectStreamReader(data []byte, server runtimev1pb.Runtime_PutFileServer) *putObjectStreamReader { @@ -99,7 +98,7 @@ func (r *putObjectStreamReader) Read(p []byte) (int, error) { req, err := r.server.Recv() if err != nil { if err != io.EOF { - r.logger.Errorf("recv data from grpc stream fail, err:%+v", err) + log.DefaultLogger.Errorf("recv data from grpc stream fail, err:%+v", err) } return count, err } @@ -121,7 +120,6 @@ func (a *api) PutFile(stream runtimev1pb.Runtime_PutFileServer) error { return status.Errorf(codes.InvalidArgument, "not support store type: %+v", req.StoreName) } fileReader := newPutObjectStreamReader(req.Data, stream) - fileReader.logger = a.logger if req.Metadata == nil { req.Metadata = make(map[string]string) } diff --git a/pkg/grpc/default_api/api_file_test.go b/pkg/grpc/default_api/api_file_test.go index b18b3f6771..14b483728a 100644 --- a/pkg/grpc/default_api/api_file_test.go +++ b/pkg/grpc/default_api/api_file_test.go @@ -70,8 +70,10 @@ func TestPutFile(t *testing.T) { assert.Equal(t, err, status.Errorf(codes.InvalidArgument, "not support store type: mock1")) mockStream.EXPECT().Recv().Return(&runtimev1pb.PutFileRequest{StoreName: "mock"}, nil).Times(1) + stream := newPutObjectStreamReader(nil, mockStream) + Metadata := make(map[string]string) mockStream.EXPECT().Context().Return(context.Background()) - mockFile.EXPECT().Put(context.Background(), gomock.Any()).Return(errors.New("err occur")).Times(1) + mockFile.EXPECT().Put(context.Background(), &file.PutFileStu{DataStream: stream, FileName: "", Metadata: Metadata}).Return(errors.New("err occur")).Times(1) err = api.PutFile(mockStream) s, _ := status.FromError(err) assert.Equal(t, s.Message(), "err occur") diff --git a/pkg/grpc/default_api/api_lock.go b/pkg/grpc/default_api/api_lock.go index 31b7786797..28d9d1b806 100644 --- a/pkg/grpc/default_api/api_lock.go +++ b/pkg/grpc/default_api/api_lock.go @@ -21,6 +21,7 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/status" + "mosn.io/pkg/log" "mosn.io/layotto/components/lock" "mosn.io/layotto/pkg/messages" @@ -32,7 +33,7 @@ func (a *api) TryLock(ctx context.Context, req *runtimev1pb.TryLockRequest) (*ru // 1. validate if a.lockStores == nil || len(a.lockStores) == 0 { err := status.Error(codes.FailedPrecondition, messages.ErrLockStoresNotConfigured) - a.logger.Errorf("[runtime] [grpc.TryLock] error: %v", err) + log.DefaultLogger.Errorf("[runtime] [grpc.TryLock] error: %v", err) return &runtimev1pb.TryLockResponse{}, err } if req.ResourceId == "" { @@ -58,13 +59,13 @@ func (a *api) TryLock(ctx context.Context, req *runtimev1pb.TryLockRequest) (*ru var err error compReq.ResourceId, err = runtime_lock.GetModifiedLockKey(compReq.ResourceId, req.StoreName, a.appId) if err != nil { - a.logger.Errorf("[runtime] [grpc.TryLock] error: %v", err) + log.DefaultLogger.Errorf("[runtime] [grpc.TryLock] error: %v", err) return &runtimev1pb.TryLockResponse{}, err } // 4. delegate to the component compResp, err := store.TryLock(ctx, compReq) if err != nil { - a.logger.Errorf("[runtime] [grpc.TryLock] error: %v", err) + log.DefaultLogger.Errorf("[runtime] [grpc.TryLock] error: %v", err) return &runtimev1pb.TryLockResponse{}, err } // 5. convert response @@ -76,7 +77,7 @@ func (a *api) Unlock(ctx context.Context, req *runtimev1pb.UnlockRequest) (*runt // 1. validate if a.lockStores == nil || len(a.lockStores) == 0 { err := status.Error(codes.FailedPrecondition, messages.ErrLockStoresNotConfigured) - a.logger.Errorf("[runtime] [grpc.Unlock] error: %v", err) + log.DefaultLogger.Errorf("[runtime] [grpc.Unlock] error: %v", err) return newInternalErrorUnlockResponse(), err } if req.ResourceId == "" { @@ -98,13 +99,13 @@ func (a *api) Unlock(ctx context.Context, req *runtimev1pb.UnlockRequest) (*runt var err error compReq.ResourceId, err = runtime_lock.GetModifiedLockKey(compReq.ResourceId, req.StoreName, a.appId) if err != nil { - a.logger.Errorf("[runtime] [grpc.TryLock] error: %v", err) + log.DefaultLogger.Errorf("[runtime] [grpc.TryLock] error: %v", err) return newInternalErrorUnlockResponse(), err } // 4. delegate to the component compResp, err := store.Unlock(ctx, compReq) if err != nil { - a.logger.Errorf("[runtime] [grpc.Unlock] error: %v", err) + log.DefaultLogger.Errorf("[runtime] [grpc.Unlock] error: %v", err) return newInternalErrorUnlockResponse(), err } // 5. convert response diff --git a/pkg/grpc/default_api/api_pubsub.go b/pkg/grpc/default_api/api_pubsub.go index 2473fc26ed..43c78216e6 100644 --- a/pkg/grpc/default_api/api_pubsub.go +++ b/pkg/grpc/default_api/api_pubsub.go @@ -30,6 +30,7 @@ import ( "encoding/base64" "github.com/dapr/components-contrib/contenttype" + "mosn.io/pkg/log" runtimev1pb "mosn.io/layotto/spec/proto/runtime/v1" ) @@ -79,7 +80,7 @@ func (a *api) beginPubSub(pubsubName string, ps pubsub.PubSub, topicRoutes map[s // 2. loop subscribing every for topic, route := range v.topic2Details { // TODO limit topic scope - a.logger.Debugf("[runtime][beginPubSub]subscribing to topic=%s on pubsub=%s", topic, pubsubName) + log.DefaultLogger.Debugf("[runtime][beginPubSub]subscribing to topic=%s on pubsub=%s", topic, pubsubName) // ask component to subscribe if err := ps.Subscribe(pubsub.SubscribeRequest{ Topic: topic, @@ -91,7 +92,7 @@ func (a *api) beginPubSub(pubsubName string, ps pubsub.PubSub, topicRoutes map[s msg.Metadata[Metadata_key_pubsubName] = pubsubName return a.publishMessageGRPC(ctx, msg) }); err != nil { - a.logger.Warnf("[runtime][beginPubSub]failed to subscribe to topic %s: %s", topic, err) + log.DefaultLogger.Warnf("[runtime][beginPubSub]failed to subscribe to topic %s: %s", topic, err) return err } } @@ -120,7 +121,7 @@ func (a *api) getInterestedTopics() (map[string]TopicSubscriptions, error) { // 2. handle app subscriptions client := runtimev1pb.NewAppCallbackClient(a.AppCallbackConn) - subscriptions = a.listTopicSubscriptions(client) + subscriptions = listTopicSubscriptions(client, log.DefaultLogger) // TODO handle declarative subscriptions // 3. prepare result @@ -141,7 +142,7 @@ func (a *api) getInterestedTopics() (map[string]TopicSubscriptions, error) { for topic := range v.topic2Details { topics = append(topics, topic) } - a.logger.Infof("[runtime][getInterestedTopics]app is subscribed to the following topics: %v through pubsub=%s", topics, pubsubName) + log.DefaultLogger.Infof("[runtime][getInterestedTopics]app is subscribed to the following topics: %v through pubsub=%s", topics, pubsubName) } } // 5. cache the result @@ -167,21 +168,21 @@ func (a *api) publishMessageGRPC(ctx context.Context, msg *pubsub.NewMessage) er res, err := clientV1.OnTopicEvent(ctx, envelope) // Check result - return a.retryStrategy(err, res, cloudEvent) + return retryStrategy(err, res, cloudEvent) } // retryStrategy returns error when the message should be redelivered -func (a *api) retryStrategy(err error, res *runtimev1pb.TopicEventResponse, cloudEvent map[string]interface{}) error { +func retryStrategy(err error, res *runtimev1pb.TopicEventResponse, cloudEvent map[string]interface{}) error { if err != nil { errStatus, hasErrStatus := status.FromError(err) if hasErrStatus && (errStatus.Code() == codes.Unimplemented) { // DROP - a.logger.Warnf("[runtime]non-retriable error returned from app while processing pub/sub event %v: %s", cloudEvent[pubsub.IDField].(string), err) + log.DefaultLogger.Warnf("[runtime]non-retriable error returned from app while processing pub/sub event %v: %s", cloudEvent[pubsub.IDField].(string), err) return nil } err = fmt.Errorf("error returned from app while processing pub/sub event %v: %s", cloudEvent[pubsub.IDField].(string), err) - a.logger.Debugf("%s", err) + log.DefaultLogger.Debugf("%s", err) // on error from application, return error for redelivery of event return err } @@ -194,17 +195,17 @@ func (a *api) retryStrategy(err error, res *runtimev1pb.TopicEventResponse, clou case runtimev1pb.TopicEventResponse_RETRY: return fmt.Errorf("RETRY status returned from app while processing pub/sub event %v", cloudEvent[pubsub.IDField].(string)) case runtimev1pb.TopicEventResponse_DROP: - a.logger.Warnf("[runtime]DROP status returned from app while processing pub/sub event %v", cloudEvent[pubsub.IDField].(string)) + log.DefaultLogger.Warnf("[runtime]DROP status returned from app while processing pub/sub event %v", cloudEvent[pubsub.IDField].(string)) return nil } // Consider unknown status field as error and retry return fmt.Errorf("unknown status returned from app while processing pub/sub event %v: %v", cloudEvent[pubsub.IDField].(string), res.GetStatus()) } -func (a *api) listTopicSubscriptions(client runtimev1pb.AppCallbackClient) []*runtimev1pb.TopicSubscription { +func listTopicSubscriptions(client runtimev1pb.AppCallbackClient, log log.ErrorLogger) []*runtimev1pb.TopicSubscription { resp, err := client.ListTopicSubscriptions(context.Background(), &emptypb.Empty{}) if err != nil { - a.logger.Errorf("[runtime][listTopicSubscriptions]error after callback: %s", err) + log.Errorf("[runtime][listTopicSubscriptions]error after callback: %s", err) return make([]*runtimev1pb.TopicSubscription, 0) } if resp != nil && len(resp.Subscriptions) > 0 { @@ -218,13 +219,13 @@ func (a *api) envelopeFromSubscriptionMessage(ctx context.Context, msg *pubsub.N var cloudEvent map[string]interface{} err := a.json.Unmarshal(msg.Data, &cloudEvent) if err != nil { - a.logger.Debugf("[runtime]error deserializing cloud events proto: %s", err) + log.DefaultLogger.Debugf("[runtime]error deserializing cloud events proto: %s", err) return nil, cloudEvent, err } // 2. Drop msg if the current cloud event has expired if pubsub.HasExpired(cloudEvent) { - a.logger.Warnf("[runtime]dropping expired pub/sub event %v as of %v", cloudEvent[pubsub.IDField].(string), cloudEvent[pubsub.ExpirationField].(string)) + log.DefaultLogger.Warnf("[runtime]dropping expired pub/sub event %v as of %v", cloudEvent[pubsub.IDField].(string), cloudEvent[pubsub.ExpirationField].(string)) return nil, cloudEvent, nil } @@ -243,7 +244,7 @@ func (a *api) envelopeFromSubscriptionMessage(ctx context.Context, msg *pubsub.N if data, ok := cloudEvent[pubsub.DataBase64Field]; ok && data != nil { decoded, decodeErr := base64.StdEncoding.DecodeString(data.(string)) if decodeErr != nil { - a.logger.Debugf("unable to base64 decode cloudEvent field data_base64: %s", decodeErr) + log.DefaultLogger.Debugf("unable to base64 decode cloudEvent field data_base64: %s", decodeErr) return nil, cloudEvent, err } diff --git a/pkg/grpc/default_api/api_pubsub_test.go b/pkg/grpc/default_api/api_pubsub_test.go index 0f55e89e44..5809c767f1 100644 --- a/pkg/grpc/default_api/api_pubsub_test.go +++ b/pkg/grpc/default_api/api_pubsub_test.go @@ -31,6 +31,7 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/test/bufconn" "google.golang.org/protobuf/types/known/emptypb" + "mosn.io/pkg/log" mock_pubsub "mosn.io/layotto/pkg/mock/components/pubsub" mock_appcallback "mosn.io/layotto/pkg/mock/runtime/appcallback" @@ -191,8 +192,6 @@ func (m *mockClient) OnTopicEvent(ctx context.Context, in *runtimev1pb.TopicEven } func Test_listTopicSubscriptions(t *testing.T) { - a := NewAPI("", nil, nil, nil, nil, nil, nil, nil, nil, nil, nil) - var apiForTest = a.(*api) - topics := apiForTest.listTopicSubscriptions(&mockClient{}) + topics := listTopicSubscriptions(&mockClient{}, log.DefaultLogger) assert.True(t, topics != nil && len(topics) == 0) } diff --git a/pkg/grpc/default_api/api_sequencer.go b/pkg/grpc/default_api/api_sequencer.go index c3f83d5c4f..bf9b62b1bc 100644 --- a/pkg/grpc/default_api/api_sequencer.go +++ b/pkg/grpc/default_api/api_sequencer.go @@ -22,6 +22,7 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/status" + "mosn.io/pkg/log" "mosn.io/layotto/components/sequencer" "mosn.io/layotto/pkg/messages" @@ -33,7 +34,7 @@ func (a *api) GetNextId(ctx context.Context, req *runtimev1pb.GetNextIdRequest) // 1. validate if len(a.sequencers) == 0 { err := status.Error(codes.FailedPrecondition, messages.ErrSequencerStoresNotConfigured) - a.logger.Errorf("[runtime] [grpc.GetNextId] error: %v", err) + log.DefaultLogger.Errorf("[runtime] [grpc.GetNextId] error: %v", err) return &runtimev1pb.GetNextIdResponse{}, err } if req.Key == "" { @@ -48,7 +49,7 @@ func (a *api) GetNextId(ctx context.Context, req *runtimev1pb.GetNextIdRequest) // modify key compReq.Key, err = runtime_sequencer.GetModifiedSeqKey(compReq.Key, req.StoreName, a.appId) if err != nil { - a.logger.Errorf("[runtime] [grpc.GetNextId] error: %v", err) + log.DefaultLogger.Errorf("[runtime] [grpc.GetNextId] error: %v", err) return &runtimev1pb.GetNextIdResponse{}, err } // 3. find store component @@ -67,7 +68,7 @@ func (a *api) GetNextId(ctx context.Context, req *runtimev1pb.GetNextIdRequest) } // 5. convert response if err != nil { - a.logger.Errorf("[runtime] [grpc.GetNextId] error: %v", err) + log.DefaultLogger.Errorf("[runtime] [grpc.GetNextId] error: %v", err) return &runtimev1pb.GetNextIdResponse{}, err } return &runtimev1pb.GetNextIdResponse{ diff --git a/pkg/grpc/default_api/api_subscribe.go b/pkg/grpc/default_api/api_subscribe.go index bdd984023f..19c85f4f94 100644 --- a/pkg/grpc/default_api/api_subscribe.go +++ b/pkg/grpc/default_api/api_subscribe.go @@ -224,7 +224,7 @@ func (a *api) publishMessageForStream(ctx context.Context, msg *pubsub.NewMessag } // 5. Check result - return a.retryStrategy(err, resp.Status, cloudEvent) + return retryStrategy(err, resp.Status, cloudEvent) } func (c *conn) notifyPublishResponse(ctx context.Context, resp *runtimev1pb.SubscribeTopicEventsRequestProcessed) { diff --git a/pkg/grpc/default_api/api_test.go b/pkg/grpc/default_api/api_test.go index a5b279e48c..29ad80d4df 100644 --- a/pkg/grpc/default_api/api_test.go +++ b/pkg/grpc/default_api/api_test.go @@ -27,8 +27,6 @@ import ( l8grpc "mosn.io/layotto/pkg/grpc" - "mosn.io/layotto/kit/logger" - "mosn.io/layotto/components/hello" "mosn.io/layotto/components/rpc" "mosn.io/layotto/pkg/mock" @@ -87,11 +85,9 @@ func TestSayHello(t *testing.T) { t.Run("no hello stored", func(t *testing.T) { ctrl := gomock.NewController(t) mockHello := mock.NewMockHelloService(ctrl) - api := &api{ - hellos: map[string]hello.HelloService{ - "mock": mockHello}, - logger: logger.NewLayottoLogger("test"), - } + api := &api{hellos: map[string]hello.HelloService{ + "mock": mockHello, + }} _, err := api.SayHello(context.Background(), &runtimev1pb.SayHelloRequest{ ServiceName: "no register", }) @@ -101,10 +97,7 @@ func TestSayHello(t *testing.T) { }) t.Run("empty say hello", func(t *testing.T) { - api := &api{ - hellos: map[string]hello.HelloService{}, - logger: logger.NewLayottoLogger("test"), - } + api := &api{hellos: map[string]hello.HelloService{}} _, err := api.SayHello(context.Background(), &runtimev1pb.SayHelloRequest{ ServiceName: "mock", }) diff --git a/pkg/grpc/extension/s3/s3.go b/pkg/grpc/extension/s3/s3.go index 0b97481af3..e015f7426a 100644 --- a/pkg/grpc/extension/s3/s3.go +++ b/pkg/grpc/extension/s3/s3.go @@ -31,8 +31,7 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "google.golang.org/protobuf/types/known/emptypb" - - "mosn.io/layotto/kit/logger" + "mosn.io/pkg/log" "mosn.io/layotto/pkg/grpc" ) @@ -53,23 +52,15 @@ var ( type S3Server struct { appId string ossInstance map[string]l8s3.Oss - logger logger.Logger } func NewS3Server(ac *grpc.ApplicationContext) grpc.GrpcAPI { - s3Instance = &S3Server{ - logger: logger.NewLayottoLogger("s3"), - } - logger.RegisterComponentLoggerListener("s3", s3Instance) + s3Instance = &S3Server{} s3Instance.appId = ac.AppId s3Instance.ossInstance = ac.Oss return s3Instance } -func (s *S3Server) OnLogLevelChanged(level logger.LogLevel) { - s.logger.SetLogLevel(level) -} - func (s *S3Server) Init(conn *rawGRPC.ClientConn) error { return nil } @@ -112,7 +103,7 @@ func (s *S3Server) GetObject(req *s3.GetObjectInput, stream s3.ObjectStorageServ } defer func() { if err := recover(); err != nil { - s.logger.Errorf("GetObject occur panic, stack info: %+v", string(debug.Stack())) + log.DefaultLogger.Errorf("GetObject occur panic, stack info: %+v", string(debug.Stack())) } result.DataStream.Close() *buffsPtr = buf @@ -127,7 +118,7 @@ func (s *S3Server) GetObject(req *s3.GetObjectInput, stream s3.ObjectStorageServ for totalBytesRead < len(buf) { length, err = result.DataStream.Read(buf[totalBytesRead:]) if err != nil && err != io.EOF { - s.logger.Warnf("oss GetObject fail, err: %+v", err) + log.DefaultLogger.Warnf("oss GetObject fail, err: %+v", err) return status.Errorf(codes.Internal, "oss GetObject fail, err: %+v", err) } totalBytesRead += length @@ -170,7 +161,6 @@ func (s *S3Server) GetObject(req *s3.GetObjectInput, stream s3.ObjectStorageServ type putObjectStreamReader struct { data []byte server s3.ObjectStorageService_PutObjectServer - logger logger.Logger } func newPutObjectStreamReader(data []byte, server s3.ObjectStorageService_PutObjectServer) *putObjectStreamReader { @@ -192,7 +182,7 @@ func (r *putObjectStreamReader) Read(p []byte) (int, error) { req, err := r.server.Recv() if err != nil { if err != io.EOF { - r.logger.Errorf("recv data from grpc stream fail, err:%+v", err) + log.DefaultLogger.Errorf("recv data from grpc stream fail, err:%+v", err) } return count, err } @@ -203,7 +193,7 @@ func (r *putObjectStreamReader) Read(p []byte) (int, error) { func (s *S3Server) PutObject(stream s3.ObjectStorageService_PutObjectServer) error { defer func() { if err := recover(); err != nil { - s.logger.Errorf("PutObject occur panic, stack info: %+v", string(debug.Stack())) + log.DefaultLogger.Errorf("PutObject occur panic, stack info: %+v", string(debug.Stack())) } }() req, err := stream.Recv() @@ -219,7 +209,6 @@ func (s *S3Server) PutObject(stream s3.ObjectStorageService_PutObjectServer) err return status.Errorf(codes.InvalidArgument, NotSupportStoreName, req.StoreName) } fileReader := newPutObjectStreamReader(req.Body, stream) - fileReader.logger = s.logger st := &l8s3.PutObjectInput{ ACL: req.Acl, @@ -253,7 +242,7 @@ func (s *S3Server) PutObject(stream s3.ObjectStorageService_PutObjectServer) err func (s *S3Server) DeleteObject(ctx context.Context, req *s3.DeleteObjectInput) (*s3.DeleteObjectOutput, error) { defer func() { if err := recover(); err != nil { - s.logger.Errorf("DeleteObject occur panic, stack info: %+v", string(debug.Stack())) + log.DefaultLogger.Errorf("DeleteObject occur panic, stack info: %+v", string(debug.Stack())) } }() if s.ossInstance[req.StoreName] == nil { @@ -278,7 +267,7 @@ func (s *S3Server) DeleteObject(ctx context.Context, req *s3.DeleteObjectInput) func (s *S3Server) PutObjectTagging(ctx context.Context, req *s3.PutObjectTaggingInput) (*s3.PutObjectTaggingOutput, error) { defer func() { if err := recover(); err != nil { - s.logger.Errorf("PutObjectTagging occur panic, stack info: %+v", string(debug.Stack())) + log.DefaultLogger.Errorf("PutObjectTagging occur panic, stack info: %+v", string(debug.Stack())) } }() if s.ossInstance[req.StoreName] == nil { @@ -304,7 +293,7 @@ func (s *S3Server) PutObjectTagging(ctx context.Context, req *s3.PutObjectTaggin func (s *S3Server) DeleteObjectTagging(ctx context.Context, req *s3.DeleteObjectTaggingInput) (*s3.DeleteObjectTaggingOutput, error) { defer func() { if err := recover(); err != nil { - s.logger.Errorf("DeleteObjectTagging occur panic, stack info: %+v", string(debug.Stack())) + log.DefaultLogger.Errorf("DeleteObjectTagging occur panic, stack info: %+v", string(debug.Stack())) } }() if s.ossInstance[req.StoreName] == nil { @@ -329,7 +318,7 @@ func (s *S3Server) DeleteObjectTagging(ctx context.Context, req *s3.DeleteObject func (s *S3Server) GetObjectTagging(ctx context.Context, req *s3.GetObjectTaggingInput) (*s3.GetObjectTaggingOutput, error) { defer func() { if err := recover(); err != nil { - s.logger.Errorf("GetObjectTagging occur panic, stack info: %+v", string(debug.Stack())) + log.DefaultLogger.Errorf("GetObjectTagging occur panic, stack info: %+v", string(debug.Stack())) } }() if s.ossInstance[req.StoreName] == nil { @@ -354,7 +343,7 @@ func (s *S3Server) GetObjectTagging(ctx context.Context, req *s3.GetObjectTaggin func (s *S3Server) CopyObject(ctx context.Context, req *s3.CopyObjectInput) (*s3.CopyObjectOutput, error) { defer func() { if err := recover(); err != nil { - s.logger.Errorf("CopyObject occur panic, stack info: %+v", string(debug.Stack())) + log.DefaultLogger.Errorf("CopyObject occur panic, stack info: %+v", string(debug.Stack())) } }() if s.ossInstance[req.StoreName] == nil { @@ -380,7 +369,7 @@ func (s *S3Server) CopyObject(ctx context.Context, req *s3.CopyObjectInput) (*s3 func (s *S3Server) DeleteObjects(ctx context.Context, req *s3.DeleteObjectsInput) (*s3.DeleteObjectsOutput, error) { defer func() { if err := recover(); err != nil { - s.logger.Errorf("DeleteObjects occur panic, stack info: %+v", string(debug.Stack())) + log.DefaultLogger.Errorf("DeleteObjects occur panic, stack info: %+v", string(debug.Stack())) } }() if s.ossInstance[req.StoreName] == nil { @@ -406,7 +395,7 @@ func (s *S3Server) DeleteObjects(ctx context.Context, req *s3.DeleteObjectsInput func (s *S3Server) ListObjects(ctx context.Context, req *s3.ListObjectsInput) (*s3.ListObjectsOutput, error) { defer func() { if err := recover(); err != nil { - s.logger.Errorf("ListObjects occur panic, stack info: %+v", string(debug.Stack())) + log.DefaultLogger.Errorf("ListObjects occur panic, stack info: %+v", string(debug.Stack())) } }() if s.ossInstance[req.StoreName] == nil { @@ -432,7 +421,7 @@ func (s *S3Server) ListObjects(ctx context.Context, req *s3.ListObjectsInput) (* func (s *S3Server) GetObjectCannedAcl(ctx context.Context, req *s3.GetObjectCannedAclInput) (*s3.GetObjectCannedAclOutput, error) { defer func() { if err := recover(); err != nil { - s.logger.Errorf("GetObjectCannedAcl occur panic, stack info: %+v", string(debug.Stack())) + log.DefaultLogger.Errorf("GetObjectCannedAcl occur panic, stack info: %+v", string(debug.Stack())) } }() if s.ossInstance[req.StoreName] == nil { @@ -458,7 +447,7 @@ func (s *S3Server) GetObjectCannedAcl(ctx context.Context, req *s3.GetObjectCann func (s *S3Server) PutObjectCannedAcl(ctx context.Context, req *s3.PutObjectCannedAclInput) (*s3.PutObjectCannedAclOutput, error) { defer func() { if err := recover(); err != nil { - s.logger.Errorf("PutObjectCannedAcl occur panic, stack info: %+v", string(debug.Stack())) + log.DefaultLogger.Errorf("PutObjectCannedAcl occur panic, stack info: %+v", string(debug.Stack())) } }() if s.ossInstance[req.StoreName] == nil { @@ -484,7 +473,7 @@ func (s *S3Server) PutObjectCannedAcl(ctx context.Context, req *s3.PutObjectCann func (s *S3Server) RestoreObject(ctx context.Context, req *s3.RestoreObjectInput) (*s3.RestoreObjectOutput, error) { defer func() { if err := recover(); err != nil { - s.logger.Errorf("RestoreObject occur panic, stack info: %+v", string(debug.Stack())) + log.DefaultLogger.Errorf("RestoreObject occur panic, stack info: %+v", string(debug.Stack())) } }() if s.ossInstance[req.StoreName] == nil { @@ -510,7 +499,7 @@ func (s *S3Server) RestoreObject(ctx context.Context, req *s3.RestoreObjectInput func (s *S3Server) CreateMultipartUpload(ctx context.Context, req *s3.CreateMultipartUploadInput) (*s3.CreateMultipartUploadOutput, error) { defer func() { if err := recover(); err != nil { - s.logger.Errorf("CreateMultipartUpload occur panic, stack info: %+v", string(debug.Stack())) + log.DefaultLogger.Errorf("CreateMultipartUpload occur panic, stack info: %+v", string(debug.Stack())) } }() if s.ossInstance[req.StoreName] == nil { @@ -537,7 +526,6 @@ func (s *S3Server) CreateMultipartUpload(ctx context.Context, req *s3.CreateMult type uploadPartStreamReader struct { data []byte server s3.ObjectStorageService_UploadPartServer - logger logger.Logger } func newUploadPartStreamReader(data []byte, server s3.ObjectStorageService_UploadPartServer) *uploadPartStreamReader { @@ -559,7 +547,7 @@ func (r *uploadPartStreamReader) Read(p []byte) (int, error) { req, err := r.server.Recv() if err != nil { if err != io.EOF { - r.logger.Errorf("recv data from grpc stream fail, err:%+v", err) + log.DefaultLogger.Errorf("recv data from grpc stream fail, err:%+v", err) } return count, err } @@ -570,7 +558,7 @@ func (r *uploadPartStreamReader) Read(p []byte) (int, error) { func (s *S3Server) UploadPart(stream s3.ObjectStorageService_UploadPartServer) error { defer func() { if err := recover(); err != nil { - s.logger.Errorf("UploadPart occur panic, stack info: %+v", string(debug.Stack())) + log.DefaultLogger.Errorf("UploadPart occur panic, stack info: %+v", string(debug.Stack())) } }() req, err := stream.Recv() @@ -586,7 +574,6 @@ func (s *S3Server) UploadPart(stream s3.ObjectStorageService_UploadPartServer) e return status.Errorf(codes.InvalidArgument, NotSupportStoreName, req.StoreName) } fileReader := newUploadPartStreamReader(req.Body, stream) - fileReader.logger = s.logger st := &l8s3.UploadPartInput{} err = transferData(req, st) @@ -609,7 +596,7 @@ func (s *S3Server) UploadPart(stream s3.ObjectStorageService_UploadPartServer) e func (s *S3Server) UploadPartCopy(ctx context.Context, req *s3.UploadPartCopyInput) (*s3.UploadPartCopyOutput, error) { defer func() { if err := recover(); err != nil { - s.logger.Errorf("UploadPartCopy occur panic, stack info: %+v", string(debug.Stack())) + log.DefaultLogger.Errorf("UploadPartCopy occur panic, stack info: %+v", string(debug.Stack())) } }() if s.ossInstance[req.StoreName] == nil { @@ -635,7 +622,7 @@ func (s *S3Server) UploadPartCopy(ctx context.Context, req *s3.UploadPartCopyInp func (s *S3Server) CompleteMultipartUpload(ctx context.Context, req *s3.CompleteMultipartUploadInput) (*s3.CompleteMultipartUploadOutput, error) { defer func() { if err := recover(); err != nil { - s.logger.Errorf("CompleteMultipartUpload occur panic, stack info: %+v", string(debug.Stack())) + log.DefaultLogger.Errorf("CompleteMultipartUpload occur panic, stack info: %+v", string(debug.Stack())) } }() if s.ossInstance[req.StoreName] == nil { @@ -661,7 +648,7 @@ func (s *S3Server) CompleteMultipartUpload(ctx context.Context, req *s3.Complete func (s *S3Server) AbortMultipartUpload(ctx context.Context, req *s3.AbortMultipartUploadInput) (*s3.AbortMultipartUploadOutput, error) { defer func() { if err := recover(); err != nil { - s.logger.Errorf("AbortMultipartUpload occur panic, stack info: %+v", string(debug.Stack())) + log.DefaultLogger.Errorf("AbortMultipartUpload occur panic, stack info: %+v", string(debug.Stack())) } }() if s.ossInstance[req.StoreName] == nil { @@ -687,7 +674,7 @@ func (s *S3Server) AbortMultipartUpload(ctx context.Context, req *s3.AbortMultip func (s *S3Server) ListMultipartUploads(ctx context.Context, req *s3.ListMultipartUploadsInput) (*s3.ListMultipartUploadsOutput, error) { defer func() { if err := recover(); err != nil { - s.logger.Errorf("ListMultipartUploads occur panic, stack info: %+v", string(debug.Stack())) + log.DefaultLogger.Errorf("ListMultipartUploads occur panic, stack info: %+v", string(debug.Stack())) } }() if s.ossInstance[req.StoreName] == nil { @@ -712,7 +699,7 @@ func (s *S3Server) ListMultipartUploads(ctx context.Context, req *s3.ListMultipa func (s *S3Server) ListObjectVersions(ctx context.Context, req *s3.ListObjectVersionsInput) (*s3.ListObjectVersionsOutput, error) { defer func() { if err := recover(); err != nil { - s.logger.Errorf("ListObjectVersions occur panic, stack info: %+v", string(debug.Stack())) + log.DefaultLogger.Errorf("ListObjectVersions occur panic, stack info: %+v", string(debug.Stack())) } }() if s.ossInstance[req.StoreName] == nil { @@ -739,7 +726,7 @@ func (s *S3Server) ListObjectVersions(ctx context.Context, req *s3.ListObjectVer func (s *S3Server) HeadObject(ctx context.Context, req *s3.HeadObjectInput) (*s3.HeadObjectOutput, error) { defer func() { if err := recover(); err != nil { - s.logger.Errorf("HeadObject occur panic, stack info: %+v", string(debug.Stack())) + log.DefaultLogger.Errorf("HeadObject occur panic, stack info: %+v", string(debug.Stack())) } }() if s.ossInstance[req.StoreName] == nil { @@ -766,7 +753,7 @@ func (s *S3Server) HeadObject(ctx context.Context, req *s3.HeadObjectInput) (*s3 func (s *S3Server) IsObjectExist(ctx context.Context, req *s3.IsObjectExistInput) (*s3.IsObjectExistOutput, error) { defer func() { if err := recover(); err != nil { - s.logger.Errorf("IsObjectExist occur panic, stack info: %+v", string(debug.Stack())) + log.DefaultLogger.Errorf("IsObjectExist occur panic, stack info: %+v", string(debug.Stack())) } }() if s.ossInstance[req.StoreName] == nil { @@ -789,7 +776,7 @@ func (s *S3Server) IsObjectExist(ctx context.Context, req *s3.IsObjectExistInput func (s *S3Server) SignURL(ctx context.Context, req *s3.SignURLInput) (*s3.SignURLOutput, error) { defer func() { if err := recover(); err != nil { - s.logger.Errorf("SignURL occur panic, stack info: %+v", string(debug.Stack())) + log.DefaultLogger.Errorf("SignURL occur panic, stack info: %+v", string(debug.Stack())) } }() if s.ossInstance[req.StoreName] == nil { @@ -812,7 +799,7 @@ func (s *S3Server) SignURL(ctx context.Context, req *s3.SignURLInput) (*s3.SignU func (s *S3Server) UpdateDownloadBandwidthRateLimit(ctx context.Context, req *s3.UpdateBandwidthRateLimitInput) (*emptypb.Empty, error) { defer func() { if err := recover(); err != nil { - s.logger.Errorf("UpdateDownloadBandwidthRateLimit occur panic, stack info: %+v", string(debug.Stack())) + log.DefaultLogger.Errorf("UpdateDownloadBandwidthRateLimit occur panic, stack info: %+v", string(debug.Stack())) } }() if s.ossInstance[req.StoreName] == nil { @@ -832,7 +819,7 @@ func (s *S3Server) UpdateDownloadBandwidthRateLimit(ctx context.Context, req *s3 func (s *S3Server) UpdateUploadBandwidthRateLimit(ctx context.Context, req *s3.UpdateBandwidthRateLimitInput) (*emptypb.Empty, error) { defer func() { if err := recover(); err != nil { - s.logger.Errorf("UpdateUploadBandwidthRateLimit occur panic, stack info: %+v", string(debug.Stack())) + log.DefaultLogger.Errorf("UpdateUploadBandwidthRateLimit occur panic, stack info: %+v", string(debug.Stack())) } }() if s.ossInstance[req.StoreName] == nil { @@ -852,7 +839,6 @@ func (s *S3Server) UpdateUploadBandwidthRateLimit(ctx context.Context, req *s3.U type appendObjectStreamReader struct { data []byte server s3.ObjectStorageService_AppendObjectServer - logger logger.Logger } func newAppendObjectStreamReader(data []byte, server s3.ObjectStorageService_AppendObjectServer) *appendObjectStreamReader { @@ -874,7 +860,7 @@ func (r *appendObjectStreamReader) Read(p []byte) (int, error) { req, err := r.server.Recv() if err != nil { if err != io.EOF { - r.logger.Errorf("recv data from grpc stream fail, err:%+v", err) + log.DefaultLogger.Errorf("recv data from grpc stream fail, err:%+v", err) } return count, err } @@ -885,7 +871,7 @@ func (r *appendObjectStreamReader) Read(p []byte) (int, error) { func (s *S3Server) AppendObject(stream s3.ObjectStorageService_AppendObjectServer) error { defer func() { if err := recover(); err != nil { - s.logger.Errorf("AppendObject occur panic, stack info: %+v", string(debug.Stack())) + log.DefaultLogger.Errorf("AppendObject occur panic, stack info: %+v", string(debug.Stack())) } }() req, err := stream.Recv() @@ -901,7 +887,6 @@ func (s *S3Server) AppendObject(stream s3.ObjectStorageService_AppendObjectServe return status.Errorf(codes.InvalidArgument, NotSupportStoreName, req.StoreName) } fileReader := newAppendObjectStreamReader(req.Body, stream) - fileReader.logger = s.logger st := &l8s3.AppendObjectInput{} err = transferData(req, st) @@ -922,7 +907,7 @@ func (s *S3Server) AppendObject(stream s3.ObjectStorageService_AppendObjectServe func (s *S3Server) ListParts(ctx context.Context, req *s3.ListPartsInput) (*s3.ListPartsOutput, error) { defer func() { if err := recover(); err != nil { - s.logger.Errorf("ListParts occur panic, stack info: %+v", string(debug.Stack())) + log.DefaultLogger.Errorf("ListParts occur panic, stack info: %+v", string(debug.Stack())) } }() if s.ossInstance[req.StoreName] == nil { diff --git a/pkg/runtime/component_generated.go b/pkg/runtime/component_generated.go index 8e8263abe5..22601e6586 100644 --- a/pkg/runtime/component_generated.go +++ b/pkg/runtime/component_generated.go @@ -18,6 +18,8 @@ package runtime import ( "context" + "mosn.io/pkg/log" + cryption "mosn.io/layotto/components/cryption" email "mosn.io/layotto/components/email" phone "mosn.io/layotto/components/phone" @@ -47,7 +49,7 @@ func newExtensionComponents() *extensionComponents { } func (m *MosnRuntime) initCryptionService(factorys ...*cryption.Factory) error { - m.logger.Infof("[runtime] init CryptionService") + log.DefaultLogger.Infof("[runtime] init CryptionService") // 1. register all implementation reg := cryption.NewRegistry(m.info) @@ -75,7 +77,7 @@ func (m *MosnRuntime) initCryptionService(factorys ...*cryption.Factory) error { } func (m *MosnRuntime) initEmailService(factorys ...*email.Factory) error { - m.logger.Infof("[runtime] init EmailService") + log.DefaultLogger.Infof("[runtime] init EmailService") // 1. register all implementation reg := email.NewRegistry(m.info) @@ -103,7 +105,7 @@ func (m *MosnRuntime) initEmailService(factorys ...*email.Factory) error { } func (m *MosnRuntime) initPhoneCallService(factorys ...*phone.Factory) error { - m.logger.Infof("[runtime] init PhoneCallService") + log.DefaultLogger.Infof("[runtime] init PhoneCallService") // 1. register all implementation reg := phone.NewRegistry(m.info) @@ -131,7 +133,7 @@ func (m *MosnRuntime) initPhoneCallService(factorys ...*phone.Factory) error { } func (m *MosnRuntime) initSmsService(factorys ...*sms.Factory) error { - m.logger.Infof("[runtime] init SmsService") + log.DefaultLogger.Infof("[runtime] init SmsService") // 1. register all implementation reg := sms.NewRegistry(m.info) diff --git a/pkg/runtime/runtime.go b/pkg/runtime/runtime.go index 596168b703..d92802d90c 100644 --- a/pkg/runtime/runtime.go +++ b/pkg/runtime/runtime.go @@ -49,8 +49,7 @@ import ( "github.com/dapr/components-contrib/state" rawGRPC "google.golang.org/grpc" mgrpc "mosn.io/mosn/pkg/filter/network/grpc" - - "mosn.io/layotto/kit/logger" + "mosn.io/pkg/log" "mosn.io/layotto/components/configstores" "mosn.io/layotto/components/hello" @@ -113,11 +112,6 @@ type MosnRuntime struct { errInt ErrInterceptor started bool initRuntimeStages []initRuntimeStage - logger logger.Logger -} - -func (m *MosnRuntime) OnLogLevelChanged(level logger.LogLevel) { - m.logger.SetLogLevel(level) } func (m *MosnRuntime) RuntimeConfig() *MosnRuntimeConfig { @@ -128,7 +122,7 @@ type initRuntimeStage func(o *runtimeOptions, m *MosnRuntime) error func NewMosnRuntime(runtimeConfig *MosnRuntimeConfig) *MosnRuntime { info := info.NewRuntimeInfo() - mr := &MosnRuntime{ + return &MosnRuntime{ runtimeConfig: runtimeConfig, info: info, helloRegistry: hello.NewRegistry(info), @@ -158,10 +152,7 @@ func NewMosnRuntime(runtimeConfig *MosnRuntimeConfig) *MosnRuntime { dynamicComponents: make(map[lifecycle.ComponentKey]common.DynamicComponent), extensionComponents: *newExtensionComponents(), started: false, - logger: logger.NewLayottoLogger("mosn"), } - logger.RegisterComponentLoggerListener("mosn", mr) - return mr } func (m *MosnRuntime) GetInfo() *info.RuntimeInfo { @@ -203,7 +194,7 @@ func (m *MosnRuntime) Run(opts ...Option) (mgrpc.RegisteredServer, error) { m.errInt = o.errInt } else { m.errInt = func(err error, format string, args ...interface{}) { - m.logger.Errorf("[runtime] occurs an error: "+err.Error()+", "+format, args...) + log.DefaultLogger.Errorf("[runtime] occurs an error: "+err.Error()+", "+format, args...) } } // init runtime with runtimeOptions @@ -311,7 +302,7 @@ func DefaultInitRuntimeStage(o *runtimeOptions, m *MosnRuntime) error { } func (m *MosnRuntime) initHellos(hellos ...*hello.HelloFactory) error { - m.logger.Infof("[runtime] init hello service") + log.DefaultLogger.Infof("[runtime] init hello service") // register all hello services implementation m.helloRegistry.Register(hellos...) for name, config := range m.runtimeConfig.HelloServiceManagement { @@ -336,7 +327,7 @@ func (m *MosnRuntime) initHellos(hellos ...*hello.HelloFactory) error { } func (m *MosnRuntime) initConfigStores(configStores ...*configstores.StoreFactory) error { - m.logger.Infof("[runtime] init config service") + log.DefaultLogger.Infof("[runtime] init config service") // register all config store services implementation m.configStoreRegistry.Register(configStores...) for name, config := range m.runtimeConfig.ConfigStoreManagement { @@ -359,7 +350,7 @@ func (m *MosnRuntime) initConfigStores(configStores ...*configstores.StoreFactor } func (m *MosnRuntime) initRpcs(rpcs ...*rpc.Factory) error { - m.logger.Infof("[runtime] init rpc service") + log.DefaultLogger.Infof("[runtime] init rpc service") // register all rpc components m.rpcRegistry.Register(rpcs...) for name, config := range m.runtimeConfig.RpcManagement { @@ -381,7 +372,7 @@ func (m *MosnRuntime) initRpcs(rpcs ...*rpc.Factory) error { func (m *MosnRuntime) initPubSubs(factorys ...*runtime_pubsub.Factory) error { // 1. init components - m.logger.Infof("[runtime] start initializing pubsub components") + log.DefaultLogger.Infof("[runtime] start initializing pubsub components") // register all components implementation m.pubSubRegistry.Register(factorys...) for name, config := range m.runtimeConfig.PubSubManagement { @@ -420,7 +411,7 @@ func (m *MosnRuntime) initPubSubs(factorys ...*runtime_pubsub.Factory) error { } func (m *MosnRuntime) initStates(factorys ...*runtime_state.Factory) error { - m.logger.Infof("[runtime] start initializing state components") + log.DefaultLogger.Infof("[runtime] start initializing state components") // 1. register all the implementation m.stateRegistry.Register(factorys...) // 2. loop initializing @@ -449,7 +440,7 @@ func (m *MosnRuntime) initStates(factorys ...*runtime_state.Factory) error { // 2.2. save prefix strategy err = runtime_state.SaveStateConfiguration(name, config.Metadata) if err != nil { - m.logger.Errorf("error save state keyprefix: %s", err.Error()) + log.DefaultLogger.Errorf("error save state keyprefix: %s", err.Error()) return err } } @@ -457,7 +448,7 @@ func (m *MosnRuntime) initStates(factorys ...*runtime_state.Factory) error { } func (m *MosnRuntime) initOss(factorys ...*oss.Factory) error { - m.logger.Infof("[runtime] init oss service") + log.DefaultLogger.Infof("[runtime] init oss service") // 1. register all oss store services implementation m.ossRegistry.Register(factorys...) @@ -486,7 +477,7 @@ func (m *MosnRuntime) initOss(factorys ...*oss.Factory) error { } func (m *MosnRuntime) initFiles(files ...*file.Factory) error { - m.logger.Infof("[runtime] init file service") + log.DefaultLogger.Infof("[runtime] init file service") // register all files store services implementation m.fileRegistry.Register(files...) @@ -512,7 +503,7 @@ func (m *MosnRuntime) initFiles(files ...*file.Factory) error { } func (m *MosnRuntime) initLocks(factorys ...*runtime_lock.Factory) error { - m.logger.Infof("[runtime] start initializing lock components") + log.DefaultLogger.Infof("[runtime] start initializing lock components") // 1. register all the implementation m.lockRegistry.Register(factorys...) // 2. loop initializing @@ -549,7 +540,7 @@ func (m *MosnRuntime) initLocks(factorys ...*runtime_lock.Factory) error { } func (m *MosnRuntime) initSequencers(factorys ...*runtime_sequencer.Factory) error { - m.logger.Infof("[runtime] start initializing sequencer components") + log.DefaultLogger.Infof("[runtime] start initializing sequencer components") // 1. register all the implementation m.sequencerRegistry.Register(factorys...) // 2. loop initializing @@ -609,7 +600,7 @@ func (m *MosnRuntime) initAppCallbackConnection() error { defer cancel() conn, err := rawGRPC.DialContext(ctx, address, opts...) if err != nil { - m.logger.Warnf("[runtime]failed to init callback client to address %v : %s", address, err) + log.DefaultLogger.Warnf("[runtime]failed to init callback client to address %v : %s", address, err) return err } m.AppCallbackConn = conn @@ -617,7 +608,7 @@ func (m *MosnRuntime) initAppCallbackConnection() error { } func (m *MosnRuntime) initOutputBinding(factorys ...*mbindings.OutputBindingFactory) error { - m.logger.Infof("[runtime] start initializing OutputBinding components") + log.DefaultLogger.Infof("[runtime] start initializing OutputBinding components") // 1. register all factory methods. m.bindingsRegistry.RegisterOutputBinding(factorys...) // 2. loop initializing @@ -654,7 +645,7 @@ func (m *MosnRuntime) initInputBinding(factorys ...*mbindings.InputBindingFactor } func (m *MosnRuntime) initSecretStores(factorys ...*msecretstores.Factory) error { - m.logger.Infof("[runtime] start initializing SecretStores components") + log.DefaultLogger.Infof("[runtime] start initializing SecretStores components") // 1. register all factory methods. m.secretStoresRegistry.Register(factorys...) // 2. loop initializing @@ -684,7 +675,7 @@ func (m *MosnRuntime) initSecretStores(factorys ...*msecretstores.Factory) error func (m *MosnRuntime) AppendInitRuntimeStage(f initRuntimeStage) { if f == nil || m.started { - m.logger.Errorf("[runtime] invalid initRuntimeStage or already started") + log.DefaultLogger.Errorf("[runtime] invalid initRuntimeStage or already started") return } m.initRuntimeStages = append(m.initRuntimeStages, f) @@ -708,14 +699,14 @@ func (m *MosnRuntime) initRuntime(r *runtimeOptions) error { } } - m.logger.Infof("[runtime] initRuntime stages cost: %v", time.Since(st)) + log.DefaultLogger.Infof("[runtime] initRuntime stages cost: %v", time.Since(st)) return nil } func (m *MosnRuntime) registerPluggableComponent() { list, err := pluggable.Discover() if err != nil { - m.logger.Errorf("[runtime] discover pluggable components failed: %v", err) + log.DefaultLogger.Errorf("[runtime] discover pluggable components failed: %v", err) return } @@ -747,7 +738,7 @@ func (m *MosnRuntime) registerPluggableComponent() { m.secretStoresRegistry.Register(v.(*csecretsotres.Factory)) // todo custom default: - m.logger.Warnf("[runtime]unknown pluggable component factory type %v", t) + log.DefaultLogger.Warnf("[runtime]unknown pluggable component factory type %v", t) } } } @@ -760,12 +751,12 @@ func (m *MosnRuntime) SetCustomComponent(kind string, name string, component cus } func (m *MosnRuntime) initCustomComponents(kind2factorys map[string][]*custom.Factory) error { - m.logger.Infof("[runtime] start initializing custom components") + log.DefaultLogger.Infof("[runtime] start initializing custom components") // loop all configured custom components. for kind, name2Config := range m.runtimeConfig.CustomComponent { factorys, ok := kind2factorys[kind] if !ok || len(factorys) == 0 { - m.logger.Errorf("[runtime] Your required component kind %s is not supported.", kind) + log.DefaultLogger.Errorf("[runtime] Your required component kind %s is not supported.", kind) continue } // register all the factorys diff --git a/pkg/runtime/runtime_test.go b/pkg/runtime/runtime_test.go index 345498fa1f..d89e17c4a0 100644 --- a/pkg/runtime/runtime_test.go +++ b/pkg/runtime/runtime_test.go @@ -62,6 +62,7 @@ import ( "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" rawGRPC "google.golang.org/grpc" + "mosn.io/pkg/log" "mosn.io/layotto/components/configstores" "mosn.io/layotto/components/hello" @@ -374,7 +375,7 @@ func TestMosnRuntime_initPubSubs(t *testing.T) { // construct MosnRuntime m := NewMosnRuntime(cfg) m.errInt = func(err error, format string, args ...interface{}) { - m.logger.Errorf("[runtime] occurs an error: "+err.Error()+", "+format, args...) + log.DefaultLogger.Errorf("[runtime] occurs an error: "+err.Error()+", "+format, args...) } // test initPubSubs err := m.initPubSubs(mpubsub.NewFactory("mock", f)) @@ -405,7 +406,7 @@ func TestMosnRuntime_initPubSubsNotExistMetadata(t *testing.T) { // construct MosnRuntime m := NewMosnRuntime(cfg) m.errInt = func(err error, format string, args ...interface{}) { - m.logger.Errorf("[runtime] occurs an error: "+err.Error()+", "+format, args...) + log.DefaultLogger.Errorf("[runtime] occurs an error: "+err.Error()+", "+format, args...) } // test initPubSubs err := m.initPubSubs(mpubsub.NewFactory("mock", f)) @@ -436,7 +437,7 @@ func TestMosnRuntime_initStates(t *testing.T) { // construct MosnRuntime m := NewMosnRuntime(cfg) m.errInt = func(err error, format string, args ...interface{}) { - m.logger.Errorf("[runtime] occurs an error: "+err.Error()+", "+format, args...) + log.DefaultLogger.Errorf("[runtime] occurs an error: "+err.Error()+", "+format, args...) } // test initStates err := m.initStates(mstate.NewFactory("status", f)) @@ -462,7 +463,7 @@ func TestMosnRuntime_initRpc(t *testing.T) { // construct MosnRuntime m := NewMosnRuntime(cfg) m.errInt = func(err error, format string, args ...interface{}) { - m.logger.Errorf("[runtime] occurs an error: "+err.Error()+", "+format, args...) + log.DefaultLogger.Errorf("[runtime] occurs an error: "+err.Error()+", "+format, args...) } // test initRpcs method err := m.initRpcs(rpc.NewRpcFactory("rpc", f)) @@ -488,7 +489,7 @@ func TestMosnRuntime_initConfigStores(t *testing.T) { } m := NewMosnRuntime(cfg) m.errInt = func(err error, format string, args ...interface{}) { - m.logger.Errorf("[runtime] occurs an error: "+err.Error()+", "+format, args...) + log.DefaultLogger.Errorf("[runtime] occurs an error: "+err.Error()+", "+format, args...) } err := m.initConfigStores(configstores.NewStoreFactory("store_config", f)) assert.Nil(t, err) @@ -512,7 +513,7 @@ func TestMosnRuntime_initHellos(t *testing.T) { } m := NewMosnRuntime(cfg) m.errInt = func(err error, format string, args ...interface{}) { - m.logger.Errorf("[runtime] occurs an error: "+err.Error()+", "+format, args...) + log.DefaultLogger.Errorf("[runtime] occurs an error: "+err.Error()+", "+format, args...) } err := m.initHellos(hello.NewHelloFactory("hello", f)) assert.Nil(t, err) @@ -536,7 +537,7 @@ func TestMosnRuntime_initSequencers(t *testing.T) { } m := NewMosnRuntime(cfg) m.errInt = func(err error, format string, args ...interface{}) { - m.logger.Errorf("[runtime] occurs an error: "+err.Error()+", "+format, args...) + log.DefaultLogger.Errorf("[runtime] occurs an error: "+err.Error()+", "+format, args...) } err := m.initSequencers(runtime_sequencer.NewFactory("sequencers", f)) assert.Nil(t, err) @@ -560,7 +561,7 @@ func TestMosnRuntime_initLocks(t *testing.T) { } m := NewMosnRuntime(cfg) m.errInt = func(err error, format string, args ...interface{}) { - m.logger.Errorf("[runtime] occurs an error: "+err.Error()+", "+format, args...) + log.DefaultLogger.Errorf("[runtime] occurs an error: "+err.Error()+", "+format, args...) } err := m.initLocks(mlock.NewFactory("lock", f)) assert.Nil(t, err) diff --git a/pkg/runtime/sequencer/cache.go b/pkg/runtime/sequencer/cache.go index c8a996a173..d81b1acc15 100644 --- a/pkg/runtime/sequencer/cache.go +++ b/pkg/runtime/sequencer/cache.go @@ -18,10 +18,9 @@ import ( "sync" "time" + "mosn.io/pkg/log" "mosn.io/pkg/utils" - "mosn.io/layotto/kit/logger" - "mosn.io/layotto/components/sequencer" ) @@ -41,7 +40,6 @@ type DoubleBuffer struct { backUpBufferChan chan *Buffer lock sync.Mutex Store sequencer.Store - logger logger.Logger } type Buffer struct { @@ -56,18 +54,11 @@ func NewDoubleBuffer(key string, store sequencer.Store) *DoubleBuffer { size: defaultSize, Store: store, backUpBufferChan: make(chan *Buffer, 1), - logger: logger.NewLayottoLogger("sequencer/doubleBuffer"), } - logger.RegisterComponentLoggerListener("sequencer/doubleBuffer", d) - return d } -func (d *DoubleBuffer) OnLogLevelChanged(outputLevel logger.LogLevel) { - d.logger.SetLogLevel(outputLevel) -} - // init double buffer func (d *DoubleBuffer) init() error { @@ -108,7 +99,7 @@ func (d *DoubleBuffer) getId() (int64, error) { for i := 0; i < defaultRetry; i++ { buffer, err := d.getNewBuffer() if err != nil { - d.logger.Errorf("[DoubleBuffer] [getNewBuffer] error: %v", err) + log.DefaultLogger.Errorf("[DoubleBuffer] [getNewBuffer] error: %v", err) continue } d.backUpBufferChan <- buffer @@ -118,7 +109,7 @@ func (d *DoubleBuffer) getId() (int64, error) { for { buffer, err := d.getNewBuffer() if err != nil { - d.logger.Errorf("[DoubleBuffer] [getNewBuffer] error: %v", err) + log.DefaultLogger.Errorf("[DoubleBuffer] [getNewBuffer] error: %v", err) time.Sleep(waitTime) continue } diff --git a/pkg/runtime/sequencer/cache_test.go b/pkg/runtime/sequencer/cache_test.go index 2eb13457a7..9e2418c524 100644 --- a/pkg/runtime/sequencer/cache_test.go +++ b/pkg/runtime/sequencer/cache_test.go @@ -18,7 +18,6 @@ import ( "github.com/alicebob/miniredis/v2" "github.com/stretchr/testify/assert" - "mosn.io/layotto/components/sequencer" "mosn.io/layotto/components/sequencer/redis" )