Skip to content

Commit

Permalink
Merge branch 'coder:main' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
projectoperations authored Oct 21, 2024
2 parents a88ec4c + 29099d4 commit 3b76c49
Show file tree
Hide file tree
Showing 9 changed files with 315 additions and 194 deletions.
4 changes: 2 additions & 2 deletions coderd/notifications/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,9 +174,9 @@ func (m *Manager) loop(ctx context.Context) error {
var eg errgroup.Group

// Create a notifier to run concurrently, which will handle dequeueing and dispatching notifications.
m.notifier = newNotifier(m.cfg, uuid.New(), m.log, m.store, m.handlers, m.helpers, m.metrics, m.clock)
m.notifier = newNotifier(ctx, m.cfg, uuid.New(), m.log, m.store, m.handlers, m.helpers, m.metrics, m.clock)
eg.Go(func() error {
return m.notifier.run(ctx, m.success, m.failure)
return m.notifier.run(m.success, m.failure)
})

// Periodically flush notification state changes to the store.
Expand Down
27 changes: 16 additions & 11 deletions coderd/notifications/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@ import (
"github.com/stretchr/testify/require"
"golang.org/x/xerrors"

"cdr.dev/slog"
"cdr.dev/slog/sloggers/slogtest"
"github.com/coder/coder/v2/coderd/database/dbtestutil"
"github.com/coder/quartz"
"github.com/coder/serpent"

"github.com/coder/coder/v2/coderd/coderdtest"
"github.com/coder/coder/v2/coderd/database"
"github.com/coder/coder/v2/coderd/database/dbauthz"
"github.com/coder/coder/v2/coderd/database/dbgen"
Expand All @@ -32,24 +34,25 @@ func TestBufferedUpdates(t *testing.T) {

// nolint:gocritic // Unit test.
ctx := dbauthz.AsSystemRestricted(testutil.Context(t, testutil.WaitSuperLong))
_, _, api := coderdtest.NewWithAPI(t, nil)
store, _ := dbtestutil.NewDB(t)
logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)

interceptor := &syncInterceptor{Store: api.Database}
interceptor := &syncInterceptor{Store: store}
santa := &santaHandler{}

cfg := defaultNotificationsConfig(database.NotificationMethodSmtp)
cfg.StoreSyncInterval = serpent.Duration(time.Hour) // Ensure we don't sync the store automatically.

// GIVEN: a manager which will pass or fail notifications based on their "nice" labels
mgr, err := notifications.NewManager(cfg, interceptor, defaultHelpers(), createMetrics(), api.Logger.Named("notifications-manager"))
mgr, err := notifications.NewManager(cfg, interceptor, defaultHelpers(), createMetrics(), logger.Named("notifications-manager"))
require.NoError(t, err)
mgr.WithHandlers(map[database.NotificationMethod]notifications.Handler{
database.NotificationMethodSmtp: santa,
})
enq, err := notifications.NewStoreEnqueuer(cfg, interceptor, defaultHelpers(), api.Logger.Named("notifications-enqueuer"), quartz.NewReal())
enq, err := notifications.NewStoreEnqueuer(cfg, interceptor, defaultHelpers(), logger.Named("notifications-enqueuer"), quartz.NewReal())
require.NoError(t, err)

user := dbgen.User(t, api.Database, database.User{})
user := dbgen.User(t, store, database.User{})

// WHEN: notifications are enqueued which should succeed and fail
_, err = enq.Enqueue(ctx, user.ID, notifications.TemplateWorkspaceDeleted, map[string]string{"nice": "true", "i": "0"}, "") // Will succeed.
Expand Down Expand Up @@ -103,7 +106,8 @@ func TestBuildPayload(t *testing.T) {

// nolint:gocritic // Unit test.
ctx := dbauthz.AsSystemRestricted(testutil.Context(t, testutil.WaitSuperLong))
_, _, api := coderdtest.NewWithAPI(t, nil)
store, _ := dbtestutil.NewDB(t)
logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)

// GIVEN: a set of helpers to be injected into the templates
const label = "Click here!"
Expand All @@ -115,7 +119,7 @@ func TestBuildPayload(t *testing.T) {
}

// GIVEN: an enqueue interceptor which returns mock metadata
interceptor := newEnqueueInterceptor(api.Database,
interceptor := newEnqueueInterceptor(store,
// Inject custom message metadata to influence the payload construction.
func() database.FetchNewMessageMetadataRow {
// Inject template actions which use injected help functions.
Expand All @@ -137,7 +141,7 @@ func TestBuildPayload(t *testing.T) {
}
})

enq, err := notifications.NewStoreEnqueuer(defaultNotificationsConfig(database.NotificationMethodSmtp), interceptor, helpers, api.Logger.Named("notifications-enqueuer"), quartz.NewReal())
enq, err := notifications.NewStoreEnqueuer(defaultNotificationsConfig(database.NotificationMethodSmtp), interceptor, helpers, logger.Named("notifications-enqueuer"), quartz.NewReal())
require.NoError(t, err)

// WHEN: a notification is enqueued
Expand All @@ -160,10 +164,11 @@ func TestStopBeforeRun(t *testing.T) {

// nolint:gocritic // Unit test.
ctx := dbauthz.AsSystemRestricted(testutil.Context(t, testutil.WaitSuperLong))
_, _, api := coderdtest.NewWithAPI(t, nil)
store, _ := dbtestutil.NewDB(t)
logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)

// GIVEN: a standard manager
mgr, err := notifications.NewManager(defaultNotificationsConfig(database.NotificationMethodSmtp), api.Database, defaultHelpers(), createMetrics(), api.Logger.Named("notifications-manager"))
mgr, err := notifications.NewManager(defaultNotificationsConfig(database.NotificationMethodSmtp), store, defaultHelpers(), createMetrics(), logger.Named("notifications-manager"))
require.NoError(t, err)

// THEN: validate that the manager can be stopped safely without Run() having been called yet
Expand Down
66 changes: 38 additions & 28 deletions coderd/notifications/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"cdr.dev/slog"
"cdr.dev/slog/sloggers/slogtest"
"github.com/coder/quartz"

"github.com/coder/serpent"

"github.com/coder/coder/v2/coderd/coderdtest"
"github.com/coder/coder/v2/coderd/database"
"github.com/coder/coder/v2/coderd/database/dbauthz"
"github.com/coder/coder/v2/coderd/database/dbtestutil"
Expand All @@ -39,7 +39,8 @@ func TestMetrics(t *testing.T) {

// nolint:gocritic // Unit test.
ctx := dbauthz.AsSystemRestricted(testutil.Context(t, testutil.WaitSuperLong))
_, _, api := coderdtest.NewWithAPI(t, nil)
store, _ := dbtestutil.NewDB(t)
logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)

reg := prometheus.NewRegistry()
metrics := notifications.NewMetrics(reg)
Expand All @@ -59,7 +60,7 @@ func TestMetrics(t *testing.T) {
cfg.RetryInterval = serpent.Duration(time.Millisecond * 50)
cfg.StoreSyncInterval = serpent.Duration(time.Millisecond * 100) // Twice as long as fetch interval to ensure we catch pending updates.

mgr, err := notifications.NewManager(cfg, api.Database, defaultHelpers(), metrics, api.Logger.Named("manager"))
mgr, err := notifications.NewManager(cfg, store, defaultHelpers(), metrics, logger.Named("manager"))
require.NoError(t, err)
t.Cleanup(func() {
assert.NoError(t, mgr.Stop(ctx))
Expand All @@ -69,10 +70,10 @@ func TestMetrics(t *testing.T) {
method: handler,
})

enq, err := notifications.NewStoreEnqueuer(cfg, api.Database, defaultHelpers(), api.Logger.Named("enqueuer"), quartz.NewReal())
enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal())
require.NoError(t, err)

user := createSampleUser(t, api.Database)
user := createSampleUser(t, store)

// Build fingerprints for the two different series we expect.
methodTemplateFP := fingerprintLabels(notifications.LabelMethod, string(method), notifications.LabelTemplateID, template.String())
Expand Down Expand Up @@ -212,7 +213,8 @@ func TestPendingUpdatesMetric(t *testing.T) {
// SETUP
// nolint:gocritic // Unit test.
ctx := dbauthz.AsSystemRestricted(testutil.Context(t, testutil.WaitSuperLong))
_, _, api := coderdtest.NewWithAPI(t, nil)
store, _ := dbtestutil.NewDB(t)
logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)

reg := prometheus.NewRegistry()
metrics := notifications.NewMetrics(reg)
Expand All @@ -223,14 +225,17 @@ func TestPendingUpdatesMetric(t *testing.T) {
// GIVEN: a notification manager whose store updates are intercepted so we can read the number of pending updates set in the metric
cfg := defaultNotificationsConfig(method)
cfg.RetryInterval = serpent.Duration(time.Hour) // Delay retries so they don't interfere.
cfg.FetchInterval = serpent.Duration(time.Millisecond * 50)
cfg.StoreSyncInterval = serpent.Duration(time.Millisecond * 100)

syncer := &syncInterceptor{Store: api.Database}
syncer := &syncInterceptor{Store: store}
interceptor := newUpdateSignallingInterceptor(syncer)
mClock := quartz.NewMock(t)
trap := mClock.Trap().NewTicker("Manager", "storeSync")
defer trap.Close()
mgr, err := notifications.NewManager(cfg, interceptor, defaultHelpers(), metrics, api.Logger.Named("manager"),
fetchTrap := mClock.Trap().TickerFunc("notifier", "fetchInterval")
defer fetchTrap.Close()
mgr, err := notifications.NewManager(cfg, interceptor, defaultHelpers(), metrics, logger.Named("manager"),
notifications.WithTestClock(mClock))
require.NoError(t, err)
t.Cleanup(func() {
Expand All @@ -241,10 +246,10 @@ func TestPendingUpdatesMetric(t *testing.T) {
method: handler,
})

enq, err := notifications.NewStoreEnqueuer(cfg, api.Database, defaultHelpers(), api.Logger.Named("enqueuer"), quartz.NewReal())
enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal())
require.NoError(t, err)

user := createSampleUser(t, api.Database)
user := createSampleUser(t, store)

// WHEN: 2 notifications are enqueued, one of which will fail and one which will succeed
_, err = enq.Enqueue(ctx, user.ID, template, map[string]string{"type": "success"}, "test") // this will succeed
Expand All @@ -254,24 +259,27 @@ func TestPendingUpdatesMetric(t *testing.T) {

mgr.Run(ctx)
trap.MustWait(ctx).Release() // ensures ticker has been set
fetchTrap.MustWait(ctx).Release()

// Advance to the first fetch
mClock.Advance(cfg.FetchInterval.Value()).MustWait(ctx)

// THEN:
// Wait until the handler has dispatched the given notifications.
require.Eventually(t, func() bool {
// handler has dispatched the given notifications.
func() {
handler.mu.RLock()
defer handler.mu.RUnlock()

return len(handler.succeeded) == 1 && len(handler.failed) == 1
}, testutil.WaitShort, testutil.IntervalFast)
require.Len(t, handler.succeeded, 1)
require.Len(t, handler.failed, 1)
}()

// Both handler calls should be pending in the metrics.
require.Eventually(t, func() bool {
return promtest.ToFloat64(metrics.PendingUpdates) == float64(2)
}, testutil.WaitShort, testutil.IntervalFast)
require.EqualValues(t, 2, promtest.ToFloat64(metrics.PendingUpdates))

// THEN:
// Trigger syncing updates
mClock.Advance(cfg.StoreSyncInterval.Value()).MustWait(ctx)
mClock.Advance(cfg.StoreSyncInterval.Value() - cfg.FetchInterval.Value()).MustWait(ctx)

// Wait until we intercept the calls to sync the pending updates to the store.
success := testutil.RequireRecvCtx(testutil.Context(t, testutil.WaitShort), t, interceptor.updateSuccess)
Expand All @@ -296,7 +304,8 @@ func TestInflightDispatchesMetric(t *testing.T) {
// SETUP
// nolint:gocritic // Unit test.
ctx := dbauthz.AsSystemRestricted(testutil.Context(t, testutil.WaitSuperLong))
_, _, api := coderdtest.NewWithAPI(t, nil)
store, _ := dbtestutil.NewDB(t)
logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)

reg := prometheus.NewRegistry()
metrics := notifications.NewMetrics(reg)
Expand All @@ -311,7 +320,7 @@ func TestInflightDispatchesMetric(t *testing.T) {
cfg.RetryInterval = serpent.Duration(time.Hour) // Delay retries so they don't interfere.
cfg.StoreSyncInterval = serpent.Duration(time.Millisecond * 100)

mgr, err := notifications.NewManager(cfg, api.Database, defaultHelpers(), metrics, api.Logger.Named("manager"))
mgr, err := notifications.NewManager(cfg, store, defaultHelpers(), metrics, logger.Named("manager"))
require.NoError(t, err)
t.Cleanup(func() {
assert.NoError(t, mgr.Stop(ctx))
Expand All @@ -326,10 +335,10 @@ func TestInflightDispatchesMetric(t *testing.T) {
method: barrier,
})

enq, err := notifications.NewStoreEnqueuer(cfg, api.Database, defaultHelpers(), api.Logger.Named("enqueuer"), quartz.NewReal())
enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal())
require.NoError(t, err)

user := createSampleUser(t, api.Database)
user := createSampleUser(t, store)

// WHEN: notifications are enqueued which will succeed (and be delayed during dispatch)
for i := 0; i < msgCount; i++ {
Expand Down Expand Up @@ -374,7 +383,8 @@ func TestCustomMethodMetricCollection(t *testing.T) {

// nolint:gocritic // Unit test.
ctx := dbauthz.AsSystemRestricted(testutil.Context(t, testutil.WaitSuperLong))
_, _, api := coderdtest.NewWithAPI(t, nil)
store, _ := dbtestutil.NewDB(t)
logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)

var (
reg = prometheus.NewRegistry()
Expand All @@ -389,7 +399,7 @@ func TestCustomMethodMetricCollection(t *testing.T) {
)

// GIVEN: a template whose notification method differs from the default.
out, err := api.Database.UpdateNotificationTemplateMethodByID(ctx, database.UpdateNotificationTemplateMethodByIDParams{
out, err := store.UpdateNotificationTemplateMethodByID(ctx, database.UpdateNotificationTemplateMethodByIDParams{
ID: template,
Method: database.NullNotificationMethod{NotificationMethod: customMethod, Valid: true},
})
Expand All @@ -398,7 +408,7 @@ func TestCustomMethodMetricCollection(t *testing.T) {

// WHEN: two notifications (each with different templates) are enqueued.
cfg := defaultNotificationsConfig(defaultMethod)
mgr, err := notifications.NewManager(cfg, api.Database, defaultHelpers(), metrics, api.Logger.Named("manager"))
mgr, err := notifications.NewManager(cfg, store, defaultHelpers(), metrics, logger.Named("manager"))
require.NoError(t, err)
t.Cleanup(func() {
assert.NoError(t, mgr.Stop(ctx))
Expand All @@ -411,10 +421,10 @@ func TestCustomMethodMetricCollection(t *testing.T) {
customMethod: webhookHandler,
})

enq, err := notifications.NewStoreEnqueuer(cfg, api.Database, defaultHelpers(), api.Logger.Named("enqueuer"), quartz.NewReal())
enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal())
require.NoError(t, err)

user := createSampleUser(t, api.Database)
user := createSampleUser(t, store)

_, err = enq.Enqueue(ctx, user.ID, template, map[string]string{"type": "success"}, "test")
require.NoError(t, err)
Expand Down
Loading

0 comments on commit 3b76c49

Please sign in to comment.