From b109551e72fb51fd842abfc51b769119413bd488 Mon Sep 17 00:00:00 2001 From: hezheyu Date: Mon, 6 Feb 2023 19:29:22 +0800 Subject: [PATCH] Enable filter for struct types. --- .../parquet/read/deserialize/binary/nested.rs | 4 +- .../read/deserialize/boolean/nested.rs | 7 +- .../read/deserialize/primitive/nested.rs | 4 +- tests/it/io/parquet/read_indexes.rs | 104 ++++++++++++++---- 4 files changed, 87 insertions(+), 32 deletions(-) diff --git a/src/io/parquet/read/deserialize/binary/nested.rs b/src/io/parquet/read/deserialize/binary/nested.rs index 2d345140db7..d45694f8109 100644 --- a/src/io/parquet/read/deserialize/binary/nested.rs +++ b/src/io/parquet/read/deserialize/binary/nested.rs @@ -65,14 +65,14 @@ impl<'a, O: Offset> NestedDecoder<'a> for BinaryDecoder { (Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), true, false) => { ValuesDictionary::try_new(page, dict).map(State::OptionalDictionary) } - (Encoding::Plain, _, true, false) => { + (Encoding::Plain, _, true, _) => { let (_, _, values) = split_buffer(page)?; let values = BinaryIter::new(values); Ok(State::Optional(values)) } - (Encoding::Plain, _, false, false) => { + (Encoding::Plain, _, false, _) => { let (_, _, values) = split_buffer(page)?; let values = BinaryIter::new(values); diff --git a/src/io/parquet/read/deserialize/boolean/nested.rs b/src/io/parquet/read/deserialize/boolean/nested.rs index e8a3b0d5c45..ca5ae7b05f3 100644 --- a/src/io/parquet/read/deserialize/boolean/nested.rs +++ b/src/io/parquet/read/deserialize/boolean/nested.rs @@ -56,16 +56,15 @@ impl<'a> NestedDecoder<'a> for BooleanDecoder { ) -> Result { let is_optional = page.descriptor.primitive_type.field_info.repetition == Repetition::Optional; - let is_filtered = page.selected_rows().is_some(); - match (page.encoding(), is_optional, is_filtered) { - (Encoding::Plain, true, false) => { + match (page.encoding(), is_optional) { + (Encoding::Plain, true) => { let (_, _, values) = split_buffer(page)?; let values = BitmapIter::new(values, 0, values.len() * 8); Ok(State::Optional(values)) } - (Encoding::Plain, false, false) => { + (Encoding::Plain, false) => { let (_, _, values) = split_buffer(page)?; let values = BitmapIter::new(values, 0, values.len() * 8); diff --git a/src/io/parquet/read/deserialize/primitive/nested.rs b/src/io/parquet/read/deserialize/primitive/nested.rs index 7b890bf7507..d43cb3deda0 100644 --- a/src/io/parquet/read/deserialize/primitive/nested.rs +++ b/src/io/parquet/read/deserialize/primitive/nested.rs @@ -99,8 +99,8 @@ where (Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), true, false) => { ValuesDictionary::try_new(page, dict).map(State::OptionalDictionary) } - (Encoding::Plain, _, true, false) => Values::try_new::

(page).map(State::Optional), - (Encoding::Plain, _, false, false) => Values::try_new::

(page).map(State::Required), + (Encoding::Plain, _, true, _) => Values::try_new::

(page).map(State::Optional), + (Encoding::Plain, _, false, _) => Values::try_new::

(page).map(State::Required), _ => Err(utils::not_implemented(page)), } } diff --git a/tests/it/io/parquet/read_indexes.rs b/tests/it/io/parquet/read_indexes.rs index 4e41bb2baf6..3efd29908c2 100644 --- a/tests/it/io/parquet/read_indexes.rs +++ b/tests/it/io/parquet/read_indexes.rs @@ -5,8 +5,11 @@ use arrow2::error::Error; use arrow2::io::parquet::read::indexes; use arrow2::{array::*, datatypes::*, error::Result, io::parquet::read::*, io::parquet::write::*}; -/// Returns 2 sets of pages with different the same number of rows distributed un-evenly -fn pages(arrays: &[&dyn Array], encoding: Encoding) -> Result<(Vec, Vec, Schema)> { +/// Returns sets of pages with different the same number of rows distributed un-evenly. +/// One [`Array`] -> one [`Page`]. +/// Primitive arrays -> one vec of [`Page`]. +/// Struct arrays -> multiple vecs of [`Page`]. +fn pages(arrays: &[&dyn Array], encoding: Encoding) -> Result<(Vec>, Schema)> { // create pages with different number of rows let array11 = PrimitiveArray::::from_slice([1, 2, 3, 4]); let array12 = PrimitiveArray::::from_slice([5]); @@ -46,31 +49,42 @@ fn pages(arrays: &[&dyn Array], encoding: Encoding) -> Result<(Vec, Vec>>()?; - let pages2 = arrays - .iter() - .flat_map(|array| { - array_to_pages( - *array, - parquet_schema.columns()[1] - .descriptor - .primitive_type - .clone(), - &[Nested::Primitive(None, true, array.len())], - options, - encoding, - ) - .unwrap() - .collect::>>() - .unwrap() - }) - .collect::>(); - - Ok((pages1, pages2, schema)) + let mut pages = vec![pages1]; + + let mut pages2 = vec![]; + + for array in arrays { + let type_ = parquet_schema.columns()[1].base_type.clone(); + let types = to_parquet_leaves(type_.clone()); + let encoding = (0..types.len()) + .into_iter() + .map(|_| encoding) + .collect::>(); + let columns = array_to_columns(array, type_, options, &encoding)? + .into_iter() + .map(|iter| iter.collect::>()) + .collect::>(); + if pages2.is_empty() { + pages2 = columns + .into_iter() + .map(|c| c.into_iter().map(|p| p.unwrap()).collect::>()) + .collect::>(); + } else { + for (i, column) in columns.into_iter().enumerate() { + let column = column.into_iter().map(|p| p.unwrap()).collect::>(); + pages2[i].extend(column); + } + } + } + + pages.extend(pages2); + + Ok((pages, schema)) } /// Tests reading pages while skipping indexes fn read_with_indexes( - (pages1, pages2, schema): (Vec, Vec, Schema), + (pages, schema): (Vec>, Schema), expected: Box, ) -> Result<()> { let options = WriteOptions { @@ -87,7 +101,9 @@ fn read_with_indexes( Result::Ok(DynStreamingIterator::new(compressed_pages)) }; - let row_group = DynIter::new(vec![to_compressed(pages1), to_compressed(pages2)].into_iter()); + let pages = pages.into_iter().map(to_compressed).collect::>(); + + let row_group = DynIter::new(pages.into_iter()); let writer = vec![]; let mut writer = FileWriter::try_new(writer, schema, options)?; @@ -291,3 +307,43 @@ fn indexed_dict() -> Result<()> { read_with_indexes(pages(&[&array], Encoding::RleDictionary)?, expected) } + +#[test] +fn indexed_required_struct() -> Result<()> { + let array21 = StructArray::new( + DataType::Struct(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Boolean, false), + ]), + vec![ + Int32Array::from_slice([1, 2, 3]).boxed(), + BooleanArray::from_slice([true, false, true]).boxed(), + ], + None, + ); + let array22 = StructArray::new( + DataType::Struct(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Boolean, false), + ]), + vec![ + Int32Array::from_slice([4, 5, 6]).boxed(), + BooleanArray::from_slice([false, true, false]).boxed(), + ], + None, + ); + let expected = StructArray::new( + DataType::Struct(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Boolean, false), + ]), + vec![ + Int32Array::from_slice([5]).boxed(), + BooleanArray::from_slice([true]).boxed(), + ], + None, + ) + .boxed(); + + read_with_indexes(pages(&[&array21, &array22], Encoding::Plain)?, expected) +}