Skip to content

Commit

Permalink
NIFI-12124: This closes #7791. Added a new RenameRecordField processo…
Browse files Browse the repository at this point in the history
…r. In testing, also noticed that the Descendant Wildcard operator (//*) and Descendant Field Path Operator (//name, for instance) did not properly account for array of records or map elements, so addressed those concerns.

Signed-off-by: Joseph Witt <joewitt@apache.org>
  • Loading branch information
markap14 authored and joewitt committed Oct 13, 2023
1 parent f4ae292 commit 3ae0eed
Show file tree
Hide file tree
Showing 23 changed files with 740 additions and 42 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/ci-workflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ jobs:
if: failure()
- name: Post Disk Usage
run: df
if: ${{ always() }}

macos-build-jp:
timeout-minutes: 150
Expand Down Expand Up @@ -235,6 +236,7 @@ jobs:
if: failure()
- name: Post Disk Usage
run: df
if: ${{ always() }}

windows-build:
timeout-minutes: 150
Expand Down Expand Up @@ -300,3 +302,4 @@ jobs:
if: failure()
- name: Post Disk Usage
run: df
if: ${{ always() }}
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,18 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;

import org.apache.nifi.record.path.FieldValue;
import org.apache.nifi.record.path.MapEntryFieldValue;
import org.apache.nifi.record.path.RecordPathEvaluationContext;
import org.apache.nifi.record.path.StandardFieldValue;
import org.apache.nifi.record.path.util.Filters;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.type.MapDataType;

public class DescendantFieldPath extends RecordPathSegment {
private final String descendantName;
Expand Down Expand Up @@ -74,6 +78,26 @@ private List<FieldValue> findDescendants(final FieldValue fieldValue) {
if (Filters.isRecord(childField.getDataType(), recordValue)) {
final FieldValue childFieldValue = new StandardFieldValue(recordValue, childField, fieldValue);
matchingValues.addAll(findDescendants(childFieldValue));
} else if (Filters.isRecordArray(childField.getDataType(), recordValue)) {
final Object[] arrayValues = (Object[]) recordValue;

for (final Object arrayValue : arrayValues) {
final FieldValue childFieldValue = new StandardFieldValue(arrayValue, childField, fieldValue);
matchingValues.addAll(findDescendants(childFieldValue));
}
} else if (childField.getDataType().getFieldType() == RecordFieldType.MAP) {
final Map<String, ?> map = (Map<String, ?>) recordValue;

final DataType valueType = ((MapDataType) childField.getDataType()).getValueType();

for (final Map.Entry<String, ?> entry : map.entrySet()) {
final String mapKey = entry.getKey();
final Object mapValue = entry.getValue();
final RecordField elementField = new RecordField(fieldValue.getField().getFieldName(), valueType);
final FieldValue mapFieldValue = new MapEntryFieldValue(mapValue, elementField, fieldValue, mapKey);

matchingValues.add(mapFieldValue);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,18 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;

import org.apache.nifi.record.path.FieldValue;
import org.apache.nifi.record.path.MapEntryFieldValue;
import org.apache.nifi.record.path.RecordPathEvaluationContext;
import org.apache.nifi.record.path.StandardFieldValue;
import org.apache.nifi.record.path.util.Filters;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.type.MapDataType;

public class WildcardDescendantPath extends RecordPathSegment {

Expand Down Expand Up @@ -66,6 +70,26 @@ private List<FieldValue> findDescendants(final FieldValue fieldValue) {
if (Filters.isRecord(childField.getDataType(), value)) {
final FieldValue childFieldValue = new StandardFieldValue(value, childField, fieldValue);
matchingValues.addAll(findDescendants(childFieldValue));
} else if (Filters.isRecordArray(childField.getDataType(), value)) {
final Object[] arrayValues = (Object[]) value;

for (final Object arrayValue : arrayValues) {
final FieldValue childFieldValue = new StandardFieldValue(arrayValue, childField, fieldValue);
matchingValues.addAll(findDescendants(childFieldValue));
}
} else if (childField.getDataType().getFieldType() == RecordFieldType.MAP) {
final Map<String, ?> map = (Map<String, ?>) value;

final DataType valueType = ((MapDataType) childField.getDataType()).getValueType();

for (final Map.Entry<String, ?> entry : map.entrySet()) {
final String mapKey = entry.getKey();
final Object mapValue = entry.getValue();
final RecordField elementField = new RecordField(fieldValue.getField().getFieldName(), valueType);
final FieldValue mapFieldValue = new MapEntryFieldValue(mapValue, elementField, fieldValue, mapKey);

matchingValues.add(mapFieldValue);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,15 @@

package org.apache.nifi.record.path.util;

import java.lang.reflect.Array;
import java.util.Optional;
import java.util.function.Predicate;
import java.util.stream.Stream;

import org.apache.nifi.record.path.FieldValue;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.type.ArrayDataType;

public class Filters {

Expand Down Expand Up @@ -71,4 +72,39 @@ public static boolean isRecord(final DataType dataType, final Object value) {

return false;
}

public static boolean isRecordArray(final DataType dataType, final Object value) {
if (dataType.getFieldType() != RecordFieldType.ARRAY) {
return false;
}

final ArrayDataType arrayDataType = (ArrayDataType) dataType;
final DataType elementType = arrayDataType.getElementType();

if (elementType != null && elementType.getFieldType() == RecordFieldType.RECORD) {
return true;
}

if (value == null) {
return false;
}

if (!value.getClass().isArray()) {
return false;
}

final int length = Array.getLength(value);
if (length == 0) {
return false;
}

for (int i = 0; i < length; i++) {
final Object val = Array.get(value, i);
if (!(val instanceof Record)) {
return false;
}
}

return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,6 @@

package org.apache.nifi.record.path;

import org.apache.nifi.record.path.exception.RecordPathException;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.type.ArrayDataType;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
import org.apache.nifi.uuid5.Uuid5Util;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

import java.nio.charset.IllegalCharsetNameException;
import java.nio.charset.StandardCharsets;
import java.sql.Date;
Expand All @@ -52,6 +37,20 @@
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.nifi.record.path.exception.RecordPathException;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.type.ArrayDataType;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
import org.apache.nifi.uuid5.Uuid5Util;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
Expand Down Expand Up @@ -1364,6 +1363,7 @@ public void testTrimArray() {
assertEquals("John Smith", results.get(0).getValue());
assertEquals("Jane Smith", results.get(1).getValue());
}

@Test
public void testFieldName() {
final List<RecordField> fields = new ArrayList<>();
Expand All @@ -1382,6 +1382,52 @@ public void testFieldName() {
assertEquals(0L, RecordPath.compile("//name[not(startsWith(fieldName(.), 'n'))]").evaluate(record).getSelectedFields().count());
}

@Test
public void testRecursiveWithMap() {
final List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("map", RecordFieldType.MAP.getMapDataType(RecordFieldType.STRING.getDataType())));
final RecordSchema schema = new SimpleRecordSchema(fields);

final Map<String, String> mapValues = new HashMap<>();
mapValues.put("a", "z");
mapValues.put("b", "Y");
mapValues.put("c", "x");

final Map<String, Object> values = new HashMap<>();
values.put("map", mapValues);

final Record record = new MapRecord(schema, values);
assertEquals("Y", RecordPath.compile("//*[. = toUpperCase(.)]").evaluate(record).getSelectedFields().findFirst().get().getValue());
}

@Test
public void testRecursiveWithChoiceThatIncludesRecord() {
final RecordSchema personSchema = new SimpleRecordSchema(Arrays.asList(
new RecordField("name", RecordFieldType.STRING.getDataType()),
new RecordField("age", RecordFieldType.INT.getDataType())
));

final DataType personDataType = RecordFieldType.RECORD.getRecordDataType(personSchema);
final DataType stringDataType = RecordFieldType.STRING.getDataType();

final List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("person", RecordFieldType.CHOICE.getChoiceDataType(stringDataType, personDataType)));
final RecordSchema schema = new SimpleRecordSchema(fields);

final Map<String, Object> personValueMap = new HashMap<>();
personValueMap.put("name", "John Doe");
personValueMap.put("age", 30);
final Record personRecord = new MapRecord(personSchema, personValueMap);

final Map<String, Object> values = new HashMap<>();
values.put("person", personRecord);

final Record record = new MapRecord(schema, values);
final List<Object> expectedValues = List.of(personRecord, "John Doe", 30);
assertEquals(expectedValues, RecordPath.compile("//*").evaluate(record).getSelectedFields().map(FieldValue::getValue).toList());
}


@Test
public void testToDateFromString() {
final List<RecordField> fields = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,6 @@

package org.apache.nifi.serialization;

import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldRemovalPath;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.SchemaIdentifier;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
Expand All @@ -31,6 +25,11 @@
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldRemovalPath;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.SchemaIdentifier;

public class SimpleRecordSchema implements RecordSchema {
private List<RecordField> fields = null;
Expand Down Expand Up @@ -188,12 +187,17 @@ && getSchemaName().isPresent() && getSchemaName().equals(other.getSchemaName()))
public int hashCode() {
int computed = this.hashCode;
if (computed == 0) {
computed = this.hashCode = 143 + 3 * fields.hashCode();
computed = this.hashCode = calculateHashCode();
}

return computed;
}

private int calculateHashCode() {
return 143 + 3 * fields.hashCode();
}


private static String createText(final List<RecordField> fields) {
final StringBuilder sb = new StringBuilder("[");

Expand Down Expand Up @@ -264,18 +268,15 @@ public Optional<String> getSchemaNamespace() {

@Override
public void removeField(final String fieldName) {
final List<RecordField> remainingFields = fields.stream()
.filter(field -> !field.getFieldName().equals(fieldName)).collect(Collectors.toList());
final List<RecordField> remainingFields = new ArrayList<>();
for (final RecordField field : fields) {
if (!field.getFieldName().equals(fieldName)) {
remainingFields.add(field);
}
}

if (remainingFields.size() != fields.size()) {
fields = null;
setFields(remainingFields);
text.set(createText(fields));
textAvailable = true;
schemaFormat = null;
schemaIdentifier = SchemaIdentifier.EMPTY;
hashCode = 0; // set to 0 to trigger re-calculation
hashCode = hashCode();
resetFields(remainingFields);
}
}

Expand All @@ -288,6 +289,41 @@ public void removePath(final RecordFieldRemovalPath path) {
}
}


@Override
public boolean renameField(final String currentName, final String newName) {
final List<RecordField> updatedFields = new ArrayList<>(fields.size());

boolean renamed = false;
for (final RecordField recordField : fields) {
if (recordField.getFieldName().equals(currentName)) {
final RecordField updated = new RecordField(newName, recordField.getDataType(), recordField.getDefaultValue(), recordField.getAliases(), recordField.isNullable());
updatedFields.add(updated);
renamed = true;
} else {
updatedFields.add(recordField);
}
}

if (!renamed) {
return false;
}

resetFields(updatedFields);
return true;
}

private void resetFields(final List<RecordField> updatedFields) {
this.fields = null;
setFields(updatedFields);
textAvailable = false;
text.set(null);
schemaFormat = null;
schemaIdentifier = SchemaIdentifier.EMPTY;
hashCode = calculateHashCode();
}


@Override
public boolean isRecursive() {
return getFields().stream().anyMatch(field -> field.getDataType().isRecursive(Collections.singletonList(this)));
Expand Down
Loading

0 comments on commit 3ae0eed

Please sign in to comment.