Skip to content

Commit

Permalink
NIFI-8135 allow CHOICE data types in conversion of Records to Java Maps
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Burgess <mattyb149@apache.org>

This closes #7746
  • Loading branch information
ChrisSamo632 authored and mattyb149 committed Oct 19, 2023
1 parent d4014c7 commit aac71c5
Show file tree
Hide file tree
Showing 7 changed files with 183 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -806,6 +806,7 @@ public static Map<String, Object> toMap(final Object value, final String fieldNa
for (final Object key : original.keySet()) {
if (!(key instanceof String)) {
keysAreStrings = false;
break;
}
}

Expand Down Expand Up @@ -854,79 +855,82 @@ public static Map<String, Object> toMap(final Object value, final String fieldNa
*/
@SuppressWarnings({"unchecked", "rawtypes"})
public static Object convertRecordFieldtoObject(final Object value, final DataType dataType) {

if (value == null) {
return null;
}

DataType chosenDataType;
if (dataType instanceof ChoiceDataType) {
final DataType chosen = chooseDataType(value, (ChoiceDataType) dataType);
chosenDataType = chosen != null ? chosen : dataType;
} else {
chosenDataType = dataType;
}

if (value instanceof Record) {
Record record = (Record) value;
RecordSchema recordSchema = record.getSchema();
final Record record = (Record) value;
final RecordSchema recordSchema = record.getSchema();
if (recordSchema == null) {
throw new IllegalTypeConversionException("Cannot convert value of type Record to Map because Record does not have an associated Schema");
}

final Map<String, Object> recordMap = new LinkedHashMap<>();
for (RecordField field : recordSchema.getFields()) {
final DataType fieldDataType = field.getDataType();
final Map<String, Object> recordMap = new LinkedHashMap<>(record.getRawFieldNames().size(), 1);
for (final RecordField field : recordSchema.getFields()) {
final String fieldName = field.getFieldName();
Object fieldValue = record.getValue(fieldName);
final Object fieldValue = record.getValue(fieldName);
if (field.getDataType() instanceof ChoiceDataType) {
final DataType chosen = chooseDataType(fieldValue, (ChoiceDataType) field.getDataType());
chosenDataType = chosen != null ? chosen : field.getDataType();
} else {
chosenDataType = field.getDataType();
}

if (fieldValue == null) {
recordMap.put(fieldName, null);
} else if (isScalarValue(fieldDataType, fieldValue)) {
} else if (isScalarValue(chosenDataType, fieldValue)) {
recordMap.put(fieldName, fieldValue);
} else if (fieldDataType instanceof RecordDataType) {
} else if (chosenDataType instanceof RecordDataType) {
Record nestedRecord = (Record) fieldValue;
recordMap.put(fieldName, convertRecordFieldtoObject(nestedRecord, fieldDataType));
} else if (fieldDataType instanceof MapDataType) {
recordMap.put(fieldName, convertRecordMapToJavaMap((Map) fieldValue, ((MapDataType)fieldDataType).getValueType()));

} else if (fieldDataType instanceof ArrayDataType) {
recordMap.put(fieldName, convertRecordArrayToJavaArray((Object[])fieldValue, ((ArrayDataType) fieldDataType).getElementType()));
recordMap.put(fieldName, convertRecordFieldtoObject(nestedRecord, chosenDataType));
} else if (chosenDataType instanceof MapDataType) {
recordMap.put(fieldName, convertRecordMapToJavaMap((Map) fieldValue, ((MapDataType) chosenDataType).getValueType()));
} else if (chosenDataType instanceof ArrayDataType) {
recordMap.put(fieldName, convertRecordArrayToJavaArray((Object[]) fieldValue, ((ArrayDataType) chosenDataType).getElementType()));
} else {
throw new IllegalTypeConversionException("Cannot convert value [" + fieldValue + "] of type " + fieldDataType.toString()
throw new IllegalTypeConversionException("Cannot convert value [" + fieldValue + "] of type " + chosenDataType
+ " to Map for field " + fieldName + " because the type is not supported");
}
}
return recordMap;
} else if (value instanceof Map) {
return convertRecordMapToJavaMap((Map) value, ((MapDataType) dataType).getValueType());
} else if (dataType != null && isScalarValue(dataType, value)) {
return convertRecordMapToJavaMap((Map) value, ((MapDataType) chosenDataType).getValueType());
} else if (chosenDataType != null && isScalarValue(chosenDataType, value)) {
return value;
} else if (value instanceof Object[] && dataType instanceof ArrayDataType) {
} else if (value instanceof Object[] && chosenDataType instanceof ArrayDataType) {
// This is likely a Map whose values are represented as an array. Return a new array with each element converted to a Java object
return convertRecordArrayToJavaArray((Object[]) value, ((ArrayDataType) dataType).getElementType());
return convertRecordArrayToJavaArray((Object[]) value, ((ArrayDataType) chosenDataType).getElementType());
}

throw new IllegalTypeConversionException("Cannot convert value of class " + value.getClass().getName() + " because the type is not supported");
}


public static Map<String, Object> convertRecordMapToJavaMap(final Map<String, Object> map, DataType valueDataType) {

public static Map<String, Object> convertRecordMapToJavaMap(final Map<String, Object> map, final DataType valueDataType) {
if (map == null) {
return null;
}

Map<String, Object> resultMap = new LinkedHashMap<>();
for (Map.Entry<String, Object> entry : map.entrySet()) {
final Map<String, Object> resultMap = new LinkedHashMap<>();
for (final Map.Entry<String, Object> entry : map.entrySet()) {
resultMap.put(entry.getKey(), convertRecordFieldtoObject(entry.getValue(), valueDataType));
}
return resultMap;
}

public static Object[] convertRecordArrayToJavaArray(final Object[] array, DataType elementDataType) {

if (array == null || array.length == 0 || isScalarValue(elementDataType, array[0])) {
public static Object[] convertRecordArrayToJavaArray(final Object[] array, final DataType elementDataType) {
if (array == null || array.length == 0 || Arrays.stream(array).allMatch(o -> isScalarValue(elementDataType, o))) {
return array;
} else {
// Must be an array of complex types, build an array of converted values
Object[] resultArray = new Object[array.length];
for (int i = 0; i < array.length; i++) {
resultArray[i] = convertRecordFieldtoObject(array[i], elementDataType);
}
return resultArray;
return Arrays.stream(array).map(o -> convertRecordFieldtoObject(o, elementDataType)).toArray();
}
}

Expand Down Expand Up @@ -1089,7 +1093,7 @@ private static Object toEnum(Object value, EnumDataType dataType, String fieldNa
if(dataType.getEnums() != null && dataType.getEnums().contains(value)) {
return value.toString();
}
throw new IllegalTypeConversionException("Cannot convert value " + value + " of type " + dataType.toString() + " for field " + fieldName);
throw new IllegalTypeConversionException("Cannot convert value " + value + " of type " + dataType + " for field " + fieldName);
}

public static java.sql.Date toDate(final Object value, final Supplier<DateFormat> format, final String fieldName) {
Expand Down Expand Up @@ -1933,11 +1937,7 @@ private static boolean isMergeRequired(final RecordField thisField, final Record
return true;
}

if (!Objects.equals(thisField.getDefaultValue(), otherField.getDefaultValue())) {
return true;
}

return false;
return !Objects.equals(thisField.getDefaultValue(), otherField.getDefaultValue());
}

public static RecordField merge(final RecordField thisField, final RecordField otherField) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,30 @@ public void testConvertArrayOfRecordsToJavaArray() {
}
}

/**
 * Converts a Record whose "addresses" field is an array mixing a plain String with a nested
 * Record, and checks that the String is left alone while the nested Record becomes a Map.
 */
@Test
@SuppressWarnings("unchecked")
void testConvertRecordFieldToObjectWithNestedRecord() {
    final Record nestedAddress = DataTypeUtils.toRecord(Collections.singletonMap("address_1", "123 Fake Street"), "addresses");

    // Plain LinkedHashMap instead of double-brace initialization, which creates an anonymous subclass per use
    final Map<String, Object> values = new LinkedHashMap<>();
    values.put("firstName", "John");
    values.put("age", 30);
    values.put("addresses", new Object[] {"some string", nestedAddress});
    final Record record = DataTypeUtils.toRecord(values, "");

    final Object obj = DataTypeUtils.convertRecordFieldtoObject(record, RecordFieldType.RECORD.getDataType());
    assertTrue(obj instanceof Map);
    final Map<String, Object> map = (Map<String, Object>) obj;
    assertEquals("John", map.get("firstName"));
    assertEquals(30, map.get("age"));

    assertTrue(map.get("addresses") instanceof Object[]);
    final Object[] objArray = (Object[]) map.get("addresses");
    assertEquals(2, objArray.length);
    assertEquals("some string", objArray[0]);

    assertTrue(objArray[1] instanceof Map);
    final Map<String, Object> addressMap = (Map<String, Object>) objArray[1];
    assertEquals("123 Fake Street", addressMap.get("address_1"));
}

@Test
@SuppressWarnings("unchecked")
public void testConvertRecordFieldToObject() {
Expand All @@ -243,12 +267,18 @@ public void testConvertRecordFieldToObject() {
fields.add(new RecordField("noDefault", RecordFieldType.CHOICE.getChoiceDataType(RecordFieldType.STRING.getDataType())));
fields.add(new RecordField("intField", RecordFieldType.INT.getDataType()));
fields.add(new RecordField("intArray", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.INT.getDataType())));
fields.add(new RecordField("objArray", RecordFieldType.ARRAY.getArrayDataType(
RecordFieldType.CHOICE.getChoiceDataType(RecordFieldType.STRING.getDataType(), RecordFieldType.INT.getDataType())
)));
fields.add(new RecordField("choiceArray", RecordFieldType.ARRAY.getArrayDataType(
RecordFieldType.CHOICE.getChoiceDataType(RecordFieldType.STRING.getDataType(), RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.STRING.getDataType()))
)));

// Map of Records with Arrays
List<RecordField> nestedRecordFields = new ArrayList<>();
final List<RecordField> nestedRecordFields = new ArrayList<>();
nestedRecordFields.add(new RecordField("a", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.INT.getDataType())));
nestedRecordFields.add(new RecordField("b", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.STRING.getDataType())));
RecordSchema nestedRecordSchema = new SimpleRecordSchema(nestedRecordFields);
final RecordSchema nestedRecordSchema = new SimpleRecordSchema(nestedRecordFields);

fields.add(new RecordField("complex", RecordFieldType.MAP.getMapDataType(RecordFieldType.RECORD.getRecordDataType(nestedRecordSchema))));

Expand All @@ -257,6 +287,9 @@ public void testConvertRecordFieldToObject() {
values.put("noDefault", "world");
values.put("intField", 5);
values.put("intArray", new Integer[] {3,2,1});
values.put("objArray", new Object[] {3,"2","abc",1});
values.put("noChoiceArray", new Object[] {"foo","BAR"});
values.put("choiceArray", new Object[] {"foo",new Object[]{"bar","baz"}});
final Map<String, Object> complexValues = new HashMap<>();

final Map<String, Object> complexValueRecord1 = new HashMap<>();
Expand All @@ -275,22 +308,38 @@ public void testConvertRecordFieldToObject() {

Object o = DataTypeUtils.convertRecordFieldtoObject(inputRecord, RecordFieldType.RECORD.getRecordDataType(schema));
assertTrue(o instanceof Map);
Map<String,Object> outputMap = (Map<String,Object>) o;
final Map<String,Object> outputMap = (Map<String,Object>) o;
assertEquals("hello", outputMap.get("defaultOfHello"));
assertEquals("world", outputMap.get("noDefault"));
o = outputMap.get("intField");
assertEquals(5,o);
o = outputMap.get("intArray");
assertTrue(o instanceof Integer[]);
Integer[] intArray = (Integer[])o;
final Integer[] intArray = (Integer[])o;
assertEquals(3, intArray.length);
assertEquals((Integer)3, intArray[0]);
o = outputMap.get("objArray");
assertTrue(o instanceof Object[]);
final Object[] objArray = (Object[])o;
assertEquals(4, objArray.length);
assertEquals(3, objArray[0]);
assertEquals("2", objArray[1]);
o = outputMap.get("choiceArray");
assertTrue(o instanceof Object[]);
final Object[] choiceArray = (Object[])o;
assertEquals(2, choiceArray.length);
assertEquals("foo", choiceArray[0]);
assertTrue(choiceArray[1] instanceof Object[]);
final Object[] strArray = (Object[]) choiceArray[1];
assertEquals(2, strArray.length);
assertEquals("bar", strArray[0]);
assertEquals("baz", strArray[1]);
o = outputMap.get("complex");
assertTrue(o instanceof Map);
Map<String,Object> nestedOutputMap = (Map<String,Object>)o;
final Map<String,Object> nestedOutputMap = (Map<String,Object>)o;
o = nestedOutputMap.get("complex1");
assertTrue(o instanceof Map);
Map<String,Object> complex1 = (Map<String,Object>)o;
final Map<String,Object> complex1 = (Map<String,Object>)o;
o = complex1.get("a");
assertTrue(o instanceof Integer[]);
assertEquals((Integer)2, ((Integer[])o)[1]);
Expand All @@ -299,14 +348,13 @@ public void testConvertRecordFieldToObject() {
assertEquals((Integer)3, ((Integer[])o)[2]);
o = nestedOutputMap.get("complex2");
assertTrue(o instanceof Map);
Map<String,Object> complex2 = (Map<String,Object>)o;
final Map<String,Object> complex2 = (Map<String,Object>)o;
o = complex2.get("a");
assertTrue(o instanceof String[]);
assertEquals("hello", ((String[])o)[0]);
o = complex2.get("b");
assertTrue(o instanceof String[]);
assertEquals("4", ((String[])o)[1]);

}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@
<exclude>src/test/resources/TestJoltTransformRecord/cardrOutput.json</exclude>
<exclude>src/test/resources/TestJoltTransformRecord/defaultrSpec.json</exclude>
<exclude>src/test/resources/TestJoltTransformRecord/defaultrOutput.json</exclude>
<exclude>src/test/resources/TestJoltTransformRecord/flattenSpec.json</exclude>
<exclude>src/test/resources/TestJoltTransformRecord/flattenedOutput.json</exclude>
<exclude>src/test/resources/TestJoltTransformRecord/shiftrSpec.json</exclude>
<exclude>src/test/resources/TestJoltTransformRecord/shiftrSpecMultipleOutputRecords.json</exclude>
<exclude>src/test/resources/TestJoltTransformRecord/sortrOutput.json</exclude>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@

import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.json.JsonRecordSetWriter;
import org.apache.nifi.json.JsonTreeReader;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.schema.access.SchemaAccessUtils;
import org.apache.nifi.schema.inference.SchemaInferenceUtil;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.MockRecordParser;
Expand Down Expand Up @@ -690,14 +692,41 @@ public void testJoltSpecInvalidEL() {
runner.assertNotValid();
}

/**
 * Runs the "flatten" Jolt chain over input.json, read through a schema-inferring JsonTreeReader,
 * and compares the pretty-printed writer output with flattenedOutput.json.
 */
@Test
public void testJoltComplexChoiceField() throws Exception {
    // Decode every fixture, and the produced content, with one explicit charset so the comparison does not depend on the platform default
    final java.nio.charset.Charset utf8 = java.nio.charset.StandardCharsets.UTF_8;

    final JsonTreeReader reader = new JsonTreeReader();
    runner.addControllerService("reader", reader);
    runner.setProperty(reader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaInferenceUtil.INFER_SCHEMA);
    runner.enableControllerService(reader);
    runner.setProperty(JoltTransformRecord.RECORD_READER, "reader");

    runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.INHERIT_RECORD_SCHEMA);
    runner.setProperty(writer, "Pretty Print JSON", "true");
    runner.enableControllerService(writer);

    final String flattenSpec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/flattenSpec.json")), utf8);
    runner.setProperty(JoltTransformRecord.JOLT_SPEC, flattenSpec);
    runner.setProperty(JoltTransformRecord.JOLT_TRANSFORM, JoltTransformRecord.CHAINR);

    final String inputJson = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/input.json")), utf8);
    runner.enqueue(inputJson);

    runner.run();
    runner.assertTransferCount(JoltTransformRecord.REL_SUCCESS, 1);
    runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1);

    final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).get(0);
    final String expectedJson = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/flattenedOutput.json")), utf8);
    assertEquals(expectedJson, new String(transformed.toByteArray(), utf8));
}

// Supplies (spec path, description) pairs: one Chainr spec without single line comments and one with.
private static Stream<Arguments> getChainrArguments() {
    final Arguments withoutComments = Arguments.of(
            Paths.get("src/test/resources/TestJoltTransformRecord/chainrSpec.json"),
            "has no single line comments");
    final Arguments withComment = Arguments.of(
            Paths.get("src/test/resources/TestJoltTransformRecord/chainrSpecWithSingleLineComment.json"),
            "has a single line comment");
    return Stream.of(withoutComments, withComment);
}

private void generateTestData(int numRecords, final BiFunction<Integer, MockRecordParser, Void> recordGenerator) {

if (recordGenerator == null) {
final RecordSchema primarySchema = new SimpleRecordSchema(Arrays.asList(
new RecordField("value", RecordFieldType.INT.getDataType())));
Expand Down Expand Up @@ -734,8 +763,6 @@ private void generateTestData(int numRecords, final BiFunction<Integer, MockReco

parser.addRecord(ratingRecord);
}


} else {
recordGenerator.apply(numRecords, parser);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
[
{
"operation": "shift",
"spec": { "*": "record.&" }
},
{
"operation": "shift",
"spec": {
"record": {
"*": {
"$": "TValue[#2].name",
"@": "TValue[#2].value"
}
}
}
},
{
"operation": "default",
"spec": { "TValue[]": { "*": { "class": "unclass" } } }
}
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
[ {
"TValue" : [ {
"name" : "datetime",
"value" : "2023-10-06 20:36:09.937019+00:00",
"class" : "unclass"
}, {
"name" : "Eta",
"value" : "",
"class" : "unclass"
} ]
}, {
"TValue" : [ {
"name" : "datetime",
"value" : "2023-08-24 17:07:03.334170+00:00",
"class" : "unclass"
}, {
"name" : "Eta",
"value" : "{Day=15, Hour=6, Minute=0, Month=8}",
"class" : "unclass"
} ]
} ]
Loading

0 comments on commit aac71c5

Please sign in to comment.