Skip to content

Commit 3c709b7

Browse files
authored
Set column comments on create for MV/ST (#1046)
2 parents abfb577 + d5d0992 commit 3c709b7

File tree

10 files changed

+171
-8
lines changed

10 files changed

+171
-8
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
- Fix bug where incorrect insert overwrite exception was being thrown when overriding compute at model level ([1032](https://github.com/databricks/dbt-databricks/issues/1032))
77
- Fix bug that was causing failures when multiple foreign key constraints are defined in a model ([1034](https://github.com/databricks/dbt-databricks/issues/1034))
88
- Fix location_root config not being treated case sensitive ([1037](https://github.com/databricks/dbt-databricks/issues/1037))
9+
- Fix column comments for streaming tables and materialized views ([1049](https://github.com/databricks/dbt-databricks/issues/1049))
910

1011
## dbt-databricks 1.10.2 (May 21, 2025)
1112

dbt/include/databricks/macros/materializations/materialized_view.sql

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,8 +70,6 @@
7070
{% set should_revoke = should_revoke(existing_relation, full_refresh_mode=True) %}
7171
{% do apply_grants(target_relation, grant_config, should_revoke=should_revoke) %}
7272

73-
{% do persist_docs(target_relation, model, for_relation=False) %}
74-
7573
{{ run_hooks(post_hooks, inside_transaction=True) }}
7674

7775
{% endmacro %}

dbt/include/databricks/macros/materializations/streaming_table.sql

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,8 +69,6 @@
6969
{% set should_revoke = should_revoke(existing_relation, full_refresh_mode=True) %}
7070
{% do apply_grants(target_relation, grant_config, should_revoke=should_revoke) %}
7171

72-
{% do persist_docs(target_relation, model, for_relation=False) %}
73-
7472
{{ run_hooks(post_hooks, inside_transaction=True) }}
7573

7674
{% endmacro %}

dbt/include/databricks/macros/relations/materialized_view/create.sql

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,15 @@
44
{%- set tblproperties = materialized_view.config["tblproperties"].tblproperties -%}
55
{%- set comment = materialized_view.config["comment"].comment -%}
66
{%- set refresh = materialized_view.config["refresh"] -%}
7-
create materialized view {{ relation.render() }}
7+
8+
{%- set columns = adapter.get_column_schema_from_query(sql) -%}
9+
{%- set model_columns = model.get('columns', {}) -%}
10+
{%- set model_constraints = model.get('constraints', []) -%}
11+
{%- set columns_and_constraints = adapter.parse_columns_and_constraints(columns, model_columns, model_constraints) -%}
12+
{%- set target_relation = relation.enrich(columns_and_constraints[1]) -%}
13+
14+
create materialized view {{ target_relation.render() }}
15+
{{ get_column_and_constraints_sql(target_relation, columns_and_constraints[0]) }}
816
{{ get_create_sql_partition_by(partition_by) }}
917
{{ get_create_sql_comment(comment) }}
1018
{{ get_create_sql_tblproperties(tblproperties) }}

dbt/include/databricks/macros/relations/streaming_table/create.sql

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,14 @@
99
{%- set comment = streaming_table.config["comment"].comment -%}
1010
{%- set refresh = streaming_table.config["refresh"] -%}
1111

12+
{%- set analysis_sql = sql | replace('STREAM ', '') | replace('stream ', '') -%}
13+
{%- set columns = adapter.get_column_schema_from_query(analysis_sql) -%}
14+
{%- set model_columns = model.get('columns', {}) -%}
15+
{%- set columns_and_constraints = adapter.parse_columns_and_constraints(columns, model_columns, []) -%}
16+
17+
{#-- We don't enrich the relation with model constraints because they are not supported for streaming tables --#}
1218
CREATE STREAMING TABLE {{ relation.render() }}
19+
{{ get_column_and_constraints_sql(relation, columns_and_constraints[0]) }}
1320
{{ get_create_sql_partition_by(partition_by) }}
1421
{{ get_create_sql_comment(comment) }}
1522
{{ get_create_sql_tblproperties(tblproperties) }}

tests/functional/adapter/materialized_view_tests/fixtures.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,3 +44,23 @@ def query_relation_type(project, relation: BaseRelation) -> Optional[str]:
4444
) }}
4545
select * from {{ ref('my_seed') }}
4646
"""
47+
48+
materialized_view_schema = """
49+
version: 2
50+
51+
models:
52+
- name: my_materialized_view
53+
columns:
54+
- name: id
55+
description: "The unique identifier for each record"
56+
constraints:
57+
- type: not_null
58+
- name: value
59+
constraints:
60+
- type: primary_key
61+
columns: [id]
62+
config:
63+
persist_docs:
64+
relation: true
65+
columns: true
66+
"""

tests/functional/adapter/materialized_view_tests/test_basic.py

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
from dbt.adapters.base.relation import BaseRelation
66
from dbt.tests import util
7+
from dbt.tests.adapter.materialized_view import files
78
from dbt.tests.adapter.materialized_view.basic import MaterializedViewBasic
89
from tests.functional.adapter.materialized_view_tests import fixtures
910

@@ -29,6 +30,15 @@ def query_relation_type(project, relation: BaseRelation) -> Optional[str]:
2930
@pytest.mark.dlt
3031
@pytest.mark.skip_profile("databricks_cluster", "databricks_uc_cluster")
3132
class TestMaterializedViews(TestMaterializedViewsMixin, MaterializedViewBasic):
33+
@pytest.fixture(scope="class", autouse=True)
34+
def models(self):
35+
yield {
36+
"my_table.sql": files.MY_TABLE,
37+
"my_view.sql": files.MY_VIEW,
38+
"my_materialized_view.sql": files.MY_MATERIALIZED_VIEW,
39+
"schema.yml": fixtures.materialized_view_schema,
40+
}
41+
3242
def test_table_replaces_materialized_view(self, project, my_materialized_view):
3343
util.run_dbt(["run", "--models", my_materialized_view.identifier])
3444
assert self.query_relation_type(project, my_materialized_view) == "materialized_view"
@@ -70,3 +80,42 @@ def test_materialized_view_replaces_view(self, project, my_view):
7080
util.run_dbt(["run", "--models", my_view.identifier])
7181
# UC doesn't sync metadata fast enough for this to pass consistently
7282
# assert self.query_relation_type(project, my_view) == "materialized_view"
83+
84+
def test_create_materialized_view_with_comment_and_constraints(
85+
self, project, my_materialized_view
86+
):
87+
util.run_dbt(["run", "--models", my_materialized_view.identifier])
88+
assert self.query_relation_type(project, my_materialized_view) == "materialized_view"
89+
90+
# verify the non-null constraint and column comment are persisted on create
91+
results = project.run_sql(
92+
f"""
93+
SELECT
94+
is_nullable,
95+
comment
96+
FROM {project.database}.information_schema.columns
97+
WHERE table_catalog = '{project.database}'
98+
AND table_schema = '{project.test_schema}'
99+
AND table_name = '{my_materialized_view.identifier}'
100+
AND column_name = 'id'""",
101+
fetch="all",
102+
)
103+
row = results[0]
104+
assert row[0] == "NO"
105+
assert row[1] == "The unique identifier for each record"
106+
# Verify primary key constraint is persisted
107+
results = project.run_sql(
108+
f"""
109+
SELECT
110+
constraint_name,
111+
column_name
112+
FROM {project.database}.information_schema.key_column_usage
113+
WHERE table_catalog = '{project.database}'
114+
AND table_schema = '{project.test_schema}'
115+
AND table_name = '{my_materialized_view.identifier}'
116+
""",
117+
fetch="all",
118+
)
119+
assert len(results) == 1
120+
assert results[0][0] == "my_materialized_view_pk"
121+
assert results[0][1] == "id"

tests/functional/adapter/streaming_tables/fixtures.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,3 +51,31 @@ def query_relation_type(project, relation: BaseRelation) -> Optional[str]:
5151
) }}
5252
select * from stream {{ ref('my_seed') }}
5353
"""
54+
55+
streaming_table_schema = """
56+
version: 2
57+
58+
models:
59+
- name: my_streaming_table
60+
columns:
61+
- name: id
62+
description: "The unique identifier for each record"
63+
constraints:
64+
- type: not_null
65+
- name: value
66+
config:
67+
persist_docs:
68+
relation: true
69+
columns: true
70+
"""
71+
72+
73+
# There is a single CSV file in the source directory. Its contents exactly match MY_SEED
74+
streaming_table_from_file = """
75+
{{ config(
76+
materialized='streaming_table',
77+
) }}
78+
select * from stream read_files(
79+
'{{ env_var('DBT_DATABRICKS_LOCATION_ROOT') }}/test_inputs/streaming_table_test_sources'
80+
);
81+
"""

tests/functional/adapter/streaming_tables/test_st_basic.py

Lines changed: 53 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,7 @@
99
from tests.functional.adapter.streaming_tables import fixtures
1010

1111

12-
@pytest.mark.dlt
13-
@pytest.mark.skip_profile("databricks_cluster", "databricks_uc_cluster")
14-
class TestStreamingTablesBasic:
12+
class TestStreamingTablesMixin:
1513
@staticmethod
1614
def insert_record(project, table: BaseRelation, record: tuple[int, int]):
1715
project.run_sql(f"insert into {table} values {record}")
@@ -38,6 +36,7 @@ def models(self):
3836
"my_table.sql": MY_TABLE,
3937
"my_view.sql": MY_VIEW,
4038
"my_streaming_table.sql": fixtures.streaming_table,
39+
"schema.yml": fixtures.streaming_table_schema,
4140
}
4241

4342
@pytest.fixture(scope="class")
@@ -122,9 +121,29 @@ def setup(self, project, my_streaming_table):
122121
util.set_model_file(project, my_streaming_table, initial_model)
123122
project.run_sql(f"drop schema if exists {project.test_schema} cascade")
124123

124+
125+
@pytest.mark.dlt
126+
@pytest.mark.skip_profile("databricks_cluster", "databricks_uc_cluster")
127+
class TestStreamingTablesBasic(TestStreamingTablesMixin):
125128
def test_streaming_table_create(self, project, my_streaming_table):
126129
# setup creates it; verify it's there
127130
assert self.query_relation_type(project, my_streaming_table) == "streaming_table"
131+
# verify the non-null constraint and column comment are persisted on create
132+
results = project.run_sql(
133+
f"""
134+
SELECT
135+
is_nullable,
136+
comment
137+
FROM {project.database}.information_schema.columns
138+
WHERE table_catalog = '{project.database}'
139+
AND table_schema = '{project.test_schema}'
140+
AND table_name = '{my_streaming_table.identifier}'
141+
AND column_name = 'id'""",
142+
fetch="all",
143+
)
144+
row = results[0]
145+
assert row[0] == "NO"
146+
assert row[1] == "The unique identifier for each record"
128147

129148
def test_streaming_table_create_idempotent(self, project, my_streaming_table):
130149
# setup creates it once; verify it's there and run once
@@ -208,3 +227,34 @@ def test_streaming_table_only_updates_after_refresh(self, project, my_streaming_
208227
# view until it was refreshed
209228
assert table_start < table_mid == table_end
210229
assert view_start == view_mid < view_end
230+
231+
232+
@pytest.mark.dlt
233+
@pytest.mark.skip_profile("databricks_cluster", "databricks_uc_cluster")
234+
class TestStreamingTablesFromFiles(TestStreamingTablesMixin):
235+
@pytest.fixture(scope="class", autouse=True)
236+
def models(self):
237+
yield {
238+
"my_streaming_table.sql": fixtures.streaming_table_from_file,
239+
"schema.yml": fixtures.streaming_table_schema,
240+
}
241+
242+
def test_streaming_table_create_from_file(self, project, my_streaming_table):
243+
# setup creates it; verify it's there
244+
assert self.query_relation_type(project, my_streaming_table) == "streaming_table"
245+
# verify the non-null constraint and column comment are persisted on create
246+
results = project.run_sql(
247+
f"""
248+
SELECT
249+
is_nullable,
250+
comment
251+
FROM {project.database}.information_schema.columns
252+
WHERE table_catalog = '{project.database}'
253+
AND table_schema = '{project.test_schema}'
254+
AND table_name = '{my_streaming_table.identifier}'
255+
AND column_name = 'id'""",
256+
fetch="all",
257+
)
258+
row = results[0]
259+
assert row[0] == "NO"
260+
assert row[1] == "The unique identifier for each record"

tests/functional/adapter/streaming_tables/test_st_changes.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,10 @@ def project_config_update(self):
134134
def test_change_is_applied_via_alter(self, project, my_streaming_table):
135135
self.check_start_state(project, my_streaming_table)
136136

137+
# This tries to update column definitions (e.g. comment) but should be ignored for
138+
# streaming tables. No explicit assertion is needed; the job succeeding is sufficient
139+
util.write_file(fixtures.streaming_table_schema, "models", "schema.yml")
140+
137141
self.change_config_via_alter(project, my_streaming_table)
138142
_, logs = util.run_dbt_and_capture(["--debug", "run", "--models", my_streaming_table.name])
139143

0 commit comments

Comments
 (0)