Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mqtt config bootstrap #58

Merged
merged 1 commit into from
Sep 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 11 additions & 11 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
PROG:=tem
PROG:=tapir-pop
VERSION:=`cat ./VERSION`
COMMIT:=`git describe --dirty=+WiP --always`
APPDATE=`date +"%Y-%m-%d-%H:%M"`
Expand Down Expand Up @@ -37,20 +37,20 @@ netbsd:
/bin/sh make-version.sh $(VERSION)-$(COMMIT) $(APPDATE) $(PROG)
GOOS=netbsd GOARCH=amd64 go build $(GOFLAGS) -o ${PROG}.netbsd

gen-mqtt-msg-new-qname.go: checkout/events-mqtt-message-new_qname.json
go-jsonschema checkout/events-mqtt-message-new_qname.json --package main --tags json --only-models --output gen-mqtt-msg-new-qname.go
# gen-mqtt-msg-new-qname.go: checkout/events-mqtt-message-new_qname.json
# go-jsonschema checkout/events-mqtt-message-new_qname.json --package main --tags json --only-models --output gen-mqtt-msg-new-qname.go

gen-mqtt-msg.go: checkout/events-mqtt-message.json
go-jsonschema checkout/events-mqtt-message.json --package main --tags json --only-models --output gen-mqtt-msg.go
# gen-mqtt-msg.go: checkout/events-mqtt-message.json
# go-jsonschema checkout/events-mqtt-message.json --package main --tags json --only-models --output gen-mqtt-msg.go

checkout/events-mqtt-message-new_qname.json: checkout
cd checkout; python schemasplit.py events-mqtt-message-new_qname.yaml
# checkout/events-mqtt-message-new_qname.json: checkout
# cd checkout; python schemasplit.py events-mqtt-message-new_qname.yaml

checkout/events-mqtt-message.json: checkout
cd checkout; python schemasplit.py events-mqtt-message.yaml
# checkout/events-mqtt-message.json: checkout
# cd checkout; python schemasplit.py events-mqtt-message.yaml

checkout:
git clone git@github.com:dnstapir/protocols.git checkout
# checkout:
# git clone git@github.com:dnstapir/protocols.git checkout

