Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NIFI-14145: Added new arrayOf RecordPath function #9621

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.nifi.record.path.functions;

import org.apache.nifi.record.path.FieldValue;
import org.apache.nifi.record.path.RecordPathEvaluationContext;
import org.apache.nifi.record.path.StandardFieldValue;
import org.apache.nifi.record.path.paths.RecordPathSegment;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.util.DataTypeUtils;

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

/**
 * RecordPath function {@code arrayOf}. Evaluates each argument path and gathers every
 * value those paths select, in argument order, into a single array-valued {@link FieldValue}.
 */
public class ArrayOf extends RecordPathSegment {
    private final RecordPathSegment[] elementPaths;

    /**
     * @param elementPaths the argument paths whose selected values become the array elements
     * @param absolute whether this segment is evaluated as an absolute path
     */
    public ArrayOf(final RecordPathSegment[] elementPaths, final boolean absolute) {
        super("arrayOf", null, absolute);
        this.elementPaths = elementPaths;
    }

    /**
     * Evaluates every argument path against the given context and packs all selected values
     * into one {@code Object[]}.
     *
     * @param context the evaluation context passed through to each argument path
     * @return a stream holding a single array-valued FieldValue, or an empty stream when the
     *         arguments select no values at all
     */
    @Override
    public Stream<FieldValue> evaluate(final RecordPathEvaluationContext context) {
        final List<FieldValue> selected = new ArrayList<>();
        for (final RecordPathSegment elementPath : elementPaths) {
            // A single argument (e.g. /friends[*]) may contribute several elements.
            elementPath.evaluate(context).forEach(selected::add);
        }

        if (selected.isEmpty()) {
            return Stream.empty();
        }

        final Object[] array = new Object[selected.size()];
        DataType elementType = null;
        for (int i = 0; i < array.length; i++) {
            final FieldValue element = selected.get(i);
            array[i] = element.getValue();

            // Fold the element types together so heterogeneous elements are still described.
            final DataType dataType = getDataType(element);
            elementType = (elementType == null) ? dataType : DataTypeUtils.mergeDataTypes(elementType, dataType);
        }

        // The value is an array, so the field must be declared as ARRAY<elementType>;
        // declaring it with the bare element type would misdescribe the value.
        final DataType arrayType = RecordFieldType.ARRAY.getArrayDataType(elementType);
        final RecordField field = new RecordField("arrayOf", arrayType);
        final FieldValue fieldValue = new StandardFieldValue(array, field, null);
        return Stream.of(fieldValue);
    }

    /**
     * Returns the declared type of the given value's field, or, when the value has no
     * field, a type inferred from the raw value with STRING as the fallback.
     */
    private DataType getDataType(final FieldValue fieldValue) {
        final RecordField recordField = fieldValue.getField();
        if (recordField == null) {
            return DataTypeUtils.inferDataType(fieldValue.getValue(), RecordFieldType.STRING.getDataType());
        }
        return recordField.getDataType();
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.nifi.record.path.util.RecordPathUtils;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.type.ArrayDataType;
import org.apache.nifi.serialization.record.type.ChoiceDataType;
Expand Down Expand Up @@ -69,13 +70,23 @@ public Stream<FieldValue> evaluate(final RecordPathEvaluationContext context) {

if (value instanceof String) {
try {
DataType dataType = fv.getField().getDataType();
if (fv.getField().getDataType() instanceof ChoiceDataType) {
dataType = DataTypeUtils.chooseDataType(value, (ChoiceDataType) fv.getField().getDataType());
final RecordField recordField = fv.getField();
DataType dataType;
final String fieldName;
if (recordField == null) {
dataType = DataTypeUtils.inferDataType(fv.getValue(), RecordFieldType.STRING.getDataType());
fieldName = "unescapeJson";
} else {
dataType = recordField.getDataType();
fieldName = recordField.getFieldName();
}

if (dataType.getFieldType() == RecordFieldType.CHOICE) {
dataType = DataTypeUtils.chooseDataType(value, (ChoiceDataType) dataType);
}

return new StandardFieldValue(
convertFieldValue(value, fv.getField().getFieldName(), dataType, convertMapToRecord, recursiveMapToRecord),
convertFieldValue(value, fieldName, dataType, convertMapToRecord, recursiveMapToRecord),
fv.getField(), fv.getParent().orElse(null)
);
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.nifi.record.path.filter.RecordPathFilter;
import org.apache.nifi.record.path.filter.StartsWith;
import org.apache.nifi.record.path.functions.Anchored;
import org.apache.nifi.record.path.functions.ArrayOf;
import org.apache.nifi.record.path.functions.Base64Decode;
import org.apache.nifi.record.path.functions.Base64Encode;
import org.apache.nifi.record.path.functions.Coalesce;
Expand Down Expand Up @@ -278,6 +279,16 @@ public static RecordPathSegment buildPath(final Tree tree, final RecordPathSegme

return new Concat(argPaths, absolute);
}
case "arrayOf": {
final int numArgs = argumentListTree.getChildCount();

final RecordPathSegment[] argPaths = new RecordPathSegment[numArgs];
for (int i = 0; i < numArgs; i++) {
argPaths[i] = buildPath(argumentListTree.getChild(i), null, absolute);
}

return new ArrayOf(argPaths, absolute);
}
case "mapOf": {
final int numArgs = argumentListTree.getChildCount();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,9 @@
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

@SuppressWarnings({"SameParameterValue"})
public class TestRecordPath {
Expand Down Expand Up @@ -906,6 +908,98 @@ public void canReferenceRecordRootInStandaloneFunction() {
assertEquals(Map.of("copy", record.toString()), multipleArgumentsFieldValue.getValue());
}

@Nested
class ArrayOf {

    // Shared literal for the record that several tests add to the accounts array.
    private static final String NEW_ACCOUNT = "recordOf('id', '5555', 'balance', '123.45')";

    @Test
    public void testSimpleArrayOfValues() {
        final Object[] elements = evaluateToArray("arrayOf( 'a', 'b', 'c' )");

        assertArrayEquals(new Object[] {"a", "b", "c"}, elements);
    }

    @Test
    public void testAppendString() {
        final Object[] elements = evaluateToArray("arrayOf( /friends[*], 'Junior' )");

        assertArrayEquals(new Object[] {"John", "Jane", "Jacob", "Judy", "Junior"}, elements);
    }

    @Test
    public void testAppendSingleRecord() {
        final Object[] elements = evaluateToArray("arrayOf( /accounts[*], " + NEW_ACCOUNT + " )");

        assertEquals(3, elements.length);
        assertAccount(elements[2], "5555", "123.45");
    }

    @Test
    public void testPrependSingleRecord() {
        final Object[] elements = evaluateToArray("arrayOf( " + NEW_ACCOUNT + ", /accounts[*] )");

        assertEquals(3, elements.length);
        assertAccount(elements[0], "5555", "123.45");
    }

    @Test
    public void testAppendMultipleValues() {
        final Object[] elements = evaluateToArray("arrayOf( /accounts[*], " + NEW_ACCOUNT + ", /accounts[0] )");

        assertEquals(4, elements.length);
        assertAccount(elements[2], "5555", "123.45");

        // /accounts[0] must yield the very same instance already selected by /accounts[*].
        assertSame(elements[0], elements[3]);
    }

    @Test
    public void testWithUnescapeJson() {
        final Object[] elements = evaluateToArray(
                "arrayOf( /accounts[*], unescapeJson('{\"id\": 5555, \"balance\": 123.45}', 'true') )");

        assertEquals(3, elements.length);
        assertAccount(elements[2], 5555, 123.45);
    }

    /**
     * Compiles the given RecordPath, evaluates it against the shared test record, and
     * returns the first selected value after asserting that it is an Object array.
     */
    private Object[] evaluateToArray(final String path) {
        final RecordPathResult result = RecordPath.compile(path).evaluate(record);
        final Object selectedValue = result.getSelectedFields().findFirst().orElseThrow().getValue();

        assertInstanceOf(Object[].class, selectedValue);
        return (Object[]) selectedValue;
    }

    /**
     * Asserts that the element is a Record whose "id" and "balance" values equal the expected ones.
     */
    private void assertAccount(final Object element, final Object expectedId, final Object expectedBalance) {
        assertInstanceOf(Record.class, element);
        final Record account = (Record) element;
        assertEquals(expectedId, account.getValue("id"));
        assertEquals(expectedBalance, account.getValue("balance"));
    }
}

@Nested
class Anchored {
@Test
Expand Down
56 changes: 56 additions & 0 deletions nifi-docs/src/main/asciidoc/record-path-guide.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -1259,6 +1259,62 @@ Each pair of arguments resembles a field in the new record.
Every odd argument, the first one of each pair, is used as field name and coerced into a String value.
Every even argument, the second one of each pair, is used as field value.


=== arrayOf

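Creates an array from multiple values. Each argument may be a literal value or a RecordPath, and every value that an argument selects becomes an element of the resulting array, in argument order.
For example, given the following input record: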

```
{
"id": "1234",
"elements": [{
"name": "book",
"color": "red"
}, {
"name": "computer",
"color": "black"
}]
}
```

We could use the `arrayOf` function in conjunction with the `unescapeJson` function to append a new element to the `elements` array.
Given this example input, a RecordPath of `arrayOf(/elements[*], unescapeJson('{"name":"phone","color":"blue"}'))` would return the following array:

```
[{
"name": "book",
"color": "red"
}, {
"name": "computer",
"color": "black"
}, {
"name": "phone",
"color": "blue"
}]
```

We may also use the `arrayOf` function for appending multiple elements to an array. Given the input record:

```
{
"name": "John Doe",
"friends": ["Jane", "Jack", "Jill"]
}
```

A RecordPath of `arrayOf(/friends[*], 'Joe', 'Jim', 'Jeremy')` would return the following array:

```
["Jane", "Jack", "Jill", "Joe", "Jim", "Jeremy"]
```

We can also simply create an array from arbitrary values, such as `arrayOf('good-bye', 'adios', 'au revoir')` which would return the following array:

```
["good-bye", "adios", "au revoir"]
```


[[filter_functions]]
== Filter Functions

Expand Down
Loading