diff --git a/Makefile b/Makefile index ec1eb3f..de000b3 100644 --- a/Makefile +++ b/Makefile @@ -11,4 +11,10 @@ dependencies_up: dependencies_down .PHONY: tests ## tests: runs tests against a locally running mongo container tests: - go test -v ./... \ No newline at end of file + go test -v ./... + +.PHONY: example +## example: runs an http-server locally +example: + go build -o bin/server streamingconfig/example/server + ./bin/server \ No newline at end of file diff --git a/README.md b/README.md index e87addf..c205436 100644 --- a/README.md +++ b/README.md @@ -25,7 +25,7 @@ fields. ## Usage -checkout the [example](./example/main.go). +checkout the [example](./example/server/main.go). 1. Define a configuration with `json` field tags (and optionally with `default` field tags): 2. Make sure that your configuration type implements the `streamingconfig.Config` interface: @@ -36,4 +36,35 @@ checkout the [example](./example/main.go). ```shell make dependencies_up make tests -``` \ No newline at end of file +``` + +### Run example server + +```shell +make dependencies_up +make example +``` + +Optionally, you can also start a second server to check that the changes happening in one server will be reflected in the other: + +```shell +HTTP_PORT=8081 make example +``` + +#### Getting latest configuration request +```shell +curl -X GET --location "http://localhost:8080/configs/latest" +``` +#### Changing latest configuration request +```shell +curl -X PUT --location "http://localhost:8080/configs/latest" \ + -H "user-id: mark" \ + -d '{ + "name": "betty", + "age": 35 +}' +``` +#### Listing multiple versions +```shell +curl -X GET --location "http://localhost:8080/configs?fromVersion=0&toVersion=21" +``` diff --git a/example/main.go b/example/main.go deleted file mode 100644 index 883596b..0000000 --- a/example/main.go +++ /dev/null @@ -1,107 +0,0 @@ -package main - -import ( - "context" - "encoding/json" - "errors" - "fmt" - "log/slog" - "time" - - config "streamingconfig" - - "go.mongodb.org/mongo-driver/mongo" - "go.mongodb.org/mongo-driver/mongo/options" -) - -type conf struct { - Name string `json:"name" default:"john"` - Age int `json:"age"` -} - -func (c *conf) Update(new config.Config) error { - newCfg, ok := new.(*conf) - if !ok { - return errors.New("wrong configuration") - } - c.Name = newCfg.Name - c.Age = newCfg.Age - return nil -} - -// issue: `make dependencies_up` before running this executable -func main() { - lgr := slog.Default() - db := getDb() - repo, err := config.NewWatchedRepo[*conf]( - config.Args{ - Logger: lgr, - DB: db, - }) - if err != nil { - panic(err) - } - ctx, cnl := context.WithTimeout(context.Background(), 10*time.Second) - defer cnl() - // start repo watching - done, err := repo.Start(ctx) - if err != nil { - panic(err) - } - cfg, err := repo.GetConfig() - if err != nil { - panic(err) - } - // out: {"name":"john","age":0} - prettyPrintJson(cfg) - v0, err := repo.GetLatestVersion() - if err != nil { - panic(err) - } - // out: - // {"version":0,"updated_by":"","created_at":"0001-01-01T00:00:00Z","config":{"name":"john","age":0}} - // - // notice version 0 indicates that the repository was not yet initialized (no - // update yet called, configuration leverages only default values) - prettyPrintJson(v0) - v1, err := repo.UpdateConfig(ctx, config.UpdateConfigCmd[*conf]{ - By: "user1", - Config: &conf{ - Name: "bobby", - Age: 30, - }, - }) - if err != nil { - panic(err) - } - // out: {"version":1,"updated_by":"user1","created_at":"2024-07-16T15:55:40.06717881Z","config":{"name":"bobby","age":30}} - prettyPrintJson(v1) - - cnl() - <-done -} - -func getDb() *mongo.Database { - ctx, cnl := context.WithTimeout(context.Background(), 5*time.Second) - defer cnl() - // use test name as db name to parallel tests. - opts := options.Client() - opts.ApplyURI("mongodb://localhost:27017/?connect=direct") - client, err := mongo.Connect(ctx, opts) - if err != nil { - panic(fmt.Errorf("run `make dependencies_up` before, error: %w", err)) - } - err = client.Ping(ctx, nil) - if err != nil { - panic(fmt.Errorf("error %v\nrun `make dependencies_up` before running main\n", err)) - } - return client.Database("test") -} - -func prettyPrintJson(cfg any) { - b, err := json.Marshal(cfg) - if err != nil { - panic(err) - } - fmt.Printf("%s\n", string(b)) -} diff --git a/example/server/configs.http b/example/server/configs.http new file mode 100644 index 0000000..5765eed --- /dev/null +++ b/example/server/configs.http @@ -0,0 +1,15 @@ +### GET latest config +GET http://localhost:8080/configs/latest + + +### Modify latest config +PUT http://localhost:8080/configs/latest +user-id: mark + +{ + "name": "stark", + "age": 35 +} + +### List config versions +GET http://localhost:8080/configs?fromVersion=0&toVersion=21 diff --git a/example/server/main.go b/example/server/main.go new file mode 100644 index 0000000..7d7f29b --- /dev/null +++ b/example/server/main.go @@ -0,0 +1,109 @@ +package main + +import ( + "context" + "errors" + "fmt" + "log" + "log/slog" + "net/http" + "os" + "os/signal" + "syscall" + "time" + + config "streamingconfig" + + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" +) + +type conf struct { + Name string `json:"name" default:"john"` + Age int `json:"age"` +} + +func (c *conf) Update(new config.Config) error { + newCfg, ok := new.(*conf) + if !ok { + return errors.New("wrong configuration") + } + c.Name = newCfg.Name + c.Age = newCfg.Age + return nil +} + +func main() { + port := os.Getenv("HTTP_PORT") + if port == "" { + port = "8080" + } + runnableCtx, cancelRunnables := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) + defer cancelRunnables() + repo, done := initRepo(runnableCtx) + s := &server{repo: repo, lgr: slog.Default()} + mux := http.NewServeMux() + mux.HandleFunc("GET /configs/latest", s.latestConfigHandler) + mux.HandleFunc("PUT /configs/latest", s.putConfigHandler) + mux.HandleFunc("GET /configs", s.listConfigsHandler) + // Create a new server + srv := &http.Server{ + Addr: fmt.Sprintf(":%s", port), + Handler: mux, + } + // Start the server in a goroutine + go func() { + if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { + log.Fatal(err) + } + }() + fmt.Printf("Server listening on port %s\n", port) + // until shutdown signal is sent + <-runnableCtx.Done() + // Graceful shutdown + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer func() { + cancel() + }() + + if err := srv.Shutdown(ctx); err != nil { + log.Println("Error during shutdown:", err) + } + log.Println("Server stopped") + <-done +} + +func initRepo(ctx context.Context) (*config.WatchedRepo[*conf], <-chan struct{}) { + lgr := slog.Default() + db := getDb() + repo, err := config.NewWatchedRepo[*conf]( + config.Args{ + Logger: lgr, + DB: db, + }) + if err != nil { + log.Fatal(err) + } + done, err := repo.Start(ctx) + if err != nil { + log.Fatal(err) + } + return repo, done +} + +func getDb() *mongo.Database { + ctx, cnl := context.WithTimeout(context.Background(), 5*time.Second) + defer cnl() + // use test name as db name to parallel tests. + opts := options.Client() + opts.ApplyURI("mongodb://localhost:27017/?connect=direct") + client, err := mongo.Connect(ctx, opts) + if err != nil { + panic(fmt.Errorf("run `make dependencies_up` before, error: %w", err)) + } + err = client.Ping(ctx, nil) + if err != nil { + panic(fmt.Errorf("error %v\nrun `make dependencies_up` before running main\n", err)) + } + return client.Database("test") +} diff --git a/example/server/server.go b/example/server/server.go new file mode 100644 index 0000000..c319512 --- /dev/null +++ b/example/server/server.go @@ -0,0 +1,118 @@ +package main + +import ( + "encoding/json" + "fmt" + "io" + "log/slog" + "net/http" + "strconv" + + config "streamingconfig" +) + +type server struct { + lgr *slog.Logger + repo *config.WatchedRepo[*conf] +} + +// latestConfigHandler returns the latest configuration +func (s *server) latestConfigHandler(w http.ResponseWriter, r *http.Request) { + latestVersion, err := s.repo.GetLatestVersion() + if err != nil { + s.lgr.With("error", err).ErrorContext(r.Context(), "getting latest") + w.WriteHeader(http.StatusInternalServerError) + return + } + w.Header().Set("Content-Type", "application/json") + + if err := json.NewEncoder(w).Encode(latestVersion); err != nil { + s.lgr.With("error", err).ErrorContext(r.Context(), "encoding response") + w.WriteHeader(http.StatusInternalServerError) + return + } +} + +// putConfigHandler returns a specific config version +func (s *server) putConfigHandler(w http.ResponseWriter, r *http.Request) { + userID := r.Header.Get("user-id") + if userID == "" { + w.WriteHeader(http.StatusUnauthorized) + return + } + // decode input config + body, err := io.ReadAll(r.Body) + if err != nil { + s.lgr.With("error", err).ErrorContext(r.Context(), "reading body payload") + w.WriteHeader(http.StatusInternalServerError) + return + } + // Close the body to avoid leaks + defer r.Body.Close() + + cfg := new(conf) + err = json.Unmarshal(body, cfg) + if err != nil { + s.lgr.With("error", err).ErrorContext(r.Context(), "unmarshalling request into configuration") + w.WriteHeader(http.StatusInternalServerError) + // Handle JSON parsing error + return + } + + updated, err := s.repo.UpdateConfig(r.Context(), config.UpdateConfigCmd[*conf]{ + By: userID, + Config: cfg, + }) + if err != nil { + s.lgr.With("error", err).ErrorContext(r.Context(), "updating configuration") + w.WriteHeader(http.StatusInternalServerError) + // Handle JSON parsing error + return + } + w.Header().Set("Content-Type", "application/json") + if err := json.NewEncoder(w).Encode(updated); err != nil { + s.lgr.With("error", err).ErrorContext(r.Context(), "encoding response") + w.WriteHeader(http.StatusInternalServerError) + return + } +} + +// listConfigsHandler returns configs between versions (fromVersion and toVersion) +func (s *server) listConfigsHandler(w http.ResponseWriter, r *http.Request) { + fromVersionStr := r.URL.Query().Get("fromVersion") + toVersionStr := r.URL.Query().Get("toVersion") + if fromVersionStr == "" || toVersionStr == "" { + w.WriteHeader(http.StatusBadRequest) + fmt.Fprintf(w, "Missing fromVersion or toVersion parameter") + return + } + fromVersion, err := strconv.ParseUint(fromVersionStr, 0, 32) + if err != nil { + w.WriteHeader(http.StatusBadRequest) + fmt.Fprintf(w, "fromVersion must be a non-negative integer") + s.lgr.With("error", err).ErrorContext(r.Context(), "parsing from-version string %s", fromVersionStr) + return + } + toVersion, err := strconv.ParseUint(toVersionStr, 0, 32) + if err != nil { + s.lgr.With("error", err).ErrorContext(r.Context(), "parsing to-version string %s", toVersionStr) + fmt.Fprintf(w, "toVersion must be a non-negative integer") + w.WriteHeader(http.StatusBadRequest) + return + } + versions, err := s.repo.ListVersionedConfigs(r.Context(), config.ListVersionedConfigsQuery{ + FromVersion: uint32(fromVersion), + ToVersion: uint32(toVersion), + }) + if err != nil { + s.lgr.With("error", err).ErrorContext(r.Context(), "listing versions") + w.WriteHeader(http.StatusInternalServerError) + return + } + w.Header().Set("Content-Type", "application/json") + if err := json.NewEncoder(w).Encode(versions); err != nil { + s.lgr.With("error", err).ErrorContext(r.Context(), "encoding response") + w.WriteHeader(http.StatusInternalServerError) + return + } +}