diff --git a/docs/source/reference/service.md b/docs/source/reference/service.md
index 213a81d57..69f87acb8 100644
--- a/docs/source/reference/service.md
+++ b/docs/source/reference/service.md
@@ -25,7 +25,7 @@ or its dask counterpart.
 .. autosummary::
    :toctree: generated
 
-   tiled.adapters.csv.read_csv
+   tiled.adapters.csv.CSVAdapter
    tiled.adapters.excel.ExcelAdapter
    tiled.adapters.hdf5.HDF5Adapter
    tiled.adapters.netcdf.read_netcdf
diff --git a/scripts/mongo_migration/load.py b/scripts/mongo_migration/load.py
deleted file mode 100644
index a706f3f8a..000000000
--- a/scripts/mongo_migration/load.py
+++ /dev/null
@@ -1,77 +0,0 @@
-import json
-import importlib
-from datetime import datetime
-import dateutil
-import yaml
-import event_model
-import os
-import re
-import sys
-import tqdm
-from bson import json_util
-from databroker.mongo_normalized import MongoAdapter
-
-def import_from_yaml(spec):
-    module_name, func_name = spec.split(':')
-    return getattr(importlib.import_module(module_name), func_name)
-
-
-def unbatch_documents(docs):
-    for item in docs:
-        if item['name'] == 'event_page':
-            for _doc in event_model.unpack_event_page(item['doc']):
-                yield {"name": "event", "doc": _doc}
-        elif item['name'] == 'datum_page':
-            for _doc in event_model.unpack_datum_page(item['doc']):
-                yield {"name":"datum", "doc": _doc}
-        else:
-            yield item
-
-
-beamline = 'iss'
-mongo_user = f"{beamline.lower()}_read"
-mongo_pass = os.environ.get("MONGO_PASSWORD", MONGO_PASSWORD)
-
-# Load the beamline-specific patches and handlers
-tiled_config_dir = f"{os.getenv('HOME')}/.config/tiled"
-sys.path.append(tiled_config_dir)
-with open(tiled_config_dir + "/profiles/profiles.yml", 'r') as f:
-    profiles = yaml.safe_load(f)
-args = profiles[beamline]['direct']['trees'][0]['args']
-transforms = args.get('transforms', {})
-handlers = args.get('handler_registry', {})
-uri = args['uri']
-uri = re.sub(r'\$\{(?:MONGO_USER_)\w{3,}\}', mongo_user, uri)
-uri = re.sub(r'\$\{(?:MONGO_PASSWORD_)\w{3,}\}', mongo_pass, uri)
-
-print(f"Connecting to {uri}")
-ma = MongoAdapter.from_uri(uri, transforms={key:import_from_yaml(val) for key, val in transforms.items()},
-                           handler_registry={key:import_from_yaml(val) for key, val in handlers.items()})
-coll = ma._run_start_collection
-
-
-# docs = []
-# start_doc_cursor = coll.find()
-# for i in range(100):
-#     bs_run = ma._get_run(start_doc_cursor.next())
-#     g = bs_run.documents(fill=False, size=25)
-#     g = map(lambda item : {"name":item[0], "doc": (item[1], item[1].pop("_id", None))[0]}, g)
-#     g = unbatch_documents(g)
-#     docs += list(g)
-
-
-cur = coll.find(filter={'time':{'$gt':dateutil.parser.parse('2024-10-22 00:00').timestamp(),
-'$lt':dateutil.parser.parse('2024-10-23 00:00').timestamp()}}
-, projection={'_id':False})
-docs = []
-for doc0 in cur:
-    print(doc0["uid"])
-    bs_run = ma._get_run(doc0)
-    g = bs_run.documents(fill=False, size=25)
-    g = map(lambda item : {"name":item[0], "doc": (item[1], item[1].pop("_id", None))[0]}, g)
-    g = unbatch_documents(g)
-    docs += list(g)
-
-with open(f'data/docs_{beamline.lower()}.json', 'w') as f:
-    f.write(json_util.dumps(docs))
diff --git a/tiled/_tests/test_catalog.py b/tiled/_tests/test_catalog.py
index 61b24e98f..9f33dbb54 100644
--- a/tiled/_tests/test_catalog.py
+++ b/tiled/_tests/test_catalog.py
@@ -13,7 +13,7 @@
 import tifffile
 import xarray
 
-from ..adapters.csv import read_csv
+from ..adapters.csv import CSVAdapter
 from ..adapters.dataframe import ArrayAdapter
 from ..adapters.tiff import TiffAdapter
 from ..catalog import in_memory
@@ -236,7 +236,7 @@ async def test_write_dataframe_external_direct(a, tmpdir):
     filepath = str(tmpdir / "file.csv")
     data_uri = ensure_uri(filepath)
     df.to_csv(filepath, index=False)
