Skip to content

Commit

Permalink
NIFI-13578 Add Schema Branch Name and Schema Version in ValidateRecord (#9108)
Browse files Browse the repository at this point in the history

Signed-off-by: David Handermann <exceptionfactory@apache.org>
  • Loading branch information
juldrixx authored Aug 31, 2024
1 parent ba13c5d commit f92f6f1
Show file tree
Hide file tree
Showing 4 changed files with 421 additions and 149 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.util;

/**
 * Immutable holder for three values of independent types.
 *
 * <p>Being a record, equality and hash code are derived from all three
 * components, which makes instances usable as composite map keys.</p>
 *
 * @param first  the first component
 * @param second the second component
 * @param third  the third component
 * @param <A> type of the first component
 * @param <B> type of the second component
 * @param <C> type of the third component
 */
public record Triple<A, B, C>(A first, B second, C third) { }
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.schemaregistry.services.SchemaRegistry;
import org.apache.nifi.util.Tuple;
import org.apache.nifi.util.Triple;

import java.io.IOException;
import java.util.EnumSet;
import java.util.Optional;
import java.util.OptionalInt;
Expand All @@ -33,40 +33,53 @@
import java.util.concurrent.ConcurrentMap;

public class MockSchemaRegistry extends AbstractControllerService implements SchemaRegistry {
private final ConcurrentMap<String, RecordSchema> schemaNameMap = new ConcurrentHashMap<>();
private final ConcurrentMap<Triple<String, String, Integer>, RecordSchema> schemaNameMap = new ConcurrentHashMap<>();
private final ConcurrentMap<Tuple<Long, Integer>, RecordSchema> schemaIdVersionMap = new ConcurrentHashMap<>();

public void addSchema(final String name, final RecordSchema schema) {
schemaNameMap.put(name, schema);
addSchema(name, null, null, schema);
}

RecordSchema retrieveSchemaByName(final SchemaIdentifier schemaIdentifier) throws IOException, SchemaNotFoundException {
/**
 * Registers a schema under the given name and branch, leaving the version unset.
 *
 * @param name   the schema name
 * @param branch the schema branch name
 * @param schema the schema to register
 */
public void addSchema(final String name, final String branch, final RecordSchema schema) {
    // A null version is passed through to the four-argument overload.
    final Integer unversioned = null;
    addSchema(name, branch, unversioned, schema);
}

/**
 * Registers a schema under the given name and version, leaving the branch unset.
 *
 * @param name    the schema name
 * @param version the schema version
 * @param schema  the schema to register
 */
public void addSchema(final String name, final Integer version, final RecordSchema schema) {
    // A null branch is passed through to the four-argument overload.
    final String noBranch = null;
    addSchema(name, noBranch, version, schema);
}

/**
 * Registers a schema keyed by the combination of name, branch and version.
 *
 * <p>The three values are stored together as a single {@link Triple} key, so
 * a {@code null} branch or version is part of the key as given.</p>
 *
 * @param name    the schema name
 * @param branch  the schema branch name, may be {@code null}
 * @param version the schema version, may be {@code null}
 * @param schema  the schema to register
 */
public void addSchema(final String name, final String branch, final Integer version, final RecordSchema schema) {
    final Triple<String, String, Integer> lookupKey = new Triple<>(name, branch, version);
    schemaNameMap.put(lookupKey, schema);
}

RecordSchema retrieveSchemaByName(final SchemaIdentifier schemaIdentifier) throws SchemaNotFoundException {
final Optional<String> schemaName = schemaIdentifier.getName();
if (!schemaName.isPresent()) {
if (schemaName.isEmpty()) {
throw new org.apache.nifi.schema.access.SchemaNotFoundException("Cannot retrieve schema because Schema Name is not present");
}

return schemaNameMap.get(schemaName.get());
final String schemaBranch = schemaIdentifier.getBranch().orElse(null);
final Integer schemaVersion = schemaIdentifier.getVersion().isPresent() ? schemaIdentifier.getVersion().getAsInt() : null;
return schemaNameMap.get(new Triple<>(schemaName.get(), schemaBranch, schemaVersion));
}

private RecordSchema retrieveSchemaByIdAndVersion(final SchemaIdentifier schemaIdentifier) throws IOException, SchemaNotFoundException {
private RecordSchema retrieveSchemaByIdAndVersion(final SchemaIdentifier schemaIdentifier) throws SchemaNotFoundException {
final OptionalLong schemaId = schemaIdentifier.getIdentifier();
if (!schemaId.isPresent()) {
if (schemaId.isEmpty()) {
throw new org.apache.nifi.schema.access.SchemaNotFoundException("Cannot retrieve schema because Schema Id is not present");
}

final OptionalInt version = schemaIdentifier.getVersion();
if (!version.isPresent()) {
if (version.isEmpty()) {
throw new org.apache.nifi.schema.access.SchemaNotFoundException("Cannot retrieve schema because Schema Version is not present");
}

final Tuple<Long, Integer> tuple = new Tuple<>(schemaId.getAsLong(), version.getAsInt());
final RecordSchema schema = schemaIdVersionMap.get(tuple);
return schema;
return schemaIdVersionMap.get(tuple);
}

@Override
public RecordSchema retrieveSchema(final SchemaIdentifier schemaIdentifier) throws IOException, SchemaNotFoundException {
public RecordSchema retrieveSchema(final SchemaIdentifier schemaIdentifier) throws SchemaNotFoundException {
if (schemaIdentifier.getName().isPresent()) {
return retrieveSchemaByName(schemaIdentifier);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.avro.AvroSchemaValidator;
import org.apache.nifi.avro.AvroTypeUtil;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
Expand All @@ -37,6 +36,7 @@
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.migration.PropertyConfiguration;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
Expand Down Expand Up @@ -74,6 +74,14 @@
import java.util.Optional;
import java.util.Set;

import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_BRANCH_NAME;
import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_NAME;
import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_NAME_PROPERTY;
import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_REGISTRY;
import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_TEXT;
import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_TEXT_PROPERTY;
import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_VERSION;

@SideEffectFree
@SupportsBatching
@InputRequirement(Requirement.INPUT_REQUIRED)
Expand All @@ -89,10 +97,6 @@
})
public class ValidateRecord extends AbstractProcessor {

static final AllowableValue SCHEMA_NAME_PROPERTY = new AllowableValue("schema-name-property", "Use Schema Name Property",
"The schema to validate the data against is determined by looking at the 'Schema Name' Property and looking up the schema in the configured Schema Registry");
static final AllowableValue SCHEMA_TEXT_PROPERTY = new AllowableValue("schema-text-property", "Use Schema Text Property",
"The schema to validate the data against is determined by looking at the 'Schema Text' Property and parsing the schema as an Avro schema");
static final AllowableValue READER_SCHEMA = new AllowableValue("reader-schema", "Use Reader's Schema",
"The schema to validate the data against is determined by asking the configured Record Reader for its schema");

Expand Down Expand Up @@ -131,31 +135,6 @@ public class ValidateRecord extends AbstractProcessor {
.defaultValue(READER_SCHEMA.getValue())
.required(true)
.build();
public static final PropertyDescriptor SCHEMA_REGISTRY = new PropertyDescriptor.Builder()
.name("schema-registry")
.displayName("Schema Registry")
.description("Specifies the Controller Service to use for the Schema Registry. This is necessary only if the Schema Access Strategy is set to \"Use 'Schema Name' Property\".")
.identifiesControllerService(SchemaRegistry.class)
.required(false)
.build();
static final PropertyDescriptor SCHEMA_NAME = new PropertyDescriptor.Builder()
.name("schema-name")
.displayName("Schema Name")
.description("Specifies the name of the schema to lookup in the Schema Registry property")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.defaultValue("${schema.name}")
.required(false)
.build();
static final PropertyDescriptor SCHEMA_TEXT = new PropertyDescriptor.Builder()
.name("schema-text")
.displayName("Schema Text")
.description("The text of an Avro-formatted Schema")
.addValidator(new AvroSchemaValidator())
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.defaultValue("${avro.schema}")
.required(false)
.build();
static final PropertyDescriptor ALLOW_EXTRA_FIELDS = new PropertyDescriptor.Builder()
.name("allow-extra-fields")
.displayName("Allow Extra Fields")
Expand Down Expand Up @@ -219,6 +198,8 @@ public class ValidateRecord extends AbstractProcessor {
SCHEMA_ACCESS_STRATEGY,
SCHEMA_REGISTRY,
SCHEMA_NAME,
SCHEMA_BRANCH_NAME,
SCHEMA_VERSION,
SCHEMA_TEXT,
ALLOW_EXTRA_FIELDS,
STRICT_TYPE_CHECKING,
Expand Down Expand Up @@ -256,6 +237,17 @@ public Set<Relationship> getRelationships() {
return RELATIONSHIPS;
}

/**
 * Migrates a stored Schema Access Strategy value of {@code "schema-name-property"}
 * to the value currently exposed by {@code SCHEMA_NAME_PROPERTY}.
 *
 * <p>Nothing is changed when the property is not set or holds any other value.</p>
 *
 * @param config the property configuration being migrated
 */
@Override
public void migrateProperties(final PropertyConfiguration config) {
    if (!config.isPropertySet(SCHEMA_ACCESS_STRATEGY)) {
        return;
    }

    // Only the legacy identifier is rewritten; every other value is left untouched.
    config.getPropertyValue(SCHEMA_ACCESS_STRATEGY)
            .filter("schema-name-property"::equals)
            .ifPresent(legacyValue -> config.setProperty(SCHEMA_ACCESS_STRATEGY, SCHEMA_NAME_PROPERTY.getValue()));
}

@Override
protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
final String schemaAccessStrategy = validationContext.getProperty(SCHEMA_ACCESS_STRATEGY).getValue();
Expand Down Expand Up @@ -482,8 +474,7 @@ private void completeFlowFile(final ProcessContext context, final ProcessSession

final Integer maxValidationDetailsLength = context.getProperty(MAX_VALIDATION_DETAILS_LENGTH).evaluateAttributeExpressions(flowFile).asInteger();

final Map<String, String> attributes = new HashMap<>();
attributes.putAll(writeResult.getAttributes());
final Map<String, String> attributes = new HashMap<>(writeResult.getAttributes());
attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());

Expand Down Expand Up @@ -536,7 +527,9 @@ protected RecordSchema getValidationSchema(final ProcessContext context, final F
} else if (schemaAccessStrategy.equals(SCHEMA_NAME_PROPERTY.getValue())) {
final SchemaRegistry schemaRegistry = context.getProperty(SCHEMA_REGISTRY).asControllerService(SchemaRegistry.class);
final String schemaName = context.getProperty(SCHEMA_NAME).evaluateAttributeExpressions(flowFile).getValue();
final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().name(schemaName).build();
final String schemaBranchName = context.getProperty(SCHEMA_BRANCH_NAME).evaluateAttributeExpressions(flowFile).getValue();
final Integer schemaVersion = context.getProperty(SCHEMA_VERSION).evaluateAttributeExpressions(flowFile).asInteger();
final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().name(schemaName).branch(schemaBranchName).version(schemaVersion).build();
return schemaRegistry.retrieveSchema(schemaIdentifier);
} else if (schemaAccessStrategy.equals(SCHEMA_TEXT_PROPERTY.getValue())) {
final String schemaText = context.getProperty(SCHEMA_TEXT).evaluateAttributeExpressions(flowFile).getValue();
Expand Down
Loading

0 comments on commit f92f6f1

Please sign in to comment.