From d0bfdb64ceb2fe4d3274bfdf7fe9a425bc1a80ea Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Thu, 9 Jan 2025 21:44:02 -0500 Subject: [PATCH] NIFI-14145: Added new arrayOf RecordPath function --- .../nifi/record/path/functions/ArrayOf.java | 83 ++++++++++++++++ .../record/path/functions/UnescapeJson.java | 19 +++- .../record/path/paths/RecordPathCompiler.java | 11 +++ .../nifi/record/path/TestRecordPath.java | 94 +++++++++++++++++++ .../src/main/asciidoc/record-path-guide.adoc | 56 +++++++++++ 5 files changed, 259 insertions(+), 4 deletions(-) create mode 100644 nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/ArrayOf.java diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/ArrayOf.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/ArrayOf.java new file mode 100644 index 000000000000..294b1f65d8b3 --- /dev/null +++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/ArrayOf.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.record.path.functions; + +import org.apache.nifi.record.path.FieldValue; +import org.apache.nifi.record.path.RecordPathEvaluationContext; +import org.apache.nifi.record.path.StandardFieldValue; +import org.apache.nifi.record.path.paths.RecordPathSegment; +import org.apache.nifi.serialization.record.DataType; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.util.DataTypeUtils; + +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Stream; + +public class ArrayOf extends RecordPathSegment { + private final RecordPathSegment[] elementPaths; + + public ArrayOf(final RecordPathSegment[] elementPaths, final boolean absolute) { + super("arrayOf", null, absolute); + this.elementPaths = elementPaths; + } + + @Override + public Stream evaluate(final RecordPathEvaluationContext context) { + final List values = new ArrayList<>(); + final List fieldValues = new ArrayList<>(); + + for (final RecordPathSegment elementPath : elementPaths) { + final Stream stream = elementPath.evaluate(context); + stream.forEach(fv -> { + fieldValues.add(fv); + values.add(fv.getValue()); + }); + } + + if (fieldValues.isEmpty()) { + return Stream.of(); + } + + DataType merged = null; + for (final FieldValue fieldValue : fieldValues) { + final DataType dataType = getDataType(fieldValue); + if (merged == null) { + merged = dataType; + continue; + } + + merged = DataTypeUtils.mergeDataTypes(merged, dataType); + } + + final Object[] array = values.toArray(); + final RecordField field = new RecordField("arrayOf", merged); + final FieldValue fieldValue = new StandardFieldValue(array, field, null); + return Stream.of(fieldValue); + } + + private DataType getDataType(final FieldValue fieldValue) { + final RecordField recordField = fieldValue.getField(); + if (recordField != null) { + return recordField.getDataType(); + } else { + return DataTypeUtils.inferDataType(fieldValue.getValue(), RecordFieldType.STRING.getDataType()); + } + } +} diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/UnescapeJson.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/UnescapeJson.java index d5e821a44b87..cad9f24dd86f 100644 --- a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/UnescapeJson.java +++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/UnescapeJson.java @@ -26,6 +26,7 @@ import org.apache.nifi.record.path.util.RecordPathUtils; import org.apache.nifi.serialization.record.DataType; import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordField; import org.apache.nifi.serialization.record.RecordFieldType; import org.apache.nifi.serialization.record.type.ArrayDataType; import org.apache.nifi.serialization.record.type.ChoiceDataType; @@ -69,13 +70,23 @@ public Stream evaluate(final RecordPathEvaluationContext context) { if (value instanceof String) { try { - DataType dataType = fv.getField().getDataType(); - if (fv.getField().getDataType() instanceof ChoiceDataType) { - dataType = DataTypeUtils.chooseDataType(value, (ChoiceDataType) fv.getField().getDataType()); + final RecordField recordField = fv.getField(); + DataType dataType; + final String fieldName; + if (recordField == null) { + dataType = DataTypeUtils.inferDataType(fv.getValue(), RecordFieldType.STRING.getDataType()); + fieldName = "unescapeJson"; + } else { + dataType = recordField.getDataType(); + fieldName = recordField.getFieldName(); + } + + if (dataType.getFieldType() == RecordFieldType.CHOICE) { + dataType = DataTypeUtils.chooseDataType(value, (ChoiceDataType) dataType); } return new StandardFieldValue( - convertFieldValue(value, fv.getField().getFieldName(), dataType, convertMapToRecord, recursiveMapToRecord), + convertFieldValue(value, fieldName, dataType, convertMapToRecord, recursiveMapToRecord), fv.getField(), fv.getParent().orElse(null) ); } catch (IOException e) { diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RecordPathCompiler.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RecordPathCompiler.java index 191990f4f1d3..a83eee6d9f93 100644 --- a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RecordPathCompiler.java +++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RecordPathCompiler.java @@ -36,6 +36,7 @@ import org.apache.nifi.record.path.filter.RecordPathFilter; import org.apache.nifi.record.path.filter.StartsWith; import org.apache.nifi.record.path.functions.Anchored; +import org.apache.nifi.record.path.functions.ArrayOf; import org.apache.nifi.record.path.functions.Base64Decode; import org.apache.nifi.record.path.functions.Base64Encode; import org.apache.nifi.record.path.functions.Coalesce; @@ -278,6 +279,16 @@ public static RecordPathSegment buildPath(final Tree tree, final RecordPathSegme return new Concat(argPaths, absolute); } + case "arrayOf": { + final int numArgs = argumentListTree.getChildCount(); + + final RecordPathSegment[] argPaths = new RecordPathSegment[numArgs]; + for (int i = 0; i < numArgs; i++) { + argPaths[i] = buildPath(argumentListTree.getChild(i), null, absolute); + } + + return new ArrayOf(argPaths, absolute); + } case "mapOf": { final int numArgs = argumentListTree.getChildCount(); diff --git a/nifi-commons/nifi-record-path/src/test/java/org/apache/nifi/record/path/TestRecordPath.java b/nifi-commons/nifi-record-path/src/test/java/org/apache/nifi/record/path/TestRecordPath.java index 1131da1e1445..f92c200aac7f 100644 --- a/nifi-commons/nifi-record-path/src/test/java/org/apache/nifi/record/path/TestRecordPath.java +++ b/nifi-commons/nifi-record-path/src/test/java/org/apache/nifi/record/path/TestRecordPath.java @@ -67,7 +67,9 @@ import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertSame; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; @SuppressWarnings({"SameParameterValue"}) public class TestRecordPath { @@ -906,6 +908,98 @@ public void canReferenceRecordRootInStandaloneFunction() { assertEquals(Map.of("copy", record.toString()), multipleArgumentsFieldValue.getValue()); } + @Nested + class ArrayOf { + + @Test + public void testSimpleArrayOfValues() { + final RecordPath recordPath = RecordPath.compile("arrayOf( 'a', 'b', 'c' )"); + final RecordPathResult result = recordPath.evaluate(record); + final Object resultValue = result.getSelectedFields().findFirst().orElseThrow().getValue(); + + assertInstanceOf(Object[].class, resultValue); + assertArrayEquals(new Object[] {"a", "b", "c"}, (Object[]) resultValue); + + } + + @Test + public void testAppendString() { + final RecordPath recordPath = RecordPath.compile("arrayOf( /friends[*], 'Junior' )"); + final RecordPathResult result = recordPath.evaluate(record); + final Object resultValue = result.getSelectedFields().findFirst().orElseThrow().getValue(); + + assertInstanceOf(Object[].class, resultValue); + assertArrayEquals(new Object[] {"John", "Jane", "Jacob", "Judy", "Junior"}, (Object[]) resultValue); + } + + @Test + public void testAppendSingleRecord() { + final RecordPath recordPath = RecordPath.compile("arrayOf( /accounts[*], recordOf('id', '5555', 'balance', '123.45') )"); + final RecordPathResult result = recordPath.evaluate(record); + final Object resultValue = result.getSelectedFields().findFirst().orElseThrow().getValue(); + + assertInstanceOf(Object[].class, resultValue); + final Object[] values = (Object[]) resultValue; + assertEquals(3, values.length); + + assertInstanceOf(Record.class, values[2]); + final Record added = (Record) values[2]; + assertEquals("5555", added.getValue("id")); + assertEquals("123.45", added.getValue("balance")); + } + + @Test + public void testPrependSingleRecord() { + final RecordPath recordPath = RecordPath.compile("arrayOf( recordOf('id', '5555', 'balance', '123.45'), /accounts[*] )"); + final RecordPathResult result = recordPath.evaluate(record); + final Object resultValue = result.getSelectedFields().findFirst().orElseThrow().getValue(); + + assertInstanceOf(Object[].class, resultValue); + final Object[] values = (Object[]) resultValue; + assertEquals(3, values.length); + + assertInstanceOf(Record.class, values[0]); + final Record added = (Record) values[0]; + assertEquals("5555", added.getValue("id")); + assertEquals("123.45", added.getValue("balance")); + } + + + @Test + public void testAppendMultipleValues() { + final RecordPath recordPath = RecordPath.compile("arrayOf( /accounts[*], recordOf('id', '5555', 'balance', '123.45'), /accounts[0] )"); + final RecordPathResult result = recordPath.evaluate(record); + final Object resultValue = result.getSelectedFields().findFirst().orElseThrow().getValue(); + + assertInstanceOf(Object[].class, resultValue); + final Object[] values = (Object[]) resultValue; + assertEquals(4, values.length); + + assertInstanceOf(Record.class, values[2]); + final Record added = (Record) values[2]; + assertEquals("5555", added.getValue("id")); + assertEquals("123.45", added.getValue("balance")); + + assertSame(values[0], values[3]); + } + + @Test + public void testWithUnescapeJson() { + final RecordPath recordPath = RecordPath.compile("arrayOf( /accounts[*], unescapeJson('{\"id\": 5555, \"balance\": 123.45}', 'true') )"); + final RecordPathResult result = recordPath.evaluate(record); + final Object resultValue = result.getSelectedFields().findFirst().orElseThrow().getValue(); + + assertInstanceOf(Object[].class, resultValue); + final Object[] values = (Object[]) resultValue; + assertEquals(3, values.length); + + assertInstanceOf(Record.class, values[2]); + final Record added = (Record) values[2]; + assertEquals(5555, added.getValue("id")); + assertEquals(123.45, added.getValue("balance")); + } + } + @Nested class Anchored { @Test diff --git a/nifi-docs/src/main/asciidoc/record-path-guide.adoc b/nifi-docs/src/main/asciidoc/record-path-guide.adoc index ec667ffe974d..89af5e561bcb 100644 --- a/nifi-docs/src/main/asciidoc/record-path-guide.adoc +++ b/nifi-docs/src/main/asciidoc/record-path-guide.adoc @@ -1259,6 +1259,62 @@ Each pair of arguments resembles a field in the new record. Every odd argument, the first one of each pair, is used as field name and coerced into a String value. Every even argument, the second one of each pair, is used as field value. + +=== arrayOf + +Creates an array from multiple values. + +``` +{ + "id": "1234", + "elements": [{ + "name": "book", + "color": "red" + }, { + "name": "computer", + "color": "black" + }] +} +``` + +We could make use of the `arrayOf` function, in conjunction with the `unescapeJson` function to append a new element to the `elements` array. +Given this example input, a RecordPath of `arrayOf(/elements[*], unescapeJson('{"name":"phone","color":"blue"}'))` would return the following array: + +``` +[{ + "name": "book", + "color": "red" +}, { + "name": "computer", + "color": "black" +}, { + "name": "phone", + "color": "blue" +}] +``` + +We may also use the `arrayOf` function for appending multiple elements to an array. Given the input record: + +``` +{ + "name": "John Doe", + "friends": ["Jane", "Jack", "Jill"] +} +``` + +A RecordPath of `arrayOf(/friends[*], 'Joe', 'Jim', 'Jeremy')` would return the following array: + +``` +["Jane", "Jack", "Jill", "Joe", "Jim", "Jeremy"] +``` + +We can also simply create an array from arbitrary values, such as `arrayOf('good-bye', 'adios', 'au revoir')` which would return the following array: + +``` +["good-bye", "adios", "au revoir"] +``` + + [[filter_functions]] == Filter Functions