fix(source): modify the logic of gzip compressed file source #20236

Open · wants to merge 4 commits into main · Changes from 1 commit
@@ -182,20 +182,63 @@ impl<Src: OpendalSource> OpendalReader<Src> {
// note that the buffer contains the newline character
debug_assert_eq!(n_read, line_buf.len());

// FIXME(rc): Here we have to use `offset + n_read`, i.e. the offset of the next line,
// as the *message offset*, because we check whether a file is finished by comparing the
// message offset with the file size in `FsFetchExecutor::into_stream`. However, we must
// understand that this message offset is not semantically consistent with the offset of
// other source connectors.
let msg_offset = (offset + n_read).to_string();
batch.push(SourceMessage {
    key: None,
    payload: Some(std::mem::take(&mut line_buf).into_bytes()),
    offset: msg_offset,
    split_id: split.id(),
    meta: SourceMeta::Empty,
});
offset += n_read;
// The `offset` variable serves two main purposes:
// 1. It is used to check whether a file has been completely read. In the fetch executor,
// when `offset >= split.size`, it indicates that the end of the file has been reached,
// prompting the removal of the corresponding entry from the state table.
//
// 2. After recovery occurs, `offset` prevents the reading process from starting
// over from the beginning of the file. Instead, it allows the reading to continue
// directly from the last successfully read `offset`.
//
// Special handling is required for files compressed with gzip:
// - Gzip files must always be read from the beginning; they cannot be read from a mid-point.

Contributor: So we cannot achieve exactly once if gzip is enabled?

Contributor Author: I think so.

Collaborator: Good point. I think exactly once is important for a source. Some ideas to make reading gzip files exactly once:

  1. Yield one large chunk per gzip file containing all of its contents after decompression. This ensures that no barrier can be placed in the middle of the file, but it will increase memory usage.
  2. Pause the fetcher's upstream while reading a gzip file. This also ensures that no barrier can be placed in the middle.
  3. Use the offset and size after decompression to track gzip file fetch progress, like what we already do right now. The OpenDAL reader always starts from the beginning and skips content before the recorded offset (a rough sketch of this idea follows below).

Personally I prefer 3.
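A minimal sketch of idea 3, just to make the bookkeeping concrete. It is not code from this PR; the function name, signature, and the use of `flate2` with an in-memory `Vec` are assumptions for illustration only. The key point is that the recorded offset refers to the *decompressed* stream, so after recovery the reader re-decompresses from the start and silently skips everything before that offset.

```rust
use std::io::{BufRead, BufReader};

use flate2::read::GzDecoder;

/// Hypothetical helper: yield `(offset_after_line, line)` pairs from a gzip blob,
/// skipping lines that end at or before `recorded_offset` (an offset into the
/// decompressed stream, i.e. what the state table would store).
fn read_gzip_from_offset(
    gz_bytes: &[u8],
    recorded_offset: u64,
) -> std::io::Result<Vec<(u64, String)>> {
    let mut reader = BufReader::new(GzDecoder::new(gz_bytes));
    let mut offset: u64 = 0; // decompressed bytes consumed so far
    let mut out = Vec::new();
    loop {
        let mut line = String::new();
        let n = reader.read_line(&mut line)? as u64;
        if n == 0 {
            break; // end of the decompressed stream
        }
        offset += n;
        if offset <= recorded_offset {
            continue; // already emitted before recovery; skip instead of re-emitting
        }
        // `offset` (post-decompression) is what would be checkpointed as the message offset.
        out.push((offset, line));
    }
    Ok(out)
}
```

With this scheme, recovering a partially read gzip file costs a re-decompression pass up to the recorded offset, but the offset/size bookkeeping stays consistent with the other file sources.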

Contributor Author: I also prefer 3 because it has the least impact.

Contributor Author: Resolved; had some offline discussion with @hzxa21.
Please see the description of the PR for specific details. I modified some of the lister and fetcher logic. cc @tabVersion: this part was designed by you, for awareness.

// - Additionally, the recorded `size` refers to the size of the file before
// compression, while the actual size read by the reader is the compressed size.
//
// Therefore, for gzip-compressed files, the `offset` can only be either 0 or
// the size of the file. When reaching the end of the file, the last message
// should set `offset = split.size` to inform the fetch executor that the file
// has been completely read. In all other scenarios, `offset` should be set to 0.
if object_name.ends_with(".gz") || object_name.ends_with(".gzip") {
    let remain_buf = buf_reader.fill_buf().await?;
    if remain_buf.is_empty() {
        // read to the end, set the offset = `split.size` in the last Message.
        batch.push(SourceMessage {
            key: None,
            payload: Some(std::mem::take(&mut line_buf).into_bytes()),
            offset: split.size.to_string(),
            split_id: split.id(),
            meta: SourceMeta::Empty,
        });
    } else {
        batch.push(SourceMessage {
            key: None,
            payload: Some(std::mem::take(&mut line_buf).into_bytes()),
            offset: "0".to_owned(),
            split_id: split.id(),
            meta: SourceMeta::Empty,
        });
    }
} else {
    // FIXME(rc): Here we have to use `offset + n_read`, i.e. the offset of the next line,
    // as the *message offset*, because we check whether a file is finished by comparing the
    // message offset with the file size in `FsFetchExecutor::into_stream`. However, we must
    // understand that this message offset is not semantically consistent with the offset of
    // other source connectors.
    let msg_offset = (offset + n_read).to_string();
    batch.push(SourceMessage {
        key: None,
        payload: Some(std::mem::take(&mut line_buf).into_bytes()),
        offset: msg_offset,
        split_id: split.id(),
        meta: SourceMeta::Empty,
    });
    offset += n_read;
    partition_input_bytes_metrics.inc_by(n_read as _);
}

partition_input_bytes_metrics.inc_by(n_read as _);

if batch.len() >= max_chunk_size {
9 changes: 5 additions & 4 deletions src/stream/src/executor/source/fetch_executor.rs
@@ -374,10 +374,11 @@ impl<S: StateStore, Src: OpendalSource> FsFetchExecutor<S, Src> {
    .unwrap();
debug_assert_eq!(mapping.len(), 1);
if let Some((split_id, offset)) = mapping.into_iter().next() {
    let row = state_store_handler
        .get(split_id.clone())
        .await?
        .expect("The fs_split should be in the state table.");
    let row = state_store_handler
        .get(split_id.clone())
        .await?
        .unwrap_or_else(|| {
            panic!(
                "The fs_split (file_name) {:?} should be in the state table.",
                split_id
            )
        });

Contributor: We happened to hit this error; better not to panic here.

Contributor Author: I think it should panic; otherwise data would be lost.
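For contrast, here is a minimal sketch of the non-panicking variant the reviewer is suggesting. The helper name, signature, and the use of `anyhow` are assumptions for illustration; this is not code from the PR.

```rust
use anyhow::anyhow;

/// Hypothetical helper: turn a missing state-table row into a recoverable error
/// instead of a panic, so a single bad split fails the actor rather than the process.
fn require_fs_split_row<T>(row: Option<T>, split_id: &str) -> anyhow::Result<T> {
    row.ok_or_else(|| {
        anyhow!(
            "fs_split (file_name) {:?} should be in the state table",
            split_id
        )
    })
}
```

The trade-off is the one debated above: an error keeps the node alive, while a panic makes the inconsistency impossible to ignore.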
    let fs_split = match row.datum_at(1) {
        Some(ScalarRefImpl::Jsonb(jsonb_ref)) => {
            OpendalFsSplit::<Src>::restore_from_json(