From 50da1818aca495c8b74689de340a7080142c86b7 Mon Sep 17 00:00:00 2001
From: Ash Berlin
Date: Wed, 18 Jan 2017 15:15:44 +0000
Subject: [PATCH] Add a kubernetes backend

Although kubernetes isn't a key-value store it provides useful information
for some workloads. There are integration tests included for this but they
are not run automatically on Travis because it is hard/impossible to get
even something like minikube running there.
---
 backends/client.go                            |   3 +
 backends/kubernetes/client.go                 | 378 ++++++++++++++++++
 .../kubernetes/confdir/conf.d/k8s.toml        |   7 +
 .../kubernetes/confdir/templates/k8s-svc.tmpl |  14 +
 integration/kubernetes/k8s.yml                |  52 +++
 integration/kubernetes/test.sh                |  25 ++
 6 files changed, 479 insertions(+)
 create mode 100644 backends/kubernetes/client.go
 create mode 100644 integration/kubernetes/confdir/conf.d/k8s.toml
 create mode 100644 integration/kubernetes/confdir/templates/k8s-svc.tmpl
 create mode 100644 integration/kubernetes/k8s.yml
 create mode 100755 integration/kubernetes/test.sh

diff --git a/backends/client.go b/backends/client.go
index 5e60228a3..699741288 100644
--- a/backends/client.go
+++ b/backends/client.go
@@ -8,6 +8,7 @@ import (
 	"github.com/kelseyhightower/confd/backends/dynamodb"
 	"github.com/kelseyhightower/confd/backends/env"
 	"github.com/kelseyhightower/confd/backends/etcd"
+	"github.com/kelseyhightower/confd/backends/kubernetes"
 	"github.com/kelseyhightower/confd/backends/rancher"
 	"github.com/kelseyhightower/confd/backends/redis"
 	"github.com/kelseyhightower/confd/backends/stackengine"
@@ -39,6 +40,8 @@ func New(config Config) (StoreClient, error) {
 		// Create the etcd client upfront and use it for the life of the process.
 		// The etcdClient is an http.Client and designed to be reused.
 		return etcd.NewEtcdClient(backendNodes, config.ClientCert, config.ClientKey, config.ClientCaKeys, config.BasicAuth, config.Username, config.Password)
+	case "kubernetes":
+		return kubernetes.New(backendNodes, config.ClientCert, config.ClientKey, config.ClientCaKeys, config.BasicAuth, config.Username, config.Password)
 	case "zookeeper":
 		return zookeeper.NewZookeeperClient(backendNodes)
 	case "rancher":
diff --git a/backends/kubernetes/client.go b/backends/kubernetes/client.go
new file mode 100644
index 000000000..3c60fafbe
--- /dev/null
+++ b/backends/kubernetes/client.go
@@ -0,0 +1,378 @@
+/*
+Package kubernetes provides a backend for confd by synthesising a
+key/value-like view on top of the kubernetes API.
+
+Using In-Cluster
+
+The simplest way to use this backend is to run it inside a pod in your
+kubernetes cluster:
+
+    confd --backend kubernetes --watch
+
+In this case (with no `--node` flag given) it will look at the kubernetes
+service account in the default location and use this to find and speak to the
+API server.
+
+If you want to look at the services in a namespace other than the current one,
+set the `POD_NAMESPACE` environment variable.
+
+Using Out-of-Cluster
+
+It is also possible to use this backend from outside of the kubernetes cluster.
+The easiest way is to use "kubectl proxy" to handle the authentication for you:
+
+    kubectl proxy &
+    confd --backend kubernetes --node 127.0.0.1:8001
+
+or you can specify the credentials with "--username"/"--password" or with a
+combination of "--client-ca-keys", "--client-cert", and "--client-key".
+
+To specify the namespace in this mode, add a query parameter to the node -- for
+example:
+
+    kubectl proxy &
+    confd --backend kubernetes --node 127.0.0.1:8001?namespace=my-team
+
+
+Mapping API Objects to Variables
+
+Since confd expects a key-value store and the kubernetes API doesn't expose one
+directly, we have to define our own pattern of variables derived from the API
+objects.
+
+The only API objects (and thus the only initial key path) supported are
+endpoints.
+
+For a service called "mysvc" it will create the following variables under
+"/endpoints/mysvc":
+
+- A "port/$port_name" variable for each named port, with the port number as
+the value. Unnamed ports are keyed by their port number instead.
+
+- A set of keys under "ips", with one entry for each ready pod
+
+    /endpoints/mysvc/ips/0: 172.17.0.6
+    /endpoints/mysvc/ips/1: 172.17.0.7
+
+- A set of keys under "allips" that combines ready and not-ready pods
+
+    /endpoints/mysvc/allips/0: 172.17.0.6
+    /endpoints/mysvc/allips/1: 172.17.0.7
+    /endpoints/mysvc/allips/2: 172.17.0.5
+
+The complete listing of variables created for this example service is:
+
+    /endpoints/mysvc/port/http: 8080
+    /endpoints/mysvc/ips/0: 172.17.0.6
+    /endpoints/mysvc/ips/1: 172.17.0.7
+    /endpoints/mysvc/allips/0: 172.17.0.6
+    /endpoints/mysvc/allips/1: 172.17.0.7
+    /endpoints/mysvc/allips/2: 172.17.0.5
+
+*/
+package kubernetes
+
+import (
+    "fmt"
+    "io/ioutil"
+    "net/url"
+    "os"
+    "strconv"
+    "strings"
+
+    "github.com/kelseyhightower/confd/log"
+
+    "k8s.io/client-go/kubernetes"
+    "k8s.io/client-go/pkg/api"
+    "k8s.io/client-go/pkg/api/unversioned"
+    "k8s.io/client-go/pkg/api/v1"
+    "k8s.io/client-go/rest"
+    "k8s.io/client-go/tools/cache"
+)
+
+type Client struct {
+    clientset               *kubernetes.Clientset
+    endpointResourceVersion string
+    endpointWatcher         cache.ListerWatcher
+}
+
+// New creates a Kubernetes backend for the given config credentials.
+//
+// If all of the given values are empty/their default values then we will
+// attempt to configure ourselves from the in-cluster ServiceAccount provided
+// to Kubernetes pods, including targeting the pod's own namespace.
+//
+func New(machines []string, cert, key, caCert string, basicAuth bool, username string, password string) (*Client, error) {
+    namespace := "default"
+
+    // If everything is empty, try the in-cluster config
+    var cfg *rest.Config
+    if len(machines) == 0 && cert == "" && key == "" && caCert == "" && username == "" && password == "" {
+        var err error
+        cfg, err = rest.InClusterConfig()
+        if err != nil {
+            return nil, err
+        }
+
+        // The ServiceAccount mount also tells us which namespace this pod is running in.
+        nsBytes, err := ioutil.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/" + api.ServiceAccountNamespaceKey)
+        if err != nil {
+            if !os.IsNotExist(err) {
+                return nil, err
+            }
+        } else {
+            namespace = string(nsBytes)
+        }
+    } else {
+        if len(machines) != 1 {
+            return nil, fmt.Errorf("kubernetes backend only supports a single node, %d given", len(machines))
+        }
+        // Check for `?namespace=` in machines[0]
+        url, err := url.Parse(machines[0])
+        if err != nil {
+            return nil, fmt.Errorf("Error parsing node URL: %s", err)
+        }
+
+        if ns, ok := url.Query()["namespace"]; ok && len(ns) >= 1 {
+            namespace = ns[len(ns)-1]
+        }
+
+        // If we are given "host:port?opts" then the bit we care about will be in
+        // Path.
If we are given "http://host:port?opts" then it will appear in + // host. Handle both cases. + var host string + if url.Host != "" { + host = url.Host + } else { + host = url.Path + } + + cfg = &rest.Config{ + Host: host, + Username: username, + Password: password, + TLSClientConfig: rest.TLSClientConfig{ + CertFile: cert, + KeyFile: key, + CAFile: caCert, + }, + } + } + + if ns, ok := os.LookupEnv("POD_NAMESPACE"); ok { + log.Info("Changing target kubernetes namespace to %q from POD_NAMESPACE environment variable", ns) + namespace = ns + } + + log.Info("Using kubernetes API server at %s looking at namespace %q", cfg.Host, namespace) + + clientset, err := kubernetes.NewForConfig(cfg) + if err != nil { + return nil, err + } + + return &Client{ + clientset: clientset, + endpointWatcher: cache.NewListWatchFromClient(clientset.CoreV1().RESTClient(), "endpoints", namespace, nil), + }, nil +} + +type endpointMatcher interface { + Matches(e v1.Endpoints) bool +} + +type allEndpointsMatcher struct { +} + +func (allEndpointsMatcher) Matches(v1.Endpoints) bool { + // "/endpoints" case + return true +} + +type serviceNameMatcher struct { + ServiceName string +} + +func (m serviceNameMatcher) Matches(e v1.Endpoints) bool { + return e.Name == m.ServiceName +} + +func varsFromV1Endpoint(e v1.Endpoints) map[string]string { + vars := make(map[string]string) + + addHostVars := func(idx int, kind string, addr v1.EndpointAddress) { + varName := fmt.Sprintf("/endpoints/%s/%s/%d", e.Name, kind, idx) + vars[varName] = addr.IP + } + + for _, subset := range e.Subsets { + + portPrefix := fmt.Sprintf("/endpoints/%s/port/", e.Name) + for _, port := range subset.Ports { + if port.Name != "" { + vars[portPrefix+port.Name] = strconv.Itoa(int(port.Port)) + } else { + vars[portPrefix+strconv.Itoa(int(port.Port))] = strconv.Itoa(int(port.Port)) + } + } + + for n, node := range subset.Addresses { + addHostVars(n, "ips", node) + // We want allips to include ready and not-ready IPs + addHostVars(n, "allips", node) + } + for n, node := range subset.NotReadyAddresses { + addHostVars(n+len(subset.Addresses), "allips", node) + } + } + return vars +} + +func newEndpointMatchFromKeyParts(parts []string) (endpointMatcher, bool) { + // Types of path we might be given: + // - /endpoints (all endpoints!) 
+    // - /endpoints/mysvc (this service)
+
+    if parts[0] != "endpoints" {
+        panic("parts must start with \"endpoints\"!")
+    }
+
+    if len(parts) == 1 {
+        return allEndpointsMatcher{}, true
+    }
+
+    matcher := serviceNameMatcher{ServiceName: parts[1]}
+
+    return matcher, true
+}
+
+func (k *Client) buildMatchers(keys []string) ([]endpointMatcher, error) {
+    var matchers []endpointMatcher
+    for _, key := range keys {
+        key = strings.TrimPrefix(key, "/")
+        parts := strings.Split(key, "/")
+
+        switch parts[0] {
+        case "endpoints":
+            matcher, ok := newEndpointMatchFromKeyParts(parts)
+            if ok {
+                matchers = append(matchers, matcher)
+            }
+        default:
+            return nil, fmt.Errorf("Unknown key type %q", parts[0])
+        }
+    }
+    return matchers, nil
+}
+
+func (k *Client) GetValues(keys []string) (map[string]string, error) {
+    log.Debug("Getting keys: %+v", keys)
+    vars := make(map[string]string)
+
+    endpointMatchers, err := k.buildMatchers(keys)
+    if err != nil {
+        return nil, err
+    }
+
+    if len(endpointMatchers) > 0 {
+        if err := k.setEndpointValues(&vars, endpointMatchers); err != nil {
+            return nil, err
+        }
+    }
+
+    log.Debug("Got vars %#+v", vars)
+    return vars, nil
+}
+
+func (k *Client) setEndpointValues(vars *map[string]string, matchers []endpointMatcher) error {
+    genericList, err := k.endpointWatcher.List(api.ListOptions{})
+    if err != nil {
+        return err
+    }
+
+    list, ok := genericList.(*v1.EndpointsList)
+    if !ok {
+        return fmt.Errorf("Expected a *v1.EndpointsList but got %T", genericList)
+    }
+
+    // Store the version so that if we are in watch mode the watch will restart
+    // from the same place and we don't miss any changes.
+    k.endpointResourceVersion = list.GetResourceVersion()
+
+    for _, ep := range list.Items {
+        for _, matcher := range matchers {
+            if !matcher.Matches(ep) {
+                log.Debug("Endpoint %+v didn't match %#+v", ep.Name, matcher)
+                continue
+            }
+            log.Debug("Endpoint %+v matched", ep.Name)
+            for k, v := range varsFromV1Endpoint(ep) {
+                (*vars)[k] = v
+            }
+        }
+    }
+
+    return nil
+}
+
+func (k *Client) WatchPrefix(prefix string, keys []string, lastIndex uint64, stopChan chan bool) (uint64, error) {
+    endpointMatchers, err := k.buildMatchers(keys)
+    if err != nil {
+        return lastIndex, err
+    }
+
+    listWatcher := k.endpointWatcher
+
+    if k.endpointResourceVersion == "" {
+        // We don't yet have a resource version, so this is the first time through.
+        // So yes, something has changed (from nothing to whatever the current
+        // state is).
+        return lastIndex, nil
+    }
+
+    opts := api.ListOptions{
+        ResourceVersion: k.endpointResourceVersion,
+    }
+    epWatcher, err := listWatcher.Watch(opts)
+    if err != nil {
+        return lastIndex, err
+    }
+
+    for {
+        select {
+        case <-stopChan:
+            epWatcher.Stop()
+            return lastIndex, nil
+        case e := <-epWatcher.ResultChan():
+            switch obj := e.Object.(type) {
+            case nil:
+                // Timeout or other error. Just try again from where we last were.
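+                // The API server closes long-running watches after a timeout, at
+                // which point the event's Object is nil. Re-watching with the same
+                // ResourceVersion in opts resumes from the last change we saw
+                // rather than replaying everything.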
+ epWatcher.Stop() + epWatcher, err = listWatcher.Watch(opts) + if err != nil { + return lastIndex, err + } + + case *unversioned.Status: + // If we get anything we don't understand we should clear the + // ResourceVersion so that we come in with a fresh one and start the + // watch again + k.endpointResourceVersion = "" + if obj.Status == unversioned.StatusFailure && obj.Reason == unversioned.StatusReasonGone { + log.Info("Restarting watch after getting Gone reason: %s", obj.Message) + return lastIndex, nil + } else { + return lastIndex, fmt.Errorf("Kubernetes API returned an error %#+v", e.Object) + } + default: + k.endpointResourceVersion = "" + return lastIndex, fmt.Errorf("Expected a *v1.Endpoints but got %T, %#+v", e.Object, e.Object) + case *v1.Endpoints: + + for _, matcher := range endpointMatchers { + if matcher.Matches(*obj) { + k.endpointResourceVersion = opts.ResourceVersion + epWatcher.Stop() + return lastIndex, nil + } + } + } + } + } +} diff --git a/integration/kubernetes/confdir/conf.d/k8s.toml b/integration/kubernetes/confdir/conf.d/k8s.toml new file mode 100644 index 000000000..2456fd7a8 --- /dev/null +++ b/integration/kubernetes/confdir/conf.d/k8s.toml @@ -0,0 +1,7 @@ +[template] +mode = "0644" +src = "k8s-svc.tmpl" +dest = "/tmp/confd-k8s-svc.conf" +keys = [ + "/endpoints/test-app", +] diff --git a/integration/kubernetes/confdir/templates/k8s-svc.tmpl b/integration/kubernetes/confdir/templates/k8s-svc.tmpl new file mode 100644 index 000000000..57dbef228 --- /dev/null +++ b/integration/kubernetes/confdir/templates/k8s-svc.tmpl @@ -0,0 +1,14 @@ +--BEGIN-ALLIPS-- +# There should be two lines here +{{ $port := getv "/endpoints/test-app/port/http" "this-should-be-8080" -}} +{{ range getvs "/endpoints/test-app/allips/*" -}} +host: {{ . }}:{{ $port }} +{{ end -}} +--END-- + +--BEGIN-IPS-- +# There should be one line here because only one of the pods is ready +{{ range getvs "/endpoints/test-app/ips/*" -}} +host: {{ . }}:{{ $port }} +{{- end }} +--END-- diff --git a/integration/kubernetes/k8s.yml b/integration/kubernetes/k8s.yml new file mode 100644 index 000000000..a08808a33 --- /dev/null +++ b/integration/kubernetes/k8s.yml @@ -0,0 +1,52 @@ +--- +apiVersion: v1 +kind: Namespace +metadata: + name: confd-test +--- +# Create two pods manually, one listening so it will be ready, one not. 
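+# The ready pod should appear under both "/endpoints/test-app/ips/*" and
+# "/endpoints/test-app/allips/*"; the not-ready pod should only appear under
+# "allips". The template in confdir/templates/k8s-svc.tmpl asserts exactly that
+# (two "allips" entries, one "ips" entry).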
+apiVersion: v1
+kind: Pod
+metadata:
+  name: test-app-ready
+  namespace: confd-test
+  labels:
+    app: test-app
+spec:
+  containers:
+  - image: alpine
+    name: test-app
+    command: ["nc", "-lk", "-p", "8080", "-v", "-e", "date"]
+    readinessProbe:
+      tcpSocket:
+        port: 8080
+---
+apiVersion: v1
+kind: Pod
+metadata:
+  name: test-app-notready
+  namespace: confd-test
+  labels:
+    app: test-app
+spec:
+  containers:
+  - image: alpine
+    name: test-app
+    # Use 'sh' as the command so that we can trap SIGTERM and exit cleanly
+    command: ["sh", "-c", "trap 'exit 0' SIGTERM; while true; do sleep 0.5; done"]
+    readinessProbe:
+      tcpSocket:
+        port: 8080
+---
+apiVersion: v1
+kind: Service
+metadata:
+  name: test-app
+  namespace: confd-test
+spec:
+  selector:
+    app: test-app
+  ports:
+  - name: http
+    port: 8080
+
diff --git a/integration/kubernetes/test.sh b/integration/kubernetes/test.sh
new file mode 100755
index 000000000..59eeb20c3
--- /dev/null
+++ b/integration/kubernetes/test.sh
@@ -0,0 +1,25 @@
+#!/bin/bash
+
+set -e -o pipefail
+
+# Configure kubernetes
+kubectl apply -f ./integration/kubernetes/k8s.yml
+
+# `kubectl proxy` should be running for this to work
+# Run confd -- we need to use a custom confdir because k8s isn't a real
+# key-value store, but just emulates one from the endpoints API
+confd --watch --onetime --log-level debug --confdir ./integration/kubernetes/confdir --backend kubernetes --node '127.0.0.1:8001?namespace=confd-test'
+
+
+# Count the "host:" lines confd rendered between each pair of markers.
+count=$(sed -n '/--BEGIN-ALLIPS--/,/--END--/p' /tmp/confd-k8s-svc.conf | grep -c '^host:' || true)
+[ "$count" -eq 2 ] || {
+  echo >&2 "Expected 2 lines for /endpoints/test-app/allips/*, got $count"
+  exit 1
+}
+
+count=$(sed -n '/--BEGIN-IPS--/,/--END--/p' /tmp/confd-k8s-svc.conf | grep -c '^host:' || true)
+[ "$count" -eq 1 ] || {
+  echo >&2 "Expected 1 line for /endpoints/test-app/ips/*, got $count"
+  exit 1
+}
+
+echo "Tests OK"
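+
+# Optional cleanup so the test can be re-run from a clean slate:
+#   kubectl delete -f ./integration/kubernetes/k8s.yml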