diff --git a/internal/plugin/notifier.go b/internal/plugin/notifier.go index c1d58a6b3..2a37c462e 100644 --- a/internal/plugin/notifier.go +++ b/internal/plugin/notifier.go @@ -50,97 +50,19 @@ func (c *NotifierConfig) hasActions() bool { return false } -type eventsQueue struct { - sync.RWMutex +type notifierPlugin struct { + config Config + notifier notifier.Notifier + client *plugin.Client + mu sync.RWMutex fsEvents []*notifier.FsEvent providerEvents []*notifier.ProviderEvent logEvents []*notifier.LogEvent } -func (q *eventsQueue) addFsEvent(event *notifier.FsEvent) { - q.Lock() - defer q.Unlock() - - q.fsEvents = append(q.fsEvents, event) -} - -func (q *eventsQueue) addProviderEvent(event *notifier.ProviderEvent) { - q.Lock() - defer q.Unlock() - - q.providerEvents = append(q.providerEvents, event) -} - -func (q *eventsQueue) addLogEvent(event *notifier.LogEvent) { - q.Lock() - defer q.Unlock() - - q.logEvents = append(q.logEvents, event) -} - -func (q *eventsQueue) popFsEvent() *notifier.FsEvent { - q.Lock() - defer q.Unlock() - - if len(q.fsEvents) == 0 { - return nil - } - truncLen := len(q.fsEvents) - 1 - ev := q.fsEvents[truncLen] - q.fsEvents[truncLen] = nil - q.fsEvents = q.fsEvents[:truncLen] - - return ev -} - -func (q *eventsQueue) popProviderEvent() *notifier.ProviderEvent { - q.Lock() - defer q.Unlock() - - if len(q.providerEvents) == 0 { - return nil - } - truncLen := len(q.providerEvents) - 1 - ev := q.providerEvents[truncLen] - q.providerEvents[truncLen] = nil - q.providerEvents = q.providerEvents[:truncLen] - - return ev -} - -func (q *eventsQueue) popLogEvent() *notifier.LogEvent { - q.Lock() - defer q.Unlock() - - if len(q.logEvents) == 0 { - return nil - } - truncLen := len(q.logEvents) - 1 - ev := q.logEvents[truncLen] - q.logEvents[truncLen] = nil - q.logEvents = q.logEvents[:truncLen] - - return ev -} - -func (q *eventsQueue) getSize() int { - q.RLock() - defer q.RUnlock() - - return len(q.providerEvents) + len(q.fsEvents) + len(q.logEvents) -} - -type notifierPlugin struct { - config Config - notifier notifier.Notifier - client *plugin.Client - queue *eventsQueue -} - func newNotifierPlugin(config Config) (*notifierPlugin, error) { p := ¬ifierPlugin{ config: config, - queue: &eventsQueue{}, } if err := p.initialize(); err != nil { logger.Warn(logSender, "", "unable to create notifier plugin: %v, config %+v", err, config) @@ -180,7 +102,7 @@ func (p *notifierPlugin) initialize() error { Managed: false, Logger: &logger.HCLogAdapter{ Logger: hclog.New(&hclog.LoggerOptions{ - Name: fmt.Sprintf("%v.%v", logSender, notifier.PluginName), + Name: fmt.Sprintf("%s.%s", logSender, notifier.PluginName), Level: pluginsLogLevel, DisableTime: true, }), @@ -204,6 +126,34 @@ func (p *notifierPlugin) initialize() error { return nil } +func (p *notifierPlugin) queueSize() int { + p.mu.RLock() + defer p.mu.RUnlock() + + return len(p.providerEvents) + len(p.fsEvents) + len(p.logEvents) +} + +func (p *notifierPlugin) queueFsEvent(ev *notifier.FsEvent) { + p.mu.Lock() + defer p.mu.Unlock() + + p.fsEvents = append(p.fsEvents, ev) +} + +func (p *notifierPlugin) queueProviderEvent(ev *notifier.ProviderEvent) { + p.mu.Lock() + defer p.mu.Unlock() + + p.providerEvents = append(p.providerEvents, ev) +} + +func (p *notifierPlugin) queueLogEvent(ev *notifier.LogEvent) { + p.mu.Lock() + defer p.mu.Unlock() + + p.logEvents = append(p.logEvents, ev) +} + func (p *notifierPlugin) canQueueEvent(timestamp int64) bool { if p.config.NotifierOptions.RetryMaxTime == 0 { return false @@ -214,7 +164,7 @@ func (p *notifierPlugin) canQueueEvent(timestamp int64) bool { return false } if p.config.NotifierOptions.RetryQueueMaxSize > 0 { - return p.queue.getSize() < p.config.NotifierOptions.RetryQueueMaxSize + return p.queueSize() < p.config.NotifierOptions.RetryQueueMaxSize } return true } @@ -223,13 +173,7 @@ func (p *notifierPlugin) notifyFsAction(event *notifier.FsEvent) { if !slices.Contains(p.config.NotifierOptions.FsEvents, event.Action) { return } - - go func() { - Handler.addTask() - defer Handler.removeTask() - - p.sendFsEvent(event) - }() + p.sendFsEvent(event) } func (p *notifierPlugin) notifyProviderAction(event *notifier.ProviderEvent, object Renderer) { @@ -237,84 +181,88 @@ func (p *notifierPlugin) notifyProviderAction(event *notifier.ProviderEvent, obj !slices.Contains(p.config.NotifierOptions.ProviderObjects, event.ObjectType) { return } + p.sendProviderEvent(event, object) +} + +func (p *notifierPlugin) notifyLogEvent(event *notifier.LogEvent) { + p.sendLogEvent(event) +} - go func() { +func (p *notifierPlugin) sendFsEvent(ev *notifier.FsEvent) { + go func(event *notifier.FsEvent) { Handler.addTask() defer Handler.removeTask() - objectAsJSON, err := object.RenderAsJSON(event.Action != "delete") - if err != nil { - logger.Warn(logSender, "", "unable to render user as json for action %v: %v", event.Action, err) - return + if err := p.notifier.NotifyFsEvent(event); err != nil { + logger.Warn(logSender, "", "unable to send fs action notification to plugin %v: %v", p.config.Cmd, err) + if p.canQueueEvent(event.Timestamp) { + p.queueFsEvent(event) + } } - event.ObjectData = objectAsJSON - p.sendProviderEvent(event) - }() + }(ev) } -func (p *notifierPlugin) notifyLogEvent(event *notifier.LogEvent) { - go func() { +func (p *notifierPlugin) sendProviderEvent(ev *notifier.ProviderEvent, object Renderer) { + go func(event *notifier.ProviderEvent) { Handler.addTask() defer Handler.removeTask() - p.sendLogEvent(event) - }() -} - -func (p *notifierPlugin) sendFsEvent(event *notifier.FsEvent) { - if err := p.notifier.NotifyFsEvent(event); err != nil { - logger.Warn(logSender, "", "unable to send fs action notification to plugin %v: %v", p.config.Cmd, err) - if p.canQueueEvent(event.Timestamp) { - p.queue.addFsEvent(event) + if object != nil { + objectAsJSON, err := object.RenderAsJSON(event.Action != "delete") + if err != nil { + logger.Error(logSender, "", "unable to render user as json for action %q: %v", event.Action, err) + } else { + event.ObjectData = objectAsJSON + } } - } -} -func (p *notifierPlugin) sendProviderEvent(event *notifier.ProviderEvent) { - if err := p.notifier.NotifyProviderEvent(event); err != nil { - logger.Warn(logSender, "", "unable to send user action notification to plugin %v: %v", p.config.Cmd, err) - if p.canQueueEvent(event.Timestamp) { - p.queue.addProviderEvent(event) + if err := p.notifier.NotifyProviderEvent(event); err != nil { + logger.Warn(logSender, "", "unable to send user action notification to plugin %v: %v", p.config.Cmd, err) + if p.canQueueEvent(event.Timestamp) { + p.queueProviderEvent(event) + } } - } + }(ev) } -func (p *notifierPlugin) sendLogEvent(event *notifier.LogEvent) { - if err := p.notifier.NotifyLogEvent(event); err != nil { - logger.Warn(logSender, "", "unable to send log event to plugin %v: %v", p.config.Cmd, err) - if p.canQueueEvent(event.Timestamp) { - p.queue.addLogEvent(event) +func (p *notifierPlugin) sendLogEvent(ev *notifier.LogEvent) { + go func(event *notifier.LogEvent) { + Handler.addTask() + defer Handler.removeTask() + + if err := p.notifier.NotifyLogEvent(event); err != nil { + logger.Warn(logSender, "", "unable to send log event to plugin %v: %v", p.config.Cmd, err) + if p.canQueueEvent(event.Timestamp) { + p.queueLogEvent(event) + } } - } + }(ev) } func (p *notifierPlugin) sendQueuedEvents() { - queueSize := p.queue.getSize() + queueSize := p.queueSize() if queueSize == 0 { return } - logger.Debug(logSender, "", "check queued events for notifier %q, events size: %v", p.config.Cmd, queueSize) - fsEv := p.queue.popFsEvent() - for fsEv != nil { - go func(ev *notifier.FsEvent) { - p.sendFsEvent(ev) - }(fsEv) - fsEv = p.queue.popFsEvent() + p.mu.Lock() + defer p.mu.Unlock() + + logger.Debug(logSender, "", "send queued events for notifier %q, events size: %v", p.config.Cmd, queueSize) + + for _, ev := range p.fsEvents { + p.sendFsEvent(ev) } + p.fsEvents = nil - providerEv := p.queue.popProviderEvent() - for providerEv != nil { - go func(ev *notifier.ProviderEvent) { - p.sendProviderEvent(ev) - }(providerEv) - providerEv = p.queue.popProviderEvent() + for _, ev := range p.providerEvents { + p.sendProviderEvent(ev, nil) } - logEv := p.queue.popLogEvent() - for logEv != nil { - go func(ev *notifier.LogEvent) { - p.sendLogEvent(ev) - }(logEv) - logEv = p.queue.popLogEvent() + p.providerEvents = nil + + for _, ev := range p.logEvents { + p.sendLogEvent(ev) } - logger.Debug(logSender, "", "queued events sent for notifier %q, new events size: %v", p.config.Cmd, p.queue.getSize()) + p.logEvents = nil + + logger.Debug(logSender, "", "%d queued events sent for notifier %q,", queueSize, p.config.Cmd) } diff --git a/internal/plugin/plugin.go b/internal/plugin/plugin.go index b47260e6c..94479c81d 100644 --- a/internal/plugin/plugin.go +++ b/internal/plugin/plugin.go @@ -642,7 +642,9 @@ func (m *Manager) restartNotifierPlugin(config Config, idx int) { } m.notifLock.Lock() - plugin.queue = m.notifiers[idx].queue + plugin.fsEvents = m.notifiers[idx].fsEvents + plugin.providerEvents = m.notifiers[idx].providerEvents + plugin.logEvents = m.notifiers[idx].logEvents m.notifiers[idx] = plugin m.notifLock.Unlock() plugin.sendQueuedEvents()