Credentials and identity reader (AmpX-AI#18)
* Add configurable credentials

* Add identity reader
tmi authored Aug 31, 2022
1 parent e9235fb commit 7983689
Showing 5 changed files with 138 additions and 24 deletions.
2 changes: 1 addition & 1 deletion README.md
@@ -6,7 +6,7 @@ The package has querying capabilities, thus the name stands for "file system que
## Quick Start
The core is installed just via `pip install fsql`. Additional filesystem support or output representation is installed via `pip install fsql[s3]` or `pip install fsql[dask]`.

For examples of usage, we use selected test files accompanied with explanatory comments:
For examples of usage (and sort-of documentation), we use selected test files accompanied with explanatory comments:
1. [basic usage](tests/test_example_usage.py),
2. [date range utils](tests/test_daterange.py),
3. [integrating with Dask](tests/test_dask.py).
60 changes: 38 additions & 22 deletions fsql/__init__.py
@@ -1,33 +1,37 @@
"""fsql"""
"""Place for common fsql utils"""
# TODO eventually move the fs + config to a standalone module
from __future__ import annotations

import os
from typing import Any, NoReturn
from collections import UserDict
from typing import Any, NoReturn, Optional

import fsspec
from fsspec.spec import AbstractFileSystem

__all__: list[str] = []


def get_url_and_fs(url: str) -> tuple[str, AbstractFileSystem]:
"""This function standardizes url->protocol derivation, and allows for fs-specific parameter passing.
class FsqlConfig(UserDict):
pass

In particular, we need it to allow minio-in-place-of-s3 functionality, which is not supported on its
own using environment variables in vanilla fsspec.
"""

# TODO extend signature to make it more meaningufl, that is:
# - capture the os dependency somehow, eg by having a dict param and passing os.environ to it
# - add the user-provided FS here, to validate protocolar compatibility
# TODO org improvements -- better location for this, and change name and return type to better capture that
# the resource location and file system are inherently bound
fsql_config = FsqlConfig()

    fs_key, url_suff = url.split(":/", 1)

    env2dict = lambda mapping: {val: os.environ[key] for key, val in mapping.items() if key in os.environ}  # noqa: E731
def set_default_config(protocol: str, config: dict[str, Any]):
    """Sets config values to be provided to every subsequent default `fs` instance creation.
    Setting values is *NOT* thread safe, but reading is."""
    fsql_config[protocol] = config

    if fs_key == "s3":

def _get_default_config(protocol: str) -> dict[str, Any]:
    """Reads environment variables and merges with default config. Default config has precedence.
    In particular, we need it to allow minio-in-place-of-s3 functionality, which is not supported on its
    own using environment variables in vanilla fsspec."""
    env2dict = lambda mapping: {val: os.environ[key] for key, val in mapping.items() if key in os.environ}  # noqa: E731
    if protocol == "s3":
        # boto itself supports only access key and secret key via env
        # TODO this is quite dumb, especially the max pool connections -- a more intelligent way of config is desired
        # the right way forward is probably some FsFactory singleton which reads:
@@ -40,21 +44,33 @@ def get_url_and_fs(url: str) -> tuple[str, AbstractFileSystem]:
"AWS_SECRET_ACCESS_KEY": "secret",
"AWS_SESSION_TOKEN": "token",
}
l1kwargs: dict[str, Any] = env2dict(configurable_keys_l1) # mypy
config: dict[str, Any] = env2dict(configurable_keys_l1) # mypy
configurable_keys_l2 = {
"AWS_ENDPOINT_URL": "endpoint_url",
"AWS_REGION_NAME": "region_name",
}
l2kwargs = env2dict(configurable_keys_l2)
# in case of a lot of small files and local tests, we tend to exhaust the conn pool quickly and spawning warns
l1kwargs["config_kwargs"] = {"max_pool_connections": 25}
l1kwargs["client_kwargs"] = l2kwargs

fs = fsspec.filesystem("s3", **l1kwargs)
config["config_kwargs"] = {"max_pool_connections": 25}
config["client_kwargs"] = l2kwargs
else:
fs = fsspec.filesystem(fs_key)
config = {}
return {**config, **fsql_config.get(protocol, {})}


    return url_suff, fs
def get_fs(protocol: str, config: Optional[dict[str, Any]] = None) -> AbstractFileSystem:
    """Creates `fs` instance with config from `config` arg, values provided by `set_default_config` and env variables;
    in this order."""
    config_nonnull = {} if not config else config
    config_merged = {**_get_default_config(protocol), **config_nonnull}
    return fsspec.filesystem(protocol, **config_merged)


def get_url_and_fs(url: str) -> tuple[str, AbstractFileSystem]:
    """This function standardizes `url -> (fs, base_path)` split. The `fs` instance can be configured via env vars or
    the `set_default_config` endpoint."""
    protocol, url_suff = url.split(":/", 1)
    return url_suff, get_fs(protocol)


def assert_exhaustive_enum(x: NoReturn) -> NoReturn:
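
For orientation (this snippet is not part of the commit): a minimal sketch of how the new hooks might be combined to point the `s3` protocol at a MinIO deployment. The endpoint, bucket and credential values are placeholders, and the parameter names `key`/`secret` are assumed from fsspec/s3fs conventions rather than taken from this diff.

from fsql import get_url_and_fs, set_default_config

# illustrative values only -- replace with your own deployment details
set_default_config(
    "s3",
    {
        "key": "my-minio-access-key",
        "secret": "my-minio-secret-key",
        "client_kwargs": {"endpoint_url": "http://localhost:9000"},
    },
)

# every subsequent default fs creation picks these values up; they take
# precedence over values derived from the environment variables
base_path, fs = get_url_and_fs("s3://my-bucket/my-table/")
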
40 changes: 39 additions & 1 deletion fsql/deser.py
@@ -6,6 +6,10 @@
and concatenates all the individual sub-frames into a single one
- EnumeratedDictReader, which reads the data as a dictionary, adds the partition columns as dict
keys, and concatenates into an enumerated dict (order being the alphabetic of underlying files)
- IdentityReader, which returns a list of all matching underlying files, along with the dictionary
of the partition values, and a callback for actual download of the file. This is used if specific
batching/postprocessing is desired, and the user does not want to implement a specialised reader.
In other words, this is a fancy `ls` functionality within `fsql`.
All these autodetect the input format from the suffix of the key. If this is desired to be
overridden with a fixed format, user should instantiate with the desired InputFormat.
@@ -45,12 +49,14 @@
from collections import defaultdict
from collections.abc import Callable
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from enum import Enum, auto, unique
from functools import partial, reduce
from itertools import chain
from typing import Generic, Iterable, NamedTuple, Tuple, TypeVar, Union
from typing import Any, Generic, Iterable, NamedTuple, Tuple, TypeVar, Union

import pandas as pd
from fsspec.core import OpenFile
from fsspec.spec import AbstractFileSystem

from fsql import assert_exhaustive_enum
@@ -260,3 +266,35 @@ def concat(self, data: Iterable[dict]) -> dict:


ENUMERATED_DICT_READER = EnumeratedDictReader()


@dataclass
class FileInPartition:
    file_url: str
    partition_values: dict
    fs: AbstractFileSystem

    def consume(self, fd_consumer: Callable[[OpenFile], Any]):
        with self.fs.open(self.file_url) as fd:
            try:
                return fd_consumer(fd)
            except FileNotFoundError as e:
                logger.warning(
                    f"file {self.file_url} reading exception {type(e)}, attempting cache invalidation and reread"
                )
                self.fs.invalidate_cache()
                return fd_consumer(fd)


class IdentityReader(DataReader[Iterable[FileInPartition]]):
"""Think of this as a fancy `ls`."""

def read_single(self, partition: Partition, fs: AbstractFileSystem) -> PartitionReadOutcome:
return [[FileInPartition(file_url=partition.url, partition_values=partition.columns, fs=fs)]], []

def concat(self, outcomes: Iterable[Iterable[FileInPartition]]) -> Iterable[FileInPartition]:
data: Iterable[FileInPartition] = chain(*outcomes)
return data # type: ignore


IDENTITY_READER = IdentityReader()
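
As a complement to the diff above, a hedged sketch of the "custom batching" use case the module docstring mentions: list matching files with IDENTITY_READER, then download and parse them under user control via `FileInPartition.consume`. The table URL and the worker count are made-up example values, not something this commit prescribes.

from concurrent.futures import ThreadPoolExecutor

import pandas as pd

from fsql.api import read_partitioned_table
from fsql.deser import IDENTITY_READER
from fsql.query import Q_TRUE

# enumerate matching files without downloading them (the fancy `ls`)
files = list(read_partitioned_table("s3://my-bucket/my-table/", Q_TRUE, data_reader=IDENTITY_READER))

# fetch and parse in whatever batches/parallelism the caller prefers
with ThreadPoolExecutor(max_workers=4) as pool:
    frames = list(pool.map(lambda f: f.consume(pd.read_csv), files))
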
5 changes: 5 additions & 0 deletions tests/test_example_usage.py
@@ -111,3 +111,8 @@ def weird_query_func(col2: str, colX: str) -> bool:
# a year-month-day table, and you want the 1st and the 15th day of months in Q1, you can go with
# `year=2022/month=[1,2,3]/day=[1,15]`. If, however, your query is a date range such as from the 14th June to
# 17th September, you better head over to `test_daterange` which shows advanced capabilities.

# You can now continue with either [date range utils](tests/test_daterange.py), or
# [integrating with Dask](tests/test_dask.py). Furthermore, there is IdentityReader which provides a fancy `ls`
# functionality. Lastly, you may want to inspect the `fsql/__init__.py` for information how to configure the S3
# credentials.
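
The environment-variable route mentioned in the comment above, as a small hypothetical sketch (the endpoint and region values are placeholders): the variables are read by `_get_default_config`, values registered via `set_default_config` override them, and an explicit `config` argument to `get_fs` overrides both.

import os

from fsql import get_fs

# read by _get_default_config when the default "s3" filesystem is created
os.environ["AWS_ENDPOINT_URL"] = "http://localhost:9000"  # forwarded as client_kwargs["endpoint_url"]
os.environ["AWS_REGION_NAME"] = "us-east-1"  # forwarded as client_kwargs["region_name"]

# an explicit config dict passed to get_fs has the final say
fs = get_fs("s3", {"client_kwargs": {"endpoint_url": "http://other-minio:9000"}})
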
55 changes: 55 additions & 0 deletions tests/test_identity_reader.py
@@ -0,0 +1,55 @@
import pandas as pd
from pandas.testing import assert_frame_equal

from fsql.api import read_partitioned_table
from fsql.column_parser import AutoParser
from fsql.deser import IDENTITY_READER
from fsql.query import Q_TRUE

df1 = pd.DataFrame(data={"c1": [0, 1], "c2": ["hello", "world"]})
df2 = pd.DataFrame(data={"c1": [2, 3], "c2": ["salve", "mundi"]})
df3 = pd.DataFrame(data={"c1": [4, 5], "c2": ["cthulhu", "rlyeh"]})


def test_identity_reader(tmp_path):
    case1_path = tmp_path / "table1"
    case1_path.mkdir(parents=True)
    df1.to_csv(case1_path / "f1.csv", index=False)

    case1_result_r = read_partitioned_table(f"file://{case1_path}/", Q_TRUE, data_reader=IDENTITY_READER)
    case1_result = list(case1_result_r)

    assert len(case1_result) == 1
    assert case1_result[0].file_url == f"/{case1_path}/f1.csv"
    assert case1_result[0].partition_values == {}

    case1_deserd = case1_result[0].consume(pd.read_csv)
    assert_frame_equal(df1, case1_deserd)

    case2_path = tmp_path / "table2"
    case2_part1 = case2_path / "c3=42" / "c4=test"
    case2_part1.mkdir(parents=True)
    case2_part2 = case2_path / "c3=43" / "c4=test"
    case2_part2.mkdir(parents=True)
    case2_part3 = case2_path / "c3=44" / "c4=test"
    case2_part3.mkdir(parents=True)
    df1.to_csv(case2_part1 / "f1.csv", index=False)
    df2.to_csv(case2_part2 / "f2.csv", index=False)
    df3.to_csv(case2_part3 / "f3.csv", index=False)

    parser = AutoParser.from_str("c3=[42,43]/c4=[test]")
    case2_result_r = read_partitioned_table(
        f"file://{case2_path}/", Q_TRUE, column_parser=parser, data_reader=IDENTITY_READER
    )
    case2_result = list(case2_result_r)

    assert len(case2_result) == 2
    assert case2_result[0].file_url == f"/{case2_path}/c3=42/c4=test/f1.csv"
    assert case2_result[1].file_url == f"/{case2_path}/c3=43/c4=test/f2.csv"
    assert case2_result[0].partition_values == {"c3": "42", "c4": "test"}
    assert case2_result[1].partition_values == {"c3": "43", "c4": "test"}

    case2_deserd1 = case2_result[0].consume(pd.read_csv)
    assert_frame_equal(df1, case2_deserd1)
    case2_deserd2 = case2_result[1].consume(pd.read_csv)
    assert_frame_equal(df2, case2_deserd2)
