diff --git a/internal/controller/consumer_controller.go b/internal/controller/consumer_controller.go index a6b7be10..aca4f5c3 100644 --- a/internal/controller/consumer_controller.go +++ b/internal/controller/consumer_controller.go @@ -155,9 +155,8 @@ func (r *ConsumerReconciler) deleteConsumer(ctx context.Context, log logr.Logger func (r *ConsumerReconciler) createOrUpdate(ctx context.Context, log klog.Logger, consumer *api.Consumer) error { // Create or Update the stream based on the spec - if consumer.Spec.PreventUpdate || r.ReadOnly() { + if r.ReadOnly() { log.Info("Skipping consumer creation or update.", - "preventDelete", consumer.Spec.PreventDelete, "read-only", r.ReadOnly(), ) return nil @@ -170,9 +169,34 @@ func (r *ConsumerReconciler) createOrUpdate(ctx context.Context, log klog.Logger } err = r.WithJetStreamClient(consumerConnOpts(consumer.Spec), func(js jetstream.JetStream) error { - log.Info("Consumer created or updated.") - _, err := js.CreateOrUpdateConsumer(ctx, consumer.Spec.StreamName, *targetConfig) - return err + consumerName := targetConfig.Name + if consumerName == "" { + consumerName = targetConfig.Durable + } + + exists := false + _, err := js.Consumer(ctx, consumer.Spec.StreamName, consumerName) + if err == nil { + exists = true + } + + if !exists { + log.Info("Creating Consumer.") + _, err := js.CreateConsumer(ctx, consumer.Spec.StreamName, *targetConfig) + return err + } + + if !consumer.Spec.PreventUpdate { + log.Info("Updating Consumer.") + _, err := js.UpdateConsumer(ctx, consumer.Spec.StreamName, *targetConfig) + return err + } else { + log.Info("Skipping Consumer update.", + "preventUpdate", consumer.Spec.PreventUpdate, + ) + } + + return nil }) if err != nil { err = fmt.Errorf("create or update consumer: %w", err) diff --git a/internal/controller/consumer_controller_test.go b/internal/controller/consumer_controller_test.go index ab973e4b..bf5410a1 100644 --- a/internal/controller/consumer_controller_test.go +++ b/internal/controller/consumer_controller_test.go @@ -290,15 +290,15 @@ var _ = Describe("Consumer Controller", func() { consumer.Spec.PreventUpdate = true Expect(k8sClient.Update(ctx, consumer)).To(Succeed()) }) - It("should not create the consumer", func(ctx SpecContext) { + It("should create the consumer", func(ctx SpecContext) { By("running Reconcile") result, err := controller.Reconcile(ctx, ctrl.Request{NamespacedName: typeNamespacedName}) Expect(err).NotTo(HaveOccurred()) Expect(result.IsZero()).To(BeTrue()) - By("checking that no consumer was created") + By("checking that consumer was created") _, err = jsClient.Consumer(ctx, streamName, consumerName) - Expect(err).To(MatchError(jetstream.ErrConsumerNotFound)) + Expect(err).ToNot(HaveOccurred()) }) It("should not update the consumer", func(ctx SpecContext) { By("creating the consumer") diff --git a/internal/controller/keyvalue_controller.go b/internal/controller/keyvalue_controller.go index ce07ff15..bf072d35 100644 --- a/internal/controller/keyvalue_controller.go +++ b/internal/controller/keyvalue_controller.go @@ -168,8 +168,6 @@ func (r *KeyValueReconciler) createOrUpdate(ctx context.Context, log logr.Logger // UpdateKeyValue is called on every reconciliation when the stream is not to be deleted. // TODO(future-feature): Do we need to check if config differs? err = r.WithJetStreamClient(keyValueConnOpts(keyValue.Spec), func(js jetstream.JetStream) error { - log.Info("Creating or updating KeyValue.") - exists := false _, err := js.KeyValue(ctx, targetConfig.Bucket) if err == nil { @@ -177,11 +175,13 @@ func (r *KeyValueReconciler) createOrUpdate(ctx context.Context, log logr.Logger } if !exists { + log.Info("Creating KeyValue.") _, err = js.CreateKeyValue(ctx, targetConfig) return err } if !keyValue.Spec.PreventUpdate { + log.Info("Updating KeyValue.") _, err = js.UpdateKeyValue(ctx, targetConfig) return err } else { diff --git a/internal/controller/stream_controller.go b/internal/controller/stream_controller.go index 6159b46a..660f01c0 100644 --- a/internal/controller/stream_controller.go +++ b/internal/controller/stream_controller.go @@ -152,9 +152,8 @@ func (r *StreamReconciler) deleteStream(ctx context.Context, log logr.Logger, st func (r *StreamReconciler) createOrUpdate(ctx context.Context, log logr.Logger, stream *api.Stream) error { // Create or Update the stream based on the spec - if stream.Spec.PreventUpdate || r.ReadOnly() { + if r.ReadOnly() { log.Info("Skipping stream creation or update.", - "preventDelete", stream.Spec.PreventDelete, "read-only", r.ReadOnly(), ) return nil @@ -169,9 +168,29 @@ func (r *StreamReconciler) createOrUpdate(ctx context.Context, log logr.Logger, // CreateOrUpdateStream is called on every reconciliation when the stream is not to be deleted. // TODO(future-feature): Do we need to check if config differs? err = r.WithJetStreamClient(streamConnOpts(stream.Spec), func(js jetstream.JetStream) error { - log.Info("Creating or updating stream.") - _, err = js.CreateOrUpdateStream(ctx, targetConfig) - return err + exists := false + _, err := js.Stream(ctx, targetConfig.Name) + if err == nil { + exists = true + } + + if !exists { + log.Info("Creating Stream.") + _, err := js.CreateStream(ctx, targetConfig) + return err + } + + if !stream.Spec.PreventUpdate { + log.Info("Updating Stream.") + _, err := js.UpdateStream(ctx, targetConfig) + return err + } else { + log.Info("Skipping Stream update.", + "preventUpdate", stream.Spec.PreventUpdate, + ) + } + + return nil }) if err != nil { err = fmt.Errorf("create or update stream: %w", err) diff --git a/internal/controller/stream_controller_test.go b/internal/controller/stream_controller_test.go index 41bf16b3..e2506a7f 100644 --- a/internal/controller/stream_controller_test.go +++ b/internal/controller/stream_controller_test.go @@ -211,15 +211,15 @@ var _ = Describe("Stream Controller", func() { stream.Spec.PreventUpdate = true Expect(k8sClient.Update(ctx, stream)).To(Succeed()) }) - It("should not create the stream", func(ctx SpecContext) { + It("should create the stream", func(ctx SpecContext) { By("running Reconcile") result, err := controller.Reconcile(ctx, ctrl.Request{NamespacedName: typeNamespacedName}) Expect(err).NotTo(HaveOccurred()) Expect(result.IsZero()).To(BeTrue()) - By("checking that no stream was created") + By("checking that stream was created") _, err = jsClient.Stream(ctx, streamName) - Expect(err).To(MatchError(jetstream.ErrStreamNotFound)) + Expect(err).NotTo(HaveOccurred()) }) It("should not update the stream", func(ctx SpecContext) { By("creating the stream")