Skip to content

Commit

Permalink
Revert consumer name change
Browse files Browse the repository at this point in the history
  • Loading branch information
samuelattwood committed Jan 13, 2025
1 parent 24c422a commit ff5d00a
Show file tree
Hide file tree
Showing 8 changed files with 40 additions and 84 deletions.
48 changes: 15 additions & 33 deletions controllers/jetstream/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ func (c *Controller) processConsumerObject(cns *apis.Consumer, jsm jsmClientFunc
switch {
case createOK:
c.normalEvent(cns, "Creating",
fmt.Sprintf("Creating consumer %q on stream %q", consumerName(spec), spec.StreamName))
fmt.Sprintf("Creating consumer %q on stream %q", spec.DurableName, spec.StreamName))
if err := natsClientUtil(createConsumer); err != nil {
return err
}
Expand All @@ -207,39 +207,39 @@ func (c *Controller) processConsumerObject(cns *apis.Consumer, jsm jsmClientFunc
return err
}
c.normalEvent(cns, "Created",
fmt.Sprintf("Created consumer %q on stream %q", consumerName(spec), spec.StreamName))
fmt.Sprintf("Created consumer %q on stream %q", spec.DurableName, spec.StreamName))
case updateOK:
if cns.Spec.PreventUpdate {
c.normalEvent(cns, "SkipUpdate", fmt.Sprintf("Skip updating consumer %q on stream %q", consumerName(spec), spec.StreamName))
c.normalEvent(cns, "SkipUpdate", fmt.Sprintf("Skip updating consumer %q on stream %q", spec.DurableName, spec.StreamName))
if _, err := setConsumerOK(c.ctx, cns, ifc); err != nil {
return err
}
return nil
}
c.normalEvent(cns, "Updating", fmt.Sprintf("Updating consumer %q on stream %q", consumerName(spec), spec.StreamName))
c.normalEvent(cns, "Updating", fmt.Sprintf("Updating consumer %q on stream %q", spec.DurableName, spec.StreamName))
if err := natsClientUtil(updateConsumer); err != nil {
return err
}

if _, err := setConsumerOK(c.ctx, cns, ifc); err != nil {
return err
}
c.normalEvent(cns, "Updated", fmt.Sprintf("Updated consumer %q on stream %q", consumerName(spec), spec.StreamName))
c.normalEvent(cns, "Updated", fmt.Sprintf("Updated consumer %q on stream %q", spec.DurableName, spec.StreamName))
case deleteOK:
if cns.Spec.PreventDelete {
c.normalEvent(cns, "SkipDelete", fmt.Sprintf("Skip deleting consumer %q on stream %q", consumerName(spec), spec.StreamName))
c.normalEvent(cns, "SkipDelete", fmt.Sprintf("Skip deleting consumer %q on stream %q", spec.DurableName, spec.StreamName))
if _, err := setConsumerOK(c.ctx, cns, ifc); err != nil {
return err
}
return nil
}
c.normalEvent(cns, "Deleting", fmt.Sprintf("Deleting consumer %q on stream %q", consumerName(spec), spec.StreamName))
c.normalEvent(cns, "Deleting", fmt.Sprintf("Deleting consumer %q on stream %q", spec.DurableName, spec.StreamName))
if err := natsClientUtil(deleteConsumer); err != nil {
return err
}
default:
c.normalEvent(cns, "Noop", fmt.Sprintf("Nothing done for consumer %q (prevent-delete=%v, prevent-update=%v)",
consumerName(spec), spec.PreventDelete, spec.PreventUpdate,
spec.DurableName, spec.PreventDelete, spec.PreventUpdate,
))
if _, err := setConsumerOK(c.ctx, cns, ifc); err != nil {
return err
Expand All @@ -256,14 +256,14 @@ func consumerExists(ctx context.Context, c jsmClient, spec apis.ConsumerSpec) (e
}
}()

_, err = c.LoadConsumer(ctx, spec.StreamName, consumerName(spec))
_, err = c.LoadConsumer(ctx, spec.StreamName, spec.DurableName)
return err
}

