Skip to content

Commit c31987f

Browse files
Add data_freshness_sla and volume_threshold tests
Add two new Elementary tests: - data_freshness_sla: checks if data was updated before a specified SLA deadline - volume_threshold: monitors row count changes with configurable warn/error thresholds, using Elementary's metric caching to avoid redundant computation Co-authored-by: Cursor <cursoragent@cursor.com>
1 parent d33eb04 commit c31987f

File tree

2 files changed

+395
-0
lines changed

2 files changed

+395
-0
lines changed
Lines changed: 239 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,239 @@
1+
{#
    Test: data_freshness_sla

    Checks that the data in a model was updated before a given SLA deadline.
    Freshness is judged from the max value of a timestamp column in the data itself.

    Use case: "Is the data fresh?" / "Was the data updated on time?"

    Parameters:
        timestamp_column (required): Column name containing timestamps to check for freshness
        sla_time (required): Deadline time. Supports formats like "07:00", "7am", "2:30pm", "14:30"
        timezone (required): IANA timezone name (e.g., "America/Los_Angeles", "Europe/London")
        day_of_week (optional): Day(s) to check. String or list: "Monday", ["Monday", "Wednesday"]
        day_of_month (optional): Day(s) of month to check. Integer or list: 1, [1, 15]
        where_expression (optional): Additional WHERE clause filter for the data query

    Schedule behavior:
        - Neither day_of_week nor day_of_month set: check every day (default)
        - day_of_week set: only check on those days
        - day_of_month set: only check on those days
        - Both set: check when today matches EITHER filter (OR logic)

    Example usage:
        models:
          - name: my_model
            tests:
              - elementary.data_freshness_sla:
                  timestamp_column: updated_at
                  sla_time: "07:00"
                  timezone: "America/Los_Angeles"

          - name: daily_events
            tests:
              - elementary.data_freshness_sla:
                  timestamp_column: event_timestamp
                  sla_time: "6am"
                  timezone: "Europe/Amsterdam"
                  where_expression: "event_type = 'completed'"

          - name: weekly_report_data
            tests:
              - elementary.data_freshness_sla:
                  timestamp_column: report_date
                  sla_time: "09:00"
                  timezone: "Asia/Tokyo"
                  day_of_week: ["Monday"]

    Test passes if:
        - Today is not a scheduled check day (based on day_of_week/day_of_month)
        - OR the max timestamp in the data is from today (before or after deadline)
        - OR the SLA deadline for today hasn't passed yet

    Test fails if:
        - Today is a scheduled check day AND the deadline has passed AND either:
            - No data exists in the table
            - The max timestamp is from a previous day (data not updated today)
#}

{% test data_freshness_sla(model, timestamp_column, sla_time, timezone, day_of_week=none, day_of_month=none, where_expression=none) %}
    {{ config(tags=['elementary-tests']) }}

    {%- if not (execute and elementary.is_test_command() and elementary.is_elementary_enabled()) %}
        {# Not executing a test command, or Elementary is disabled: emit Elementary's no-results query #}
        {{ elementary.no_results_query() }}
    {%- else %}

        {# Required-parameter guards (order matters: the first failing check is the error the user sees) #}
        {% if not timestamp_column %}
            {% do exceptions.raise_compiler_error("The 'timestamp_column' parameter is required. Example: timestamp_column: 'updated_at'") %}
        {% endif %}

        {% if not sla_time %}
            {% do exceptions.raise_compiler_error("The 'sla_time' parameter is required. Example: sla_time: '07:00'") %}
        {% endif %}

        {# Timezone validation is delegated to the shared helper #}
        {% do elementary.validate_timezone(timezone) %}

        {# Normalize the optional schedule filters #}
        {% set weekday_filter = elementary.normalize_day_of_week(day_of_week) %}
        {% set monthday_filter = elementary.normalize_day_of_month(day_of_month) %}

        {# Resolve the relation under test #}
        {% set relation = elementary.get_model_relation_for_test(model, elementary.get_test_model()) %}
        {% if not relation %}
            {% do exceptions.raise_compiler_error("Unsupported model: " ~ model ~ " (this might happen if you override 'ref' or 'source')") %}
        {% endif %}

        {# The freshness column must be recognized as a timestamp type #}
        {% set column_type = elementary.find_normalized_data_type_for_column(relation, timestamp_column) %}
        {% if not elementary.is_column_timestamp(relation, timestamp_column, column_type) %}
            {% do exceptions.raise_compiler_error("Column '" ~ timestamp_column ~ "' is not a timestamp type. The timestamp_column must be a timestamp or datetime column.") %}
        {% endif %}

        {# Parse the SLA time and keep a display form for messages #}
        {% set sla_clock = elementary.parse_sla_time(sla_time) %}
        {% set sla_label = elementary.format_sla_time(sla_clock) %}

        {# Deadline and current-day info, as computed by the shared helper #}
        {% set today = elementary.calculate_sla_deadline_utc(sla_clock.hour, sla_clock.minute, timezone) %}

        {% set is_check_day = elementary.should_check_sla_today(
            today.day_of_week,
            today.day_of_month,
            weekday_filter,
            monthday_filter
        ) %}

        {% if is_check_day %}

            {% do elementary.edr_log('Running data_freshness_sla test for ' ~ relation.identifier ~ ' with SLA ' ~ sla_label ~ ' ' ~ timezone) %}

            {# Emit the failure-reporting query #}
            {{ elementary.get_data_freshness_sla_query(
                model_relation=relation,
                timestamp_column=timestamp_column,
                sla_deadline_utc=today.sla_deadline_utc,
                target_date=today.target_date,
                target_date_start_utc=today.target_date_start_utc,
                target_date_end_utc=today.target_date_end_utc,
                deadline_passed=today.deadline_passed,
                formatted_sla_time=sla_label,
                timezone=timezone,
                where_expression=where_expression
            ) }}

        {% else %}

            {# Off-schedule day: log and pass #}
            {% do elementary.edr_log('Skipping data_freshness_sla test for ' ~ relation.identifier ~ ' - not a scheduled check day (' ~ today.day_of_week ~ ', day ' ~ today.day_of_month ~ ')') %}
            {{ elementary.no_results_query() }}

        {% endif %}

    {%- endif %}

{% endtest %}
136+
137+
138+
{#
    Build the SQL query that reports a missed data-freshness SLA.

    Arguments:
        model_relation: relation to query; its identifier is echoed in the output
        timestamp_column: column whose max value is the freshness signal
        sla_deadline_utc: deadline, rendered as a quoted literal and cast to timestamp
        target_date: the day being checked, emitted as a string literal
        target_date_start_utc: start of the target day, cast to timestamp
        target_date_end_utc: end of the target day, cast to timestamp (projected in the
            sla_deadline CTE; not used for classification)
        deadline_passed: when falsy the query never yields a failure row
        formatted_sla_time, timezone: echoed in output columns and messages
        where_expression: optional filter applied to the model query

    Logic:
        - Take max(timestamp_column) from the model, cast to timestamp.
          NOTE(review): no timezone conversion is applied here, so the column is
          presumably expected to hold UTC values - confirm against callers.
        - max is null                          -> NO_DATA
        - max >= start of the target day       -> DATA_FRESH (future-dated rows count as fresh)
        - otherwise                            -> DATA_STALE
        - A row is returned only when the deadline has passed and the status is not DATA_FRESH.
#}
{% macro get_data_freshness_sla_query(model_relation, timestamp_column, sla_deadline_utc, target_date, target_date_start_utc, target_date_end_utc, deadline_passed, formatted_sla_time, timezone, where_expression) %}

    {# where_expression is echoed inside a single-quoted literal below, so any quotes in it
       (e.g. "event_type = 'completed'") must be escaped or the generated SQL is invalid #}
    {% set where_expression_literal = dbt.escape_single_quotes(where_expression) if where_expression else none %}

    with

    sla_deadline as (
        select
            {{ elementary.edr_cast_as_timestamp("'" ~ sla_deadline_utc ~ "'") }} as deadline_utc,
            {{ elementary.edr_cast_as_timestamp("'" ~ target_date_start_utc ~ "'") }} as target_date_start_utc,
            {{ elementary.edr_cast_as_timestamp("'" ~ target_date_end_utc ~ "'") }} as target_date_end_utc,
            '{{ target_date }}' as target_date
    ),

    {# Get the max timestamp from the data #}
    max_data_timestamp as (
        select
            max({{ elementary.edr_cast_as_timestamp(timestamp_column) }}) as max_timestamp_utc
        from {{ model_relation }}
        {% if where_expression %}
        where {{ where_expression }}
        {% endif %}
    ),

    {# Determine freshness status #}
    freshness_result as (
        select
            sd.target_date,
            sd.deadline_utc as sla_deadline_utc,
            mdt.max_timestamp_utc,
            case
                {# No data exists #}
                when mdt.max_timestamp_utc is null then 'NO_DATA'
                {# Latest row is from the target day or later. No upper bound: a future-dated
                   timestamp is not older than today and must not be reported as stale #}
                when mdt.max_timestamp_utc >= sd.target_date_start_utc then 'DATA_FRESH'
                {# Data exists but is from a previous day #}
                else 'DATA_STALE'
            end as freshness_status
        from sla_deadline sd
        cross join max_data_timestamp mdt
    ),

    final_result as (
        select
            '{{ model_relation.identifier }}' as model_name,
            target_date,
            '{{ formatted_sla_time }}' as sla_time,
            '{{ timezone }}' as timezone,
            cast(sla_deadline_utc as {{ elementary.edr_type_string() }}) as sla_deadline_utc,
            freshness_status,
            cast(max_timestamp_utc as {{ elementary.edr_type_string() }}) as max_timestamp_utc,
            {% if deadline_passed %}
            {# Deadline is behind us: anything that is not fresh is a failure #}
            case
                when freshness_status = 'DATA_FRESH' then false
                else true
            end as is_failure,
            {% else %}
            {# Deadline hasn't passed yet: there is still time, so never fail #}
            false as is_failure,
            {% endif %}
            case
                when freshness_status = 'NO_DATA' then
                    'No data found in "{{ model_relation.identifier }}"' ||
                    {% if where_expression %}
                    ' (with filter: {{ where_expression_literal }})' ||
                    {% endif %}
                    '. Expected data to be updated before {{ formatted_sla_time }} {{ timezone }}.'
                when freshness_status = 'DATA_STALE' then
                    'Data in "{{ model_relation.identifier }}" is stale. Last update was at ' ||
                    cast(max_timestamp_utc as {{ elementary.edr_type_string() }}) ||
                    ' UTC, which is before today. Expected fresh data before {{ formatted_sla_time }} {{ timezone }}.'
                else
                    'Data in "{{ model_relation.identifier }}" is fresh - last update at ' ||
                    cast(max_timestamp_utc as {{ elementary.edr_type_string() }}) ||
                    ' UTC (before SLA deadline {{ formatted_sla_time }} {{ timezone }}).'
            end as result_description
        from freshness_result
    )

    {# Only failing rows are returned; an empty result means the test passes #}
    select
        model_name,
        target_date,
        sla_time,
        timezone,
        sla_deadline_utc,
        freshness_status,
        max_timestamp_utc,
        result_description
    from final_result
    where is_failure = true

{% endmacro %}
Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
{#
    Test: volume_threshold

    Monitors row count changes using percentage thresholds with multiple severity levels.
    Row counts are collected through Elementary's collect_table_metrics and then compared
    bucket-over-bucket by get_volume_threshold_comparison_query.

    Parameters:
        timestamp_column (required): Column to determine time periods
        warn_threshold_percent (optional): % change that triggers warning (default: 5)
        error_threshold_percent (optional): % change that triggers error (default: 10)
        direction (optional): 'both', 'spike', or 'drop' (default: 'both')
        time_bucket (optional): Time bucket config (default: {period: 'day', count: 1})
        where_expression (optional): Additional WHERE filter
        days_back (optional): Days of history to keep (default: 14)
        backfill_days (optional): Days to backfill on each run (default: 2)
        min_row_count (optional): Min baseline rows to check (default: 100)

    Severity mapping (via fail_calc / warn_if / error_if):
        severity_level 1 -> warn, severity_level 2 -> error.
        The comparison query returns rows only when severity_level > 0, so a passing run
        yields an empty result set; fail_calc coalesces that case to 0.

    Example:
        - elementary.volume_threshold:
            timestamp_column: created_at
            warn_threshold_percent: 5
            error_threshold_percent: 10
            direction: both
#}

{% test volume_threshold(model,
                         timestamp_column,
                         warn_threshold_percent=5,
                         error_threshold_percent=10,
                         direction='both',
                         time_bucket=none,
                         where_expression=none,
                         days_back=14,
                         backfill_days=2,
                         min_row_count=100) %}
    {# max() over zero rows is NULL, which would make failures / should_warn / should_error NULL
       on a passing run (or when Elementary is disabled); coalesce so dbt always gets an integer #}
    {{ config(tags=['elementary-tests'], fail_calc='coalesce(max(severity_level), 0)', warn_if='>=1', error_if='>=2') }}

    {%- if execute and elementary.is_test_command() and elementary.is_elementary_enabled() %}

        {# Parameter validation #}
        {% if warn_threshold_percent > error_threshold_percent %}
            {{ exceptions.raise_compiler_error("warn_threshold_percent cannot exceed error_threshold_percent") }}
        {% endif %}
        {% if direction not in ['both', 'spike', 'drop'] %}
            {{ exceptions.raise_compiler_error("direction must be 'both', 'spike', or 'drop'") }}
        {% endif %}

        {% set model_relation = elementary.get_model_relation_for_test(model, elementary.get_test_model()) %}
        {% if not model_relation %}
            {{ exceptions.raise_compiler_error("Unsupported model: " ~ model) }}
        {% endif %}

        {# Collect row_count metrics using Elementary's shared infrastructure (handles caching & incremental) #}
        {% set table_metrics = [{"type": "row_count", "name": "row_count"}] %}
        {% do elementary.collect_table_metrics(
            table_metrics=table_metrics,
            model_expr=model,
            model_relation=model_relation,
            timestamp_column=timestamp_column,
            time_bucket=time_bucket or {'period': 'day', 'count': 1},
            days_back=days_back,
            backfill_days=backfill_days,
            where_expression=where_expression,
            dimensions=[]
        ) %}

        {# Compare current vs previous bucket from the collected metrics #}
        {{ elementary.get_volume_threshold_comparison_query(
            model_relation=model_relation,
            warn_threshold_percent=warn_threshold_percent,
            error_threshold_percent=error_threshold_percent,
            direction=direction,
            min_row_count=min_row_count
        ) }}

    {%- else %}
        {{ elementary.no_results_query() }}
    {%- endif %}
{% endtest %}
79+
80+
81+
{#
    Build the SQL query that compares the latest row-count bucket with the one before it.

    Arguments:
        model_relation: relation under test; its identifier is echoed as model_name
        warn_threshold_percent: % change at or beyond which severity_level is 1
        error_threshold_percent: % change at or beyond which severity_level is 2
        direction: 'both' compares abs(percent_change); 'spike' only increases;
            any other value is treated as 'drop' (only decreases)
        min_row_count: baselines (previous bucket) below this are not evaluated

    Output: one row with the comparison and a description when severity_level > 0,
    otherwise no rows.
#}
{% macro get_volume_threshold_comparison_query(model_relation, warn_threshold_percent,
                                               error_threshold_percent, direction, min_row_count) %}

    {% set test_metrics_table = elementary.get_elementary_test_table(elementary.get_elementary_test_table_name(), 'metrics') %}

    with metrics as (
        select bucket_start, bucket_end, metric_value as row_count
        from {{ test_metrics_table }}
    ),

    {# Most recent bucket #}
    current_bucket as (
        select bucket_start, bucket_end, row_count
        from metrics
        order by bucket_end desc
        limit 1
    ),

    {# Latest bucket that ends at or before the current bucket starts #}
    previous_bucket as (
        select bucket_start, bucket_end, row_count
        from metrics
        where bucket_end <= (select bucket_start from current_bucket)
        order by bucket_end desc
        limit 1
    ),

    comparison as (
        select
            curr.bucket_end as current_period,
            prev.bucket_end as previous_period,
            curr.row_count as current_row_count,
            prev.row_count as previous_row_count,
            case
                {# No baseline, or a zero baseline: percent change is undefined #}
                when prev.row_count = 0 or prev.row_count is null then null
                {# Cast before rounding: two-argument round() is not defined for
                   floating-point input on every warehouse (e.g. PostgreSQL) #}
                else round(
                    cast(
                        (curr.row_count - prev.row_count) * 100.0 / prev.row_count
                        as {{ dbt.type_numeric() }}
                    ),
                    2
                )
            end as percent_change
        from current_bucket curr
        {# Both sides have at most one row; left join keeps the current bucket when there is no previous one #}
        left join previous_bucket prev on 1=1
    ),

    result as (
        select
            current_period,
            previous_period,
            current_row_count,
            previous_row_count,
            percent_change,
            case
                when previous_row_count is null or previous_row_count < {{ min_row_count }} then 0
                when percent_change is null then 0
                {% if direction == 'both' %}
                when abs(percent_change) >= {{ error_threshold_percent }} then 2
                when abs(percent_change) >= {{ warn_threshold_percent }} then 1
                {% elif direction == 'spike' %}
                when percent_change >= {{ error_threshold_percent }} then 2
                when percent_change >= {{ warn_threshold_percent }} then 1
                {% else %}
                {# Parenthesized so a negative threshold cannot render as a "--" line comment #}
                when percent_change <= -({{ error_threshold_percent }}) then 2
                when percent_change <= -({{ warn_threshold_percent }}) then 1
                {% endif %}
                else 0
            end as severity_level
        from comparison
    )

    select
        '{{ model_relation.identifier }}' as model_name,
        cast(current_period as {{ elementary.edr_type_string() }}) as current_period,
        cast(previous_period as {{ elementary.edr_type_string() }}) as previous_period,
        {{ elementary.edr_cast_as_int('current_row_count') }} as current_row_count,
        {{ elementary.edr_cast_as_int('previous_row_count') }} as previous_row_count,
        {{ elementary.edr_cast_as_int('current_row_count - previous_row_count') }} as absolute_change,
        percent_change,
        severity_level,
        case severity_level when 2 then 'error' when 1 then 'warn' else 'pass' end as severity_name,
        'Row count changed by ' || cast(percent_change as {{ elementary.edr_type_string() }}) ||
        '% (from ' || cast({{ elementary.edr_cast_as_int('previous_row_count') }} as {{ elementary.edr_type_string() }}) ||
        ' to ' || cast({{ elementary.edr_cast_as_int('current_row_count') }} as {{ elementary.edr_type_string() }}) || ')' as result_description
    from result
    where severity_level > 0

{% endmacro %}

0 commit comments

Comments
 (0)