diff --git a/e2e_test/s3/file_source.py b/e2e_test/s3/file_source.py
index 0f3aaaed64a76..bc91115b54a94 100644
--- a/e2e_test/s3/file_source.py
+++ b/e2e_test/s3/file_source.py
@@ -10,6 +10,7 @@ import time
 from io import StringIO
 from minio import Minio
+import gzip
 from functools import partial

 def gen_data(file_num, item_num_per_file):
@@ -164,20 +165,36 @@ def _encode():
         return f"CSV (delimiter = ',', without_header = {str('without' in fmt).lower()})"

     # Execute a SELECT statement
-    cur.execute(f'''CREATE SOURCE {_source()}(
-        id int,
-        name TEXT,
-        sex int,
-        mark int,
-    ) WITH (
-        connector = 's3',
-        match_pattern = '{prefix}*.{fmt}',
+    if fmt == 'json':
+        cur.execute(f'''CREATE SOURCE {_source()}(
+            id int,
+            name TEXT,
+            sex int,
+            mark int,
+        ) WITH (
+            connector = 's3',
+            match_pattern = '{prefix}*.{fmt}.gz',
             s3.region_name = '{config['S3_REGION']}',
             s3.bucket_name = '{config['S3_BUCKET']}',
             s3.credentials.access = '{config['S3_ACCESS_KEY']}',
             s3.credentials.secret = '{config['S3_SECRET_KEY']}',
             s3.endpoint_url = 'https://{config['S3_ENDPOINT']}'
-    ) FORMAT PLAIN ENCODE {_encode()};''')
+        ) FORMAT PLAIN ENCODE {_encode()};''')
+    else:
+        cur.execute(f'''CREATE SOURCE {_source()}(
+            id int,
+            name TEXT,
+            sex int,
+            mark int,
+        ) WITH (
+            connector = 's3',
+            match_pattern = '{prefix}*.{fmt}',
+            s3.region_name = '{config['S3_REGION']}',
+            s3.bucket_name = '{config['S3_BUCKET']}',
+            s3.credentials.access = '{config['S3_ACCESS_KEY']}',
+            s3.credentials.secret = '{config['S3_SECRET_KEY']}',
+            s3.endpoint_url = 'https://{config['S3_ENDPOINT']}'
+        ) FORMAT PLAIN ENCODE {_encode()};''')

     total_rows = file_num * item_num_per_file
     MAX_RETRIES = 40
@@ -265,19 +282,45 @@ def _table():
         secure=True,
     )
     run_id = str(random.randint(1000, 9999))
+
     _local = lambda idx: f'data_{idx}.{fmt}'
-    _s3 = lambda idx: f"{run_id}_data_{idx}.{fmt}"
+    if fmt == 'json':
+        _s3 = lambda idx: f"{run_id}_data_{idx}.{fmt}.gz"
+        for idx, file_str in enumerate(formatted_files):
+            with open(_local(idx), "w") as f:
+                with gzip.open(_local(idx) + '.gz', 'wb') as f_gz:
+                    f_gz.write(file_str.encode('utf-8'))
+                os.fsync(f.fileno())
+
+            client.fput_object(
+                config["S3_BUCKET"],
+                _s3(idx),
+                _local(idx) + '.gz'
+            )
+    else:
+        _s3 = lambda idx: f"{run_id}_data_{idx}.{fmt}"
+        for idx, file_str in enumerate(formatted_files):
+            with open(_local(idx), "w") as f:
+                f.write(file_str)
+                os.fsync(f.fileno())
+
+            client.fput_object(
+                config["S3_BUCKET"],
+                _s3(idx),
+                _local(idx)
+            )

     # put s3 files
     for idx, file_str in enumerate(formatted_files):
         with open(_local(idx), "w") as f:
-            f.write(file_str)
-            os.fsync(f.fileno())
+            with gzip.open(_local(idx) + '.gz', 'wb') as f_gz:
+                f_gz.write(file_str.encode('utf-8'))
+            os.fsync(f.fileno())

         client.fput_object(
             config["S3_BUCKET"],
             _s3(idx),
-            _local(idx)
+            _local(idx) + '.gz'
         )

     # do test
diff --git a/src/connector/src/source/filesystem/opendal_source/opendal_enumerator.rs b/src/connector/src/source/filesystem/opendal_source/opendal_enumerator.rs
index a770d600282a2..b37f21336eb57 100644
--- a/src/connector/src/source/filesystem/opendal_source/opendal_enumerator.rs
+++ b/src/connector/src/source/filesystem/opendal_source/opendal_enumerator.rs
@@ -87,6 +87,7 @@ impl OpendalEnumerator {
         };
         let timestamp = Timestamptz::from(t);
         let size = om.content_length() as i64;
+
         let metadata = FsPageItem {
             name,
             size,
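The `opendal_reader.rs` changes that follow build on the streaming gzip support the reader already imports (`async_compression::tokio::bufread::GzipDecoder`): a `.gz` object is decompressed on the fly and then consumed line by line, exactly like a plain text object. A minimal sketch of that mechanism in isolation (the helper name and the in-memory input are illustrative, not code from this PR):

```rust
use async_compression::tokio::bufread::GzipDecoder;
use tokio::io::{AsyncBufRead, AsyncBufReadExt, BufReader};

/// Decompress a gzip byte stream and collect it line by line, mirroring how the
/// reader feeds decompressed bytes into its line-based batching loop.
async fn gz_lines(raw: impl AsyncBufRead + Unpin) -> std::io::Result<Vec<String>> {
    // `GzipDecoder` wraps any `AsyncBufRead` and exposes the decompressed bytes as `AsyncRead`.
    let decoder = GzipDecoder::new(raw);
    // Re-buffer the decoded bytes so they can be split on newlines.
    let mut lines = BufReader::new(decoder).lines();
    let mut out = Vec::new();
    while let Some(line) = lines.next_line().await? {
        out.push(line);
    }
    Ok(out)
}
```

In the connector itself, the input side of this pipeline is the `StreamReader` built from the OpenDAL byte stream rather than an in-memory buffer.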
diff --git a/src/connector/src/source/filesystem/opendal_source/opendal_reader.rs b/src/connector/src/source/filesystem/opendal_source/opendal_reader.rs
index c70a955f03d0d..020449fd42d2c 100644
--- a/src/connector/src/source/filesystem/opendal_source/opendal_reader.rs
+++ b/src/connector/src/source/filesystem/opendal_source/opendal_reader.rs
@@ -14,13 +14,15 @@
 use std::future::IntoFuture;
 use std::pin::Pin;
+use std::sync::Arc;

 use async_compression::tokio::bufread::GzipDecoder;
 use async_trait::async_trait;
 use futures::TryStreamExt;
 use futures_async_stream::try_stream;
 use opendal::Operator;
-use risingwave_common::array::StreamChunk;
+use risingwave_common::array::{ArrayBuilderImpl, DataChunk, StreamChunk};
+use risingwave_common::types::{Datum, ScalarImpl};
 use tokio::io::{AsyncBufRead, AsyncBufReadExt, BufReader};
 use tokio_util::io::StreamReader;

@@ -33,8 +35,8 @@ use crate::source::filesystem::nd_streaming::need_nd_streaming;
 use crate::source::filesystem::OpendalFsSplit;
 use crate::source::iceberg::read_parquet_file;
 use crate::source::{
-    BoxSourceChunkStream, Column, SourceContextRef, SourceMessage, SourceMeta, SplitMetaData,
-    SplitReader,
+    BoxSourceChunkStream, Column, SourceColumnDesc, SourceContextRef, SourceMessage, SourceMeta,
+    SplitMetaData, SplitReader,
 };

 #[derive(Debug, Clone)]
@@ -86,7 +88,7 @@ impl OpendalReader {
         if let EncodingProperties::Parquet = &self.parser_config.specific.encoding_config {
             chunk_stream = read_parquet_file(
                 self.connector.op.clone(),
-                object_name,
+                object_name.clone(),
                 self.columns.clone(),
                 Some(self.parser_config.common.rw_columns.clone()),
                 self.source_ctx.source_ctrl_opts.chunk_size,
@@ -116,6 +118,19 @@ impl OpendalReader {
             for chunk in chunk_stream {
                 yield chunk?;
             }
+
+            if let EncodingProperties::Parquet = &self.parser_config.specific.encoding_config {
+                // We no longer determine whether a file is finished by comparing `offset >= size`; the reader itself can tell when it has reached the end.
+                // Therefore, after a file has been read completely, we yield one more chunk marked as EOF, with its offset set to `usize::MAX` to indicate that it is finished.
+                // In the fetch executor, when `offset = usize::MAX` is encountered, the corresponding file can be deleted from the state table.
+
+                // FIXME(wcy-fdu): The order of hidden columns in parquet encode and other encodes is inconsistent; maybe we can yield a common EOF chunk for both parquet encode and other encodes.
+                let eof_chunk = Self::generate_eof_chunk_for_parquet_encode(
+                    self.parser_config.common.rw_columns.clone(),
+                    object_name.clone(),
+                )?;
+                yield eof_chunk;
+            }
         }
     }

@@ -132,11 +147,19 @@ impl OpendalReader {
         let source_name = source_ctx.source_name.clone();
         let object_name = split.name.clone();
         let start_offset = split.offset;
-        let reader = op
-            .read_with(&object_name)
-            .range(start_offset as u64..)
-            .into_future() // Unlike `rustc`, `try_stream` seems require manual `into_future`.
-            .await?;
+        // After a recovery occurs, gzip-compressed files have to be read from the beginning each time,
+        // while other files can resume reading from the last read `start_offset`.
+        let reader = match object_name.ends_with(".gz") || object_name.ends_with(".gzip") {
+            true => op.read_with(&object_name).into_future().await?,
+
+            false => {
+                op.read_with(&object_name)
+                    .range(start_offset as u64..)
+                    .into_future()
+                    .await?
+            }
+        };
+
         let stream_reader = StreamReader::new(
             reader.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)),
         );

@@ -157,7 +180,10 @@ impl OpendalReader {
             }
         };
-        let mut offset = start_offset;
+        let mut offset = match object_name.ends_with(".gz") || object_name.ends_with(".gzip") {
+            true => 0,
+            false => start_offset,
+        };
         let partition_input_bytes_metrics = source_ctx
             .metrics
             .partition_input_bytes
@@ -181,20 +207,23 @@ impl OpendalReader {
                 }
                 // note that the buffer contains the newline character
                 debug_assert_eq!(n_read, line_buf.len());
-
-                // FIXME(rc): Here we have to use `offset + n_read`, i.e. the offset of the next line,
-                // as the *message offset*, because we check whether a file is finished by comparing the
-                // message offset with the file size in `FsFetchExecutor::into_stream`. However, we must
-                // understand that this message offset is not semantically consistent with the offset of
-                // other source connectors.
                 let msg_offset = (offset + n_read).to_string();
-                batch.push(SourceMessage {
-                    key: None,
-                    payload: Some(std::mem::take(&mut line_buf).into_bytes()),
-                    offset: msg_offset,
-                    split_id: split.id(),
-                    meta: SourceMeta::Empty,
-                });
+                if (object_name.ends_with(".gz") || object_name.ends_with(".gzip"))
+                    && offset + n_read <= start_offset
+                {
+                    // For gzip-compressed files, the reader starts from the beginning of the file each time,
+                    // so it must skip the already-read part and only yield lines from a position greater than or equal to `start_offset`.
+                } else {
+                    batch.push(SourceMessage {
+                        key: None,
+                        payload: Some(std::mem::take(&mut line_buf).into_bytes()),
+                        offset: msg_offset,
+                        split_id: split.id(),
+                        meta: SourceMeta::Empty,
+                    });
+                }
+
                 offset += n_read;
                 partition_input_bytes_metrics.inc_by(n_read as _);

@@ -207,5 +236,90 @@ impl OpendalReader {
             batch.shrink_to_fit();
             yield batch;
         }
+
+        // For json and csv encodes, yield an EOF message to mark that the file has been fully read.
+        let eof_batch = vec![SourceMessage {
+            key: None,
+            payload: None,
+            offset: usize::MAX.to_string(),
+            split_id: split.id(),
+            meta: SourceMeta::Empty,
+        }];
+        yield eof_batch;
+    }
+
+    // Generate a special chunk to mark the end of reading: its offset is `usize::MAX` and all other fields are null.
+    fn generate_eof_chunk_for_parquet_encode(
+        rw_columns: Vec<SourceColumnDesc>,
+        object_name: String,
+    ) -> Result<StreamChunk, crate::error::ConnectorError> {
+        const MAX_HIDDEN_COLUMN_NUMS: usize = 3;
+        let column_size = rw_columns.len();
+        let mut chunk_columns = Vec::with_capacity(rw_columns.len() + MAX_HIDDEN_COLUMN_NUMS);
+        for source_column in rw_columns.clone() {
+            match source_column.column_type {
+                crate::source::SourceColumnType::Normal => {
+                    match source_column.is_hidden_addition_col {
+                        false => {
+                            let rw_data_type: &risingwave_common::types::DataType =
+                                &source_column.data_type;
+                            let mut array_builder =
+                                ArrayBuilderImpl::with_type(column_size, rw_data_type.clone());
+
+                            array_builder.append_null();
+                            let res = array_builder.finish();
+                            let column = Arc::new(res);
+                            chunk_columns.push(column);
+                        }
+                        // Handle hidden columns; for the file source, the hidden columns are only `Offset` and `Filename`.
+                        true => {
+                            if let Some(additional_column_type) =
+                                &source_column.additional_column.column_type
+                            {
+                                match additional_column_type {
+                                    risingwave_pb::plan_common::additional_column::ColumnType::Offset(_) => {
+                                        let mut array_builder =
+                                            ArrayBuilderImpl::with_type(column_size, source_column.data_type.clone());
+                                        // Set the EOF chunk's offset to `usize::MAX` to mark the end of the file.
+                                        let datum: Datum = Some(ScalarImpl::Utf8((usize::MAX).to_string().into()));
+                                        array_builder.append(datum);
+                                        let res = array_builder.finish();
+                                        let column = Arc::new(res);
+                                        chunk_columns.push(column);
+                                    },
+                                    risingwave_pb::plan_common::additional_column::ColumnType::Filename(_) => {
+                                        let mut array_builder =
+                                            ArrayBuilderImpl::with_type(column_size, source_column.data_type.clone());
+                                        let datum: Datum = Some(ScalarImpl::Utf8(object_name.clone().into()));
+                                        array_builder.append(datum);
+                                        let res = array_builder.finish();
+                                        let column = Arc::new(res);
+                                        chunk_columns.push(column);
+                                    },
+                                    _ => unreachable!()
+                                }
+                            }
+                        }
+                    }
+                }
+                crate::source::SourceColumnType::RowId => {
+                    let mut array_builder =
+                        ArrayBuilderImpl::with_type(column_size, source_column.data_type.clone());
+                    let datum: Datum = None;
+                    array_builder.append(datum);
+                    let res = array_builder.finish();
+                    let column = Arc::new(res);
+                    chunk_columns.push(column);
+                }
+                // The following column types are only used in the CDC source.
+                crate::source::SourceColumnType::Offset | crate::source::SourceColumnType::Meta => {
+                    unreachable!()
+                }
+            }
+        }
+
+        let data_chunk = DataChunk::new(chunk_columns.clone(), 1_usize);
+        Ok(data_chunk.into())
     }
 }
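A gzip stream cannot be opened at an arbitrary decompressed offset, which is why the reader above re-reads `.gz`/`.gzip` objects from byte 0 after recovery and silently drops the lines it has already delivered. A standalone sketch of that skip rule plus a small test (the free function is illustrative only; in the PR the check is inlined in the read loop):

```rust
/// Decide whether a line ending at `offset + n_read` should be emitted when a file is
/// (re)opened with `start_offset` already persisted in the state table.
fn should_emit(is_gzip: bool, offset: usize, n_read: usize, start_offset: usize) -> bool {
    // Plain files are opened with a byte range starting at `start_offset`, so every line
    // the reader sees is new. Gzip files are re-read from byte 0, so any line that ends
    // at or before `start_offset` was already delivered and must be skipped.
    !(is_gzip && offset + n_read <= start_offset)
}

#[cfg(test)]
mod tests {
    use super::should_emit;

    #[test]
    fn gzip_resume_skips_already_delivered_lines() {
        // Resuming a gzip file whose persisted offset is 20.
        assert!(!should_emit(true, 10, 10, 20)); // ends exactly at 20: already delivered
        assert!(should_emit(true, 20, 5, 20)); // starts at the resume point: emitted
        // Plain files resume through a ranged read, so nothing needs to be skipped.
        assert!(should_emit(false, 0, 5, 20));
    }
}
```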
diff --git a/src/stream/src/executor/source/fetch_executor.rs b/src/stream/src/executor/source/fetch_executor.rs
index cdef0ea52a534..fbe53160b580e 100644
--- a/src/stream/src/executor/source/fetch_executor.rs
+++ b/src/stream/src/executor/source/fetch_executor.rs
@@ -374,40 +374,38 @@ impl FsFetchExecutor {
                         .unwrap();
                     debug_assert_eq!(mapping.len(), 1);
                     if let Some((split_id, offset)) = mapping.into_iter().next() {
-                        let row = state_store_handler
-                            .get(split_id.clone())
-                            .await?
-                            .expect("The fs_split should be in the state table.");
-                        let fs_split = match row.datum_at(1) {
-                            Some(ScalarRefImpl::Jsonb(jsonb_ref)) => {
-                                OpendalFsSplit::<Src>::restore_from_json(
-                                    jsonb_ref.to_owned_scalar(),
-                                )?
-                            }
-                            _ => unreachable!(),
-                        };
-                        // FIXME(rc): Here we compare `offset` with `fs_split.size` to determine
-                        // whether the file is finished, where the `offset` is the starting position
-                        // of the NEXT message line in the file. However, In other source connectors,
-                        // we use the word `offset` to represent the offset of the current message.
-                        // We have to be careful about this semantical inconsistency.
-                        if offset.parse::<usize>().unwrap() >= fs_split.size {
+                        // When `offset = usize::MAX`, it indicates that reading the current file (`split_id`) is finished.
+                        // At this point, the file can be deleted from the state table.
+                        // The chunk serves only as an EOF marker, so it is skipped and not yielded downstream.
+                        if offset.parse::<usize>().unwrap() == usize::MAX {
                             splits_on_fetch -= 1;
                             state_store_handler.delete(split_id).await?;
                         } else {
+                            let row = state_store_handler.get(split_id.clone()).await?
+                                .unwrap_or_else(|| {
+                                    panic!("The fs_split (file_name) {:?} should be in the state table.",
+                                        split_id)
+                                });
+                            let fs_split = match row.datum_at(1) {
+                                Some(ScalarRefImpl::Jsonb(jsonb_ref)) => {
+                                    OpendalFsSplit::<Src>::restore_from_json(
+                                        jsonb_ref.to_owned_scalar(),
+                                    )?
+                                }
+                                _ => unreachable!(),
+                            };
                             state_store_handler
                                 .set(split_id, fs_split.encode_to_json())
                                 .await?;
+                            let chunk = prune_additional_cols(
+                                &chunk,
+                                split_idx,
+                                offset_idx,
+                                &source_desc.columns,
+                            );
+                            yield Message::Chunk(chunk);
                         }
                     }
-
-                        let chunk = prune_additional_cols(
-                            &chunk,
-                            split_idx,
-                            offset_idx,
-                            &source_desc.columns,
-                        );
-                        yield Message::Chunk(chunk);
-                    }
                 }
             }
         }
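Taken together, the reader and the fetch executor now share one contract: a message or chunk whose offset is the string form of `usize::MAX` means the file is finished, should be removed from the state table, and must not be forwarded downstream. A small self-contained check of that sentinel round trip (the helper is hypothetical; the executor performs the equivalent `parse::<usize>()` inline):

```rust
/// Interpret a stringly typed source offset the way the fetch executor does:
/// `usize::MAX` is the EOF sentinel attached by the reader; anything else is an
/// ordinary byte offset that keeps the file alive in the state table.
fn is_eof_offset(offset: &str) -> bool {
    offset.parse::<usize>().map_or(false, |o| o == usize::MAX)
}

fn main() {
    // The reader encodes the sentinel as `usize::MAX.to_string()` ...
    assert!(is_eof_offset(&usize::MAX.to_string()));
    // ... while a normal line offset keeps the split in the state table.
    assert!(!is_eof_offset("4096"));
    println!("EOF sentinel round-trips through its string encoding");
}
```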