Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/external catalog config #334

Draft
wants to merge 22 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions .hatch/test_against_adapter.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
#!/bin/bash

set -eo pipefail
adapter=$1
branch=$2
source .hatch/test.env
adapter_dir="dbt-$adapter"

cd .hatch/
if [ ! -d "$adapter_dir" ]; then
git clone "https://github.com/dbt-labs/dbt-$adapter.git"
fi

cd "$adapter_dir"
git pull
git switch "$branch"
pip install -e .
python -m pytest tests/functional/
20 changes: 19 additions & 1 deletion dbt/adapters/base/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@
)
from dbt.adapters.cache import RelationsCache, _make_ref_key_dict
from dbt.adapters.capability import Capability, CapabilityDict
from dbt.adapters.clients import catalogs as catalogs_client
from dbt.adapters.contracts.catalog import CatalogIntegration
from dbt.adapters.contracts.connection import Credentials
from dbt.adapters.contracts.macros import MacroResolverProtocol
from dbt.adapters.contracts.relation import RelationConfig
Expand All @@ -88,7 +90,7 @@
SnapshotTargetNotSnapshotTableError,
UnexpectedNonTimestampError,
)
from dbt.adapters.protocol import AdapterConfig, MacroContextGeneratorCallable
from dbt.adapters.protocol import AdapterConfig, MacroContextGeneratorCallable, CatalogIntegrationConfigProtocol

