Skip to content

Commit

Permalink
NIFI-13630 Handle Map Avro Type in PutBigQuery
Browse files Browse the repository at this point in the history
This closes #9151

Signed-off-by: David Handermann <exceptionfactory@apache.org>
  • Loading branch information
juldrixx authored and exceptionfactory committed Aug 17, 2024
1 parent d4344a3 commit 9fbe6aa
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,20 @@ public static DynamicMessage createMessage(Descriptors.Descriptor descriptor, Ma
switch (field.getType()) {
case MESSAGE:
if (field.isRepeated()) {
Collection collection = value.getClass().isArray() ? Arrays.asList((Object[]) value) : (Collection) value;
collection.forEach(act -> builder.addRepeatedField(field, createMessage(field.getMessageType(), (Map<String, Object>) act, tableSchema)));
final Collection<Map<String, Object>> valueMaps;
if (value instanceof Object[] arrayValue) {
valueMaps = Arrays.stream(arrayValue)
.map(item -> (Map<String, Object>) item).toList();
} else if (value instanceof Map<?, ?> mapValue) {
valueMaps = mapValue.entrySet().stream()
.map(entry -> Map.of(
"key", entry.getKey(),
"value", entry.getValue()
)).toList();
} else {
valueMaps = (Collection<Map<String, Object>>) value;
}
valueMaps.forEach(act -> builder.addRepeatedField(field, createMessage(field.getMessageType(), act, tableSchema)));
} else {
builder.setField(field, createMessage(field.getMessageType(), (Map<String, Object>) value, tableSchema));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.nifi.csv.CSVReader;
import org.apache.nifi.csv.CSVUtils;
import org.apache.nifi.gcp.credentials.service.GCPCredentialsService;
import org.apache.nifi.json.JsonTreeReader;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processors.gcp.credentials.service.GCPCredentialsControllerService;
Expand Down Expand Up @@ -457,6 +458,26 @@ void testNextFlowFileProcessedWhenIntermittentErrorResolved() {
runner.assertTransferCount(PutBigQuery.REL_SUCCESS, 1);
}

@Test
void testMapFieldSchema() throws Exception {
when(writeClient.createWriteStream(isA(CreateWriteStreamRequest.class))).thenReturn(writeStream);

TableSchema myTableSchema = mockJsonTableSchema();

when(writeStream.getTableSchema()).thenReturn(myTableSchema);

when(streamWriter.append(isA(ProtoRows.class), isA(Long.class)))
.thenReturn(ApiFutures.immediateFuture(AppendRowsResponse.newBuilder().setAppendResult(mock(AppendRowsResponse.AppendResult.class)).build()));

decorateWithJsonRecordReaderWithSchema(runner);
runner.setProperty(PutBigQuery.RECORD_READER, "jsonReader");

runner.enqueue(jsonContent());
runner.run();

runner.assertAllFlowFilesTransferred(PutBigQuery.REL_SUCCESS);
}

private void decorateWithRecordReader(TestRunner runner) throws InitializationException {
CSVReader csvReader = new CSVReader();
runner.addControllerService("csvReader", csvReader);
Expand Down Expand Up @@ -484,6 +505,30 @@ private void decorateWithRecordReaderWithSchema(TestRunner runner) throws Initia
runner.enableControllerService(csvReader);
}

private void decorateWithJsonRecordReaderWithSchema(TestRunner runner) throws InitializationException {
String recordReaderSchema = """
{
"name": "recordFormatName",
"namespace": "nifi.examples",
"type": "record",
"fields": [
{
"name": "field",
"type": {
"type": "map",
"values": "string"
}
}
]
}""";

JsonTreeReader jsonReader = new JsonTreeReader();
runner.addControllerService("jsonReader", jsonReader);
runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_TEXT, recordReaderSchema);
runner.enableControllerService(jsonReader);
}

private TableSchema mockTableSchema(String name1, TableFieldSchema.Type type1, String name2, TableFieldSchema.Type type2) {
TableSchema myTableSchema = mock(TableSchema.class);

Expand All @@ -503,6 +548,30 @@ private TableSchema mockTableSchema(String name1, TableFieldSchema.Type type1, S
return myTableSchema;
}

private TableSchema mockJsonTableSchema() {
TableSchema myTableSchema = mock(TableSchema.class);

TableFieldSchema keyFieldSchema = mock(TableFieldSchema.class);
when(keyFieldSchema.getName()).thenReturn("key");
when(keyFieldSchema.getType()).thenReturn(TableFieldSchema.Type.STRING);
when(keyFieldSchema.getMode()).thenReturn(TableFieldSchema.Mode.REQUIRED);

TableFieldSchema valueFieldSchema = mock(TableFieldSchema.class);
when(valueFieldSchema.getName()).thenReturn("value");
when(valueFieldSchema.getType()).thenReturn(TableFieldSchema.Type.STRING);
when(valueFieldSchema.getMode()).thenReturn(TableFieldSchema.Mode.NULLABLE);

TableFieldSchema tableFieldSchemaId = mock(TableFieldSchema.class);
when(tableFieldSchemaId.getName()).thenReturn("field");
when(tableFieldSchemaId.getType()).thenReturn(TableFieldSchema.Type.STRUCT);
when(tableFieldSchemaId.getMode()).thenReturn(TableFieldSchema.Mode.REPEATED);
when(tableFieldSchemaId.getFieldsList()).thenReturn(List.of(keyFieldSchema, valueFieldSchema));

when(myTableSchema.getFieldsList()).thenReturn(List.of(tableFieldSchemaId));

return myTableSchema;
}

private String csvContentWithLines(int lineNum) {
StringBuilder builder = new StringBuilder();
builder.append(CSV_HEADER);
Expand All @@ -516,4 +585,17 @@ private String csvContentWithLines(int lineNum) {

return builder.toString();
}

private String jsonContent() {
return """
{
"field": {
"FIELD_1": "field_1",
"FIELD_2": "field_2",
"FIELD_3": "field_3",
"FIELD_4": "field_4",
"FIELD_5": "field_5"
}
}""";
}
}

0 comments on commit 9fbe6aa

Please sign in to comment.