Skip to content

Commit

Permalink
Add test for 321/support all on schema change options (#1196)
Browse files Browse the repository at this point in the history
* Add iceberg ddl generation.

* Add changelog.

* add test.

---------

Co-authored-by: VersusFacit <versusfacit@users.noreply.github.com>
  • Loading branch information
VersusFacit and VersusFacit authored Sep 30, 2024
1 parent f60c476 commit eea9844
Showing 1 changed file with 45 additions and 1 deletion.
46 changes: 45 additions & 1 deletion tests/functional/iceberg/test_incremental_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

from pathlib import Path

from dbt.tests.util import run_dbt, run_dbt_and_capture
from dbt.tests.util import run_dbt, run_dbt_and_capture, write_file


_SEED_INCREMENTAL_STRATEGIES = """
Expand Down Expand Up @@ -32,6 +32,7 @@
incremental_strategy='{strategy}',
unique_key="world_id",
external_volume = "s3_iceberg_snow",
on_schema_change = "sync_all_columns"
)
}}}}
select * from {{{{ ref('upstream_table') }}}}
Expand Down Expand Up @@ -123,3 +124,46 @@ def test_incremental_strategies_with_update(self, project, setup_class):
self.__check_correct_operations(self.append, rows_affected=2)
self.__check_correct_operations("merge", rows_affected=1)
self.__check_correct_operations("delete_insert", rows_affected=1)


class TestIcebergIncrementalOnSchemaChangeMutatesRelations:
@pytest.fixture(scope="class")
def project_config_update(self):
return {"flags": {"enable_iceberg_materializations": True}}

@pytest.fixture(scope="class")
def seeds(self):
return {
"seed.csv": _SEED_INCREMENTAL_STRATEGIES,
}

@pytest.fixture(scope="function", autouse=True)
def setup_class(self, project):
run_dbt(["seed"])
run_dbt(["run"])
yield

@pytest.fixture(scope="class")
def models(self):
return {
"upstream_table.sql": _MODEL_BASIC_TABLE_MODEL,
"merge.sql": _MODEL_INCREMENTAL_ICEBERG_MERGE,
}

def test_sync_and_append_semantics(self, project, setup_class):
model_file = project.project_root / Path("models") / Path("merge.sql")
sql = f"show columns in {project.database}.{project.test_schema}.merge;"
column_names = [column[2] for column in project.run_sql(sql, fetch="all")]
assert len(column_names) == 3

write_file(_MODEL_INCREMENTAL_ICEBERG_MERGE.replace("*", "*, 1 as new_column"), model_file)
run_dbt()
column_names = [column[2].lower() for column in project.run_sql(sql, fetch="all")]
assert len(column_names) == 4
assert "new_column" in column_names

write_file(_MODEL_INCREMENTAL_ICEBERG_MERGE, model_file)
run_dbt()
column_names = [column[2].lower() for column in project.run_sql(sql, fetch="all")]
assert len(column_names) == 3
assert "new_column" not in column_names

0 comments on commit eea9844

Please sign in to comment.