Add support for pyarc2stac based ingest#361

Merged
kyle-lesinger merged 25 commits into dev from disasterDag on Aug 22, 2025
Conversation

@kyle-lesinger (Member) commented Apr 25, 2025

Summary:

1.) Create a DAG that creates a collection using pyarc2stac
2.) Make this DAG schedulable
3.) Install pyarc2stac within airflow_worker and airflow_services inside a virtual environment (because of a conflict with a library used by the noaa-hrr stac tools dag). We should do this for all the dags: libraries needed only by a specific dag belong in that dag's virtual environment.

PR Checklist

  • Ad-hoc testing - Deploy changes and test manually
  • Integration tests

@slesaad (Member) left a comment

@kyle-lesinger thanks for starting the work on this.

Ideally, we want a generic DAG (that has nothing to do with disasters) that isn't always running on a schedule.

Scenario: I want to ingest an ArcGIS dataset for the air quality (or some other) portal and run it only once. The ideal solution should also cover this scenario. It's kinda getting there with you reading the json from an s3 bucket, but not completely.

If you don't know this already, our Airflow instance comes with an S3 bucket that saves the dags in the /dags folder; the bucket is available through the env var MWAA_STAC_CONF["EVENT_BUCKET"]. The same bucket holds scheduled datasets in the /collections folder: Airflow knows to look in that folder, read the JSON from there, and, based on the schedule provided, run the discover dag for the dataset on schedule. This feature was added in #155; you can check it out there.

Here's what I'd do:

  1. remove all references to "disasters" - we want a generic solution that works for everything
  2. update the dag so that any config can be run manually
  3. handle the scheduled workflow for pyarc2stac by updating the generate_dags function

i'd accept an input config that looks something like this:

{
  "url": "https://arcgis.url/no2/imageserver",
  "id": "no2",
  "description": "description of no2 data",
  ....(any other stac collection compatible properties that you'd like to overwrite in the collection created by pyarc2stac)
}
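For illustration, applying such a config on top of a pyarc2stac-generated collection could look like this (a minimal sketch; the helper name, the RESERVED_KEYS set, and the sample dicts are assumptions, not the PR's actual code):

```python
# Hypothetical helper: any STAC-compatible property in the input config
# overrides the corresponding field of the generated collection.
RESERVED_KEYS = {"url"}  # consumed by the DAG itself, not a collection field

def apply_config_overrides(collection: dict, config: dict) -> dict:
    merged = dict(collection)
    for key, value in config.items():
        if key not in RESERVED_KEYS:
            merged[key] = value
    return merged

generated = {"id": "generated-id", "description": "auto", "license": "proprietary"}
config = {
    "url": "https://arcgis.url/no2/imageserver",
    "id": "no2",
    "description": "description of no2 data",
}
merged = apply_config_overrides(generated, config)
# merged["id"] == "no2"; "url" is not copied into the collection
```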

let me know if you wanna meet in person to go over the comments.


# veda_pyarc2stac_ingest
scheduled_pyarcstac_configs = filter_configs_by_dag(collection_configs, "veda_pyarc2stac_ingest")
Member:

where is this name veda_pyarc2stac_ingest defined? 🤔

Member Author:

It's located within the veda-tf-state-shared/collections/veda-pyarc2stac.json as a key/value pair

Member:

Ah, I thought the mapping to a dag was automatic based on the dag ID rather than manual. Maybe we can update this, though: instead of having three different for loops, have one and just map the configs to the respective dag based on the dag ID.

I was wondering if you're missing something like this for the pyarc2stac dag

get_discover_dag(id="veda_discover", event={})

Member Author:

@slesaad I've now created a mapping function for each pipeline type. Additionally, I've added get_ingest_pyarc2stac_dag(id="veda_pyarc2stac_ingest", event={}) per recommendation to account for manual runs.

Comment on lines +105 to +116
# Default from the AWS .json file. If the value is an empty string, it will
# default to the pyarc2stac-generated value.
keys_to_overwrite = ["id", "title", "description", "license"]
for key in keys_to_overwrite:
    event_value = event.get(key)
    if isinstance(event_value, str) and event_value.strip():
        collection[key] = event_value


# Overwrite collection values with template configuration values. This only
# overwrites if the value is a non-empty string within the template_conf file.
for key, value in template_conf.items():
    if isinstance(value, str) and value.strip():
        collection[key] = value
Member:

i'm unsure why two of these are needed

Member Author (kyle-lesinger, May 2, 2025):

To increase the flexibility of the code, these cover the options to a.) use only information from the AWS .json file, and b.) use information from a manual trigger. pyarc2stac generates its own information, but having only the .json file content overwrite the pyarc2stac outputs would not account for manual triggering and changes to the configuration file.

Just for clarity: the "event" comes from the .json file, while the "template_conf" comes from SM2A Airflow.

Member:

not sure if that's needed. i think the veda_discover_pipeline does the same thing without needing to account for the two different options, can you confirm if this is absolutely needed?

Member Author (kyle-lesinger, May 8, 2025):

I've added new functionality to allow for a specific precedence order of collection values:

  • Manual config triggering = precedence 1
  • AWS .json config = precedence 2
  • pyarc2stac outputs = precedence 3

    for key in collection.keys():
        collection[key] = (template_conf.get(key) or event.get(key) or collection[key])
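The precedence above can be sketched end to end (the sample dicts are illustrative; empty strings are falsy, so `or` falls through to the next source):

```python
# Precedence: manual trigger conf (1) > AWS .json event (2) > pyarc2stac output (3).
collection = {"id": "auto-id", "title": "auto title", "license": "proprietary"}
event = {"id": "no2", "title": "", "license": ""}                 # AWS .json config
template_conf = {"id": "", "title": "Manual NO2", "license": ""}  # manual trigger

for key in collection.keys():
    collection[key] = template_conf.get(key) or event.get(key) or collection[key]

# collection == {"id": "no2", "title": "Manual NO2", "license": "proprietary"}
```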

Comment on lines +41 to +42
if prefix == "pyarc2stac":
    id = f"{prefix}-{collection['id']}"
Member:

is this really necessary? why not follow the pattern that's already there? (then you won't need the dag_config values to be a tuple) and if you feel strongly, update all of them to use the collection id?

Member Author (kyle-lesinger, May 6, 2025):

It's only necessary if we want the names of the collections being ingested to be understandable within Airflow. If we only set it to id = f"{prefix}-{file_name}", the dags will be created as pyarc2stac-veda-pyarcstac-1, pyarc2stac-veda-pyarcstac-2, pyarc2stac-veda-pyarcstac-3, etc.

It only helps reduce debugging overhead if one of the ingests fails, but it's not strictly needed. So I'll let you decide what works best; just let me know.
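For illustration, the naming choice being discussed could be factored into a helper like this (hypothetical function, not the PR's exact code):

```python
def dag_id_for(prefix: str, file_name: str, collection: dict) -> str:
    # Use the collection id for pyarc2stac ingests so the Airflow UI shows
    # e.g. "pyarc2stac-no2" instead of "pyarc2stac-veda-pyarcstac-1".
    if prefix == "pyarc2stac":
        return f"{prefix}-{collection['id']}"
    return f"{prefix}-{file_name}"

# dag_id_for("pyarc2stac", "veda-pyarcstac-1", {"id": "no2"}) -> "pyarc2stac-no2"
```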

Member Author:

[screenshot: resulting DAG names in the Airflow UI]

Member:

that makes sense, let's use this pattern for the others too?


dag_configs = {
    "veda_discover": (get_discover_dag, "discover"),
Member:

you don't really need the values to be a tuple. my_function.__name__ gives you the name of the function as a string, and you should be able to do any conditional based on that

Member Author:

Yes, that makes sense. I was trying to allow for different naming conventions to be created, but I'll simply draw the name from the string.

Member:

you can also use the dictionary key itself (dag_key) - has the same information

Member Author (kyle-lesinger, May 8, 2025):

Complete. The final mapping is:

        "veda_discover":          get_discover_dag,
        "veda_ingest_vector":     get_ingest_vector_dag,
        "veda_pyarc2stac_ingest": get_ingest_pyarc2stac_dag,
    }

And the name comes directly from the function's __name__.
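The resulting dispatch can be sketched with stub factories (the stubs stand in for the real Airflow DAG factory functions, which are not shown in this thread):

```python
# Stub factories standing in for the real DAG factory functions.
def get_discover_dag(id, event): return ("discover", id, event)
def get_ingest_vector_dag(id, event): return ("vector", id, event)
def get_ingest_pyarc2stac_dag(id, event): return ("pyarc2stac", id, event)

dag_configs = {
    "veda_discover":          get_discover_dag,
    "veda_ingest_vector":     get_ingest_vector_dag,
    "veda_pyarc2stac_ingest": get_ingest_pyarc2stac_dag,
}

# One loop replaces the three per-pipeline loops: the dict key (dag_key)
# identifies the pipeline, and factory.__name__ is available for conditionals.
for dag_key, factory in dag_configs.items():
    dag = factory(id=dag_key, event={})
```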

@slesaad slesaad changed the title Disaster dag ingest - addition of pyarc2stac Add support for pyarc2stac based ingest May 6, 2025
"stac_version": "1.0.0",
"description": "collection description",
"data_type": "",
"is_periodic": true,
Contributor:

Note: We may need to add a prefix here and in the template. The dashboard-prefixed properties dashboard:is_periodic and dashboard:time_density have been handled in different ways: in our deprecated workflows API, is_periodic is an alias for dashboard:is_periodic, but I don't think we support that alias right now.

I will raise this in tag-up and follow up here; I think we may need to make adjustments to all of our DAGs that generate collection metadata (not just this new DAG).
cc: @slesaad

Member Author:

Added the dashboard prefix to the config file:

template_conf = {
    "url": "",
    "id": "",
    "title": "",
    "stac_version": "",
    "description": "",
    "data_type": "",
    "license": "",
    "dashboard:is_periodic": "",
    "dashboard:time_density": ""
}
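If the bare alias ever needs supporting, normalization could look like this (a hypothetical helper; the alias behaviour of the deprecated workflows API described above is the assumption here):

```python
# Map bare aliases onto the dashboard-prefixed property names.
DASHBOARD_ALIASES = {
    "is_periodic": "dashboard:is_periodic",
    "time_density": "dashboard:time_density",
}

def normalize_dashboard_keys(conf: dict) -> dict:
    # Rename aliased keys; leave already-prefixed and unrelated keys untouched.
    out = {}
    for key, value in conf.items():
        out[DASHBOARD_ALIASES.get(key, key)] = value
    return out

normalized = normalize_dashboard_keys({"is_periodic": True, "license": "MIT"})
# {"dashboard:is_periodic": True, "license": "MIT"}
```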

@slesaad (Member) commented May 16, 2025

@kyle-lesinger looks pretty good to me, let's resolve the conflicts and then we'll merge

@kyle-lesinger kyle-lesinger requested a review from slesaad July 28, 2025 16:17
@kyle-lesinger (Member Author):

@slesaad the conflicts have been resolved. Please re-review at your earliest convenience.


# Handle temporal extent separately if it exists in configs.
# This is useful for items with no temporal extent in the initial pyarc2stac item creation
if "temporal" in filtered_event:
Member Author:

For this part, we need to replace the temporal extent for those collections that don't have a date. Otherwise they produce a dashboard:is_timeless property, which overrides dashboard:time_density. Some of the NASA Disasters data has this issue.
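A sketch of that replacement on a STAC collection dict (the extent/interval key names follow the STAC collection spec; the helper name and the shape of the "temporal" config value are assumptions):

```python
def apply_temporal_override(collection: dict, filtered_event: dict) -> dict:
    # Replace the temporal extent when the config supplies one, e.g. for
    # collections where pyarc2stac could not derive any dates.
    if "temporal" in filtered_event:
        collection.setdefault("extent", {})["temporal"] = {
            "interval": [filtered_event["temporal"]]
        }
    return collection

coll = {"extent": {"spatial": {"bbox": [[-180, -90, 180, 90]]}}}
coll = apply_temporal_override(coll, {"temporal": ["2020-01-01T00:00:00Z", None]})
```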

@kyle-lesinger kyle-lesinger merged commit 314a0e6 into dev Aug 22, 2025
4 checks passed
@kyle-lesinger kyle-lesinger deleted the disasterDag branch August 22, 2025 14:00