Skip to content

Commit 4d80287

Browse files
committed
Add more metadata
1 parent 2fb1235 commit 4d80287

File tree

2 files changed

+19
-4
lines changed

2 files changed

+19
-4
lines changed

dags/veda_data_pipeline/groups/processing_tasks.py

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
from copy import deepcopy
55
import smart_open
66
from airflow.models.variable import Variable
7+
from airflow.models.xcom import LazyXComSelectSequence
78
from airflow.decorators import task
89
from airflow.datasets import Dataset, DatasetAlias
910
from airflow.datasets.metadata import Metadata
@@ -88,7 +89,7 @@ def build_stac_task(payload, ti=None):
8889
DatasetAlias("VEDA-Datasets")
8990
],
9091
)
91-
def post_ingest_report(ti, logical_date): # params are Airflow kwargs - use this task without input
92+
def post_ingest_dataset_event(ti, logical_date, built_items = {}): # params are Airflow kwargs - use this task without input
9293
"""
9394
Logs a Dataset event, saving the config used as a versioned object in s3, and creating a Metadata object visible in Airflow.
9495
@@ -113,8 +114,22 @@ def post_ingest_report(ti, logical_date): # params are Airflow kwargs - use thi
113114
json.dump(payload, f, indent=2)
114115
log_task(f"Payload written to {key}")
115116

117+
# built items can be either a dict or a list of dicts
118+
if isinstance(built_items, LazyXComSelectSequence):
119+
built_items = list(built_items)
120+
elif not isinstance(built_items, list):
121+
built_items = [built_items]
122+
print(f"Built items: {built_items}")
123+
success_count = sum(item.get("payload", {}).get("status", {}).get("successes", 0) for item in built_items)
124+
failure_count = sum(item.get("payload", {}).get("status", {}).get("failures", 0) for item in built_items)
125+
116126
yield Metadata(
117127
Dataset(f"{collection}"),
118-
extra={"ingest_datetime": str(logical_date) }, # extra has to be provided, can be {}
128+
extra={
129+
"ingest_datetime": str(logical_date),
130+
"ingest_configuration": key,
131+
"successful_items": success_count,
132+
"failed_items": failure_count,
133+
}, # extra has to be provided, can be {}
119134
alias="VEDA-Datasets",
120135
)

dags/veda_data_pipeline/veda_dataset_pipeline.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
from veda_data_pipeline.groups.discover_group import discover_from_s3_task, get_files_task
55
from airflow.operators.empty import EmptyOperator
66
from veda_data_pipeline.groups.collection_group import collection_task_group
7-
from veda_data_pipeline.groups.processing_tasks import submit_to_stac_ingestor_task, build_stac_task, extract_discovery_items_from_payload, remove_thumbnail_asset, post_ingest_report
7+
from veda_data_pipeline.groups.processing_tasks import submit_to_stac_ingestor_task, build_stac_task, extract_discovery_items_from_payload, remove_thumbnail_asset, post_ingest_dataset_event
88

99
template_dag_run_conf = {
1010
"collection": "<collection-id>",
@@ -53,4 +53,4 @@
5353
discover = discover_from_s3_task.partial(payload=mutated_payloads).expand(event=discovery_items)
5454
get_files = get_files_task(payload=discover)
5555
build_stac = build_stac_task.expand(payload=get_files)
56-
submit_stac = submit_to_stac_ingestor_task.expand(built_stac=build_stac) >> post_ingest_report() >> end
56+
submit_stac = submit_to_stac_ingestor_task.expand(built_stac=build_stac) >> post_ingest_dataset_event(built_items=build_stac) >> end

0 commit comments

Comments (0)