Added Fastapi endpoint for reserialize_dags #47943
Conversation
Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about anything, please check our Contributors' Guide (https://github.com/apache/airflow/blob/main/contributing-docs/README.rst)
Thanks for the changes! Added suggestions
def validate_bundle_names(cls, value):
    """Validate bundle names format and check for duplicates."""
    manager = DagBundlesManager()
    all_bundles = list(manager.get_all_dag_bundles())
I am not sure we need to init and load all the DAG bundles for each request. Maybe we can add a method such as get_all_dag_bundles_by_name() to it.
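A rough sketch of what such a helper could look like (hypothetical method; the PR's DagBundlesManager only exposes get_all_dag_bundles(), and the cache attribute below is made up):

# Hypothetical addition to DagBundlesManager, not existing API.
# Build the name -> bundle mapping once and reuse it, so a request handler
# can look up bundles by name without re-initializing every configured bundle.
def get_all_dag_bundles_by_name(self) -> dict[str, "BaseDagBundle"]:
    if getattr(self, "_bundles_by_name", None) is None:
        self._bundles_by_name = {b.name: b for b in self.get_all_dag_bundles()}
    return self._bundles_by_name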
@dags_router.post(
    "/manage/reserialize",
"/manage/reserialize", | |
"/reserialize", |
I think we don't need the additional /manage segment. It would be simpler.
In that case you won't be able to have a DAG with the ID reserialize, because we have an API router with /public/dags/{dag_id}.
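For illustration, a minimal FastAPI sketch of the collision (paths mirror the discussion; handler bodies are made up):

from fastapi import FastAPI

app = FastAPI()

# Existing public route: GET /public/dags/{dag_id}
@app.get("/public/dags/{dag_id}")
def get_dag(dag_id: str):
    return {"dag_id": dag_id}

# If reserialize sits directly under /public/dags, the literal segment
# "reserialize" overlaps with the {dag_id} path parameter, so a DAG whose
# ID is literally "reserialize" becomes ambiguous. Nesting it under a
# prefix such as /manage avoids that overlap.
@app.post("/public/dags/reserialize")
def reserialize_dags():
    return {"status": "queued"}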
except Exception as e:
    session.rollback()
This could maybe go in a finally block?
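A minimal sketch of the shape the reviewer may have in mind (hypothetical; the helper name is made up, and whether rollback belongs in except or finally depends on whether the happy path commits first):

try:
    reserialize_selected_bundles(session)  # hypothetical helper doing the actual work
    session.commit()
except Exception:
    session.rollback()  # undo partial writes before surfacing the error
    raise
finally:
    pass  # cleanup that must run on both success and failure would go here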
except Exception as e:
    session.rollback()
    raise HTTPException(
        status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
I'm not sure about this broad status code. It should also be documented in the OpenAPI spec.
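For reference, one way to get an error status into the generated OpenAPI spec is to declare it on the route decorator (the description text below is illustrative, not the PR's actual code):

from fastapi import APIRouter, status

dags_router = APIRouter(prefix="/public/dags")

# Responses declared here show up in the OpenAPI schema; merely raising
# HTTPException inside the handler does not document them.
@dags_router.post(
    "/manage/reserialize",
    responses={
        status.HTTP_500_INTERNAL_SERVER_ERROR: {"description": "Reserialization failed"},
    },
)
def reserialize_dags():
    ...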
Co-authored-by: Bugra Ozturk <[email protected]>
First off, thanks for opening a comprehensive first PR! It looks great.
Unfortunately, we can't add an endpoint that will reserialize all of the DAGs in the api-server process. If you look at our security model, we state that DAG files are not necessary for the scheduler and api-server (it says webserver, but that just hasn't been updated yet 😄), and most importantly, that user provided code should not be executed in those components.
If you look at how the reparse a file feature works, you'll see it needs to communicate with the dag processor via the db in order to carry out reparsing of a specific file. We will need to follow a similar path to do a "reparse everything" feature.
I have a few ideas on how we could do this, but with feature freeze on Airflow 3 being tomorrow (really closer to the end of today 😄), unfortunately, this will have to wait for 3.1 so we have a bit of time to iterate on it.
@jedcunningham, thank you for your clarifications. I will review the security feature and its functionality. It makes sense that user-defined code is not present with the webserver and scheduler, as they can be deployed on separate machines. I will go through the code to understand how the file is re-parsed, and we can discuss your ideas on improving its implementation. On a side note, do you have any updates on the 3.1 release timeline?
Unfortunately, not at this time. We can merge new features once 3.0 is out though, so the period of time where we cannot get new things like this merged will be approximately 1 month long.
@jedcunningham thanks for the clarification. Is it okay if I keep working on this feature and it just gets merged next month?
@jedcunningham I looked into the code for the reparse-a-file feature. For this feature, if we fetch all the file locations of the bundles and create a batch insert of DagPriorityParsingRequest, will it work? This would create parsing requests for all of our bundles.

file_locations = session.scalars(
    select(DagModel.fileloc).where(DagModel.name.in_(list(bundles_to_process)))
)
# Process each bundle
parsing_requests = [DagPriorityParsingRequest(fileloc=fileloc) for fileloc in file_locations]
session.add_all(parsing_requests)

Lemme know your thoughts.
Yes, we need to use DagPriorityParsingRequest to trigger the reparsing of the appropriate files. There is no DagModel.name; I guess you mean DagModel.bundle_name?
Also, people can use no bundle at all, I think (bundle_name is None). So if this endpoint is only usable by providing bundles_to_process, such a setup will not be able to use this feature. Maybe use the dag folder (there can be multiple dag folders), or accept no args at all and reparse 'everything'.
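A hedged sketch of that suggestion: filter on DagModel.bundle_name rather than DagModel.name, and fall back to reparsing everything when no bundles are given (import paths may differ between Airflow versions):

from sqlalchemy import select

from airflow.models import DagModel
from airflow.models.dagbag import DagPriorityParsingRequest  # location may vary by Airflow version

if bundles_to_process:
    stmt = select(DagModel.fileloc).where(DagModel.bundle_name.in_(list(bundles_to_process)))
else:
    stmt = select(DagModel.fileloc)  # no filter: request reparsing of every known DAG file

file_locations = session.scalars(stmt)
session.add_all([DagPriorityParsingRequest(fileloc=fileloc) for fileloc in file_locations])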
Because the rebase messed up the code file, I have created a new pull request with the latest changes and per the suggestions. I am closing this pull request.
Why
#47844
What
Added a REST endpoint to replicate the CLI functionality of airflow dags reserialize.
Updated endpoint: /public/dags/manage/reserialize
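For reference, a hypothetical call to the proposed endpoint (this path is the one proposed in the PR and was never released in this form; the host and token below are placeholders):

import requests

# Placeholder host and token; adjust for a real deployment.
resp = requests.post(
    "http://localhost:8080/public/dags/manage/reserialize",
    headers={"Authorization": "Bearer <access-token>"},
)
print(resp.status_code, resp.json())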