Skip to content

Commit

Permalink
Add JSON schema support (#299)
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewstucki authored Nov 11, 2024
1 parent 6e60328 commit 0e52097
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 52 deletions.
5 changes: 4 additions & 1 deletion operator/api/redpanda/v1alpha2/schema_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,20 +49,23 @@ func (s *Schema) GetClusterSource() *ClusterSource {
}

// SchemaType specifies the type of the given schema.
// +kubebuilder:validation:Enum=avro;protobuf
// +kubebuilder:validation:Enum=avro;protobuf;json
type SchemaType string

const (
SchemaTypeAvro SchemaType = "avro"
SchemaTypeJSON SchemaType = "json"
SchemaTypeProtobuf SchemaType = "protobuf"
)

var (
schemaTypesFromKafka = map[sr.SchemaType]SchemaType{
sr.TypeJSON: SchemaTypeJSON,
sr.TypeAvro: SchemaTypeAvro,
sr.TypeProtobuf: SchemaTypeProtobuf,
}
schemaTypesToKafka = map[SchemaType]sr.SchemaType{
SchemaTypeJSON: sr.TypeJSON,
SchemaTypeAvro: sr.TypeAvro,
SchemaTypeProtobuf: sr.TypeProtobuf,
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -573,6 +573,7 @@ spec:
enum:
- avro
- protobuf
- json
type: string
text:
description: Text is the actual unescaped text of a schema.
Expand Down
123 changes: 72 additions & 51 deletions operator/pkg/client/schemas/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ import (
"k8s.io/utils/ptr"
)

const validSchema = `
const (
validAvroSchema = `
{
"type": "record",
"name": "test",
Expand All @@ -39,6 +40,18 @@ const validSchema = `
]
}
`
validJSONSchema = `
{
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"order_id": { "type": "string" },
"total": { "type": "number" }
},
"required": ["order_id", "total"],
"additionalProperties": false
}`
)

func normalizeSchema(t *testing.T, ctx context.Context, syncer *Syncer, schema *v1alpha2.Schema) {
actualSchema, err := syncer.getLatest(ctx, schema)
Expand Down Expand Up @@ -88,7 +101,7 @@ func TestSyncer(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute*2)
defer cancel()

container, err := redpanda.Run(ctx, "docker.redpanda.com/redpandadata/redpanda:v23.2.8",
container, err := redpanda.Run(ctx, "docker.redpanda.com/redpandadata/redpanda:v24.2.10",
redpanda.WithEnableSchemaRegistryHTTPBasicAuth(),
redpanda.WithEnableKafkaAuthorization(),
redpanda.WithEnableSASL(),
Expand All @@ -106,54 +119,62 @@ func TestSyncer(t *testing.T) {

syncer := NewSyncer(schemaRegistryClient)

schema := &v1alpha2.Schema{
ObjectMeta: metav1.ObjectMeta{
Name: "schema",
},
Spec: v1alpha2.SchemaSpec{
Text: validSchema,
},
}

reference := &v1alpha2.Schema{
ObjectMeta: metav1.ObjectMeta{
Name: "reference",
},
Spec: v1alpha2.SchemaSpec{
Text: validSchema,
},
for schemaType, schemaText := range map[v1alpha2.SchemaType]string{
v1alpha2.SchemaTypeAvro: validAvroSchema,
v1alpha2.SchemaTypeJSON: validJSONSchema,
} {
t.Run(string(schemaType), func(t *testing.T) {
schema := &v1alpha2.Schema{
ObjectMeta: metav1.ObjectMeta{
Name: "schema-" + string(schemaType),
},
Spec: v1alpha2.SchemaSpec{
Type: ptr.To(schemaType),
Text: schemaText,
},
}

reference := &v1alpha2.Schema{
ObjectMeta: metav1.ObjectMeta{
Name: "reference" + string(schemaType),
},
Spec: v1alpha2.SchemaSpec{
Type: ptr.To(schemaType),
Text: schemaText,
},
}

// create initial schema and reference
expectSchemaUpdate(t, ctx, syncer, schema, true)
expectSchemaUpdate(t, ctx, syncer, reference, true)

// update references
schema.Spec.References = []v1alpha2.SchemaReference{
{
Subject: reference.Name,
Name: "test",
Version: 1,
},
}
expectSchemaUpdate(t, ctx, syncer, schema, true)

// update compatibility level
schema.Spec.CompatibilityLevel = ptr.To(v1alpha2.CompatabilityLevelFull)
expectSchemaUpdate(t, ctx, syncer, schema, false)

// TODO: Request from core support for the following
// https://github.com/redpanda-data/redpanda/issues/23548
// - update schema rules: rules not supported
// - update metadata: metadata is not supported
// update normalization: normalization is not supported

// delete
err = syncer.Delete(ctx, schema)
require.NoError(t, err)

subjects, err := schemaRegistryClient.Subjects(ctx)
require.NoError(t, err)
require.NotContains(t, subjects, schema.Name)
})
}

// create initial schema and reference
expectSchemaUpdate(t, ctx, syncer, schema, true)
expectSchemaUpdate(t, ctx, syncer, reference, true)

// update references
schema.Spec.References = []v1alpha2.SchemaReference{
{
Subject: reference.Name,
Name: "test",
Version: 1,
},
}
expectSchemaUpdate(t, ctx, syncer, schema, true)

// update compatibility level
schema.Spec.CompatibilityLevel = ptr.To(v1alpha2.CompatabilityLevelFull)
expectSchemaUpdate(t, ctx, syncer, schema, false)

// TODO: Request from core support for the following
// https://github.com/redpanda-data/redpanda/issues/23548
// - update schema rules: rules not supported
// - update metadata: metadata is not supported
// update type: JSON is not supported
// update normalization: normalization is not supported

// delete
err = syncer.Delete(ctx, schema)
require.NoError(t, err)

subjects, err := schemaRegistryClient.Subjects(ctx)
require.NoError(t, err)
require.NotContains(t, subjects, schema.Name)
}

0 comments on commit 0e52097

Please sign in to comment.