Skip to content

Commit

Permalink
Release 0.7.5
Browse files Browse the repository at this point in the history
New structures
- Multi-Active Satellites [Read More](../tutorial/tut_multi_active_satellites.md)

Bug Fixes
- Fixed a bug where an Effectivity Satellite with multiple DFKs or SFKs would incorrectly handle changes in the corresponding link records, meaning
one-to-many relationships were not handled as intended.

Improvements
- Added support for multiple `order_by` or `partition_by` columns when creating ranked columns in the `stage` or `ranked_columns` macros.
  • Loading branch information
Alex Higgs committed Jun 10, 2021
2 parents bfaa3d1 + 991ee0c commit 6ac0c90
Show file tree
Hide file tree
Showing 11 changed files with 261 additions and 115 deletions.
4 changes: 3 additions & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,13 @@ workflows:
only:
- develop
- /^int.*/
- /^fix.*/
test-integration:
jobs:
- integration:
filters:
branches:
only:
- develop
- /^int.*/
- /^int.*/
- /^fix.*/
2 changes: 1 addition & 1 deletion dbt_project.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
name: 'dbtvault'
version: '0.7.4'
version: '0.7.5'
require-dbt-version: [">=0.18.0", "<0.20.0"]
config-version: 2

Expand Down
4 changes: 3 additions & 1 deletion macros/internal/multikey.sql
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@

{%- if condition in ['<>', '!=', '='] -%}
{%- for col in columns -%}
{{ (prefix[0] ~ '.') if prefix }}{{ col }} {{ condition }} {{ (prefix[1] ~ '.') if prefix }}{{ col }}
{%- if prefix -%}
{{- dbtvault.prefix([col], prefix[0], alias_target='target') }} {{ condition }} {{ dbtvault.prefix([col], prefix[1]) -}}
{%- endif %}
{%- if not loop.last %} {{ operator }} {% endif %}
{% endfor -%}
{%- else -%}
Expand Down
15 changes: 12 additions & 3 deletions macros/materialisations/shared_helpers.sql
Original file line number Diff line number Diff line change
@@ -1,10 +1,19 @@
{% macro check_placeholder(model_sql, placeholder='__PERIOD_FILTER__') %}
{%- macro check_placeholder(model_sql, placeholder='__PERIOD_FILTER__') -%}

{%- if model_sql.find(placeholder) == -1 -%}
{%- set error_message -%}
Model '{{ model.unique_id }}' does not include the required string '{{ placeholder }}' in its sql
{%- endset -%}
{{ exceptions.raise_compiler_error(error_message) }}
{{- exceptions.raise_compiler_error(error_message) -}}
{%- endif -%}

{% endmacro %}
{%- endmacro -%}


