diff --git a/.circleci/config.yml b/.circleci/config.yml index 262a70ba2..0eac387da 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -86,6 +86,7 @@ workflows: only: - develop - /^int.*/ + - /^fix.*/ test-integration: jobs: - integration: @@ -93,4 +94,5 @@ workflows: branches: only: - develop - - /^int.*/ \ No newline at end of file + - /^int.*/ + - /^fix.*/ \ No newline at end of file diff --git a/dbt_project.yml b/dbt_project.yml index 0f13a614c..67f2c4c55 100644 --- a/dbt_project.yml +++ b/dbt_project.yml @@ -1,5 +1,5 @@ name: 'dbtvault' -version: '0.7.4' +version: '0.7.5' require-dbt-version: [">=0.18.0", "<0.20.0"] config-version: 2 diff --git a/macros/internal/multikey.sql b/macros/internal/multikey.sql index c880410f5..f0506a907 100644 --- a/macros/internal/multikey.sql +++ b/macros/internal/multikey.sql @@ -16,7 +16,9 @@ {%- if condition in ['<>', '!=', '='] -%} {%- for col in columns -%} - {{ (prefix[0] ~ '.') if prefix }}{{ col }} {{ condition }} {{ (prefix[1] ~ '.') if prefix }}{{ col }} + {%- if prefix -%} + {{- dbtvault.prefix([col], prefix[0], alias_target='target') }} {{ condition }} {{ dbtvault.prefix([col], prefix[1]) -}} + {%- endif %} {%- if not loop.last %} {{ operator }} {% endif %} {% endfor -%} {%- else -%} diff --git a/macros/materialisations/shared_helpers.sql b/macros/materialisations/shared_helpers.sql index b28b9cdba..ed6b1971f 100644 --- a/macros/materialisations/shared_helpers.sql +++ b/macros/materialisations/shared_helpers.sql @@ -1,10 +1,19 @@ -{% macro check_placeholder(model_sql, placeholder='__PERIOD_FILTER__') %} +{%- macro check_placeholder(model_sql, placeholder='__PERIOD_FILTER__') -%} {%- if model_sql.find(placeholder) == -1 -%} {%- set error_message -%} Model '{{ model.unique_id }}' does not include the required string '{{ placeholder }}' in its sql {%- endset -%} - {{ exceptions.raise_compiler_error(error_message) }} + {{- exceptions.raise_compiler_error(error_message) -}} {%- endif -%} -{% endmacro %} \ No newline at end of file +{%- endmacro -%} + + +{%- macro is_any_incremental() -%} + {%- if dbtvault.is_vault_insert_by_period() or dbtvault.is_vault_insert_by_rank() or is_incremental() -%} + {%- do return(true) -%} + {%- else -%} + {%- do return(false) -%} + {%- endif -%} +{%- endmacro -%} \ No newline at end of file diff --git a/macros/staging/rank_columns.sql b/macros/staging/rank_columns.sql index 4cbbe959b..0694daff4 100644 --- a/macros/staging/rank_columns.sql +++ b/macros/staging/rank_columns.sql @@ -12,7 +12,19 @@ {%- if columns[col] is mapping and columns[col].partition_by and columns[col].order_by -%} - {{- "RANK() OVER (PARTITION BY {} ORDER BY {}) AS {}".format(columns[col].partition_by, columns[col].order_by, col) | indent(4) -}} + {%- if dbtvault.is_list(columns[col].order_by) -%} + {%- set order_by_str = columns[col].order_by | join(", ") -%} + {%- else -%} + {%- set order_by_str = columns[col].order_by -%} + {%- endif -%} + + {%- if dbtvault.is_list(columns[col].partition_by) -%} + {%- set partition_by_str = columns[col].partition_by | join(", ") -%} + {%- else -%} + {%- set partition_by_str = columns[col].partition_by -%} + {%- endif -%} + + {{- "RANK() OVER (PARTITION BY {} ORDER BY {}) AS {}".format(partition_by_str, order_by_str, col) | indent(4) -}} {%- endif -%} diff --git a/macros/tables/eff_sat.sql b/macros/tables/eff_sat.sql index c74eb4c25..be696b4b8 100644 --- a/macros/tables/eff_sat.sql +++ b/macros/tables/eff_sat.sql @@ -21,104 +21,109 @@ {{- dbtvault.prepend_generated_by() }} WITH source_data AS ( - SELECT * - FROM {{ ref(source_model) }} + SELECT {{ dbtvault.prefix(source_cols, 'a', alias_target='source') }} + FROM {{ ref(source_model) }} AS a + WHERE {{ dbtvault.multikey(src_dfk, prefix='a', condition='IS NOT NULL') }} + AND {{ dbtvault.multikey(src_sfk, prefix='a', condition='IS NOT NULL') }} {%- if model.config.materialized == 'vault_insert_by_period' %} - WHERE __PERIOD_FILTER__ - {% endif %} - {%- set source_cte = "source_data" %} -), - -{%- if model.config.materialized == 'vault_insert_by_rank' %} -rank_col AS ( - SELECT * FROM source_data - WHERE __RANK_FILTER__ - {%- set source_cte = "rank_col" %} + AND __PERIOD_FILTER__ + {%- elif model.config.materialized == 'vault_insert_by_rank' %} + AND __RANK_FILTER__ + {%- endif %} ), -{% endif -%} -{%- if load_relation(this) is none %} +{%- if dbtvault.is_any_incremental() %} -records_to_insert AS ( - SELECT {{ dbtvault.alias_all(source_cols, 'e') }} - FROM {{ source_cte }} AS e -) -{%- else %} - -latest_open_eff AS -( +{# Selecting the most recent records for each link hashkey -#} +latest_records AS ( SELECT {{ dbtvault.alias_all(source_cols, 'b') }}, ROW_NUMBER() OVER ( - PARTITION BY - {%- for driving_key in dfk_cols %} - {{ driving_key }}{{ ", " if not loop.last }} - {%- endfor %} + PARTITION BY b.{{ src_pk }} ORDER BY b.{{ src_ldts }} DESC - ) AS row_number + ) AS row_num FROM {{ this }} AS b - WHERE TO_DATE(b.{{ src_end_date }}) = TO_DATE('9999-12-31') - QUALIFY row_number = 1 + QUALIFY row_num = 1 ), -stage_slice AS -( - SELECT {{ dbtvault.alias_all(source_cols, 'stage') }} - FROM {{ "rank_col" if model.config.materialized == 'vault_insert_by_rank' else "source_data" }} AS stage +{# Selecting the open records of the most recent records for each link hashkey -#} +latest_open AS ( + SELECT {{ dbtvault.alias_all(source_cols, 'c') }} + FROM latest_records AS c + WHERE TO_DATE(c.{{ src_end_date }}) = TO_DATE('9999-12-31') ), -new_open_records AS ( - SELECT DISTINCT - {{ dbtvault.alias_all(source_cols, 'stage') }} - FROM stage_slice AS stage - LEFT JOIN latest_open_eff AS e - ON stage.{{ src_pk }} = e.{{ src_pk }} - WHERE e.{{ src_pk }} IS NULL - AND {{ dbtvault.multikey(src_dfk, prefix='stage', condition='IS NOT NULL') }} - AND {{ dbtvault.multikey(src_sfk, prefix='stage', condition='IS NOT NULL') }} +{# Selecting the closed records of the most recent records for each link hashkey -#} +latest_closed AS ( + SELECT {{ dbtvault.alias_all(source_cols, 'd') }} + FROM latest_records AS d + WHERE TO_DATE(d.{{ src_end_date }}) != TO_DATE('9999-12-31') ), -{%- if is_auto_end_dating %} -links_to_end_date AS ( - SELECT a.* - FROM latest_open_eff AS a - LEFT JOIN stage_slice AS b - ON {{ dbtvault.multikey(src_dfk, prefix=['a', 'b'], condition='=') }} - WHERE {{ dbtvault.multikey(src_sfk, prefix='b', condition='IS NULL', operator='OR') }} - OR {{ dbtvault.multikey(src_sfk, prefix=['a', 'b'], condition='<>', operator='OR') }} +{# Identifying the completely new link relationships to be opened in eff sat -#} +new_open_records AS ( + SELECT DISTINCT + {{ dbtvault.alias_all(source_cols, 'f') }} + FROM source_data AS f + LEFT JOIN latest_records AS lr + ON f.{{ src_pk }} = lr.{{ src_pk }} + WHERE lr.{{ src_pk }} IS NULL ), -new_end_dated_records AS ( +{# Identifying the currently closed link relationships to be reopened in eff sat -#} +new_reopened_records AS ( SELECT DISTINCT - h.{{ src_pk }}, - {{ dbtvault.alias_all(fk_cols, 'g') }}, - h.EFFECTIVE_FROM AS {{ src_start_date }}, h.{{ src_source }} - FROM latest_open_eff AS h - INNER JOIN links_to_end_date AS g - ON g.{{ src_pk }} = h.{{ src_pk }} + lc.{{ src_pk }}, + {{ dbtvault.alias_all(fk_cols, 'lc') }}, + lc.{{ src_start_date }} AS {{ src_start_date }}, + g.{{ src_end_date }} AS {{ src_end_date }}, + g.{{ src_eff }} AS {{ src_eff }}, + g.{{ src_ldts }}, + g.{{ src_source }} + FROM source_data AS g + INNER JOIN latest_closed lc + ON g.{{ src_pk }} = lc.{{ src_pk }} ), -amended_end_dated_records AS ( +{%- if is_auto_end_dating %} + +{# Creating the closing records -#} +{# Identifying the currently open relationships that need to be closed due to change in SFK(s) -#} +new_closed_records AS ( SELECT DISTINCT - a.{{ src_pk }}, - {{ dbtvault.alias_all(fk_cols, 'a') }}, - a.{{ src_start_date }}, - stage.{{ src_eff }} AS END_DATE, stage.{{ src_eff }}, stage.{{ src_ldts }}, - a.{{ src_source }} - FROM new_end_dated_records AS a - INNER JOIN stage_slice AS stage - ON {{ dbtvault.multikey(src_dfk, prefix=['stage', 'a'], condition='=') }} - WHERE {{ dbtvault.multikey(src_sfk, prefix='stage', condition='IS NOT NULL') }} - AND {{ dbtvault.multikey(src_dfk, prefix='stage', condition='IS NOT NULL') }} + lo.{{ src_pk }}, + {{ dbtvault.alias_all(fk_cols, 'lo') }}, + lo.{{ src_start_date }} AS {{ src_start_date }}, + h.{{ src_eff }} AS {{ src_end_date }}, + h.{{ src_eff }} AS {{ src_eff }}, + h.{{ src_ldts }}, + lo.{{ src_source }} + FROM source_data AS h + INNER JOIN latest_open AS lo + ON {{ dbtvault.multikey(src_dfk, prefix=['lo', 'h'], condition='=') }} + WHERE ({{ dbtvault.multikey(src_sfk, prefix=['lo', 'h'], condition='<>', operator='OR') }}) ), + +{#- if is_auto_end_dating -#} {%- endif %} records_to_insert AS ( SELECT * FROM new_open_records + UNION + SELECT * FROM new_reopened_records {%- if is_auto_end_dating %} UNION - SELECT * FROM amended_end_dated_records + SELECT * FROM new_closed_records {%- endif %} ) + +{%- else %} + +records_to_insert AS ( + SELECT {{ dbtvault.alias_all(source_cols, 'i') }} + FROM source_data AS i +) + +{#- if not dbtvault.is_any_incremental() -#} {%- endif %} SELECT * FROM records_to_insert diff --git a/macros/tables/hub.sql b/macros/tables/hub.sql index 754051572..b708cb0ac 100644 --- a/macros/tables/hub.sql +++ b/macros/tables/hub.sql @@ -91,7 +91,7 @@ row_rank_union AS ( records_to_insert AS ( SELECT {{ dbtvault.prefix(source_cols, 'a', alias_target='target') }} FROM {{ ns.last_cte }} AS a - {%- if dbtvault.is_vault_insert_by_period() or is_incremental() %} + {%- if dbtvault.is_any_incremental() %} LEFT JOIN {{ this }} AS d ON a.{{ src_pk }} = d.{{ src_pk }} WHERE {{ dbtvault.prefix([src_pk], 'd') }} IS NULL diff --git a/macros/tables/link.sql b/macros/tables/link.sql index a1c9ba50d..bb7891326 100644 --- a/macros/tables/link.sql +++ b/macros/tables/link.sql @@ -41,7 +41,7 @@ row_rank_{{ source_number }} AS ( {%- endif %} ROW_NUMBER() OVER( PARTITION BY {{ src_pk }} - ORDER BY {{ src_ldts }} ASC + ORDER BY {{ src_ldts }} ) AS row_number FROM {{ ref(src) }} {%- if source_model | length == 1 %} @@ -96,7 +96,7 @@ row_rank_union AS ( records_to_insert AS ( SELECT {{ dbtvault.prefix(source_cols, 'a', alias_target='target') }} FROM {{ ns.last_cte }} AS a - {%- if dbtvault.is_vault_insert_by_period() or is_incremental() %} + {%- if dbtvault.is_any_incremental() %} LEFT JOIN {{ this }} AS d ON a.{{ src_pk }} = d.{{ src_pk }} WHERE {{ dbtvault.prefix([src_pk], 'd') }} IS NULL diff --git a/macros/tables/ma_sat.sql b/macros/tables/ma_sat.sql new file mode 100644 index 000000000..a96eb40f4 --- /dev/null +++ b/macros/tables/ma_sat.sql @@ -0,0 +1,127 @@ +{%- macro ma_sat(src_pk, src_cdk, src_hashdiff, src_payload, src_eff, src_ldts, src_source, source_model) -%} + + {{- adapter.dispatch('ma_sat', packages = dbtvault.get_dbtvault_namespaces())(src_pk=src_pk, src_cdk=src_cdk, src_hashdiff=src_hashdiff, + src_payload=src_payload, src_eff=src_eff, src_ldts=src_ldts, + src_source=src_source, source_model=source_model) -}} + +{%- endmacro %} + +{%- macro default__ma_sat(src_pk, src_cdk, src_hashdiff, src_payload, src_eff, src_ldts, src_source, source_model) -%} + + +{{- dbtvault.check_required_parameters(src_pk=src_pk, src_cdk=src_cdk, src_hashdiff=src_hashdiff, + src_payload=src_payload, src_ldts=src_ldts, src_source=src_source, + source_model=source_model) -}} + +{%- set source_cols = dbtvault.expand_column_list(columns=[src_pk, src_hashdiff, src_cdk, src_payload, src_eff, src_ldts, src_source]) -%} +{%- set rank_cols = dbtvault.expand_column_list(columns=[src_pk, src_hashdiff, src_ldts]) -%} +{%- set cdk_cols = dbtvault.expand_column_list(columns=[src_cdk]) -%} + +{%- if model.config.materialized == 'vault_insert_by_rank' %} + {%- set source_cols_with_rank = source_cols + [config.get('rank_column')] -%} +{%- endif -%} + +{{ dbtvault.prepend_generated_by() }} + +WITH source_data AS ( + {%- if model.config.materialized == 'vault_insert_by_rank' %} + SELECT {{ dbtvault.prefix(source_cols_with_rank, 'a', alias_target='source') }} + {%- else %} + SELECT {{ dbtvault.prefix(source_cols, 'a', alias_target='source') }} + {%- endif %} + ,COUNT(DISTINCT {{ dbtvault.prefix([src_hashdiff], 'a') }}, {{ dbtvault.prefix(cdk_cols, 'a') }} ) + OVER (PARTITION BY {{ dbtvault.prefix([src_pk], 'a') }}) AS source_count + FROM {{ ref(source_model) }} AS a + WHERE {{ dbtvault.prefix([src_pk], 'a') }} IS NOT NULL + {%- for child_key in src_cdk %} + AND {{ dbtvault.multikey(child_key, 'a', condition='IS NOT NULL') }} + {%- endfor %} + {%- if model.config.materialized == 'vault_insert_by_period' %} + AND __PERIOD_FILTER__ + {%- elif model.config.materialized == 'vault_insert_by_rank' %} + AND __RANK_FILTER__ + {%- endif %} +), + +{% if dbtvault.is_any_incremental() %} + +{# Select latest records from satellite together with count of distinct hashdiffs for each hashkey #} +latest_records AS ( + SELECT *, COUNT(DISTINCT {{ dbtvault.prefix([src_hashdiff], 'latest_selection') }}, {{ dbtvault.prefix(cdk_cols, 'latest_selection') }} ) + OVER (PARTITION BY {{ dbtvault.prefix([src_pk], 'latest_selection') }}) AS target_count + FROM ( + SELECT {{ dbtvault.prefix(cdk_cols, 'target_records', alias_target='target') }}, {{ dbtvault.prefix(rank_cols, 'target_records', alias_target='target') }} + ,RANK() OVER (PARTITION BY {{ dbtvault.prefix([src_pk], 'target_records') }} + ORDER BY {{ dbtvault.prefix([src_ldts], 'target_records') }} DESC) AS rank_value + FROM {{ this }} AS target_records + INNER JOIN + (SELECT DISTINCT {{ dbtvault.prefix([src_pk], 'source_pks') }} + FROM source_data AS source_pks) AS source_records + ON {{ dbtvault.prefix([src_pk], 'target_records') }} = {{ dbtvault.prefix([src_pk], 'source_records') }} + QUALIFY rank_value = 1 + ) AS latest_selection +), + +{# Select PKs and hashdiff counts for matching stage and sat records #} +{# Matching by hashkey + hashdiff + cdk #} +matching_records AS ( + SELECT {{ dbtvault.prefix([src_pk], 'stage', alias_target='target') }} + ,COUNT(DISTINCT {{ dbtvault.prefix([src_hashdiff], 'stage') }}, {{ dbtvault.prefix(cdk_cols, 'stage') }}) AS match_count + FROM source_data AS stage + INNER JOIN latest_records + ON {{ dbtvault.prefix([src_pk], 'stage') }} = {{ dbtvault.prefix([src_pk], 'latest_records', alias_target='target') }} + AND {{ dbtvault.prefix([src_hashdiff], 'stage') }} = {{ dbtvault.prefix([src_hashdiff], 'latest_records', alias_target='target') }} + {%- for child_key in src_cdk %} + AND {{ dbtvault.prefix([child_key], 'stage') }} = {{ dbtvault.prefix([child_key], 'latest_records') }} + {%- endfor %} + GROUP BY {{ dbtvault.prefix([src_pk], 'stage') }} +), + +{# Select stage records with PKs that exist in sat where hashdiffs differ #} +{# either where total counts differ or where match counts differ #} +satellite_update AS ( + SELECT DISTINCT {{ dbtvault.prefix([src_pk], 'stage', alias_target='target') }} + FROM source_data AS stage + INNER JOIN latest_records + ON {{ dbtvault.prefix([src_pk], 'latest_records') }} = {{ dbtvault.prefix([src_pk], 'stage') }} + LEFT OUTER JOIN matching_records + ON {{ dbtvault.prefix([src_pk], 'matching_records') }} = {{ dbtvault.prefix([src_pk], 'latest_records') }} + WHERE (stage.source_count != latest_records.target_count + OR COALESCE(matching_records.match_count, 0) != latest_records.target_count) + {%- if model.config.materialized == 'vault_insert_by_rank' or model.config.materialized == 'vault_insert_by_period' %} + AND {{ dbtvault.prefix([src_ldts], 'stage') }} >= {{ dbtvault.prefix([src_ldts], 'latest_records') }} + {%- endif %} +), + +{# Select stage records with PKs that do not exist in sat #} +satellite_insert AS ( + SELECT DISTINCT {{ dbtvault.prefix([src_pk], 'stage', alias_target='target') }} + FROM source_data AS stage + LEFT OUTER JOIN latest_records + ON {{ dbtvault.prefix([src_pk], 'stage') }} = {{ dbtvault.prefix([src_pk], 'latest_records') }} + WHERE {{ dbtvault.prefix([src_pk], 'latest_records') }} IS NULL +), + +{%- endif %} + +records_to_insert AS ( + SELECT {% if not dbtvault.is_any_incremental() %} DISTINCT {% endif %} {{ dbtvault.alias_all(source_cols, 'stage') }} + FROM source_data AS stage + {#- Restrict to "to-do lists" of keys selected by satellite_update and satellite_insert CTEs #} + {%- if dbtvault.is_vault_insert_by_period() or dbtvault.is_vault_insert_by_rank() or is_incremental() %} + INNER JOIN satellite_update + ON {{ dbtvault.prefix([src_pk], 'satellite_update') }} = {{ dbtvault.prefix([src_pk], 'stage') }} + + UNION + + SELECT {{ dbtvault.alias_all(source_cols, 'stage') }} + FROM source_data AS stage + INNER JOIN satellite_insert + ON {{ dbtvault.prefix([src_pk], 'satellite_insert') }} = {{ dbtvault.prefix([src_pk], 'stage') }} + {%- endif %} +) + +{# Select stage records #} +SELECT * FROM records_to_insert + +{%- endmacro -%} \ No newline at end of file diff --git a/macros/tables/sat.sql b/macros/tables/sat.sql index d5c373850..ca5c09e22 100644 --- a/macros/tables/sat.sql +++ b/macros/tables/sat.sql @@ -9,7 +9,7 @@ {%- macro default__sat(src_pk, src_hashdiff, src_payload, src_eff, src_ldts, src_source, source_model) -%} {{- dbtvault.check_required_parameters(src_pk=src_pk, src_hashdiff=src_hashdiff, src_payload=src_payload, - src_eff=src_eff, src_ldts=src_ldts, src_source=src_source, + src_ldts=src_ldts, src_source=src_source, source_model=source_model) -}} {%- set source_cols = dbtvault.expand_column_list(columns=[src_pk, src_hashdiff, src_payload, src_eff, src_ldts, src_source]) -%} @@ -29,51 +29,40 @@ WITH source_data AS ( SELECT {{ dbtvault.prefix(source_cols, 'a', alias_target='source') }} {%- endif %} FROM {{ ref(source_model) }} AS a + WHERE {{ dbtvault.prefix([src_pk], 'a') }} IS NOT NULL {%- if model.config.materialized == 'vault_insert_by_period' %} - WHERE __PERIOD_FILTER__ - AND {{ dbtvault.multikey(src_pk, condition='IS NOT NULL') }} - {% elif model.config.materialized != 'vault_insert_by_rank' and model.config.materialized != 'vault_insert_by_period' %} - WHERE {{ dbtvault.multikey(src_pk, condition='IS NOT NULL') }} + AND __PERIOD_FILTER__ + {% elif model.config.materialized == 'vault_insert_by_rank' %} + AND __RANK_FILTER__ {% endif %} - {%- set source_cte = "source_data" %} ), -{%- if model.config.materialized == 'vault_insert_by_rank' %} -rank_col AS ( - SELECT * FROM source_data - WHERE __RANK_FILTER__ - AND {{ dbtvault.multikey(src_pk, condition='IS NOT NULL') }} - {%- set source_cte = "rank_col" %} -), -{% endif -%} - -{% if dbtvault.is_vault_insert_by_period() or dbtvault.is_vault_insert_by_rank() or is_incremental() %} - -update_records AS ( - SELECT {{ dbtvault.prefix(source_cols, 'a', alias_target='target') }} - FROM {{ this }} as a - JOIN source_data as b - ON a.{{ src_pk }} = b.{{ src_pk }} -), +{% if dbtvault.is_any_incremental() %} latest_records AS ( - SELECT {{ dbtvault.prefix(rank_cols, 'c', alias_target='target') }}, - CASE WHEN RANK() - OVER (PARTITION BY {{ dbtvault.prefix([src_pk], 'c') }} - ORDER BY {{ dbtvault.prefix([src_ldts], 'c') }} DESC) = 1 - THEN 'Y' ELSE 'N' END AS latest - FROM update_records as c - QUALIFY latest = 'Y' + + SELECT {{ dbtvault.prefix(rank_cols, 'current_records', alias_target='target') }}, + RANK() OVER ( + PARTITION BY {{ dbtvault.prefix([src_pk], 'current_records') }} + ORDER BY {{ dbtvault.prefix([src_ldts], 'current_records') }} DESC + ) AS rank + FROM {{ this }} AS current_records + JOIN ( + SELECT DISTINCT {{ dbtvault.prefix([src_pk], 'source_data') }} + FROM source_data + ) AS source_records + ON {{ dbtvault.prefix([src_pk], 'current_records') }} = {{ dbtvault.prefix([src_pk], 'source_records') }} + QUALIFY rank = 1 ), {%- endif %} records_to_insert AS ( - SELECT DISTINCT {{ dbtvault.alias_all(source_cols, 'e') }} - FROM {{ source_cte }} AS e - {%- if dbtvault.is_vault_insert_by_period() or dbtvault.is_vault_insert_by_rank() or is_incremental() %} + SELECT DISTINCT {{ dbtvault.alias_all(source_cols, 'stage') }} + FROM source_data AS stage + {%- if dbtvault.is_any_incremental() %} LEFT JOIN latest_records - ON {{ dbtvault.prefix([src_pk], 'latest_records', alias_target='target') }} = {{ dbtvault.prefix([src_pk], 'e') }} - WHERE {{ dbtvault.prefix([src_hashdiff], 'latest_records', alias_target='target') }} != {{ dbtvault.prefix([src_hashdiff], 'e') }} + ON {{ dbtvault.prefix([src_pk], 'latest_records', alias_target='target') }} = {{ dbtvault.prefix([src_pk], 'stage') }} + WHERE {{ dbtvault.prefix([src_hashdiff], 'latest_records', alias_target='target') }} != {{ dbtvault.prefix([src_hashdiff], 'stage') }} OR {{ dbtvault.prefix([src_hashdiff], 'latest_records', alias_target='target') }} IS NULL {%- endif %} ) diff --git a/macros/tables/t_link.sql b/macros/tables/t_link.sql index eca4fe93a..80a20765c 100644 --- a/macros/tables/t_link.sql +++ b/macros/tables/t_link.sql @@ -36,7 +36,7 @@ WITH stage AS ( records_to_insert AS ( SELECT DISTINCT {{ dbtvault.prefix(source_cols, 'stg') }} FROM stage AS stg - {% if is_incremental() -%} + {% if dbtvault.is_any_incremental() -%} LEFT JOIN {{ this }} AS tgt ON {{ dbtvault.prefix([src_pk], 'stg') }} = {{ dbtvault.prefix([src_pk], 'tgt') }} WHERE {{ dbtvault.prefix([src_pk], 'tgt') }} IS NULL