diff --git a/.hatch/test_against_adapter.sh b/.hatch/test_against_adapter.sh new file mode 100755 index 000000000..ed110404b --- /dev/null +++ b/.hatch/test_against_adapter.sh @@ -0,0 +1,18 @@ +#!/bin/bash + +set -eo pipefail +adapter=$1 +branch=$2 +source .hatch/test.env +adapter_dir="dbt-$adapter" + +cd .hatch/ +if [ ! -d "$adapter_dir" ]; then + git clone "https://github.com/dbt-labs/dbt-$adapter.git" +fi + +cd "$adapter_dir" +git pull +git switch "$branch" +pip install -e . +python -m pytest tests/functional/ diff --git a/dbt/adapters/base/impl.py b/dbt/adapters/base/impl.py index 8474b39d8..ee9573e62 100644 --- a/dbt/adapters/base/impl.py +++ b/dbt/adapters/base/impl.py @@ -65,6 +65,8 @@ ) from dbt.adapters.cache import RelationsCache, _make_ref_key_dict from dbt.adapters.capability import Capability, CapabilityDict +from dbt.adapters.clients import catalogs as catalogs_client +from dbt.adapters.contracts.catalog import CatalogIntegration from dbt.adapters.contracts.connection import Credentials from dbt.adapters.contracts.macros import MacroResolverProtocol from dbt.adapters.contracts.relation import RelationConfig @@ -88,7 +90,7 @@ SnapshotTargetNotSnapshotTableError, UnexpectedNonTimestampError, ) -from dbt.adapters.protocol import AdapterConfig, MacroContextGeneratorCallable +from dbt.adapters.protocol import AdapterConfig, MacroContextGeneratorCallable, CatalogIntegrationConfigProtocol if TYPE_CHECKING: import agate @@ -246,6 +248,7 @@ class BaseAdapter(metaclass=AdapterMeta): - expand_column_types - list_relations_without_caching - is_cancelable + - execute - create_schema - drop_schema - quote @@ -259,11 +262,13 @@ class BaseAdapter(metaclass=AdapterMeta): Macros: - get_catalog + """ Relation: Type[BaseRelation] = BaseRelation Column: Type[BaseColumn] = BaseColumn ConnectionManager: Type[BaseConnectionManager] + CatalogIntegrations: Dict[str, Type[CatalogIntegrationConfigProtocol]] # A set of clobber config fields accepted by this adapter # for use in materializations @@ -291,6 +296,19 @@ def __init__(self, config, mp_context: SpawnContext) -> None: self._macro_context_generator: Optional[MacroContextGeneratorCallable] = None self.behavior = DEFAULT_BASE_BEHAVIOR_FLAGS # type: ignore + def add_catalog_integrations(self, catalog_integrations: Optional[List[CatalogIntegrationConfigProtocol]]) -> None: + if catalog_integrations: + for integration_config in catalog_integrations: + catalog_type = integration_config.catalog_type + if catalog_type not in self.CatalogIntegrations: + raise DbtValidationError(f"{catalog_type} is not supported!!! - <3 Colin") + integration = self.CatalogIntegrations[catalog_type](integration_config) + catalogs_client.add_catalog(integration, integration_config.catalog_name) + + @available + def get_catalog_integration(self, integration_name: str) -> CatalogIntegration: + return catalogs_client.get_catalog(integration_name) + ### # Methods to set / access a macro resolver ### diff --git a/dbt/adapters/base/relation.py b/dbt/adapters/base/relation.py index 7d4888e42..866b97974 100644 --- a/dbt/adapters/base/relation.py +++ b/dbt/adapters/base/relation.py @@ -60,6 +60,7 @@ class BaseRelation(FakeAPIObject, Hashable): require_alias: bool = ( True # used to govern whether to add an alias when render_limited is called ) + catalog_name: Optional[str] = None # register relation types that can be renamed for the purpose of replacing relations using stages and backups # adding a relation type here also requires defining the associated rename macro @@ -305,6 +306,11 @@ def create_from( config_quoting = relation_config.quoting_dict config_quoting.pop("column", None) + + catalog_name = relation_config.catalog_name \ + if hasattr(relation_config, "catalog_name") \ + else relation_config.config.get("catalog", None) + # precedence: kwargs quoting > relation config quoting > base quoting > default quoting quote_policy = deep_merge( cls.get_default_quote_policy().to_dict(omit_none=True), @@ -318,6 +324,7 @@ def create_from( schema=relation_config.schema, identifier=relation_config.identifier, quote_policy=quote_policy, + catalog_name=catalog_name, **kwargs, ) diff --git a/dbt/adapters/clients/catalogs.py b/dbt/adapters/clients/catalogs.py new file mode 100644 index 000000000..1c9baf241 --- /dev/null +++ b/dbt/adapters/clients/catalogs.py @@ -0,0 +1,34 @@ +from dbt_common.exceptions import DbtValidationError + +from dbt.adapters.contracts.catalog import CatalogIntegration + + +class CatalogIntegrations: + def __init__(self): + self._integrations = {} + + def get(self, name: str) -> CatalogIntegration: + return self.integrations[name] + + @property + def integrations(self) -> dict[str, CatalogIntegration]: + return self._integrations + + def add_integration(self, integration: CatalogIntegration, catalog_name: str): + self._integrations[catalog_name] = integration + + +_CATALOG_CLIENT = CatalogIntegrations() + + +def get_catalog(integration_name: str) -> CatalogIntegration: + try: + return _CATALOG_CLIENT.get(integration_name) + except KeyError: + raise DbtValidationError( + f"Catalog integration '{integration_name}' not found in the catalog client" + ) + + +def add_catalog(integration: CatalogIntegration, catalog_name: str): + _CATALOG_CLIENT.add_integration(integration, catalog_name) diff --git a/dbt/adapters/contracts/catalog.py b/dbt/adapters/contracts/catalog.py new file mode 100644 index 000000000..948d12152 --- /dev/null +++ b/dbt/adapters/contracts/catalog.py @@ -0,0 +1,59 @@ +import abc +from dataclasses import dataclass +from enum import Enum +from typing import Optional, Dict + +from dbt.adapters.contracts.relation import RelationConfig +from dbt.adapters.relation_configs.formats import TableFormat + + +class CatalogIntegrationType(Enum): + managed = 'managed' + iceberg_rest = 'iceberg_rest' + glue = 'glue' + unity = 'unity' + + +@dataclass +class CatalogIntegrationConfig: + catalog_name: str + integration_name: str + table_format: str + catalog_type: str + external_volume: Optional[str] = None + namespace: Optional[str] = None + adapter_configs: Optional[Dict] = None + + +class CatalogIntegration(abc.ABC): + """ + An external catalog integration is a connection to an external catalog that can be used to + interact with the catalog. This class is an abstract base class that should be subclassed by + specific integrations in the adapters. + + Implements the CatalogIntegrationProtocol. + + """ + catalog_name: str + integration_name: str + table_format: TableFormat + integration_type: CatalogIntegrationType + external_volume: Optional[str] = None + namespace: Optional[str] = None + + def __init__( + self, integration_config: CatalogIntegrationConfig + ): + self.catalog_name = integration_config.catalog_name + self.integration_name = integration_config.integration_name + self.table_format = TableFormat(integration_config.table_format) + self.type = CatalogIntegrationType(integration_config.catalog_type) + self.external_volume = integration_config.external_volume + self.namespace = integration_config.namespace + self._handle_adapter_configs(integration_config.adapter_configs) + + def _handle_adapter_configs(self, adapter_configs: Dict) -> None: + ... + + def render_ddl_predicates(self, relation, config: RelationConfig) -> str: + ... diff --git a/dbt/adapters/contracts/connection.py b/dbt/adapters/contracts/connection.py index 2d10c9a32..da9fce85d 100644 --- a/dbt/adapters/contracts/connection.py +++ b/dbt/adapters/contracts/connection.py @@ -20,6 +20,7 @@ dbtClassMixin, ) + # TODO: this is a very bad dependency - shared global state from dbt_common.events.contextvars import get_node_info from dbt_common.events.functions import fire_event diff --git a/dbt/adapters/contracts/relation.py b/dbt/adapters/contracts/relation.py index 42beb579c..7bb0c531e 100644 --- a/dbt/adapters/contracts/relation.py +++ b/dbt/adapters/contracts/relation.py @@ -58,6 +58,7 @@ class RelationConfig(Protocol): tags: List[str] quoting_dict: Dict[str, bool] config: Optional[MaterializationConfig] + catalog_name: Optional[str] class ComponentName(StrEnum): diff --git a/dbt/adapters/protocol.py b/dbt/adapters/protocol.py index 352198663..3305618bd 100644 --- a/dbt/adapters/protocol.py +++ b/dbt/adapters/protocol.py @@ -42,6 +42,29 @@ class ColumnProtocol(Protocol): pass +class CatalogIntegrationConfigProtocol(Protocol): + catalog_name: str + integration_name: str + table_format: str + catalog_type: str + external_volume: Optional[str] + namespace: Optional[str] + adapter_configs: Optional[Dict] + + +class CatalogIntegrationProtocol(Protocol): + catalog_name: str + integration_name: str + table_format: str + integration_type: str + external_volume: Optional[str] + namespace: Optional[str] + + def __init__( + self, integration_config: CatalogIntegrationConfigProtocol + ) -> None: ... + + Self = TypeVar("Self", bound="RelationProtocol") @@ -51,10 +74,10 @@ def get_default_quote_policy(cls) -> Policy: ... @classmethod def create_from( - cls: Type[Self], - quoting: HasQuoting, - relation_config: RelationConfig, - **kwargs: Any, + cls: Type[Self], + quoting: HasQuoting, + relation_config: RelationConfig, + **kwargs: Any, ) -> Self: ... @@ -62,15 +85,16 @@ def create_from( ConnectionManager_T = TypeVar("ConnectionManager_T", bound=ConnectionManagerProtocol) Relation_T = TypeVar("Relation_T", bound=RelationProtocol) Column_T = TypeVar("Column_T", bound=ColumnProtocol) +CatalogIntegration_T = TypeVar("CatalogIntegration_T", bound=CatalogIntegrationProtocol) class MacroContextGeneratorCallable(Protocol): def __call__( - self, - macro_protocol: MacroProtocol, - config: AdapterRequiredConfig, - macro_resolver: MacroResolverProtocol, - package_name: Optional[str], + self, + macro_protocol: MacroProtocol, + config: AdapterRequiredConfig, + macro_resolver: MacroResolverProtocol, + package_name: Optional[str], ) -> Dict[str, Any]: ... @@ -91,6 +115,7 @@ class AdapterProtocol( # type: ignore[misc] Column: Type[Column_T] Relation: Type[Relation_T] ConnectionManager: Type[ConnectionManager_T] + CatalogIntegrations: Dict[str, Type[CatalogIntegration_T]] connections: ConnectionManager_T def __init__(self, config: AdapterRequiredConfig) -> None: ... @@ -102,8 +127,8 @@ def get_macro_resolver(self) -> Optional[MacroResolverProtocol]: ... def clear_macro_resolver(self) -> None: ... def set_macro_context_generator( - self, - macro_context_generator: MacroContextGeneratorCallable, + self, + macro_context_generator: MacroContextGeneratorCallable, ) -> None: ... @classmethod @@ -146,5 +171,5 @@ def close(cls, connection: Connection) -> Connection: ... def commit_if_has_connection(self) -> None: ... def execute( - self, sql: str, auto_begin: bool = False, fetch: bool = False + self, sql: str, auto_begin: bool = False, fetch: bool = False ) -> Tuple[AdapterResponse, "agate.Table"]: ... diff --git a/dbt/adapters/relation_configs/formats.py b/dbt/adapters/relation_configs/formats.py new file mode 100644 index 000000000..f440e530b --- /dev/null +++ b/dbt/adapters/relation_configs/formats.py @@ -0,0 +1,19 @@ +from dbt_common.dataclass_schema import StrEnum # doesn't exist in standard library until py3.11 +from typing_extensions import Self + + +class TableFormat(StrEnum): + """ + Some platforms may refer to this 'Object' or 'File Format'. + Data practitioners and interfaces refer to this as 'Table Format's, hence the term's use here. + """ + + DEFAULT = "default" + ICEBERG = "iceberg" + + @classmethod + def default(cls) -> Self: + return cls("default") + + def __str__(self): + return self.value diff --git a/tests/unit/clients/test_catalogs.py b/tests/unit/clients/test_catalogs.py new file mode 100644 index 000000000..d429d593a --- /dev/null +++ b/tests/unit/clients/test_catalogs.py @@ -0,0 +1,22 @@ +from dbt.adapters.clients.catalogs import add_catalog, get_catalog +from dbt.adapters.contracts.catalog import CatalogIntegration, CatalogIntegrationConfig, CatalogIntegrationType +from dbt.adapters.relation_configs.formats import TableFormat + + +class FakeCatalogIntegration(CatalogIntegration): + def render_ddl_predicates(self, relation): + return "mocked" + + +def test_adding_catalog_integration(): + catalog = FakeCatalogIntegration( + integration_config=CatalogIntegrationConfig( + catalog_type=CatalogIntegrationType.glue.value, + catalog_name="snowflake_managed", + integration_name="test_integration", + table_format=TableFormat.ICEBERG, + external_volume="test_volume", + ) + ) + add_catalog(catalog, catalog_name="fake_catalog") + get_catalog("fake_catalog")