Skip to content

Commit

Permalink
Merge pull request #509 from toni-moreno/feature/gathering_frequency_…
Browse files Browse the repository at this point in the history
…by_measurement_rebased

Feature/gathering frequency by measurement , implements #404
  • Loading branch information
sbengo authored Nov 12, 2021
2 parents 1ab945d + 28e0d10 commit 8c017c4
Show file tree
Hide file tree
Showing 42 changed files with 2,033 additions and 1,612 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,4 @@ public/*
test/*
profile/*
node_modules*
node_modules
47 changes: 24 additions & 23 deletions pkg/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@ import (
"sync"
"time"

"github.com/sirupsen/logrus"
"github.com/toni-moreno/snmpcollector/pkg/agent/bus"
"github.com/toni-moreno/snmpcollector/pkg/agent/device"
"github.com/toni-moreno/snmpcollector/pkg/agent/output"
"github.com/toni-moreno/snmpcollector/pkg/agent/selfmon"
"github.com/toni-moreno/snmpcollector/pkg/config"
"github.com/toni-moreno/snmpcollector/pkg/data/stats"
"github.com/toni-moreno/snmpcollector/pkg/data/utils"
)

var (
Expand Down Expand Up @@ -61,7 +62,7 @@ var (
// DBConfig contains the database config
DBConfig config.DBConfig

log *logrus.Logger
log utils.Logger
// reloadMutex guards the reloadProcess flag
reloadMutex sync.Mutex
reloadProcess bool
Expand All @@ -79,7 +80,7 @@ var (
)

// SetLogger sets the current log output.
func SetLogger(l *logrus.Logger) {
func SetLogger(l utils.Logger) {
log = l
}

Expand Down Expand Up @@ -166,8 +167,8 @@ func GetDeviceJSONInfo(id string) ([]byte, error) {
}

// GetDevStats returns a map with the basic info of each device.
func GetDevStats() map[string]*device.DevStat {
devstats := make(map[string]*device.DevStat)
func GetDevStats() map[string]*stats.GatherStats {
devstats := make(map[string]*stats.GatherStats)
mutex.RLock()
for k, v := range devices {
devstats[k] = v.GetBasicStats()
Expand All @@ -194,7 +195,7 @@ func ReleaseInfluxOut(idb map[string]*output.InfluxDB) {

// DeviceProcessStop stops all device polling goroutines
func DeviceProcessStop() {
Bus.Broadcast(&bus.Message{Type: "exit"})
Bus.Broadcast(&bus.Message{Type: bus.Exit})
}

// DeviceProcessStart starts all device polling goroutines
Expand All @@ -208,15 +209,6 @@ func DeviceProcessStart() {
}
}

// ReleaseDevices releases all devices resources.
func ReleaseDevices() {
mutex.RLock()
for _, c := range devices {
c.End()
}
mutex.RUnlock()
}

func init() {
go Bus.Start()
}
Expand Down Expand Up @@ -259,14 +251,15 @@ func IsDeviceInRuntime(id string) bool {

// DeleteDeviceInRuntime removes the device `id` from the runtime array.
func DeleteDeviceInRuntime(id string) error {
// Avoid modifications to devices while deleting device
mutex.Lock()
defer mutex.Unlock()
if dev, ok := devices[id]; ok {
// Stop all device processes and its measurements. Once finished they will be removed
// from the bus and node closed (snmp connections for measurements will be closed)
dev.StopGather()
log.Debugf("Bus retuned from the exit message to the ID device %s", id)
dev.LeaveBus(Bus)
dev.End()
mutex.Lock()
delete(devices, id)
mutex.Unlock()
return nil
}
log.Errorf("There is no %s device in the runtime device list", id)
Expand All @@ -288,7 +281,16 @@ func AddDeviceInRuntime(k string, cfg *config.SnmpDeviceCfg) {

mutex.Lock()
devices[k] = dev
dev.StartGather(&gatherWg)
// Start gather goroutine for device and add it to the wait group for gather goroutines
gatherWg.Add(1)
go func() {
defer gatherWg.Done()
dev.StartGather()
log.Infof("Device %s finished", cfg.ID)
// If device goroutine has finished, leave the bus so it won't get blocked trying
// to send messages to a not running device.
dev.LeaveBus(Bus)
}()
mutex.Unlock()
}

Expand All @@ -312,16 +314,15 @@ func Start() {
func End() (time.Duration, error) {
start := time.Now()
log.Infof("END: begin device Gather processes stop... at %s", start.String())
// stop all device processes
// Stop all device processes and its measurements. Once finished they will be removed
// from the bus and node closed (snmp connections for measurements will be closed)
DeviceProcessStop()
log.Info("END: begin selfmon Gather processes stop...")
// stop the selfmon process
selfmonProc.StopGather()
log.Info("END: waiting for all Gather goroutines stop...")
// wait until Done
gatherWg.Wait()
log.Info("END: releasing Device Resources")
ReleaseDevices()
log.Info("END: releasing Selfmonitoring Resources")
selfmonProc.End()
log.Info("END: begin sender processes stop...")
Expand Down
8 changes: 4 additions & 4 deletions pkg/agent/bus/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,11 +101,11 @@ func (b *Bus) Start() {
for {
select {
case received := <-b.in:
nodes := []*Node{}
nodes := make([]*Node, len(b.nodes))
switch received.receiver {
case "all":
b.nodeLock.Lock()
nodes = b.nodes[:]
copy(nodes, b.nodes)
b.nodeLock.Unlock()
default:
id := received.receiver
Expand Down Expand Up @@ -143,13 +143,13 @@ func (b *Bus) Start() {
// Send send message to one receiver to the Bus
func (b *Bus) Send(id string, m *Message) {
b.in <- MsgCtrl{sender: nil, payload: m, receiver: id}
log.Debugf("BUS: unicast message %s , %+v sent to node %s", m.Type, m.Data, id)
log.Debugf("BUS: unicast message %s , %+v sent to node %s", m.Type.String(), m.Data, id)
<-b.waitsync
}

// Broadcast send message to all nodes attached to the bus
func (b *Bus) Broadcast(m *Message) {
b.in <- MsgCtrl{sender: nil, payload: m, receiver: "all"}
log.Debugf("BUS: Broadcast message %s , %+v sent", m.Type, m.Data)
log.Debugf("BUS: Broadcast message %s , %+v sent", m.Type.String(), m.Data)
<-b.waitsync
}
53 changes: 52 additions & 1 deletion pkg/agent/bus/node.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,59 @@
package bus

// Command define valid message types to be passed using the bus
type Command int

const (
// Exit without waiting for anything
Exit Command = iota
// SyncExit order the device and its measurement goroutines to exit and waits till them are finished
SyncExit
Enabled
LogLevel
ForceGather
FilterUpdate
// SNMPResetHard tell all measurements to recreate the goSNMP client and redo filters
SNMPResetHard
// SNMPReset tell all measurements to recreate the goSNMP client
SNMPReset
// SNMPDebug tell all measurement goroutines to enable the debug in the SNMP client.
// And also store the change in runtime to keep the value in case of a reconnect.
SNMPDebug
// SetSNMPMaxRep tell all measurement goroutines to change the MaxRepetitions of the
// current goSNMP client and also store the change in runtime to keep the value in
// case of a reconnect.
SetSNMPMaxRep
)

func (c Command) String() string {
switch c {
case Exit:
return "Exit"
case SyncExit:
return "SyncExit"
case Enabled:
return "Enabled"
case LogLevel:
return "LogLevel"
case ForceGather:
return "ForceGather"
case FilterUpdate:
return "FilterUpdate"
case SNMPResetHard:
return "SNMPResetHard"
case SNMPReset:
return "SNMPReset"
case SNMPDebug:
return "SNMPDebug"
case SetSNMPMaxRep:
return "SetSNMPMaxRep"
}
return ""
}

// Message a basic message type
type Message struct {
Type string
Type Command
Data interface{}
}

Expand Down
89 changes: 0 additions & 89 deletions pkg/agent/device/measgather.go

This file was deleted.

Loading

0 comments on commit 8c017c4

Please sign in to comment.