Is this your first time submitting a feature request?
- I have read the expectations for open source contributors
- I have searched the existing issues, and I could not find an existing issue for this feature
- I am requesting a straightforward extension of existing dbt functionality, rather than a Big Idea better suited to a discussion
Describe the feature
Even with microbatching, a single batch (e.g. one day of data) can be very large. But there may be other columns (partitions) available that split the problem further, so that the single day's query can be broken into smaller queries and run one by one.
Something similar has been suggested here (but for incremental models in general):
#11324
You could have something like this (using the AWS Athena adapter):
{{
    config(
        materialized = 'incremental',
        incremental_strategy = 'microbatch',
        period = cur_period,
        additional_batch_column = 'hg',
        additional_batch_values = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
        event_time = 'onboarding_date',
        batch_size = 'day',
        lookback = 3,
        begin = '2025-02-17',
        partitioned_by = ['partoperation', 'partprovider', 'partsubprovider', 'onboarding_date', 'hg'],
        format = 'parquet',
        full_refresh = false
    )
}}
and in the compiled query it would look like:
SELECT * FROM (
    select * from "awsdatacatalog"."dbt_aip_stg"."stg_onboarding_tbl_us"
    where cast(onboarding_date as timestamp) >= timestamp '2025-03-19 00:00:00+00:00'
      and cast(onboarding_date as timestamp) < timestamp '2025-03-20 00:00:00+00:00'
) WHERE hg = 0
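Each value in additional_batch_values would then produce its own statement inside the day's batch, run one after another. A sketch of the intended behavior (not output dbt generates today):

SELECT * FROM (
    select * from "awsdatacatalog"."dbt_aip_stg"."stg_onboarding_tbl_us"
    where cast(onboarding_date as timestamp) >= timestamp '2025-03-19 00:00:00+00:00'
      and cast(onboarding_date as timestamp) < timestamp '2025-03-20 00:00:00+00:00'
) WHERE hg = 1
-- ... and so on, one statement per value, up to hg = 9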
Describe alternatives you've considered
So far we have used this custom materialization:
https://gist.github.com/jessedobbelaere/6fdb593f9e2cc732e9f142c56c9bac87
and added such bespoke additional batching ourselves, but it is a bit clunky and not very generalized:
{% macro get_period_sql_with_batch(target_cols_csv, sql, timestamp_field, start_timestamp, stop_timestamp, period, is_timestamp_string, timestamp_format, offset, force_batch_column, force_batch_value) -%}

    {#-- compute period intervals --#}
    {%- set period_length = period.split(' ')[0] | trim | int -%}
    {%- set period_type = period.split(' ')[1] | trim | string -%}
    {%- set start_period_interval = period_length * offset -%}
    {%- set stop_period_interval = period_length * (offset + 1) -%}

    {#-- build the period filter --#}
    {%- if is_timestamp_string -%}
        {%- set period_filter =
            "(date_parse(" ~ timestamp_field ~ ", '" ~ timestamp_format ~ "') >= timestamp '" ~ start_timestamp ~ "' + interval '" ~ start_period_interval ~ "' " ~ period_type ~
            " and date_parse(" ~ timestamp_field ~ ", '" ~ timestamp_format ~ "') < timestamp '" ~ start_timestamp ~ "' + interval '" ~ stop_period_interval ~ "' " ~ period_type ~
            " and date_parse(" ~ timestamp_field ~ ", '" ~ timestamp_format ~ "') < timestamp '" ~ stop_timestamp ~ "')" -%}
    {%- else -%}
        {%- set period_filter =
            "(" ~ timestamp_field ~ " >= timestamp '" ~ start_timestamp ~ "' + interval '" ~ start_period_interval ~ "' " ~ period_type ~
            " and " ~ timestamp_field ~ " < timestamp '" ~ start_timestamp ~ "' + interval '" ~ stop_period_interval ~ "' " ~ period_type ~
            " and " ~ timestamp_field ~ " < timestamp '" ~ stop_timestamp ~ "')" -%}
    {%- endif -%}

    {#-- build the batch filter if enabled --#}
    {%- if force_batch_column and force_batch_value is not none -%}
        {%- if force_batch_value is string -%}
            {%- set batch_filter = "(" ~ force_batch_column ~ " = '" ~ force_batch_value ~ "')" -%}
        {%- else -%}
            {%- set batch_filter = "(" ~ force_batch_column ~ " = " ~ force_batch_value ~ ")" -%}
        {%- endif -%}
        {%- set combined_filter = period_filter ~ " AND " ~ batch_filter -%}
    {%- else -%}
        {%- set combined_filter = period_filter -%}
    {%- endif -%}

    {#-- inject combined filter into the user's SQL --#}
    {%- set filtered_sql = sql | replace("__PERIOD_FILTER__", combined_filter) -%}

    {{ return("select " ~ target_cols_csv ~ " from (" ~ filtered_sql ~ ")") }}
{%- endmacro %}
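For example, given model SQL that contains the __PERIOD_FILTER__ placeholder, a call like the following (all values illustrative; src is a placeholder table) produces one slice query:

{%- set slice_sql = get_period_sql_with_batch(
    'id, onboarding_date, hg',
    'select * from src where __PERIOD_FILTER__',
    'onboarding_date',
    '2025-02-17 00:00:00',
    '2025-02-20 00:00:00',
    '1 day',
    false,
    '%Y-%m-%d',
    0,
    'hg',
    0) -%}

which renders roughly as:

select id, onboarding_date, hg from (
    select * from src
    where (onboarding_date >= timestamp '2025-02-17 00:00:00' + interval '0' day
        and onboarding_date < timestamp '2025-02-17 00:00:00' + interval '1' day
        and onboarding_date < timestamp '2025-02-20 00:00:00')
      AND (hg = 0)
)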
{% materialization sp_insert_by_period, adapter='athena' -%}

    {#-- configuration and setup --#}
    {%- set timestamp_field = config.require('timestamp_field') -%}
    {%- set start_date = var('start_date', get_from_date()) -%}
    {%- set stop_date = var('stop_date', config.get('stop_date', default=get_to_date())) -%}
    {%- set force_dates = (var('start_date', '') != '' or var('stop_date', '') != '') -%}
    {%- if force_dates -%}
        {{ dbt_utils.log_info('Forcing dates between ' ~ start_date ~ ' and ' ~ stop_date) }}
    {%- endif -%}
    {%- set period = config.get('period', default='100 day') -%}
    {%- set partitioned_by = config.get('partitioned_by', default=none) -%}
    {%- set is_timestamp_string = config.get('is_timestamp_string', default=True) -%}
    {%- set unique_tmp_table_suffix = config.get('unique_tmp_table_suffix', False) | as_bool -%}
    {%- set timestamp_format = config.get('timestamp_format', default='%Y-%m-%d') -%}
    {%- set full_refresh_mode = should_full_refresh() -%}

    {#-- validate placeholder --#}
    {%- if sql.find('__PERIOD_FILTER__') == -1 -%}
        {%- set error_message -%}
            Model '{{ model.unique_id }}' requires the string '__PERIOD_FILTER__' in its SQL
        {%- endset -%}
        {{ exceptions.raise_compiler_error(error_message) }}
    {%- endif -%}

    {#-- batch configuration --#}
    {%- set force_batch_column = config.get('force_batch_column', default='') -%}
    {%- set force_batch_values = config.get('force_batch_values', default=[]) -%}
    {%- set force_batch_enabled = (force_batch_column != '' and force_batch_values | length > 0) -%}
    {%- if force_batch_enabled -%}
        {%- set batch_values = force_batch_values -%}
    {%- else -%}
        {%- set batch_values = [none] -%}
    {%- endif -%}

    {#-- prepare target relation --#}
    {%- set existing_relation = load_relation(this) -%}
    {%- set target_relation = this.incorporate(type='table') -%}
    {%- set empty_sql = sql | replace('__PERIOD_FILTER__', 'false') -%}
    {% if existing_relation is none or existing_relation.is_view or full_refresh_mode %}
        {% if existing_relation is not none %}
            {{ adapter.drop_relation(existing_relation) }}
        {% endif %}
        {% call statement('main') %}
            {{ create_table_as(False, target_relation, empty_sql) }}
        {% endcall %}
    {% endif %}

    {#-- run pre-hooks --#}
    {{ run_hooks(pre_hooks, inside_transaction=False) }}
    {{ run_hooks(pre_hooks, inside_transaction=True) }}

    {#-- compute period boundaries --#}
    {%- set period_boundaries = adapter.dispatch('get_period_boundaries', 'insert_by_period')(
        schema,
        target_relation.name,
        timestamp_field,
        start_date,
        stop_date,
        force_dates,
        period,
        is_timestamp_string,
        timestamp_format) -%}
    {%- set start_timestamp = period_boundaries['start_timestamp'] -%}
    {%- set stop_timestamp = period_boundaries['stop_timestamp'] -%}
    {%- set num_periods = period_boundaries['num_periods'] -%}

    {#-- identify columns to insert --#}
    {%- set target_columns = adapter.get_columns_in_relation(target_relation) -%}
    {%- set target_cols_csv = target_columns | map(attribute='quoted') | join(', ') -%}

    {#-- iterative insert over periods and batches --#}
    {%- for i in range(num_periods) -%}
        {%- for batch_val in batch_values -%}
            {%- set label = 'period ' ~ (i + 1) ~ '/' ~ num_periods -%}
            {%- if force_batch_enabled and batch_val is not none -%}
                {%- set label = label ~ ', batch ' ~ batch_val ~ '/' ~ (batch_values | length) -%}
            {%- endif -%}
            {% do dbt_utils.log_info('Running ' ~ target_relation ~ ' for ' ~ label) %}

            {#-- create a unique temp table for this slice --#}
            {% if unique_tmp_table_suffix %}
                {%- set tmp_table_suffix = adapter.generate_unique_temporary_table_suffix() -%}
                {%- set tmp_identifier = model['name'] ~ '__dbt_incr_period' ~ i ~ (('__batch_' ~ batch_val | string) if batch_val is not none else '') ~ '_tmp_' ~ tmp_table_suffix -%}
            {% else %}
                {%- set tmp_identifier = model['name'] ~ '__dbt_incr_period' ~ i ~ (('__batch_' ~ batch_val | string) if batch_val is not none else '') ~ '_tmp' -%}
            {% endif %}
            {%- set tmp_relation = api.Relation.create(
                database=target_relation.database,
                schema=target_relation.schema,
                identifier=tmp_identifier,
                type='table') -%}
            {% if tmp_relation is not none %}{% do adapter.drop_relation(tmp_relation) %}{% endif %}

            {#-- generate SQL with combined filters --#}
            {%- set tmp_sql = get_period_sql_with_batch(
                target_cols_csv,
                sql,
                timestamp_field,
                start_timestamp,
                stop_timestamp,
                period,
                is_timestamp_string,
                timestamp_format,
                i,
                force_batch_column,
                batch_val) -%}

            {#-- build the slice table --#}
            {% call statement('main') %}
                {{ create_table_as(True, tmp_relation, tmp_sql) }}
            {% endcall %}

            {#-- remove overlapping partitions then insert --#}
            {% if partitioned_by %}
                {% do delete_overlapping_partitions(target_relation, tmp_relation, partitioned_by) %}
            {% endif %}
            {%- set on_schema_change = incremental_validate_on_schema_change(
                config.get('on_schema_change'), default='ignore') -%}
            {% call statement('main') %}
                {{ incremental_insert(on_schema_change, tmp_relation, target_relation, existing_relation) }}
            {% endcall %}

            {% do adapter.drop_relation(tmp_relation) %}
        {%- endfor -%}
    {%- endfor -%}

    {#-- finalize --#}
    {{ run_hooks(post_hooks, inside_transaction=True) }}
    {% do adapter.commit() %}
    {{ run_hooks(post_hooks, inside_transaction=False) }}

    {{ return({'relations': [target_relation]}) }}
{% endmaterialization %}
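Used from a model, it looks roughly like this (table and column names are placeholders; the config keys are the ones the materialization reads above):

{{
    config(
        materialized = 'sp_insert_by_period',
        timestamp_field = 'onboarding_date',
        period = '1 day',
        is_timestamp_string = false,
        force_batch_column = 'hg',
        force_batch_values = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
        partitioned_by = ['onboarding_date', 'hg']
    )
}}

select * from {{ source('raw', 'onboarding') }}
where __PERIOD_FILTER__  -- replaced with the combined period + batch predicate at runtime

For each (period, batch value) pair, the materialization builds a temporary table for that slice and inserts it into the target, so no single query has to scan more than one period of one batch value.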
Something in the same vein could be added to this one:
https://github.com/dbt-labs/dbt-labs-experimental-features/tree/main/insert_by_period
Who will this benefit?
Anyone who needs fine control over how to materialize very large models that fail when run as a single query, even with microbatching, and anyone who wants to leverage existing partitions to split materialization into smaller chunks of work.
Are you interested in contributing this feature?
In principle yes, but I have no experience working on dbt-core.
Anything else?
No response