Skip to content

Commit

Permalink
Merge pull request #6 from SenseUnit/events
Browse files Browse the repository at this point in the history
Events
  • Loading branch information
Snawoot authored Feb 17, 2024
2 parents 9bba561 + 412760e commit 9a3b226
Show file tree
Hide file tree
Showing 6 changed files with 106 additions and 1 deletion.
6 changes: 5 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,11 @@ outputs:
spec:
- kind: log
spec:
interval: 1s
interval: 60s
- kind: eventlog
spec: # or skip spec at all
only_groups: # or specify null for all groups
- 1000
- kind: hostsfile
spec:
interval: 5s
Expand Down
4 changes: 4 additions & 0 deletions iface/iface.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,14 @@ type Dialer interface {
DialContext(ctx context.Context, network, address string) (net.Conn, error)
}

type GroupEventCallback = func(group uint64, item GroupItem)

type GroupBridge interface {
Groups() []uint64
ListGroup(uint64) []GroupItem
GroupReady(uint64) bool
OnJoin(uint64, GroupEventCallback) func()
OnLeave(uint64, GroupEventCallback) func()
}

type GroupItem interface {
Expand Down
19 changes: 19 additions & 0 deletions listener/group.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package listener

import (
"context"
"fmt"
"log"
"net/netip"
Expand Down Expand Up @@ -126,3 +127,21 @@ func (g *Group) List() []iface.GroupItem {
func (g *Group) Ready() bool {
return time.Now().After(g.readyAt)
}

func (g *Group) OnJoin(cb iface.GroupEventCallback) func() {
return g.addrSet.OnInsertion(func(_ context.Context, item *ttlcache.Item[netip.Addr, struct{}]) {
cb(g.id, groupItem{
address: item.Key(),
expiresAt: item.ExpiresAt(),
})
})
}

func (g *Group) OnLeave(cb iface.GroupEventCallback) func() {
return g.addrSet.OnEviction(func(_ context.Context, _ ttlcache.EvictionReason, item *ttlcache.Item[netip.Addr, struct{}]) {
cb(g.id, groupItem{
address: item.Key(),
expiresAt: item.ExpiresAt(),
})
})
}
16 changes: 16 additions & 0 deletions listener/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,3 +108,19 @@ func (l *Listener) GroupReady(id uint64) bool {
}
return g.Ready()
}

func (l *Listener) OnJoin(group uint64, cb iface.GroupEventCallback) func() {
g, ok := l.groups[group]
if !ok {
return func() {}
}
return g.OnJoin(cb)
}

func (l *Listener) OnLeave(group uint64, cb iface.GroupEventCallback) func() {
g, ok := l.groups[group]
if !ok {
return func() {}
}
return g.OnLeave(cb)
}
59 changes: 59 additions & 0 deletions output/eventlog.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package output

import (
"fmt"
"log"

"github.com/SenseUnit/rgap/config"
"github.com/SenseUnit/rgap/iface"
"github.com/SenseUnit/rgap/util"
)

type EventLogConfig struct {
Groups []uint64 `yaml:"only_groups"`
}

type EventLog struct {
bridge iface.GroupBridge
groups []uint64
unsubFns []func()
}

func NewEventLog(cfg *config.OutputConfig, bridge iface.GroupBridge) (*EventLog, error) {
var lc EventLogConfig
if err := util.CheckedUnmarshal(&cfg.Spec, &lc); err != nil {
return nil, fmt.Errorf("cannot unmarshal log output config: %w", err)
}
return &EventLog{
bridge: bridge,
groups: lc.Groups,
}, nil
}

func (o *EventLog) Start() error {
groups := o.groups
if groups == nil {
groups = o.bridge.Groups()
}
o.unsubFns = make([]func(), 0, len(o.groups)*2)
for _, group := range o.groups {
o.unsubFns = append(o.unsubFns,
o.bridge.OnJoin(group, func(group uint64, item iface.GroupItem) {
log.Printf("host %s has joined group %d", item.Address().Unmap().String(), group)
}),
o.bridge.OnLeave(group, func(group uint64, item iface.GroupItem) {
log.Printf("host %s has left group %d", item.Address().Unmap().String(), group)
}),
)
}
log.Println("started event log output plugin")
return nil
}

func (o *EventLog) Stop() error {
for _, unsub := range o.unsubFns {
unsub()
}
log.Println("stopped event log output plugin")
return nil
}
3 changes: 3 additions & 0 deletions output/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ var outputVCMap = map[string]OutputCtor{
"dns": func(cfg *config.OutputConfig, bridge iface.GroupBridge) (iface.StartStopper, error) {
return NewDNSServer(cfg, bridge)
},
"eventlog": func(cfg *config.OutputConfig, bridge iface.GroupBridge) (iface.StartStopper, error) {
return NewEventLog(cfg, bridge)
},
}

func OutputFromConfig(cfg *config.OutputConfig, bridge iface.GroupBridge) (iface.StartStopper, error) {
Expand Down

0 comments on commit 9a3b226

Please sign in to comment.