Skip to content

Commit

Permalink
Introduce and use type events.Router
Browse files Browse the repository at this point in the history
  • Loading branch information
yhabteab committed Apr 26, 2024
1 parent 212132f commit 4fafe36
Show file tree
Hide file tree
Showing 5 changed files with 390 additions and 4 deletions.
231 changes: 231 additions & 0 deletions internal/events/router.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,231 @@
package events

import (
"context"
"github.com/icinga/icinga-notifications/internal/config"
"github.com/icinga/icinga-notifications/internal/contracts"
"github.com/icinga/icinga-notifications/internal/event"
"github.com/icinga/icinga-notifications/internal/history"
"github.com/icinga/icinga-notifications/internal/incident"
"github.com/icinga/icinga-notifications/internal/notifyutils"
"github.com/icinga/icinga-notifications/internal/object"
"github.com/icinga/icinga-notifications/internal/rule"
"github.com/icinga/icinga-notifications/internal/utils"
"github.com/icinga/icingadb/pkg/icingadb"
"github.com/icinga/icingadb/pkg/logging"
"github.com/pkg/errors"
"go.uber.org/zap"
)

// Router is used to dispatch all incoming events to their corresponding handlers and provides a default
// handler if there is none. You should always use this type to handle events properly and must avoid bypassing
// it, and accessing the handlers directly.
type Router struct {
db *icingadb.DB
logs *logging.Logging
logger *zap.SugaredLogger
runtimeConfig *config.RuntimeConfig
}

// NewRouter returns a fully initialised events Router instance from the provided args.
func NewRouter(rc *config.RuntimeConfig, db *icingadb.DB, logs *logging.Logging) *Router {
return &Router{runtimeConfig: rc, db: db, logs: logs, logger: logs.GetChildLogger("routing").SugaredLogger}
}

// Route routes the specified event.Event to its corresponding handler.
//
// This function first constructs the target object.Object and its incident.Incident from the provided event.Event.
// After some safety checks have been carried out, the event is then handed over to the associated handler or its
// default one. Currently, ‘incident’ is the only (independent) event handler apart from the default one of this type.
//
// Note, that this function will always return event.ErrEventProcessing if the event cannot be processed successfully,
// either directly (unwrapped) or indirectly (wrapped with an additional context), except in one particular case
// where event.ErrSuperfluousStateChange is returned. Thus, the returned error can be safely forwarded to clients.
func (r *Router) Route(ctx context.Context, ev *event.Event) error {
obj, err := object.FromEvent(ctx, r.db, ev)
if err != nil {
r.logger.Errorw("Failed to sync object with the database", zap.Int64("source_id", ev.SourceId), zap.Error(err))
return event.ErrEventProcessing
}

r.logger = r.logger.With(zap.String("object", obj.DisplayName()), zap.Int64("source_id", ev.SourceId))

createIncident := ev.Severity != event.SeverityNone && ev.Severity != event.SeverityOK
incidentLogger := r.logs.GetChildLogger("incident")
i, err := incident.GetCurrent(ctx, r.db, obj, incidentLogger, r.runtimeConfig, createIncident)
if err != nil {
r.logger.Errorw("Failed to create/determine an incident", zap.Error(err))
return event.ErrEventProcessing
}

if i != nil {
if i.ProcessEvent(ctx, ev) != nil {
// Expect the actual error to be logged with additional context in the incident package.
return event.ErrEventProcessing
}

return nil
}

switch ev.Type {
case event.TypeState:
if ev.Severity != event.SeverityOK {
r.logger.Warn("Cannot process state event without an incident")
return errors.Wrap(event.ErrEventProcessing, "cannot process event without an active incident")
}

r.logger.Debug("Received superfluous OK state event")
return event.ErrSuperfluousStateChange
case event.TypeAcknowledgementSet:
r.logger.Warn("Cannot set acknowledgement without an active incident")
return errors.Wrap(event.ErrEventProcessing, "cannot set acknowledgement without an active incident")
case event.TypeAcknowledgementCleared:
r.logger.Warn("Cannot clear acknowledgement without an active incident")
return errors.Wrapf(event.ErrEventProcessing, "cannot clear acknowledgement without an active incident")
}

return r.process(ctx, obj, ev)
}

