Commit 5fade25: Adding cloud features (#266)
Bidek56 authored Sep 16, 2024
1 parent 0f36b90

Showing 4 changed files with 57 additions and 33 deletions.
6 changes: 5 additions & 1 deletion Cargo.toml
@@ -155,7 +155,11 @@ features = [
   "string_pad",
   "replace",
   "cov",
-  "http"
+  "http",
+  "cloud",
+  "aws",
+  "gcp",
+  "azure"
 ]
 git = "https://github.com/pola-rs/polars.git"
 rev = "7686025ac7738607f2d4f6887e9a1313b7c8b1e2"
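
With the `cloud`, `aws`, `gcp`, and `azure` features compiled into the Rust crate, `scanParquet` can read directly from object storage. A minimal sketch of what this unlocks on the TypeScript side (the bucket names are hypothetical, and the URL schemes are assumed from Polars' object-store support, not stated in this commit):

```ts
import pl from "nodejs-polars";

// Scan parquet files straight from object storage. With no cloudOptions,
// credentials are inferred from environment variables.
const s3 = pl.scanParquet("s3://my-bucket/data.parquet"); // AWS
const gcs = pl.scanParquet("gs://my-bucket/data.parquet"); // GCP
const az = pl.scanParquet("az://my-container/data.parquet"); // Azure

// The scan is lazy: rows are only fetched when the query is collected.
const df = await s3.collect();
```
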
25 changes: 7 additions & 18 deletions polars/io.ts
@@ -5,6 +5,7 @@ import { isPath } from "./utils";
 import { type LazyDataFrame, _LazyDataFrame } from "./lazy/dataframe";
 import { type Readable, Stream } from "stream";
 import { concat } from "./functions";
+import type { ScanParquetOptions, RowCount } from "./types";

 export interface ReadCsvOptions {
   inferSchemaLength: number | null;
@@ -31,7 +32,7 @@ export interface ReadCsvOptions {
   skipRows: number;
   tryParseDates: boolean;
   skipRowsAfterHeader: number;
-  rowCount: any;
+  rowCount: RowCount;
   raiseIfEmpty: boolean;
   truncateRaggedLines: boolean;
   missingIsNull: boolean;
@@ -470,23 +471,6 @@ export function readAvro(pathOrBody, options = {}) {
   throw new Error("must supply either a path or body");
 }

-interface RowCount {
-  name: string;
-  offset: string;
-}
-
-interface ScanParquetOptions {
-  nRows?: number;
-  cache?: boolean;
-  parallel?: "auto" | "columns" | "row_groups" | "none";
-  rowCount?: RowCount;
-  rechunk?: boolean;
-  lowMemory?: boolean;
-  useStatistics?: boolean;
-  cloudOptions?: Map<string, string>;
-  retries?: number;
-}
-
 /**
  * Lazily read from a local or cloud-hosted parquet file (or files).
@@ -503,6 +487,10 @@ interface ScanParquetOptions {
     This determines the direction of parallelism. 'auto' will try to determine the optimal direction.
  @param options.useStatistics - Use statistics in the parquet to determine if pages can be skipped from reading.
  @param options.hivePartitioning - Infer statistics and schema from hive partitioned URL and use them to prune reads.
+ @param options.glob - Expand path given via globbing rules.
+ @param options.hiveSchema - The column names and data types of the columns by which the data is partitioned.
+    If set to `null` (default), the schema of the Hive partitions is inferred.
+ @param options.tryParseHiveDates - Whether to try parsing hive values as date/datetime types.
  @param options.rechunk - In case of reading multiple files via a glob pattern rechunk the final DataFrame into contiguous memory chunks.
  @param options.lowMemory - Reduce memory pressure at the expense of performance.
  @param options.cache - Cache the result after reading.
@@ -518,6 +506,7 @@ interface ScanParquetOptions {
     If `cloudOptions` is not provided, Polars will try to infer the information from environment variables.
  @param options.retries - Number of retries if accessing a cloud instance fails.
+ @param options.includeFilePaths - Include the path of the source file(s) as a column with this name.
  */
 export function scanParquet(source: string, options: ScanParquetOptions = {}) {
   const defaultOptions = { parallel: "auto" };
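
A hedged usage sketch of the updated `scanParquet` options, passing credentials explicitly through `cloudOptions` instead of relying on environment variables (the bucket and path are hypothetical, and the key names follow Polars' storage-options convention, which this commit does not spell out):

```ts
import pl from "nodejs-polars";

const lf = pl.scanParquet("s3://my-bucket/trips/*.parquet", {
  cloudOptions: {
    aws_region: "us-east-1", // assumed storage-option keys,
    aws_access_key_id: "<key-id>", // mirroring the names used by
    aws_secret_access_key: "<secret-key>", // Polars' Python storage_options
  },
  retries: 3, // retry transient object-store failures
  parallel: "auto", // let Polars choose the parallelism strategy
});

const df = await lf.limit(10).collect();
```
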
19 changes: 14 additions & 5 deletions polars/types.ts
@@ -141,12 +141,21 @@ export interface ReadParquetOptions
  * Options for {@link scanParquet}
  */
 export interface ScanParquetOptions {
-  columns?: string[] | number[];
-  numRows?: number;
-  parallel?: "auto" | "columns" | "row_groups" | "none";
-  rowCount?: RowCount;
+  nRows?: number;
+  rowIndexName?: string;
+  rowIndexOffset?: number;
+  cache?: boolean;
+  parallel?: "auto" | "columns" | "row_groups" | "none";
+  glob?: boolean;
+  hivePartitioning?: boolean;
+  hiveSchema?: unknown;
+  tryParseHiveDates?: boolean;
+  rechunk?: boolean;
+  lowMemory?: boolean;
+  useStatistics?: boolean;
+  cloudOptions?: unknown;
+  retries?: number;
+  includeFilePaths?: string;
 }

 /**
@@ -156,7 +165,7 @@ export interface RowCount {
   /** name of column */
   name: string;
   /** offset */
-  offset: string;
+  offset: number;
 }

 /**
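
Since `RowCount.offset` is now a `number`, attaching a row-index column while reading looks like this; a small sketch against the `ReadCsvOptions` interface above (the CSV path is hypothetical):

```ts
import pl from "nodejs-polars";

// Prepend a row-index column named "row_nr", counting from 0.
const df = pl.readCSV("data.csv", {
  rowCount: { name: "row_nr", offset: 0 }, // offset is a number, not a string
});
```
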
40 changes: 31 additions & 9 deletions src/lazy/dataframe.rs
@@ -713,22 +713,38 @@ pub fn scan_csv(path: String, options: ScanCsvOptions) -> napi::Result<JsLazyFrame> {
 #[napi(object)]
 pub struct ScanParquetOptions {
     pub n_rows: Option<i64>,
+    pub row_index_name: Option<String>,
+    pub row_index_offset: Option<u32>,
     pub cache: Option<bool>,
     pub parallel: Wrap<ParallelStrategy>,
-    pub row_count: Option<JsRowCount>,
+    pub glob: Option<bool>,
+    pub hive_partitioning: Option<bool>,
+    pub hive_schema: Option<Wrap<Schema>>,
+    pub try_parse_hive_dates: Option<bool>,
     pub rechunk: Option<bool>,
     pub low_memory: Option<bool>,
     pub use_statistics: Option<bool>,
     pub cloud_options: Option<HashMap<String, String>>,
     pub retries: Option<i64>,
+    pub include_file_paths: Option<String>,
 }

 #[napi(catch_unwind)]
 pub fn scan_parquet(path: String, options: ScanParquetOptions) -> napi::Result<JsLazyFrame> {
     let n_rows = options.n_rows.map(|i| i as usize);
     let cache = options.cache.unwrap_or(true);
+    let glob = options.glob.unwrap_or(true);
     let parallel = options.parallel;
-    let row_index: Option<RowIndex> = options.row_count.map(|rc| rc.into());
+
+    let row_index: Option<RowIndex> = if let Some(idn) = options.row_index_name {
+        Some(RowIndex {
+            name: idn.into(),
+            offset: options.row_index_offset.unwrap_or(0)
+        })
+    } else {
+        None
+    };

     let rechunk = options.rechunk.unwrap_or(false);
     let low_memory = options.low_memory.unwrap_or(false);
     let use_statistics = options.use_statistics.unwrap_or(false);
@@ -751,6 +767,16 @@ pub fn scan_parquet(path: String, options: ScanParquetOptions) -> napi::Result<JsLazyFrame> {
         });
     }

+    let hive_schema = options.hive_schema.map(|s| Arc::new(s.0));
+    let hive_options = HiveOptions {
+        enabled: options.hive_partitioning,
+        hive_start_idx: 0,
+        schema: hive_schema,
+        try_parse_dates: options.try_parse_hive_dates.unwrap_or(true),
+    };
+
+    let include_file_paths = options.include_file_paths;
+
     let args = ScanArgsParquet {
         n_rows,
         cache,
@@ -760,13 +786,9 @@ pub fn scan_parquet(path: String, options: ScanParquetOptions) -> napi::Result<JsLazyFrame> {
         low_memory,
         cloud_options,
         use_statistics,
-        // TODO: Support Hive partitioning.
-        hive_options: HiveOptions {
-            enabled: Some(false),
-            ..Default::default()
-        },
-        glob: true,
-        include_file_paths: None
+        hive_options,
+        glob,
+        include_file_paths: include_file_paths.map(Arc::from),
     };
     let lf = LazyFrame::scan_parquet(path, args).map_err(JsPolarsErr::from)?;
     Ok(lf.into())
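
On the Rust side, `hive_partitioning`, `hive_schema`, and `try_parse_hive_dates` now feed Polars' `HiveOptions` instead of the previous hard-coded `enabled: Some(false)`. A hedged TypeScript sketch of scanning a hive-partitioned dataset with the new options (the directory layout, column names, and pruning behavior are assumptions, not part of this commit):

```ts
import pl from "nodejs-polars";

// Hypothetical layout: data/year=2024/month=09/part-0.parquet
const lf = pl.scanParquet("data/**/*.parquet", {
  hivePartitioning: true, // derive `year`/`month` columns from the paths
  tryParseHiveDates: true, // parse date-like partition values
  includeFilePaths: "source_file", // add the originating file as a column
});

// Filters on partition columns can prune whole files before reading.
const df = await lf.filter(pl.col("year").eq(2024)).collect();
```
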
