Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/external catalog config #1285

Draft
wants to merge 9 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 87 additions & 0 deletions dbt/adapters/snowflake/catalog.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
from typing import Dict, Optional, Any

import textwrap

from dbt.adapters.base import BaseRelation
from dbt.adapters.contracts.catalog import CatalogIntegration, CatalogIntegrationType
from dbt.adapters.contracts.relation import RelationConfig
from dbt.adapters.relation_configs import RelationResults


class SnowflakeManagedIcebergCatalogIntegration(CatalogIntegration):
catalog_type = CatalogIntegrationType.managed

def render_ddl_predicates(self, relation: BaseRelation, config: RelationConfig) -> str:
"""
{{ optional('external_volume', dynamic_table.catalog.external_volume) }}
{{ optional('catalog', dynamic_table.catalog.name) }}
base_location = '{{ dynamic_table.catalog.base_location }}'
:param config:
:param relation:
:return:
"""
base_location: str = f"_dbt/{relation.schema}/{relation.name}"

if sub_path := config.get("base_location_subpath"):
base_location += f"/{sub_path}"

iceberg_ddl_predicates: str = f"""
external_volume = '{self.external_volume}'
catalog = 'snowflake'
base_location = '{base_location}'
"""
return textwrap.indent(textwrap.dedent(iceberg_ddl_predicates), " " * 10)

@classmethod
def parse_relation_results(cls, relation_results: RelationResults) -> Dict[str, Any]:
import agate

# this try block can be removed once enable_iceberg_materializations is retired
try:
catalog_results: "agate.Table" = relation_results["catalog"]
except KeyError:
# this happens when `enable_iceberg_materializations` is turned off
return {}

if len(catalog_results) == 0:
# this happens when the dynamic table is a standard dynamic table (e.g. not iceberg)
return {}

# for now, if we get catalog results, it's because this is an iceberg table
# this is because we only run `show iceberg tables` to get catalog metadata
# this will need to be updated once this is in `show objects`
catalog: "agate.Row" = catalog_results.rows[0]
config_dict = {
"table_format": "iceberg",
"name": catalog.get("catalog_name"),
"external_volume": catalog.get("external_volume_name"),
"base_location": catalog.get("base_location"),
}

return config_dict


class SnowflakeGlueCatalogIntegration(CatalogIntegration):
catalog_type = CatalogIntegrationType.glue
auto_refresh: Optional[str] = None # "TRUE" | "FALSE"
replace_invalid_characters: Optional[str] = None # "TRUE" | "FALSE"

def _handle_adapter_configs(self, adapter_configs: Optional[Dict]) -> None:
if adapter_configs:
if "auto_refresh" in adapter_configs:
self.auto_refresh = adapter_configs["auto_refresh"]
if "replace_invalid_characters" in adapter_configs:
self.replace_invalid_characters = adapter_configs["replace_invalid_characters"]