// process processes the provided event and notifies routing recipients in a non-blocking fashion.
//
// This function processes the specified event in an own transaction and rolls back all changes made to the
// database if it returns with an error. However, it should be noted that notifications are triggered outside
// a database transaction initiated after successful event processing and will not undo the changes made by the
// event processing tx if sending the notifications fails.
//
// Returns always event.ErrEventProcessing in some way in case of internal processing errors.
func (r *Router) process(ctx context.Context, obj *object.Object, ev *event.Event) error {
tx, err := r.db.BeginTxx(ctx, nil)
if err != nil {
r.logger.Errorw("Failed to start a database transaction", zap.Error(err))
return event.ErrEventProcessing
}
defer func() { _ = tx.Rollback() }()

if err := ev.Sync(ctx, tx, r.db, obj.ID); err != nil {
r.logger.Errorw("Failed to sync an event with the database", zap.Error(err))
return event.ErrEventProcessing
}

// Incident filter rules are stateful, which means that once they have been matched, they remain
// effective for the ongoing incident and never need to be rechecked. For non-state events, on the
// other hand, there is no such (already matched) rule if they aren't linked to any active incident
// and need to be reevaluated all over again.
routes := r.evaluateRoutes(ev, r.evaluateRules(obj))

notifications := make(history.PendingNotifications)
for routing, channels := range r.GetRecipientsChannel(ev, routes) {
histories, err := history.AddPendingNotifications(ctx, r.db, tx, channels, func(h *history.NotificationHistory) {
h.RuleRoutingID = utils.ToDBInt(routing.ID)
})
if err != nil {
r.logger.Errorw("Failed to insert pending notification histories", zap.Inline(routing), zap.Error(err))
return event.ErrEventProcessing
}

for contact, entry := range histories {
notifications[contact] = entry
}
}

// Commit the event processing transaction before moving on to the next step and sending notifications.
if err = tx.Commit(); err != nil {
r.logger.Errorw("Cannot commit database transaction", zap.Error(err))
return event.ErrEventProcessing
}

if len(notifications) == 0 {
r.logger.Debugw("No routing recipients found, not sending notifications", zap.String("event", ev.String()))
return nil
}

err = notifyutils.NotifyContacts(ctx, contracts.NewDefaultNotifyCtx(obj, r.logger), r.db, r.runtimeConfig, ev, notifications)
if err != nil {
r.logger.Errorw("Failed to send all pending notifications", zap.Error(err))
return event.ErrEventProcessing
}

return nil
}

// evaluateRules reevaluates and retrieves all the configured event rules that match on the given object.
// DO NOT call this while holding the runtime config lock!
func (r *Router) evaluateRules(obj *object.Object) map[int64]*rule.Rule {
r.runtimeConfig.RLock()
defer r.runtimeConfig.RUnlock()

rules := make(map[int64]*rule.Rule)
for _, ru := range r.runtimeConfig.Rules {
// Skip the event rule if it's disabled
if ru == nil || !ru.IsActive.Valid || !ru.IsActive.Bool {
continue
}

if ru.ObjectFilter != nil {
matched, err := ru.ObjectFilter.Eval(obj)
if err != nil {
// Do not let our event processing tx fail due to the Object eval error, so log it and move on.
r.logger.Warnw("Failed to evaluate object filter", zap.String("rule", ru.Name), zap.Error(err))
}

if err != nil || !matched {
continue
}

r.logger.Debugw("Event rule filter matches", zap.String("rule", ru.Name),
zap.String("filter", ru.ObjectFilterExpr.String))
}

r.logger.Infof("Rule %q matches", ru.Name)

rules[ru.ID] = ru
}

return rules
}

// EvaluateRoutes evaluates and retrieves all the configured event routing that match on the given event.
func (r *Router) evaluateRoutes(ev *event.Event, rules map[int64]*rule.Rule) []*rule.Routing {
filterContext := &rule.RoutingFilter{EventType: ev.Type}

var routes []*rule.Routing
for _, ru := range rules {
for _, routing := range ru.Routes {
// Rule routing without any condition always matches.
matched := routing.Condition == nil

if !matched {
var err error
matched, err = routing.Condition.Eval(filterContext)
if err != nil {
r.logger.Warnw("Failed to evaluate routing condition", zap.String("rule", ru.Name),
zap.Inline(routing), zap.Error(err))

matched = false
}
}

if matched {
routes = append(routes, routing)
r.logger.Debugw("Routing condition matches", zap.String("rule", ru.Name), zap.Inline(routing))
}
}
}

return routes
}