clean:
@rm -f $(PROG) *~ version.go
Expand Down
86 changes: 76 additions & 10 deletions apihandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,31 @@ func APIcommand(conf *Config) func(w http.ResponseWriter, r *http.Request) {
switch cp.Command {
case "status":
log.Printf("Daemon status inquiry\n")
rt := make(chan tapir.StatusUpdaterResponse)
var ts *tapir.StatusUpdaterResponse

conf.Internal.ComponentStatusCh <- tapir.ComponentStatusUpdate{
Component: "status",
Status: "status",
Response: rt,
}

select {
case foo := <-rt:
ts = &foo
case <-time.After(2 * time.Second):
log.Println("Timeout waiting for response on rt channel")
ts = nil
}

resp = tapir.CommandResponse{
Status: "ok", // only status we know, so far
Msg: "We're happy, but send more cookies"}
Msg: "We're happy, but send more cookies",
}
if ts != nil {
resp.TapirFunctionStatus = ts.FunctionStatus
}

case "stop":
log.Printf("Daemon instructed to stop\n")
resp = tapir.CommandResponse{
Expand Down Expand Up @@ -245,9 +267,11 @@ func APIbootstrap(conf *Config) func(w http.ResponseWriter, r *http.Request) {
case "greylist-status":
me := conf.TemData.MqttEngine
stats := me.Stats()
resp.MsgCounters = stats.MsgCounters
resp.MsgTimeStamps = stats.MsgTimeStamps
log.Printf("API: greylist-status: msgs: %d last msg: %v", stats.MsgCounters[bp.ListName], stats.MsgTimeStamps[bp.ListName])
// resp.MsgCounters = stats.MsgCounters
// resp.MsgTimeStamps = stats.MsgTimeStamps
resp.TopicData = stats
// log.Printf("API: greylist-status: msgs: %d last msg: %v", stats.MsgCounters[bp.ListName], stats.MsgTimeStamps[bp.ListName])
log.Printf("API: greylist-status: %v", stats)

case "export-greylist":
td := conf.TemData
Expand Down Expand Up @@ -403,7 +427,7 @@ func APIdebug(conf *Config) func(w http.ResponseWriter, r *http.Request) {

case "mqtt-stats":
log.Printf("TEM debug MQTT stats")
resp.MqttStats = td.MqttEngine.Stats()
resp.TopicData = td.MqttEngine.Stats()

case "reaper-stats":
log.Printf("TEM debug reaper stats")
Expand Down Expand Up @@ -449,6 +473,37 @@ func APIdebug(conf *Config) func(w http.ResponseWriter, r *http.Request) {
resp.RpzOutput = append(resp.RpzOutput, *rpzn)
}

case "send-status":
log.Printf("TEM debug send status")

rt := make(chan tapir.StatusUpdaterResponse)
var sur *tapir.StatusUpdaterResponse

conf.Internal.ComponentStatusCh <- tapir.ComponentStatusUpdate{
Component: dp.Component,
Status: dp.Status,
Msg: "debug status update",
TimeStamp: time.Now(),
Response: rt,
}

select {
case foo := <-rt:
sur = &foo
case <-time.After(2 * time.Second):
log.Println("Timeout waiting for response on rt channel")
sur = nil
}

if sur != nil {
resp.Status = "ok"
resp.Msg = sur.Msg
} else {
resp.Msg = "Status update for component " + dp.Component + " sent"
resp.Error = true
resp.ErrorMsg = "Timeout waiting for response on rt channel"
}

default:
resp.ErrorMsg = fmt.Sprintf("Unknown command: %s", dp.Command)
resp.Error = true
Expand All @@ -461,7 +516,7 @@ func SetupRouter(conf *Config) *mux.Router {

sr := r.PathPrefix("/api/v1").Headers("X-API-Key",
viper.GetString("apiserver.key")).Subrouter()
sr.HandleFunc("/ping", tapir.APIping("tem", conf.BootTime)).Methods("POST")
sr.HandleFunc("/ping", tapir.APIping("tapir-pop", conf.BootTime)).Methods("POST")
sr.HandleFunc("/command", APIcommand(conf)).Methods("POST")
sr.HandleFunc("/bootstrap", APIbootstrap(conf)).Methods("POST")
sr.HandleFunc("/debug", APIdebug(conf)).Methods("POST")
Expand All @@ -474,7 +529,7 @@ func SetupBootstrapRouter(conf *Config) *mux.Router {
r := mux.NewRouter().StrictSlash(true)

sr := r.PathPrefix("/api/v1").Headers("X-API-Key", viper.GetString("apiserver.key")).Subrouter()
sr.HandleFunc("/ping", tapir.APIping("tem", conf.BootTime)).Methods("POST")
sr.HandleFunc("/ping", tapir.APIping("tapir-pop", conf.BootTime)).Methods("POST")
sr.HandleFunc("/bootstrap", APIbootstrap(conf)).Methods("POST")
// sr.HandleFunc("/debug", APIdebug(conf)).Methods("POST")
// sr.HandleFunc("/show/api", tapir.APIshowAPI(r)).Methods("GET")
Expand All @@ -501,7 +556,7 @@ func walkRoutes(router *mux.Router, address string) {

// In practice APIdispatcher doesn't need a termination signal, as it will
// just sit inside http.ListenAndServe, but we keep it for symmetry.
func APIdispatcher(conf *Config, done <-chan struct{}) {
func APIhandler(conf *Config, done <-chan struct{}) {
gob.Register(tapir.WBGlist{}) // Must register the type for gob encoding
router := SetupRouter(conf)

Expand All @@ -510,21 +565,32 @@ func APIdispatcher(conf *Config, done <-chan struct{}) {

addresses := viper.GetStringSlice("apiserver.addresses")
tlsaddresses := viper.GetStringSlice("apiserver.tlsaddresses")
certfile := viper.GetString("certs.tem.cert")
keyfile := viper.GetString("certs.tem.key")
certfile := viper.GetString("certs.tapir-pop.cert")
if certfile == "" {
log.Printf("*** APIhandler: Error: TLS cert file not specified under key certs.tapir-pop.cert")
}
keyfile := viper.GetString("certs.tapir-pop.key")
if keyfile == "" {
log.Printf("*** APIhandler: Error: TLS key file not specified under key certs.tapir-pop.key")
}

bootstrapaddresses := viper.GetStringSlice("bootstrapserver.addresses")
bootstraptlsaddresses := viper.GetStringSlice("bootstrapserver.tlsaddresses")
bootstraprouter := SetupBootstrapRouter(conf)

log.Printf("*** APIhandler: addresses: %v bootstrapaddresses: %v", addresses, bootstrapaddresses)
log.Printf("*** APIhandler: tlsaddresses: %v bootstraptlsaddresses: %v", tlsaddresses, bootstraptlsaddresses)

tlspossible := true

_, err := os.Stat(certfile)
if os.IsNotExist(err) {
log.Printf("*** APIhandler: Error: TLS cert file \"%s\" does not exist", certfile)
tlspossible = false
}
_, err = os.Stat(keyfile)
if os.IsNotExist(err) {
log.Printf("*** APIhandler: Error: TLS key file \"%s\" does not exist", keyfile)
tlspossible = false
}

Expand Down
34 changes: 17 additions & 17 deletions bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"encoding/gob"
"encoding/json"
"fmt"
"log"
"net/http"
"time"

Expand All @@ -27,10 +26,10 @@ func (td *TemData) BootstrapMqttSource(s *tapir.WBGlist, src SourceConf) (*tapir

cd := viper.GetString("certs.certdir")
if cd == "" {
log.Fatalf("Error: missing config key: certs.certdir")
TEMExiter("BootstrapMqttSource error: missing config key: certs.certdir")
}
// cert := cd + "/" + certname
cert := cd + "/" + "tem"
cert := cd + "/" + "tapir-pop"
tlsConfig, err := tapir.NewClientConfig(viper.GetString("certs.cacertfile"), cert+".key", cert+".crt")
if err != nil {
TEMExiter("BootstrapMqttSource: Error: Could not set up TLS: %v", err)
Expand All @@ -39,7 +38,7 @@ func (td *TemData) BootstrapMqttSource(s *tapir.WBGlist, src SourceConf) (*tapir
tlsConfig.InsecureSkipVerify = true
err = api.SetupTLS(tlsConfig)
if err != nil {
return nil, fmt.Errorf("error setting up TLS for the API client: %v", err)
TEMExiter("BootstrapMqttSource: error setting up TLS for the API client: %v", err)
}

bootstrapaddrs := viper.GetStringSlice("bootstrapserver.addresses")
Expand All @@ -51,7 +50,7 @@ func (td *TemData) BootstrapMqttSource(s *tapir.WBGlist, src SourceConf) (*tapir
// Is this myself?
for _, bs := range bootstrapaddrs {
if bs == server {
td.Logger.Printf("MQTT bootstrap server %s is myself, skipping", server)
td.Logger.Printf("BootstrapMqttSource: MQTT bootstrap server %s is myself, skipping", server)
continue
}
}
Expand All @@ -61,12 +60,12 @@ func (td *TemData) BootstrapMqttSource(s *tapir.WBGlist, src SourceConf) (*tapir
// Send an API ping command
pr, err := api.SendPing(0, false)
if err != nil {
td.Logger.Printf("Ping to MQTT bootstrap server %s failed: %v", server, err)
td.Logger.Printf("BootstrapMqttSource: Ping to MQTT bootstrap server %s failed: %v", server, err)
continue
}

uptime := time.Since(pr.BootTime).Round(time.Second)
td.Logger.Printf("MQTT bootstrap server %s uptime: %v. It has processed %d MQTT messages", server, uptime, 17)
td.Logger.Printf("BootstrapMqttSource: MQTT bootstrap server %s uptime: %v. It has processed %d MQTT messages", server, uptime, 17)

status, buf, err := api.RequestNG(http.MethodPost, "/bootstrap", tapir.BootstrapPost{
Command: "greylist-status",
Expand All @@ -87,30 +86,31 @@ func (td *TemData) BootstrapMqttSource(s *tapir.WBGlist, src SourceConf) (*tapir
var br tapir.BootstrapResponse
err = json.Unmarshal(buf, &br)
if err != nil {
td.Logger.Printf("Error decoding greylist-status response from %s: %v. Giving up.\n", server, err)
td.Logger.Printf("BootstrapMqttSource: Error decoding greylist-status response from %s: %v. Giving up.\n", server, err)
continue
}
if br.Error {
td.Logger.Printf("Bootstrap server %s responded with error: %s (instead of greylist status)", server, br.ErrorMsg)
td.Logger.Printf("BootstrapMqttSource: Bootstrap server %s responded with error: %s (instead of greylist status)", server, br.ErrorMsg)
}
if len(br.Msg) != 0 {
td.Logger.Printf("Bootstrap server %s responded: %s", server, br.Msg)
td.Logger.Printf("BootstrapMqttSource: Bootstrap server %s responded: %s", server, br.Msg)
}

td.Logger.Printf("MQTT bootstrap server %s uptime: %v. It has processed %d MQTT messages on the %s topic (last msg arrived at %s), ", server, uptime, br.MsgCounters["greylist"], src.Name, br.MsgTimeStamps["greylist"].Format(time.RFC3339))
td.Logger.Printf("BootstrapMqttSource: MQTT bootstrap server %s uptime: %v. It has processed %d MQTT messages on the %s topic (last sub msg arrived at %s), ", server, uptime, br.TopicData[src.Name].SubMsgs, src.Name, br.TopicData[src.Name].LatestSub.Format(tapir.TimeLayout))

status, buf, err = api.RequestNG(http.MethodPost, "/bootstrap", tapir.BootstrapPost{
Command: "export-greylist",
ListName: src.Name,
Encoding: "gob", // XXX: This is our default, but we'll test other encodings later
}, true)

if err != nil {
fmt.Printf("Error from RequestNG: %v\n", err)
td.Logger.Printf("BootstrapMqttSource: Error from RequestNG: %v\n", err)
continue
}

if status != http.StatusOK {
td.Logger.Printf("HTTP Error: %s\n", buf)
td.Logger.Printf("BootstrapMqttSource: HTTP Error: %s\n", buf)
continue
}

Expand All @@ -123,14 +123,14 @@ func (td *TemData) BootstrapMqttSource(s *tapir.WBGlist, src SourceConf) (*tapir
var br tapir.BootstrapResponse
err = json.Unmarshal(buf, &br)
if err != nil {
td.Logger.Printf("Error decoding bootstrap response from %s: %v. Giving up.\n", server, err)
td.Logger.Printf("BootstrapMqttSource: Error decoding bootstrap response from %s: %v. Giving up.\n", server, err)
continue
}
if br.Error {
td.Logger.Printf("Bootstrap server %s responded with error: %s (instead of GOB blob)", server, br.ErrorMsg)
td.Logger.Printf("BootstrapMqttSource: Bootstrap server %s responded with error: %s (instead of GOB blob)", server, br.ErrorMsg)
}
if len(br.Msg) != 0 {
td.Logger.Printf("Bootstrap server %s responded: %s (instead of GOB blob)", server, br.Msg)
td.Logger.Printf("BootstrapMqttSource: Bootstrap server %s responded: %s (instead of GOB blob)", server, br.Msg)
}
// return nil, fmt.Errorf("Command Error: %s", br.ErrorMsg)
continue
Expand All @@ -151,5 +151,5 @@ func (td *TemData) BootstrapMqttSource(s *tapir.WBGlist, src SourceConf) (*tapir
}

// If no bootstrap server succeeded
return nil, fmt.Errorf("all bootstrap servers failed")
return nil, fmt.Errorf("BootstrapMqttSource: all bootstrap servers failed")
}
5 changes: 3 additions & 2 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,8 @@ type GreylistConf struct {
type InternalConf struct {
// RefreshZoneCh chan RpzRefresher
// RpzCmdCh chan RpzCmdData
APIStopCh chan struct{}
APIStopCh chan struct{}
ComponentStatusCh chan tapir.ComponentStatusUpdate
}

func ValidateConfig(v *viper.Viper, cfgfile string) error {
Expand All @@ -139,7 +140,7 @@ func ValidateConfig(v *viper.Viper, cfgfile string) error {
}
} else {
if err := v.Unmarshal(&config); err != nil {
TEMExiter("ValidateConfig: unmarshal error: %v", err)
TEMExiter("ValidateConfig: Unmarshal error: %v", err)
}
}

Expand Down
31 changes: 31 additions & 0 deletions generate-csr.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
#!/bin/sh

if [ $# != 1 ]; then
echo Usage: $0 instance-id
echo \"instance-id\" is a string that you choose yourself that will identify this DNS TAPIR Edge instance.
echo A domain name is usually a good idea.
exit 1
fi

id=$1

echo Your chosen DNS TAPIR Edge Id is \"$id\".
/bin/echo -n "Proceed [yes]: "
default_ans="yes"
read answer

if [ "$answer" == "" ]; then
answer=$default_ans
fi

echo You typed: \"$answer\"

if [ "$answer" != "yes" ]; then
echo Terminating.
exit 1
fi

openssl genpkey -genparam -algorithm ec -pkeyopt ec_paramgen_curve:P-256 -out ecparam.pem
openssl req -new -out ${id}.csr -newkey ec:ecparam.pem -keyout ${id}.key -subj "/CN=${id}" -nodes

echo Send the file \"${id}.csr\" to DNS TAPIR admin. You will receive a \"${id}.crt\" in return.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
module tem
module tapir-pop

go 1.22.0

Expand Down
Loading
Loading