Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 53 additions & 0 deletions jobs/create_stored_procedures.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,47 @@ def unload_as_geojson(
)


def write_iceberg_metadata(session: snowpark.Session, database: str, stage: str):
    """Snapshot the current Iceberg metadata locations for every Iceberg
    table in *database* and upload them as a JSON file to *stage*.

    Args:
        session: Active Snowpark session (injected by Snowflake when this
            runs as a stored procedure).
        database: Database to scan with ``SHOW ICEBERG TABLES``.
        stage: Stage path (e.g. ``@DB.SCHEMA.STAGE``); the file is written
            under the ``iceberg/`` prefix within it.

    Returns:
        The result of ``session.file.put`` (upload status rows).
    """
    # NOTE(review): `database` is interpolated into SQL unquoted — callers
    # must pass a trusted identifier; confirm inputs are never user-supplied.
    iceberg_tables = (
        session.sql(
            f"""
            show iceberg tables in database {database}
            """
        )
        .collect_nowait()
        .to_df()
        .to_pandas()
    )

    def _get_meta(record):
        # Fully-qualified identifier: database.schema.table
        identifier = ".".join([database, record["schema_name"], record["name"]])
        result = (
            session.sql(
                f"""
                select system$get_iceberg_table_information('{identifier}')
                """
            )
            .collect_nowait()
            .result()
        )
        # The system function returns a JSON blob; keep only the path of the
        # current metadata file.
        return json.loads(result[0][0])["metadataLocation"]

    # One record per table: name, schema, base location, and current
    # metadata-file location.
    iceberg_tables = iceberg_tables.assign(
        schema=iceberg_tables["schema_name"],
        metadata=iceberg_tables.apply(_get_meta, axis=1),
        path=iceberg_tables["base_location"],
    )[["name", "schema", "path", "metadata"]]
    fname = "/tmp/current_table_versions.json"
    iceberg_tables.to_json(fname, orient="records")
    return session.file.put(
        fname,
        f"{stage}/iceberg",
        auto_compress=False,
        overwrite=True,
    )


if __name__ == "__main__":
from jobs.utils.snowflake import snowflake_connection_from_environment

Expand Down Expand Up @@ -73,3 +114,15 @@ def unload_as_geojson(
is_permanent=True,
stage_location="@PEMS_MARTS_INTERNAL",
)

# Register write_iceberg_metadata as a permanent stored procedure so it can
# be invoked from SQL (e.g. via a dbt macro calling
# ANALYTICS_*.public.write_iceberg_metadata).
session.sproc.register(
    func=write_iceberg_metadata,
    name="write_iceberg_metadata",
    # PUT results are returned as a VARIANT.
    return_type=types.VariantType(),
    # Positional inputs: (database, stage), both strings.
    input_types=[types.StringType(), types.StringType()],
    packages=["snowflake-snowpark-python"],
    replace=True,
    source_code_display=True,
    # Persist the sproc's code in this stage so it survives the session.
    is_permanent=True,
    stage_location="@PEMS_MARTS_INTERNAL",
)
4,695 changes: 2,545 additions & 2,150 deletions poetry.lock

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ package-mode = false
[tool.poetry.dependencies]
python = "~3.10"
mkdocs-material = "~9.1.3"
dbt-core = "~1.8"
dbt-snowflake = "~1.8"
dbt-core = "~1.9"
dbt-snowflake = "~1.9.4"
jupyterlab = "^4.0.9"
ibis-framework = {extras = ["snowflake"], version = "^7.2.0"}
matplotlib = "^3.8.2"
Expand Down
14 changes: 10 additions & 4 deletions transform/dbt_project.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ flags:
skip_nodes_if_on_run_start_fails: true
require_explicit_package_overrides_for_builtin_materializations: true
source_freshness_run_project_hooks: false
enable_iceberg_materializations: true

