Skip to content

Commit

Permalink
feat: allow telegraf to support hot reload (#63)
Browse files Browse the repository at this point in the history
* feat: allow telegraf to support hot reload

* Apply suggestions from code review

Co-authored-by: Marko Mikulicic <mkm@influxdata.com>

* More updates after code review

Co-authored-by: Marko Mikulicic <mkm@influxdata.com>
  • Loading branch information
Wojciech Kocjan and Marko Mikulicic authored Jul 30, 2021
1 parent 79b6434 commit fd1f79c
Show file tree
Hide file tree
Showing 14 changed files with 607 additions and 24 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ COPY go.sum go.sum
RUN go mod download

# Copy the go source
COPY main.go sidecar.go handler.go class_data.go errors.go ./
COPY main.go sidecar.go handler.go class_data.go errors.go watcher.go updater.go ./

# Build
RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 GO111MODULE=on go build -a -o manager *.go
Expand Down
21 changes: 18 additions & 3 deletions class_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,26 @@ import (
"github.com/influxdata/toml"
)

type classDataHandler struct {
// directoryClassDataHandler provides a handler for getting class data from class name.
// Class data is read from files located in TelegrafClassesDirectory; getData reads the
// file whose name equals the class name.
type directoryClassDataHandler struct {
// Logger is the logger available to the handler's methods.
Logger logr.Logger
// TelegrafClassesDirectory is the directory containing one file per telegraf class.
TelegrafClassesDirectory string
}

func (c *classDataHandler) validateClassData() error {
// classDataHandler defines interface for validating class data and converting from class name to class data.
// It lets sidecarHandler work with any source of class data, such as the directory-backed
// handler or an in-memory implementation used in tests.
type classDataHandler interface {
// getData returns the class data for className, or an error if it cannot be retrieved.
getData(className string) (string, error)
// validateClassData returns a non-nil error when the available class data is not valid;
// the exact checks are up to the implementation.
validateClassData() error
}

// newDirectoryClassDataHandler builds a directoryClassDataHandler that uses the given
// logger and reads class data from telegrafClassesDirectory.
func newDirectoryClassDataHandler(logger logr.Logger, telegrafClassesDirectory string) *directoryClassDataHandler {
	handler := new(directoryClassDataHandler)
	handler.Logger = logger
	handler.TelegrafClassesDirectory = telegrafClassesDirectory

	return handler
}

func (c *directoryClassDataHandler) validateClassData() error {
classDataValid := true
filesAvailable := false

Expand Down Expand Up @@ -72,7 +86,8 @@ func (c *classDataHandler) validateClassData() error {
return nil
}

func (c *classDataHandler) getData(className string) (string, error) {
// getData returns class data for a given class name.
func (c *directoryClassDataHandler) getData(className string) (string, error) {
data, err := ioutil.ReadFile(filepath.Join(c.TelegrafClassesDirectory, className))

if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions class_data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func Test_classDataHandler_getData(t *testing.T) {
dir := createTempClassesDirectory(t, tt.classes)
defer os.RemoveAll(dir)

testClassDataHandler := &classDataHandler{
testClassDataHandler := &directoryClassDataHandler{
Logger: logger,
TelegrafClassesDirectory: dir,
}
Expand Down Expand Up @@ -95,7 +95,7 @@ func Test_classDataHandler_validateClassData(t *testing.T) {
dir := createTempClassesDirectory(t, tt.classes)
defer os.RemoveAll(dir)

testClassDataHandler := &classDataHandler{
testClassDataHandler := &directoryClassDataHandler{
Logger: logger,
TelegrafClassesDirectory: dir,
}
Expand Down
9 changes: 9 additions & 0 deletions deploy/dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@ rules:
- apiGroups: [""]
resources: ["secrets"]
verbs: ["*"]
- apiGroups: [""]
resources: ["namespaces"]
verbs: ["get", "list"]
- apiGroups: [""]
resources: ["pods"]
verbs: ["get"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
Expand Down Expand Up @@ -100,6 +106,9 @@ spec:
# allow injecting telegraf-istio sidecar for pods with
# istio sidecar annotations enabled
- --enable-istio-injection=true
# if telegraf image supports it, telegraf-watch-config can be set so
# telegraf-operator and telegraf hot reload changes when classes change
# - --telegraf-watch-config=inotify
env:
- name: POD_NAMESPACE
valueFrom:
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/influxdata/telegraf-operator
go 1.16

require (
github.com/fsnotify/fsnotify v1.4.9
github.com/go-logr/logr v0.3.0
github.com/influxdata/toml v0.0.0-20180607005434-2a2e3012f7cf
github.com/naoina/go-stringutil v0.1.0 // indirect
Expand Down
2 changes: 1 addition & 1 deletion handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type podInjector struct {
decoder *admission.Decoder
names.NameGenerator
Logger logr.Logger
ClassDataHandler *classDataHandler
ClassDataHandler *directoryClassDataHandler
SidecarHandler *sidecarHandler
RequireAnnotationsForSecret bool
}
Expand Down
2 changes: 1 addition & 1 deletion handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -839,7 +839,7 @@ func Test_podInjector_Handle(t *testing.T) {

logger := &logrTesting.TestLogger{T: t}

testClassDataHandler := &classDataHandler{
testClassDataHandler := &directoryClassDataHandler{
Logger: logger,
TelegrafClassesDirectory: dir,
}
Expand Down
17 changes: 13 additions & 4 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,7 @@ func main() {

logger := setupLog.WithName("podInjector")

classData := &classDataHandler{
Logger: logger,
TelegrafClassesDirectory: telegrafClassesDirectory,
}
classData := newDirectoryClassDataHandler(logger, telegrafClassesDirectory)

err = classData.validateClassData()
if err != nil {
Expand Down Expand Up @@ -151,6 +148,18 @@ func main() {
os.Exit(1)
}

updater, err := newSecretsUpdater(ctrl.Log.WithName("updater"), sidecar)
if err != nil {
setupLog.Error(err, "setting up secrets updater failed")
os.Exit(1)
}

_, err = newTelegrafClassesWatcher(ctrl.Log.WithName("watcher"), telegrafClassesDirectory, updater.onChange)
if err != nil {
setupLog.Error(err, "setting up watcher failed")
os.Exit(1)
}

hookServer.Register("/mutate-v1-pod", &webhook.Admission{Handler: &podInjector{
Logger: logger,
SidecarHandler: sidecar,
Expand Down
21 changes: 13 additions & 8 deletions sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,9 @@ const (
TelegrafSecretLabelPod = "telegraf.influxdata.com/pod"
)

// sidecarHandler provides logic for handling telegraf sidecars and related secrets.
type sidecarHandler struct {
ClassDataHandler *classDataHandler
ClassDataHandler classDataHandler
Logger logr.Logger
TelegrafDefaultClass string
TelegrafImage string
Expand Down Expand Up @@ -171,12 +172,7 @@ func (h *sidecarHandler) addTelegrafSidecar(result *sidecarHandlerResponse, pod
className = extClass
}

classData, err := h.ClassDataHandler.getData(className)
if err != nil {
return newNonFatalError(err, "telegraf-operator could not create sidecar container for unknown class")
}

telegrafConf, err := h.assembleConf(pod, classData)
telegrafConf, err := h.assembleConf(pod, className)
if err != nil {
return newNonFatalError(err, "telegraf-operator could not create sidecar container due to error in class data")
}
Expand Down Expand Up @@ -217,8 +213,17 @@ func (h *sidecarHandler) addContainerAndSecret(result *sidecarHandlerResponse, p
return nil
}

// getClassData looks up the class data for className by delegating to the
// configured ClassDataHandler and passes its result through unchanged.
func (h *sidecarHandler) getClassData(className string) (string, error) {
	classData, err := h.ClassDataHandler.getData(className)
	return classData, err
}

// Assembling telegraf configuration
func (h *sidecarHandler) assembleConf(pod *corev1.Pod, classData string) (telegrafConf string, err error) {
func (h *sidecarHandler) assembleConf(pod *corev1.Pod, className string) (telegrafConf string, err error) {
classData, err := h.ClassDataHandler.getData(className)
if err != nil {
return "", newNonFatalError(err, "telegraf-operator could not create sidecar container for unknown class")
}

ports := ports(pod)
if len(ports) != 0 {
path := "/metrics"
Expand Down
31 changes: 27 additions & 4 deletions sidecar_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"bytes"
"fmt"
"os"
"reflect"
"strings"
Expand Down Expand Up @@ -54,6 +55,27 @@ stringData:
type: Opaque`
)

// mockClassDataHandler is an in-memory class data handler for tests. It serves
// class data from a map instead of reading it from a directory.
type mockClassDataHandler struct {
	// classes maps a class name to its class data.
	classes map[string]string
}

// newMockClassDataHandler creates a mockClassDataHandler backed by classes.
// The map is used as-is (not copied).
func newMockClassDataHandler(classes map[string]string) *mockClassDataHandler {
	return &mockClassDataHandler{classes: classes}
}

// validateClassData always succeeds; the mock performs no validation.
func (m *mockClassDataHandler) validateClassData() error {
	return nil
}

// getData returns the class data registered for className, or an error when
// the class is not present in the mock.
func (m *mockClassDataHandler) getData(className string) (string, error) {
	v, ok := m.classes[className]
	if !ok {
		return "", fmt.Errorf("class %s not found", className)
	}

	return v, nil
}

func Test_skip(t *testing.T) {
handler := &sidecarHandler{
RequestsCPU: defaultRequestsCPU,
Expand Down Expand Up @@ -260,14 +282,15 @@ func Test_assembleConf(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {

handler := &sidecarHandler{
ClassDataHandler: newMockClassDataHandler(map[string]string{"class": tt.classData}),
EnableDefaultInternalPlugin: tt.enableDefaultInternalPlugin,
RequestsCPU: defaultRequestsCPU,
RequestsMemory: defaultRequestsMemory,
LimitsCPU: defaultLimitsCPU,
LimitsMemory: defaultLimitsMemory,
Logger: &logrTesting.TestLogger{T: t},
}
gotConfig, err := handler.assembleConf(tt.pod, tt.classData)
gotConfig, err := handler.assembleConf(tt.pod, "class")
if (err != nil) != tt.wantErr {
t.Errorf("assembleConf() error = %v, wantErr %v", err, tt.wantErr)
return
Expand Down Expand Up @@ -592,7 +615,7 @@ status: {}
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
corev1.Container{
{
Name: "telegraf",
Image: "alpine:latest",
},
Expand Down Expand Up @@ -690,7 +713,7 @@ status: {}
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
corev1.Container{
{
Name: "telegraf-istio",
Image: "alpine:latest",
},
Expand Down Expand Up @@ -896,7 +919,7 @@ status: {}

logger := &logrTesting.TestLogger{T: t}

testClassDataHandler := &classDataHandler{
testClassDataHandler := &directoryClassDataHandler{
Logger: logger,
TelegrafClassesDirectory: dir,
}
Expand Down
115 changes: 115 additions & 0 deletions updater.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
package main

import (
"context"
"fmt"
"time"

"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"sigs.k8s.io/controller-runtime/pkg/client/config"
)

// secretsUpdater updates all secrets managed by telegraf-operator whose contents have changed in all namespaces.
// Its onChange method re-assembles the telegraf configuration for every managed
// secret and writes it back when it differs from the stored value.
type secretsUpdater struct {
// logger receives informational messages and errors raised while updating secrets.
logger logr.Logger
// clientset is the Kubernetes client used to list namespaces and secrets, fetch pods and update secrets.
clientset kubernetes.Interface
// batchDelay is set to 10 seconds by newSecretsUpdater.
// NOTE(review): it is not read anywhere in this file - confirm where it is consumed.
batchDelay time.Duration
// assembleConf produces the telegraf configuration for a pod and a class name;
// newSecretsUpdater wires it to sidecarHandler.assembleConf.
assembleConf func(*corev1.Pod, string) (string, error)
}

// newSecretsUpdater creates a secretsUpdater that talks to the cluster resolved by
// controller-runtime's config.GetConfig and assembles configuration through sidecar.
//
// It returns an error if the cluster configuration cannot be obtained or the
// Kubernetes client cannot be created from it.
func newSecretsUpdater(logger logr.Logger, sidecar *sidecarHandler) (*secretsUpdater, error) {
	restConfig, err := config.GetConfig()
	if err != nil {
		return nil, err
	}

	k8sClient, err := kubernetes.NewForConfig(restConfig)
	if err != nil {
		return nil, err
	}

	updater := &secretsUpdater{}
	updater.logger = logger
	updater.clientset = k8sClient
	updater.batchDelay = 10 * time.Second
	updater.assembleConf = sidecar.assembleConf

	return updater, nil
}

// onChange updates telegraf-operator managed secrets in all namespaces, handling
// and logging errors internally.
//
// It lists every namespace and calls updateSecretsInNamespace for each of them.
// If the namespaces cannot be listed, the error is logged and nothing is updated.
// A failure in a single namespace is logged and the remaining namespaces are
// still processed, so one broken secret does not block updates everywhere else.
func (u *secretsUpdater) onChange() {
	u.logger.Info("checking secrets for updater")

	ctx := context.Background()

	// find all namespaces and find all telegraf-operator managed secrets in each namespace
	namespaces, err := u.clientset.CoreV1().Namespaces().List(ctx, metav1.ListOptions{})
	if err != nil {
		u.logger.Error(err, "unable to list namespaces")
		return
	}

	// iterate over all namespaces, trying to update all telegraf-operator managed secrets
	for i := range namespaces.Items {
		name := namespaces.Items[i].Name

		if err := u.updateSecretsInNamespace(ctx, name); err != nil {
			// log the namespace name rather than the whole Namespace object
			u.logger.Error(err, "unable to update secrets", "namespace", name)
		}
	}
}

// updateSecretsInNamespace updates secrets in a single namespace, returning errors if they occur.
//
// It lists the secrets in namespace that carry the TelegrafSecretLabelClassName label,
// and for each of them fetches the pod named by the TelegrafSecretLabelPod label,
// re-assembles the telegraf configuration for that pod and class, and updates the
// secret when the stored configuration differs.
//
// Processing stops at the first error, which is returned wrapped with the
// namespace, secret and pod it relates to.
func (u *secretsUpdater) updateSecretsInNamespace(ctx context.Context, namespace string) error {
	secretsClient := u.clientset.CoreV1().Secrets(namespace)

	// find all secrets having the label set by telegraf-operator, limiting results only to secrets
	// that the telegraf-operator is managing
	secrets, err := secretsClient.List(ctx, metav1.ListOptions{
		LabelSelector: TelegrafSecretLabelClassName,
	})
	if err != nil {
		return fmt.Errorf("listing secrets in namespace %s: %w", namespace, err)
	}

	// iterate by index so the pointer passed to Update refers to the slice element
	// and not to a loop variable that is reused across iterations
	for i := range secrets.Items {
		secret := &secrets.Items[i]

		// get the pod and class name labels
		labels := secret.GetLabels()
		podName := labels[TelegrafSecretLabelPod]
		className := labels[TelegrafSecretLabelClassName]

		// if one of the labels was not present, throw an error
		if podName == "" || className == "" {
			return fmt.Errorf(`unable to get pod and class name for secret %s in namespace %s; podName="%s"; className="%s"`, secret.Name, secret.Namespace, podName, className)
		}

		// get the pod that the secret is used in
		pod, err := u.clientset.CoreV1().Pods(namespace).Get(ctx, podName, metav1.GetOptions{})
		if err != nil {
			return fmt.Errorf("getting pod %s for secret %s in namespace %s: %w", podName, secret.Name, namespace, err)
		}

		telegrafConf, err := u.assembleConf(pod, className)
		if err != nil {
			return fmt.Errorf("assembling configuration for pod %s with class %s in namespace %s: %w", podName, className, namespace, err)
		}

		// check whether secret should be updated, skip it when the contents already match
		if string(secret.Data[TelegrafSecretDataKey]) == telegrafConf {
			u.logger.Info("not updating secret", "namespace", namespace, "name", secret.Name, "podName", podName, "class", className)
			continue
		}

		u.logger.Info("updating secret", "namespace", namespace, "name", secret.Name, "podName", podName, "class", className)

		// a secret without any data has a nil map; writing to it would panic
		if secret.Data == nil {
			secret.Data = make(map[string][]byte, 1)
		}
		secret.Data[TelegrafSecretDataKey] = []byte(telegrafConf)

		if _, err := secretsClient.Update(ctx, secret, metav1.UpdateOptions{}); err != nil {
			return fmt.Errorf("updating secret %s in namespace %s: %w", secret.Name, namespace, err)
		}
	}

	return nil
}
Loading

0 comments on commit fd1f79c

Please sign in to comment.