From 652904e5e5539f06d9bd225d7fbb1df882aa0396 Mon Sep 17 00:00:00 2001 From: Matt Burgess Date: Fri, 24 Nov 2023 01:17:44 -0500 Subject: [PATCH] NIFI-8932: Moved common code to base class for use by all CSVRecordReader implementations, updated unit tests --- .../nifi/csv/AbstractCSVRecordReader.java | 65 ++++++++++++++++- .../org/apache/nifi/csv/CSVRecordReader.java | 69 +------------------ .../apache/nifi/csv/FastCSVRecordReader.java | 8 +-- .../nifi/csv/JacksonCSVRecordReader.java | 9 +-- .../nifi/csv/TestFastCSVRecordReader.java | 36 +++++++--- .../nifi/csv/TestJacksonCSVRecordReader.java | 39 ++++++++--- .../csv/single-bank-account_skip_top_rows.csv | 4 +- 7 files changed, 128 insertions(+), 102 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/AbstractCSVRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/AbstractCSVRecordReader.java index 619ae31983b0..435a981680a4 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/AbstractCSVRecordReader.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/AbstractCSVRecordReader.java @@ -17,12 +17,19 @@ package org.apache.nifi.csv; +import org.apache.commons.csv.CSVFormat; +import org.apache.commons.io.input.BOMInputStream; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.serialization.RecordReader; import org.apache.nifi.serialization.record.DataType; import org.apache.nifi.serialization.record.RecordFieldType; import org.apache.nifi.serialization.record.RecordSchema; import org.apache.nifi.serialization.record.util.DataTypeUtils; + +import 
java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.Reader; import java.text.DateFormat; import java.util.function.Supplier; @@ -45,8 +52,12 @@ abstract public class AbstractCSVRecordReader implements RecordReader { protected final RecordSchema schema; - AbstractCSVRecordReader(final ComponentLog logger, final RecordSchema schema, final boolean hasHeader, final boolean ignoreHeader, - final String dateFormat, final String timeFormat, final String timestampFormat, final boolean trimDoubleQuote, final int skipTopRows) { + protected final Reader inputStreamReader; + + AbstractCSVRecordReader(final InputStream in, final ComponentLog logger, final RecordSchema schema, final CSVFormat csvFormat, final boolean hasHeader, final boolean ignoreHeader, + final String dateFormat, final String timeFormat, final String timestampFormat, final String encoding, final boolean trimDoubleQuote, final int skipTopRows) + throws IOException { + this.logger = logger; this.schema = schema; this.hasHeader = hasHeader; @@ -77,6 +88,14 @@ abstract public class AbstractCSVRecordReader implements RecordReader { this.timestampFormat = timestampFormat; LAZY_TIMESTAMP_FORMAT = () -> DataTypeUtils.getDateFormat(timestampFormat); } + + final InputStream bomInputStream = BOMInputStream.builder().setInputStream(in).get(); + inputStreamReader = new InputStreamReader(bomInputStream, encoding); + + // Skip the number of rows at the "top" as specified + for (int i = 0; i < skipTopRows; i++) { + readNextRecord(inputStreamReader, csvFormat.getRecordSeparator()); + } } protected final Object convert(final String value, final DataType dataType, final String fieldName) { @@ -161,4 +180,46 @@ protected String trim(String value) { public RecordSchema getSchema() { return schema; } + + /** + * This method searches using the specified Reader character-by-character until the + * record separator is found. 
+ * @param reader the Reader providing the input + * @param recordSeparator the String specifying the end of a record in the input + * @throws IOException if an error occurs during reading, including not finding the record separator in the input + */ + protected void readNextRecord(Reader reader, String recordSeparator) throws IOException { + int indexIntoSeparator = 0; + int recordSeparatorLength = recordSeparator.length(); + int code = reader.read(); + while (code != -1) { + char nextChar = (char)code; + if (recordSeparator.charAt(indexIntoSeparator) == nextChar) { + if (++indexIntoSeparator == recordSeparatorLength) { + // We have matched the full record separator, so the row has been skipped; stop reading + return; + } + } else { + // The character didn't match the expected one in the record separator, reset the separator matcher + // and check if it is the first character of the separator. + indexIntoSeparator = 0; + if (recordSeparator.charAt(indexIntoSeparator) == nextChar) { + // This character is the beginning of the record separator, so restart matching from it + if (++indexIntoSeparator == recordSeparatorLength) { + // We have matched the full record separator, so the row has been skipped; stop reading + return; + } + } + } + // This defensive check limits a record size to 2GB; this prevents out-of-memory errors if the record separator + // is not present in the input (or at least in the first 2GB) + if (indexIntoSeparator == Integer.MAX_VALUE) { + throw new IOException("2GB input threshold reached, the record is either larger than 2GB or the separator " + + "is not found in the first 2GB of input. 
Ensure the Record Separator is correct for this FlowFile."); + } + code = reader.read(); + } + // If the input ends without finding the record separator, an exception is thrown + throw new IOException("Record separator not found"); + } } diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordReader.java index e987144a4813..f4071dcce6a6 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordReader.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordReader.java @@ -19,8 +19,6 @@ import java.io.IOException; import java.io.InputStream; -import java.io.InputStreamReader; -import java.io.Reader; import java.util.ArrayList; import java.util.LinkedHashMap; import java.util.List; @@ -31,7 +29,6 @@ import org.apache.commons.csv.CSVFormat; import org.apache.commons.csv.CSVParser; import org.apache.commons.csv.CSVRecord; -import org.apache.commons.io.input.BOMInputStream; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.serialization.MalformedRecordException; import org.apache.nifi.serialization.record.DataType; @@ -49,15 +46,7 @@ public class CSVRecordReader extends AbstractCSVRecordReader { public CSVRecordReader(final InputStream in, final ComponentLog logger, final RecordSchema schema, final CSVFormat csvFormat, final boolean hasHeader, final boolean ignoreHeader, final String dateFormat, final String timeFormat, final String timestampFormat, final String encoding, final boolean trimDoubleQuote, final int skipTopRows) throws 
IOException { - super(logger, schema, hasHeader, ignoreHeader, dateFormat, timeFormat, timestampFormat, trimDoubleQuote, skipTopRows); - - final InputStream bomInputStream = BOMInputStream.builder().setInputStream(in).get(); - final Reader inputStreamReader = new InputStreamReader(bomInputStream, encoding); - - // Skip the number of rows at the "top" as specified - for (int i = 0; i < skipTopRows; i++) { - readNextRecord(inputStreamReader, csvFormat.getRecordSeparator()); - } + super(in, logger, schema, csvFormat, hasHeader, ignoreHeader, dateFormat, timeFormat, timestampFormat, encoding, trimDoubleQuote, skipTopRows); CSVFormat.Builder withHeader; if (hasHeader) { @@ -128,7 +117,6 @@ public Record nextRecord(final boolean coerceTypes, final boolean dropUnknownFie return null; } - private List getRecordFields() { if (this.recordFields != null) { return this.recordFields; @@ -159,59 +147,4 @@ private List getRecordFields() { public void close() throws IOException { csvParser.close(); } - - /** - * This method builds a text representation of the CSV record by searching character-by-character until the - * record separator is found. Because we never want to consume input we don't use, the method attempts to match - * the separator separately, and as it is not matched, the characters are added to the returned string. - * @param reader the Reader providing the input - * @param recordSeparator the String specifying the end of a record in the input - * @return a String created from the input until the record separator is reached. 
- * @throws IOException if an error occurs during reading - */ - protected String readNextRecord(Reader reader, String recordSeparator) throws IOException { - int indexIntoSeparator = 0; - int recordSeparatorLength = recordSeparator.length(); - StringBuilder lineBuilder = new StringBuilder(); - StringBuilder separatorBuilder = new StringBuilder(); - int code = reader.read(); - while (code != -1) { - char nextChar = (char)code; - if (recordSeparator.charAt(indexIntoSeparator) == nextChar) { - separatorBuilder.append(nextChar); - if (++indexIntoSeparator == recordSeparatorLength) { - // We have matched the separator, return the string built so far - lineBuilder.append(separatorBuilder); - return lineBuilder.toString(); - } - } else { - // The character didn't match the expected one in the record separator, reset the separator matcher - // and check if it is the first character of the separator. - indexIntoSeparator = 0; - if (recordSeparator.charAt(indexIntoSeparator) == nextChar) { - // This character is the beginning of the record separator, keep it - separatorBuilder = new StringBuilder(); - separatorBuilder.append(nextChar); - if (++indexIntoSeparator == recordSeparatorLength) { - // We have matched the separator, return the string built so far - return lineBuilder.toString(); - } - } else { - // This character is not the beginning of the record separator, add it to the return string - lineBuilder.append(nextChar); - } - } - // This defensive check limits a record size to 2GB, this prevents out-of-memory errors if the record separator - // is not present in the input (or at least in the first 2GB) - if (indexIntoSeparator == Integer.MAX_VALUE) { - throw new IOException("2GB input threshold reached, the record is either larger than 2GB or the separator " - + "is not found in the first 2GB of input. 
Ensure the Record Separator is correct for this FlowFile."); - } - code = reader.read(); - } - - // The end of input has been reached without the record separator being found, throw an exception with the string so far - - return lineBuilder.toString(); - } } diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/FastCSVRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/FastCSVRecordReader.java index 9ab5150f7131..975416ef5014 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/FastCSVRecordReader.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/FastCSVRecordReader.java @@ -22,7 +22,6 @@ import de.siegmar.fastcsv.reader.CsvRow; import java.io.IOException; import java.io.InputStream; -import java.io.InputStreamReader; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -46,11 +45,8 @@ public class FastCSVRecordReader extends AbstractCSVRecordReader { private final CsvReader csvReader; private final Iterator csvRowIterator; - private List recordFields; - private Map headerMap; - private final boolean ignoreHeader; private final boolean trimDoubleQuote; private final CSVFormat csvFormat; @@ -58,7 +54,7 @@ public class FastCSVRecordReader extends AbstractCSVRecordReader { public FastCSVRecordReader(final InputStream in, final ComponentLog logger, final RecordSchema schema, final CSVFormat csvFormat, final boolean hasHeader, final boolean ignoreHeader, final String dateFormat, final String timeFormat, final String timestampFormat, final String encoding, final boolean trimDoubleQuote, final 
int skipTopRows) throws IOException { - super(logger, schema, hasHeader, ignoreHeader, dateFormat, timeFormat, timestampFormat, trimDoubleQuote, skipTopRows); + super(in, logger, schema, csvFormat, hasHeader, ignoreHeader, dateFormat, timeFormat, timestampFormat, encoding, trimDoubleQuote, skipTopRows); this.ignoreHeader = ignoreHeader; this.trimDoubleQuote = trimDoubleQuote; this.csvFormat = csvFormat; @@ -83,7 +79,7 @@ public FastCSVRecordReader(final InputStream in, final ComponentLog logger, fina } } - csvReader = builder.build(new InputStreamReader(in, encoding)); + csvReader = builder.build(inputStreamReader); csvRowIterator = csvReader.iterator(); } diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/JacksonCSVRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/JacksonCSVRecordReader.java index 79483612f94c..1622392c46a2 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/JacksonCSVRecordReader.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/JacksonCSVRecordReader.java @@ -24,8 +24,6 @@ import com.fasterxml.jackson.dataformat.csv.CsvSchema; import java.io.IOException; import java.io.InputStream; -import java.io.InputStreamReader; -import java.io.Reader; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -36,7 +34,6 @@ import java.util.Set; import org.apache.commons.csv.CSVFormat; import org.apache.commons.csv.DuplicateHeaderMode; -import org.apache.commons.io.input.BOMInputStream; import org.apache.commons.lang3.CharUtils; import org.apache.commons.lang3.StringUtils; import 
org.apache.nifi.logging.ComponentLog; @@ -57,9 +54,7 @@ public class JacksonCSVRecordReader extends AbstractCSVRecordReader { public JacksonCSVRecordReader(final InputStream in, final ComponentLog logger, final RecordSchema schema, final CSVFormat csvFormat, final boolean hasHeader, final boolean ignoreHeader, final String dateFormat, final String timeFormat, final String timestampFormat, final String encoding, final boolean trimDoubleQuote, final int skipTopRows) throws IOException { - super(logger, schema, hasHeader, ignoreHeader, dateFormat, timeFormat, timestampFormat, trimDoubleQuote, skipTopRows); - - final Reader reader = new InputStreamReader(BOMInputStream.builder().setInputStream(in).get(), encoding); + super(in, logger, schema, csvFormat, hasHeader, ignoreHeader, dateFormat, timeFormat, timestampFormat, encoding, trimDoubleQuote, skipTopRows); CsvSchema.Builder csvSchemaBuilder = CsvSchema.builder() .setColumnSeparator(csvFormat.getDelimiterString().charAt(0)) @@ -96,7 +91,7 @@ public JacksonCSVRecordReader(final InputStream in, final ComponentLog logger, f .with(csvSchema) .withFeatures(features.toArray(new CsvParser.Feature[features.size()])); - recordStream = objReader.readValues(reader); + recordStream = objReader.readValues(inputStreamReader); } public JacksonCSVRecordReader(final InputStream in, final ComponentLog logger, final RecordSchema schema, final CSVFormat csvFormat, final boolean hasHeader, final boolean ignoreHeader, diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestFastCSVRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestFastCSVRecordReader.java index 46cfdeb6aa50..d8a7d17156b1 100644 --- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestFastCSVRecordReader.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestFastCSVRecordReader.java @@ -52,7 +52,8 @@ public class TestFastCSVRecordReader { @BeforeEach public void setUp() { - format = CSVFormat.RFC4180; + // Override the record separator to match the input + format = CSVFormat.RFC4180.builder().setRecordSeparator("\n").build(); } private List getDefaultFields() { @@ -68,9 +69,9 @@ private FastCSVRecordReader createReader(final InputStream in, final RecordSchem RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), "ASCII", false, 0); } - private FastCSVRecordReader createReader(final InputStream in, final RecordSchema schema, CSVFormat format, final boolean trimDoubleQuote) throws IOException { + private FastCSVRecordReader createReader(final InputStream in, final RecordSchema schema, final CSVFormat format, final boolean trimDoubleQuote, final int skipTopRows) throws IOException { return new FastCSVRecordReader(in, Mockito.mock(ComponentLog.class), schema, format, true, false, - RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), "ASCII", trimDoubleQuote, 0); + RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), "ASCII", trimDoubleQuote, skipTopRows); } @Test @@ -232,7 +233,7 @@ public void testMissingField_withoutDoubleQuoteTrimming() throws IOException, Ma final byte[] inputData = csvData.getBytes(); try (final InputStream bais = new ByteArrayInputStream(inputData); - final FastCSVRecordReader reader = createReader(bais, schema, 
TRIMMED_RFC4180.builder().setAllowMissingColumnNames(true).build(), false)) { + final FastCSVRecordReader reader = createReader(bais, schema, TRIMMED_RFC4180.builder().setAllowMissingColumnNames(true).build(), false, 0)) { final Record record = reader.nextRecord(); assertNotNull(record); @@ -313,7 +314,7 @@ public void testReadRawWithDifferentFieldName_withoutDoubleQuoteTrimming() throw // test nextRecord does not contain a 'continent' field try (final InputStream bais = new ByteArrayInputStream(inputData); - final FastCSVRecordReader reader = createReader(bais, schema, TRIMMED_RFC4180, false)) { + final FastCSVRecordReader reader = createReader(bais, schema, TRIMMED_RFC4180, false, 0)) { final Record record = reader.nextRecord(true, true); assertNotNull(record); @@ -333,7 +334,7 @@ public void testReadRawWithDifferentFieldName_withoutDoubleQuoteTrimming() throw // test nextRawRecord does contain 'continent' field try (final InputStream bais = new ByteArrayInputStream(inputData); - final FastCSVRecordReader reader = createReader(bais, schema, TRIMMED_RFC4180, false)) { + final FastCSVRecordReader reader = createReader(bais, schema, TRIMMED_RFC4180, false, 0)) { final Record record = reader.nextRecord(false, false); assertNotNull(record); @@ -408,7 +409,7 @@ public void testFieldInSchemaButNotHeader_withoutDoubleQuoteTrimming() throws IO final byte[] inputData = csvData.getBytes(); try (final InputStream bais = new ByteArrayInputStream(inputData); - final FastCSVRecordReader reader = createReader(bais, schema, TRIMMED_RFC4180, false)) { + final FastCSVRecordReader reader = createReader(bais, schema, TRIMMED_RFC4180, false, 0)) { try { reader.nextRecord(); @@ -491,7 +492,7 @@ public void testMultipleRecordsEscapedWithNull() throws IOException, MalformedRe final RecordSchema schema = new SimpleRecordSchema(fields); try (final InputStream fis = new FileInputStream("src/test/resources/csv/multi-bank-account_escapechar.csv"); - final FastCSVRecordReader reader = 
createReader(fis, schema, format, true)) { + final FastCSVRecordReader reader = createReader(fis, schema, format, true, 0)) { final Object[] firstRecord = reader.nextRecord().getValues(); final Object[] firstExpectedValues = new Object[]{"1", "John Doe\\", 4750.89D, "123 My Street", "My City", "MS", "11111", "USA"}; @@ -504,4 +505,23 @@ public void testMultipleRecordsEscapedWithNull() throws IOException, MalformedRe assertNull(reader.nextRecord()); } } + + @Test + public void testSkipTopRows() throws IOException, MalformedRecordException { + final List fields = getDefaultFields(); + fields.replaceAll(f -> f.getFieldName().equals("balance") ? new RecordField("balance", doubleDataType) : f); + + final RecordSchema schema = new SimpleRecordSchema(fields); + + try (final InputStream fis = new FileInputStream("src/test/resources/csv/single-bank-account_skip_top_rows.csv"); + final FastCSVRecordReader reader = createReader(fis, schema, format, true, 3)) { + + final Record record = reader.nextRecord(); + assertNotNull(record); + final Object[] recordValues = record.getValues(); + final Object[] expectedValues = new Object[] {"1", "John Doe", 4750.89D, "123 My Street", "My City", "MS", "11111", "USA"}; + assertArrayEquals(expectedValues, recordValues); + assertNull(reader.nextRecord()); + } + } } diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestJacksonCSVRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestJacksonCSVRecordReader.java index 1da1658d6356..98f0c8e73c9b 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestJacksonCSVRecordReader.java +++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestJacksonCSVRecordReader.java @@ -51,12 +51,14 @@ public class TestJacksonCSVRecordReader { .setSkipHeaderRecord(true) .setTrim(true) .setQuote('"') + .setRecordSeparator("\n") .build(); private final CSVFormat formatWithNullRecordSeparator = CSVFormat.DEFAULT.builder() .setHeader() .setSkipHeaderRecord(true) .setTrim(true) .setQuote('"') + .setRecordSeparator("\n") .setRecordSeparator(null) .build(); private final CSVFormat trimmed4180 = CSVFormat.RFC4180.builder() @@ -82,12 +84,12 @@ private List getDefaultFields() { } private JacksonCSVRecordReader createReader(final InputStream in, final RecordSchema schema, final CSVFormat format) throws IOException { - return createReader(in, schema, format, true); + return createReader(in, schema, format, true, 0); } - private JacksonCSVRecordReader createReader(final InputStream in, final RecordSchema schema, CSVFormat format, final boolean trimDoubleQuote) throws IOException { + private JacksonCSVRecordReader createReader(final InputStream in, final RecordSchema schema, final CSVFormat format, final boolean trimDoubleQuote, final int skipTopRows) throws IOException { return new JacksonCSVRecordReader(in, Mockito.mock(ComponentLog.class), schema, format, true, false, - RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), "ASCII", trimDoubleQuote, 0); + RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), "ASCII", trimDoubleQuote, skipTopRows); } @Test @@ -281,7 +283,7 @@ public void testMissingField_withoutDoubleQuoteTrimming() throws IOException, Ma final byte[] inputData = csvData.getBytes(); try (final InputStream bais = new ByteArrayInputStream(inputData); - final JacksonCSVRecordReader reader = createReader(bais, 
schema, trimmed4180, false)) { + final JacksonCSVRecordReader reader = createReader(bais, schema, trimmed4180, false, 0)) { final Record record = reader.nextRecord(); assertNotNull(record); @@ -362,7 +364,7 @@ public void testReadRawWithDifferentFieldName_withoutDoubleQuoteTrimming() throw // test nextRecord does not contain a 'continent' field try (final InputStream bais = new ByteArrayInputStream(inputData); - final JacksonCSVRecordReader reader = createReader(bais, schema, trimmed4180, false)) { + final JacksonCSVRecordReader reader = createReader(bais, schema, trimmed4180, false, 0)) { final Record record = reader.nextRecord(true, true); assertNotNull(record); @@ -382,7 +384,7 @@ public void testReadRawWithDifferentFieldName_withoutDoubleQuoteTrimming() throw // test nextRawRecord does contain 'continent' field try (final InputStream bais = new ByteArrayInputStream(inputData); - final JacksonCSVRecordReader reader = createReader(bais, schema, trimmed4180, false)) { + final JacksonCSVRecordReader reader = createReader(bais, schema, trimmed4180, false, 0)) { final Record record = reader.nextRecord(false, false); assertNotNull(record); @@ -474,7 +476,7 @@ public void testFieldInSchemaButNotHeader_withoutDoubleQuoteTrimming() throws IO final byte[] inputData = csvData.getBytes(); try (final InputStream bais = new ByteArrayInputStream(inputData); - final JacksonCSVRecordReader reader = createReader(bais, schema, trimmed4180, false)) { + final JacksonCSVRecordReader reader = createReader(bais, schema, trimmed4180, false, 0)) { final Record record = reader.nextRecord(); assertNotNull(record); @@ -567,7 +569,7 @@ public void testExtraFieldNotInHeader_withoutDoubleQuoteTrimming() throws IOExce // test nextRecord does not contain a 'continent' field try (final InputStream bais = new ByteArrayInputStream(inputData); - final JacksonCSVRecordReader reader = createReader(bais, schema, trimmed4180, false)) { + final JacksonCSVRecordReader reader = createReader(bais, schema, 
trimmed4180, false, 0)) { final Record record = reader.nextRecord(false, false); assertNotNull(record); @@ -639,7 +641,7 @@ public void testDuplicateHeaderNames_withoutDoubleQuoteTrimming() throws IOExcep // test nextRecord has ignored the first "id" and "name" columns try (final InputStream bais = new ByteArrayInputStream(inputData); - final JacksonCSVRecordReader reader = createReader(bais, schema, trimmed4180, false)) { + final JacksonCSVRecordReader reader = createReader(bais, schema, trimmed4180, false, 0)) { final Record record = reader.nextRecord(false, false); assertNotNull(record); @@ -747,4 +749,23 @@ public void testNullRecordSeparator() throws IOException, MalformedRecordExcepti assertNull(reader.nextRecord()); } } + + @Test + public void testSkipTopRows() throws IOException, MalformedRecordException { + final List fields = getDefaultFields(); + fields.replaceAll(f -> f.getFieldName().equals("balance") ? new RecordField("balance", doubleDataType) : f); + + final RecordSchema schema = new SimpleRecordSchema(fields); + + try (final InputStream fis = new FileInputStream("src/test/resources/csv/single-bank-account_skip_top_rows.csv"); + final JacksonCSVRecordReader reader = createReader(fis, schema, format, true, 3)) { + + final Record record = reader.nextRecord(); + assertNotNull(record); + final Object[] recordValues = record.getValues(); + final Object[] expectedValues = new Object[] {"1", "John Doe", 4750.89D, "123 My Street", "My City", "MS", "11111", "USA"}; + assertArrayEquals(expectedValues, recordValues); + assertNull(reader.nextRecord()); + } + } } diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/csv/single-bank-account_skip_top_rows.csv b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/csv/single-bank-account_skip_top_rows.csv index 280558a0cc17..a52c1e564c2c 
100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/csv/single-bank-account_skip_top_rows.csv +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/csv/single-bank-account_skip_top_rows.csv @@ -1,5 +1,5 @@ These first 3 lines aren't real CSV records: {"they": "will be skipped"} -id, name, balance, address, city, state, zipCode, country -1, John Doe, "4750.89", "123 My Street", My City, MS, 11111, USA \ No newline at end of file +id,name,balance,address,city,state,zipCode,country +1,John Doe,"4750.89","123 My Street",My City,MS,11111,USA \ No newline at end of file