def render_ddl_predicates(self, relation: BaseRelation, config: RelationConfig) -> str:
ddl_predicate = f"""
external_volume = '{self.external_volume}'
catalog = '{self.integration_name}'
"""
if self.namespace:
ddl_predicate += f"CATALOG_NAMESPACE = '{self.namespace}'\n"
if self.auto_refresh:
ddl_predicate += f"auto_refresh = {self.auto_refresh}\n"
if self.replace_invalid_characters:
ddl_predicate += f"replace_invalid_characters = {self.replace_invalid_characters}\n"
return ddl_predicate
10 changes: 10 additions & 0 deletions dbt/adapters/snowflake/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,12 @@
from dbt.adapters.base.impl import AdapterConfig, ConstraintSupport
from dbt.adapters.base.meta import available
from dbt.adapters.capability import CapabilityDict, CapabilitySupport, Support, Capability
from dbt.adapters.contracts.catalog import CatalogIntegrationType
from dbt.adapters.contracts.relation import RelationConfig
from dbt.adapters.snowflake.catalog import (
SnowflakeManagedIcebergCatalogIntegration,
SnowflakeGlueCatalogIntegration,
)
from dbt.adapters.sql import SQLAdapter
from dbt.adapters.sql.impl import (
LIST_SCHEMAS_MACRO_NAME,
Expand Down Expand Up @@ -55,6 +60,7 @@ class SnowflakeConfig(AdapterConfig):
table_format: Optional[str] = None
external_volume: Optional[str] = None
base_location_subpath: Optional[str] = None
catalog_name: Optional[str] = None


class SnowflakeAdapter(SQLAdapter):
Expand All @@ -63,6 +69,10 @@ class SnowflakeAdapter(SQLAdapter):
ConnectionManager = SnowflakeConnectionManager

AdapterSpecificConfigs = SnowflakeConfig
CatalogIntegrations = {
CatalogIntegrationType.managed: SnowflakeManagedIcebergCatalogIntegration,
CatalogIntegrationType.glue: SnowflakeGlueCatalogIntegration,
}

CONSTRAINT_SUPPORT = {
ConstraintType.check: ConstraintSupport.NOT_SUPPORTED,
Expand Down
43 changes: 27 additions & 16 deletions dbt/adapters/snowflake/relation.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
import textwrap

from dataclasses import dataclass, field
from typing import FrozenSet, Optional, Type, Iterator, Tuple


from dbt.adapters.clients import catalogs as catalogs_client
from dbt.adapters.base.relation import BaseRelation
from dbt.adapters.contracts.catalog import CatalogIntegrationConfig, CatalogIntegrationType
from dbt.adapters.contracts.relation import ComponentName, RelationConfig
from dbt.adapters.events.types import AdapterEventWarning, AdapterEventDebug
from dbt.adapters.relation_configs import (
RelationConfigBase,
RelationConfigChangeAction,
RelationResults,
)
from dbt.adapters.snowflake.catalog import SnowflakeManagedIcebergCatalogIntegration
from dbt.adapters.utils import classproperty
from dbt_common.exceptions import DbtRuntimeError
from dbt_common.events.functions import fire_event, warn_or_error
Expand Down Expand Up @@ -64,6 +64,10 @@ def is_dynamic_table(self) -> bool:

@property
def is_iceberg_format(self) -> bool:
if self.catalog_name:
return (
catalogs_client.get_catalog(self.catalog_name).table_format == TableFormat.ICEBERG
)
return self.table_format == TableFormat.ICEBERG

@classproperty
Expand Down Expand Up @@ -167,7 +171,11 @@ def get_ddl_prefix_for_create(self, config: RelationConfig, temporary: bool) ->
"""

transient_explicitly_set_true: bool = config.get("transient", False)

catalog_name = config.get("catalog_name", None)
if catalog_name:
catalog = catalogs_client.get_catalog(catalog_name)
if catalog.table_format == TableFormat.ICEBERG:
return "iceberg"
# Temporary tables are a Snowflake feature that do not exist in the
# Iceberg framework. We ignore the Iceberg status of the model.
if temporary:
Expand Down Expand Up @@ -203,18 +211,21 @@ def get_ddl_prefix_for_alter(self) -> str:
else:
return ""

def get_iceberg_ddl_options(self, config: RelationConfig) -> str:
base_location: str = f"_dbt/{self.schema}/{self.name}"

if subpath := config.get("base_location_subpath"):
base_location += f"/{subpath}"

iceberg_ddl_predicates: str = f"""
external_volume = '{config.get('external_volume')}'
catalog = 'snowflake'
base_location = '{base_location}'
"""
return textwrap.indent(textwrap.dedent(iceberg_ddl_predicates), " " * 10)
def add_managed_catalog_integration(self, config: RelationConfig) -> str:
catalog_name = "snowflake_managed"
external_volume = config.get("external_volume")
integration_config = CatalogIntegrationConfig(
catalog_name=catalog_name,
integration_name=catalog_name,
table_format=self.table_format,
catalog_type=CatalogIntegrationType.managed.value,
external_volume=external_volume,
)
catalogs_client.add_catalog(
SnowflakeManagedIcebergCatalogIntegration(integration_config),
catalog_name=catalog_name,
)
return catalog_name

def __drop_conditions(self, old_relation: "SnowflakeRelation") -> Iterator[Tuple[bool, str]]:
drop_view_message: str = (
Expand Down
4 changes: 4 additions & 0 deletions dbt/adapters/snowflake/relation_configs/catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,10 @@ def parse_relation_config(cls, relation_config: RelationConfig) -> Dict[str, Any

return config_dict

@classmethod
def from_relation_config(cls, relation_config: RelationConfig) -> Self:
return cls.from_dict(cls.parse_relation_config(relation_config))

@classmethod
def parse_relation_results(cls, relation_results: RelationResults) -> Dict[str, Any]:
# this try block can be removed once enable_iceberg_materializations is retired
Expand Down
43 changes: 38 additions & 5 deletions dbt/adapters/snowflake/relation_configs/dynamic_table.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,22 @@
from dataclasses import dataclass
from typing import Optional, Dict, Any, TYPE_CHECKING
from typing import Optional, Dict, Any, TYPE_CHECKING, Union

from dbt.adapters.contracts.catalog import CatalogIntegrationConfig, CatalogIntegrationType
from dbt.adapters.relation_configs import RelationConfigChange, RelationResults
from dbt.adapters.clients import catalogs as catalogs_client
from dbt.adapters.contracts.relation import RelationConfig
from dbt.adapters.contracts.relation import ComponentName
from dbt_common.dataclass_schema import StrEnum # doesn't exist in standard library until py3.11
from typing_extensions import Self

from dbt.adapters.relation_configs.formats import TableFormat
from dbt.adapters.snowflake.catalog import SnowflakeManagedIcebergCatalogIntegration
from dbt.adapters.snowflake.relation_configs.base import SnowflakeRelationConfigBase
from dbt.adapters.snowflake.relation_configs.catalog import (
SnowflakeCatalogConfig,
SnowflakeCatalogConfigChange,
)


if TYPE_CHECKING:
import agate

Expand All @@ -37,6 +40,34 @@ def default(cls) -> Self:
return cls("ON_CREATE")


def _setup_catalog_integration(catalog_info: Union[Dict, RelationConfig]) -> str:
if not catalog_info:
return "SNOWFLAKE"
elif isinstance(catalog_info, str):
return catalog_info
elif isinstance(catalog_info, dict):
catalog_config = SnowflakeCatalogConfig.from_dict(catalog_info)
else:
catalog_config = SnowflakeCatalogConfig.from_relation_config(catalog_info)

if catalog_config.table_format != TableFormat.default():
catalog_name = "snowflake_managed"
integration_config = CatalogIntegrationConfig(
catalog_name=catalog_name,
integration_name=catalog_config.name,
table_format=catalog_config.table_format,
catalog_type=CatalogIntegrationType.managed.value,
external_volume=catalog_config.external_volume,
)
catalogs_client.add_catalog(
SnowflakeManagedIcebergCatalogIntegration(integration_config),
catalog_name=catalog_name,
)
return catalog_name
else:
return TableFormat.default().value


@dataclass(frozen=True, eq=True, unsafe_hash=True)
class SnowflakeDynamicTableConfig(SnowflakeRelationConfigBase):
"""
Expand All @@ -60,12 +91,13 @@ class SnowflakeDynamicTableConfig(SnowflakeRelationConfigBase):
query: str
target_lag: str
snowflake_warehouse: str
catalog: SnowflakeCatalogConfig
catalog: str = "SNOWFLAKE"
refresh_mode: Optional[RefreshMode] = RefreshMode.default()
initialize: Optional[Initialize] = Initialize.default()

@classmethod
def from_dict(cls, config_dict: Dict[str, Any]) -> Self:
catalog = _setup_catalog_integration(config_dict["catalog"])
kwargs_dict = {
"name": cls._render_part(ComponentName.Identifier, config_dict.get("name")),
"schema_name": cls._render_part(ComponentName.Schema, config_dict.get("schema_name")),
Expand All @@ -75,7 +107,7 @@ def from_dict(cls, config_dict: Dict[str, Any]) -> Self:
"query": config_dict.get("query"),
"target_lag": config_dict.get("target_lag"),
"snowflake_warehouse": config_dict.get("snowflake_warehouse"),
"catalog": SnowflakeCatalogConfig.from_dict(config_dict["catalog"]),
"catalog": catalog,
"refresh_mode": config_dict.get("refresh_mode"),
"initialize": config_dict.get("initialize"),
}
Expand All @@ -84,14 +116,15 @@ def from_dict(cls, config_dict: Dict[str, Any]) -> Self:

@classmethod
def parse_relation_config(cls, relation_config: RelationConfig) -> Dict[str, Any]:
catalog = _setup_catalog_integration(relation_config)
config_dict = {
"name": relation_config.identifier,
"schema_name": relation_config.schema,
"database_name": relation_config.database,
"query": relation_config.compiled_code,
"target_lag": relation_config.config.extra.get("target_lag"),
"snowflake_warehouse": relation_config.config.extra.get("snowflake_warehouse"),
"catalog": SnowflakeCatalogConfig.parse_relation_config(relation_config),
"catalog": catalog,
}

if refresh_mode := relation_config.config.extra.get("refresh_mode"):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

{%- set dynamic_table = relation.from_config(config.model) -%}

{%- if dynamic_table.catalog.table_format == 'iceberg' -%}
{%- if dynamic_table.catalog != 'snowflake' -%}
{{ _get_create_dynamic_iceberg_table_as_sql(dynamic_table, relation, sql) }}
{%- else -%}
{{ _get_create_dynamic_standard_table_as_sql(dynamic_table, relation, sql) }}
Expand Down Expand Up @@ -70,12 +70,12 @@
-- A valid DDL statement which will result in a new dynamic iceberg table.
-#}

{% set catalog_integration = adapter.get_catalog_integration(relation.catalog) -%}

create dynamic iceberg table {{ relation }}
target_lag = '{{ dynamic_table.target_lag }}'
warehouse = {{ dynamic_table.snowflake_warehouse }}
{{ optional('external_volume', dynamic_table.catalog.external_volume) }}
{{ optional('catalog', dynamic_table.catalog.name) }}
base_location = '{{ dynamic_table.catalog.base_location }}'
{{ catalog_integration.render_ddl_predicates(relation, config.model.config) }}
{{ optional('refresh_mode', dynamic_table.refresh_mode) }}
{{ optional('initialize', dynamic_table.initialize) }}
as (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

{%- set dynamic_table = relation.from_config(config.model) -%}

{%- if dynamic_table.catalog.table_format == 'iceberg' -%}
{%- if dynamic_table.catalog != 'SNOWFLAKE' -%}
{{ _get_replace_dynamic_iceberg_table_as_sql(dynamic_table, relation, sql) }}
{%- else -%}
{{ _get_replace_dynamic_standard_table_as_sql(dynamic_table, relation, sql) }}
Expand Down Expand Up @@ -68,13 +68,16 @@
-- Returns:
-- A valid DDL statement which will result in a new dynamic iceberg table.
-#}
{% set catalog_integration = adapter.get_catalog_integration(dynamic_table.catalog) -%}

{% if not catalog_integration -%}
{{ raise('Catalog integration is required for iceberg tables') }}
{%- endif -%}

create or replace dynamic iceberg table {{ relation }}
target_lag = '{{ dynamic_table.target_lag }}'
warehouse = {{ dynamic_table.snowflake_warehouse }}
{{ optional('external_volume', dynamic_table.catalog.external_volume) }}
{{ optional('catalog', dynamic_table.catalog.name) }}
base_location = '{{ dynamic_table.catalog.base_location }}'
{{ catalog_integration.render_ddl_predicates(relation) }}
{{ optional('refresh_mode', dynamic_table.refresh_mode) }}
{{ optional('initialize', dynamic_table.initialize) }}
as (
Expand Down
Loading
Loading