Skip to content

Commit

Permalink
feat(controller-runtime): Add consumer controller (#212)
Browse files Browse the repository at this point in the history
* implement consumerSpecToConfig

* implement consumer resource initialization

* implement consumer update/creation

* implement preventUpdate, readonly and namespace restrictions

Checks for the PreventUpdate or readonly mode during creation/update.
Skips reconciliation when resource is in namespace not matching restriction.

* test consumer creation on alternative server

* implement consumer deletion

* handle deletion when the underlying stream was deleted

* add missing GenerationChanged event filter to consumerReconciler

* update logging

Set streamName and consumerName fields once.
Reword log messages.
  • Loading branch information
adriandieter authored Dec 31, 2024
1 parent 0f218f5 commit 88f61b2
Show file tree
Hide file tree
Showing 3 changed files with 888 additions and 33 deletions.
265 changes: 262 additions & 3 deletions internal/controller/consumer_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,18 @@ package controller

import (
"context"
"errors"
"fmt"
"github.com/go-logr/logr"
"github.com/nats-io/nats.go/jetstream"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"time"

jetstreamnatsiov1beta2 "github.com/nats-io/nack/pkg/jetstream/apis/jetstream/v1beta2"
api "github.com/nats-io/nack/pkg/jetstream/apis/jetstream/v1beta2"
ctrl "sigs.k8s.io/controller-runtime"
)

Expand All @@ -36,14 +45,264 @@ type ConsumerReconciler struct {
// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.17.3/pkg/reconcile
func (r *ConsumerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
log := klog.FromContext(ctx)
log.Info("reconcile", "namespace", req.Namespace, "name", req.Name)

if ok := r.ValidNamespace(req.Namespace); !ok {
log.Info("Controller restricted to namespace, skipping reconciliation.")
return ctrl.Result{}, nil
}

// Fetch consumer resource
consumer := &api.Consumer{}
if err := r.Get(ctx, req.NamespacedName, consumer); err != nil {
if apierrors.IsNotFound(err) {
log.Info("Consumer resource not found. Ignoring since object must be deleted.")
return ctrl.Result{}, nil
}
return ctrl.Result{}, fmt.Errorf("get consumer resource '%s': %w", req.NamespacedName.String(), err)
}

log = log.WithValues(
"streamName", consumer.Spec.StreamName,
"consumerName", consumer.Spec.DurableName,
)

// Update ready status to unknown when no status is set
if consumer.Status.Conditions == nil || len(consumer.Status.Conditions) == 0 {
log.Info("Setting initial ready condition to unknown.")
consumer.Status.Conditions = updateReadyCondition(consumer.Status.Conditions, v1.ConditionUnknown, "Reconciling", "Starting reconciliation")
err := r.Status().Update(ctx, consumer)
if err != nil {
return ctrl.Result{}, fmt.Errorf("set condition unknown: %w", err)
}
return ctrl.Result{Requeue: true}, nil
}

// Add finalizer
if !controllerutil.ContainsFinalizer(consumer, consumerFinalizer) {
log.Info("Adding consumer finalizer.")
if ok := controllerutil.AddFinalizer(consumer, consumerFinalizer); !ok {
return ctrl.Result{}, errors.New("failed to add finalizer to consumer resource")
}

if err := r.Update(ctx, consumer); err != nil {
return ctrl.Result{}, fmt.Errorf("update consumer resource to add finalizer: %w", err)
}
return ctrl.Result{}, nil
}

// Check Deletion
markedForDeletion := consumer.GetDeletionTimestamp() != nil
if markedForDeletion {
if controllerutil.ContainsFinalizer(consumer, consumerFinalizer) {
err := r.deleteConsumer(ctx, log, consumer)
if err != nil {
return ctrl.Result{}, fmt.Errorf("delete consumer: %w", err)
}
} else {
log.Info("Consumer marked for deletion and already finalized. Ignoring.")
}

return ctrl.Result{}, nil
}

// Create or update stream
if err := r.createOrUpdate(ctx, log, consumer); err != nil {
return ctrl.Result{}, fmt.Errorf("create or update: %s", err)
}
return ctrl.Result{}, nil
}

func (r *ConsumerReconciler) deleteConsumer(ctx context.Context, log logr.Logger, consumer *api.Consumer) error {

// Set status to not false
consumer.Status.Conditions = updateReadyCondition(consumer.Status.Conditions, v1.ConditionFalse, "Finalizing", "Performing finalizer operations.")
if err := r.Status().Update(ctx, consumer); err != nil {
return fmt.Errorf("update ready condition: %w", err)
}

if !consumer.Spec.PreventDelete && !r.ReadOnly() {
err := r.WithJetStreamClient(consumerConnOpts(consumer.Spec), func(js jetstream.JetStream) error {
return js.DeleteConsumer(ctx, consumer.Spec.StreamName, consumer.Spec.DurableName)
})
switch {
case errors.Is(err, jetstream.ErrConsumerNotFound):
log.Info("Consumer does not exist. Unable to delete.")
case errors.Is(err, jetstream.ErrStreamNotFound):
log.Info("Stream of consumer does not exist. Unable to delete.")
case err != nil:
return fmt.Errorf("delete jetstream consumer: %w", err)
default:
log.Info("Consumer deleted.")
}
} else {
log.Info("Skipping consumer deletion.",
"consumerName", consumer.Spec.DurableName,
"preventDelete", consumer.Spec.PreventDelete,
"read-only", r.ReadOnly(),
)
}

log.Info("Removing consumer finalizer.")
if ok := controllerutil.RemoveFinalizer(consumer, consumerFinalizer); !ok {
return errors.New("failed to remove consumer finalizer")
}
if err := r.Update(ctx, consumer); err != nil {
return fmt.Errorf("remove finalizer: %w", err)
}

return nil
}

func (r *ConsumerReconciler) createOrUpdate(ctx context.Context, log klog.Logger, consumer *api.Consumer) error {

// Create or Update the stream based on the spec
if consumer.Spec.PreventUpdate || r.ReadOnly() {
log.Info("Skipping consumer creation or update.",
"preventDelete", consumer.Spec.PreventDelete,
"read-only", r.ReadOnly(),
)
return nil
}

// Map spec to consumer target config
targetConfig, err := consumerSpecToConfig(&consumer.Spec)
if err != nil {
return fmt.Errorf("map consumer spec to target config: %w", err)
}

err = r.WithJetStreamClient(consumerConnOpts(consumer.Spec), func(js jetstream.JetStream) error {
log.Info("Consumer created or updated.")
_, err := js.CreateOrUpdateConsumer(ctx, consumer.Spec.StreamName, *targetConfig)
return err
})
if err != nil {
err = fmt.Errorf("create or update consumer: %w", err)
consumer.Status.Conditions = updateReadyCondition(consumer.Status.Conditions, v1.ConditionFalse, "Errored", err.Error())
if err := r.Status().Update(ctx, consumer); err != nil {
log.Error(err, "Failed to update ready condition to Errored.")
}
return err
}

// update the observed generation and ready status
consumer.Status.ObservedGeneration = consumer.Generation
consumer.Status.Conditions = updateReadyCondition(
consumer.Status.Conditions,
v1.ConditionTrue,
"Reconciling",
"Consumer successfully created or updated.",
)
err = r.Status().Update(ctx, consumer)
if err != nil {
return fmt.Errorf("update ready condition: %w", err)
}

return nil
}

func consumerConnOpts(spec api.ConsumerSpec) *connectionOptions {
return &connectionOptions{
Account: spec.Account,
Creds: spec.Creds,
Nkey: spec.Nkey,
Servers: spec.Servers,
TLS: spec.TLS,
}
}

func consumerSpecToConfig(spec *api.ConsumerSpec) (*jetstream.ConsumerConfig, error) {

config := &jetstream.ConsumerConfig{
Durable: spec.DurableName,
Description: spec.Description,
OptStartSeq: uint64(spec.OptStartSeq),
MaxDeliver: spec.MaxDeliver,
FilterSubject: spec.FilterSubject,
RateLimit: uint64(spec.RateLimitBps),
SampleFrequency: spec.SampleFreq,
MaxWaiting: spec.MaxWaiting,
MaxAckPending: spec.MaxAckPending,
HeadersOnly: spec.HeadersOnly,
MaxRequestBatch: spec.MaxRequestBatch,
MaxRequestMaxBytes: spec.MaxRequestMaxBytes,
Replicas: spec.Replicas,
MemoryStorage: spec.MemStorage,
FilterSubjects: spec.FilterSubjects,
Metadata: spec.Metadata,

// Explicitly set not (yet) mapped fields
Name: "",
InactiveThreshold: 0,
}

// DeliverPolicy
if spec.DeliverPolicy != "" {
err := config.DeliverPolicy.UnmarshalJSON(asJsonString(spec.DeliverPolicy))
if err != nil {
return nil, fmt.Errorf("invalid delivery policy: %w", err)
}
}

// OptStartTime RFC3339
if spec.OptStartTime != "" {
t, err := time.Parse(time.RFC3339, spec.OptStartTime)
if err != nil {
return nil, fmt.Errorf("invalid opt start time: %w", err)
}
config.OptStartTime = &t
}

// AckPolicy
if spec.AckPolicy != "" {
err := config.AckPolicy.UnmarshalJSON(asJsonString(spec.AckPolicy))
if err != nil {
return nil, fmt.Errorf("invalid ack policy: %w", err)
}
}

// AckWait
if spec.AckWait != "" {
d, err := time.ParseDuration(spec.AckWait)
if err != nil {
return nil, fmt.Errorf("invalid ack wait duration: %w", err)
}
config.AckWait = d
}

//BackOff
for _, bo := range spec.BackOff {
d, err := time.ParseDuration(bo)
if err != nil {
return nil, fmt.Errorf("invalid backoff: %w", err)
}

config.BackOff = append(config.BackOff, d)
}

// ReplayPolicy
if spec.ReplayPolicy != "" {
err := config.ReplayPolicy.UnmarshalJSON(asJsonString(spec.ReplayPolicy))
if err != nil {
return nil, fmt.Errorf("invalid replay policy: %w", err)
}
}

// MaxRequestExpires
if spec.MaxRequestExpires != "" {
d, err := time.ParseDuration(spec.MaxRequestExpires)
if err != nil {
return nil, fmt.Errorf("invalid opt start time: %w", err)
}
config.MaxRequestExpires = d
}

return config, nil
}

// SetupWithManager sets up the controller with the Manager.
func (r *ConsumerReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&jetstreamnatsiov1beta2.Consumer{}).
For(&api.Consumer{}).
WithEventFilter(predicate.GenerationChangedPredicate{}).
Complete(r)
}
Loading

0 comments on commit 88f61b2

Please sign in to comment.