Skip to content

1b5d/chunkr

Repository files navigation

chunkr

PyPI version

Support Python versions

A python library for the purpose of chunking different types of data files, without having to store the whole file in memory.

chunkr creates chunks from the source file with a user defined chunk size, then returns an iterator to loop over the resulting batches sequentially.

The type of a resulting batch is PyArrow's Table due to PyArrow's performance in reading & writing data files.

It's also possible to create a directory which contains the chunks as parquet files (currently only parquet is possible, new suggestions are welcomed), which will be cleaned up automatically when the user is done with the resulting files.

Currently supported input formats: csv, parquet

Getting started

pip install chunkr

Usage

Iterate over resulting batches

CSV input:

from chunkr import create_csv_chunk_iter

with create_csv_chunk_iter(path, chunk_size, storage_options, exclude, **extra_args) as chunk_iter:
    # process chunks
    for chunk in chunk_iter:
        # process chunk.to_pandas() or sth

Parquet:

from chunkr import create_parquet_chunk_iter

with create_parquet_chunk_iter(path, chunk_size, storage_options, exclude, **extra_args) as chunk_iter:
    # process chunks
    for chunk in chunk_iter:
        # process chunk.to_pandas() or sth

parameters:

  • path (str): the path of the input (local, sftp etc, see fsspec for possible inputs, not everything is supported though)
  • chunk_size (int, optional): number of records in a chunk. Defaults to 100_000.
  • storage_options (dict, optional): extra options to pass to the underlying storage e.g. username, password etc. Defaults to None.
  • exclude (list, optional): a list of files to be excluded, formatted as {path}->{filepath} where path is the input path and filepath is the concrete file path resolved in the target file system
  • extra_args (dict, optional): extra options passed on to the parsing system, file type specific

Create a directory containing the chunks as Parquet files

CSV input:

from chunkr import create_csv_chunk_dir

with create_csv_chunk_dir(path, output_path, chunk_size, storage_options, write_options, exclude, **extra_args) as chunks_dir:
    # process chunk files inside dir
    pd.read_parquet(file) for file in chunks_dir.iterdir()
    # the directory will be deleted when the context manager exits 

or Parquet:

from chunkr import create_parquet_chunk_dir

with create_parquet_chunk_dir(path, output_path, chunk_size, storage_options, write_options, exclude, **extra_args) as chunks_dir:
    # process chunk files inside dir
    pd.read_parquet(file) for file in chunks_dir.iterdir()
    # the directory will be deleted when the context manager exits

parameters:

  • path (str): the path of the input (local, sftp etc, see fsspec for possible input)
  • output_path (str): the path of the directory to output the chunks to
  • chunk_size (int, optional): number of records in a chunk. Defaults to 100_000.
  • storage_options (dict, optional): extra options to pass to the underlying storage e.g. username, password etc. Defaults to None.
  • write_options (dict, optional): extra options for writing the chunks passed to PyArrow's write_table() function. Defaults to None.
  • exclude (list, optional): a list of files to be excluded, formatted as {path}->{filepath} where path is the input path and filepath is the concrete file path resolved in the target file system
  • extra_args (dict, optional): extra options passed on to the parsing system, file specific

Note: currently chunkr only supports parquet as the output chunk files format

Additional examples

CSV input

Suppose you want to chunk a csv file of 1 million records into 10 parquet pieces, you can do the following:

CSV extra args are passed to PyArrows Parsing Options

from chunkr import create_csv_chunk_dir
import pandas as pd

with create_csv_chunk_dir(
            'path/to/file',
            'temp/output',
            chunk_size=100_000,
            quote_char='"',
            delimiter=',',
            escape_char='\\',
    ) as chunks_dir:

        assert 1_000_000 == sum(
            len(pd.read_parquet(file)) for file in chunks_dir.iterdir()
        )

Parquet input

Parquet extra args are passed to PyArrows iter_batches() function

from chunkr import create_parquet_chunk_dir
import pandas as pd

with create_parquet_chunk_dir(
            'path/to/file',
            'temp/output',
            chunk_size=100_000,
            columns=['id', 'name'],
    ) as chunks_dir:

        assert 1_000_000 == sum(
            len(pd.read_parquet(file)) for file in chunks_dir.iterdir()
        )

Reading file(s) inside an archive (zip, tar)

reading multiple files from a zip archive is possible, for csv files in /folder_in_archive/*.csv within an archive csv/archive.zip you can do:

from chunkr import create_csv_chunk_iter
import pandas as pd

path = 'zip://folder_in_archive/*.csv::csv/archive.zip'
with create_csv_chunk_iter(path) as chunk_iter:
    assert 1_000_000 == sum(len(chunk) for chunk in chunk_iter)

The only exception is when particularly reading a csv file from a tar.gz, there can be only 1 csv file within the archive:

from chunkr import create_csv_chunk_iter
import pandas as pd

path = 'tar://*.csv::csv/archive_single.tar.gz'
with create_csv_chunk_iter(path) as chunk_iter:
    assert 1_000_000 == sum(len(chunk) for chunk in chunk_iter)

but it's okay for other file types like parquet:

from chunkr import create_parquet_chunk_iter
import pandas as pd

path = 'tar://partition_idx=*/*.parquet::test/parquet/archive.tar.gz'
with create_parquet_chunk_iter(path) as chunk_iter:
    assert 1_000_000 == sum(len(chunk) for chunk in chunk_iter)

Reading from an SFTP remote system

To authenticate to the SFTP server, you can pass the credentials via storage_options:

from chunkr import create_parquet_chunk_iter
import pandas as pd

sftp_path = f"sftp://{sftpserver.host}:{sftpserver.port}/parquet/pyarrow_snappy.parquet"

with create_parquet_chunk_iter(
        sftp_path,
        storage_options={
            "username": "user",
            "password": "pw",
        }
    ) as chunk_iter:
    assert 1_000_000 == sum(len(chunk) for chunk in chunk_iter)

Reading from a URL

from chunkr import create_parquet_chunk_iter
import pandas as pd

url = "https://example.com/1mil.parquet"

with create_parquet_chunk_iter(url) as chunk_iter:
    assert 1_000_000 == sum(len(chunk) for chunk in chunk_iter)

About

No description, website, or topics provided.

Resources

License

Stars

Watchers

Forks

Packages

No packages published

Languages