diff --git a/example/server/configs.http b/example/server/configs.http index 5765eed..7ec578c 100644 --- a/example/server/configs.http +++ b/example/server/configs.http @@ -4,11 +4,12 @@ GET http://localhost:8080/configs/latest ### Modify latest config PUT http://localhost:8080/configs/latest -user-id: mark +user-id: pippo { - "name": "stark", - "age": 35 + "name": "gloria", + "age": 38, + "friends": ["joe", "mark"] } ### List config versions diff --git a/streamingconfig.go b/streamingconfig.go index 75bfe08..65c9527 100644 --- a/streamingconfig.go +++ b/streamingconfig.go @@ -71,6 +71,12 @@ func WithCollectionName[T Config](collectionName string) func(repo *WatchedRepo[ } } +func WithOnUpdate[T Config](onUpdate func(conf T)) func(repo *WatchedRepo[T]) { + return func(repo *WatchedRepo[T]) { + repo.onUpdate = onUpdate + } +} + type WatchedRepo[T Config] struct { lgr *slog.Logger source *mongo.Database @@ -82,6 +88,7 @@ type WatchedRepo[T Config] struct { skipIndexOperation bool configs *mongo.Collection started bool + onUpdate func(conf T) } func NewWatchedRepo[T Config]( @@ -419,6 +426,9 @@ func (s *WatchedRepo[T]) iterateChangeStream(ctx context.Context, cs *mongo.Chan if s.cfgWithDefaults, err = copyAndSetDefaults(s.cfg); err != nil { s.lgr.With("error", err).ErrorContext(ctx, "could not set defaults") } + if s.onUpdate != nil { + s.onUpdate(s.cfgWithDefaults.Config) + } default: s.lgr.With("operationType", dto.OperationType).ErrorContext(ctx, "invalid or unexpected operation") continue diff --git a/streamingconfig_integration_test.go b/streamingconfig_integration_test.go index 1ce039d..5bd4717 100644 --- a/streamingconfig_integration_test.go +++ b/streamingconfig_integration_test.go @@ -78,15 +78,15 @@ func (a *appConfigV1) validate() error { func NewTestStore[T config.Config]( t *testing.T, - nowProvider func() time.Time, db *mongo.Database, + opts ...func(repo *config.WatchedRepo[T]), ) *config.WatchedRepo[T] { t.Helper() configStore, err := config.NewWatchedRepo( config.Args{ Logger: slog.Default(), DB: db, - }, config.WithNowFn[T](nowProvider)) + }, opts...) require.NoError(t, err) return configStore @@ -120,7 +120,7 @@ func newFixture(t *testing.T) *fixture { assert.NoError(t, client.Database("admin"). RunCommand(ctx, bson.D{primitive.E{Key: "isMaster", Value: 1}}).Decode(&result), "checking mongoDB primary status") assert.Equal(t, true, result["ismaster"]) - }, 15*time.Second, 100*time.Millisecond) + }, 30*time.Second, 100*time.Millisecond) return &fixture{db: db} } @@ -145,7 +145,7 @@ func Test_ConfigCreateFindAndUpdateConfiguration(t *testing.T) { } ctx, cnl := context.WithTimeout(context.Background(), 30*time.Second) - configStoreOne := NewTestStore[*appConfigV0](t, nowProvider, f.db) + configStoreOne := NewTestStore[*appConfigV0](t, f.db, config.WithNowFn[*appConfigV0](nowProvider)) t.Run("cannot perform methods before starting", func(t *testing.T) { cfg, err := configStoreOne.GetConfig() require.ErrorIs(t, err, config.ErrNotStarted) @@ -158,7 +158,7 @@ func Test_ConfigCreateFindAndUpdateConfiguration(t *testing.T) { done1, err := configStoreOne.Start(ctx) require.NoError(t, err) - configStoreTwo := NewTestStore[*appConfigV0](t, nowProvider, f.db) + configStoreTwo := NewTestStore[*appConfigV0](t, f.db, config.WithNowFn[*appConfigV0](nowProvider)) done2, err := configStoreTwo.Start(ctx) require.NoError(t, err) t.Cleanup(func() { @@ -287,6 +287,44 @@ func Test_ConfigCreateFindAndUpdateConfiguration(t *testing.T) { require.Len(t, configsByVersion, 0) }) }) + + t.Run("on update", func(t *testing.T) { + var cfg *appConfigV0 + configStore3 := NewTestStore[*appConfigV0]( + t, + f.db, + config.WithNowFn[*appConfigV0](nowProvider), + config.WithOnUpdate[*appConfigV0](func(conf *appConfigV0) { + cfg = conf + }), + ) + done2, err := configStore3.Start(ctx) + require.NoError(t, err) + t.Cleanup(func() { + cnl() + doneOrTimeout(t, done1, 5*time.Second) + doneOrTimeout(t, done2, 5*time.Second) + }) + + now = &at + setV3 := &appConfigV0{ + Name: "n3", + Duration: 3 * time.Second, + Time: at.Add(3 * time.Second), + Nested: nestedConfig{Counter: 5}, + List: []string{"e", "f", "d"}, + } + cV3, err := configStoreTwo.UpdateConfig(ctx, config.UpdateConfigCmd[*appConfigV0]{ + By: "u2", + Config: setV3, + }) + require.NoError(t, err) + + require.EventuallyWithT(t, func(t *assert.CollectT) { + assert.Equal(t, cV3.Config, cfg) + }, 5*time.Second, 100*time.Millisecond) + + }) }) } @@ -302,11 +340,19 @@ func Test_ConfigBackwardCompatibility(t *testing.T) { } ctx, cnl := context.WithTimeout(context.Background(), 30*time.Second) - configStoreOne := NewTestStore[*appConfigV0](t, nowProvider, f.db) + configStoreOne := NewTestStore[*appConfigV0]( + t, + f.db, + config.WithNowFn[*appConfigV0](nowProvider), + ) done1, err := configStoreOne.Start(ctx) require.NoError(t, err) - configStoreTwo := NewTestStore[*appConfigV1](t, nowProvider, f.db) + configStoreTwo := NewTestStore[*appConfigV1]( + t, + f.db, + config.WithNowFn[*appConfigV1](nowProvider), + ) done2, err := configStoreTwo.Start(ctx) require.NoError(t, err) t.Cleanup(func() {