From 079b75edf138bd9ecb451bb94e121df1219aa69f Mon Sep 17 00:00:00 2001 From: Jermaine Hua Date: Wed, 18 Dec 2024 09:56:51 +0800 Subject: [PATCH] feat: support in memory configstore (#1095) Co-authored-by: wenxuwan --- cmd/layotto/main.go | 2 + cmd/layotto_multiple_api/main.go | 2 + cmd/layotto_without_xds/main.go | 2 + .../configstores/in-memory/configstore.go | 181 +++++++++++ .../in-memory/configstore_test.go | 281 ++++++++++++++++++ 5 files changed, 468 insertions(+) create mode 100644 components/configstores/in-memory/configstore.go create mode 100644 components/configstores/in-memory/configstore_test.go diff --git a/cmd/layotto/main.go b/cmd/layotto/main.go index b96cb156f8..bbce00a267 100644 --- a/cmd/layotto/main.go +++ b/cmd/layotto/main.go @@ -89,6 +89,7 @@ import ( // Configuration "mosn.io/layotto/components/configstores" "mosn.io/layotto/components/configstores/apollo" + store_inmemory "mosn.io/layotto/components/configstores/in-memory" // Pub/Sub dapr_comp_pubsub "github.com/dapr/components-contrib/pubsub" @@ -291,6 +292,7 @@ func NewRuntimeGrpcServer(data json.RawMessage, opts ...grpc.ServerOption) (mgrp configstores.NewStoreFactory("apollo", apollo.NewStore), configstores.NewStoreFactory("etcd", etcdv3.NewStore), configstores.NewStoreFactory("nacos", nacos.NewStore), + configstores.NewStoreFactory("in-memory", store_inmemory.NewStore), ), // RPC diff --git a/cmd/layotto_multiple_api/main.go b/cmd/layotto_multiple_api/main.go index 1856926a2c..dd872f7c20 100644 --- a/cmd/layotto_multiple_api/main.go +++ b/cmd/layotto_multiple_api/main.go @@ -92,6 +92,7 @@ import ( // Configuration "mosn.io/layotto/components/configstores" "mosn.io/layotto/components/configstores/apollo" + store_inmemory "mosn.io/layotto/components/configstores/in-memory" "mosn.io/layotto/components/configstores/nacos" // Pub/Sub @@ -297,6 +298,7 @@ func NewRuntimeGrpcServer(data json.RawMessage, opts ...grpc.ServerOption) (mgrp configstores.NewStoreFactory("apollo", apollo.NewStore), configstores.NewStoreFactory("etcd", etcdv3.NewStore), configstores.NewStoreFactory("nacos", nacos.NewStore), + configstores.NewStoreFactory("in-memory", store_inmemory.NewStore), ), // RPC diff --git a/cmd/layotto_without_xds/main.go b/cmd/layotto_without_xds/main.go index daa277828c..01f5c89ffc 100644 --- a/cmd/layotto_without_xds/main.go +++ b/cmd/layotto_without_xds/main.go @@ -83,6 +83,7 @@ import ( // Configuration "mosn.io/layotto/components/configstores" "mosn.io/layotto/components/configstores/apollo" + store_inmemory "mosn.io/layotto/components/configstores/in-memory" // Pub/Sub dapr_comp_pubsub "github.com/dapr/components-contrib/pubsub" @@ -270,6 +271,7 @@ func NewRuntimeGrpcServer(data json.RawMessage, opts ...grpc.ServerOption) (mgrp configstores.NewStoreFactory("apollo", apollo.NewStore), configstores.NewStoreFactory("etcd", etcdv3.NewStore), configstores.NewStoreFactory("nacos", nacos.NewStore), + configstores.NewStoreFactory("in-memory", store_inmemory.NewStore), ), // RPC diff --git a/components/configstores/in-memory/configstore.go b/components/configstores/in-memory/configstore.go new file mode 100644 index 0000000000..7e3fbed3c9 --- /dev/null +++ b/components/configstores/in-memory/configstore.go @@ -0,0 +1,181 @@ +// Copyright 2021 Layotto Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package in_memory + +import ( + "context" + "fmt" + "sync" + + "mosn.io/layotto/components/configstores" + "mosn.io/layotto/components/pkg/actuators" + "mosn.io/layotto/components/trace" +) + +var ( + once sync.Once + readinessIndicator *actuators.HealthIndicator + livenessIndicator *actuators.HealthIndicator +) + +const ( + componentName = "configstore-memory" + defaultGroup = "default" + defaultLabel = "default" +) + +func init() { + readinessIndicator = actuators.NewHealthIndicator() + livenessIndicator = actuators.NewHealthIndicator() +} + +type InMemoryConfigStore struct { + data *sync.Map + listener *sync.Map + storeName string + appId string +} + +func NewStore() configstores.Store { + once.Do(func() { + indicators := &actuators.ComponentsIndicator{ReadinessIndicator: readinessIndicator, LivenessIndicator: livenessIndicator} + actuators.SetComponentsIndicator(componentName, indicators) + }) + return &InMemoryConfigStore{ + data: &sync.Map{}, + listener: &sync.Map{}, + } +} + +func (m *InMemoryConfigStore) Init(config *configstores.StoreConfig) error { + m.appId = config.AppId + m.storeName = config.StoreName + readinessIndicator.SetStarted() + livenessIndicator.SetStarted() + return nil +} + +// Get gets configuration from configuration store. +func (m *InMemoryConfigStore) Get(ctx context.Context, req *configstores.GetRequest) ([]*configstores.ConfigurationItem, error) { + + res := make([]*configstores.ConfigurationItem, 0, len(req.Keys)) + + for _, key := range req.Keys { + value, ok := m.data.Load(key) + if ok { + config := &configstores.ConfigurationItem{ + Content: value.(string), + Key: key, + Group: req.Group, + } + res = append(res, config) + } + } + trace.SetExtraComponentInfo(ctx, fmt.Sprintf("method: %+v, store: %+v", "Get", "memory")) + return res, nil +} + +// Set saves configuration into configuration store. +func (m *InMemoryConfigStore) Set(ctx context.Context, req *configstores.SetRequest) error { + if len(req.Items) == 0 { + return fmt.Errorf("params illegal:item is empty") + } + for _, item := range req.Items { + m.data.Store(item.Key, item.Content) + m.notifyChanged(item) + } + return nil +} + +// Delete deletes configuration from configuration store. +func (m *InMemoryConfigStore) Delete(ctx context.Context, req *configstores.DeleteRequest) error { + for _, key := range req.Keys { + m.data.Delete(key) + } + return nil +} + +// Subscribe gets configuration from configuration store and subscribe the updates. +func (m *InMemoryConfigStore) Subscribe(request *configstores.SubscribeReq, ch chan *configstores.SubscribeResp) error { + if request.Group == "" && len(request.Keys) > 0 { + request.Group = defaultGroup + } + + ctx := context.Background() + req := &configstores.GetRequest{ + AppId: request.AppId, + Group: request.Group, + Label: request.Label, + Keys: request.Keys, + Metadata: request.Metadata, + } + + for _, key := range req.Keys { + m.listener.Store(key, m.subscribeOnChange(ch)) + } + + items, err := m.Get(ctx, req) + if err != nil { + return err + } + + for _, item := range items { + m.notifyChanged(item) + } + + return nil +} + +func (m *InMemoryConfigStore) notifyChanged(item *configstores.ConfigurationItem) { + f, ok := m.listener.Load(item.Key) + if ok { + f.(OnChangeFunc)(item.Group, item.Key, item.Content) + } +} + +type OnChangeFunc func(group, dataId, data string) + +func (m *InMemoryConfigStore) subscribeOnChange(ch chan *configstores.SubscribeResp) OnChangeFunc { + return func(group, dataId, data string) { + resp := &configstores.SubscribeResp{ + StoreName: m.storeName, + AppId: m.appId, + Items: []*configstores.ConfigurationItem{ + { + Key: dataId, + Content: data, + Group: group, + }, + }, + } + + ch <- resp + } +} + +func (m *InMemoryConfigStore) StopSubscribe() { + // stop listening all subscribed configs + m.listener.Range(func(key, value any) bool { + m.listener.Delete(key) + return true + }) +} + +func (m *InMemoryConfigStore) GetDefaultGroup() string { + return defaultGroup +} + +func (m *InMemoryConfigStore) GetDefaultLabel() string { + return defaultLabel +} diff --git a/components/configstores/in-memory/configstore_test.go b/components/configstores/in-memory/configstore_test.go new file mode 100644 index 0000000000..0577fde248 --- /dev/null +++ b/components/configstores/in-memory/configstore_test.go @@ -0,0 +1,281 @@ +// Copyright 2021 Layotto Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package in_memory + +import ( + "context" + "fmt" + "sync" + "testing" + + "mosn.io/layotto/components/configstores" + + "github.com/stretchr/testify/assert" +) + +func TestImMemoryConfigStore_Set(t *testing.T) { + t.Run("set success", func(t *testing.T) { + params := &configstores.SetRequest{ + AppId: "test-set-app", + Items: []*configstores.ConfigurationItem{ + { + Group: "test-set-group", + Content: "content", + Key: "test-set-key", + }, + }, + } + + store := NewStore() + err := store.Set(context.Background(), params) + assert.Nil(t, err) + }) + + t.Run("set with empty items", func(t *testing.T) { + store := NewStore() + params := &configstores.SetRequest{ + AppId: "test-set-app", + Items: []*configstores.ConfigurationItem{}, + } + err := store.Set(context.Background(), params) + assert.EqualError(t, fmt.Errorf("params illegal:item is empty"), err.Error()) + }) + + t.Run("test notify listener success", func(t *testing.T) { + store := &InMemoryConfigStore{ + data: &sync.Map{}, + listener: &sync.Map{}, + } + ch := make(chan *configstores.SubscribeResp, 2) + + config := &configstores.StoreConfig{ + AppId: "test-app", + StoreName: "test-store", + } + + err := store.Init(config) + assert.Nil(t, err) + + subReqParams := &configstores.SubscribeReq{ + AppId: "test-app", + Group: "group", + Keys: []string{"data_id"}, + } + + setReqParams := &configstores.SetRequest{ + AppId: "test-app", + StoreName: "test-store", + Items: []*configstores.ConfigurationItem{ + { + Group: "group", + Content: "content", + Key: "data_id", + }, + }, + } + + err = store.Subscribe(subReqParams, ch) + assert.Nil(t, err) + + expected := &configstores.SubscribeResp{ + StoreName: store.storeName, + AppId: store.appId, + Items: []*configstores.ConfigurationItem{ + { + Key: "data_id", + Content: "content", + Group: "group", + }, + }, + } + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + i := 0 + for v := range ch { + i++ + assert.EqualValues(t, expected, v) + } + assert.EqualValues(t, i, 3) + }() + err = store.Set(context.Background(), setReqParams) + assert.Nil(t, err) + + err = store.Set(context.Background(), setReqParams) + assert.Nil(t, err) + + err = store.Set(context.Background(), setReqParams) + assert.Nil(t, err) + close(ch) + wg.Wait() + }) + +} + +func TestInMemoryConfigStore_Get(t *testing.T) { + + t.Run("get success", func(t *testing.T) { + store := NewStore() + + params := &configstores.SetRequest{ + AppId: "test-set-app", + Items: []*configstores.ConfigurationItem{ + { + Group: "test-group", + Content: "content", + Key: "test-key", + }, + }, + } + err := store.Set(context.Background(), params) + assert.Nil(t, err) + + getRequestParams := &configstores.GetRequest{ + Group: "test-group", + Keys: []string{"test-key"}, + } + + v, err := store.Get(context.Background(), getRequestParams) + assert.Nil(t, err) + expect := []*configstores.ConfigurationItem{ + { + Key: getRequestParams.Keys[0], + Group: getRequestParams.Group, + Content: "content", + }, + } + assert.EqualValues(t, expect, v) + }) + +} + +func TestInMemoryConfigStore_Subscribe(t *testing.T) { + t.Run("test subscribe success", func(t *testing.T) { + store := &InMemoryConfigStore{ + data: &sync.Map{}, + listener: &sync.Map{}, + } + ch := make(chan *configstores.SubscribeResp, 2) + + config := &configstores.StoreConfig{ + AppId: "test-app", + StoreName: "test-store", + } + + err := store.Init(config) + assert.Nil(t, err) + + subReqParams := &configstores.SubscribeReq{ + AppId: "test-app", + Group: "group", + Keys: []string{"data_id"}, + } + + err = store.Subscribe(subReqParams, ch) + assert.Nil(t, err) + f, ok := store.listener.Load("data_id") + assert.True(t, ok) + assert.NotNil(t, f) + + expected := &configstores.SubscribeResp{ + StoreName: store.storeName, + AppId: store.appId, + Items: []*configstores.ConfigurationItem{ + { + Key: "data_id", + Content: "content", + Group: "group", + }, + }, + } + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + i := 0 + for v := range ch { + i++ + assert.EqualValues(t, expected, v) + } + assert.EqualValues(t, i, 3) + }() + f.(OnChangeFunc)("group", "data_id", "content") + f.(OnChangeFunc)("group", "data_id", "content") + f.(OnChangeFunc)("group", "data_id", "content") + close(ch) + wg.Wait() + }) +} + +func TestInMemoryConfigStore_StopSubscribe(t *testing.T) { + t.Run("test stop subscribe success", func(t *testing.T) { + store := &InMemoryConfigStore{ + data: &sync.Map{}, + listener: &sync.Map{}, + } + ch := make(chan *configstores.SubscribeResp, 2) + + config := &configstores.StoreConfig{ + AppId: "test-app", + StoreName: "test-store", + } + + err := store.Init(config) + assert.Nil(t, err) + + subReqParams := &configstores.SubscribeReq{ + AppId: "test-app", + Group: "group", + Keys: []string{"data_id"}, + } + + err = store.Subscribe(subReqParams, ch) + assert.Nil(t, err) + + store.StopSubscribe() + f, ok := store.listener.Load("data_id") + assert.False(t, ok) + assert.Nil(t, f) + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + i := 0 + for range ch { + i++ + } + assert.EqualValues(t, 0, i) + }() + + setReqParams := &configstores.SetRequest{ + AppId: "test-app", + StoreName: "test-store", + Items: []*configstores.ConfigurationItem{ + { + Group: "group", + Content: "content", + Key: "data_id", + }, + }, + } + err = store.Set(context.Background(), setReqParams) + assert.Nil(t, err) + close(ch) + wg.Wait() + }) +}