Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Config store preview #339

Open
wants to merge 7 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,24 @@
"program": "${workspaceFolder}/main.go",
"env": {},
"args": ["monitor", "--config", "/etc/replication-manager/config.toml", "--log-file", "/var/lib/replication-manager/data/replication-manager.log"],
},
{
"name": "config_store server",
"type": "go",
"request": "launch",
"mode": "auto",
"program": "${workspaceFolder}/cmd/config_store/main.go",
"env": {},
"args": [],
},
{
"name": "config_store client",
"type": "go",
"request": "launch",
"mode": "auto",
"program": "${workspaceFolder}/cmd/config_store-client/main.go",
"env": {},
"args": [],
}
]
}
101 changes: 101 additions & 0 deletions cmd/config_store-client/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package main

import (
"context"
"log"

cs "github.com/signal18/replication-manager/config_store"
)

func main() {
// generate a key one time to use for the secrets
// key, err := cs.GenerateHexKey()
// if err != nil {
// log.Fatalf("Could not generate hex key: %s", err)
// }
key := "b00a4909fe6b7113c20ad8443cfe40075b817fe4351c4e287b44f1d69336edc7"
log.Printf("Key: %s", key)

// connect to the locally available config_store server
csc := cs.NewConfigStore("127.0.0.1:7777", cs.Environment_NONE)
csc.SetKeyFromHex(key)

err := csc.ImportTOML("/etc/replication-manager/")
if err != nil {
log.Fatalf("Could not import TOML config: %s", err)
}

mySQLSection := csc.Section("mysql")

props := make([]*cs.Property, 0)
props = append(props, mySQLSection.NewProperty("client-test", "foo", "foo-2"))
props = append(props, mySQLSection.NewProperty("client-test", "bar", "value1", "value20"))

password, err := mySQLSection.NewSecret("cluster", "rootpassword", "somesecretpassword")
if err != nil {
log.Fatalf("Could not create secret")
}
props = append(props, password)

log.Printf("password property: %v", password)

ctx := context.Background()
responses, err := csc.Store(ctx, props)
if err != nil {
log.Fatalf("Error storing: %s", err)
}

for _, r := range responses {
log.Printf("Store response data: %v", r)
if r.Secret {
log.Printf("Property is secret: %s", r.GetValues())
}
}

// list the available properties
available, err := csc.Search(ctx, &cs.Query{})
if err != nil {
log.Printf("Error retrieving data: %s", err)
}
for _, r := range available {
log.Printf("List response data: %v", r)
}

specificKey, err := csc.Search(ctx, &cs.Query{
Property: &cs.Property{
Key: "foo",
Namespace: "client-test",
},
})
if err != nil {
log.Printf("Error retrieving data: %s", err)
}
for _, r := range specificKey {
log.Printf("List specificKey data: %v", r)
}

specificNamespace, err := csc.Search(ctx, &cs.Query{
Property: &cs.Property{
Namespace: "client-test",
},
})
if err != nil {
log.Printf("Error retrieving data: %s", err)
}
for _, r := range specificNamespace {
log.Printf("List specificNamespace data: %v", r)
}

specificUnavailableNamespace, err := csc.Search(ctx, &cs.Query{
Property: &cs.Property{
Namespace: "bar-section",
},
})
if err != nil {
log.Printf("Error retrieving data: %s", err)
}
for _, r := range specificUnavailableNamespace {
log.Printf("List bar-section data: %v", r)
}

}
134 changes: 134 additions & 0 deletions cmd/config_store/internal/server/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package server

import (
"fmt"
"io"
"log"
"net"

"github.com/signal18/replication-manager/cmd/config_store/internal/storage"
cs "github.com/signal18/replication-manager/config_store"
"google.golang.org/grpc"
)

type Server struct {
cs.ConfigStoreServer

st storage.ConfigStorage

grpcServer *grpc.Server
conf Config
Up chan bool
}

type Config struct {
ListenAddressForgRPC string
}

func NewServer(conf Config, storage storage.ConfigStorage) *Server {
s := Server{
conf: conf,
st: storage,
}

s.Up = make(chan bool)
return &s
}

func (s *Server) StartGRPCServer() error {
lis, err := net.Listen("tcp", s.conf.ListenAddressForgRPC)
if err != nil {
return err
}

// create a gRPC server object
s.grpcServer = grpc.NewServer()

cs.RegisterConfigStoreServer(s.grpcServer, s)

s.Up <- true
log.Printf("starting HTTP/2 gRPC server, listening on: %s", lis.Addr().String())
if err := s.grpcServer.Serve(lis); err != nil {
return err
}

return nil
}

func (s *Server) Store(stream cs.ConfigStore_StoreServer) error {
for {
in, err := stream.Recv()
if err == io.EOF {
return nil
}

if in == nil {
return fmt.Errorf("empty object sent")
}

log.Printf("Received property: %v", in)

err = in.Validate()
if err != nil {
log.Printf("Error on validation: %s", err)
return err
}

// check if the Store is set, if not set to default
if in.Namespace == "" {
in.Namespace = "default"
}

find, err := s.st.Search(&cs.Query{
Property: in,
Limit: 1,
IgnoreValue: true,
})

if err != nil && err != storage.ErrNoRowsFound {
log.Printf("Error on searching for existing property: %s", err)
return err
}

if len(find) == 1 {
found := find[0]
if found.Key == in.Key && found.Namespace == in.Namespace && found.Environment == in.Environment {

if cs.ValuesEqual(found.Values, in.Values) {
log.Printf("Property did not change: %v", in)
stream.Send(found)
continue
}
log.Printf("Updating existing property: %v with %v", found, in)
found.Values = in.Values
found.Revision++
in = found
}
}

out, err := s.st.Store(in)
if err != nil {
log.Printf("Error on storing: %s", err)
return err
}

stream.Send(out)
}
}

func (s *Server) Search(query *cs.Query, stream cs.ConfigStore_SearchServer) error {
properties, err := s.st.Search(query)
if err != nil {
return err
}

for _, p := range properties {
err := stream.Send(p)
if err != nil {
log.Printf("Error sending: %s", err)
return err
}
}

return nil
}
Loading