Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Enable predicate push down for nested types. #1382

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/io/parquet/read/deserialize/binary/nested.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,14 @@ impl<'a, O: Offset> NestedDecoder<'a> for BinaryDecoder<O> {
(Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), true, false) => {
ValuesDictionary::try_new(page, dict).map(State::OptionalDictionary)
}
(Encoding::Plain, _, true, false) => {
(Encoding::Plain, _, true, _) => {
let (_, _, values) = split_buffer(page)?;

let values = BinaryIter::new(values);

Ok(State::Optional(values))
}
(Encoding::Plain, _, false, false) => {
(Encoding::Plain, _, false, _) => {
let (_, _, values) = split_buffer(page)?;

let values = BinaryIter::new(values);
Expand Down
7 changes: 3 additions & 4 deletions src/io/parquet/read/deserialize/boolean/nested.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,16 +56,15 @@ impl<'a> NestedDecoder<'a> for BooleanDecoder {
) -> Result<Self::State> {
let is_optional =
page.descriptor.primitive_type.field_info.repetition == Repetition::Optional;
let is_filtered = page.selected_rows().is_some();

match (page.encoding(), is_optional, is_filtered) {
(Encoding::Plain, true, false) => {
match (page.encoding(), is_optional) {
(Encoding::Plain, true) => {
let (_, _, values) = split_buffer(page)?;
let values = BitmapIter::new(values, 0, values.len() * 8);

Ok(State::Optional(values))
}
(Encoding::Plain, false, false) => {
(Encoding::Plain, false) => {
let (_, _, values) = split_buffer(page)?;
let values = BitmapIter::new(values, 0, values.len() * 8);

Expand Down
4 changes: 2 additions & 2 deletions src/io/parquet/read/deserialize/primitive/nested.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,8 @@ where
(Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), true, false) => {
ValuesDictionary::try_new(page, dict).map(State::OptionalDictionary)
}
(Encoding::Plain, _, true, false) => Values::try_new::<P>(page).map(State::Optional),
(Encoding::Plain, _, false, false) => Values::try_new::<P>(page).map(State::Required),
(Encoding::Plain, _, true, _) => Values::try_new::<P>(page).map(State::Optional),
(Encoding::Plain, _, false, _) => Values::try_new::<P>(page).map(State::Required),
_ => Err(utils::not_implemented(page)),
}
}
Expand Down
104 changes: 80 additions & 24 deletions tests/it/io/parquet/read_indexes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@ use arrow2::error::Error;
use arrow2::io::parquet::read::indexes;
use arrow2::{array::*, datatypes::*, error::Result, io::parquet::read::*, io::parquet::write::*};

/// Returns 2 sets of pages with different the same number of rows distributed un-evenly
fn pages(arrays: &[&dyn Array], encoding: Encoding) -> Result<(Vec<Page>, Vec<Page>, Schema)> {
/// Returns sets of pages with the same total number of rows, distributed un-evenly across pages.
/// One [`Array`] -> one [`Page`].
/// Primitive arrays -> one vec of [`Page`].
/// Struct arrays -> multiple vecs of [`Page`].
fn pages(arrays: &[&dyn Array], encoding: Encoding) -> Result<(Vec<Vec<Page>>, Schema)> {
// create pages with different number of rows
let array11 = PrimitiveArray::<i64>::from_slice([1, 2, 3, 4]);
let array12 = PrimitiveArray::<i64>::from_slice([5]);
Expand Down Expand Up @@ -46,31 +49,42 @@ fn pages(arrays: &[&dyn Array], encoding: Encoding) -> Result<(Vec<Page>, Vec<Pa
})
.collect::<Result<Vec<_>>>()?;

let pages2 = arrays
.iter()
.flat_map(|array| {
array_to_pages(
*array,
parquet_schema.columns()[1]
.descriptor
.primitive_type
.clone(),
&[Nested::Primitive(None, true, array.len())],
options,
encoding,
)
.unwrap()
.collect::<Result<Vec<_>>>()
.unwrap()
})
.collect::<Vec<_>>();

Ok((pages1, pages2, schema))
let mut pages = vec![pages1];

let mut pages2 = vec![];

for array in arrays {
let type_ = parquet_schema.columns()[1].base_type.clone();
let types = to_parquet_leaves(type_.clone());
let encoding = (0..types.len())
.into_iter()
.map(|_| encoding)
.collect::<Vec<_>>();
let columns = array_to_columns(array, type_, options, &encoding)?
.into_iter()
.map(|iter| iter.collect::<Vec<_>>())
.collect::<Vec<_>>();
if pages2.is_empty() {
pages2 = columns
.into_iter()
.map(|c| c.into_iter().map(|p| p.unwrap()).collect::<Vec<_>>())
.collect::<Vec<_>>();
} else {
for (i, column) in columns.into_iter().enumerate() {
let column = column.into_iter().map(|p| p.unwrap()).collect::<Vec<_>>();
pages2[i].extend(column);
}
}
}

pages.extend(pages2);

Ok((pages, schema))
}

/// Tests reading pages while skipping indexes
fn read_with_indexes(
(pages1, pages2, schema): (Vec<Page>, Vec<Page>, Schema),
(pages, schema): (Vec<Vec<Page>>, Schema),
expected: Box<dyn Array>,
) -> Result<()> {
let options = WriteOptions {
Expand All @@ -87,7 +101,9 @@ fn read_with_indexes(
Result::Ok(DynStreamingIterator::new(compressed_pages))
};

let row_group = DynIter::new(vec![to_compressed(pages1), to_compressed(pages2)].into_iter());
let pages = pages.into_iter().map(to_compressed).collect::<Vec<_>>();

let row_group = DynIter::new(pages.into_iter());

let writer = vec![];
let mut writer = FileWriter::try_new(writer, schema, options)?;
Expand Down Expand Up @@ -291,3 +307,43 @@ fn indexed_dict() -> Result<()> {

read_with_indexes(pages(&[&array], Encoding::RleDictionary)?, expected)
}

#[test]
fn indexed_required_struct() -> Result<()> {
    // Shared logical type for every struct array in this test:
    // a required Int32 field `a` and a required Boolean field `b`.
    fn struct_type() -> DataType {
        DataType::Struct(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Boolean, false),
        ])
    }

    // First page of struct values: rows {1,true}, {2,false}, {3,true}.
    let page1 = StructArray::new(
        struct_type(),
        vec![
            Int32Array::from_slice([1, 2, 3]).boxed(),
            BooleanArray::from_slice([true, false, true]).boxed(),
        ],
        None,
    );

    // Second page of struct values: rows {4,false}, {5,true}, {6,false}.
    let page2 = StructArray::new(
        struct_type(),
        vec![
            Int32Array::from_slice([4, 5, 6]).boxed(),
            BooleanArray::from_slice([false, true, false]).boxed(),
        ],
        None,
    );

    // After index-based page selection only the single row {a: 5, b: true}
    // is expected to be read back.
    let expected = StructArray::new(
        struct_type(),
        vec![
            Int32Array::from_slice([5]).boxed(),
            BooleanArray::from_slice([true]).boxed(),
        ],
        None,
    )
    .boxed();

    read_with_indexes(pages(&[&page1, &page2], Encoding::Plain)?, expected)
}