Skip to content

Commit

Permalink
Merge pull request #58 from dnstapir/feature/mqtt_config_bootstrap
Browse files Browse the repository at this point in the history
mqtt config bootstrap
  • Loading branch information
johanix authored Sep 3, 2024
2 parents 47bd5ed + 92a21ba commit b2e7b0f
Show file tree
Hide file tree
Showing 21 changed files with 470 additions and 102 deletions.
22 changes: 11 additions & 11 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
PROG:=tem
PROG:=tapir-pop
VERSION:=`cat ./VERSION`
COMMIT:=`git describe --dirty=+WiP --always`
APPDATE=`date +"%Y-%m-%d-%H:%M"`
Expand Down Expand Up @@ -37,20 +37,20 @@ netbsd:
/bin/sh make-version.sh $(VERSION)-$(COMMIT) $(APPDATE) $(PROG)
GOOS=netbsd GOARCH=amd64 go build $(GOFLAGS) -o ${PROG}.netbsd

gen-mqtt-msg-new-qname.go: checkout/events-mqtt-message-new_qname.json
go-jsonschema checkout/events-mqtt-message-new_qname.json --package main --tags json --only-models --output gen-mqtt-msg-new-qname.go
# gen-mqtt-msg-new-qname.go: checkout/events-mqtt-message-new_qname.json
# go-jsonschema checkout/events-mqtt-message-new_qname.json --package main --tags json --only-models --output gen-mqtt-msg-new-qname.go

gen-mqtt-msg.go: checkout/events-mqtt-message.json
go-jsonschema checkout/events-mqtt-message.json --package main --tags json --only-models --output gen-mqtt-msg.go
# gen-mqtt-msg.go: checkout/events-mqtt-message.json
# go-jsonschema checkout/events-mqtt-message.json --package main --tags json --only-models --output gen-mqtt-msg.go

checkout/events-mqtt-message-new_qname.json: checkout
cd checkout; python schemasplit.py events-mqtt-message-new_qname.yaml
# checkout/events-mqtt-message-new_qname.json: checkout
# cd checkout; python schemasplit.py events-mqtt-message-new_qname.yaml

checkout/events-mqtt-message.json: checkout
cd checkout; python schemasplit.py events-mqtt-message.yaml
# checkout/events-mqtt-message.json: checkout
# cd checkout; python schemasplit.py events-mqtt-message.yaml

checkout:
git clone git@github.com:dnstapir/protocols.git checkout
# checkout:
# git clone git@github.com:dnstapir/protocols.git checkout

