Skip to content

Commit

Permalink
fix: resolve schema normalization (#43)
Browse files Browse the repository at this point in the history
  • Loading branch information
dstrates authored Aug 27, 2024
1 parent a2adab9 commit 5fe31d4
Show file tree
Hide file tree
Showing 8 changed files with 129 additions and 122 deletions.
6 changes: 2 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,7 @@ If you want to build the provider from source, follow these steps:
## Testing the Provider

The acceptance tests rely on [Testcontainers for Go (Redpanda)](https://golang.testcontainers.org/modules/redpanda/) to
provide a Schema Registry API.

This has some limitations:
provide a Schema Registry API. This has some limitations:

- Redpanda only supports `AVRO` and `PROTOBUF` encoding, cannot test `JSON` schemas
[[1]](https://github.com/redpanda-data/redpanda/issues/6220)
Expand All @@ -42,7 +40,7 @@ terraform {
required_providers {
schemaregistry = {
source = "cultureamp/schemaregistry"
version = "1.1.0"
version = "1.2.1"
}
}
}
Expand Down
42 changes: 38 additions & 4 deletions examples/resources/schemaregistry_schema/resource.tf
Original file line number Diff line number Diff line change
@@ -1,7 +1,41 @@
resource "schemaregistry_schema" "example" {
subject = "example-subject"
schema = "{\"type\":\"record\",\"name\":\"Test\",\"fields\":[{\"name\":\"f1\",\"type\":\"string\"}]}"
resource "schemaregistry_schema" "ref_01" {
subject = "%s"
schema_type = "AVRO"
compatibility_level = "FORWARD_TRANSITIVE"
compatibility_level = "NONE"
schema = <<EOF
{
"type": "record",
"name": "Test",
"fields": [
{
"name": "f1",
"type": "string"
}
]
}
EOF
}

resource "schemaregistry_schema" "example_01" {
subject = "%s"
schema_type = "AVRO"
compatibility_level = "NONE"
hard_delete = true
schema = jsonencode({
"type" : "record",
"name" : "Example",
"fields" : [
{
"name" : "f1",
"type" : "string"
}
]
})
references = [
{
name = "TestRef01"
subject = schemaregistry_schema.ref_01.subject
version = schemaregistry_schema.ref_01.version
},
]
}
16 changes: 3 additions & 13 deletions internal/provider/data_source_schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"github.com/hashicorp/terraform-plugin-framework-validators/stringvalidator"
"github.com/hashicorp/terraform-plugin-framework/datasource"
"github.com/hashicorp/terraform-plugin-framework/datasource/schema"
"github.com/hashicorp/terraform-plugin-framework/diag"
"github.com/hashicorp/terraform-plugin-framework/schema/validator"
"github.com/hashicorp/terraform-plugin-framework/types"
"github.com/riferrei/srclient"
Expand Down Expand Up @@ -182,7 +181,7 @@ func (d *schemaDataSource) Read(ctx context.Context, req datasource.ReadRequest,
}

// Map response body to schema data source model
outputs := d.mapSchemaToOutputs(subject, schema, compatibilityLevel, &resp.Diagnostics)
outputs := d.mapSchemaToOutputs(subject, schema, compatibilityLevel)

// Set state to fully populated data
diags = resp.State.Set(ctx, outputs)
Expand All @@ -207,21 +206,12 @@ func (d *schemaDataSource) fetchCompatibilityLevel(subject string) (*srclient.Co

// mapSchemaToOutputs maps the schema and compatibility level to the schema data source model.
func (d *schemaDataSource) mapSchemaToOutputs(subject string, schema *srclient.Schema,
compatibilityLevel *srclient.CompatibilityLevel, diagnostics *diag.Diagnostics) schemaDataSourceModel {
// Normalize the schema string
normalizedSchema, err := NormalizeJSON(schema.Schema(), diagnostics)
if err != nil {
diagnostics.AddError(
"Normalization Error",
fmt.Sprintf("Could not normalize schema: %s", err),
)
return schemaDataSourceModel{}
}
compatibilityLevel *srclient.CompatibilityLevel) schemaDataSourceModel {

return schemaDataSourceModel{
ID: types.StringValue(subject),
Subject: types.StringValue(subject),
Schema: jsontypes.NewNormalizedValue(normalizedSchema),
Schema: jsontypes.NewNormalizedValue(schema.Schema()),
SchemaID: types.Int64Value(int64(schema.ID())),
SchemaType: types.StringValue(FromSchemaType(schema.SchemaType())),
Version: types.Int64Value(int64(schema.Version())),
Expand Down
22 changes: 17 additions & 5 deletions internal/provider/data_source_schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@ func TestAccSchemaDataSource_basic(t *testing.T) {
Config: testAccSchemaDataSourceConfig_single(subjectName),
Check: resource.ComposeAggregateTestCheckFunc(
resource.TestCheckResourceAttr(datasourceName, "subject", subjectName),
resource.TestCheckResourceAttr(datasourceName, "schema", NormalizedJSON(initialSchema)),
resource.TestCheckResourceAttr(datasourceName, "schema_type", "AVRO"),
resource.TestCheckResourceAttr(datasourceName, "compatibility_level", "NONE"),
resource.TestCheckResourceAttrWith(datasourceName, "schema", func(state string) error {
return ValidateSchemaString(initialSchema, state)
}),
),
},
},
Expand All @@ -41,9 +43,11 @@ func TestAccSchemaDataSource_multipleVersions(t *testing.T) {
Check: resource.ComposeAggregateTestCheckFunc(
resource.TestCheckResourceAttrSet(datasourceName, "id"),
resource.TestCheckResourceAttr(datasourceName, "subject", subjectName),
resource.TestCheckResourceAttr(datasourceName, "schema", NormalizedJSON(initialSchema)),
resource.TestCheckResourceAttr(datasourceName, "schema_type", "AVRO"),
resource.TestCheckResourceAttr(datasourceName, "compatibility_level", "NONE"),
resource.TestCheckResourceAttrWith(datasourceName, "schema", func(state string) error {
return ValidateSchemaString(initialSchema, state)
}),
),
},
// Update schema to a new version
Expand All @@ -52,9 +56,11 @@ func TestAccSchemaDataSource_multipleVersions(t *testing.T) {
Check: resource.ComposeAggregateTestCheckFunc(
resource.TestCheckResourceAttrSet(datasourceName, "id"),
resource.TestCheckResourceAttr(datasourceName, "subject", subjectName),
resource.TestCheckResourceAttr(datasourceName, "schema", NormalizedJSON(updatedSchema)),
resource.TestCheckResourceAttr(datasourceName, "schema_type", "AVRO"),
resource.TestCheckResourceAttr(datasourceName, "compatibility_level", "BACKWARD"),
resource.TestCheckResourceAttrWith(datasourceName, "schema", func(state string) error {
return ValidateSchemaString(updatedSchema, state)
}),
),
},
// Validate updated version of the schema
Expand All @@ -63,10 +69,12 @@ func TestAccSchemaDataSource_multipleVersions(t *testing.T) {
Check: resource.ComposeAggregateTestCheckFunc(
resource.TestCheckResourceAttrSet(datasourceName, "id"),
resource.TestCheckResourceAttr(datasourceName, "subject", subjectName),
resource.TestCheckResourceAttr(datasourceName, "schema", NormalizedJSON(updatedSchema)),
resource.TestCheckResourceAttr(datasourceName, "schema_type", "AVRO"),
resource.TestCheckResourceAttr(datasourceName, "compatibility_level", "BACKWARD"),
resource.TestCheckResourceAttr(datasourceName, "version", "2"),
resource.TestCheckResourceAttrWith(datasourceName, "schema", func(state string) error {
return ValidateSchemaString(updatedSchema, state)
}),
),
},
},
Expand Down Expand Up @@ -102,7 +110,11 @@ resource "schemaregistry_schema" "test_01" {
{
"name": "f1",
"type": "string"
}
},
{
"name": "f2",
"type": "string"
}
]
})
}
Expand Down
3 changes: 2 additions & 1 deletion internal/provider/provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
const (
testAccProviderVersion = "test"
testAccProviderType = "schemaregistry"
redpandaContainerImage = "docker.redpanda.com/redpandadata/redpanda:v23.3.3"
redpandaContainerImage = "docker.redpanda.com/redpandadata/redpanda:v24.1.15"
)

var testAccProtoV6ProviderFactories = map[string]func() (tfprotov6.ProviderServer, error){
Expand All @@ -32,6 +32,7 @@ func TestMain(m *testing.M) {
redpanda.WithNewServiceAccount("superuser-1", "test"),
redpanda.WithSuperusers("superuser-1"),
redpanda.WithEnableSchemaRegistryHTTPBasicAuth(),
redpanda.WithBootstrapConfig("schema_registry_normalize_on_startup", true),
)
if err != nil {
log.Fatalf("failed to start container: %s", err)
Expand Down
54 changes: 7 additions & 47 deletions internal/provider/resource_schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,24 +193,14 @@ func (r *schemaResource) Create(ctx context.Context, req resource.CreateRequest,
return
}

// Normalize the schema string
schemaString := plan.Schema.ValueString()
normalizedSchema, err := NormalizeJSON(schemaString, &resp.Diagnostics)
if err != nil {
resp.Diagnostics.AddError(
"Invalid JSON Schema",
fmt.Sprintf("Schema validation failed: %s", err),
)
return
}

// Generate API request body from plan
schemaString := plan.Schema.ValueString()
schemaType := ToSchemaType(plan.SchemaType.ValueString())
references := ToRegistryReferences(plan.Reference)
compatibilityLevel := ToCompatibilityLevelType(plan.CompatibilityLevel.ValueString())

// Create new schema resource
schema, err := r.client.CreateSchema(subject, normalizedSchema, schemaType, references...)
schema, err := r.client.CreateSchema(subject, schemaString, schemaType, references...)
if err != nil {
resp.Diagnostics.AddError(
"Error creating schema",
Expand Down Expand Up @@ -268,20 +258,10 @@ func (r *schemaResource) Read(ctx context.Context, req resource.ReadRequest, res
}

schemaType := FromSchemaType(schema.SchemaType())

// Normalize the schema string
schemaString := state.Schema.ValueString()
normalizedSchema, err := NormalizeJSON(schemaString, &resp.Diagnostics)
if err != nil {
resp.Diagnostics.AddError(
"Invalid JSON Schema",
fmt.Sprintf("Schema validation failed: %s", err),
)
return
}

// Update state with refreshed values
state.Schema = jsontypes.NewNormalizedValue(normalizedSchema)
state.Schema = jsontypes.NewNormalizedValue(schemaString)
state.SchemaID = types.Int64Value(int64(schema.ID()))
state.SchemaType = types.StringValue(schemaType)
state.Version = types.Int64Value(int64(schema.Version()))
Expand All @@ -306,25 +286,15 @@ func (r *schemaResource) Update(ctx context.Context, req resource.UpdateRequest,
return
}

// Normalize the schema string
schemaString := plan.Schema.ValueString()
normalizedSchema, err := NormalizeJSON(schemaString, &resp.Diagnostics)
if err != nil {
resp.Diagnostics.AddError(
"Invalid JSON Schema",
fmt.Sprintf("Schema validation failed: %s", err),
)
return
}

// Generate API request body from plan
schemaString := plan.Schema.ValueString()
subject := plan.Subject.ValueString()
references := ToRegistryReferences(plan.Reference)
schemaType := ToSchemaType(plan.SchemaType.ValueString())
compatibilityLevel := ToCompatibilityLevelType(plan.CompatibilityLevel.ValueString())

// Update existing schema
schema, err := r.client.CreateSchema(subject, normalizedSchema, schemaType, references...)
schema, err := r.client.CreateSchema(subject, schemaString, schemaType, references...)
if err != nil {
resp.Diagnostics.AddError(
"Error updating schema",
Expand All @@ -343,7 +313,7 @@ func (r *schemaResource) Update(ctx context.Context, req resource.UpdateRequest,
}

// Update state with refreshed values
plan.Schema = jsontypes.NewNormalizedValue(normalizedSchema)
plan.Schema = jsontypes.NewNormalizedValue(schemaString)
plan.SchemaID = types.Int64Value(int64(schema.ID()))
plan.Version = types.Int64Value(int64(schema.Version()))
plan.Reference = FromRegistryReferences(schema.References())
Expand Down Expand Up @@ -409,21 +379,11 @@ func (r *schemaResource) ImportState(ctx context.Context, req resource.ImportSta

schemaType := FromSchemaType(schema.SchemaType())

// Normalize the schema string
normalizedSchema, err := NormalizeJSON(schema.Schema(), &resp.Diagnostics)
if err != nil {
resp.Diagnostics.AddError(
"Invalid JSON Schema",
fmt.Sprintf("Schema validation failed: %s", err),
)
return
}

// Create state from retrieved schema
state := schemaResourceModel{
ID: types.StringValue(subject),
Subject: types.StringValue(subject),
Schema: jsontypes.NewNormalizedValue(normalizedSchema),
Schema: jsontypes.NewNormalizedValue(schema.Schema()),
SchemaID: types.Int64Value(int64(schema.ID())),
SchemaType: types.StringValue(schemaType),
Version: types.Int64Value(int64(schema.Version())),
Expand Down
Loading

0 comments on commit 5fe31d4

Please sign in to comment.