NIFI-12241 Efficient Parquet Splitting #7893

Closed

Conversation

@takraj
Contributor

takraj commented Oct 18, 2023

Summary

NIFI-12241

Tracking

Please complete the following tracking steps prior to pull request creation.

Issue Tracking

Pull Request Tracking

  • Pull Request title starts with Apache NiFi Jira issue number, such as NIFI-00000
  • Pull Request commit message starts with Apache NiFi Jira issue number, such as NIFI-00000

Pull Request Formatting

  • Pull Request based on current revision of the main branch
  • Pull Request refers to a feature branch with one commit containing changes

Verification

Please indicate the verification steps performed prior to pull request creation.

Build

  • Build completed using mvn clean install -P contrib-check
    • JDK 21

Licensing

  • New dependencies are compatible with the Apache License 2.0 according to the License Policy
  • New dependencies are documented in applicable LICENSE and NOTICE files

Documentation

  • Documentation formatting appears as expected in rendered files

@markap14
Contributor

Hey @takraj I don't know much of anything about Parquet, so I'm probably not the best to really review this in terms of Parquet. But looking at what's happening here, the processor does not split Parquet at all. Instead, it clones the input and adds 'count' and 'offset' types of attributes. So the naming is problematic. If I sent a 10 GB Parquet file into SplitParquet and got out 10 FlowFiles, I would expect each to be 1 GB. Here, each one will be 10 GB because it's a clone of the original. This would lead to a lot of confusion.
Perhaps a name like 'CalculateParquetOffsets' is appropriate?

@pvillard31
Contributor

I'll give this a try later today, but yeah, the naming can certainly be better, as this new processor should always be followed by a record processor using the ParquetWriter so that the actual "splitting" happens. It could be good to leverage the new @UseCase annotation to describe this, as sketched below.
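
A minimal sketch of that idea, assuming the @UseCase annotation from nifi-api; the processor name matches the rename discussed below, but the description text is illustrative, not the final wording:

import org.apache.nifi.annotation.documentation.UseCase;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;

// Hedged sketch: the annotation attribute follows nifi-api's UseCase,
// but the wording is illustrative, not the final documentation.
@UseCase(
        description = "Prepare a large Parquet file for parallel conversion by calculating "
                + "record offsets, then let a downstream record processor configured with a "
                + "ParquetReader and ParquetWriter perform the actual splitting."
)
public class CalculateParquetOffsets extends AbstractProcessor {

    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        // offset calculation elided in this sketch
    }
}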

@takraj
Contributor Author

takraj commented Oct 18, 2023

@markap14 , @pvillard31 Thank you for your comments. I have added use cases, and renamed the processor in 9d7fd48. Please check again.

@pvillard31
Contributor

I did the below test:

  • Large Parquet file with about 50M records -> ConvertRecord (Parquet Reader / Parquet Writer)
    File size: 944MB
    Duration: 7'01

  • Same file -> CalculateParquetOffsets (1M records split) -> same ConvertRecord with 8 concurrent tasks
    CalculateParquetOffsets immediately generates 50 FlowFiles (CLONE)
    Duration: 4'40

I was expecting the second case to be much faster given that we use 8 concurrent threads. Any idea? Thoughts?

@takraj
Contributor Author

takraj commented Oct 20, 2023

@pvillard31

  • I guess creating the 50 clones is actually copying the input 50 times. You could also give it a try with the 'Zero Content Output' setup.
  • To determine how many records are in the Parquet file, CalculateParquetOffsets needs to read some parts of the file, for which it uses the same library as ParquetReader does. I'm not sure how this library determines the record count, but it may scan the whole file and count them. However, if your input FlowFile has a 'record.count' attribute, this step is skipped. Give it a try; I'd expect the whole process to be faster.
  • ParquetReader uses RecordFilter to 'jump' to the wanted record. But as this is a general concept in the library we use, the whole file is scanned through and RecordFilter is evaluated against each record. Unfortunately, there is no concept like 'jump to a record index'. I do have an optimization idea, though: stop reading records after we have read the desired number, so that RecordFilter is only used to find the first record to be read (see the sketch after this list). I'll add a new commit soon with this improvement.
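
A minimal sketch of that stop-after-count idea, assuming a parquet-mr ParquetReader<GenericRecord> already positioned at the first wanted record; the recordsToRead parameter and the consume() callback are illustrative names:

import java.io.IOException;

import org.apache.avro.generic.GenericRecord;
import org.apache.parquet.hadoop.ParquetReader;

class BoundedParquetReading {

    // Pull at most 'recordsToRead' records instead of letting the filter
    // evaluate every remaining record in the file; null means read to the end.
    static long readBounded(final ParquetReader<GenericRecord> parquetReader,
                            final Long recordsToRead) throws IOException {
        long recordsRead = 0;
        GenericRecord record;
        while ((recordsToRead == null || recordsRead < recordsToRead)
                && (record = parquetReader.read()) != null) {
            consume(record);
            recordsRead++;
        }
        return recordsRead;
    }

    private static void consume(final GenericRecord record) {
        // hypothetical downstream handling of a single record
    }
}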

@pvillard31
Contributor

The CalculateParquetOffsets step takes less than 1 millisecond to complete: the 50 clones are not copying the data, they get the same reference to the actual claim (where the data is stored in the content repo).

So I think it's really around the ParquetReader and how we can "jump" to the right location as you said. Will keep an eye out for your improvement for additional testing.

@pvillard31
Contributor

New tests with 10M records (190MB):

  • normal ConvertRecord (Parquet/Parquet) - 1 min 34 sec
  • Calculate Parquet Offsets (1M records) - ConvertRecord 8 concurrent tasks - 10FFs - 38 seconds

So definitely better. I'm a +1 from a testing point of view, but it would be nice for someone to have a look at the code changes. @bbende maybe?

@takraj
Contributor Author

takraj commented Oct 27, 2023

@pvillard31 I've been working on further performance improvements over the last couple of days and created a new variant of this processor, which calculates the offsets of the row group boundaries. I also updated the reader to take these offsets and seek to these positions in the input file. This is achieved with a special configuration option of the Parquet reader, called 'File Range', which selects only the row groups that overlap with the specified start and end offset range. Hopefully this brings a major improvement.
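
A minimal sketch of that approach, assuming parquet-mr's ParquetReader.Builder#withFileRange; the input path and the offset parameters (which a CalculateParquetRowGroupOffsets-style processor would supply via attributes) are illustrative:

import java.io.IOException;

import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.hadoop.ParquetReader;

class FileRangeExample {

    // Opens a reader restricted to the row groups that overlap the given byte range,
    // so reading effectively 'seeks' to a slice of the file instead of scanning it all.
    static ParquetReader<GenericRecord> openRange(final Path inputFile,
                                                  final long rangeStart,
                                                  final long rangeEnd) throws IOException {
        return AvroParquetReader.<GenericRecord>builder(inputFile)
                .withConf(new Configuration())
                .withFileRange(rangeStart, rangeEnd)
                .build();
    }
}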

Could you share how did you extract the processing times from NiFi in your benchmarks? Did you simply take the 'Tasks/Time' indicator, or is there something else that I can monitor?

@takraj
Contributor Author

takraj commented Nov 2, 2023

I have added a commit with the alternative approach mentioned in my previous comment. Please check.

@pvillard31
Contributor

Thanks @takraj - giving it another try.

@pvillard31
Contributor

Test with 15,598,440 records (295MB):

  • Normal ConvertRecord (Parquet/Parquet) - 2 min 13 sec
  • Calculate Parquet Offsets (1M records) -> ConvertRecord 8 concurrent tasks - 16FFs - 38 seconds

It seems very similar to the previous test, since the time really depends on how many records go into each split (1M in this case). It's about a 4x improvement when optimizing the number of splits against the number of concurrent tasks, so it's definitely a nice improvement.

@takraj
Contributor Author

takraj commented Nov 6, 2023

@pvillard31 Did you try the new CalculateParquetRowGroupOffsets processor too? In this one, you cannot configure the number of records per split, because it splits along row group boundaries. But since it calculates where the ParquetReader should seek in the file, the data is distributed and processed much more efficiently. My measurements showed a major improvement in performance.

You can also combine the two as CalculateParquetRowGroupOffsets -> CalculateParquetOffsets, so you still have control over how many records fall into a single FlowFile.

I can't really do any further improvement to this, due to API limitations of the underlying library.

@pvillard31
Contributor

Oh, I'm sorry, I didn't realize it was another processor. Will try again.

@pvillard31
Contributor

pvillard31 commented Nov 6, 2023

FF with 24M records (452.85MB)

  • ConvertRecord (Parquet/Parquet) - 3'31''
  • CalculateParquetOffsets (24 FFs) + ConvertRecord (8 CT) - 1'02''
  • CalculateParquetRowGroupOffsets (4 FFs) + ConvertRecord (8 CT) - 1'09''
  • CalculateParquetRowGroupOffsets (4 FFs) + CalculateParquetOffsets (25 FFs) + ConvertRecord (8 CT) - 0'50''

@@ -35,11 +35,18 @@ public class AvroParquetHDFSRecordReader implements HDFSRecordReader {
private GenericRecord lastRecord;
private RecordSchema recordSchema;
private boolean initialized = false;
private final Long count;
Contributor

Suggested change
private final Long count;
private final Long recordsToRead;

Contributor Author

Done in 9f340c9

Comment on lines 106 to 110
if (count == null) {
return new AvroParquetHDFSRecordReader(readerBuilder.build());
} else {
return new AvroParquetHDFSRecordReader(readerBuilder.build(), count);
}
Contributor

Suggested change
if (count == null) {
return new AvroParquetHDFSRecordReader(readerBuilder.build());
} else {
return new AvroParquetHDFSRecordReader(readerBuilder.build(), count);
}
return new AvroParquetHDFSRecordReader(readerBuilder.build(), count);

Contributor Author

Done in 9f340c9

this(parquetReader, null);
}

public AvroParquetHDFSRecordReader(final ParquetReader<GenericRecord> parquetReader, Long count) {
Contributor

Suggested change
public AvroParquetHDFSRecordReader(final ParquetReader<GenericRecord> parquetReader, Long count) {
public AvroParquetHDFSRecordReader(final ParquetReader<GenericRecord> parquetReader, final Long count) {

Contributor Author

Done in 9f340c9

private final ParquetReader<GenericRecord> parquetReader;

public ParquetRecordReader(final InputStream inputStream, final long inputLength, final Configuration configuration) throws IOException {
private final Long count;
Contributor

Suggested change
private final Long count;
private final Long recordsToRead;

Contributor Author

Done in 9f340c9

@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
final FlowFile original = session.get();
if (original == null) {
Contributor

Suggested change
if (original == null) {
if (inputFlowFile == null) {

"original" makes sense when used as a relationship because that is a very specific context. In the code "original" can mean too many things. Also "input..." describes more accurately what it is. (In contrast, in case of a relationship calling it "input" would be very confusing.)

Contributor Author

Done in 9f340c9


private List<FlowFile> getPartitions(
ProcessSession session,
FlowFile flowFile,
Contributor

Suggested change
FlowFile flowFile,
FlowFile inputFlowFile,

Contributor Author

Done in 9f340c9

long recordOffset,
boolean zeroContentOutput
) {
final long numberOfPartitions = (recordCount / partitionSize) + ((recordCount % partitionSize) > 0 ? 1 : 0);
Contributor

Suggested change
final long numberOfPartitions = (recordCount / partitionSize) + ((recordCount % partitionSize) > 0 ? 1 : 0);
final long numberOfPartitions = Math.ceilDiv(recordCount, partitionSize);

Contributor Author

Done in 9f340c9
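
For reference, a quick check that the suggested Math.ceilDiv (available since Java 18) matches the manual ceiling formula for non-negative operands; the sample numbers are arbitrary:

class CeilDivCheck {
    public static void main(final String[] args) {
        final long recordCount = 10;
        final long partitionSize = 3;

        // Manual ceiling division, as in the original code.
        final long manual = (recordCount / partitionSize) + ((recordCount % partitionSize) > 0 ? 1 : 0);

        // Built-in equivalent (Java 18+).
        final long builtIn = Math.ceilDiv(recordCount, partitionSize);

        System.out.println(manual + " == " + builtIn); // prints "4 == 4"
    }
}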

Comment on lines 78 to 80
final File schemaFile = new File(SCHEMA_PATH);
final String schemaString = IOUtils.toString(new FileInputStream(schemaFile), StandardCharsets.UTF_8);
return new Schema.Parser().parse(schemaString);
Contributor

Suggested change
final File schemaFile = new File(SCHEMA_PATH);
final String schemaString = IOUtils.toString(new FileInputStream(schemaFile), StandardCharsets.UTF_8);
return new Schema.Parser().parse(schemaString);
try (InputStream schemaInputStream = ParquetTestUtils.class.getClassLoader().getResourceAsStream("avro/user.avsc")) {
final String schemaString = IOUtils.toString(schemaInputStream, StandardCharsets.UTF_8);
return new Schema.Parser().parse(schemaString);
}

Contributor Author

Done in 9f340c9

@asfgit closed this in 9a5ec83 on Dec 10, 2023
@tpalfy
Contributor

tpalfy commented Dec 10, 2023

LGTM
Thank you @takraj for your work.
Merged to main and merging to nifi-1.x

@tpalfy
Contributor

tpalfy commented Dec 10, 2023

Pushed to nifi-1.x
