diff --git a/Makefile b/Makefile index f148a3b..9ba64d3 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,4 @@ -PROG:=tem +PROG:=tapir-pop VERSION:=`cat ./VERSION` COMMIT:=`git describe --dirty=+WiP --always` APPDATE=`date +"%Y-%m-%d-%H:%M"` @@ -37,20 +37,20 @@ netbsd: /bin/sh make-version.sh $(VERSION)-$(COMMIT) $(APPDATE) $(PROG) GOOS=netbsd GOARCH=amd64 go build $(GOFLAGS) -o ${PROG}.netbsd -gen-mqtt-msg-new-qname.go: checkout/events-mqtt-message-new_qname.json - go-jsonschema checkout/events-mqtt-message-new_qname.json --package main --tags json --only-models --output gen-mqtt-msg-new-qname.go +# gen-mqtt-msg-new-qname.go: checkout/events-mqtt-message-new_qname.json +# go-jsonschema checkout/events-mqtt-message-new_qname.json --package main --tags json --only-models --output gen-mqtt-msg-new-qname.go -gen-mqtt-msg.go: checkout/events-mqtt-message.json - go-jsonschema checkout/events-mqtt-message.json --package main --tags json --only-models --output gen-mqtt-msg.go +# gen-mqtt-msg.go: checkout/events-mqtt-message.json +# go-jsonschema checkout/events-mqtt-message.json --package main --tags json --only-models --output gen-mqtt-msg.go -checkout/events-mqtt-message-new_qname.json: checkout - cd checkout; python schemasplit.py events-mqtt-message-new_qname.yaml +# checkout/events-mqtt-message-new_qname.json: checkout +# cd checkout; python schemasplit.py events-mqtt-message-new_qname.yaml -checkout/events-mqtt-message.json: checkout - cd checkout; python schemasplit.py events-mqtt-message.yaml +# checkout/events-mqtt-message.json: checkout +# cd checkout; python schemasplit.py events-mqtt-message.yaml -checkout: - git clone git@github.com:dnstapir/protocols.git checkout +# checkout: +# git clone git@github.com:dnstapir/protocols.git checkout clean: @rm -f $(PROG) *~ version.go diff --git a/apihandler.go b/apihandler.go index 36e75b6..cdbfec9 100644 --- a/apihandler.go +++ b/apihandler.go @@ -71,9 +71,31 @@ func APIcommand(conf *Config) func(w http.ResponseWriter, r *http.Request) { switch cp.Command { case "status": log.Printf("Daemon status inquiry\n") + rt := make(chan tapir.StatusUpdaterResponse) + var ts *tapir.StatusUpdaterResponse + + conf.Internal.ComponentStatusCh <- tapir.ComponentStatusUpdate{ + Component: "status", + Status: "status", + Response: rt, + } + + select { + case foo := <-rt: + ts = &foo + case <-time.After(2 * time.Second): + log.Println("Timeout waiting for response on rt channel") + ts = nil + } + resp = tapir.CommandResponse{ Status: "ok", // only status we know, so far - Msg: "We're happy, but send more cookies"} + Msg: "We're happy, but send more cookies", + } + if ts != nil { + resp.TapirFunctionStatus = ts.FunctionStatus + } + case "stop": log.Printf("Daemon instructed to stop\n") resp = tapir.CommandResponse{ @@ -245,9 +267,11 @@ func APIbootstrap(conf *Config) func(w http.ResponseWriter, r *http.Request) { case "greylist-status": me := conf.TemData.MqttEngine stats := me.Stats() - resp.MsgCounters = stats.MsgCounters - resp.MsgTimeStamps = stats.MsgTimeStamps - log.Printf("API: greylist-status: msgs: %d last msg: %v", stats.MsgCounters[bp.ListName], stats.MsgTimeStamps[bp.ListName]) + // resp.MsgCounters = stats.MsgCounters + // resp.MsgTimeStamps = stats.MsgTimeStamps + resp.TopicData = stats + // log.Printf("API: greylist-status: msgs: %d last msg: %v", stats.MsgCounters[bp.ListName], stats.MsgTimeStamps[bp.ListName]) + log.Printf("API: greylist-status: %v", stats) case "export-greylist": td := conf.TemData @@ -403,7 +427,7 @@ func APIdebug(conf *Config) func(w http.ResponseWriter, r *http.Request) { case "mqtt-stats": log.Printf("TEM debug MQTT stats") - resp.MqttStats = td.MqttEngine.Stats() + resp.TopicData = td.MqttEngine.Stats() case "reaper-stats": log.Printf("TEM debug reaper stats") @@ -449,6 +473,37 @@ func APIdebug(conf *Config) func(w http.ResponseWriter, r *http.Request) { resp.RpzOutput = append(resp.RpzOutput, *rpzn) } + case "send-status": + log.Printf("TEM debug send status") + + rt := make(chan tapir.StatusUpdaterResponse) + var sur *tapir.StatusUpdaterResponse + + conf.Internal.ComponentStatusCh <- tapir.ComponentStatusUpdate{ + Component: dp.Component, + Status: dp.Status, + Msg: "debug status update", + TimeStamp: time.Now(), + Response: rt, + } + + select { + case foo := <-rt: + sur = &foo + case <-time.After(2 * time.Second): + log.Println("Timeout waiting for response on rt channel") + sur = nil + } + + if sur != nil { + resp.Status = "ok" + resp.Msg = sur.Msg + } else { + resp.Msg = "Status update for component " + dp.Component + " sent" + resp.Error = true + resp.ErrorMsg = "Timeout waiting for response on rt channel" + } + default: resp.ErrorMsg = fmt.Sprintf("Unknown command: %s", dp.Command) resp.Error = true @@ -461,7 +516,7 @@ func SetupRouter(conf *Config) *mux.Router { sr := r.PathPrefix("/api/v1").Headers("X-API-Key", viper.GetString("apiserver.key")).Subrouter() - sr.HandleFunc("/ping", tapir.APIping("tem", conf.BootTime)).Methods("POST") + sr.HandleFunc("/ping", tapir.APIping("tapir-pop", conf.BootTime)).Methods("POST") sr.HandleFunc("/command", APIcommand(conf)).Methods("POST") sr.HandleFunc("/bootstrap", APIbootstrap(conf)).Methods("POST") sr.HandleFunc("/debug", APIdebug(conf)).Methods("POST") @@ -474,7 +529,7 @@ func SetupBootstrapRouter(conf *Config) *mux.Router { r := mux.NewRouter().StrictSlash(true) sr := r.PathPrefix("/api/v1").Headers("X-API-Key", viper.GetString("apiserver.key")).Subrouter() - sr.HandleFunc("/ping", tapir.APIping("tem", conf.BootTime)).Methods("POST") + sr.HandleFunc("/ping", tapir.APIping("tapir-pop", conf.BootTime)).Methods("POST") sr.HandleFunc("/bootstrap", APIbootstrap(conf)).Methods("POST") // sr.HandleFunc("/debug", APIdebug(conf)).Methods("POST") // sr.HandleFunc("/show/api", tapir.APIshowAPI(r)).Methods("GET") @@ -501,7 +556,7 @@ func walkRoutes(router *mux.Router, address string) { // In practice APIdispatcher doesn't need a termination signal, as it will // just sit inside http.ListenAndServe, but we keep it for symmetry. -func APIdispatcher(conf *Config, done <-chan struct{}) { +func APIhandler(conf *Config, done <-chan struct{}) { gob.Register(tapir.WBGlist{}) // Must register the type for gob encoding router := SetupRouter(conf) @@ -510,21 +565,32 @@ func APIdispatcher(conf *Config, done <-chan struct{}) { addresses := viper.GetStringSlice("apiserver.addresses") tlsaddresses := viper.GetStringSlice("apiserver.tlsaddresses") - certfile := viper.GetString("certs.tem.cert") - keyfile := viper.GetString("certs.tem.key") + certfile := viper.GetString("certs.tapir-pop.cert") + if certfile == "" { + log.Printf("*** APIhandler: Error: TLS cert file not specified under key certs.tapir-pop.cert") + } + keyfile := viper.GetString("certs.tapir-pop.key") + if keyfile == "" { + log.Printf("*** APIhandler: Error: TLS key file not specified under key certs.tapir-pop.key") + } bootstrapaddresses := viper.GetStringSlice("bootstrapserver.addresses") bootstraptlsaddresses := viper.GetStringSlice("bootstrapserver.tlsaddresses") bootstraprouter := SetupBootstrapRouter(conf) + log.Printf("*** APIhandler: addresses: %v bootstrapaddresses: %v", addresses, bootstrapaddresses) + log.Printf("*** APIhandler: tlsaddresses: %v bootstraptlsaddresses: %v", tlsaddresses, bootstraptlsaddresses) + tlspossible := true _, err := os.Stat(certfile) if os.IsNotExist(err) { + log.Printf("*** APIhandler: Error: TLS cert file \"%s\" does not exist", certfile) tlspossible = false } _, err = os.Stat(keyfile) if os.IsNotExist(err) { + log.Printf("*** APIhandler: Error: TLS key file \"%s\" does not exist", keyfile) tlspossible = false } diff --git a/bootstrap.go b/bootstrap.go index 298e7bc..a92b4cd 100644 --- a/bootstrap.go +++ b/bootstrap.go @@ -8,7 +8,6 @@ import ( "encoding/gob" "encoding/json" "fmt" - "log" "net/http" "time" @@ -27,10 +26,10 @@ func (td *TemData) BootstrapMqttSource(s *tapir.WBGlist, src SourceConf) (*tapir cd := viper.GetString("certs.certdir") if cd == "" { - log.Fatalf("Error: missing config key: certs.certdir") + TEMExiter("BootstrapMqttSource error: missing config key: certs.certdir") } // cert := cd + "/" + certname - cert := cd + "/" + "tem" + cert := cd + "/" + "tapir-pop" tlsConfig, err := tapir.NewClientConfig(viper.GetString("certs.cacertfile"), cert+".key", cert+".crt") if err != nil { TEMExiter("BootstrapMqttSource: Error: Could not set up TLS: %v", err) @@ -39,7 +38,7 @@ func (td *TemData) BootstrapMqttSource(s *tapir.WBGlist, src SourceConf) (*tapir tlsConfig.InsecureSkipVerify = true err = api.SetupTLS(tlsConfig) if err != nil { - return nil, fmt.Errorf("error setting up TLS for the API client: %v", err) + TEMExiter("BootstrapMqttSource: error setting up TLS for the API client: %v", err) } bootstrapaddrs := viper.GetStringSlice("bootstrapserver.addresses") @@ -51,7 +50,7 @@ func (td *TemData) BootstrapMqttSource(s *tapir.WBGlist, src SourceConf) (*tapir // Is this myself? for _, bs := range bootstrapaddrs { if bs == server { - td.Logger.Printf("MQTT bootstrap server %s is myself, skipping", server) + td.Logger.Printf("BootstrapMqttSource: MQTT bootstrap server %s is myself, skipping", server) continue } } @@ -61,12 +60,12 @@ func (td *TemData) BootstrapMqttSource(s *tapir.WBGlist, src SourceConf) (*tapir // Send an API ping command pr, err := api.SendPing(0, false) if err != nil { - td.Logger.Printf("Ping to MQTT bootstrap server %s failed: %v", server, err) + td.Logger.Printf("BootstrapMqttSource: Ping to MQTT bootstrap server %s failed: %v", server, err) continue } uptime := time.Since(pr.BootTime).Round(time.Second) - td.Logger.Printf("MQTT bootstrap server %s uptime: %v. It has processed %d MQTT messages", server, uptime, 17) + td.Logger.Printf("BootstrapMqttSource: MQTT bootstrap server %s uptime: %v. It has processed %d MQTT messages", server, uptime, 17) status, buf, err := api.RequestNG(http.MethodPost, "/bootstrap", tapir.BootstrapPost{ Command: "greylist-status", @@ -87,30 +86,31 @@ func (td *TemData) BootstrapMqttSource(s *tapir.WBGlist, src SourceConf) (*tapir var br tapir.BootstrapResponse err = json.Unmarshal(buf, &br) if err != nil { - td.Logger.Printf("Error decoding greylist-status response from %s: %v. Giving up.\n", server, err) + td.Logger.Printf("BootstrapMqttSource: Error decoding greylist-status response from %s: %v. Giving up.\n", server, err) continue } if br.Error { - td.Logger.Printf("Bootstrap server %s responded with error: %s (instead of greylist status)", server, br.ErrorMsg) + td.Logger.Printf("BootstrapMqttSource: Bootstrap server %s responded with error: %s (instead of greylist status)", server, br.ErrorMsg) } if len(br.Msg) != 0 { - td.Logger.Printf("Bootstrap server %s responded: %s", server, br.Msg) + td.Logger.Printf("BootstrapMqttSource: Bootstrap server %s responded: %s", server, br.Msg) } - td.Logger.Printf("MQTT bootstrap server %s uptime: %v. It has processed %d MQTT messages on the %s topic (last msg arrived at %s), ", server, uptime, br.MsgCounters["greylist"], src.Name, br.MsgTimeStamps["greylist"].Format(time.RFC3339)) + td.Logger.Printf("BootstrapMqttSource: MQTT bootstrap server %s uptime: %v. It has processed %d MQTT messages on the %s topic (last sub msg arrived at %s), ", server, uptime, br.TopicData[src.Name].SubMsgs, src.Name, br.TopicData[src.Name].LatestSub.Format(tapir.TimeLayout)) status, buf, err = api.RequestNG(http.MethodPost, "/bootstrap", tapir.BootstrapPost{ Command: "export-greylist", ListName: src.Name, Encoding: "gob", // XXX: This is our default, but we'll test other encodings later }, true) + if err != nil { - fmt.Printf("Error from RequestNG: %v\n", err) + td.Logger.Printf("BootstrapMqttSource: Error from RequestNG: %v\n", err) continue } if status != http.StatusOK { - td.Logger.Printf("HTTP Error: %s\n", buf) + td.Logger.Printf("BootstrapMqttSource: HTTP Error: %s\n", buf) continue } @@ -123,14 +123,14 @@ func (td *TemData) BootstrapMqttSource(s *tapir.WBGlist, src SourceConf) (*tapir var br tapir.BootstrapResponse err = json.Unmarshal(buf, &br) if err != nil { - td.Logger.Printf("Error decoding bootstrap response from %s: %v. Giving up.\n", server, err) + td.Logger.Printf("BootstrapMqttSource: Error decoding bootstrap response from %s: %v. Giving up.\n", server, err) continue } if br.Error { - td.Logger.Printf("Bootstrap server %s responded with error: %s (instead of GOB blob)", server, br.ErrorMsg) + td.Logger.Printf("BootstrapMqttSource: Bootstrap server %s responded with error: %s (instead of GOB blob)", server, br.ErrorMsg) } if len(br.Msg) != 0 { - td.Logger.Printf("Bootstrap server %s responded: %s (instead of GOB blob)", server, br.Msg) + td.Logger.Printf("BootstrapMqttSource: Bootstrap server %s responded: %s (instead of GOB blob)", server, br.Msg) } // return nil, fmt.Errorf("Command Error: %s", br.ErrorMsg) continue @@ -151,5 +151,5 @@ func (td *TemData) BootstrapMqttSource(s *tapir.WBGlist, src SourceConf) (*tapir } // If no bootstrap server succeeded - return nil, fmt.Errorf("all bootstrap servers failed") + return nil, fmt.Errorf("BootstrapMqttSource: all bootstrap servers failed") } diff --git a/config.go b/config.go index 29b628a..8441cba 100644 --- a/config.go +++ b/config.go @@ -127,7 +127,8 @@ type GreylistConf struct { type InternalConf struct { // RefreshZoneCh chan RpzRefresher // RpzCmdCh chan RpzCmdData - APIStopCh chan struct{} + APIStopCh chan struct{} + ComponentStatusCh chan tapir.ComponentStatusUpdate } func ValidateConfig(v *viper.Viper, cfgfile string) error { @@ -139,7 +140,7 @@ func ValidateConfig(v *viper.Viper, cfgfile string) error { } } else { if err := v.Unmarshal(&config); err != nil { - TEMExiter("ValidateConfig: unmarshal error: %v", err) + TEMExiter("ValidateConfig: Unmarshal error: %v", err) } } diff --git a/generate-csr.sh b/generate-csr.sh new file mode 100755 index 0000000..c58657f --- /dev/null +++ b/generate-csr.sh @@ -0,0 +1,31 @@ +#!/bin/sh + +if [ $# != 1 ]; then + echo Usage: $0 instance-id + echo \"instance-id\" is a string that you choose yourself that will identify this DNS TAPIR Edge instance. + echo A domain name is usually a good idea. + exit 1 +fi + +id=$1 + +echo Your chosen DNS TAPIR Edge Id is \"$id\". +/bin/echo -n "Proceed [yes]: " +default_ans="yes" +read answer + +if [ "$answer" == "" ]; then + answer=$default_ans +fi + +echo You typed: \"$answer\" + +if [ "$answer" != "yes" ]; then + echo Terminating. + exit 1 +fi + +openssl genpkey -genparam -algorithm ec -pkeyopt ec_paramgen_curve:P-256 -out ecparam.pem +openssl req -new -out ${id}.csr -newkey ec:ecparam.pem -keyout ${id}.key -subj "/CN=${id}" -nodes + +echo Send the file \"${id}.csr\" to DNS TAPIR admin. You will receive a \"${id}.crt\" in return. diff --git a/go.mod b/go.mod index a2700b1..0934137 100644 --- a/go.mod +++ b/go.mod @@ -1,4 +1,4 @@ -module tem +module tapir-pop go 1.22.0 diff --git a/logging.go b/logging.go index 3fac949..e4f4a02 100644 --- a/logging.go +++ b/logging.go @@ -17,6 +17,15 @@ import ( func SetupLogging(conf *Config) { logfile := viper.GetString("log.file") + debug := viper.GetString("log.mode") == "debug" + logoptions := log.Ldate | log.Ltime + if debug { + log.Println("Logging in debug mode (showing file and line number)") + logoptions |= log.Lshortfile + } + + prefix := "" + if logfile != "" { log.SetOutput(&lumberjack.Logger{ Filename: logfile, @@ -24,7 +33,7 @@ func SetupLogging(conf *Config) { MaxBackups: 3, MaxAge: 14, }) - fmt.Printf("TEM standard logging to: %s\n", logfile) + fmt.Printf("TAPIR-POP standard logging to: %s\n", logfile) } else { TEMExiter("Error: standard log (key log.file) not specified") } @@ -37,14 +46,17 @@ func SetupLogging(conf *Config) { TEMExiter("error opening TEM policy logfile '%s': %v", logfile, err) } - conf.Loggers.Policy = log.New(f, "policy: ", log.Lshortfile) + if debug { + prefix = "policy: " + } + conf.Loggers.Policy = log.New(f, prefix, logoptions) conf.Loggers.Policy.SetOutput(&lumberjack.Logger{ Filename: logfile, MaxSize: 20, MaxBackups: 3, MaxAge: 14, }) - fmt.Printf("TEM policy logging to: %s\n", logfile) + fmt.Printf("TAPIR-POP policy logging to: %s\n", logfile) } else { log.Println("No policy logfile specified, using default") conf.Loggers.Policy = log.Default() @@ -58,14 +70,17 @@ func SetupLogging(conf *Config) { TEMExiter("error opening TEM dnsengine logfile '%s': %v", logfile, err) } - conf.Loggers.Dnsengine = log.New(f, "dnsengine: ", log.Lshortfile) + if debug { + prefix = "dnsengine: " + } + conf.Loggers.Dnsengine = log.New(f, prefix, logoptions) conf.Loggers.Dnsengine.SetOutput(&lumberjack.Logger{ Filename: logfile, MaxSize: 20, MaxBackups: 3, MaxAge: 14, }) - fmt.Printf("TEM dnsengine logging to: %s\n", logfile) + fmt.Printf("TAPIR-POP dnsengine logging to: %s\n", logfile) } else { log.Println("No dnsengine logfile specified, using default") conf.Loggers.Dnsengine = log.Default() @@ -79,14 +94,17 @@ func SetupLogging(conf *Config) { TEMExiter("error opening TEM MQTT logfile '%s': %v", logfile, err) } - conf.Loggers.Mqtt = log.New(f, "mqtt: ", log.Lshortfile) + if debug { + prefix = "mqtt: " + } + conf.Loggers.Mqtt = log.New(f, prefix, logoptions) conf.Loggers.Mqtt.SetOutput(&lumberjack.Logger{ Filename: logfile, MaxSize: 20, MaxBackups: 3, MaxAge: 14, }) - fmt.Printf("TEM MQTT logging to: %s\n", logfile) + fmt.Printf("TAPIR-POP MQTT logging to: %s\n", logfile) } else { log.Println("No MQTT logfile specified, using default") conf.Loggers.Mqtt = log.Default() diff --git a/main.go b/main.go index 85f608a..355fec3 100644 --- a/main.go +++ b/main.go @@ -16,6 +16,7 @@ import ( "time" _ "github.com/mattn/go-sqlite3" + flag "github.com/spf13/pflag" "github.com/spf13/viper" "github.com/dnstapir/tapir" @@ -119,15 +120,22 @@ func mainloop(conf *Config, configfile *string, td *TemData) { log.Println("mainloop: leaving signal dispatcher") } +var Gconfig Config + func main() { - var conf Config + // var conf Config + + flag.BoolVarP(&tapir.GlobalCF.Debug, "debug", "d", false, "Debug mode") + flag.BoolVarP(&tapir.GlobalCF.Verbose, "verbose", "v", false, "Verbose mode") + flag.Parse() + var cfgFileUsed string var cfgFile string if cfgFile != "" { viper.SetConfigFile(cfgFile) } else { - viper.SetConfigFile(tapir.DefaultTemCfgFile) + viper.SetConfigFile(tapir.DefaultPopCfgFile) } viper.AutomaticEnv() // read in environment variables that match @@ -137,38 +145,38 @@ func main() { fmt.Fprintln(os.Stderr, "Using config file:", viper.ConfigFileUsed()) cfgFileUsed = viper.ConfigFileUsed() } else { - TEMExiter("Could not load config %s: Error: %v", tapir.DefaultTemCfgFile, err) + TEMExiter("Could not load config %s: Error: %v", tapir.DefaultPopCfgFile, err) } - viper.SetConfigFile(tapir.TemSourcesCfgFile) + viper.SetConfigFile(tapir.PopSourcesCfgFile) if err := viper.MergeInConfig(); err == nil { fmt.Fprintln(os.Stderr, "Using config file:", viper.ConfigFileUsed()) cfgFileUsed = viper.ConfigFileUsed() } else { - TEMExiter("Could not load config %s: Error: %v", tapir.TemSourcesCfgFile, err) + TEMExiter("Could not load config %s: Error: %v", tapir.PopSourcesCfgFile, err) } - viper.SetConfigFile(tapir.TemOutputsCfgFile) + viper.SetConfigFile(tapir.PopOutputsCfgFile) if err := viper.MergeInConfig(); err == nil { fmt.Fprintln(os.Stderr, "Using config file:", viper.ConfigFileUsed()) cfgFileUsed = viper.ConfigFileUsed() } else { - TEMExiter("Could not load config %s: Error: %v", tapir.TemOutputsCfgFile, err) + TEMExiter("Could not load config %s: Error: %v", tapir.PopOutputsCfgFile, err) } - viper.SetConfigFile(tapir.TemPolicyCfgFile) + viper.SetConfigFile(tapir.PopPolicyCfgFile) if err := viper.MergeInConfig(); err == nil { fmt.Fprintln(os.Stderr, "Using config file:", viper.ConfigFileUsed()) cfgFileUsed = viper.ConfigFileUsed() } else { - TEMExiter("Could not load config %s: Error: %v", tapir.TemPolicyCfgFile, err) + TEMExiter("Could not load config %s: Error: %v", tapir.PopPolicyCfgFile, err) } - SetupLogging(&conf) + SetupLogging(&Gconfig) err := ValidateConfig(nil, cfgFileUsed) // will terminate on error if err != nil { TEMExiter("Error validating config: %v", err) } - err = viper.Unmarshal(&conf) + err = viper.Unmarshal(&Gconfig) if err != nil { TEMExiter("Error unmarshalling config into struct: %v", err) } @@ -177,11 +185,29 @@ func main() { var stopch = make(chan struct{}, 10) - td, err := NewTemData(&conf, log.Default()) + statusch := make(chan tapir.ComponentStatusUpdate, 10) + Gconfig.Internal.ComponentStatusCh = statusch + + td, err := NewTemData(&Gconfig, log.Default()) if err != nil { TEMExiter("Error from NewTemData: %v", err) } - go td.RefreshEngine(&conf, stopch) + + if td.MqttEngine == nil { + td.mu.Lock() + err := td.CreateMqttEngine(viper.GetString("tapir.mqtt.clientid"), statusch, td.MqttLogger) + if err != nil { + TEMExiter("Error creating MQTT Engine: %v", err) + } + td.mu.Unlock() + err = td.StartMqttEngine(td.MqttEngine) + if err != nil { + TEMExiter("Error starting MQTT Engine: %v", err) + } + } + + go td.StatusUpdater(&Gconfig, stopch) // Note that StatusUpdater must as early as possible + go td.RefreshEngine(&Gconfig, stopch) log.Println("*** main: Calling ParseSourcesNG()") err = td.ParseSourcesNG() @@ -196,16 +222,23 @@ func main() { } apistopper := make(chan struct{}) // - conf.Internal.APIStopCh = apistopper - go APIdispatcher(&conf, apistopper) + Gconfig.Internal.APIStopCh = apistopper + go APIhandler(&Gconfig, apistopper) // go httpsserver(&conf, apistopper) go func() { - if err := DnsEngine(&conf); err != nil { + if err := DnsEngine(&Gconfig); err != nil { log.Printf("Error starting DnsEngine: %v", err) } }() - conf.BootTime = time.Now() + Gconfig.BootTime = time.Now() + + statusch <- tapir.ComponentStatusUpdate{ + Component: "main-boot", + Status: "ok", + Msg: "TAPIR Policy Processor started", + TimeStamp: time.Now(), + } - mainloop(&conf, &cfgFileUsed, td) + mainloop(&Gconfig, &cfgFileUsed, td) } diff --git a/mqtt.go b/mqtt.go index 510b5ea..a72d346 100644 --- a/mqtt.go +++ b/mqtt.go @@ -13,13 +13,13 @@ import ( "github.com/miekg/dns" ) -func (td *TemData) CreateMqttEngine(clientid string, lg *log.Logger) error { +func (td *TemData) CreateMqttEngine(clientid string, statusch chan tapir.ComponentStatusUpdate, lg *log.Logger) error { if clientid == "" { TEMExiter("Error starting MQTT Engine: clientid not specified in config") } var err error td.Logger.Printf("Creating MQTT Engine with clientid %s", clientid) - td.MqttEngine, err = tapir.NewMqttEngine(clientid, tapir.TapirSub, lg) // sub, but no pub + td.MqttEngine, err = tapir.NewMqttEngine("tapir-pop", clientid, tapir.TapirSub, statusch, lg) // sub, but no pub if err != nil { TEMExiter("Error from NewMqttEngine: %v\n", err) } @@ -40,11 +40,12 @@ func (td *TemData) StartMqttEngine(meng *tapir.MqttEngine) error { //TEMExiter("Error from NewMqttEngine: %v\n", err) //} - cmnder, _, inbox, err := meng.StartEngine() + cmnder, outbox, inbox, err := meng.StartEngine() if err != nil { log.Fatalf("Error from StartEngine(): %v", err) } td.TapirMqttCmdCh = cmnder + td.TapirMqttPubCh = outbox td.TapirMqttSubCh = inbox td.TapirMqttEngineRunning = true diff --git a/policy.go b/policy.go index 515edf8..cee6f4d 100644 --- a/policy.go +++ b/policy.go @@ -31,10 +31,10 @@ type TemOutputs struct { } func (td *TemData) ParseOutputs() error { - td.Logger.Printf("ParseOutputs: reading outputs from %s", tapir.TemOutputsCfgFile) - cfgdata, err := os.ReadFile(tapir.TemOutputsCfgFile) + td.Logger.Printf("ParseOutputs: reading outputs from %s", tapir.PopOutputsCfgFile) + cfgdata, err := os.ReadFile(tapir.PopOutputsCfgFile) if err != nil { - log.Fatalf("Error from ReadFile(%s): %v", tapir.TemOutputsCfgFile, err) + log.Fatalf("Error from ReadFile(%s): %v", tapir.PopOutputsCfgFile, err) } var oconf = TemOutputs{ diff --git a/tem-outputs.sample.yaml b/pop-outputs.sample.yaml similarity index 100% rename from tem-outputs.sample.yaml rename to pop-outputs.sample.yaml diff --git a/tem-policy.sample.yaml b/pop-policy.sample.yaml similarity index 100% rename from tem-policy.sample.yaml rename to pop-policy.sample.yaml diff --git a/tem-sources.sample.yaml b/pop-sources.sample.yaml similarity index 100% rename from tem-sources.sample.yaml rename to pop-sources.sample.yaml diff --git a/tem.globalconfig.sample.yaml b/pop.globalconfig.sample.yaml similarity index 100% rename from tem.globalconfig.sample.yaml rename to pop.globalconfig.sample.yaml diff --git a/tem.sample.yaml b/pop.sample.yaml similarity index 86% rename from tem.sample.yaml rename to pop.sample.yaml index 64b096e..67d4966 100644 --- a/tem.sample.yaml +++ b/pop.sample.yaml @@ -22,7 +22,7 @@ bootstrapserver: dnsengine: active: true addresses: [ 127.0.0.1:5360 ] - logfile: /var/log/dnstapir/tem-dnsengine.log + logfile: /var/log/dnstapir/pop-dnsengine.log services: reaper: @@ -37,7 +37,7 @@ services: tapir: mqtt: - logfile: /var/log/dnstapir/tem-mqtt.log + logfile: /var/log/dnstapir/pop-mqtt.log server: tls://mqtt.dev.dnstapir.se:8883 uid: johani clientid: this-must-be-unique @@ -53,9 +53,13 @@ tapir: config: srcname: dns-tapir - topic: events/up/johani/config + topic: config/down/tem/johani validatorkey: /etc/dnstapir/certs/mqttsigner-pub.pem + status: + topic: status/up/tem/must-be-unique + signingkey: /etc/dnstapir/certs/mqttsigner-key.pem + # redirects: # tapir: landing-page.dnstapir.se # police: www.polisen.se @@ -71,4 +75,4 @@ certs: key: /etc/dnstapir/certs/tem.key log: - file: /var/log/dnstapir/tem.log + file: /var/log/dnstapir/pop.log diff --git a/refreshengine.go b/refreshengine.go index d2b59f0..10f90d7 100644 --- a/refreshengine.go +++ b/refreshengine.go @@ -89,25 +89,50 @@ func (td *TemData) RefreshEngine(conf *Config, stopch chan struct{}) { select { case tpkg = <-TapirIntelCh: switch tpkg.Data.MsgType { - case "intel-update", "observation": + case "observation", "intel-update": log.Printf("RefreshEngine: Tapir Observation update: (src: %s) %d additions and %d removals\n", tpkg.Data.SrcName, len(tpkg.Data.Added), len(tpkg.Data.Removed)) _, err := td.ProcessTapirUpdate(tpkg) if err != nil { + Gconfig.Internal.ComponentStatusCh <- tapir.ComponentStatusUpdate{ + Status: "fail", + Component: "tapir-observation", + Msg: fmt.Sprintf("ProcessTapirUpdate error: %v", err), + } log.Printf("RefreshEngine: Error from ProcessTapirUpdate(): %v", err) } + Gconfig.Internal.ComponentStatusCh <- tapir.ComponentStatusUpdate{ + Status: "ok", + Component: "tapir-observation", + Msg: fmt.Sprintf("ProcessTapirUpdate: MQTT observation message received"), + } log.Printf("RefreshEngine: Tapir Observation update evaluated.") case "global-config": if !strings.HasSuffix(tpkg.Topic, "config") { log.Printf("RefreshEngine: received global-config message on wrong topic: %s. Ignored", tpkg.Topic) + Gconfig.Internal.ComponentStatusCh <- tapir.ComponentStatusUpdate{ + Status: "fail", + Component: "mqtt-config", + Msg: fmt.Sprintf("RefreshEngine: received global-config message on wrong topic: %s. Ignored", tpkg.Topic), + } continue } td.ProcessTapirGlobalConfig(tpkg.Data) log.Printf("RefreshEngine: Tapir Global Config evaluated.") + Gconfig.Internal.ComponentStatusCh <- tapir.ComponentStatusUpdate{ + Status: "ok", + Component: "mqtt-config", + Msg: fmt.Sprintf("RefreshEngine: Tapir Global Config evaluated."), + } default: log.Printf("RefreshEngine: Tapir Message: unknown msg type: %s", tpkg.Data.MsgType) + Gconfig.Internal.ComponentStatusCh <- tapir.ComponentStatusUpdate{ + Status: "fail", + Component: "mqtt-unknown", + Msg: fmt.Sprintf("RefreshEngine: Tapir Message: unknown msg type: %s", tpkg.Data.MsgType), + } } // log.Printf("RefreshEngine: Tapir IntelUpdate: %v", tpkg.Data) @@ -411,24 +436,45 @@ func (td *TemData) RefreshEngine(conf *Config, stopch chan struct{}) { func (td *TemData) NotifyDownstreams() error { td.Logger.Printf("RefreshEngine: Notifying %d downstreams for RPZ zone %s", len(td.Downstreams), td.Rpz.ZoneName) for _, d := range td.Downstreams { + dest := net.JoinHostPort(d.Address, strconv.Itoa(d.Port)) + csu := tapir.ComponentStatusUpdate{ + Component: "downstream-notify", + Status: "fail", + Msg: fmt.Sprintf("Notifying downstream %s about new SOA serial (%d) for RPZ zone %s", dest, td.Rpz.Axfr.SOA.Serial, td.Rpz.ZoneName), + TimeStamp: time.Now(), + } + m := new(dns.Msg) m.SetNotify(td.Rpz.ZoneName) td.Rpz.Axfr.SOA.Serial = td.Rpz.CurrentSerial // m.Ns = append(m.Ns, dns.RR(&td.Rpz.Axfr.SOA)) - dest := net.JoinHostPort(d.Address, strconv.Itoa(d.Port)) td.Logger.Printf("RefreshEngine: Notifying downstream %s about new SOA serial (%d) for RPZ zone %s", dest, td.Rpz.Axfr.SOA.Serial, td.Rpz.ZoneName) r, err := dns.Exchange(m, dest) if err != nil { // well, we tried - td.Logger.Printf("Error from downstream %s on Notify(%s): %v", dest, td.Rpz.ZoneName, err) + csu.Msg = fmt.Sprintf("Error from downstream %s on NOTIFY(%s): %v", dest, td.Rpz.ZoneName, err) + Gconfig.Internal.ComponentStatusCh <- csu + td.Logger.Println(csu.Msg) continue } if r.Opcode != dns.OpcodeNotify { // well, we tried - td.Logger.Printf("Error: not a NOTIFY QR from downstream %s on Notify(%s): %s", - dest, td.Rpz.ZoneName, dns.OpcodeToString[r.Opcode]) + csu.Msg = fmt.Sprintf("Error: not a NOTIFY response from downstream %s on NOTIFY(%s): %s", dest, td.Rpz.ZoneName, dns.OpcodeToString[r.Opcode]) + Gconfig.Internal.ComponentStatusCh <- csu + td.Logger.Println(csu.Msg) + continue + } else { - td.Logger.Printf("RefreshEngine: Downstream %s responded correctly to Notify(%s) about new SOA serial (%d)", dest, td.Rpz.ZoneName, td.Rpz.Axfr.SOA.Serial) + if r.Rcode != dns.RcodeSuccess { + csu.Msg = fmt.Sprintf("Downstream %s responded with rcode %s to NOTIFY(%s) about new SOA serial (%d)", dest, dns.RcodeToString[r.Rcode], td.Rpz.ZoneName, td.Rpz.Axfr.SOA.Serial) + Gconfig.Internal.ComponentStatusCh <- csu + td.Logger.Println(csu.Msg) + continue + } + csu.Status = "success" + csu.Msg = fmt.Sprintf("Downstream %s responded correctly to NOTIFY(%s) about new SOA serial (%d)", dest, td.Rpz.ZoneName, td.Rpz.Axfr.SOA.Serial) + Gconfig.Internal.ComponentStatusCh <- csu + td.Logger.Println(csu.Msg) } } return nil diff --git a/rpz.go b/rpz.go index 8c7610e..ac03dbc 100644 --- a/rpz.go +++ b/rpz.go @@ -60,21 +60,21 @@ func (td *TemData) GenerateRpzAxfr() error { switch glist.Format { case "map": for k, v := range glist.Names { - td.Logger.Printf("Adding name %s from greylist %s to tentative output.", k, gname) + // td.Logger.Printf("Adding name %s from greylist %s to tentative output.", k, gname) if _, exists := td.BlacklistedNames[k]; exists { - td.Logger.Printf("Greylisted name %s is also blacklisted. No need to add twice.", k) + // td.Logger.Printf("Greylisted name %s is also blacklisted. No need to add twice.", k) } else if td.Whitelisted(k) { - td.Logger.Printf("Greylisted name %s is also whitelisted. Dropped from output.", k) + // td.Logger.Printf("Greylisted name %s is also whitelisted. Dropped from output.", k) } else { - td.Logger.Printf("Greylisted name %s is not whitelisted. Evalutate inclusion in output.", k) + // td.Logger.Printf("Greylisted name %s is not whitelisted. Evalutate inclusion in output.", k) action := td.ComputeRpzAction(k) if action == tapir.WHITELIST { - td.Logger.Printf("Greylisted name %s is not included in output.", k) + // td.Logger.Printf("Greylisted name %s is not included in output.", k) } else { - td.Logger.Printf("Greylisted name %s is included in output.", k) + // td.Logger.Printf("Greylisted name %s is included in output.", k) if _, exists := grey[k]; exists { - td.Logger.Printf("Grey name %s already in output. Combining tags and actions.", k) + // td.Logger.Printf("Grey name %s already in output. Combining tags and actions.", k) tmp := grey[k] tmp.TagMask = grey[k].TagMask | v.TagMask tmp.Action = tmp.Action | v.Action diff --git a/sources.go b/sources.go index f9d530f..b68a474 100644 --- a/sources.go +++ b/sources.go @@ -35,15 +35,16 @@ func NewTemData(conf *Config, lg *log.Logger) (*TemData, error) { } td := TemData{ - Lists: map[string]map[string]*tapir.WBGlist{}, - Logger: lg, - MqttLogger: conf.Loggers.Mqtt, - RpzRefreshCh: make(chan RpzRefresh, 10), - RpzCommandCh: make(chan RpzCmdData, 10), - Rpz: rpzdata, - ReaperInterval: time.Duration(repint) * time.Second, - Verbose: viper.GetBool("log.verbose"), - Debug: viper.GetBool("log.debug"), + Lists: map[string]map[string]*tapir.WBGlist{}, + Logger: lg, + MqttLogger: conf.Loggers.Mqtt, + RpzRefreshCh: make(chan RpzRefresh, 10), + RpzCommandCh: make(chan RpzCmdData, 10), + ComponentStatusCh: conf.Internal.ComponentStatusCh, + Rpz: rpzdata, + ReaperInterval: time.Duration(repint) * time.Second, + Verbose: viper.GetBool("log.verbose"), + Debug: viper.GetBool("log.debug"), } td.Lists["whitelist"] = make(map[string]*tapir.WBGlist, 3) @@ -113,7 +114,7 @@ func NewTemData(conf *Config, lg *log.Logger) (*TemData, error) { func (td *TemData) ParseSourcesNG() error { var srcfoo SrcFoo - configFile := filepath.Clean(tapir.TemSourcesCfgFile) + configFile := filepath.Clean(tapir.PopSourcesCfgFile) data, err := os.ReadFile(configFile) if err != nil { return fmt.Errorf("error reading config file: %v", err) @@ -162,11 +163,13 @@ func (td *TemData) ParseSourcesNG() error { if td.MqttEngine == nil { td.mu.Lock() - err := td.CreateMqttEngine(viper.GetString("tapir.mqtt.clientid"), td.MqttLogger) + err := td.CreateMqttEngine(viper.GetString("tapir.mqtt.clientid"), td.ComponentStatusCh, td.MqttLogger) if err != nil { TEMExiter("Error creating MQTT Engine: %v", err) } td.mu.Unlock() + } else { + td.Logger.Printf("ParseSourcesNG: MQTT Engine already created") } // Ensure that the MQTT Engine listens on the DNS TAPIR config topic @@ -179,10 +182,12 @@ func (td *TemData) ParseSourcesNG() error { if err != nil { TEMExiter("Error fetching MQTT validator key for topic %s: %v", cfgtopic, err) } - err = td.MqttEngine.AddTopic(cfgtopic, nil, valkey) + // err = td.MqttEngine.AddTopic(cfgtopic, nil, valkey) + topicdata, err := td.MqttEngine.PubSubToTopic(cfgtopic, nil, valkey, nil) // XXX: should have a channel to the config processor if err != nil { TEMExiter("Error adding topic %s to MQTT Engine: %v", cfgtopic, err) } + td.Logger.Printf("ParseSourcesNG: Topic data for topic %s: %+v", cfgtopic, topicdata) } for name, src := range srcs { @@ -229,10 +234,12 @@ func (td *TemData) ParseSourcesNG() error { } td.Logger.Printf("ParseSourcesNG: Adding topic '%s' to MQTT Engine", src.Topic) - err = td.MqttEngine.AddTopic(src.Topic, nil, valkey) + // err = td.MqttEngine.AddTopic(src.Topic, nil, valkey) + topicdata, err := td.MqttEngine.PubSubToTopic(src.Topic, nil, valkey, nil) if err != nil { TEMExiter("Error adding topic %s to MQTT Engine: %v", src.Topic, err) } + td.Logger.Printf("ParseSourcesNG: Topic data for topic %s: %+v", src.Topic, topicdata) newsource.Format = "map" // for now if len(src.Bootstrap) > 0 { diff --git a/statusupdater.go b/statusupdater.go new file mode 100644 index 0000000..951b417 --- /dev/null +++ b/statusupdater.go @@ -0,0 +1,146 @@ +/* + * Johan Stenstam, johan.stenstam@internetstiftelsen.se + */ +package main + +import ( + "fmt" + "log" + "path/filepath" + "slices" + "time" + + "github.com/dnstapir/tapir" + "github.com/spf13/viper" +) + +func (td *TemData) StatusUpdater(conf *Config, stopch chan struct{}) { + + var s = tapir.TapirFunctionStatus{ + Function: "tapir-pop", + FunctionID: "random-popper", + ComponentStatus: make(map[string]tapir.TapirComponentStatus), + } + + // me := td.MqttEngine + // if me == nil { + // TEMExiter("StatusUpdater: MQTT Engine not running") + // } + + // Create a new mqtt engine just for the statusupdater. + me, err := tapir.NewMqttEngine("statusupdater", viper.GetString("tapir.mqtt.clientid")+"statusupdates", tapir.TapirPub, td.ComponentStatusCh, log.Default()) + if err != nil { + TEMExiter("StatusUpdater: Error creating MQTT Engine: %v", err) + } + + // var TemStatusCh = make(chan tapir.TemStatusUpdate, 100) + //conf.Internal.TemStatusCh = TemStatusCh + + ticker := time.NewTicker(60 * time.Second) + + statusTopic := viper.GetString("tapir.status.topic") + if statusTopic == "" { + TEMExiter("StatusUpdater: MQTT status topic not set") + } + keyfile := viper.GetString("tapir.status.signingkey") + if keyfile == "" { + TEMExiter("StatusUpdater: MQTT status signing key not set") + } + keyfile = filepath.Clean(keyfile) + signkey, err := tapir.FetchMqttSigningKey(statusTopic, keyfile) + if err != nil { + TEMExiter("StatusUpdater: Error fetching MQTT signing key for topic %s: %v", statusTopic, err) + } + + td.Logger.Printf("StatusUpdater: Adding topic '%s' to MQTT Engine", statusTopic) + msg, err := me.PubSubToTopic(statusTopic, signkey, nil, nil) + if err != nil { + TEMExiter("Error adding topic %s to MQTT Engine: %v", statusTopic, err) + } + td.Logger.Printf("StatusUpdater: Topic status for MQTT engine %s: %+v", me.Creator, msg) + + _, outbox, _, err := me.StartEngine() + if err != nil { + TEMExiter("StatusUpdater: Error starting MQTT Engine: %v", err) + } + + log.Printf("StatusUpdater: Starting") + + var known_components = []string{"tapir-observation", "mqtt-event", "rpz", "rpz-ixfr", "rpz-inbound", "downstream-notify", + "downstream-ixfr", "mqtt-config", "mqtt-unknown", "main-boot", "cert-status"} + + var csu tapir.ComponentStatusUpdate + var dirty bool + for { + select { + case <-ticker.C: + if dirty { + td.Logger.Printf("StatusUpdater: Status is dirty, publishing status update: %+v", s) + // publish an mqtt status update + outbox <- tapir.MqttPkg{ + Topic: statusTopic, + Type: "data", + Data: tapir.TapirMsg{TapirFunctionStatus: s}, + } + dirty = false + } + case csu = <-td.ComponentStatusCh: + log.Printf("StatusUpdater: got status update message: %v", csu) + switch csu.Status { + case "fail", "warn", "ok": + log.Printf("StatusUpdater: status failure: %s", csu.Msg) + var sur tapir.StatusUpdaterResponse + switch { + case slices.Contains(known_components, csu.Component): + comp := s.ComponentStatus[csu.Component] + comp.Status = csu.Status + comp.Msg = csu.Msg + switch csu.Status { + case "fail": + comp.NumFails++ + comp.LastFail = csu.TimeStamp + comp.ErrorMsg = csu.Msg + case "warn": + comp.NumWarns++ + comp.LastWarn = csu.TimeStamp + comp.ErrorMsg = csu.Msg + case "ok": + comp.NumFails = 0 + comp.NumWarns = 0 + comp.LastSuccess = csu.TimeStamp + } + s.ComponentStatus[csu.Component] = comp + dirty = true + sur.Msg = fmt.Sprintf("StatusUpdater: %s report for known component: %s", csu.Status, csu.Component) + default: + log.Printf("StatusUpdater: %s report for unknown component: %s", csu.Status, csu.Component) + sur.Error = true + sur.ErrorMsg = fmt.Sprintf("StatusUpdater: %s report for unknown component: %s", csu.Status, csu.Component) + sur.Msg = fmt.Sprintf("StatusUpdater: known components are: %v", known_components) + } + + if csu.Response != nil { + csu.Response <- sur + } + + case "status": + log.Printf("StatusUpdater: request for status report. Response: %v", csu.Response) + if csu.Response != nil { + csu.Response <- tapir.StatusUpdaterResponse{ + FunctionStatus: s, + KnownComponents: known_components, + } + log.Printf("StatusUpdater: request for status report sent") + } else { + log.Printf("StatusUpdater: request for status report ignored due to lack of a response channel") + } + + default: + log.Printf("StatusUpdater: Unknown status: %s", csu.Status) + } + case <-stopch: + log.Printf("StatusUpdater: stopping") + return + } + } +} diff --git a/structs.go b/structs.go index 4f2ca97..cd8c1cb 100644 --- a/structs.go +++ b/structs.go @@ -1,4 +1,5 @@ /* + * Johan Stenstam, johan.stenstam@internetstiftelsen.se * Copyright (c) DNS TAPIR */ package main @@ -21,6 +22,7 @@ type TemData struct { TapirMqttCmdCh chan tapir.MqttEngineCmd TapirMqttSubCh chan tapir.MqttPkg TapirMqttPubCh chan tapir.MqttPkg // not used ATM + ComponentStatusCh chan tapir.ComponentStatusUpdate Logger *log.Logger MqttLogger *log.Logger BlacklistedNames map[string]bool diff --git a/xfr.go b/xfr.go index 82b2a4d..1345fc3 100644 --- a/xfr.go +++ b/xfr.go @@ -11,6 +11,7 @@ import ( "net" "strings" "sync" + "time" "github.com/dnstapir/tapir" "github.com/miekg/dns" @@ -211,6 +212,12 @@ func (td *TemData) RpzIxfrOut(w dns.ResponseWriter, r *dns.Msg) (uint32, int, er err := tr.Out(w, r, outbound_xfr) if err != nil { td.Logger.Printf("Error from transfer.Out(): %v", err) + td.ComponentStatusCh <- tapir.ComponentStatusUpdate{ + Component: "rpz-ixfr", + Status: "fail", + Msg: fmt.Sprintf("Error from transfer.Out(): %v", err), + TimeStamp: time.Now(), + } } wg.Done() }() @@ -306,6 +313,12 @@ func (td *TemData) RpzIxfrOut(w dns.ResponseWriter, r *dns.Msg) (uint32, int, er td.Logger.Printf("RpzIxfrOut: Error from Close(): %v", err) } + td.ComponentStatusCh <- tapir.ComponentStatusUpdate{ + Component: "rpz-ixfr", + Status: "ok", + TimeStamp: time.Now(), + } + td.Logger.Printf("RpzIxfrOut: %s: Sent %d RRs (including SOA twice).", zone, total_sent) err = td.PruneRpzIxfrChain() if err != nil {