func createConsumer(ctx context.Context, c jsmClient, spec apis.ConsumerSpec) (err error) {
defer func() {
if err != nil {
err = fmt.Errorf("failed to create consumer %q on stream %q: %w", consumerName(spec), spec.StreamName, err)
err = fmt.Errorf("failed to create consumer %q on stream %q: %w", spec.DurableName, spec.StreamName, err)
}
}()

Expand All @@ -278,11 +278,11 @@ func createConsumer(ctx context.Context, c jsmClient, spec apis.ConsumerSpec) (e
func updateConsumer(ctx context.Context, c jsmClient, spec apis.ConsumerSpec) (err error) {
defer func() {
if err != nil {
err = fmt.Errorf("failed to update consumer %q on stream %q: %w", consumerName(spec), spec.StreamName, err)
err = fmt.Errorf("failed to update consumer %q on stream %q: %w", spec.DurableName, spec.StreamName, err)
}
}()

js, err := c.LoadConsumer(ctx, spec.StreamName, consumerName(spec))
js, err := c.LoadConsumer(ctx, spec.StreamName, spec.DurableName)
if err != nil {
return
}
Expand All @@ -298,7 +298,7 @@ func updateConsumer(ctx context.Context, c jsmClient, spec apis.ConsumerSpec) (e

func consumerSpecToOpts(spec apis.ConsumerSpec) ([]jsm.ConsumerOption, error) {
opts := []jsm.ConsumerOption{
jsm.ConsumerName(spec.Name),
jsm.DurableName(spec.DurableName),
jsm.DeliverySubject(spec.DeliverSubject),
jsm.RateLimitBitsPerSecond(uint64(spec.RateLimitBps)),
jsm.MaxAckPending(uint(spec.MaxAckPending)),
Expand All @@ -310,15 +310,6 @@ func consumerSpecToOpts(spec apis.ConsumerSpec) ([]jsm.ConsumerOption, error) {
jsm.ConsumerOverrideReplicas(spec.Replicas),
}

// Support deprecated option
if spec.DurableName != "" {
if spec.Name != "" && spec.Name != spec.DurableName {
return nil, fmt.Errorf("durable name and name must be the same")
}

opts = append(opts, jsm.DurableName(spec.DurableName))
}

if spec.FilterSubject != "" && len(spec.FilterSubjects) > 0 {
return nil, fmt.Errorf("cannot specify both FilterSubject and FilterSubjects")
}
Expand Down Expand Up @@ -444,7 +435,7 @@ func consumerSpecToOpts(spec apis.ConsumerSpec) ([]jsm.ConsumerOption, error) {
}

func deleteConsumer(ctx context.Context, c jsmClient, spec apis.ConsumerSpec) (err error) {
stream, consumer := spec.StreamName, consumerName(spec)
stream, consumer := spec.StreamName, spec.DurableName
defer func() {
if err != nil {
err = fmt.Errorf("failed to delete consumer %q on stream %q: %w", consumer, stream, err)
Expand Down Expand Up @@ -486,7 +477,7 @@ func setConsumerOK(ctx context.Context, s *apis.Consumer, i typed.ConsumerInterf
defer cancel()
res, err = i.UpdateStatus(ctx, sc, k8smeta.UpdateOptions{})
if err != nil {
return fmt.Errorf("failed to set consumer %q status: %w", consumerName(s.Spec), err)
return fmt.Errorf("failed to set consumer %q status: %w", s.Spec.DurableName, err)
}
return nil
})
Expand Down Expand Up @@ -520,12 +511,3 @@ func setConsumerErrored(ctx context.Context, s *apis.Consumer, sif typed.Consume
})
return res, err
}

// Durable is a deprecated field with all consumers having names.
func consumerName(spec apis.ConsumerSpec) string {
if spec.Name != "" {
return spec.Name
}

return spec.DurableName
}
28 changes: 14 additions & 14 deletions controllers/jetstream/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func TestProcessConsumer(t *testing.T) {
Generation: 1,
},
Spec: apis.ConsumerSpec{
Name: name,
DurableName: name,
DeliverPolicy: "byStartTime",
OptStartTime: time.Now().Format(time.RFC3339),
AckPolicy: "explicit",
Expand Down Expand Up @@ -124,7 +124,7 @@ func TestProcessConsumer(t *testing.T) {
Generation: 1,
},
Spec: apis.ConsumerSpec{
Name: name,
DurableName: name,
DeliverPolicy: "invalid",
},
})
Expand Down Expand Up @@ -179,7 +179,7 @@ func TestProcessConsumer(t *testing.T) {
Generation: 2,
},
Spec: apis.ConsumerSpec{
Name: name,
DurableName: name,
},
Status: apis.Status{
ObservedGeneration: 1,
Expand Down Expand Up @@ -237,7 +237,7 @@ func TestProcessConsumer(t *testing.T) {
DeletionTimestamp: &ts,
},
Spec: apis.ConsumerSpec{
Name: name,
DurableName: name,
},
})
if err != nil {
Expand Down Expand Up @@ -289,7 +289,7 @@ func TestProcessConsumer(t *testing.T) {
Generation: 1,
},
Spec: apis.ConsumerSpec{
Name: name,
DurableName: name,
},
})
if err != nil {
Expand Down Expand Up @@ -339,7 +339,7 @@ func TestConsumerSpecToOpts(t *testing.T) {
}{
"valid consumer spec": {
given: apis.ConsumerSpec{
Name: "my-consumer",
DurableName: "my-consumer",
DeliverPolicy: "byStartSequence",
OptStartSeq: 10,
AckPolicy: "explicit",
Expand All @@ -356,7 +356,7 @@ func TestConsumerSpecToOpts(t *testing.T) {
AckPolicy: jsmapi.AckExplicit,
AckWait: 1 * time.Minute,
DeliverPolicy: jsmapi.DeliverByStartSequence,
Name: "my-consumer",
Durable: "my-consumer",
Heartbeat: 30 * time.Second,
BackOff: []time.Duration{500 * time.Millisecond, 1 * time.Second},
OptStartSeq: 10,
Expand All @@ -369,15 +369,15 @@ func TestConsumerSpecToOpts(t *testing.T) {
},
"valid consumer spec, defaults only": {
given: apis.ConsumerSpec{
Name: "my-consumer",
DurableName: "my-consumer",
},
expected: jsmapi.ConsumerConfig{
Name: "my-consumer",
Durable: "my-consumer",
},
},
"invalid deliver policy value": {
given: apis.ConsumerSpec{
Name: "my-consumer",
DurableName: "my-consumer",
DeliverPolicy: "invalid",
},
errCheck: func(t *testing.T, err error) {
Expand All @@ -387,7 +387,7 @@ func TestConsumerSpecToOpts(t *testing.T) {
},
"missing start time for deliver policy byStartTime": {
given: apis.ConsumerSpec{
Name: "my-consumer",
DurableName: "my-consumer",
DeliverPolicy: "byStartTime",
},
errCheck: func(t *testing.T, err error) {
Expand All @@ -397,8 +397,8 @@ func TestConsumerSpecToOpts(t *testing.T) {
},
"invalid ack policy": {
given: apis.ConsumerSpec{
Name: "my-consumer",
AckPolicy: "invalid",
DurableName: "my-consumer",
AckPolicy: "invalid",
},
errCheck: func(t *testing.T, err error) {
require.Error(t, err)
Expand All @@ -407,7 +407,7 @@ func TestConsumerSpecToOpts(t *testing.T) {
},
"invalid replay policy": {
given: apis.ConsumerSpec{
Name: "my-consumer",
DurableName: "my-consumer",
ReplayPolicy: "invalid",
},
errCheck: func(t *testing.T, err error) {
Expand Down
4 changes: 2 additions & 2 deletions controllers/jetstream/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ func TestShouldEnqueue(t *testing.T) {
Name: "obj-name",
},
Spec: apis.ConsumerSpec{
Name: "foo",
DurableName: "foo",
},
},
next: &apis.Consumer{
Expand All @@ -320,7 +320,7 @@ func TestShouldEnqueue(t *testing.T) {
Name: "obj-name",
},
Spec: apis.ConsumerSpec{
Name: "bar",
DurableName: "bar",
},
},
want: true,
Expand Down
2 changes: 1 addition & 1 deletion deploy/crds.yml
Original file line number Diff line number Diff line change
Expand Up @@ -517,7 +517,7 @@ spec:
spec:
type: object
properties:
name:
durableName:
description: The name of the Consumer.
type: string
pattern: '^[^.*>]+$'
Expand Down
28 changes: 6 additions & 22 deletions internal/controller/consumer_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,18 +65,9 @@ func (r *ConsumerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
return ctrl.Result{}, fmt.Errorf("get consumer resource '%s': %w", req.NamespacedName.String(), err)
}

// Set both Name values
if consumer.Spec.Name == "" {
consumer.Spec.Name = consumer.Spec.DurableName
}

if consumer.Spec.DurableName == "" {
consumer.Spec.DurableName = consumer.Spec.Name
}

log = log.WithValues(
"streamName", consumer.Spec.StreamName,
"consumerName", consumer.Spec.Name,
"consumerName", consumer.Spec.DurableName,
)

// Update ready status to unknown when no status is set
Expand Down Expand Up @@ -139,14 +130,14 @@ func (r *ConsumerReconciler) deleteConsumer(ctx context.Context, log logr.Logger

if !consumer.Spec.PreventDelete && !r.ReadOnly() {
err := r.WithJetStreamClient(consumer.Spec.ConnectionOpts, consumer.Namespace, func(js jetstream.JetStream) error {
_, err := js.Consumer(ctx, consumer.Spec.StreamName, consumer.Spec.Name)
_, err := js.Consumer(ctx, consumer.Spec.StreamName, consumer.Spec.DurableName)
if err != nil {
if errors.Is(err, jetstream.ErrConsumerNotFound) || errors.Is(err, jetstream.ErrJetStreamNotEnabled) || errors.Is(err, jetstream.ErrJetStreamNotEnabledForAccount) {
return nil
}
return err
}
return js.DeleteConsumer(ctx, consumer.Spec.StreamName, consumer.Spec.Name)
return js.DeleteConsumer(ctx, consumer.Spec.StreamName, consumer.Spec.DurableName)
})
switch {
case errors.Is(err, jetstream.ErrConsumerNotFound):
Expand All @@ -160,7 +151,7 @@ func (r *ConsumerReconciler) deleteConsumer(ctx context.Context, log logr.Logger
}
} else {
log.Info("Skipping consumer deletion.",
"consumerName", consumer.Spec.Name,
"consumerName", consumer.Spec.DurableName,
"preventDelete", consumer.Spec.PreventDelete,
"read-only", r.ReadOnly(),
)
Expand Down Expand Up @@ -194,7 +185,7 @@ func (r *ConsumerReconciler) createOrUpdate(ctx context.Context, log klog.Logger

err = r.WithJetStreamClient(consumer.Spec.ConnectionOpts, consumer.Namespace, func(js jetstream.JetStream) error {
exists := false
_, err := js.Consumer(ctx, consumer.Spec.StreamName, consumer.Spec.Name)
_, err := js.Consumer(ctx, consumer.Spec.StreamName, consumer.Spec.DurableName)
if err == nil {
exists = true
} else if !errors.Is(err, jetstream.ErrConsumerNotFound) {
Expand Down Expand Up @@ -246,7 +237,7 @@ func (r *ConsumerReconciler) createOrUpdate(ctx context.Context, log klog.Logger

func consumerSpecToConfig(spec *api.ConsumerSpec) (*jetstream.ConsumerConfig, error) {
config := &jetstream.ConsumerConfig{
Name: spec.Name,
Durable: spec.DurableName,
Description: spec.Description,
OptStartSeq: uint64(spec.OptStartSeq),
MaxDeliver: spec.MaxDeliver,
Expand All @@ -267,13 +258,6 @@ func consumerSpecToConfig(spec *api.ConsumerSpec) (*jetstream.ConsumerConfig, er
InactiveThreshold: 0,
}

if spec.DurableName != "" {
if spec.Name != "" && spec.DurableName != spec.Name {
return nil, fmt.Errorf("durable name and name must be the same")
}
config.Durable = spec.DurableName
}

// DeliverPolicy
if spec.DeliverPolicy != "" {
err := config.DeliverPolicy.UnmarshalJSON(jsonString(spec.DeliverPolicy))
Expand Down
4 changes: 2 additions & 2 deletions internal/controller/consumer_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ var _ = Describe("Consumer Controller", func() {
Spec: api.ConsumerSpec{
AckPolicy: "explicit",
DeliverPolicy: "all",
Name: consumerName,
DurableName: consumerName,
Description: "test consumer",
StreamName: streamName,
ReplayPolicy: "instant",
Expand Down Expand Up @@ -603,7 +603,7 @@ func Test_consumerSpecToConfig(t *testing.T) {
DeliverPolicy: "new",
DeliverSubject: "",
Description: "test consumer",
Name: "test-consumer",
DurableName: "test-consumer",
FilterSubject: "time.us.>",
FilterSubjects: []string{"time.us.east", "time.us.west"},
FlowControl: false,
Expand Down
1 change: 0 additions & 1 deletion pkg/jetstream/apis/jetstream/v1beta2/consumertypes.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ func (c *Consumer) GetSpec() interface{} {

// ConsumerSpec is the spec for a Consumer resource
type ConsumerSpec struct {
Name string `json:"name"`
DurableName string `json:"durableName"` // Maps to Durable
Description string `json:"description"`
DeliverPolicy string `json:"deliverPolicy"`
Expand Down
Loading

0 comments on commit ff5d00a

Please sign in to comment.