diff --git a/README.md b/README.md
index 46c01bd..73853ae 100644
--- a/README.md
+++ b/README.md
@@ -6,7 +6,7 @@ The package has querying capabilities, thus the name stands for "file system que
 
 ## Quick Start
 The core is installed just via `pip install fsql`. Additional filesystem support or output representation is installed via `pip install fsql[s3]` or `pip install fsql[dask]`.
-For examples of usage, we use selected test files accompanied with explanatory comments:
+For examples of usage (and sort-of documentation), we use selected test files accompanied with explanatory comments:
 1. [basic usage](tests/test_example_usage.py),
 2. [date range utils](tests/test_daterange.py),
 3. [integrating with Dask](tests/test_dask.py).
diff --git a/fsql/__init__.py b/fsql/__init__.py
index 4cbc4aa..3cb5b7d 100644
--- a/fsql/__init__.py
+++ b/fsql/__init__.py
@@ -1,8 +1,10 @@
-"""fsql"""
+"""Place for common fsql utils"""
+# TODO eventually move the fs + config to a standalone module
 from __future__ import annotations
 
 import os
-from typing import Any, NoReturn
+from collections import UserDict
+from typing import Any, NoReturn, Optional
 
 import fsspec
 from fsspec.spec import AbstractFileSystem
@@ -10,24 +12,26 @@
 __all__: list[str] = []
 
 
-def get_url_and_fs(url: str) -> tuple[str, AbstractFileSystem]:
-    """This function standardizes url->protocol derivation, and allows for fs-specific parameter passing.
+class FsqlConfig(UserDict):
+    pass
 
-    In particular, we need it to allow minio-in-place-of-s3 functionality, which is not supported on its
-    own using environment variables in vanilla fsspec.
-    """
-    # TODO extend signature to make it more meaningufl, that is:
-    # - capture the os dependency somehow, eg by having a dict param and passing os.environ to it
-    # - add the user-provided FS here, to validate protocolar compatibility
-    # TODO org improvements -- better location for this, and change name and return type to better capture that
-    # the resource location and file system are inherently bound
+
+fsql_config = FsqlConfig()
 
-    fs_key, url_suff = url.split(":/", 1)
-    env2dict = lambda mapping: {val: os.environ[key] for key, val in mapping.items() if key in os.environ}  # noqa: E731
 
+def set_default_config(protocol: str, config: dict[str, Any]):
+    """Sets config values to be provided to every subsequent default `fs` instance creation.
+    Setting values is *NOT* thread safe, but reading is."""
+    fsql_config[protocol] = config
 
-    if fs_key == "s3":
+
+def _get_default_config(protocol: str) -> dict[str, Any]:
+    """Reads environment variables and merges with default config. Default config has precedence.
+
+    In particular, we need it to allow minio-in-place-of-s3 functionality, which is not supported on its
+    own using environment variables in vanilla fsspec."""
+    env2dict = lambda mapping: {val: os.environ[key] for key, val in mapping.items() if key in os.environ}  # noqa: E731
+    if protocol == "s3":
         # boto itself supports only access key and secret key via env
         # TODO this is quite dumb, especially the max pool connections -- a more intelligent way of config is desired
         # the right way forward is probably some FsFactory singleton which reads:
@@ -40,21 +44,33 @@ def get_url_and_fs(url: str) -> tuple[str, AbstractFileSystem]:
             "AWS_SECRET_ACCESS_KEY": "secret",
             "AWS_SESSION_TOKEN": "token",
         }
-        l1kwargs: dict[str, Any] = env2dict(configurable_keys_l1)  # mypy
+        config: dict[str, Any] = env2dict(configurable_keys_l1)  # mypy
         configurable_keys_l2 = {
             "AWS_ENDPOINT_URL": "endpoint_url",
             "AWS_REGION_NAME": "region_name",
         }
         l2kwargs = env2dict(configurable_keys_l2)
         # in case of a lot of small files and local tests, we tend to exhaust the conn pool quickly and spawning warns
-        l1kwargs["config_kwargs"] = {"max_pool_connections": 25}
-        l1kwargs["client_kwargs"] = l2kwargs
-
-        fs = fsspec.filesystem("s3", **l1kwargs)
+        config["config_kwargs"] = {"max_pool_connections": 25}
+        config["client_kwargs"] = l2kwargs
     else:
-        fs = fsspec.filesystem(fs_key)
+        config = {}
+    return {**config, **fsql_config.get(protocol, {})}
+
-    return url_suff, fs
+
+def get_fs(protocol: str, config: Optional[dict[str, Any]] = None) -> AbstractFileSystem:
+    """Creates `fs` instance with config from `config` arg, values provided by `set_default_config` and env variables;
+    in this order."""
+    config_nonnull = {} if not config else config
+    config_merged = {**_get_default_config(protocol), **config_nonnull}
+    return fsspec.filesystem(protocol, **config_merged)
+
+
+def get_url_and_fs(url: str) -> tuple[str, AbstractFileSystem]:
+    """This function standardizes `url -> (fs, base_path)` split. The `fs` instance can be configured via env vars or
+    the `set_default_config` endpoint."""
+    protocol, url_suff = url.split(":/", 1)
+    return url_suff, get_fs(protocol)
 
 
 def assert_exhaustive_enum(x: NoReturn) -> NoReturn:
diff --git a/fsql/deser.py b/fsql/deser.py
index 41eb414..a06dfdc 100644
--- a/fsql/deser.py
+++ b/fsql/deser.py
@@ -6,6 +6,10 @@
 and concatenates all the individual sub-frames into a single one
  - EnumeratedDictReader, which reads the data as a dictionary, adds the partition columns as dict keys,
    and concatenates into an enumerated dict (order being the alphabetic of underlying files)
+ - IdentityReader, which returns a list of all matching underlying files, along with the dictionary
+   of the partition values, and a callback for actual download of the file. This is used if specific
+   batching/postprocessing is desired, and the user does not want to implement a specialised reader.
+   In other words, this is a fancy `ls` functionality within `fsql`.
 
 All these autodetect the input format from the suffix of the key. If this is desired to be overridden
 with a fixed format, user should instantiate with the desired InputFormat.
@@ -45,12 +49,14 @@
 from collections import defaultdict
 from collections.abc import Callable
 from concurrent.futures import ThreadPoolExecutor
+from dataclasses import dataclass
 from enum import Enum, auto, unique
 from functools import partial, reduce
 from itertools import chain
-from typing import Generic, Iterable, NamedTuple, Tuple, TypeVar, Union
+from typing import Any, Generic, Iterable, NamedTuple, Tuple, TypeVar, Union
 
 import pandas as pd
+from fsspec.core import OpenFile
 from fsspec.spec import AbstractFileSystem
 
 from fsql import assert_exhaustive_enum
@@ -260,3 +266,35 @@ def concat(self, data: Iterable[dict]) -> dict:
 
 
 ENUMERATED_DICT_READER = EnumeratedDictReader()
+
+
+@dataclass
+class FileInPartition:
+    file_url: str
+    partition_values: dict
+    fs: AbstractFileSystem
+
+    def consume(self, fd_consumer: Callable[[OpenFile], Any]):
+        with self.fs.open(self.file_url) as fd:
+            try:
+                return fd_consumer(fd)
+            except FileNotFoundError as e:
+                logger.warning(
+                    f"file {self.file_url} reading exception {type(e)}, attempting cache invalidation and reread"
+                )
+                self.fs.invalidate_cache()
+                return fd_consumer(fd)
+
+
+class IdentityReader(DataReader[Iterable[FileInPartition]]):
+    """Think of this as a fancy `ls`."""
+
+    def read_single(self, partition: Partition, fs: AbstractFileSystem) -> PartitionReadOutcome:
+        return [[FileInPartition(file_url=partition.url, partition_values=partition.columns, fs=fs)]], []
+
+    def concat(self, outcomes: Iterable[Iterable[FileInPartition]]) -> Iterable[FileInPartition]:
+        data: Iterable[FileInPartition] = chain(*outcomes)
+        return data  # type: ignore
+
+
+IDENTITY_READER = IdentityReader()
diff --git a/tests/test_example_usage.py b/tests/test_example_usage.py
index 7700b26..554fb42 100644
--- a/tests/test_example_usage.py
+++ b/tests/test_example_usage.py
@@ -111,3 +111,8 @@ def weird_query_func(col2: str, colX: str) -> bool:
     # a year-month-day table, and you want the 1st and the 15th day of months in Q1, you can go with
     # `year=2022/month=[1,2,3]/day=[1,15]`. If, however, your query is a date range such as from the 14th June to
     # 17th September, you better head over to `test_daterange` which shows advanced capabilities.
+
+    # You can now continue with either [date range utils](tests/test_daterange.py), or
+    # [integrating with Dask](tests/test_dask.py). Furthermore, there is IdentityReader which provides a fancy `ls`
+    # functionality. Lastly, you may want to inspect the `fsql/__init__.py` for information on how to configure the S3
+    # credentials.
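For illustration only (not part of the diff itself): a minimal sketch of how the new configuration entry points in `fsql/__init__.py` above can be used, e.g. to point the default `s3` filesystem at a minio instance. The endpoint and credentials are made-up placeholders; the config keys follow the `s3fs`/`fsspec` conventions already visible in `_get_default_config` (`key`, `secret`, `client_kwargs`).

```python
# Illustrative sketch -- not included in the diff. Endpoint and credentials are placeholders.
from fsql import get_fs, get_url_and_fs, set_default_config

# Values registered here are picked up by every subsequently created default filesystem
# and take precedence over the AWS_* environment variables read by _get_default_config.
set_default_config(
    "s3",
    {
        "key": "minio-access-key",
        "secret": "minio-secret-key",
        "client_kwargs": {"endpoint_url": "http://localhost:9000"},
    },
)

fs = get_fs("s3")  # an explicit `config` argument here would override both sources
base_path, fs = get_url_and_fs("s3://some-bucket/some-table/")  # returns (path, fs)
```

As the `set_default_config` docstring notes, reading the defaults is thread safe while setting them is not, so this registration is best done once at startup.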
diff --git a/tests/test_identity_reader.py b/tests/test_identity_reader.py new file mode 100644 index 0000000..c28386a --- /dev/null +++ b/tests/test_identity_reader.py @@ -0,0 +1,55 @@ +import pandas as pd +from pandas.testing import assert_frame_equal + +from fsql.api import read_partitioned_table +from fsql.column_parser import AutoParser +from fsql.deser import IDENTITY_READER +from fsql.query import Q_TRUE + +df1 = pd.DataFrame(data={"c1": [0, 1], "c2": ["hello", "world"]}) +df2 = pd.DataFrame(data={"c1": [2, 3], "c2": ["salve", "mundi"]}) +df3 = pd.DataFrame(data={"c1": [4, 5], "c2": ["cthulhu", "rlyeh"]}) + + +def test_identity_reader(tmp_path): + case1_path = tmp_path / "table1" + case1_path.mkdir(parents=True) + df1.to_csv(case1_path / "f1.csv", index=False) + + case1_result_r = read_partitioned_table(f"file://{case1_path}/", Q_TRUE, data_reader=IDENTITY_READER) + case1_result = list(case1_result_r) + + assert len(case1_result) == 1 + assert case1_result[0].file_url == f"/{case1_path}/f1.csv" + assert case1_result[0].partition_values == {} + + case1_deserd = case1_result[0].consume(pd.read_csv) + assert_frame_equal(df1, case1_deserd) + + case2_path = tmp_path / "table2" + case2_part1 = case2_path / "c3=42" / "c4=test" + case2_part1.mkdir(parents=True) + case2_part2 = case2_path / "c3=43" / "c4=test" + case2_part2.mkdir(parents=True) + case2_part3 = case2_path / "c3=44" / "c4=test" + case2_part3.mkdir(parents=True) + df1.to_csv(case2_part1 / "f1.csv", index=False) + df2.to_csv(case2_part2 / "f2.csv", index=False) + df3.to_csv(case2_part3 / "f3.csv", index=False) + + parser = AutoParser.from_str("c3=[42,43]/c4=[test]") + case2_result_r = read_partitioned_table( + f"file://{case2_path}/", Q_TRUE, column_parser=parser, data_reader=IDENTITY_READER + ) + case2_result = list(case2_result_r) + + assert len(case2_result) == 2 + assert case2_result[0].file_url == f"/{case2_path}/c3=42/c4=test/f1.csv" + assert case2_result[1].file_url == f"/{case2_path}/c3=43/c4=test/f2.csv" + assert case2_result[0].partition_values == {"c3": "42", "c4": "test"} + assert case2_result[1].partition_values == {"c3": "43", "c4": "test"} + + case2_deserd1 = case2_result[0].consume(pd.read_csv) + assert_frame_equal(df1, case2_deserd1) + case2_deserd2 = case2_result[1].consume(pd.read_csv) + assert_frame_equal(df2, case2_deserd2)
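Beyond the test above, a hypothetical sketch (not part of the diff) of the `IdentityReader` workflow: treat the result as a fancy `ls` and control downloads yourself via `FileInPartition.consume`. The table URL below is a placeholder.

```python
# Illustrative sketch -- not included in the diff. The URL is a placeholder.
import pandas as pd

from fsql.api import read_partitioned_table
from fsql.deser import IDENTITY_READER
from fsql.query import Q_TRUE

# With IDENTITY_READER nothing is downloaded yet: we only get file URLs, their partition
# values, and a handle to the filesystem they live on.
entries = read_partitioned_table("s3://some-bucket/some-table/", Q_TRUE, data_reader=IDENTITY_READER)

for entry in entries:
    print(entry.file_url, entry.partition_values)
    # Download and parse on demand; `consume` opens the file, hands the file object to the
    # callback, and on FileNotFoundError invalidates the fs cache and retries once.
    df = entry.consume(pd.read_csv)
    # ... custom batching / postprocessing of `df` would go here ...
```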