if TYPE_CHECKING:
import agate
Expand Down Expand Up @@ -246,6 +248,7 @@ class BaseAdapter(metaclass=AdapterMeta):
- expand_column_types
- list_relations_without_caching
- is_cancelable
- execute
- create_schema
- drop_schema
- quote
Expand All @@ -259,11 +262,13 @@ class BaseAdapter(metaclass=AdapterMeta):
Macros:
- get_catalog
"""

Relation: Type[BaseRelation] = BaseRelation
Column: Type[BaseColumn] = BaseColumn
ConnectionManager: Type[BaseConnectionManager]
CatalogIntegrations: Dict[str, Type[CatalogIntegrationConfigProtocol]]

# A set of clobber config fields accepted by this adapter
# for use in materializations
Expand Down Expand Up @@ -291,6 +296,19 @@ def __init__(self, config, mp_context: SpawnContext) -> None:
self._macro_context_generator: Optional[MacroContextGeneratorCallable] = None
self.behavior = DEFAULT_BASE_BEHAVIOR_FLAGS # type: ignore

def add_catalog_integrations(self, catalog_integrations: Optional[List[CatalogIntegrationConfigProtocol]]) -> None:
if catalog_integrations:
for integration_config in catalog_integrations:
catalog_type = integration_config.catalog_type
if catalog_type not in self.CatalogIntegrations:
raise DbtValidationError(f"{catalog_type} is not supported!!! - <3 Colin")
integration = self.CatalogIntegrations[catalog_type](integration_config)
catalogs_client.add_catalog(integration, integration_config.catalog_name)

@available
def get_catalog_integration(self, integration_name: str) -> CatalogIntegration:
return catalogs_client.get_catalog(integration_name)

###
# Methods to set / access a macro resolver
###
Expand Down
7 changes: 7 additions & 0 deletions dbt/adapters/base/relation.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ class BaseRelation(FakeAPIObject, Hashable):
require_alias: bool = (
True # used to govern whether to add an alias when render_limited is called
)
catalog_name: Optional[str] = None

# register relation types that can be renamed for the purpose of replacing relations using stages and backups
# adding a relation type here also requires defining the associated rename macro
Expand Down Expand Up @@ -305,6 +306,11 @@ def create_from(

config_quoting = relation_config.quoting_dict
config_quoting.pop("column", None)

catalog_name = relation_config.catalog_name \
if hasattr(relation_config, "catalog_name") \
else relation_config.config.get("catalog", None)

# precedence: kwargs quoting > relation config quoting > base quoting > default quoting
quote_policy = deep_merge(
cls.get_default_quote_policy().to_dict(omit_none=True),
Expand All @@ -318,6 +324,7 @@ def create_from(
schema=relation_config.schema,
identifier=relation_config.identifier,
quote_policy=quote_policy,
catalog_name=catalog_name,
**kwargs,
)

Expand Down
34 changes: 34 additions & 0 deletions dbt/adapters/clients/catalogs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
from dbt_common.exceptions import DbtValidationError

from dbt.adapters.contracts.catalog import CatalogIntegration


class CatalogIntegrations:
def __init__(self):
self._integrations = {}

def get(self, name: str) -> CatalogIntegration:
return self.integrations[name]

@property
def integrations(self) -> dict[str, CatalogIntegration]:
return self._integrations

def add_integration(self, integration: CatalogIntegration, catalog_name: str):
self._integrations[catalog_name] = integration


_CATALOG_CLIENT = CatalogIntegrations()


def get_catalog(integration_name: str) -> CatalogIntegration:
try:
return _CATALOG_CLIENT.get(integration_name)
except KeyError:
raise DbtValidationError(
f"Catalog integration '{integration_name}' not found in the catalog client"
)


def add_catalog(integration: CatalogIntegration, catalog_name: str):
_CATALOG_CLIENT.add_integration(integration, catalog_name)
59 changes: 59 additions & 0 deletions dbt/adapters/contracts/catalog.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import abc
from dataclasses import dataclass
from enum import Enum
from typing import Optional, Dict

from dbt.adapters.contracts.relation import RelationConfig
from dbt.adapters.relation_configs.formats import TableFormat


class CatalogIntegrationType(Enum):
managed = 'managed'
iceberg_rest = 'iceberg_rest'
glue = 'glue'
unity = 'unity'


@dataclass
class CatalogIntegrationConfig:
catalog_name: str
integration_name: str
table_format: str
catalog_type: str
external_volume: Optional[str] = None
namespace: Optional[str] = None
adapter_configs: Optional[Dict] = None


class CatalogIntegration(abc.ABC):
"""
An external catalog integration is a connection to an external catalog that can be used to
interact with the catalog. This class is an abstract base class that should be subclassed by
specific integrations in the adapters.
Implements the CatalogIntegrationProtocol.
"""
catalog_name: str
integration_name: str
table_format: TableFormat
integration_type: CatalogIntegrationType
external_volume: Optional[str] = None
namespace: Optional[str] = None

def __init__(
self, integration_config: CatalogIntegrationConfig
):
self.catalog_name = integration_config.catalog_name
self.integration_name = integration_config.integration_name
self.table_format = TableFormat(integration_config.table_format)
self.type = CatalogIntegrationType(integration_config.catalog_type)
self.external_volume = integration_config.external_volume
self.namespace = integration_config.namespace
self._handle_adapter_configs(integration_config.adapter_configs)

def _handle_adapter_configs(self, adapter_configs: Dict) -> None:
...

def render_ddl_predicates(self, relation, config: RelationConfig) -> str:
...
1 change: 1 addition & 0 deletions dbt/adapters/contracts/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
dbtClassMixin,
)


# TODO: this is a very bad dependency - shared global state
from dbt_common.events.contextvars import get_node_info
from dbt_common.events.functions import fire_event
Expand Down
1 change: 1 addition & 0 deletions dbt/adapters/contracts/relation.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ class RelationConfig(Protocol):
tags: List[str]
quoting_dict: Dict[str, bool]
config: Optional[MaterializationConfig]
catalog_name: Optional[str]


class ComponentName(StrEnum):
Expand Down
49 changes: 37 additions & 12 deletions dbt/adapters/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,29 @@ class ColumnProtocol(Protocol):
pass


class CatalogIntegrationConfigProtocol(Protocol):
catalog_name: str
integration_name: str
table_format: str
catalog_type: str
external_volume: Optional[str]
namespace: Optional[str]
adapter_configs: Optional[Dict]


class CatalogIntegrationProtocol(Protocol):
catalog_name: str
integration_name: str
table_format: str
integration_type: str
external_volume: Optional[str]
namespace: Optional[str]

def __init__(
self, integration_config: CatalogIntegrationConfigProtocol
) -> None: ...


Self = TypeVar("Self", bound="RelationProtocol")


Expand All @@ -51,26 +74,27 @@ def get_default_quote_policy(cls) -> Policy: ...

@classmethod
def create_from(
cls: Type[Self],
quoting: HasQuoting,
relation_config: RelationConfig,
**kwargs: Any,
cls: Type[Self],
quoting: HasQuoting,
relation_config: RelationConfig,
**kwargs: Any,
) -> Self: ...


AdapterConfig_T = TypeVar("AdapterConfig_T", bound=AdapterConfig)
ConnectionManager_T = TypeVar("ConnectionManager_T", bound=ConnectionManagerProtocol)
Relation_T = TypeVar("Relation_T", bound=RelationProtocol)
Column_T = TypeVar("Column_T", bound=ColumnProtocol)
CatalogIntegration_T = TypeVar("CatalogIntegration_T", bound=CatalogIntegrationProtocol)


class MacroContextGeneratorCallable(Protocol):
def __call__(
self,
macro_protocol: MacroProtocol,
config: AdapterRequiredConfig,
macro_resolver: MacroResolverProtocol,
package_name: Optional[str],
self,
macro_protocol: MacroProtocol,
config: AdapterRequiredConfig,
macro_resolver: MacroResolverProtocol,
package_name: Optional[str],
) -> Dict[str, Any]: ...


Expand All @@ -91,6 +115,7 @@ class AdapterProtocol( # type: ignore[misc]
Column: Type[Column_T]
Relation: Type[Relation_T]
ConnectionManager: Type[ConnectionManager_T]
CatalogIntegrations: Dict[str, Type[CatalogIntegration_T]]
connections: ConnectionManager_T

def __init__(self, config: AdapterRequiredConfig) -> None: ...
Expand All @@ -102,8 +127,8 @@ def get_macro_resolver(self) -> Optional[MacroResolverProtocol]: ...
def clear_macro_resolver(self) -> None: ...

def set_macro_context_generator(
self,
macro_context_generator: MacroContextGeneratorCallable,
self,
macro_context_generator: MacroContextGeneratorCallable,
) -> None: ...

@classmethod
Expand Down Expand Up @@ -146,5 +171,5 @@ def close(cls, connection: Connection) -> Connection: ...
def commit_if_has_connection(self) -> None: ...

def execute(
self, sql: str, auto_begin: bool = False, fetch: bool = False
self, sql: str, auto_begin: bool = False, fetch: bool = False
) -> Tuple[AdapterResponse, "agate.Table"]: ...
19 changes: 19 additions & 0 deletions dbt/adapters/relation_configs/formats.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
from dbt_common.dataclass_schema import StrEnum # doesn't exist in standard library until py3.11
from typing_extensions import Self


class TableFormat(StrEnum):
"""
Some platforms may refer to this 'Object' or 'File Format'.
Data practitioners and interfaces refer to this as 'Table Format's, hence the term's use here.
"""

DEFAULT = "default"
ICEBERG = "iceberg"

@classmethod
def default(cls) -> Self:
return cls("default")

def __str__(self):
return self.value
22 changes: 22 additions & 0 deletions tests/unit/clients/test_catalogs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
from dbt.adapters.clients.catalogs import add_catalog, get_catalog
from dbt.adapters.contracts.catalog import CatalogIntegration, CatalogIntegrationConfig, CatalogIntegrationType
from dbt.adapters.relation_configs.formats import TableFormat


class FakeCatalogIntegration(CatalogIntegration):
def render_ddl_predicates(self, relation):
return "mocked"


def test_adding_catalog_integration():
catalog = FakeCatalogIntegration(
integration_config=CatalogIntegrationConfig(
catalog_type=CatalogIntegrationType.glue.value,
catalog_name="snowflake_managed",
integration_name="test_integration",
table_format=TableFormat.ICEBERG,
external_volume="test_volume",
)
)
add_catalog(catalog, catalog_name="fake_catalog")
get_catalog("fake_catalog")
Loading