Skip to content

Commit

Permalink
Add schema syncing implementation (#273)
Browse files Browse the repository at this point in the history
This adds a high-level schema syncing client for CRD --> schema creation. Here's how the sync operation roughly works:

1. A user calls `Sync` with the CRD
2. The client pulls both schema data and compatability levels and merges them into a comparable in-memory object.
3. The client also pulls a special `SchemaHash` field on the CRD status which is just a non-cryptographic hash of the contents of the schema text string that has previously been synced (since the Redpanda schema creation operations have some normalization of the schema string and may no longer be 1-1 comparable with what's in the CRD)
4. All of these values are compared against what's currently in the schema CRD itself
5. If anything changes, then we create a new schema version and hash the contents of its schema text, returning both all of the version numbers of the schema and the hash of its contents.
6. The above will be used in patching the status of the CRD with both the hash of the schema and all of the versions of the schema that we've created in the registry.

Delete operations simply delete every version of the schema by their subject, and since a CRD deletion should leave things in a clean state, it issues both a soft and then hard delete.
  • Loading branch information
andrewstucki authored Oct 22, 2024
1 parent 2792964 commit 4976e07
Show file tree
Hide file tree
Showing 7 changed files with 540 additions and 2 deletions.
23 changes: 23 additions & 0 deletions operator/pkg/client/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/twmb/franz-go/pkg/kgo"
"github.com/twmb/franz-go/pkg/sasl"
"github.com/twmb/franz-go/pkg/sasl/scram"
"github.com/twmb/franz-go/pkg/sr"
)

// RedpandaAdminForCluster returns a simple kgo.Client able to communicate with the given cluster specified via a Redpanda cluster.
Expand All @@ -41,6 +42,28 @@ func (c *Factory) redpandaAdminForCluster(cluster *redpandav1alpha2.Redpanda) (*
return client, nil
}

// schemaRegistryForCluster returns a simple kgo.Client able to communicate with the given cluster specified via a Redpanda cluster.
func (c *Factory) schemaRegistryForCluster(cluster *redpandav1alpha2.Redpanda) (*sr.Client, error) {
dot, err := cluster.GetDot(c.config)
if err != nil {
return nil, err
}

client, err := redpanda.SchemaRegistryClient(dot, c.dialer)
if err != nil {
return nil, err
}

if c.userAuth != nil {
client, err = sr.NewClient(append(client.Opts(), sr.BasicAuth(c.userAuth.Username, c.userAuth.Password))...)
if err != nil {
return nil, err
}
}

return client, nil
}

// KafkaForCluster returns a simple kgo.Client able to communicate with the given cluster specified via a Redpanda cluster.
func (c *Factory) kafkaForCluster(cluster *redpandav1alpha2.Redpanda, opts ...kgo.Opt) (*kgo.Client, error) {
dot, err := cluster.GetDot(c.config)
Expand Down
55 changes: 53 additions & 2 deletions operator/pkg/client/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@ import (
"github.com/redpanda-data/helm-charts/pkg/redpanda"
redpandav1alpha2 "github.com/redpanda-data/redpanda-operator/operator/api/redpanda/v1alpha2"
"github.com/redpanda-data/redpanda-operator/operator/pkg/client/acls"
"github.com/redpanda-data/redpanda-operator/operator/pkg/client/schemas"
"github.com/redpanda-data/redpanda-operator/operator/pkg/client/users"
"github.com/twmb/franz-go/pkg/kadm"
"github.com/twmb/franz-go/pkg/kgo"
"github.com/twmb/franz-go/pkg/sr"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/rest"
Expand Down Expand Up @@ -54,20 +56,28 @@ type UserAuth struct {
// at method invocation.
type ClientFactory interface {
// KafkaClient initializes a kgo.Client based on the spec of the passed in struct.
// The struct *must* implement either the v1alpha2.KafkaConnectedObject interface of the v1alpha2.ClusterReferencingObject
// The struct *must* implement either the v1alpha2.KafkaConnectedObject interface or the v1alpha2.ClusterReferencingObject
// interface to properly initialize.
KafkaClient(ctx context.Context, object client.Object, opts ...kgo.Opt) (*kgo.Client, error)

// RedpandaAdminClient initializes a rpadmin.AdminAPI client based on the spec of the passed in struct.
// The struct *must* implement either the v1alpha2.AdminConnectedObject interface of the v1alpha2.ClusterReferencingObject
// The struct *must* implement either the v1alpha2.AdminConnectedObject interface or the v1alpha2.ClusterReferencingObject
// interface to properly initialize.
RedpandaAdminClient(ctx context.Context, object client.Object) (*rpadmin.AdminAPI, error)

// SchemaRegistryClient initializes an sr.Client based on the spec of the passed in struct.
// The struct *must* implement either the v1alpha2.SchemaRegistryConnectedObject interface or the v1alpha2.ClusterReferencingObject
// interface to properly initialize.
SchemaRegistryClient(ctx context.Context, object client.Object) (*sr.Client, error)

// ACLs returns a high-level client for synchronizing ACLs.
ACLs(ctx context.Context, object redpandav1alpha2.ClusterReferencingObject, opts ...kgo.Opt) (*acls.Syncer, error)

// Users returns a high-level client for managing users.
Users(ctx context.Context, object redpandav1alpha2.ClusterReferencingObject, opts ...kgo.Opt) (*users.Client, error)

// Schemas returns a high-level client for synchronizing Schemas.
Schemas(ctx context.Context, object redpandav1alpha2.ClusterReferencingObject) (*schemas.Syncer, error)
}

type Factory struct {
Expand Down Expand Up @@ -149,6 +159,37 @@ func (c *Factory) RedpandaAdminClient(ctx context.Context, obj client.Object) (*
return nil, ErrInvalidRedpandaClientObject
}

func (c *Factory) SchemaRegistryClient(ctx context.Context, obj client.Object) (*sr.Client, error) {
// if we pass in a Redpanda cluster, just use it
if cluster, ok := obj.(*redpandav1alpha2.Redpanda); ok {
return c.schemaRegistryForCluster(cluster)
}

cluster, err := c.getCluster(ctx, obj)
if err != nil {
return nil, err
}

if cluster != nil {
return c.schemaRegistryForCluster(cluster)
}

if spec := c.getSchemaRegistrySpec(obj); spec != nil {
return c.schemaRegistryForSpec(ctx, obj.GetNamespace(), spec)
}

return nil, ErrInvalidRedpandaClientObject
}

func (c *Factory) Schemas(ctx context.Context, obj redpandav1alpha2.ClusterReferencingObject) (*schemas.Syncer, error) {
schemaRegistryClient, err := c.SchemaRegistryClient(ctx, obj)
if err != nil {
return nil, err
}

return schemas.NewSyncer(schemaRegistryClient), nil
}

func (c *Factory) ACLs(ctx context.Context, obj redpandav1alpha2.ClusterReferencingObject, opts ...kgo.Opt) (*acls.Syncer, error) {
kafkaClient, err := c.KafkaClient(ctx, obj, opts...)
if err != nil {
Expand Down Expand Up @@ -227,3 +268,13 @@ func (c *Factory) getAdminSpec(obj client.Object) *redpandav1alpha2.AdminAPISpec

return nil
}

func (c *Factory) getSchemaRegistrySpec(obj client.Object) *redpandav1alpha2.SchemaRegistrySpec {
if o, ok := obj.(redpandav1alpha2.ClusterReferencingObject); ok {
if source := o.GetClusterSource(); source != nil {
return source.GetSchemaRegistrySpec()
}
}

return nil
}
102 changes: 102 additions & 0 deletions operator/pkg/client/schemas/schema.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
// Copyright 2024 Redpanda Data, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0

package schemas

import (
"reflect"
"slices"

redpandav1alpha2 "github.com/redpanda-data/redpanda-operator/operator/api/redpanda/v1alpha2"
"github.com/redpanda-data/redpanda-operator/operator/pkg/functional"
"github.com/twmb/franz-go/pkg/sr"
)

type schema struct {
Subject string
CompatibilityLevel sr.CompatibilityLevel
Schema string
Type sr.SchemaType
References []sr.SchemaReference
SchemaMetadata *sr.SchemaMetadata
SchemaRuleSet *sr.SchemaRuleSet
Hash string
}

func (s *schema) toKafka() sr.Schema {
return sr.Schema{
Schema: s.Schema,
Type: s.Type,
References: s.References,
SchemaMetadata: s.SchemaMetadata,
SchemaRuleSet: s.SchemaRuleSet,
}
}

func schemaFromV1Alpha2Schema(s *redpandav1alpha2.Schema) (*schema, error) {
hash, err := s.Spec.SchemaHash()
if err != nil {
return nil, err
}
return &schema{
Subject: s.Name,
CompatibilityLevel: s.Spec.GetCompatibilityLevel().ToKafka(),
Schema: s.Spec.Text,
Type: s.Spec.GetType().ToKafka(),
References: functional.MapFn(redpandav1alpha2.SchemaReferenceToKafka, s.Spec.References),
Hash: hash,
}, nil
}

func schemaFromRedpandaSubjectSchema(s *sr.SubjectSchema, hash string, compatibility sr.CompatibilityLevel) *schema {
return &schema{
Subject: s.Subject,
CompatibilityLevel: compatibility,
Schema: s.Schema.Schema,
Type: s.Type,
References: s.References,
Hash: hash,
}
}

func (s *schema) SchemaEquals(other *schema) bool {
// subject
if s.Subject != other.Subject {
return false
}

// type
if s.Type != other.Type {
return false
}

// schema
// we cheat here, rather than trying to match the normalized schema in the cluster
// we instead just check to see if we've changed at all in the CRD
if s.Hash != other.Hash {
return false
}

// references
if !slices.Equal(s.References, other.References) {
return false
}

// metadata
if !reflect.DeepEqual(s.SchemaMetadata, other.SchemaMetadata) {
return false
}

// rule set
if !reflect.DeepEqual(s.SchemaRuleSet, other.SchemaRuleSet) {
return false
}

return true
}
122 changes: 122 additions & 0 deletions operator/pkg/client/schemas/syncer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
// Copyright 2024 Redpanda Data, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0

package schemas

import (
"context"
"errors"

redpandav1alpha2 "github.com/redpanda-data/redpanda-operator/operator/api/redpanda/v1alpha2"
"github.com/twmb/franz-go/pkg/sr"
)

// Syncer synchronizes Schemas for the given object to Redpanda.
type Syncer struct {
client *sr.Client
}

// NewSyncer initializes a Syncer.
func NewSyncer(client *sr.Client) *Syncer {
return &Syncer{
client: client,
}
}

// Sync synchronizes the schema in Redpanda.
func (s *Syncer) Sync(ctx context.Context, o *redpandav1alpha2.Schema) (string, []int, error) {
versions := o.Status.Versions
hash := o.Status.SchemaHash

want, err := schemaFromV1Alpha2Schema(o)
if err != nil {
return hash, versions, err
}

// default to creating the schema
createSchema := true
// default to setting compatibility for the schema subject
setCompatibility := true

if !s.isInitial(o) {
have, err := s.getLatest(ctx, o)
if err != nil {
return hash, versions, err
}

setCompatibility = have.CompatibilityLevel != want.CompatibilityLevel
createSchema = !have.SchemaEquals(want)
}

if setCompatibility {
if err := s.setCompatibility(ctx, want); err != nil {
return hash, versions, err
}
}

if createSchema {
subjectSchema, err := s.client.CreateSchema(ctx, o.Name, want.toKafka())
if err != nil {
return hash, versions, err
}
hash = want.Hash
versions = append(versions, subjectSchema.Version)
}

return hash, versions, nil
}

func (s *Syncer) isInitial(o *redpandav1alpha2.Schema) bool {
return len(o.Status.Versions) == 0
}

func (s *Syncer) setCompatibility(ctx context.Context, sc *schema) error {
results := s.client.SetCompatibility(ctx, sr.SetCompatibility{
Level: sc.CompatibilityLevel,
}, sc.Subject)
if len(results) == 0 {
return errors.New("empty results returned from syncing compatibility levels")
}
if err := results[0].Err; err != nil {
return err
}

return nil
}

func (s *Syncer) getLatest(ctx context.Context, o *redpandav1alpha2.Schema) (*schema, error) {
subjectSchema, err := s.client.SchemaByVersion(ctx, o.Name, -1)
if err != nil {
return nil, err
}

var compatibility sr.CompatibilityLevel

results := s.client.Compatibility(ctx, o.Name)
if len(results) > 0 {
result := results[0]
if err := result.Err; err != nil {
return nil, err
}
compatibility = result.Level
}

return schemaFromRedpandaSubjectSchema(&subjectSchema, o.Status.SchemaHash, compatibility), nil
}

// Delete removes the schema in Redpanda.
func (s *Syncer) Delete(ctx context.Context, o *redpandav1alpha2.Schema) error {
if _, err := s.client.DeleteSubject(ctx, o.Name, sr.SoftDelete); err != nil {
return err
}
if _, err := s.client.DeleteSubject(ctx, o.Name, sr.HardDelete); err != nil {
return err
}
return nil
}
Loading

0 comments on commit 4976e07

Please sign in to comment.