From 8c6a14b809d53597491de256a41faf751b99bcfc Mon Sep 17 00:00:00 2001 From: Oliver Schmitt Date: Tue, 27 Aug 2024 14:55:16 +0200 Subject: [PATCH] Added pagination for large graylog responses --- pkg/connectors/graylog/connector.go | 89 +++++++++++++++++------------ 1 file changed, 54 insertions(+), 35 deletions(-) diff --git a/pkg/connectors/graylog/connector.go b/pkg/connectors/graylog/connector.go index b1f7401..be136b2 100644 --- a/pkg/connectors/graylog/connector.go +++ b/pkg/connectors/graylog/connector.go @@ -41,7 +41,7 @@ func (c *Connector) Tag() string { } func (c *Connector) Collect(ctx context.Context) ([]connectors.Alert, error) { - sourceAlerts, err := c.collectAlertEvents(ctx) + sourceAlertPages, err := c.collectAlertEvents(ctx) if err != nil { return nil, err } @@ -49,37 +49,39 @@ func (c *Connector) Collect(ctx context.Context) ([]connectors.Alert, error) { var alerts []connectors.Alert var seenEventDefinitions []string - for _, sourceAlert := range sourceAlerts.Events { - eventAggregationId := eventToAggregationId(sourceAlert) - if slices.Contains(seenEventDefinitions, eventAggregationId) { - continue + for _, sourceAlertPage := range sourceAlertPages { + for _, sourceAlert := range sourceAlertPage.Events { + eventAggregationId := eventToAggregationId(sourceAlert) + if slices.Contains(seenEventDefinitions, eventAggregationId) { + continue + } + seenEventDefinitions = append(seenEventDefinitions, eventAggregationId) + + var streams []string + for _, stream := range sourceAlertPage.Context.Streams { + streams = append(streams, stream.Title) + } + + hostname, _ := url.Parse(c.config.URL) + labels := map[string]string{ + "Source": sourceAlert.Event.Source, + "Stream": strings.Join(streams, ","), + "Priority": priorityToLabel(sourceAlert.Event.Priority), + "EventType": alertToLabel(sourceAlert.Event.Alert), + "Hostname": hostname.Hostname(), + } + alert := connectors.Alert{ + Labels: labels, + Start: parseTime(sourceAlert.Event.TimeStamp), + State: priorityToState(sourceAlert.Event.Priority), + Description: sourceAlert.Event.Message, + Details: sourceAlertPage.Context.EventDefinitions[sourceAlert.Event.EventDefinitionId].Description, + Links: []html.HTML{ + html.HTML("🏠"), + }, + } + alerts = append(alerts, alert) } - seenEventDefinitions = append(seenEventDefinitions, eventAggregationId) - - var streams []string - for _, stream := range sourceAlerts.Context.Streams { - streams = append(streams, stream.Title) - } - - hostname, _ := url.Parse(c.config.URL) - labels := map[string]string{ - "Source": sourceAlert.Event.Source, - "Stream": strings.Join(streams, ","), - "Priority": priorityToLabel(sourceAlert.Event.Priority), - "EventType": alertToLabel(sourceAlert.Event.Alert), - "Hostname": hostname.Hostname(), - } - alert := connectors.Alert{ - Labels: labels, - Start: parseTime(sourceAlert.Event.TimeStamp), - State: priorityToState(sourceAlert.Event.Priority), - Description: sourceAlert.Event.Message, - Details: sourceAlerts.Context.EventDefinitions[sourceAlert.Event.EventDefinitionId].Description, - Links: []html.HTML{ - html.HTML("🏠"), - }, - } - alerts = append(alerts, alert) } return alerts, nil @@ -89,16 +91,34 @@ func (c *Connector) String() string { return fmt.Sprintf("Graylog (%s)", c.config.URL) } -func (c *Connector) collectAlertEvents(ctx context.Context) (eventsSearchResults, error) { - // TODO: Use pagination, however, we're unlikely to hit this limit for unresolved alerts +func (c *Connector) collectAlertEvents(ctx context.Context) ([]eventsSearchResults, error) { timeRangeSeconds := c.config.TimeRange if timeRangeSeconds == 0 { timeRangeSeconds = 600 } + page := 1 + var responsePage eventsSearchResults + var result []eventsSearchResults + for ok := true; ok; ok = len(responsePage.Events) > 0 { + var err error + responsePage, err = c.collectAlertEventsPage(ctx, page, timeRangeSeconds) + if err != nil { + return []eventsSearchResults{}, err + } + if len(responsePage.Events) > 0 { + result = append(result, responsePage) + } + page++ + } + + return result, nil +} + +func (c *Connector) collectAlertEventsPage(ctx context.Context, page int, timeRangeSeconds int) (eventsSearchResults, error) { body := eventsSearchParameters{ Query: "", - Page: 1, + Page: page, PerPage: 100, TimeRange: timeRange{ Type: TimeRangeRelative, @@ -125,7 +145,6 @@ func (c *Connector) collectAlertEvents(ctx context.Context) (eventsSearchResults slog.Any("error", err)) return eventsSearchResults{}, err } - return response, nil }