clean:
@rm -f $(PROG) *~ version.go
Expand Down
86 changes: 76 additions & 10 deletions apihandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,31 @@ func APIcommand(conf *Config) func(w http.ResponseWriter, r *http.Request) {
switch cp.Command {
case "status":
log.Printf("Daemon status inquiry\n")
rt := make(chan tapir.StatusUpdaterResponse)
var ts *tapir.StatusUpdaterResponse

conf.Internal.ComponentStatusCh <- tapir.ComponentStatusUpdate{
Component: "status",
Status: "status",
Response: rt,
}

select {
case foo := <-rt:
ts = &foo
case <-time.After(2 * time.Second):
log.Println("Timeout waiting for response on rt channel")
ts = nil
}

resp = tapir.CommandResponse{
Status: "ok", // only status we know, so far
Msg: "We're happy, but send more cookies"}
Msg: "We're happy, but send more cookies",
}
if ts != nil {
resp.TapirFunctionStatus = ts.FunctionStatus
}

case "stop":
log.Printf("Daemon instructed to stop\n")
resp = tapir.CommandResponse{
Expand Down Expand Up @@ -245,9 +267,11 @@ func APIbootstrap(conf *Config) func(w http.ResponseWriter, r *http.Request) {
case "greylist-status":
me := conf.TemData.MqttEngine
stats := me.Stats()
resp.MsgCounters = stats.MsgCounters
resp.MsgTimeStamps = stats.MsgTimeStamps
log.Printf("API: greylist-status: msgs: %d last msg: %v", stats.MsgCounters[bp.ListName], stats.MsgTimeStamps[bp.ListName])
// resp.MsgCounters = stats.MsgCounters
// resp.MsgTimeStamps = stats.MsgTimeStamps
resp.TopicData = stats
// log.Printf("API: greylist-status: msgs: %d last msg: %v", stats.MsgCounters[bp.ListName], stats.MsgTimeStamps[bp.ListName])
log.Printf("API: greylist-status: %v", stats)

case "export-greylist":
td := conf.TemData
Expand Down Expand Up @@ -403,7 +427,7 @@ func APIdebug(conf *Config) func(w http.ResponseWriter, r *http.Request) {

case "mqtt-stats":
log.Printf("TEM debug MQTT stats")
resp.MqttStats = td.MqttEngine.Stats()
resp.TopicData = td.MqttEngine.Stats()

case "reaper-stats":
log.Printf("TEM debug reaper stats")
Expand Down Expand Up @@ -449,6 +473,37 @@ func APIdebug(conf *Config) func(w http.ResponseWriter, r *http.Request) {
resp.RpzOutput = append(resp.RpzOutput, *rpzn)
}

case "send-status":
log.Printf("TEM debug send status")

rt := make(chan tapir.StatusUpdaterResponse)
var sur *tapir.StatusUpdaterResponse

conf.Internal.ComponentStatusCh <- tapir.ComponentStatusUpdate{
Component: dp.Component,
Status: dp.Status,
Msg: "debug status update",
TimeStamp: time.Now(),
Response: rt,
}

select {
case foo := <-rt:
sur = &foo
case <-time.After(2 * time.Second):
log.Println("Timeout waiting for response on rt channel")
sur = nil
}

if sur != nil {
resp.Status = "ok"
resp.Msg = sur.Msg
} else {
resp.Msg = "Status update for component " + dp.Component + " sent"
resp.Error = true
resp.ErrorMsg = "Timeout waiting for response on rt channel"
}

default:
resp.ErrorMsg = fmt.Sprintf("Unknown command: %s", dp.Command)
resp.Error = true
Expand All @@ -461,7 +516,7 @@ func SetupRouter(conf *Config) *mux.Router {

sr := r.PathPrefix("/api/v1").Headers("X-API-Key",
viper.GetString("apiserver.key")).Subrouter()
sr.HandleFunc("/ping", tapir.APIping("tem", conf.BootTime)).Methods("POST")
sr.HandleFunc("/ping", tapir.APIping("tapir-pop", conf.BootTime)).Methods("POST")
sr.HandleFunc("/command", APIcommand(conf)).Methods("POST")
sr.HandleFunc("/bootstrap", APIbootstrap(conf)).Methods("POST")
sr.HandleFunc("/debug", APIdebug(conf)).Methods("POST")
Expand All @@ -474,7 +529,7 @@ func SetupBootstrapRouter(conf *Config) *mux.Router {
r := mux.NewRouter().StrictSlash(true)

sr := r.PathPrefix("/api/v1").Headers("X-API-Key", viper.GetString("apiserver.key")).Subrouter()
sr.HandleFunc("/ping", tapir.APIping("tem", conf.BootTime)).Methods("POST")
sr.HandleFunc("/ping", tapir.APIping("tapir-pop", conf.BootTime)).Methods("POST")
sr.HandleFunc("/bootstrap", APIbootstrap(conf)).Methods("POST")
// sr.HandleFunc("/debug", APIdebug(conf)).Methods("POST")
// sr.HandleFunc("/show/api", tapir.APIshowAPI(r)).Methods("GET")
Expand All @@ -501,7 +556,7 @@ func walkRoutes(router *mux.Router, address string) {

// In practice APIdispatcher doesn't need a termination signal, as it will
// just sit inside http.ListenAndServe, but we keep it for symmetry.
func APIdispatcher(conf *Config, done <-chan struct{}) {
func APIhandler(conf *Config, done <-chan struct{}) {
gob.Register(tapir.WBGlist{}) // Must register the type for gob encoding
router := SetupRouter(conf)

Expand All @@ -510,21 +565,32 @@ func APIdispatcher(conf *Config, done <-chan struct{}) {

addresses := viper.GetStringSlice("apiserver.addresses")
tlsaddresses := viper.GetStringSlice("apiserver.tlsaddresses")
certfile := viper.GetString("certs.tem.cert")
keyfile := viper.GetString("certs.tem.key")
certfile := viper.GetString("certs.tapir-pop.cert")
if certfile == "" {
log.Printf("*** APIhandler: Error: TLS cert file not specified under key certs.tapir-pop.cert")
}
keyfile := viper.GetString("certs.tapir-pop.key")
if keyfile == "" {
log.Printf("*** APIhandler: Error: TLS key file not specified under key certs.tapir-pop.key")
}

bootstrapaddresses := viper.GetStringSlice("bootstrapserver.addresses")
bootstraptlsaddresses := viper.GetStringSlice("bootstrapserver.tlsaddresses")
bootstraprouter := SetupBootstrapRouter(conf)

log.Printf("*** APIhandler: addresses: %v bootstrapaddresses: %v", addresses, bootstrapaddresses)
log.Printf("*** APIhandler: tlsaddresses: %v bootstraptlsaddresses: %v", tlsaddresses, bootstraptlsaddresses)

tlspossible := true

_, err := os.Stat(certfile)
if os.IsNotExist(err) {
log.Printf("*** APIhandler: Error: TLS cert file \"%s\" does not exist", certfile)
tlspossible = false
}
_, err = os.Stat(keyfile)
if os.IsNotExist(err) {
log.Printf("*** APIhandler: Error: TLS key file \"%s\" does not exist", keyfile)
tlspossible = false
}

Expand Down
34 changes: 17 additions & 17 deletions bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"encoding/gob"
"encoding/json"
"fmt"
"log"
"net/http"
"time"

Expand All @@ -27,10 +26,10 @@ func (td *TemData) BootstrapMqttSource(s *tapir.WBGlist, src SourceConf) (*tapir

cd := viper.GetString("certs.certdir")
if cd == "" {
log.Fatalf("Error: missing config key: certs.certdir")
TEMExiter("BootstrapMqttSource error: missing config key: certs.certdir")
}
// cert := cd + "/" + certname
cert := cd + "/" + "tem"
cert := cd + "/" + "tapir-pop"
tlsConfig, err := tapir.NewClientConfig(viper.GetString("certs.cacertfile"), cert+".key", cert+".crt")
if err != nil {
TEMExiter("BootstrapMqttSource: Error: Could not set up TLS: %v", err)
Expand All @@ -39,7 +38,7 @@ func (td *TemData) BootstrapMqttSource(s *tapir.WBGlist, src SourceConf) (*tapir
tlsConfig.InsecureSkipVerify = true
err = api.SetupTLS(tlsConfig)
if err != nil {
return nil, fmt.Errorf("error setting up TLS for the API client: %v", err)
TEMExiter("BootstrapMqttSource: error setting up TLS for the API client: %v", err)
}

bootstrapaddrs := viper.GetStringSlice("bootstrapserver.addresses")
Expand All @@ -51,7 +50,7 @@ func (td *TemData) BootstrapMqttSource(s *tapir.WBGlist, src SourceConf) (*tapir
// Is this myself?
for _, bs := range bootstrapaddrs {
if bs == server {
td.Logger.Printf("MQTT bootstrap server %s is myself, skipping", server)
td.Logger.Printf("BootstrapMqttSource: MQTT bootstrap server %s is myself, skipping", server)
continue
}
}
Expand All @@ -61,12 +60,12 @@ func (td *TemData) BootstrapMqttSource(s *tapir.WBGlist, src SourceConf) (*tapir
// Send an API ping command
pr, err := api.SendPing(0, false)
if err != nil {
td.Logger.Printf("Ping to MQTT bootstrap server %s failed: %v", server, err)
td.Logger.Printf("BootstrapMqttSource: Ping to MQTT bootstrap server %s failed: %v", server, err)
continue
}

uptime := time.Since(pr.BootTime).Round(time.Second)
td.Logger.Printf("MQTT bootstrap server %s uptime: %v. It has processed %d MQTT messages", server, uptime, 17)
td.Logger.Printf("BootstrapMqttSource: MQTT bootstrap server %s uptime: %v. It has processed %d MQTT messages", server, uptime, 17)

status, buf, err := api.RequestNG(http.MethodPost, "/bootstrap", tapir.BootstrapPost{
Command: "greylist-status",
Expand All @@ -87,30 +86,31 @@ func (td *TemData) BootstrapMqttSource(s *tapir.WBGlist, src SourceConf) (*tapir
var br tapir.BootstrapResponse
err = json.Unmarshal(buf, &br)
if err != nil {
td.Logger.Printf("Error decoding greylist-status response from %s: %v. Giving up.\n", server, err)
td.Logger.Printf("BootstrapMqttSource: Error decoding greylist-status response from %s: %v. Giving up.\n", server, err)
continue
}
if br.Error {
td.Logger.Printf("Bootstrap server %s responded with error: %s (instead of greylist status)", server, br.ErrorMsg)
td.Logger.Printf("BootstrapMqttSource: Bootstrap server %s responded with error: %s (instead of greylist status)", server, br.ErrorMsg)
}
if len(br.Msg) != 0 {
td.Logger.Printf("Bootstrap server %s responded: %s", server, br.Msg)
td.Logger.Printf("BootstrapMqttSource: Bootstrap server %s responded: %s", server, br.Msg)
}

td.Logger.Printf("MQTT bootstrap server %s uptime: %v. It has processed %d MQTT messages on the %s topic (last msg arrived at %s), ", server, uptime, br.MsgCounters["greylist"], src.Name, br.MsgTimeStamps["greylist"].Format(time.RFC3339))
td.Logger.Printf("BootstrapMqttSource: MQTT bootstrap server %s uptime: %v. It has processed %d MQTT messages on the %s topic (last sub msg arrived at %s), ", server, uptime, br.TopicData[src.Name].SubMsgs, src.Name, br.TopicData[src.Name].LatestSub.Format(tapir.TimeLayout))

status, buf, err = api.RequestNG(http.MethodPost, "/bootstrap", tapir.BootstrapPost{
Command: "export-greylist",
ListName: src.Name,
Encoding: "gob", // XXX: This is our default, but we'll test other encodings later
}, true)

if err != nil {
fmt.Printf("Error from RequestNG: %v\n", err)
td.Logger.Printf("BootstrapMqttSource: Error from RequestNG: %v\n", err)
continue
}

if status != http.StatusOK {
td.Logger.Printf("HTTP Error: %s\n", buf)
td.Logger.Printf("BootstrapMqttSource: HTTP Error: %s\n", buf)
continue
}

Expand All @@ -123,14 +123,14 @@ func (td *TemData) BootstrapMqttSource(s *tapir.WBGlist, src SourceConf) (*tapir
var br tapir.BootstrapResponse
err = json.Unmarshal(buf, &br)
if err != nil {
td.Logger.Printf("Error decoding bootstrap response from %s: %v. Giving up.\n", server, err)
td.Logger.Printf("BootstrapMqttSource: Error decoding bootstrap response from %s: %v. Giving up.\n", server, err)
continue
}
if br.Error {
td.Logger.Printf("Bootstrap server %s responded with error: %s (instead of GOB blob)", server, br.ErrorMsg)
td.Logger.Printf("BootstrapMqttSource: Bootstrap server %s responded with error: %s (instead of GOB blob)", server, br.ErrorMsg)
}
if len(br.Msg) != 0 {
td.Logger.Printf("Bootstrap server %s responded: %s (instead of GOB blob)", server, br.Msg)
td.Logger.Printf("BootstrapMqttSource: Bootstrap server %s responded: %s (instead of GOB blob)", server, br.Msg)
}
// return nil, fmt.Errorf("Command Error: %s", br.ErrorMsg)
continue
Expand All @@ -151,5 +151,5 @@ func (td *TemData) BootstrapMqttSource(s *tapir.WBGlist, src SourceConf) (*tapir
}

// If no bootstrap server succeeded
return nil, fmt.Errorf("all bootstrap servers failed")
return nil, fmt.Errorf("BootstrapMqttSource: all bootstrap servers failed")
}
5 changes: 3 additions & 2 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,8 @@ type GreylistConf struct {
type InternalConf struct {
// RefreshZoneCh chan RpzRefresher
// RpzCmdCh chan RpzCmdData
APIStopCh chan struct{}
APIStopCh chan struct{}
ComponentStatusCh chan tapir.ComponentStatusUpdate
}

func ValidateConfig(v *viper.Viper, cfgfile string) error {
Expand All @@ -139,7 +140,7 @@ func ValidateConfig(v *viper.Viper, cfgfile string) error {
}
} else {
if err := v.Unmarshal(&config); err != nil {
TEMExiter("ValidateConfig: unmarshal error: %v", err)
TEMExiter("ValidateConfig: Unmarshal error: %v", err)
}
}

Expand Down
31 changes: 31 additions & 0 deletions generate-csr.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
#!/bin/sh

if [ $# != 1 ]; then
echo Usage: $0 instance-id
echo \"instance-id\" is a string that you choose yourself that will identify this DNS TAPIR Edge instance.
echo A domain name is usually a good idea.
exit 1
fi

id=$1

echo Your chosen DNS TAPIR Edge Id is \"$id\".
/bin/echo -n "Proceed [yes]: "
default_ans="yes"
read answer

if [ "$answer" == "" ]; then
answer=$default_ans
fi

echo You typed: \"$answer\"

if [ "$answer" != "yes" ]; then
echo Terminating.
exit 1
fi

openssl genpkey -genparam -algorithm ec -pkeyopt ec_paramgen_curve:P-256 -out ecparam.pem
openssl req -new -out ${id}.csr -newkey ec:ecparam.pem -keyout ${id}.key -subj "/CN=${id}" -nodes

echo Send the file \"${id}.csr\" to DNS TAPIR admin. You will receive a \"${id}.crt\" in return.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
module tem
module tapir-pop

go 1.22.0

Expand Down
Loading

0 comments on commit b2e7b0f

Please sign in to comment.