-    dfa = read_csv(data_uri)
+    dfa = CSVAdapter.from_uris(data_uri)
     structure = asdict(dfa.structure())
     await a.create_node(
         key="x",
diff --git a/tiled/_tests/test_jpeg.py b/tiled/_tests/test_jpeg.py
index 3e7abc77c..ed12bcd02 100644
--- a/tiled/_tests/test_jpeg.py
+++ b/tiled/_tests/test_jpeg.py
@@ -32,7 +32,7 @@ def client(tmpdir_module):
 
     tree = MapAdapter(
         {
-            "color": JPEGAdapter(ensure_uri(path)),
+            "color": JPEGAdapter.from_uris(ensure_uri(path)),
             "sequence": JPEGSequenceAdapter.from_uris(
                 [ensure_uri(filepath) for filepath in filepaths]
             ),
diff --git a/tiled/adapters/_zoo.py b/tiled/adapters/_zoo.py
deleted file mode 100644
index 899a2d0a0..000000000
--- a/tiled/adapters/_zoo.py
+++ /dev/null
@@ -1,244 +0,0 @@
-from typing import Any, List, Optional, Tuple, Union, Dict
-from datetime import timedelta
-
-import dask.array
-from numpy.typing import NDArray
-
-from ..structures.array import ArrayStructure
-from ..structures.core import Spec, StructureFamily
-from ..structures.table import TableStructure
-from ..structures.sparse import COOStructure
-from .protocols import AccessPolicy
-import pandas
-from .type_alliases import JSON, NDSlice
-import sparse
-from ..server.schemas import SortingItem
-from ..structures.core import Spec, StructureFamily
-from ..structures.table import TableStructure
-from ..structures.awkward import AwkwardStructure
-from .awkward_directory_container import DirectoryContainer
-from .protocols import AccessPolicy, AnyAdapter
-from .type_alliases import JSON
-from .utils import IndexersMixin
-import collections
-MappingType = collections.abc.Mapping
-
-
-class ArrayAdapter:
-    structure_family = StructureFamily.array
-
-    def __init__(
-        self,
-        array: NDArray[Any],
-        structure: ArrayStructure,
-        *,
-        metadata: Optional[JSON] = None,
-        specs: Optional[List[Spec]] = None,
-        access_policy: Optional[AccessPolicy] = None,
-    ) -> None:
-        ...
-
-    @classmethod
-    def from_array(
-        cls,
-        array: NDArray[Any],
-        *,
-        shape: Optional[Tuple[int, ...]] = None,
-        chunks: Optional[Tuple[Tuple[int, ...], ...]] = None,
-        dims: Optional[Tuple[str, ...]] = None,
-        metadata: Optional[JSON] = None,
-        specs: Optional[List[Spec]] = None,
-        access_policy: Optional[AccessPolicy] = None,
-    ) -> "ArrayAdapter":
-        ...
-
-class TableAdapter:
-    structure_family = StructureFamily.table
-
-    @classmethod
-    def from_pandas(
-        cls,
-        *args: Any,
-        metadata: Optional[JSON] = None,
-        specs: Optional[List[Spec]] = None,
-        access_policy: Optional[AccessPolicy] = None,
-        npartitions: int = 1,
-        **kwargs: Any,
-    ) -> "TableAdapter":
-        ...
-
-    @classmethod
-    def from_dict(
-        cls,
-        *args: Any,
-        metadata: Optional[JSON] = None,
-        specs: Optional[List[Spec]] = None,
-        access_policy: Optional[AccessPolicy] = None,
-        npartitions: int = 1,
-        **kwargs: Any,
-    ) -> "TableAdapter":
-        ...
-
-    @classmethod
-    def from_dask_dataframe(
-        cls,
-        ddf: dask.dataframe.DataFrame,
-        metadata: Optional[JSON] = None,
-        specs: Optional[List[Spec]] = None,
-        access_policy: Optional[AccessPolicy] = None,
-    ) -> "TableAdapter":
-        ...
-
-    def __init__(
-        self,
-        partitions: Union[dask.dataframe.DataFrame, pandas.DataFrame],
-        structure: TableStructure,
-        *,
-        metadata: Optional[JSON] = None,
-        specs: Optional[List[Spec]] = None,
-        access_policy: Optional[AccessPolicy] = None,
-    ) -> None:
-        ...
-
-class COOAdapter:
-    structure_family = StructureFamily.sparse
-
-    @classmethod
-    def from_arrays(
-        cls,
-        coords: NDArray[Any],
-        data: Union[dask.dataframe.DataFrame, pandas.DataFrame],
-        shape: Tuple[int, ...],
-        dims: Optional[Tuple[str, ...]] = None,
-        metadata: Optional[JSON] = None,
-        specs: Optional[List[Spec]] = None,
-        access_policy: Optional[AccessPolicy] = None,
-    ) -> "COOAdapter":
-        ...
-
-    @classmethod
-    def from_coo(
-        cls,
-        coo: sparse.COO,
-        *,
-        dims: Optional[Tuple[str, ...]] = None,
-        metadata: Optional[JSON] = None,
-        specs: Optional[List[Spec]] = None,
-        access_policy: Optional[AccessPolicy] = None,
-    ) -> "COOAdapter":
-        ...
-
-    @classmethod
-    def from_global_ref(
-        cls,
-        blocks: Dict[Tuple[int, ...], Tuple[NDArray[Any], Any]],
-        shape: Tuple[int, ...],
-        chunks: Tuple[Tuple[int, ...], ...],
-        *,
-        dims: Optional[Tuple[str, ...]] = None,
-        metadata: Optional[JSON] = None,
-        specs: Optional[List[Spec]] = None,
-        access_policy: Optional[AccessPolicy] = None,
-    ) -> "COOAdapter":
-        ...
-
-    def __init__(
-        self,
-        blocks: Dict[Tuple[int, ...], Tuple[NDArray[Any], Any]],
-        structure: COOStructure,
-        *,
-        metadata: Optional[JSON] = None,
-        specs: Optional[List[Spec]] = None,
-        access_policy: Optional[AccessPolicy] = None,
-    ) -> None:
-        ...
-
-class SparseBlocksParquetAdapter:
-    structure_family = StructureFamily.sparse
-
-    def __init__(
-        self,
-        data_uris: List[str],
-        structure: COOStructure,
-        metadata: Optional[JSON] = None,
-        specs: Optional[List[Spec]] = None,
-        access_policy: Optional[AccessPolicy] = None,
-    ) -> None:
-        ...
-
-class AwkwardAdapter:
-    structure_family = StructureFamily.awkward
-
-    def __init__(
-        self,
-        container: DirectoryContainer,
-        structure: AwkwardStructure,
-        metadata: Optional[JSON] = None,
-        specs: Optional[List[Spec]] = None,
-        access_policy: Optional[AccessPolicy] = None,
-    ) -> None:
-        ...
-
-    @classmethod
-    def from_array(
-        cls,
-        array: NDArray[Any],
-        metadata: Optional[JSON] = None,
-        specs: Optional[List[Spec]] = None,
-        access_policy: Optional[AccessPolicy] = None,
-    ) -> "AwkwardAdapter":
-        ...
-
-class AwkwardBuffersAdapter(AwkwardAdapter):
-    structure_family = StructureFamily.awkward
-
-    @classmethod
-    def from_directory(
-        cls,
-        data_uri: str,
-        structure: AwkwardStructure,
-        metadata: Optional[JSON] = None,
-        specs: Optional[List[Spec]] = None,
-        access_policy: Optional[AccessPolicy] = None,
-    ) -> "AwkwardBuffersAdapter":
-        ...
-
-class MapAdapter(MappingType[str, AnyAdapter], IndexersMixin):
-    structure_family = StructureFamily.container
-
-    def __init__(
-        self,
-        mapping: Dict[str, Any],
-        *,
-        structure: Optional[TableStructure] = None,
-        metadata: Optional[JSON] = None,
-        sorting: Optional[List[SortingItem]] = None,
-        specs: Optional[List[Spec]] = None,
-        access_policy: Optional[AccessPolicy] = None,
-        entries_stale_after: Optional[timedelta] = None,
-        metadata_stale_after: Optional[timedelta] = None,
-        must_revalidate: bool = True,
-    ) -> None:
-        ...
-
-class DatasetAdapter(MapAdapter):
-
-    @classmethod
-    def from_dataset(
-        cls,
-        dataset: Any,
-        *,
-        specs: Optional[List[Spec]] = None,
-        access_policy: Optional[AccessPolicy] = None,
-    ) -> "DatasetAdapter":
-        ...
-
-    def __init__(
-        self,
-        mapping: Any,
-        *args: Any,
-        specs: Optional[List[Spec]] = None,
-        access_policy: Optional[AccessPolicy] = None,
-        **kwargs: Any,
-    ) -> None:
-        ...
\ No newline at end of file
diff --git a/tiled/adapters/arrow.py b/tiled/adapters/arrow.py
index 84e5afffb..61882faf0 100644
--- a/tiled/adapters/arrow.py
+++ b/tiled/adapters/arrow.py
@@ -48,6 +48,19 @@ def __init__(
         self.specs = list(specs or [])
         self.access_policy = access_policy
 
+    @classmethod
+    def from_assets(
+        cls,
+        assets: List[Asset],
+        structure: TableStructure,
+        metadata: Optional[JSON] = None,
+        specs: Optional[List[Spec]] = None,
+        access_policy: Optional[AccessPolicy] = None,
+        **kwargs: Optional[Union[str, List[str], Dict[str, str]]],
+    ) -> "ArrowAdapter":
+        data_uris = [a.data_uri for a in assets]
+        return cls(data_uris, structure, metadata, specs, access_policy)
+
     def metadata(self) -> JSON:
         """
diff --git a/tiled/adapters/awkward_buffers.py b/tiled/adapters/awkward_buffers.py
index 8a595e6d8..7d27191b7 100644
--- a/tiled/adapters/awkward_buffers.py
+++ b/tiled/adapters/awkward_buffers.py
@@ -2,7 +2,7 @@
 A directory containing awkward buffers, one file per form key.
 """
 from pathlib import Path
-from typing import List, Optional
+from typing import Dict, List, Optional, Union
 
 import awkward.forms
 
@@ -73,3 +73,17 @@ def from_directory(
             specs=specs,
             access_policy=access_policy,
         )
+
+    @classmethod
+    def from_assets(
+        cls,
+        assets: List[Asset],
+        structure: AwkwardStructure,
+        metadata: Optional[JSON] = None,
+        specs: Optional[List[Spec]] = None,
+        access_policy: Optional[AccessPolicy] = None,
+        **kwargs: Optional[Union[str, List[str], Dict[str, str]]],
+    ) -> "AwkwardBuffersAdapter":
+        return cls.from_directory(
+            assets[0].data_uri, structure, metadata, specs, access_policy
+        )
diff --git a/tiled/adapters/csv.py b/tiled/adapters/csv.py
index 15d2ba61d..ce7f3fc89 100644
--- a/tiled/adapters/csv.py
+++ b/tiled/adapters/csv.py
@@ -4,17 +4,16 @@
 import dask.dataframe
 import pandas
 
+from ..structures.array import ArrayStructure, BuiltinDtype, StructDtype
 from ..structures.core import Spec, StructureFamily
 from ..structures.data_source import Asset, DataSource, Management
 from ..structures.table import TableStructure
-from ..structures.array import ArrayStructure, StructDtype, BuiltinDtype
 from ..utils import ensure_uri, path_from_uri
-from .array import ArrayAdapter, slice_and_shape_from_block_and_chunks
+from .array import ArrayAdapter
 from .dataframe import DataFrameAdapter
 from .protocols import AccessPolicy
 from .table import TableAdapter
-from .type_alliases import JSON, NDSlice
-from numpy.typing import NDArray
+from .type_alliases import JSON
 
 
 def read_csv(
@@ -74,7 +73,7 @@ class CSVAdapter:
 
     def __init__(
         self,
-        data_uris: Union[str, List[str]],
+        data_uris: List[str],
         structure: Optional[TableStructure] = None,
         metadata: Optional[JSON] = None,
         specs: Optional[List[Spec]] = None,
@@ -93,22 +92,53 @@ def __init__(
         kwargs : dict
             any keyword arguments that can be passed to the pandas.read_csv function, e.g. names, sep, dtype, etc.
         """
-        # TODO Store data_uris instead and generalize to non-file schemes.
-        if isinstance(data_uris, str):
-            data_uris = [data_uris]
-        self._partition_paths = [path_from_uri(uri) for uri in data_uris]
+        self._file_paths = [path_from_uri(uri) for uri in data_uris]
         self._metadata = metadata or {}
         self._read_csv_kwargs = kwargs
         if structure is None:
             table = dask.dataframe.read_csv(
-                self._partition_paths[0], **self._read_csv_kwargs
+                self._file_paths[0], **self._read_csv_kwargs
             )
             structure = TableStructure.from_dask_dataframe(table)
-            structure.npartitions = len(self._partition_paths)
+            structure.npartitions = len(self._file_paths)
         self._structure = structure
         self.specs = list(specs or [])
         self.access_policy = access_policy
 
+    @classmethod
+    def from_assets(
+        cls,
+        assets: List[Asset],
+        structure: TableStructure,
+        metadata: Optional[JSON] = None,
+        specs: Optional[List[Spec]] = None,
+        access_policy: Optional[AccessPolicy] = None,
+        **kwargs: Optional[Union[str, List[str], Dict[str, str]]],
+    ) -> "CSVAdapter":
+        return cls(
+            [ast.data_uri for ast in assets],
+            structure,
+            metadata,
+            specs,
+            access_policy,
+            **kwargs,
+        )
+
+    @classmethod
+    def from_uris(
+        cls,
+        data_uris: Union[str, List[str]],
+        structure: Optional[TableStructure] = None,
+        metadata: Optional[JSON] = None,
+        specs: Optional[List[Spec]] = None,
+        access_policy: Optional[AccessPolicy] = None,
+        **kwargs: Optional[Union[str, List[str], Dict[str, str]]],
+    ) -> "CSVAdapter":
+        if isinstance(data_uris, str):
+            data_uris = [data_uris]
+
+        return cls(data_uris, structure, metadata, specs, access_policy, **kwargs)
+
     def __repr__(self) -> str:
         return f"{type(self).__name__}({self._structure.columns!r})"
 
@@ -200,8 +230,7 @@ def read(self, fields: Optional[List[str]] = None) -> dask.dataframe.DataFrame:
 
         """
         dfs = [
-            self.read_partition(i, fields=fields)
-            for i in range(len(self._partition_paths))
+            self.read_partition(i, fields=fields) for i in range(len(self._file_paths))
         ]
         return dask.dataframe.concat(dfs, axis=0)
 
@@ -224,9 +253,7 @@ def read_partition(
 
         """
-        df = dask.dataframe.read_csv(
-            self._partition_paths[indx], **self._read_csv_kwargs
-        )
+        df = dask.dataframe.read_csv(self._file_paths[indx], **self._read_csv_kwargs)
 
         if fields is not None:
             df = df[fields]
@@ -321,14 +348,15 @@ class CSVArrayAdapter(ArrayAdapter):
     """Adapter for array-type data stored as partitioned csv files"""
 
     @classmethod
-    def from_catalog(cls,
-        data_uris: Union[str, List[str]],
+    def from_assets(
+        cls,
+        assets: List[Asset],
         structure: ArrayStructure,
         metadata: Optional[JSON] = None,
         specs: Optional[List[Spec]] = None,
         access_policy: Optional[AccessPolicy] = None,
-        **kwargs: Optional[Union[str, List[str], Dict[str, str]]]
-    ):
+        **kwargs: Optional[Union[str, List[str], Dict[str, str]]],
+    ) -> "CSVArrayAdapter":
         """Adapter for partitioned array data stored as a sequence of csv files
 
         Parameters
         ----------
@@ -343,9 +371,7 @@ def from_catalog(cls,
         """
         # Load the array lazily with Dask
-        if isinstance(data_uris, str):
-            data_uris = [data_uris]
-        file_paths = [path_from_uri(uri) for uri in data_uris]
+        file_paths = [path_from_uri(ast.data_uri) for ast in assets]
         ddf = dask.dataframe.read_csv(file_paths, **kwargs)
 
         if isinstance(structure.data_type, StructDtype):
             # Fields have mixed types -- return as a structured (record) array
             # NOTE: dask.DataFrame.to_records() allows one to pass `index=False` to drop the index column, but as
             #       of dask ver. 2024.2.1 it seems broken and doesn't do anything. Instead, we set an index to any
             #       (first) column in the df to prevent it from creating an extra one.
-            array = ddf.set_index(ddf.columns[0]).to_records(lengths=structure.chunks[0])
+            array = ddf.set_index(ddf.columns[0]).to_records(
+                lengths=structure.chunks[0]
+            )
         elif isinstance(structure.data_type, BuiltinDtype):
             # All fields have the same type -- return a usual array
             array = ddf.to_dask_array(lengths=structure.chunks[0])
         else:
             raise ValueError(f"Unsupported data_type, {structure.data_type}")
 
-        return cls(array, structure, metadata=metadata, specs=specs, access_policy=access_policy)
+        return cls(
+            array,
+            structure,
+            metadata=metadata,
+            specs=specs,
+            access_policy=access_policy,
+        )
 
     @classmethod
-    def from_storage(cls,
+    def from_uris(
+        cls,
         file_paths: Union[str, List[str]],
-        **kwargs: Optional[Union[str, List[str], Dict[str, str]]]
-    ):
+        **kwargs: Optional[Union[str, List[str], Dict[str, str]]],
+    ) -> "CSVArrayAdapter":
+        # TODO!!!
         array = dask.dataframe.read_csv(file_paths, **kwargs).to_dask_array()
-
-        return cls.from_array(array)
-
+        return cls.from_array(array)  # type: ignore
diff --git a/tiled/adapters/excel.py b/tiled/adapters/excel.py
index 384aac86c..04f1caacf 100644
--- a/tiled/adapters/excel.py
+++ b/tiled/adapters/excel.py
@@ -1,10 +1,15 @@
-from typing import Any
+from typing import Any, Dict, List, Optional, Union
 
 import dask.dataframe
 import pandas
 
 from ..adapters.mapping import MapAdapter
+from ..structures.core import Spec
+from ..structures.data_source import Asset
+from ..structures.table import TableStructure
 from .dataframe import DataFrameAdapter
+from .protocols import AccessPolicy
+from .type_alliases import JSON
 
 
 class ExcelAdapter(MapAdapter):
@@ -81,3 +86,23 @@ def from_uri(cls, data_uri: str, **kwargs: Any) -> "ExcelAdapter":
         """
         file = pandas.ExcelFile(data_uri)
         return cls.from_file(file)
+
+    @classmethod
+    def from_assets(
+        cls,
+        assets: List[Asset],
+        structure: TableStructure,
+        metadata: Optional[JSON] = None,
+        specs: Optional[List[Spec]] = None,
+        access_policy: Optional[AccessPolicy] = None,
+        **kwargs: Optional[Union[str, List[str], Dict[str, str]]],
+    ) -> "ExcelAdapter":
+        data_uri = assets[0].data_uri
+        return cls.from_uri(
+            data_uri,
+            structure=structure,
+            metadata=metadata,
+            specs=specs,
+            access_policy=access_policy,
+            **kwargs,
+        )
diff --git a/tiled/adapters/hdf5.py b/tiled/adapters/hdf5.py
index 6606d8b13..106945e5b 100644
--- a/tiled/adapters/hdf5.py
+++ b/tiled/adapters/hdf5.py
@@ -3,7 +3,7 @@
 import sys
 import warnings
 from pathlib import Path
-from typing import Any, Iterator, List, Optional, Tuple, Union
+from typing import Any, Dict, Iterator, List, Optional, Tuple, Union
 
 import h5py
 import numpy
@@ -13,6 +13,7 @@
 from ..iterviews import ItemsView, KeysView, ValuesView
 from ..structures.array import ArrayStructure
 from ..structures.core import Spec, StructureFamily
+from ..structures.data_source import Asset
 from ..structures.table import TableStructure
 from ..utils import node_repr, path_from_uri
 from .array import ArrayAdapter
@@ -103,6 +104,28 @@ def __init__(
         self._provided_metadata = metadata or {}
         super().__init__()
 
+    @classmethod
+    def from_assets(
+        cls,
+        assets: List[Asset],
+        structure: ArrayStructure,
+        metadata: Optional[JSON] = None,
+        specs: Optional[List[Spec]] = None,
+        access_policy: Optional[AccessPolicy] = None,
+        **kwargs: Optional[Union[str, List[str], Dict[str, str]]],
+    ) -> "HDF5Adapter":
+        return hdf5_lookup(
+            data_uri=assets[0].data_uri,
+            structure=structure,
+            metadata=metadata,
+            specs=specs,
+            access_policy=access_policy,
+            swmr=kwargs.get("swmr", SWMR_DEFAULT),  # type: ignore
+            libver=kwargs.get("libver", "latest"),  # type: ignore
+            dataset=kwargs.get("dataset"),  # type: ignore
+            path=kwargs.get("path"),  # type: ignore
+        )
+
     @classmethod
     def from_file(
         cls,
diff --git a/tiled/adapters/jpeg.py b/tiled/adapters/jpeg.py
index ae6a6ffa8..b6a489ad3 100644
--- a/tiled/adapters/jpeg.py
+++ b/tiled/adapters/jpeg.py
@@ -1,5 +1,5 @@
 import builtins
-from typing import Any, List, Optional, Tuple, Union
+from typing import Any, Dict, List, Optional, Tuple, Union
 
 import numpy as np
 from numpy._typing import NDArray
@@ -7,6 +7,7 @@
 
 from ..structures.array import ArrayStructure, BuiltinDtype
 from ..structures.core import Spec, StructureFamily
+from ..structures.data_source import Asset
 from ..utils import path_from_uri
 from .protocols import AccessPolicy
 from .resource_cache import with_resource_cache
@@ -30,7 +31,7 @@ def __init__(
         self,
         data_uri: str,
         *,
-        structure: Optional[ArrayStructure] = None,
+        structure: ArrayStructure,
         metadata: Optional[JSON] = None,
         specs: Optional[List[Spec]] = None,
         access_policy: Optional[AccessPolicy] = None,
@@ -45,22 +46,64 @@ def __init__(
         specs :
         access_policy :
         """
-        if not isinstance(data_uri, str):
-            raise Exception
         filepath = path_from_uri(data_uri)
         cache_key = (Image.open, filepath)
         self._file = with_resource_cache(cache_key, Image.open, filepath)
         self.specs = specs or []
         self._provided_metadata = metadata or {}
         self.access_policy = access_policy
+        self._structure = structure
+
+    @classmethod
+    def from_assets(
+        cls,
+        assets: List[Asset],
+        structure: ArrayStructure,
+        metadata: Optional[JSON] = None,
+        specs: Optional[List[Spec]] = None,
+        access_policy: Optional[AccessPolicy] = None,
+        **kwargs: Optional[Union[str, List[str], Dict[str, str]]],
+    ) -> "JPEGAdapter":
+        return cls(
+            assets[0].data_uri,
+            structure=structure,
+            metadata=metadata,
+            specs=specs,
+            access_policy=access_policy,
+        )
+
+    @classmethod
+    def from_uris(
+        cls,
+        data_uris: Union[str, List[str]],
+        structure: Optional[ArrayStructure] = None,
+        metadata: Optional[JSON] = None,
+        specs: Optional[List[Spec]] = None,
+        access_policy: Optional[AccessPolicy] = None,
+        **kwargs: Optional[Union[str, List[str], Dict[str, str]]],
+    ) -> "JPEGAdapter":
+        if not isinstance(data_uris, str):
+            data_uris = data_uris[0]
+
+        filepath = path_from_uri(data_uris)
+        cache_key = (Image.open, filepath)
+        _file = with_resource_cache(cache_key, Image.open, filepath)
+
         if structure is None:
-            arr = np.asarray(self._file)
+            arr = np.asarray(_file)
             structure = ArrayStructure(
                 shape=arr.shape,
                 chunks=tuple((dim,) for dim in arr.shape),
                 data_type=BuiltinDtype.from_numpy_dtype(arr.dtype),
             )
-        self._structure = structure
+
+        return cls(
+            data_uris,
+            structure=structure,
+            metadata=metadata,
+            specs=specs,
+            access_policy=access_policy,
+        )
 
     def metadata(self) -> JSON:
         """
diff --git a/tiled/adapters/netcdf.py b/tiled/adapters/netcdf.py
index 3b9d588fb..0e986e41d 100644
--- a/tiled/adapters/netcdf.py
+++ b/tiled/adapters/netcdf.py
@@ -1,8 +1,14 @@
 from pathlib import Path
-from typing import List, Union
+from typing import Dict, List, Optional, Union
 
 import xarray
 
+from ..server.schemas import Asset
+from ..structures.core import Spec
+from ..structures.table import TableStructure
+from ..utils import path_from_uri
+from .protocols import AccessPolicy
+from .type_alliases import JSON
 from .xarray import DatasetAdapter
 
 
@@ -19,3 +25,21 @@ def read_netcdf(filepath: Union[str, List[str], Path]) -> DatasetAdapter:
     """
     ds = xarray.open_dataset(filepath, decode_times=False)
     return DatasetAdapter.from_dataset(ds)
+
+
+class NetCDFAdapter:
+    @classmethod
+    def from_assets(
+        cls,
+        assets: List[Asset],
+        structure: Optional[
+            TableStructure
+        ] = None,  # NOTE: ContainerStructure? ArrayStructure?
+        metadata: Optional[JSON] = None,
+        specs: Optional[List[Spec]] = None,
+        access_policy: Optional[AccessPolicy] = None,
+        **kwargs: Optional[Union[str, List[str], Dict[str, str]]],
+    ) -> "NetCDFAdapter":
+        filepath = path_from_uri(assets[0].data_uri)
+
+        return read_netcdf(filepath)  # type: ignore
diff --git a/tiled/adapters/parquet.py b/tiled/adapters/parquet.py
index cc7138c00..ce8c2e216 100644
--- a/tiled/adapters/parquet.py
+++ b/tiled/adapters/parquet.py
@@ -1,5 +1,5 @@
 from pathlib import Path
-from typing import Any, List, Optional, Union
+from typing import Any, Dict, List, Optional, Union
 
 import dask.dataframe
 import pandas
@@ -43,6 +43,20 @@ def __init__(
         self.specs = list(specs or [])
         self.access_policy = access_policy
 
+    @classmethod
+    def from_assets(
+        cls,
+        assets: List[Asset],
+        structure: TableStructure,
+        metadata: Optional[JSON] = None,
+        specs: Optional[List[Spec]] = None,
+        access_policy: Optional[AccessPolicy] = None,
+        **kwargs: Optional[Union[str, List[str], Dict[str, str]]],
+    ) -> "ParquetDatasetAdapter":
+        return cls(
+            [ast.data_uri for ast in assets], structure, metadata, specs, access_policy
+        )
+
     def metadata(self) -> JSON:
         """
diff --git a/tiled/adapters/protocols.py b/tiled/adapters/protocols.py
index 01d5cdacc..5a6e50ef3 100644
--- a/tiled/adapters/protocols.py
+++ b/tiled/adapters/protocols.py
@@ -19,6 +19,11 @@
 
 
 class BaseAdapter(Protocol):
+    # @abstractmethod
+    # @classmethod
+    # def from_assets(cls, assets, kwargs) -> "BaseAdapter":
+    #     pass
+
     @abstractmethod
     def metadata(self) -> JSON:
         pass
diff --git a/tiled/adapters/sequence.py b/tiled/adapters/sequence.py
index 4e0058ced..1d2218833 100644
--- a/tiled/adapters/sequence.py
+++ b/tiled/adapters/sequence.py
@@ -1,13 +1,14 @@
 import builtins
 from abc import abstractmethod
 from pathlib import Path
-from typing import Any, List, Optional, Tuple, Union
+from typing import Any, Dict, List, Optional, Tuple, Union
 
 import numpy as np
 from numpy._typing import NDArray
 
 from ..structures.array import ArrayStructure, BuiltinDtype
-from ..structures.core import Spec
+from ..structures.core import Spec, StructureFamily
+from ..structures.data_source import Asset
 from ..utils import path_from_uri
 from .protocols import AccessPolicy
 from .type_alliases import JSON, NDSlice
@@ -22,7 +23,7 @@ class FileSequenceAdapter:
 
     When subclassing, define the `_load_from_files` method specific to a particular file type.
     """
 
-    structure_family = "array"
+    structure_family = StructureFamily.array
 
     @classmethod
     def from_uris(
@@ -33,21 +34,6 @@ def from_uris(
         specs: Optional[List[Spec]] = None,
         access_policy: Optional[AccessPolicy] = None,
     ) -> "FileSequenceAdapter":
-        """
-
-        Parameters
-        ----------
-        data_uris :
-        structure :
-        metadata :
-        specs :
-        access_policy :
-
-        Returns
-        -------
-
-        """
-
         return cls(
             filepaths=[path_from_uri(data_uri) for data_uri in data_uris],
             structure=structure,
@@ -56,6 +42,24 @@ def from_uris(
             access_policy=access_policy,
         )
 
+    @classmethod
+    def from_assets(
+        cls,
+        assets: List[Asset],
+        structure: ArrayStructure,
+        metadata: Optional[JSON] = None,
+        specs: Optional[List[Spec]] = None,
+        access_policy: Optional[AccessPolicy] = None,
+        **kwargs: Optional[Union[str, List[str], Dict[str, str]]],
+    ) -> "FileSequenceAdapter":
+        return cls(
+            filepaths=[path_from_uri(a.data_uri) for a in assets],
+            structure=structure,
+            specs=specs,
+            metadata=metadata,
+            access_policy=access_policy,
+        )
+
     def __init__(
         self,
         filepaths: List[Path],
diff --git a/tiled/adapters/sparse_blocks_parquet.py b/tiled/adapters/sparse_blocks_parquet.py
index 1a5ed7dbb..d76e9c337 100644
--- a/tiled/adapters/sparse_blocks_parquet.py
+++ b/tiled/adapters/sparse_blocks_parquet.py
@@ -1,5 +1,5 @@
 import itertools
-from typing import Any, List, Optional, Tuple, Union
+from typing import Any, Dict, List, Optional, Tuple, Union
 
 import dask.base
 import dask.dataframe
@@ -68,6 +68,19 @@ def __init__(
         self.specs = list(specs or [])
         self.access_policy = access_policy
 
+    @classmethod
+    def from_assets(
+        cls,
+        assets: List[Asset],
+        structure: COOStructure,
+        metadata: Optional[JSON] = None,
+        specs: Optional[List[Spec]] = None,
+        access_policy: Optional[AccessPolicy] = None,
+        **kwargs: Optional[Union[str, List[str], Dict[str, str]]],
+    ) -> "SparseBlocksParquetAdapter":
+        data_uris = [a.data_uri for a in assets]
+        return cls(data_uris, structure, metadata, specs, access_policy)
+
     @classmethod
     def init_storage(
         cls,
diff --git a/tiled/adapters/tiff.py b/tiled/adapters/tiff.py
index 75ef69ad1..16420b53a 100644
--- a/tiled/adapters/tiff.py
+++ b/tiled/adapters/tiff.py
@@ -6,6 +6,7 @@
 
 from ..structures.array import ArrayStructure, BuiltinDtype
 from ..structures.core import Spec, StructureFamily
+from ..structures.data_source import Asset
 from ..utils import path_from_uri
 from .protocols import AccessPolicy
 from .resource_cache import with_resource_cache
@@ -68,6 +69,24 @@ def __init__(
             )
         self._structure = structure
 
+    @classmethod
+    def from_assets(
+        cls,
+        assets: List[Asset],
+        structure: ArrayStructure,
+        metadata: Optional[JSON] = None,
+        specs: Optional[List[Spec]] = None,
+        access_policy: Optional[AccessPolicy] = None,
+        **kwargs: Optional[Union[str, List[str], Dict[str, str]]],
+    ) -> "TiffAdapter":
+        return cls(
+            assets[0].data_uri,
+            structure=structure,
+            metadata=metadata,
+            specs=specs,
+            access_policy=access_policy,
+        )
+
     def metadata(self) -> JSON:
         """
diff --git a/tiled/adapters/zarr.py b/tiled/adapters/zarr.py
index 7a914965c..0ab3ffce2 100644
--- a/tiled/adapters/zarr.py
+++ b/tiled/adapters/zarr.py
@@ -2,7 +2,7 @@
 import collections.abc
 import os
 import sys
-from typing import Any, Iterator, List, Optional, Tuple, Union
+from typing import Any, Dict, Iterator, List, Optional, Tuple, Union
 
 import zarr.core
 import zarr.hierarchy
@@ -27,18 +27,6 @@ def read_zarr(
     data_uri: str,
     structure: Optional[ArrayStructure] = None,
     **kwargs: Any,
 ) -> Union["ZarrGroupAdapter", ArrayAdapter]:
-    """
-
-    Parameters
-    ----------
-    data_uri :
-    structure :
-    kwargs :
-
-    Returns
-    -------
-
-    """
     filepath = path_from_uri(data_uri)
     zarr_obj = zarr.open(filepath)  # Group or Array
     adapter: Union[ZarrGroupAdapter, ArrayAdapter]
@@ -409,3 +397,35 @@ def inlined_contents_enabled(self, depth: int) -> bool:
         """
         return depth <= INLINED_DEPTH
+
+
+class ZarrAdapter:
+    @classmethod
+    def from_assets(
+        cls,
+        assets: List[Asset],
+        structure: ArrayStructure,  # NOTE: possibly needs to be a Union of Array and Mapping structures
+        metadata: Optional[JSON] = None,
+        specs: Optional[List[Spec]] = None,
+        access_policy: Optional[AccessPolicy] = None,
+        **kwargs: Optional[Union[str, List[str], Dict[str, str]]],
+    ) -> Union[ZarrGroupAdapter, ArrayAdapter]:
+        zarr_obj = zarr.open(path_from_uri(assets[0].data_uri))  # Group or Array
+        if isinstance(zarr_obj, zarr.hierarchy.Group):
+            return ZarrGroupAdapter(
+                zarr_obj,
+                structure=structure,
+                metadata=metadata,
+                specs=specs,
+                access_policy=access_policy,
+                **kwargs,
+            )
+        else:
+            return ZarrArrayAdapter(
+                zarr_obj,
+                structure=structure,
+                metadata=metadata,
+                specs=specs,
+                access_policy=access_policy,
+                **kwargs,
+            )
diff --git a/tiled/catalog/adapter.py b/tiled/catalog/adapter.py
index b3b407955..17e215249 100644
--- a/tiled/catalog/adapter.py
+++ b/tiled/catalog/adapter.py
@@ -449,12 +449,11 @@ async def lookup_adapter(
 
     async def get_adapter(self):
         (data_source,) = self.data_sources
         try:
-            adapter_factory = self.context.adapters_by_mimetype[data_source.mimetype]
+            adapter_class = self.context.adapters_by_mimetype[data_source.mimetype]
         except KeyError:
             raise RuntimeError(
                 f"Server configuration has no adapter for mimetype {data_source.mimetype!r}"
             )
-        parameters = collections.defaultdict(list)
         for asset in data_source.assets:
             if asset.parameter is None:
                 continue
@@ -464,8 +463,7 @@ async def get_adapter(self):
                     f"Only 'file://...' scheme URLs are currently supported, not {asset.data_uri}"
                 )
             if scheme == "file":
-                # Protect against misbehaving clients reading from unintended
-                # parts of the filesystem.
+                # Protect against misbehaving clients reading from unintended parts of the filesystem.
                 asset_path = path_from_uri(asset.data_uri)
                 for readable_storage in self.context.readable_storage:
                     if Path(
@@ -479,18 +477,16 @@ async def get_adapter(self):
                         f"Refusing to serve {asset.data_uri} because it is outside "
                         "the readable storage area for this server."
                    )
-            if asset.num is None:
-                parameters[asset.parameter] = asset.data_uri
-            else:
-                parameters[asset.parameter].append(asset.data_uri)
-        adapter_kwargs = dict(parameters)
-        adapter_kwargs.update(data_source.parameters)
-        adapter_kwargs["specs"] = self.node.specs
-        adapter_kwargs["metadata"] = self.node.metadata_
-        adapter_kwargs["structure"] = data_source.structure
-        adapter_kwargs["access_policy"] = self.access_policy
         adapter = await anyio.to_thread.run_sync(
-            partial(adapter_factory, **adapter_kwargs)
+            partial(
+                adapter_class.from_assets,
+                data_source.assets,
+                structure=data_source.structure,
+                specs=self.node.specs,
+                metadata=self.node.metadata_,
+                access_policy=self.access_policy,
+                **data_source.parameters,
+            ),
         )
         for query in self.queries:
             adapter = adapter.search(query)
diff --git a/tiled/client/register.py b/tiled/client/register.py
index b46c31bbb..662b77e07 100644
--- a/tiled/client/register.py
+++ b/tiled/client/register.py
@@ -296,10 +296,12 @@ async def register_single_item(
         )
         unhandled_items.append(item)
         return
-    adapter_factory = settings.adapters_by_mimetype[mimetype]
+    adapter_class = settings.adapters_by_mimetype[mimetype]
     logger.info("  Resolved mimetype '%s' with adapter for '%s'", mimetype, item)
     try:
-        adapter = await anyio.to_thread.run_sync(adapter_factory, ensure_uri(item))
+        adapter = await anyio.to_thread.run_sync(
+            adapter_class.from_uris, [ensure_uri(item)]
+        )
     except Exception:
         logger.exception("  SKIPPED: Error constructing adapter for '%s':", item)
         return
@@ -408,7 +410,9 @@ async def register_image_sequence(node, name, sequence, settings):
     adapter_class = settings.adapters_by_mimetype[mimetype]
     key = settings.key_from_filename(name)
     try:
-        adapter = adapter_class([ensure_uri(filepath) for filepath in sequence])
+        adapter = adapter_class.from_uris(
+            [ensure_uri(filepath) for filepath in sequence]
+        )
     except Exception:
         logger.exception("  SKIPPED: Error constructing adapter for '%s'", name)
         return
diff --git a/tiled/mimetypes.py b/tiled/mimetypes.py
index e17e51c9c..219314117 100644
--- a/tiled/mimetypes.py
+++ b/tiled/mimetypes.py
@@ -13,18 +13,18 @@
 AWKWARD_BUFFERS_MIMETYPE = "application/x-awkward-buffers"
 DEFAULT_ADAPTERS_BY_MIMETYPE = OneShotCachedMap(
     {
-        "image/tiff": lambda: importlib.import_module(
-            "..adapters.tiff", __name__
-        ).TiffAdapter,
+        # "image/tiff": lambda: importlib.import_module(
+        #     "..adapters.tiff", __name__
+        # ).TiffAdapter,
         "multipart/related;type=image/tiff": lambda: importlib.import_module(
             "..adapters.tiff", __name__
-        ).TiffSequenceAdapter.from_uris,
+        ).TiffSequenceAdapter,
         "image/jpeg": lambda: importlib.import_module(
             "..adapters.jpeg", __name__
         ).JPEGAdapter,
         "multipart/related;type=image/jpeg": lambda: importlib.import_module(
             "..adapters.jpeg", __name__
-        ).JPEGSequenceAdapter.from_uris,
+        ).JPEGSequenceAdapter,
         "text/csv": lambda: importlib.import_module(
             "..adapters.csv", __name__
         ).CSVAdapter,
@@ -40,13 +40,13 @@
         ).CSVArrayAdapter,
         XLSX_MIME_TYPE: lambda: importlib.import_module(
             "..adapters.excel", __name__
-        ).ExcelAdapter.from_uri,
+        ).ExcelAdapter,
         "application/x-hdf5": lambda: importlib.import_module(
             "..adapters.hdf5", __name__
-        ).hdf5_lookup,
+        ).HDF5Adapter,
         "application/x-netcdf": lambda: importlib.import_module(
             "..adapters.netcdf", __name__
-        ).read_netcdf,
+        ).NetCDFAdapter,
         PARQUET_MIMETYPE: lambda: importlib.import_module(
             "..adapters.parquet", __name__
         ).ParquetDatasetAdapter,
@@ -55,10 +55,10 @@
         ).SparseBlocksParquetAdapter,
         ZARR_MIMETYPE: lambda: importlib.import_module(
             "..adapters.zarr", __name__
-        ).read_zarr,
+        ).ZarrAdapter,
         AWKWARD_BUFFERS_MIMETYPE: lambda: importlib.import_module(
             "..adapters.awkward_buffers", __name__
-        ).AwkwardBuffersAdapter.from_directory,
+        ).AwkwardBuffersAdapter,
         APACHE_ARROW_FILE_MIME_TYPE: lambda: importlib.import_module(
             "..adapters.arrow", __name__
         ).ArrowAdapter,