# This setting configures which "profile" dbt uses for this project.
profile: caltrans_pems
Expand Down Expand Up @@ -96,19 +97,24 @@ models:
+schema: imputation
marts:
+materialized: table
+post-hook: "{{ unload_relation(strip_leading_words=1) }}"
+database: "{{ env_var('DBT_ANALYTICS_DB', 'ANALYTICS_DEV') }}"
performance:
+post-hook: "{{ unload_relation(strip_leading_words=1) }}"
+schema: performance
imputation:
+schema: imputation
+table_format: iceberg
+external_volume: pems_marts_dev
+base_location_root: iceberg
diagnostics:
+post-hook: "{{ unload_relation(strip_leading_words=1) }}"
+schema: diagnostics
geo:
+schema: geo
+post-hook: "{{ unload_relation_as_geojson(strip_leading_words=1) }}"
quality:
+post-hook: "{{ unload_relation(strip_leading_words=1) }}"
+schema: quality
geo:
+post-hook: "{{ unload_relation_as_geojson(strip_leading_words=1) }}"
+schema: geo

on-run-start:
- "{{create_udfs()}}"
17 changes: 17 additions & 0 deletions transform/macros/write_iceberg_metadata.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
{% macro write_iceberg_metadata() %}
{# Invoke the write_iceberg_metadata stored procedure for the current
   environment: the 'prd' target maps to the PRD database/stage, every
   other target maps to DEV. #}
{% set suffix = 'PRD' if target.name == 'prd' else 'DEV' %}
{% set database = "ANALYTICS_" ~ suffix %}
{% set stage = '@ANALYTICS_' ~ suffix ~ '.PUBLIC.PEMS_MARTS_' ~ suffix %}
{% set query %}
call {{ database }}.public.write_iceberg_metadata(
    '{{ database }}',
    '{{ stage }}'
);
{% endset %}
{{ log('Writing iceberg metadata to ' ~ stage, info=true) }}
{{ run_query(query) }}
{% endmacro %}
5 changes: 0 additions & 5 deletions transform/models/marts/imputation/_imputation.yml
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,6 @@ models:
This model filters the results of the imputation_detector_imputed_agg_five_minutes model from the transform
step to detectors with station_type in of either 'ML' or 'HV' from the past 4 days and adds the county name
to the results. This model is unique at the level of SAMPLE_TIMESTAMP + DETECTOR_ID.
data_tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- SAMPLE_TIMESTAMP
- DETECTOR_ID
columns:
- name: DETECTOR_ID
data_tests:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,23 @@
{{ config(
    materialized="table",
    unload_partitioning="('day=' || to_varchar(date_part(day, sample_date)) || '/district=' || district)",
) }}


-- NOTE(review): this is the pre-change (removed) side of the diff view;
-- the incremental Iceberg version below replaces it.
with imputation_five_mins as (
    -- Imputed five-minute aggregates restricted to mainline (ML) and
    -- HOV (HV) stations over a rolling 4-day window ending today.
    select *
    from {{ ref('int_imputation__detector_imputed_agg_five_minutes') }}
    where
        station_type in ('ML', 'HV')
        and sample_date >= dateadd(day, -4, current_date)
),

imputation_five_minsc as (
    -- Attach the county name via the shared macro.
    {{ get_county_name('imputation_five_mins') }}
)

select * from imputation_five_minsc
{{ config(
    materialized='incremental',
    snowflake_warehouse=get_snowflake_refresh_warehouse(big="XL", small="XS")
) }}
-- Incremental model: imputed five-minute aggregates for mainline (ML) and
-- HOV (HV) stations, filtered to new sample_dates on incremental runs.
with imputation_five_mins as (
    select
        -- Drop validity-window columns and the raw timestamp; the timestamp
        -- is re-added below with an explicit precision cast.
        * exclude (
            sample_timestamp,
            station_valid_from,
            station_valid_to
        ),
        sample_timestamp::timestamp_ntz(6) as sample_timestamp -- iceberg needs microsecond (ntz(6)) precision for timestamps
    from {{ ref('int_imputation__detector_imputed_agg_five_minutes') }}
    where
        station_type in ('ML', 'HV')
        -- Macro expands to the incremental-run predicate on sample_date.
        and {{ make_model_incremental('sample_date') }}
),

imputation_five_minsc as (
    -- Attach the county name via the shared macro.
    {{ get_county_name('imputation_five_mins') }}
)

select * from imputation_five_minsc