-
Notifications
You must be signed in to change notification settings - Fork 14.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
AIP-66: Make DAG callbacks bundle aware #45860
base: main
Are you sure you want to change the base?
Conversation
airflow/dag_processing/manager.py
Outdated
# Since we are already going to use that filepath to run callback, | ||
# there is no need to have same file path again in the queue |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is a reason now - we are running the callback on an old bundle version now, so we should leave the existing entry in the queue so the latest file is still parsed when it is its turn.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is a reason now - we are running the callback on an old bundle version now, so we should leave the existing entry in the queue so the latest file is still parsed when it is its turn.
I have added bundle_version to DagFileInfo, which means versions will be differentiated in the queue. We can keep it this way. WDYT?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Adding bundle_version to DagFileInfo sounds good. Good way to differentiate them. But, still don't think we should remove the entries. Left another comment below about this too.
airflow/dag_processing/manager.py
Outdated
# callback_paths_to_del = [x for x in self._callback_to_execute if x not in new_file_paths] | ||
# for path_to_del in callback_paths_to_del: | ||
# del self._callback_to_execute[path_to_del] | ||
callback_paths_to_del = [x for x in self._callback_to_execute if x not in new_file_paths] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We will need to be a little smarter about this - the file could still exist in the old bundle version, but not the latest.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So, this code was added cause of SLA callbacks: #30076. I also removed filtering of _file_path_queue in the method since there's a possibility we are just working in a different version of the bundle
@@ -2212,15 +2213,14 @@ def safe_dag_id(self): | |||
return self.dag_id.replace(".", "__dot__") | |||
|
|||
@property | |||
def relative_fileloc(self) -> pathlib.Path | None: | |||
def relative_fileloc(self) -> pathlib.Path: | |||
"""File location of the importable dag 'file' relative to the configured DAGs folder.""" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
docstring needs updating. Longer term fileloc
should just be relative from the start: #45623
3ab1310
to
1bd1551
Compare
airflow/dag_processing/manager.py
Outdated
from airflow.dag_processing.bundles.manager import DagBundlesManager | ||
|
||
DagBundlesManager().sync_bundles_to_db() | ||
self._bundles_manager = DagBundlesManager().sync_bundles_to_db() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
self._bundles_manager = DagBundlesManager().sync_bundles_to_db() | |
self._bundles_manager.sync_bundles_to_db() |
Doesn't the factory create it for us?
airflow/dag_processing/manager.py
Outdated
@@ -263,7 +269,9 @@ def deactivate_stale_dags( | |||
# last_parsed_time is the processor_timeout. Longer than that indicates that the DAG is | |||
# no longer present in the file. We have a stale_dag_threshold configured to prevent a | |||
# significant delay in deactivation of stale dags when a large timeout is configured | |||
dag_file_path = DagFileInfo(path=dag.fileloc, bundle_name=dag.bundle_name) | |||
dag_file_path = DagFileInfo( | |||
path=dag.fileloc, bundle_name=dag.bundle_name, bundle_version=dag.bundle_version |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think we want to have the version here - we want to see if the dag is still present in the "latest" (a.k.a none) version.
airflow/dag_processing/manager.py
Outdated
if file_info in self._file_path_queue: | ||
# Remove DagFileInfo matching DagFileInfo from self._file_path_queue | ||
# Since we are already going to use that DagFileInfo to run callback, | ||
# there is no need to have same DagFileInfo again in the queue | ||
self._file_path_queue = deque( | ||
file_path for file_path in self._file_path_queue if file_path != request.full_filepath | ||
file_path for file_path in self._file_path_queue if file_path != file_info | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't need to do this anymore. Leave the existing entry, if it's in there, in there. Just add the versioned one to run the callback.
airflow/dag_processing/manager.py
Outdated
@@ -477,7 +491,8 @@ def _refresh_dag_bundles(self): | |||
|
|||
new_file_paths = [f for f in self._file_paths if f.bundle_name != bundle.name] | |||
new_file_paths.extend( | |||
DagFileInfo(path=path, bundle_name=bundle.name) for path in bundle_file_paths | |||
DagFileInfo(path=path, bundle_name=bundle.name, bundle_version=bundle_model.version) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same here. We want _file_stat
to be "versionless", which is easier if we just leave these all as None.
1bd1551
to
0520626
Compare
This involves using relative paths in the callbacks, resolving the full path and using the it to queue the callback in the file processor process.
0520626
to
e6a6b78
Compare
This involves using relative paths in the callbacks, resolving the full path and using it to queue the callback in the file processor process.
Closes: #45496