Skip to content

Commit

Permalink
Update PreventUpdate behavior
Browse files Browse the repository at this point in the history
  • Loading branch information
samuelattwood committed Dec 31, 2024
1 parent 67b03ec commit bb40ab3
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 18 deletions.
34 changes: 29 additions & 5 deletions internal/controller/consumer_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,9 +155,8 @@ func (r *ConsumerReconciler) deleteConsumer(ctx context.Context, log logr.Logger

func (r *ConsumerReconciler) createOrUpdate(ctx context.Context, log klog.Logger, consumer *api.Consumer) error {
// Create or Update the stream based on the spec
if consumer.Spec.PreventUpdate || r.ReadOnly() {
if r.ReadOnly() {
log.Info("Skipping consumer creation or update.",
"preventDelete", consumer.Spec.PreventDelete,
"read-only", r.ReadOnly(),
)
return nil
Expand All @@ -170,9 +169,34 @@ func (r *ConsumerReconciler) createOrUpdate(ctx context.Context, log klog.Logger
}

err = r.WithJetStreamClient(consumerConnOpts(consumer.Spec), func(js jetstream.JetStream) error {
log.Info("Consumer created or updated.")
_, err := js.CreateOrUpdateConsumer(ctx, consumer.Spec.StreamName, *targetConfig)
return err
consumerName := targetConfig.Name
if consumerName == "" {
consumerName = targetConfig.Durable
}

exists := false
_, err := js.Consumer(ctx, consumer.Spec.StreamName, consumerName)
if err == nil {
exists = true
}

if !exists {
log.Info("Creating Consumer.")
_, err := js.CreateConsumer(ctx, consumer.Spec.StreamName, *targetConfig)
return err
}

if !consumer.Spec.PreventUpdate {
log.Info("Updating Consumer.")
_, err := js.UpdateConsumer(ctx, consumer.Spec.StreamName, *targetConfig)
return err
} else {
log.Info("Skipping Consumer update.",
"preventUpdate", consumer.Spec.PreventUpdate,
)
}

return nil
})
if err != nil {
err = fmt.Errorf("create or update consumer: %w", err)
Expand Down
6 changes: 3 additions & 3 deletions internal/controller/consumer_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,15 +290,15 @@ var _ = Describe("Consumer Controller", func() {
consumer.Spec.PreventUpdate = true
Expect(k8sClient.Update(ctx, consumer)).To(Succeed())
})
It("should not create the consumer", func(ctx SpecContext) {
It("should create the consumer", func(ctx SpecContext) {
By("running Reconcile")
result, err := controller.Reconcile(ctx, ctrl.Request{NamespacedName: typeNamespacedName})
Expect(err).NotTo(HaveOccurred())
Expect(result.IsZero()).To(BeTrue())

By("checking that no consumer was created")
By("checking that consumer was created")
_, err = jsClient.Consumer(ctx, streamName, consumerName)
Expect(err).To(MatchError(jetstream.ErrConsumerNotFound))
Expect(err).ToNot(HaveOccurred())
})
It("should not update the consumer", func(ctx SpecContext) {
By("creating the consumer")
Expand Down
4 changes: 2 additions & 2 deletions internal/controller/keyvalue_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,20 +168,20 @@ func (r *KeyValueReconciler) createOrUpdate(ctx context.Context, log logr.Logger
// UpdateKeyValue is called on every reconciliation when the stream is not to be deleted.
// TODO(future-feature): Do we need to check if config differs?
err = r.WithJetStreamClient(keyValueConnOpts(keyValue.Spec), func(js jetstream.JetStream) error {
log.Info("Creating or updating KeyValue.")

exists := false
_, err := js.KeyValue(ctx, targetConfig.Bucket)
if err == nil {
exists = true
}

if !exists {
log.Info("Creating KeyValue.")
_, err = js.CreateKeyValue(ctx, targetConfig)
return err
}

if !keyValue.Spec.PreventUpdate {
log.Info("Updating KeyValue.")
_, err = js.UpdateKeyValue(ctx, targetConfig)
return err
} else {
Expand Down
29 changes: 24 additions & 5 deletions internal/controller/stream_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,9 +152,8 @@ func (r *StreamReconciler) deleteStream(ctx context.Context, log logr.Logger, st

func (r *StreamReconciler) createOrUpdate(ctx context.Context, log logr.Logger, stream *api.Stream) error {
// Create or Update the stream based on the spec
if stream.Spec.PreventUpdate || r.ReadOnly() {
if r.ReadOnly() {
log.Info("Skipping stream creation or update.",
"preventDelete", stream.Spec.PreventDelete,
"read-only", r.ReadOnly(),
)
return nil
Expand All @@ -169,9 +168,29 @@ func (r *StreamReconciler) createOrUpdate(ctx context.Context, log logr.Logger,
// CreateOrUpdateStream is called on every reconciliation when the stream is not to be deleted.
// TODO(future-feature): Do we need to check if config differs?
err = r.WithJetStreamClient(streamConnOpts(stream.Spec), func(js jetstream.JetStream) error {
log.Info("Creating or updating stream.")
_, err = js.CreateOrUpdateStream(ctx, targetConfig)
return err
exists := false
_, err := js.Stream(ctx, targetConfig.Name)
if err == nil {
exists = true
}

if !exists {
log.Info("Creating Stream.")
_, err := js.CreateStream(ctx, targetConfig)
return err
}

if !stream.Spec.PreventUpdate {
log.Info("Updating Stream.")
_, err := js.UpdateStream(ctx, targetConfig)
return err
} else {
log.Info("Skipping Stream update.",
"preventUpdate", stream.Spec.PreventUpdate,
)
}

return nil
})
if err != nil {
err = fmt.Errorf("create or update stream: %w", err)
Expand Down
6 changes: 3 additions & 3 deletions internal/controller/stream_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,15 +211,15 @@ var _ = Describe("Stream Controller", func() {
stream.Spec.PreventUpdate = true
Expect(k8sClient.Update(ctx, stream)).To(Succeed())
})
It("should not create the stream", func(ctx SpecContext) {
It("should create the stream", func(ctx SpecContext) {
By("running Reconcile")
result, err := controller.Reconcile(ctx, ctrl.Request{NamespacedName: typeNamespacedName})
Expect(err).NotTo(HaveOccurred())
Expect(result.IsZero()).To(BeTrue())

By("checking that no stream was created")
By("checking that stream was created")
_, err = jsClient.Stream(ctx, streamName)
Expect(err).To(MatchError(jetstream.ErrStreamNotFound))
Expect(err).NotTo(HaveOccurred())
})
It("should not update the stream", func(ctx SpecContext) {
By("creating the stream")
Expand Down

0 comments on commit bb40ab3

Please sign in to comment.