{%- macro is_any_incremental() -%}
{#- Returns true when any of the three incremental checks reports true:
    dbtvault.is_vault_insert_by_period(), dbtvault.is_vault_insert_by_rank()
    or is_incremental(). Returns false otherwise. The checks are evaluated
    left to right and stop at the first truthy one. -#}
{%- set incremental_load = dbtvault.is_vault_insert_by_period()
                           or dbtvault.is_vault_insert_by_rank()
                           or is_incremental() -%}
{#- Coerce to a strict boolean so callers always receive true/false. -#}
{%- do return(true if incremental_load else false) -%}
{%- endmacro -%}
14 changes: 13 additions & 1 deletion macros/staging/rank_columns.sql
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,19 @@

{%- if columns[col] is mapping and columns[col].partition_by and columns[col].order_by -%}

{{- "RANK() OVER (PARTITION BY {} ORDER BY {}) AS {}".format(columns[col].partition_by, columns[col].order_by, col) | indent(4) -}}
{%- if dbtvault.is_list(columns[col].order_by) -%}
{%- set order_by_str = columns[col].order_by | join(", ") -%}
{%- else -%}
{%- set order_by_str = columns[col].order_by -%}
{%- endif -%}

{%- if dbtvault.is_list(columns[col].partition_by) -%}
{%- set partition_by_str = columns[col].partition_by | join(", ") -%}
{%- else -%}
{%- set partition_by_str = columns[col].partition_by -%}
{%- endif -%}

{{- "RANK() OVER (PARTITION BY {} ORDER BY {}) AS {}".format(partition_by_str, order_by_str, col) | indent(4) -}}

{%- endif -%}

Expand Down
143 changes: 74 additions & 69 deletions macros/tables/eff_sat.sql
Original file line number Diff line number Diff line change
Expand Up @@ -21,104 +21,109 @@
{{- dbtvault.prepend_generated_by() }}

WITH source_data AS (
SELECT *
FROM {{ ref(source_model) }}
SELECT {{ dbtvault.prefix(source_cols, 'a', alias_target='source') }}
FROM {{ ref(source_model) }} AS a
WHERE {{ dbtvault.multikey(src_dfk, prefix='a', condition='IS NOT NULL') }}
AND {{ dbtvault.multikey(src_sfk, prefix='a', condition='IS NOT NULL') }}
{%- if model.config.materialized == 'vault_insert_by_period' %}
WHERE __PERIOD_FILTER__
{% endif %}
{%- set source_cte = "source_data" %}
),

{%- if model.config.materialized == 'vault_insert_by_rank' %}
rank_col AS (
SELECT * FROM source_data
WHERE __RANK_FILTER__
{%- set source_cte = "rank_col" %}
AND __PERIOD_FILTER__
{%- elif model.config.materialized == 'vault_insert_by_rank' %}
AND __RANK_FILTER__
{%- endif %}
),
{% endif -%}

{%- if load_relation(this) is none %}
{%- if dbtvault.is_any_incremental() %}

records_to_insert AS (
SELECT {{ dbtvault.alias_all(source_cols, 'e') }}
FROM {{ source_cte }} AS e
)
{%- else %}

latest_open_eff AS
(
{# Selecting the most recent records for each link hashkey -#}
latest_records AS (
SELECT {{ dbtvault.alias_all(source_cols, 'b') }},
ROW_NUMBER() OVER (
PARTITION BY
{%- for driving_key in dfk_cols %}
{{ driving_key }}{{ ", " if not loop.last }}
{%- endfor %}
PARTITION BY b.{{ src_pk }}
ORDER BY b.{{ src_ldts }} DESC
) AS row_number
) AS row_num
FROM {{ this }} AS b
WHERE TO_DATE(b.{{ src_end_date }}) = TO_DATE('9999-12-31')
QUALIFY row_number = 1
QUALIFY row_num = 1
),

stage_slice AS
(
SELECT {{ dbtvault.alias_all(source_cols, 'stage') }}
FROM {{ "rank_col" if model.config.materialized == 'vault_insert_by_rank' else "source_data" }} AS stage
{# Selecting the open records of the most recent records for each link hashkey -#}
latest_open AS (
SELECT {{ dbtvault.alias_all(source_cols, 'c') }}
FROM latest_records AS c
WHERE TO_DATE(c.{{ src_end_date }}) = TO_DATE('9999-12-31')
),

new_open_records AS (
SELECT DISTINCT
{{ dbtvault.alias_all(source_cols, 'stage') }}
FROM stage_slice AS stage
LEFT JOIN latest_open_eff AS e
ON stage.{{ src_pk }} = e.{{ src_pk }}
WHERE e.{{ src_pk }} IS NULL
AND {{ dbtvault.multikey(src_dfk, prefix='stage', condition='IS NOT NULL') }}
AND {{ dbtvault.multikey(src_sfk, prefix='stage', condition='IS NOT NULL') }}
{# Selecting the closed records of the most recent records for each link hashkey -#}
latest_closed AS (
SELECT {{ dbtvault.alias_all(source_cols, 'd') }}
FROM latest_records AS d
WHERE TO_DATE(d.{{ src_end_date }}) != TO_DATE('9999-12-31')
),
{%- if is_auto_end_dating %}

links_to_end_date AS (
SELECT a.*
FROM latest_open_eff AS a
LEFT JOIN stage_slice AS b
ON {{ dbtvault.multikey(src_dfk, prefix=['a', 'b'], condition='=') }}
WHERE {{ dbtvault.multikey(src_sfk, prefix='b', condition='IS NULL', operator='OR') }}
OR {{ dbtvault.multikey(src_sfk, prefix=['a', 'b'], condition='<>', operator='OR') }}
{# Identifying the completely new link relationships to be opened in eff sat -#}
new_open_records AS (
SELECT DISTINCT
{{ dbtvault.alias_all(source_cols, 'f') }}
FROM source_data AS f
LEFT JOIN latest_records AS lr
ON f.{{ src_pk }} = lr.{{ src_pk }}
WHERE lr.{{ src_pk }} IS NULL
),

new_end_dated_records AS (
{# Identifying the currently closed link relationships to be reopened in eff sat -#}
new_reopened_records AS (
SELECT DISTINCT
h.{{ src_pk }},
{{ dbtvault.alias_all(fk_cols, 'g') }},
h.EFFECTIVE_FROM AS {{ src_start_date }}, h.{{ src_source }}
FROM latest_open_eff AS h
INNER JOIN links_to_end_date AS g
ON g.{{ src_pk }} = h.{{ src_pk }}
lc.{{ src_pk }},
{{ dbtvault.alias_all(fk_cols, 'lc') }},
lc.{{ src_start_date }} AS {{ src_start_date }},
g.{{ src_end_date }} AS {{ src_end_date }},
g.{{ src_eff }} AS {{ src_eff }},
g.{{ src_ldts }},
g.{{ src_source }}
FROM source_data AS g
INNER JOIN latest_closed lc
ON g.{{ src_pk }} = lc.{{ src_pk }}
),

amended_end_dated_records AS (
{%- if is_auto_end_dating %}

{# Creating the closing records -#}
{# Identifying the currently open relationships that need to be closed due to change in SFK(s) -#}
new_closed_records AS (
SELECT DISTINCT
a.{{ src_pk }},
{{ dbtvault.alias_all(fk_cols, 'a') }},
a.{{ src_start_date }},
stage.{{ src_eff }} AS END_DATE, stage.{{ src_eff }}, stage.{{ src_ldts }},
a.{{ src_source }}
FROM new_end_dated_records AS a
INNER JOIN stage_slice AS stage
ON {{ dbtvault.multikey(src_dfk, prefix=['stage', 'a'], condition='=') }}
WHERE {{ dbtvault.multikey(src_sfk, prefix='stage', condition='IS NOT NULL') }}
AND {{ dbtvault.multikey(src_dfk, prefix='stage', condition='IS NOT NULL') }}
lo.{{ src_pk }},
{{ dbtvault.alias_all(fk_cols, 'lo') }},
lo.{{ src_start_date }} AS {{ src_start_date }},
h.{{ src_eff }} AS {{ src_end_date }},
h.{{ src_eff }} AS {{ src_eff }},
h.{{ src_ldts }},
lo.{{ src_source }}
FROM source_data AS h
INNER JOIN latest_open AS lo
ON {{ dbtvault.multikey(src_dfk, prefix=['lo', 'h'], condition='=') }}
WHERE ({{ dbtvault.multikey(src_sfk, prefix=['lo', 'h'], condition='<>', operator='OR') }})
),

{#- if is_auto_end_dating -#}
{%- endif %}

records_to_insert AS (
SELECT * FROM new_open_records
UNION
SELECT * FROM new_reopened_records
{%- if is_auto_end_dating %}
UNION
SELECT * FROM amended_end_dated_records
SELECT * FROM new_closed_records
{%- endif %}
)

{%- else %}

records_to_insert AS (
SELECT {{ dbtvault.alias_all(source_cols, 'i') }}
FROM source_data AS i
)

{#- if not dbtvault.is_any_incremental() -#}
{%- endif %}

SELECT * FROM records_to_insert
Expand Down
2 changes: 1 addition & 1 deletion macros/tables/hub.sql
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ row_rank_union AS (
records_to_insert AS (
SELECT {{ dbtvault.prefix(source_cols, 'a', alias_target='target') }}
FROM {{ ns.last_cte }} AS a
{%- if dbtvault.is_vault_insert_by_period() or is_incremental() %}
{%- if dbtvault.is_any_incremental() %}
LEFT JOIN {{ this }} AS d
ON a.{{ src_pk }} = d.{{ src_pk }}
WHERE {{ dbtvault.prefix([src_pk], 'd') }} IS NULL
Expand Down
4 changes: 2 additions & 2 deletions macros/tables/link.sql
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ row_rank_{{ source_number }} AS (
{%- endif %}
ROW_NUMBER() OVER(
PARTITION BY {{ src_pk }}
ORDER BY {{ src_ldts }} ASC
ORDER BY {{ src_ldts }}
) AS row_number
FROM {{ ref(src) }}
{%- if source_model | length == 1 %}
Expand Down Expand Up @@ -96,7 +96,7 @@ row_rank_union AS (
records_to_insert AS (
SELECT {{ dbtvault.prefix(source_cols, 'a', alias_target='target') }}
FROM {{ ns.last_cte }} AS a
{%- if dbtvault.is_vault_insert_by_period() or is_incremental() %}
{%- if dbtvault.is_any_incremental() %}
LEFT JOIN {{ this }} AS d
ON a.{{ src_pk }} = d.{{ src_pk }}
WHERE {{ dbtvault.prefix([src_pk], 'd') }} IS NULL
Expand Down
Loading

0 comments on commit 6ac0c90

Please sign in to comment.