DBT-556 Added support for materializing Kudu table through impala adapter (#207)

* DBT-556 Added support for materializing Kudu table through impala adapter.

* DBT-556 Addressed review comment.

* DBT-556 Incorporated a review comment in CONTRIBUTING.md file.

* DBT-556 Updated README.md with correct set of available tests.
niranjancdw authored Oct 22, 2024
1 parent eb66439 commit df1d2d9
Showing 7 changed files with 246 additions and 34 deletions.
1 change: 1 addition & 0 deletions CONTRIBUTING.md
@@ -61,6 +61,7 @@ When `dbt-impala` is installed this way, any changes you make to the `dbt-impala
`dbt-impala` contains [functional](https://github.com/cloudera/dbt-impala/tree/master/tests/functional/) tests. Functional tests require an actual Impala warehouse to test against.

- You can run functional tests "locally" by configuring a `test.env` file with appropriate `ENV` variables.
- To run the Kudu functional tests as part of the test suite when the underlying storage is Kudu, set the `ENV` variable `DISABLE_KUDU_TEST` to `false`. Kudu tests are disabled by default because this variable defaults to `true`.

```
cp test.env.example test.env
18 changes: 18 additions & 0 deletions KUDU_INTEGRATION.md
@@ -0,0 +1,18 @@
# Kudu Integration using dbt-impala

The `dbt-impala` adapter allows you to use [dbt](https://www.getdbt.com/) along with [Apache Kudu](https://kudu.apache.org) and [Cloudera Data Platform](https://cloudera.com).


## Getting started

- [Install dbt](https://docs.getdbt.com/docs/installation)
- Read the [introduction](https://docs.getdbt.com/docs/introduction/) and [viewpoint](https://docs.getdbt.com/docs/about/viewpoint/)

### Requirements

- In a CDP public cloud deployment, Kudu is available as one of the many Cloudera Runtime services within the Real-time Data Mart template.
- To use Kudu, you can create a Data Hub cluster by selecting the Real-time Data Mart template in the Management Console.
- Follow this [article](https://blog.cloudera.com/integrating-cloudera-data-warehouse-with-kudu-clusters) on integrating the created Kudu service with Impala CDW.


For general instructions, follow the [README](README.md) guidelines.
67 changes: 34 additions & 33 deletions README.md
@@ -7,6 +7,7 @@ The `dbt-impala` adapter allows you to use [dbt](https://www.getdbt.com/) along

- [Install dbt](https://docs.getdbt.com/docs/installation)
- Read the [introduction](https://docs.getdbt.com/docs/introduction/) and [viewpoint](https://docs.getdbt.com/docs/about/viewpoint/)
- To use the `dbt-impala` adapter with [Apache Kudu](https://kudu.apache.org), follow the [Kudu Integration](KUDU_INTEGRATION.md) guidelines.

### Requirements

@@ -40,40 +41,40 @@ demo_project:
```

## Supported features
| Name | Supported | Iceberg |
|------|-----------|---------|
|Materialization: View|Yes| N/A |
|Materialization: Table|Yes| Yes |
|Materialization: Table with Partitions |Yes| Yes |
|Materialization: Incremental - Append|Yes| Yes |
|Materialization: Incremental - Append with Partitions |Yes| Yes |
|Materialization: Incremental - Insert+Overwrite |Yes| Yes |
|Materialization: Incremental - Insert+Overwrite with Partition |Yes| Yes |
|Materialization: Incremental - Merge|No| No |
|Materialization: Ephemeral|Yes| Yes |
|Seeds|Yes| Yes |
|Tests|Yes| Yes |
|Snapshots|No| No |
|Documentation|Yes| Yes |
|Authentication: LDAP|Yes| Yes |
|Authentication: Kerberos|Yes| No |
| Name | Supported | Iceberg | Kudu |
|------|-----------|---------|------|
|Materialization: View|Yes| N/A | N/A |
|Materialization: Table|Yes| Yes | Yes |
|Materialization: Table with Partitions |Yes| Yes | No |
|Materialization: Incremental - Append|Yes| Yes | Yes |
|Materialization: Incremental - Append with Partitions |Yes| Yes | No |
|Materialization: Incremental - Insert+Overwrite |Yes| Yes | Yes |
|Materialization: Incremental - Insert+Overwrite with Partition |Yes| Yes | No |
|Materialization: Incremental - Merge|No| No | No |
|Materialization: Ephemeral|Yes| Yes | No |
|Seeds|Yes| Yes | Yes |
|Tests|Yes| Yes | Yes |
|Snapshots|No| No | No |
|Documentation|Yes| Yes | Yes |
|Authentication: LDAP|Yes| Yes | Yes |
|Authentication: Kerberos|Yes| No | No |

### Tests Coverage

#### Functional Tests
| Name | Base | Iceberg |
|------|------|---------|
|Materialization: View|Yes| N/A |
|Materialization: Table|Yes| Yes |
|Materialization: Table with Partitions |Yes| Yes |
|Materialization: Incremental - Append|Yes| Yes |
|Materialization: Incremental - Append with Partitions |Yes| Yes |
|Materialization: Incremental - Insert+Overwrite |Yes| Yes |
|Materialization: Incremental - Insert+Overwrite with Partition |Yes| Yes |
|Materialization: Ephemeral|Yes| Yes |
|Seeds|Yes| Yes |
|Tests|Yes| Yes |
|Snapshots|No| No |
|Documentation| Yes | Yes |
|Authentication: LDAP|Yes| Yes |
|Authentication: Kerberos|No| No |
| Name | Base | Iceberg | Kudu |
|------|------|---------|------|
|Materialization: View|Yes| N/A | N/A |
|Materialization: Table|Yes| Yes | Yes |
|Materialization: Table with Partitions |Yes| Yes | No |
|Materialization: Incremental - Append|Yes| Yes | Yes |
|Materialization: Incremental - Append with Partitions |Yes| Yes | No |
|Materialization: Incremental - Insert+Overwrite |Yes| No | No |
|Materialization: Incremental - Insert+Overwrite with Partition |Yes| Yes | No |
|Materialization: Ephemeral|Yes| Yes | No |
|Seeds|Yes| Yes | Yes |
|Tests|Yes| Yes | Yes |
|Snapshots|No| No | No |
|Documentation| Yes | Yes | Yes |
|Authentication: LDAP|Yes| Yes | Yes |
|Authentication: Kerberos|No| No | No |
2 changes: 1 addition & 1 deletion dbt/adapters/impala/relation.py
@@ -37,7 +37,7 @@ class ImpalaIncludePolicy(Policy):
 class ImpalaRelation(BaseRelation):
     quote_policy: ImpalaQuotePolicy = field(default_factory=lambda: ImpalaQuotePolicy())
     include_policy: ImpalaIncludePolicy = field(default_factory=lambda: ImpalaIncludePolicy())
-    quote_character: str = None
+    quote_character: str = "`"
     information: str = None

     def __post_init__(self):
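
The commit does not say why the default `quote_character` changed from `None` to a backtick. As a minimal sketch, assuming identifiers are quoted by wrapping them in the quote character with plain string formatting, the snippet below shows what each default would produce. `quoted` is a hypothetical stand-in function, not the adapter's or dbt's own method.

```python
def quoted(identifier: str, quote_character) -> str:
    """Wrap an identifier in the quote character using plain string formatting.

    Hypothetical helper for illustration only; not part of dbt-impala.
    """
    return "{q}{name}{q}".format(q=quote_character, name=identifier)


# With the new default, the identifier is wrapped in backticks.
print(quoted("incremental_test_model", "`"))   # `incremental_test_model`

# With the old default, None is formatted as the literal text "None".
print(quoted("incremental_test_model", None))  # Noneincremental_test_modelNone
```

Whether the adapter actually reached such a code path before this change is not stated in the commit; the sketch only illustrates the formatting behaviour.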
9 changes: 9 additions & 0 deletions dbt/include/impala/macros/adapters.sql
@@ -68,6 +68,14 @@
{%- endif %}
{%- endmacro -%}

{% macro ct_option_primary_key(label, required=false) %}
{%- set primaryKey = config.get('primary_key', validator=validation[basestring]) -%}

{%- if primaryKey is not none %}
{{label}} {{primaryKey}}
{%- endif %}
{%- endmacro -%}
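
To make the effect of `ct_option_primary_key` concrete, here is a minimal Python sketch of its rendering logic. It mimics only the `if primaryKey is not none` branch; `render_primary_key` is a hypothetical helper, and the real macro also passes the value through dbt's `config.get` validator.

```python
from typing import Optional


def render_primary_key(label: str, primary_key: Optional[str]) -> str:
    """Emit '<label> <value>' only when a primary_key value is configured.

    Hypothetical helper that imitates the Jinja macro; not part of dbt-impala.
    """
    if primary_key is None:
        return ""
    return f"{label} {primary_key}"


# A model configured with primary_key="(id)", as in the Kudu functional tests.
print(render_primary_key("PRIMARY KEY", "(id)"))      # PRIMARY KEY (id)

# A model without the option adds nothing to the CREATE TABLE statement.
print(repr(render_primary_key("PRIMARY KEY", None)))  # ''
```

Because the macro emits the configured value verbatim, the parentheses have to be part of the config value itself, which is why the tests in this commit use `primary_key="(id)"`.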

{% macro ct_option_stored_as(label, required=false) %}
{%- set storedAs = config.get('stored_as', validator=validation[basestring]) -%}

@@ -180,6 +188,7 @@
{{ ct_option_row_format(label="row format") }}
{{ ct_option_with_serdeproperties(label="with serdeproperties") }}
{%- if table_type == 'iceberg' -%} STORED BY ICEBERG {%- endif -%}
{{ ct_option_primary_key(label="PRIMARY KEY") }}
{{ ct_option_stored_as(label="stored as") }}
{{ ct_option_location_clause(label="location") }}
{{ ct_option_cached_in(label="cached in") }}
1 change: 1 addition & 0 deletions test.env.example
@@ -17,3 +17,4 @@ IMPALA_SCHEMA=my_schema
IMPALA_USER=my_user
IMPALA_PASSWORD=my_password
IMPALA_HTTP_PATH=my_http_path
DISABLE_KUDU_TEST=true
182 changes: 182 additions & 0 deletions tests/functional/adapter/test_kudu.py
@@ -0,0 +1,182 @@
# Copyright 2024 Cloudera Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import pytest
import os
from dbt.tests.util import run_dbt, relation_from_name, check_relations_equal

from dbt.tests.adapter.basic.test_incremental import (
    BaseIncremental,
    BaseIncrementalNotSchemaChange,
)

from dbt.tests.adapter.basic.files import (
    schema_base_yml,
    model_incremental,
)

pytestmark = pytest.mark.skipif(
    os.getenv(key="DISABLE_KUDU_TEST", default="true") == "true",
    reason="Kudu tests will be run when DISABLE_KUDU_TEST is set to false in test.env",
)
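
The module-level `skipif` condition above can be read as a small predicate. The sketch below reproduces the same comparison against a plain dictionary so it can be tried without touching the real environment; `kudu_tests_disabled` and its `env` parameter are assumptions made for illustration and stand in for `os.getenv`.

```python
def kudu_tests_disabled(env: dict) -> bool:
    """Mirror the skipif condition: skip unless the variable is something other than 'true'.

    Hypothetical helper; `env` stands in for os.environ.
    """
    return env.get("DISABLE_KUDU_TEST", "true") == "true"


print(kudu_tests_disabled({}))                              # True  (unset: skipped by default)
print(kudu_tests_disabled({"DISABLE_KUDU_TEST": "true"}))   # True
print(kudu_tests_disabled({"DISABLE_KUDU_TEST": "false"}))  # False (Kudu tests run)
print(kudu_tests_disabled({"DISABLE_KUDU_TEST": "True"}))   # False (comparison is case-sensitive)
```

Note that the comparison is an exact string match, so any value other than lowercase `true` enables the Kudu tests.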

incremental_kudu_sql = (
    """
{{
    config(
        materialized="incremental",
        stored_as="kudu",
        primary_key="(id)"
    )
}}
""".strip()
    + model_incremental
)


class TestIncrementalKudu(BaseIncremental):
    @pytest.fixture(scope="class")
    def project_config_update(self):
        return {"name": "incremental_test_model"}

    @pytest.fixture(scope="class")
    def models(self):
        return {"incremental_test_model.sql": incremental_kudu_sql, "schema.yml": schema_base_yml}

    def test_incremental(self, project):
        # seed command
        results = run_dbt(["seed"])
        assert len(results) == 2

        # base table rowcount
        relation = relation_from_name(project.adapter, "base")
        result = project.run_sql(f"select count(*) as num_rows from {relation}", fetch="one")
        assert result[0] == 10

        # added table rowcount
        relation = relation_from_name(project.adapter, "added")
        result = project.run_sql(f"select count(*) as num_rows from {relation}", fetch="one")
        assert result[0] == 20

        # run command
        # the "seed_name" var changes the seed identifier in the schema file
        results = run_dbt(["run", "--vars", "seed_name: base"])
        assert len(results) == 1

        # check relations equal
        check_relations_equal(project.adapter, ["base", "incremental_test_model"])

        # change seed_name var
        # the "seed_name" var changes the seed identifier in the schema file
        results = run_dbt(["run", "--vars", "seed_name: added"])
        assert len(results) == 1

        # check relations equal
        check_relations_equal(project.adapter, ["added", "incremental_test_model"])

        # run full-refresh and compare with base table again
        results = run_dbt(
            [
                "run",
                "--select",
                "incremental_test_model",
                "--full-refresh",
                "--vars",
                "seed_name: base",
            ]
        )
        assert len(results) == 1

        check_relations_equal(project.adapter, ["base", "incremental_test_model"])

        # get catalog from docs generate
        catalog = run_dbt(["docs", "generate"])
        assert len(catalog.nodes) == 3
        assert len(catalog.sources) == 1
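
The incremental models in this file filter new rows with `where id > (select max(id) from {{ this }})`. The following pure-Python sketch simulates that append logic against in-memory rows. It is an illustration only: `incremental_run` is a hypothetical function, an empty target stands in for the case where `is_incremental()` is false, and the seed contents (ids 1 to 10 and 1 to 20) are assumed from the row counts asserted in the test.

```python
def incremental_run(target: list, source: list) -> list:
    """Append source rows whose id exceeds the current max id in target.

    Hypothetical simulation of the incremental filter; not dbt code.
    """
    if not target:
        # First run: no existing table, so every source row is loaded.
        return list(source)
    high_water_mark = max(row["id"] for row in target)
    return target + [row for row in source if row["id"] > high_water_mark]


base = [{"id": i} for i in range(1, 11)]   # assumed shape of the 10-row "base" seed
added = [{"id": i} for i in range(1, 21)]  # assumed shape of the 20-row "added" seed

table = incremental_run([], base)
print(len(table))  # 10

table = incremental_run(table, added)
print(len(table))  # 20
```

Under these assumptions the second run appends only ids 11 to 20, so the model ends up matching the `added` seed, which is what `check_relations_equal` verifies in the test.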


insertoverwrite_sql = """
{{
config(
materialized="incremental",
incremental_strategy="insert_overwrite",
partition_by="id_partition",
stored_as="kudu",
primary_key="(id)"
)
}}
select *, id as id_partition from {{ source('raw', 'seed') }}
{% if is_incremental() %}
where id > (select max(id) from {{ this }})
{% endif %}
""".strip()


@pytest.mark.skip(reason="Need to fix partition by syntax for Kudu")
class TestInsertoverwriteKudu(TestIncrementalKudu):
    @pytest.fixture(scope="class")
    def models(self):
        return {"incremental_test_model.sql": insertoverwrite_sql, "schema.yml": schema_base_yml}


incremental_single_partitionby_sql = """
{{
config(
materialized="incremental",
partition_by="id_partition",
stored_as="kudu",
primary_key="(id)"
)
}}
select *, id as id_partition from {{ source('raw', 'seed') }}
{% if is_incremental() %}
where id > (select max(id) from {{ this }})
{% endif %}
""".strip()


@pytest.mark.skip(reason="Need to fix partition by syntax for Kudu")
class TestIncrementalWithSinglePartitionKeyKudu(TestIncrementalKudu):
    @pytest.fixture(scope="class")
    def models(self):
        return {
            "incremental_test_model.sql": incremental_single_partitionby_sql,
            "schema.yml": schema_base_yml,
        }


incremental_multiple_partitionby_sql = """
{{
config(
materialized="incremental",
partition_by=["id_partition1", "id_partition2"],
stored_as="kudu",
primary_key="(id)"
)
}}
select *, id as id_partition1, id as id_partition2 from {{ source('raw', 'seed') }}
{% if is_incremental() %}
where id > (select max(id) from {{ this }})
{% endif %}
""".strip()


@pytest.mark.skip(reason="Need to fix partition by syntax for Kudu")
class TestIncrementalWithMultiplePartitionKeyKudu(TestIncrementalKudu):
    @pytest.fixture(scope="class")
    def models(self):
        return {
            "incremental_test_model.sql": incremental_multiple_partitionby_sql,
            "schema.yml": schema_base_yml,
        }
