
Commit

NIFI-8932: Moved common code to base class for use by all CSVRecordReader implementations, updated unit tests
mattyb149 committed Nov 24, 2023
1 parent acef7f0 commit 652904e
Showing 7 changed files with 128 additions and 102 deletions.
@@ -17,12 +17,19 @@

package org.apache.nifi.csv;

import org.apache.commons.csv.CSVFormat;
import org.apache.commons.io.input.BOMInputStream;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.util.DataTypeUtils;

import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.text.DateFormat;
import java.util.function.Supplier;

@@ -45,8 +52,12 @@ abstract public class AbstractCSVRecordReader implements RecordReader {

protected final RecordSchema schema;

AbstractCSVRecordReader(final ComponentLog logger, final RecordSchema schema, final boolean hasHeader, final boolean ignoreHeader,
final String dateFormat, final String timeFormat, final String timestampFormat, final boolean trimDoubleQuote, final int skipTopRows) {
protected final Reader inputStreamReader;

AbstractCSVRecordReader(final InputStream in, final ComponentLog logger, final RecordSchema schema, final CSVFormat csvFormat, final boolean hasHeader, final boolean ignoreHeader,
final String dateFormat, final String timeFormat, final String timestampFormat, final String encoding, final boolean trimDoubleQuote, final int skipTopRows)
throws IOException {

this.logger = logger;
this.schema = schema;
this.hasHeader = hasHeader;
@@ -77,6 +88,14 @@ abstract public class AbstractCSVRecordReader implements RecordReader {
this.timestampFormat = timestampFormat;
LAZY_TIMESTAMP_FORMAT = () -> DataTypeUtils.getDateFormat(timestampFormat);
}

final InputStream bomInputStream = BOMInputStream.builder().setInputStream(in).get();
inputStreamReader = new InputStreamReader(bomInputStream, encoding);

// Skip the number of rows at the "top" as specified
for (int i = 0; i < skipTopRows; i++) {
readNextRecord(inputStreamReader, csvFormat.getRecordSeparator());
}
}

protected final Object convert(final String value, final DataType dataType, final String fieldName) {
@@ -161,4 +180,46 @@ protected String trim(String value) {
public RecordSchema getSchema() {
return schema;
}

/**
* This method searches using the specified Reader character-by-character until the
* record separator is found.
* @param reader the Reader providing the input
* @param recordSeparator the String specifying the end of a record in the input
* @throws IOException if an error occurs during reading, including not finding the record separator in the input
*/
protected void readNextRecord(Reader reader, String recordSeparator) throws IOException {
int indexIntoSeparator = 0;
int recordSeparatorLength = recordSeparator.length();
int code = reader.read();
while (code != -1) {
char nextChar = (char)code;
if (recordSeparator.charAt(indexIntoSeparator) == nextChar) {
if (++indexIntoSeparator == recordSeparatorLength) {
// The separator has been fully matched; the current record has been consumed
return;
}
} else {
// The character didn't match the expected one in the record separator, reset the separator matcher
// and check if it is the first character of the separator.
indexIntoSeparator = 0;
if (recordSeparator.charAt(indexIntoSeparator) == nextChar) {
// This character is the beginning of the record separator, keep it
if (++indexIntoSeparator == recordSeparatorLength) {
// The separator has been fully matched; the current record has been consumed
return;
}
}
}
// This defensive check limits a record size to 2GB, this prevents out-of-memory errors if the record separator
// is not present in the input (or at least in the first 2GB)
if (indexIntoSeparator == Integer.MAX_VALUE) {
throw new IOException("2GB input threshold reached, the record is either larger than 2GB or the separator "
+ "is not found in the first 2GB of input. Ensure the Record Separator is correct for this FlowFile.");
}
code = reader.read();
}
// If the input ends without finding the record separator, an exception is thrown
throw new IOException("Record separator not found");
}
}
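
For reference, the following standalone sketch (not part of the commit) mirrors the separator-scanning loop that readNextRecord now implements in the base class, and shows the same skip pattern the constructor applies for skipTopRows. The class name SkipRowsSketch, the helper skipOneRecord, and the sample input are illustrative assumptions only.

import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;

// Standalone sketch of the separator-scanning loop moved into AbstractCSVRecordReader above.
// SkipRowsSketch and skipOneRecord are illustrative names, not part of the commit.
public class SkipRowsSketch {

    // Reads and discards characters until the record separator has been fully matched,
    // throwing if the input ends first -- the same contract as the new readNextRecord().
    static void skipOneRecord(final Reader reader, final String recordSeparator) throws IOException {
        int indexIntoSeparator = 0;
        int code;
        while ((code = reader.read()) != -1) {
            final char nextChar = (char) code;
            if (recordSeparator.charAt(indexIntoSeparator) == nextChar) {
                if (++indexIntoSeparator == recordSeparator.length()) {
                    return; // separator fully matched, one record has been consumed
                }
            } else if (recordSeparator.charAt(0) == nextChar) {
                // Mismatch, but this character could start a new separator match
                indexIntoSeparator = 1;
            } else {
                indexIntoSeparator = 0;
            }
        }
        throw new IOException("Record separator not found");
    }

    public static void main(final String[] args) throws IOException {
        final Reader reader = new StringReader("report title\r\ngenerated 2023-11-24\r\nid,name\r\n1,John\r\n");
        // Same pattern as the constructor's skipTopRows loop: discard the two leading non-CSV rows
        for (int i = 0; i < 2; i++) {
            skipOneRecord(reader, "\r\n");
        }
        final char[] remainder = new char[64];
        final int read = reader.read(remainder);
        System.out.println(new String(remainder, 0, read)); // prints the CSV starting at the header row
    }
}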
@@ -19,8 +19,6 @@

import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
@@ -31,7 +29,6 @@
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVParser;
import org.apache.commons.csv.CSVRecord;
import org.apache.commons.io.input.BOMInputStream;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.record.DataType;
@@ -49,15 +46,7 @@ public class CSVRecordReader extends AbstractCSVRecordReader {
public CSVRecordReader(final InputStream in, final ComponentLog logger, final RecordSchema schema, final CSVFormat csvFormat, final boolean hasHeader, final boolean ignoreHeader,
final String dateFormat, final String timeFormat, final String timestampFormat, final String encoding, final boolean trimDoubleQuote, final int skipTopRows)
throws IOException {
super(logger, schema, hasHeader, ignoreHeader, dateFormat, timeFormat, timestampFormat, trimDoubleQuote, skipTopRows);

final InputStream bomInputStream = BOMInputStream.builder().setInputStream(in).get();
final Reader inputStreamReader = new InputStreamReader(bomInputStream, encoding);

// Skip the number of rows at the "top" as specified
for (int i = 0; i < skipTopRows; i++) {
readNextRecord(inputStreamReader, csvFormat.getRecordSeparator());
}
super(in, logger, schema, csvFormat, hasHeader, ignoreHeader, dateFormat, timeFormat, timestampFormat, encoding, trimDoubleQuote, skipTopRows);

CSVFormat.Builder withHeader;
if (hasHeader) {
@@ -128,7 +117,6 @@ public Record nextRecord(final boolean coerceTypes, final boolean dropUnknownFie
return null;
}


private List<RecordField> getRecordFields() {
if (this.recordFields != null) {
return this.recordFields;
@@ -159,59 +147,4 @@ private List<RecordField> getRecordFields() {
public void close() throws IOException {
csvParser.close();
}

/**
* This method builds a text representation of the CSV record by searching character-by-character until the
* record separator is found. Because we never want to consume input we don't use, the method attempts to match
* the separator separately, and as it is not matched, the characters are added to the returned string.
* @param reader the Reader providing the input
* @param recordSeparator the String specifying the end of a record in the input
* @return a String created from the input until the record separator is reached.
* @throws IOException if an error occurs during reading
*/
protected String readNextRecord(Reader reader, String recordSeparator) throws IOException {
int indexIntoSeparator = 0;
int recordSeparatorLength = recordSeparator.length();
StringBuilder lineBuilder = new StringBuilder();
StringBuilder separatorBuilder = new StringBuilder();
int code = reader.read();
while (code != -1) {
char nextChar = (char)code;
if (recordSeparator.charAt(indexIntoSeparator) == nextChar) {
separatorBuilder.append(nextChar);
if (++indexIntoSeparator == recordSeparatorLength) {
// We have matched the separator, return the string built so far
lineBuilder.append(separatorBuilder);
return lineBuilder.toString();
}
} else {
// The character didn't match the expected one in the record separator, reset the separator matcher
// and check if it is the first character of the separator.
indexIntoSeparator = 0;
if (recordSeparator.charAt(indexIntoSeparator) == nextChar) {
// This character is the beginning of the record separator, keep it
separatorBuilder = new StringBuilder();
separatorBuilder.append(nextChar);
if (++indexIntoSeparator == recordSeparatorLength) {
// We have matched the separator, return the string built so far
return lineBuilder.toString();
}
} else {
// This character is not the beginning of the record separator, add it to the return string
lineBuilder.append(nextChar);
}
}
// This defensive check limits a record size to 2GB, this prevents out-of-memory errors if the record separator
// is not present in the input (or at least in the first 2GB)
if (indexIntoSeparator == Integer.MAX_VALUE) {
throw new IOException("2GB input threshold reached, the record is either larger than 2GB or the separator "
+ "is not found in the first 2GB of input. Ensure the Record Separator is correct for this FlowFile.");
}
code = reader.read();
}

// The end of input has been reached without the record separator being found, throw an exception with the string so far

return lineBuilder.toString();
}
}
@@ -22,7 +22,6 @@
import de.siegmar.fastcsv.reader.CsvRow;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -46,19 +45,16 @@
public class FastCSVRecordReader extends AbstractCSVRecordReader {
private final CsvReader csvReader;
private final Iterator<CsvRow> csvRowIterator;

private List<RecordField> recordFields;

private Map<String, Integer> headerMap;

private final boolean ignoreHeader;
private final boolean trimDoubleQuote;
private final CSVFormat csvFormat;

public FastCSVRecordReader(final InputStream in, final ComponentLog logger, final RecordSchema schema, final CSVFormat csvFormat, final boolean hasHeader, final boolean ignoreHeader,
final String dateFormat, final String timeFormat, final String timestampFormat, final String encoding, final boolean trimDoubleQuote,
final int skipTopRows) throws IOException {
super(logger, schema, hasHeader, ignoreHeader, dateFormat, timeFormat, timestampFormat, trimDoubleQuote, skipTopRows);
super(in, logger, schema, csvFormat, hasHeader, ignoreHeader, dateFormat, timeFormat, timestampFormat, encoding, trimDoubleQuote, skipTopRows);
this.ignoreHeader = ignoreHeader;
this.trimDoubleQuote = trimDoubleQuote;
this.csvFormat = csvFormat;
@@ -83,7 +79,7 @@ public FastCSVRecordReader(final InputStream in, final ComponentLog logger, fina
}
}

csvReader = builder.build(new InputStreamReader(in, encoding));
csvReader = builder.build(inputStreamReader);
csvRowIterator = csvReader.iterator();
}

@@ -24,8 +24,6 @@
import com.fasterxml.jackson.dataformat.csv.CsvSchema;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@@ -36,7 +34,6 @@
import java.util.Set;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.DuplicateHeaderMode;
import org.apache.commons.io.input.BOMInputStream;
import org.apache.commons.lang3.CharUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.logging.ComponentLog;
@@ -57,9 +54,7 @@ public class JacksonCSVRecordReader extends AbstractCSVRecordReader {
public JacksonCSVRecordReader(final InputStream in, final ComponentLog logger, final RecordSchema schema, final CSVFormat csvFormat, final boolean hasHeader, final boolean ignoreHeader,
final String dateFormat, final String timeFormat, final String timestampFormat, final String encoding, final boolean trimDoubleQuote,
final int skipTopRows) throws IOException {
super(logger, schema, hasHeader, ignoreHeader, dateFormat, timeFormat, timestampFormat, trimDoubleQuote, skipTopRows);

final Reader reader = new InputStreamReader(BOMInputStream.builder().setInputStream(in).get(), encoding);
super(in, logger, schema, csvFormat, hasHeader, ignoreHeader, dateFormat, timeFormat, timestampFormat, encoding, trimDoubleQuote, skipTopRows);

CsvSchema.Builder csvSchemaBuilder = CsvSchema.builder()
.setColumnSeparator(csvFormat.getDelimiterString().charAt(0))
@@ -96,7 +91,7 @@ public JacksonCSVRecordReader(final InputStream in, final ComponentLog logger, f
.with(csvSchema)
.withFeatures(features.toArray(new CsvParser.Feature[features.size()]));

recordStream = objReader.readValues(reader);
recordStream = objReader.readValues(inputStreamReader);
}

public JacksonCSVRecordReader(final InputStream in, final ComponentLog logger, final RecordSchema schema, final CSVFormat csvFormat, final boolean hasHeader, final boolean ignoreHeader,
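
With this change, each subclass builds its parser on the protected inputStreamReader created in the base-class constructor (seen above for FastCSVRecordReader and JacksonCSVRecordReader), so BOM handling happens in one place. The sketch below (not part of the commit) shows the effect of the BOMInputStream wrapping used there; the class name and sample bytes are illustrative assumptions.

import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.nio.charset.StandardCharsets;
import org.apache.commons.io.input.BOMInputStream;

// Sketch of the BOM-stripping wrapper the base-class constructor now applies once for all readers.
public class BomWrappingSketch {
    public static void main(final String[] args) throws Exception {
        final byte[] withBom = ("\uFEFF" + "id,name\n1,John\n").getBytes(StandardCharsets.UTF_8);
        try (final InputStream in = new ByteArrayInputStream(withBom)) {
            final InputStream bomInputStream = BOMInputStream.builder().setInputStream(in).get();
            final Reader reader = new InputStreamReader(bomInputStream, StandardCharsets.UTF_8);
            final char[] buf = new char[32];
            final int read = reader.read(buf);
            System.out.println(new String(buf, 0, read)); // prints the CSV content without the leading BOM character
        }
    }
}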
@@ -52,7 +52,8 @@ public class TestFastCSVRecordReader {

@BeforeEach
public void setUp() {
format = CSVFormat.RFC4180;
// Override the record separator to match the input
format = CSVFormat.RFC4180.builder().setRecordSeparator("\n").build();
}

private List<RecordField> getDefaultFields() {
Expand All @@ -68,9 +69,9 @@ private FastCSVRecordReader createReader(final InputStream in, final RecordSchem
RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), "ASCII", false, 0);
}

private FastCSVRecordReader createReader(final InputStream in, final RecordSchema schema, CSVFormat format, final boolean trimDoubleQuote) throws IOException {
private FastCSVRecordReader createReader(final InputStream in, final RecordSchema schema, final CSVFormat format, final boolean trimDoubleQuote, final int skipTopRows) throws IOException {
return new FastCSVRecordReader(in, Mockito.mock(ComponentLog.class), schema, format, true, false,
RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), "ASCII", trimDoubleQuote, 0);
RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), "ASCII", trimDoubleQuote, skipTopRows);
}

@Test
@@ -232,7 +233,7 @@ public void testMissingField_withoutDoubleQuoteTrimming() throws IOException, Ma
final byte[] inputData = csvData.getBytes();

try (final InputStream bais = new ByteArrayInputStream(inputData);
final FastCSVRecordReader reader = createReader(bais, schema, TRIMMED_RFC4180.builder().setAllowMissingColumnNames(true).build(), false)) {
final FastCSVRecordReader reader = createReader(bais, schema, TRIMMED_RFC4180.builder().setAllowMissingColumnNames(true).build(), false, 0)) {

final Record record = reader.nextRecord();
assertNotNull(record);
@@ -313,7 +314,7 @@ public void testReadRawWithDifferentFieldName_withoutDoubleQuoteTrimming() throw

// test nextRecord does not contain a 'continent' field
try (final InputStream bais = new ByteArrayInputStream(inputData);
final FastCSVRecordReader reader = createReader(bais, schema, TRIMMED_RFC4180, false)) {
final FastCSVRecordReader reader = createReader(bais, schema, TRIMMED_RFC4180, false, 0)) {

final Record record = reader.nextRecord(true, true);
assertNotNull(record);
@@ -333,7 +334,7 @@ public void testReadRawWithDifferentFieldName_withoutDoubleQuoteTrimming() throw

// test nextRawRecord does contain 'continent' field
try (final InputStream bais = new ByteArrayInputStream(inputData);
final FastCSVRecordReader reader = createReader(bais, schema, TRIMMED_RFC4180, false)) {
final FastCSVRecordReader reader = createReader(bais, schema, TRIMMED_RFC4180, false, 0)) {

final Record record = reader.nextRecord(false, false);
assertNotNull(record);
@@ -408,7 +409,7 @@ public void testFieldInSchemaButNotHeader_withoutDoubleQuoteTrimming() throws IO
final byte[] inputData = csvData.getBytes();

try (final InputStream bais = new ByteArrayInputStream(inputData);
final FastCSVRecordReader reader = createReader(bais, schema, TRIMMED_RFC4180, false)) {
final FastCSVRecordReader reader = createReader(bais, schema, TRIMMED_RFC4180, false, 0)) {

try {
reader.nextRecord();
@@ -491,7 +492,7 @@ public void testMultipleRecordsEscapedWithNull() throws IOException, MalformedRe
final RecordSchema schema = new SimpleRecordSchema(fields);

try (final InputStream fis = new FileInputStream("src/test/resources/csv/multi-bank-account_escapechar.csv");
final FastCSVRecordReader reader = createReader(fis, schema, format, true)) {
final FastCSVRecordReader reader = createReader(fis, schema, format, true, 0)) {

final Object[] firstRecord = reader.nextRecord().getValues();
final Object[] firstExpectedValues = new Object[]{"1", "John Doe\\", 4750.89D, "123 My Street", "My City", "MS", "11111", "USA"};
@@ -504,4 +505,23 @@ public void testMultipleRecordsEscapedWithNull() throws IOException, MalformedRe
assertNull(reader.nextRecord());
}
}

@Test
public void testSkipTopRows() throws IOException, MalformedRecordException {
final List<RecordField> fields = getDefaultFields();
fields.replaceAll(f -> f.getFieldName().equals("balance") ? new RecordField("balance", doubleDataType) : f);

final RecordSchema schema = new SimpleRecordSchema(fields);

try (final InputStream fis = new FileInputStream("src/test/resources/csv/single-bank-account_skip_top_rows.csv");
final FastCSVRecordReader reader = createReader(fis, schema, format, true, 3)) {

final Record record = reader.nextRecord();
assertNotNull(record);
final Object[] recordValues = record.getValues();
final Object[] expectedValues = new Object[] {"1", "John Doe", 4750.89D, "123 My Street", "My City", "MS", "11111", "USA"};
assertArrayEquals(expectedValues, recordValues);
assertNull(reader.nextRecord());
}
}
}
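
A minimal usage sketch (not part of the commit) of the new skipTopRows argument, along the lines of the new testSkipTopRows above but driven from an in-memory stream rather than the test resource file; the two-field schema, the sample CSV content, and the class name are illustrative assumptions.

import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import org.apache.commons.csv.CSVFormat;
import org.apache.nifi.csv.FastCSVRecordReader;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.mockito.Mockito;

public class SkipTopRowsUsageSketch {

    public static void main(final String[] args) throws Exception {
        // Two non-CSV rows at the top, then the header row, then a single data row
        final String csv = "exported by tool X\nexport date: 2023-11-24\nid,name\n1,John Doe\n";

        final RecordSchema schema = new SimpleRecordSchema(Arrays.asList(
                new RecordField("id", RecordFieldType.STRING.getDataType()),
                new RecordField("name", RecordFieldType.STRING.getDataType())));

        // Match the record separator to the input, as the updated setUp() above does
        final CSVFormat format = CSVFormat.RFC4180.builder().setRecordSeparator("\n").build();

        try (final InputStream in = new ByteArrayInputStream(csv.getBytes(StandardCharsets.UTF_8));
             final FastCSVRecordReader reader = new FastCSVRecordReader(in, Mockito.mock(ComponentLog.class), schema, format,
                     true, false, RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(),
                     RecordFieldType.TIMESTAMP.getDefaultFormat(), "UTF-8", true, 2)) {
            // The first two rows are consumed in the base-class constructor before the header is read
            System.out.println(reader.nextRecord().getAsString("name")); // expected: John Doe
        }
    }
}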