// GetRecipientsChannel retrieves all the recipients channels of the routes.
func (r *Router) GetRecipientsChannel(ev *event.Event, routes []*rule.Routing) map[*rule.Routing]rule.ContactChannels {
routesChannels := make(map[*rule.Routing]rule.ContactChannels)
for _, routing := range routes {
if routesChannels[routing] == nil {
routesChannels[routing] = make(rule.ContactChannels)
}

routesChannels[routing].LoadFromRoutingRecipients(routing, ev.Time, rule.IsNotifiable)
}

return routesChannels
}
144 changes: 144 additions & 0 deletions internal/events/router_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
package events

import (
"context"
"github.com/icinga/icinga-notifications/internal/config"
"github.com/icinga/icinga-notifications/internal/daemon"
"github.com/icinga/icinga-notifications/internal/event"
"github.com/icinga/icinga-notifications/internal/incident"
"github.com/icinga/icinga-notifications/internal/object"
"github.com/icinga/icinga-notifications/internal/testutils"
"github.com/icinga/icingadb/pkg/logging"
"github.com/icinga/icingadb/pkg/types"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zapcore"
"go.uber.org/zap/zaptest"
"testing"
"time"
)

func TestRouterRoute(t *testing.T) {
ctx := context.Background()
db := testutils.GetTestDB(ctx, t)

require.NoError(t, daemon.LoadConfig("../../config.example.yml"), "loading config file should not fail")

// Insert a dummy sources for our test cases!
source := config.Source{ID: 10, Type: "notifications", Name: "Icinga Notifications", Icinga2InsecureTLS: types.Bool{Bool: false, Valid: true}}
stmt, _ := db.BuildInsertStmt(source)
_, err := db.NamedExecContext(ctx, stmt, source)
require.NoError(t, err, "populating source table should not fail")

logs, err := logging.NewLogging("testing", zapcore.DebugLevel, "console", nil, time.Hour)
require.NoError(t, err, "logging initialisation should not fail")

// Load the just populated config into the runtime config.
runtimeConfig := config.NewRuntimeConfig(nil, logs, db)
require.NoError(t, runtimeConfig.UpdateFromDatabase(ctx), "fetching configs from database should not fail")

t.Run("InvalidEvents", func(t *testing.T) {
router := NewRouter(runtimeConfig, db, logs)

// These events require an active incident to be processed successfully.
assert.ErrorIs(t, router.Route(ctx, makeEvent(t, event.TypeState, event.SeverityOK)), event.ErrSuperfluousStateChange)
assert.ErrorIs(t, router.Route(ctx, makeEvent(t, event.TypeState, event.SeverityNone)), event.ErrEventProcessing)
assert.ErrorIs(t, router.Route(ctx, makeEvent(t, event.TypeAcknowledgementSet, event.SeverityOK)), event.ErrEventProcessing)
assert.ErrorIs(t, router.Route(ctx, makeEvent(t, event.TypeAcknowledgementCleared, event.SeverityOK)), event.ErrEventProcessing)
})

t.Run("StateChangeEvents", func(t *testing.T) {
router := NewRouter(runtimeConfig, db, logs)

states := map[string]*event.Event{
"crit": makeEvent(t, event.TypeState, event.SeverityCrit),
"warn": makeEvent(t, event.TypeState, event.SeverityWarning),
"err": makeEvent(t, event.TypeState, event.SeverityErr),
"alert": makeEvent(t, event.TypeState, event.SeverityAlert),
}

for severity, ev := range states {
assert.NoErrorf(t, router.Route(ctx, ev), "state event with severity %q should open an incident", severity)
}

reloadIncidents := func(ctx context.Context) {
object.ClearCache()

// Remove all existing incidents from the cache, as they are indexed with the
// pointer of their object, which is going to change!
for _, i := range incident.GetCurrentIncidents() {
incident.RemoveCurrent(i.Object)
}

// The incident loading process may hang due to unknown bugs or semaphore lock waits.
// Therefore, give it maximum time of 10s to finish normally, otherwise give up and fail.
ctx, cancelFunc := context.WithDeadline(ctx, time.Now().Add(10*time.Second))
defer cancelFunc()

err := incident.LoadOpenIncidents(ctx, db, logging.NewLogger(zaptest.NewLogger(t).Sugar(), time.Hour), runtimeConfig)
require.NoError(t, err, "loading active incidents should not fail")
}
reloadIncidents(ctx)

for severity, ev := range states {
obj, err := object.FromEvent(ctx, db, ev)
assert.NoError(t, err)

i, err := incident.GetCurrent(ctx, db, obj, logs.GetLogger(), runtimeConfig, false)
assert.NoErrorf(t, err, "incident for event severity %q should be in cache", severity)

assert.Equal(t, obj, i.Object, "incident and event object should be the same")
assert.Equal(t, i.Severity, ev.Severity, "incident and event severity should be the same")
}

// Recover the incidents
for _, ev := range states {
ev.Time = time.Now()
ev.Severity = event.SeverityOK

assert.NoErrorf(t, router.Route(ctx, ev), "state event with severity %q should close an incident", "ok")
}
reloadIncidents(ctx)
assert.Len(t, incident.GetCurrentIncidents(), 0, "there should be no cached incidents")
})

t.Run("NonStateEvents", func(t *testing.T) {
router := NewRouter(runtimeConfig, db, logs)

states := []*event.Event{
makeEvent(t, event.TypeDowntimeStart, event.SeverityNone),
makeEvent(t, event.TypeDowntimeEnd, event.SeverityOK),
makeEvent(t, event.TypeDowntimeRemoved, event.SeverityNone),
makeEvent(t, event.TypeCustom, event.SeverityOK),
makeEvent(t, event.TypeFlappingStart, event.SeverityNone),
makeEvent(t, event.TypeFlappingEnd, event.SeverityNone),
}

for _, ev := range states {
assert.NoErrorf(t, router.Route(ctx, ev), "processing non-state event %q should not fail", ev.Type)
assert.Lenf(t, incident.GetCurrentIncidents(), 0, "non-state event %q should not open an incident", ev.Type)
}
})
}

