From 4fafe36411f2d5332d8ebfdfe0121cb17f00d9ae Mon Sep 17 00:00:00 2001 From: Yonas Habteab Date: Thu, 25 Apr 2024 11:36:29 +0200 Subject: [PATCH] Introduce and use type `events.Router` --- internal/events/router.go | 231 ++++++++++++++++++++++++++++++++ internal/events/router_test.go | 144 ++++++++++++++++++++ internal/icinga2/launcher.go | 4 +- internal/listener/listener.go | 4 +- internal/testutils/testutils.go | 11 ++ 5 files changed, 390 insertions(+), 4 deletions(-) create mode 100644 internal/events/router.go create mode 100644 internal/events/router_test.go diff --git a/internal/events/router.go b/internal/events/router.go new file mode 100644 index 000000000..5f0d2fab8 --- /dev/null +++ b/internal/events/router.go @@ -0,0 +1,231 @@ +package events + +import ( + "context" + "github.com/icinga/icinga-notifications/internal/config" + "github.com/icinga/icinga-notifications/internal/contracts" + "github.com/icinga/icinga-notifications/internal/event" + "github.com/icinga/icinga-notifications/internal/history" + "github.com/icinga/icinga-notifications/internal/incident" + "github.com/icinga/icinga-notifications/internal/notifyutils" + "github.com/icinga/icinga-notifications/internal/object" + "github.com/icinga/icinga-notifications/internal/rule" + "github.com/icinga/icinga-notifications/internal/utils" + "github.com/icinga/icingadb/pkg/icingadb" + "github.com/icinga/icingadb/pkg/logging" + "github.com/pkg/errors" + "go.uber.org/zap" +) + +// Router is used to dispatch all incoming events to their corresponding handlers and provides a default +// handler if there is none. You should always use this type to handle events properly and must avoid bypassing +// it, and accessing the handlers directly. +type Router struct { + db *icingadb.DB + logs *logging.Logging + logger *zap.SugaredLogger + runtimeConfig *config.RuntimeConfig +} + +// NewRouter returns a fully initialised events Router instance from the provided args. +func NewRouter(rc *config.RuntimeConfig, db *icingadb.DB, logs *logging.Logging) *Router { + return &Router{runtimeConfig: rc, db: db, logs: logs, logger: logs.GetChildLogger("routing").SugaredLogger} +} + +// Route routes the specified event.Event to its corresponding handler. +// +// This function first constructs the target object.Object and its incident.Incident from the provided event.Event. +// After some safety checks have been carried out, the event is then handed over to the associated handler or its +// default one. Currently, ‘incident’ is the only (independent) event handler apart from the default one of this type. +// +// Note, that this function will always return event.ErrEventProcessing if the event cannot be processed successfully, +// either directly (unwrapped) or indirectly (wrapped with an additional context), except in one particular case +// where event.ErrSuperfluousStateChange is returned. Thus, the returned error can be safely forwarded to clients. +func (r *Router) Route(ctx context.Context, ev *event.Event) error { + obj, err := object.FromEvent(ctx, r.db, ev) + if err != nil { + r.logger.Errorw("Failed to sync object with the database", zap.Int64("source_id", ev.SourceId), zap.Error(err)) + return event.ErrEventProcessing + } + + r.logger = r.logger.With(zap.String("object", obj.DisplayName()), zap.Int64("source_id", ev.SourceId)) + + createIncident := ev.Severity != event.SeverityNone && ev.Severity != event.SeverityOK + incidentLogger := r.logs.GetChildLogger("incident") + i, err := incident.GetCurrent(ctx, r.db, obj, incidentLogger, r.runtimeConfig, createIncident) + if err != nil { + r.logger.Errorw("Failed to create/determine an incident", zap.Error(err)) + return event.ErrEventProcessing + } + + if i != nil { + if i.ProcessEvent(ctx, ev) != nil { + // Expect the actual error to be logged with additional context in the incident package. + return event.ErrEventProcessing + } + + return nil + } + + switch ev.Type { + case event.TypeState: + if ev.Severity != event.SeverityOK { + r.logger.Warn("Cannot process state event without an incident") + return errors.Wrap(event.ErrEventProcessing, "cannot process event without an active incident") + } + + r.logger.Debug("Received superfluous OK state event") + return event.ErrSuperfluousStateChange + case event.TypeAcknowledgementSet: + r.logger.Warn("Cannot set acknowledgement without an active incident") + return errors.Wrap(event.ErrEventProcessing, "cannot set acknowledgement without an active incident") + case event.TypeAcknowledgementCleared: + r.logger.Warn("Cannot clear acknowledgement without an active incident") + return errors.Wrapf(event.ErrEventProcessing, "cannot clear acknowledgement without an active incident") + } + + return r.process(ctx, obj, ev) +} + +// process processes the provided event and notifies routing recipients in a non-blocking fashion. +// +// This function processes the specified event in an own transaction and rolls back all changes made to the +// database if it returns with an error. However, it should be noted that notifications are triggered outside +// a database transaction initiated after successful event processing and will not undo the changes made by the +// event processing tx if sending the notifications fails. +// +// Returns always event.ErrEventProcessing in some way in case of internal processing errors. +func (r *Router) process(ctx context.Context, obj *object.Object, ev *event.Event) error { + tx, err := r.db.BeginTxx(ctx, nil) + if err != nil { + r.logger.Errorw("Failed to start a database transaction", zap.Error(err)) + return event.ErrEventProcessing + } + defer func() { _ = tx.Rollback() }() + + if err := ev.Sync(ctx, tx, r.db, obj.ID); err != nil { + r.logger.Errorw("Failed to sync an event with the database", zap.Error(err)) + return event.ErrEventProcessing + } + + // Incident filter rules are stateful, which means that once they have been matched, they remain + // effective for the ongoing incident and never need to be rechecked. For non-state events, on the + // other hand, there is no such (already matched) rule if they aren't linked to any active incident + // and need to be reevaluated all over again. + routes := r.evaluateRoutes(ev, r.evaluateRules(obj)) + + notifications := make(history.PendingNotifications) + for routing, channels := range r.GetRecipientsChannel(ev, routes) { + histories, err := history.AddPendingNotifications(ctx, r.db, tx, channels, func(h *history.NotificationHistory) { + h.RuleRoutingID = utils.ToDBInt(routing.ID) + }) + if err != nil { + r.logger.Errorw("Failed to insert pending notification histories", zap.Inline(routing), zap.Error(err)) + return event.ErrEventProcessing + } + + for contact, entry := range histories { + notifications[contact] = entry + } + } + + // Commit the event processing transaction before moving on to the next step and sending notifications. + if err = tx.Commit(); err != nil { + r.logger.Errorw("Cannot commit database transaction", zap.Error(err)) + return event.ErrEventProcessing + } + + if len(notifications) == 0 { + r.logger.Debugw("No routing recipients found, not sending notifications", zap.String("event", ev.String())) + return nil + } + + err = notifyutils.NotifyContacts(ctx, contracts.NewDefaultNotifyCtx(obj, r.logger), r.db, r.runtimeConfig, ev, notifications) + if err != nil { + r.logger.Errorw("Failed to send all pending notifications", zap.Error(err)) + return event.ErrEventProcessing + } + + return nil +} + +// evaluateRules reevaluates and retrieves all the configured event rules that match on the given object. +// DO NOT call this while holding the runtime config lock! +func (r *Router) evaluateRules(obj *object.Object) map[int64]*rule.Rule { + r.runtimeConfig.RLock() + defer r.runtimeConfig.RUnlock() + + rules := make(map[int64]*rule.Rule) + for _, ru := range r.runtimeConfig.Rules { + // Skip the event rule if it's disabled + if ru == nil || !ru.IsActive.Valid || !ru.IsActive.Bool { + continue + } + + if ru.ObjectFilter != nil { + matched, err := ru.ObjectFilter.Eval(obj) + if err != nil { + // Do not let our event processing tx fail due to the Object eval error, so log it and move on. + r.logger.Warnw("Failed to evaluate object filter", zap.String("rule", ru.Name), zap.Error(err)) + } + + if err != nil || !matched { + continue + } + + r.logger.Debugw("Event rule filter matches", zap.String("rule", ru.Name), + zap.String("filter", ru.ObjectFilterExpr.String)) + } + + r.logger.Infof("Rule %q matches", ru.Name) + + rules[ru.ID] = ru + } + + return rules +} + +// EvaluateRoutes evaluates and retrieves all the configured event routing that match on the given event. +func (r *Router) evaluateRoutes(ev *event.Event, rules map[int64]*rule.Rule) []*rule.Routing { + filterContext := &rule.RoutingFilter{EventType: ev.Type} + + var routes []*rule.Routing + for _, ru := range rules { + for _, routing := range ru.Routes { + // Rule routing without any condition always matches. + matched := routing.Condition == nil + + if !matched { + var err error + matched, err = routing.Condition.Eval(filterContext) + if err != nil { + r.logger.Warnw("Failed to evaluate routing condition", zap.String("rule", ru.Name), + zap.Inline(routing), zap.Error(err)) + + matched = false + } + } + + if matched { + routes = append(routes, routing) + r.logger.Debugw("Routing condition matches", zap.String("rule", ru.Name), zap.Inline(routing)) + } + } + } + + return routes +} + +// GetRecipientsChannel retrieves all the recipients channels of the routes. +func (r *Router) GetRecipientsChannel(ev *event.Event, routes []*rule.Routing) map[*rule.Routing]rule.ContactChannels { + routesChannels := make(map[*rule.Routing]rule.ContactChannels) + for _, routing := range routes { + if routesChannels[routing] == nil { + routesChannels[routing] = make(rule.ContactChannels) + } + + routesChannels[routing].LoadFromRoutingRecipients(routing, ev.Time, rule.IsNotifiable) + } + + return routesChannels +} diff --git a/internal/events/router_test.go b/internal/events/router_test.go new file mode 100644 index 000000000..effaec3ad --- /dev/null +++ b/internal/events/router_test.go @@ -0,0 +1,144 @@ +package events + +import ( + "context" + "github.com/icinga/icinga-notifications/internal/config" + "github.com/icinga/icinga-notifications/internal/daemon" + "github.com/icinga/icinga-notifications/internal/event" + "github.com/icinga/icinga-notifications/internal/incident" + "github.com/icinga/icinga-notifications/internal/object" + "github.com/icinga/icinga-notifications/internal/testutils" + "github.com/icinga/icingadb/pkg/logging" + "github.com/icinga/icingadb/pkg/types" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap/zapcore" + "go.uber.org/zap/zaptest" + "testing" + "time" +) + +func TestRouterRoute(t *testing.T) { + ctx := context.Background() + db := testutils.GetTestDB(ctx, t) + + require.NoError(t, daemon.LoadConfig("../../config.example.yml"), "loading config file should not fail") + + // Insert a dummy sources for our test cases! + source := config.Source{ID: 10, Type: "notifications", Name: "Icinga Notifications", Icinga2InsecureTLS: types.Bool{Bool: false, Valid: true}} + stmt, _ := db.BuildInsertStmt(source) + _, err := db.NamedExecContext(ctx, stmt, source) + require.NoError(t, err, "populating source table should not fail") + + logs, err := logging.NewLogging("testing", zapcore.DebugLevel, "console", nil, time.Hour) + require.NoError(t, err, "logging initialisation should not fail") + + // Load the just populated config into the runtime config. + runtimeConfig := config.NewRuntimeConfig(nil, logs, db) + require.NoError(t, runtimeConfig.UpdateFromDatabase(ctx), "fetching configs from database should not fail") + + t.Run("InvalidEvents", func(t *testing.T) { + router := NewRouter(runtimeConfig, db, logs) + + // These events require an active incident to be processed successfully. + assert.ErrorIs(t, router.Route(ctx, makeEvent(t, event.TypeState, event.SeverityOK)), event.ErrSuperfluousStateChange) + assert.ErrorIs(t, router.Route(ctx, makeEvent(t, event.TypeState, event.SeverityNone)), event.ErrEventProcessing) + assert.ErrorIs(t, router.Route(ctx, makeEvent(t, event.TypeAcknowledgementSet, event.SeverityOK)), event.ErrEventProcessing) + assert.ErrorIs(t, router.Route(ctx, makeEvent(t, event.TypeAcknowledgementCleared, event.SeverityOK)), event.ErrEventProcessing) + }) + + t.Run("StateChangeEvents", func(t *testing.T) { + router := NewRouter(runtimeConfig, db, logs) + + states := map[string]*event.Event{ + "crit": makeEvent(t, event.TypeState, event.SeverityCrit), + "warn": makeEvent(t, event.TypeState, event.SeverityWarning), + "err": makeEvent(t, event.TypeState, event.SeverityErr), + "alert": makeEvent(t, event.TypeState, event.SeverityAlert), + } + + for severity, ev := range states { + assert.NoErrorf(t, router.Route(ctx, ev), "state event with severity %q should open an incident", severity) + } + + reloadIncidents := func(ctx context.Context) { + object.ClearCache() + + // Remove all existing incidents from the cache, as they are indexed with the + // pointer of their object, which is going to change! + for _, i := range incident.GetCurrentIncidents() { + incident.RemoveCurrent(i.Object) + } + + // The incident loading process may hang due to unknown bugs or semaphore lock waits. + // Therefore, give it maximum time of 10s to finish normally, otherwise give up and fail. + ctx, cancelFunc := context.WithDeadline(ctx, time.Now().Add(10*time.Second)) + defer cancelFunc() + + err := incident.LoadOpenIncidents(ctx, db, logging.NewLogger(zaptest.NewLogger(t).Sugar(), time.Hour), runtimeConfig) + require.NoError(t, err, "loading active incidents should not fail") + } + reloadIncidents(ctx) + + for severity, ev := range states { + obj, err := object.FromEvent(ctx, db, ev) + assert.NoError(t, err) + + i, err := incident.GetCurrent(ctx, db, obj, logs.GetLogger(), runtimeConfig, false) + assert.NoErrorf(t, err, "incident for event severity %q should be in cache", severity) + + assert.Equal(t, obj, i.Object, "incident and event object should be the same") + assert.Equal(t, i.Severity, ev.Severity, "incident and event severity should be the same") + } + + // Recover the incidents + for _, ev := range states { + ev.Time = time.Now() + ev.Severity = event.SeverityOK + + assert.NoErrorf(t, router.Route(ctx, ev), "state event with severity %q should close an incident", "ok") + } + reloadIncidents(ctx) + assert.Len(t, incident.GetCurrentIncidents(), 0, "there should be no cached incidents") + }) + + t.Run("NonStateEvents", func(t *testing.T) { + router := NewRouter(runtimeConfig, db, logs) + + states := []*event.Event{ + makeEvent(t, event.TypeDowntimeStart, event.SeverityNone), + makeEvent(t, event.TypeDowntimeEnd, event.SeverityOK), + makeEvent(t, event.TypeDowntimeRemoved, event.SeverityNone), + makeEvent(t, event.TypeCustom, event.SeverityOK), + makeEvent(t, event.TypeFlappingStart, event.SeverityNone), + makeEvent(t, event.TypeFlappingEnd, event.SeverityNone), + } + + for _, ev := range states { + assert.NoErrorf(t, router.Route(ctx, ev), "processing non-state event %q should not fail", ev.Type) + assert.Lenf(t, incident.GetCurrentIncidents(), 0, "non-state event %q should not open an incident", ev.Type) + } + }) +} + +// makeEvent creates a fully initialised event.Event of the given type and severity. +func makeEvent(t *testing.T, typ string, severity event.Severity) *event.Event { + return &event.Event{ + SourceId: 10, + Name: testutils.MakeRandomString(t), + URL: "https://localhost/icingaweb2/icingadb", + Type: typ, + Time: time.Now(), + Severity: severity, + Username: "icingaadmin", + Message: "You will contract a rare disease", + Tags: map[string]string{ + "Host": testutils.MakeRandomString(t), + "Service": testutils.MakeRandomString(t), + }, + ExtraTags: map[string]string{ + "hostgroup/database-server": "", + "servicegroup/webserver": "", + }, + } +} diff --git a/internal/icinga2/launcher.go b/internal/icinga2/launcher.go index 026c0f72a..c55e4672b 100644 --- a/internal/icinga2/launcher.go +++ b/internal/icinga2/launcher.go @@ -11,7 +11,7 @@ import ( "github.com/icinga/icinga-notifications/internal/config" "github.com/icinga/icinga-notifications/internal/daemon" "github.com/icinga/icinga-notifications/internal/event" - "github.com/icinga/icinga-notifications/internal/incident" + "github.com/icinga/icinga-notifications/internal/events" "github.com/icinga/icingadb/pkg/icingadb" "github.com/icinga/icingadb/pkg/logging" "go.uber.org/zap" @@ -126,7 +126,7 @@ func (launcher *Launcher) launch(src *config.Source) { CallbackFn: func(ev *event.Event) { l := logger.With(zap.Stringer("event", ev)) - err := incident.ProcessEvent(subCtx, launcher.Db, launcher.Logs, launcher.RuntimeConfig, ev) + err := events.NewRouter(launcher.RuntimeConfig, launcher.Db, launcher.Logs).Route(subCtx, ev) switch { case errors.Is(err, event.ErrSuperfluousStateChange): l.Debugw("Stopped processing event with superfluous state change", zap.Error(err)) diff --git a/internal/listener/listener.go b/internal/listener/listener.go index 73c0955e2..5de891a85 100644 --- a/internal/listener/listener.go +++ b/internal/listener/listener.go @@ -9,6 +9,7 @@ import ( "github.com/icinga/icinga-notifications/internal/config" "github.com/icinga/icinga-notifications/internal/daemon" "github.com/icinga/icinga-notifications/internal/event" + "github.com/icinga/icinga-notifications/internal/events" "github.com/icinga/icinga-notifications/internal/incident" "github.com/icinga/icingadb/pkg/icingadb" "github.com/icinga/icingadb/pkg/logging" @@ -128,8 +129,7 @@ func (l *Listener) ProcessEvent(w http.ResponseWriter, req *http.Request) { } l.logger.Infow("Processing event", zap.String("event", ev.String())) - err = incident.ProcessEvent(context.Background(), l.db, l.logs, l.runtimeConfig, &ev) - if err != nil { + if err := events.NewRouter(l.runtimeConfig, l.db, l.logs).Route(context.Background(), &ev); err != nil { abort(http.StatusInternalServerError, &ev, err.Error()) return } diff --git a/internal/testutils/testutils.go b/internal/testutils/testutils.go index 6f68698f2..7674e1617 100644 --- a/internal/testutils/testutils.go +++ b/internal/testutils/testutils.go @@ -2,6 +2,8 @@ package testutils import ( "context" + "crypto/rand" + "fmt" "github.com/creasty/defaults" "github.com/icinga/icingadb/pkg/config" "github.com/icinga/icingadb/pkg/icingadb" @@ -56,3 +58,12 @@ func GetTestDB(ctx context.Context, t *testing.T) *icingadb.DB { return db } + +// MakeRandomString returns a 20-byte random hex string. +func MakeRandomString(t *testing.T) string { + buf := make([]byte, 20) + _, err := rand.Read(buf) + require.NoError(t, err, "failed to generate random string") + + return fmt.Sprintf("%x", buf) +}