diff --git a/bootstrap.go b/bootstrap.go index a483fa3..4c7f882 100644 --- a/bootstrap.go +++ b/bootstrap.go @@ -17,7 +17,7 @@ import ( "github.com/spf13/viper" ) -func (td *PopData) BootstrapMqttSource(s *tapir.WBGlist, src SourceConf) (*tapir.WBGlist, error) { +func (td *PopData) BootstrapMqttSource(xxxs *tapir.WBGlist, src SourceConf) (*tapir.WBGlist, error) { // Initialize the API client api := &tapir.ApiClient{ BaseUrl: fmt.Sprintf(src.BootstrapUrl, src.Bootstrap[0]), // Must specify a valid BaseUrl diff --git a/go.mod b/go.mod index 4bb1bf7..955154a 100644 --- a/go.mod +++ b/go.mod @@ -2,6 +2,10 @@ module tapir-pop go 1.22.0 +replace ( + github.com/dnstapir/tapir => ../tapir +) + require ( github.com/dnstapir/tapir v0.0.0-20240927111630-589bd474c6e4 github.com/go-playground/validator/v10 v10.22.1 diff --git a/main.go b/main.go index dd23e21..38786d3 100644 --- a/main.go +++ b/main.go @@ -129,7 +129,7 @@ func main() { mqttclientid = "tapir-pop-" + uuid.New().String() flag.BoolVarP(&tapir.GlobalCF.Debug, "debug", "d", false, "Debug mode") flag.BoolVarP(&tapir.GlobalCF.Verbose, "verbose", "v", false, "Verbose mode") - flag.StringVarP(&mqttclientid, "client-id", "", mqttclientid, "MQTT client id, default is a random string") + // flag.StringVarP(&mqttclientid, "client-id", "", mqttclientid, "MQTT client id, default is a random string") flag.Parse() @@ -185,7 +185,7 @@ func main() { POPExiter("Error unmarshalling config into struct: %v", err) } - fmt.Printf("%s (TAPIR Edge Manager) version %s (%s) starting.\n", appName, appVersion, appDate) + fmt.Printf("%s (TAPIR Policy Processor) version %s (%s) starting.\n", appName, appVersion, appDate) var stopch = make(chan struct{}, 10) diff --git a/mqtt.go b/mqtt.go index 8010d32..f518153 100644 --- a/mqtt.go +++ b/mqtt.go @@ -15,8 +15,13 @@ import ( func (pd *PopData) CreateMqttEngine(clientid string, statusch chan tapir.ComponentStatusUpdate, lg *log.Logger) error { if clientid == "" { - POPExiter("Error starting MQTT Engine: clientid not specified in config") + POPExiter("CreateMqttEngine: Error: clientid not specified in config") } + + if pd.MqttEngine != nil && pd.MqttEngine.ClientID == clientid { + POPExiter("CreateMqttEngine: Error: clientid %s already in use", clientid) + } + var err error pd.Logger.Printf("Creating MQTT Engine with clientid %s", clientid) pd.MqttEngine, err = tapir.NewMqttEngine("tapir-pop", clientid, tapir.TapirSub, statusch, lg) // sub, but no pub @@ -31,9 +36,10 @@ func (pd *PopData) StartMqttEngine(meng *tapir.MqttEngine) error { return nil } + pd.Logger.Printf("StartMqttEngine: starting MQTT Engine") cmnder, outbox, inbox, err := meng.StartEngine() if err != nil { - log.Fatalf("Error from StartEngine(): %v", err) + POPExiter("StartMqttEngine: Error from StartEngine(): %v", err) } pd.TapirMqttCmdCh = cmnder pd.TapirMqttPubCh = outbox diff --git a/sources.go b/sources.go index efcf5a6..f715426 100644 --- a/sources.go +++ b/sources.go @@ -178,7 +178,7 @@ func (pd *PopData) ParseSourcesNG() error { // defer func() { //pd.Logger.Printf("<--Thread %d: source \"%s\" (%s) is now complete. %d remaining", thread, name, src.Source, threads) // }() - pd.Logger.Printf("-->Thread %d: parsing source \"%s\" (source %s)", thread, name, src.Source) + pd.Logger.Printf("ParseSourcesNG: Thread %d: parsing source \"%s\" (source %s)", thread, name, src.Source) newsource := tapir.WBGlist{ Name: src.Name, @@ -251,12 +251,12 @@ func (pd *PopData) ParseSourcesNG() error { pd.Logger.Printf("ParseSources: source \"%s\" is now complete. %d remaining", tmp, threads) } - if pd.MqttEngine != nil && !pd.TapirMqttEngineRunning { - err := pd.StartMqttEngine(pd.MqttEngine) - if err != nil { - POPExiter("Error starting MQTT Engine: %v", err) - } - } + // if pd.MqttEngine != nil && !pd.TapirMqttEngineRunning { + // err := pd.StartMqttEngine(pd.MqttEngine) + // if err != nil { + // POPExiter("Error starting MQTT Engine: %v", err) + // } + // } pd.Logger.Printf("ParseSources: static sources done.") diff --git a/statusupdater.go b/statusupdater.go index c8970ad..bbb59e6 100644 --- a/statusupdater.go +++ b/statusupdater.go @@ -80,9 +80,12 @@ func (pd *PopData) StatusUpdater(conf *Config, stopch chan struct{}) { } pd.Logger.Printf("StatusUpdater: Topic status for MQTT engine %s: %+v", me.Creator, msg) - _, outbox, _, err := me.StartEngine() - if err != nil { - POPExiter("StatusUpdater: Error starting MQTT Engine: %v", err) + var outbox chan tapir.MqttPkgOut + if !me.Running { + _, outbox, _, err = me.StartEngine() + if err != nil { + POPExiter("StatusUpdater: Error starting MQTT Engine: %v", err) + } } log.Printf("StatusUpdater: Starting")