// makeEvent creates a fully initialised event.Event of the given type and severity.
func makeEvent(t *testing.T, typ string, severity event.Severity) *event.Event {
return &event.Event{
SourceId: 10,
Name: testutils.MakeRandomString(t),
URL: "https://localhost/icingaweb2/icingadb",
Type: typ,
Time: time.Now(),
Severity: severity,
Username: "icingaadmin",
Message: "You will contract a rare disease",
Tags: map[string]string{
"Host": testutils.MakeRandomString(t),
"Service": testutils.MakeRandomString(t),
},
ExtraTags: map[string]string{
"hostgroup/database-server": "",
"servicegroup/webserver": "",
},
}
}
4 changes: 2 additions & 2 deletions internal/icinga2/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"github.com/icinga/icinga-notifications/internal/config"
"github.com/icinga/icinga-notifications/internal/daemon"
"github.com/icinga/icinga-notifications/internal/event"
"github.com/icinga/icinga-notifications/internal/incident"
"github.com/icinga/icinga-notifications/internal/events"
"github.com/icinga/icingadb/pkg/icingadb"
"github.com/icinga/icingadb/pkg/logging"
"go.uber.org/zap"
Expand Down Expand Up @@ -126,7 +126,7 @@ func (launcher *Launcher) launch(src *config.Source) {
CallbackFn: func(ev *event.Event) {
l := logger.With(zap.Stringer("event", ev))

err := incident.ProcessEvent(subCtx, launcher.Db, launcher.Logs, launcher.RuntimeConfig, ev)
err := events.NewRouter(launcher.RuntimeConfig, launcher.Db, launcher.Logs).Route(subCtx, ev)
switch {
case errors.Is(err, event.ErrSuperfluousStateChange):
l.Debugw("Stopped processing event with superfluous state change", zap.Error(err))
Expand Down
Loading

0 comments on commit 4fafe36

Please sign in to comment.