From 14d7416d73a8df3f732989d3ecd575aef05e12f3 Mon Sep 17 00:00:00 2001
From: Ephraim Anierobi
Date: Mon, 13 Jan 2025 16:30:47 +0100
Subject: [PATCH 1/8] Add bundle_name to ParseImportError

This PR adds bundle_name to ParseImportError. Future work will make the
filename relative to the bundle path, which means we need to include
bundle_name in ParseImportError so that if two DAG files have the same
filename, we can differentiate them by the bundle they belong to.
---
 airflow/api/common/delete_dag.py | 5 +-
 .../endpoints/import_error_endpoint.py | 27 +-
 airflow/api_connexion/schemas/error_schema.py | 1 +
 .../core_api/routes/public/import_error.py | 1 +
 airflow/dag_processing/collection.py | 34 +-
 airflow/dag_processing/manager.py | 7 +-
 ...0_0_add_bundle_name_to_parseimporterror.py | 50 +
 airflow/models/errors.py | 3 +-
 airflow/utils/db.py | 2 +-
 airflow/www/utils.py | 11 +-
 airflow/www/views.py | 9 +-
 docs/apache-airflow/img/airflow_erd.sha256 | 2 +-
 docs/apache-airflow/img/airflow_erd.svg | 3526 ++++++++---------
 docs/apache-airflow/migrations-ref.rst | 4 +-
 tests/dag_processing/test_collection.py | 23 +-
 15 files changed, 1908 insertions(+), 1797 deletions(-)
 create mode 100644 airflow/migrations/versions/0056_3_0_0_add_bundle_name_to_parseimporterror.py

diff --git a/airflow/api/common/delete_dag.py b/airflow/api/common/delete_dag.py
index 1e9d4a80dbd67..fff64a6e53771 100644
--- a/airflow/api/common/delete_dag.py
+++ b/airflow/api/common/delete_dag.py
@@ -75,7 +75,10 @@ def delete_dag(dag_id: str, keep_records_in_log: bool = True, session: Session =
     # This handles the case when the dag_id is changed in the file
     session.execute(
         delete(ParseImportError)
-        .where(ParseImportError.filename == dag.fileloc)
+        .where(
+            ParseImportError.filename == dag.fileloc,
+            ParseImportError.bundle_name == dag.get_bundle_name(session),
+        )
         .execution_options(synchronize_session="fetch")
     )

diff --git a/airflow/api_connexion/endpoints/import_error_endpoint.py b/airflow/api_connexion/endpoints/import_error_endpoint.py
index d2780fe941b13..b4e19a1edbcf8 100644
--- a/airflow/api_connexion/endpoints/import_error_endpoint.py
+++ b/airflow/api_connexion/endpoints/import_error_endpoint.py
@@ -19,7 +19,7 @@
 from collections.abc import Sequence
 from typing import TYPE_CHECKING

-from sqlalchemy import func, select
+from sqlalchemy import func, select, tuple_

 from airflow.api_connexion import security
 from airflow.api_connexion.exceptions import NotFound, PermissionDenied
@@ -61,7 +61,9 @@ def get_import_error(*, import_error_id: int, session: Session = NEW_SESSION) ->
         readable_dag_ids = security.get_readable_dags()
         file_dag_ids = {
             dag_id[0]
-            for dag_id in session.query(DagModel.dag_id).filter(DagModel.fileloc == error.filename).all()
+            for dag_id in session.query(DagModel.dag_id)
+            .filter(DagModel.fileloc == error.filename, DagModel.bundle_name == error.bundle_name)
+            .all()
         }

         # Can the user read any DAGs in the file?
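The next hunk applies the same idea to the list endpoint, switching the visibility filter to SQLAlchemy's tuple_ construct so import errors are matched on the composite (filename, bundle_name) key instead of filename alone. As a standalone illustration of that pattern — a minimal sketch against a throwaway in-memory table, not Airflow's real models — composite-tuple IN filtering behaves like this:

from sqlalchemy import Column, Integer, String, create_engine, select, tuple_
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class ImportErr(Base):
    # Stand-in for ParseImportError: (filename, bundle_name) identifies a row.
    __tablename__ = "import_err"
    id = Column(Integer, primary_key=True)
    filename = Column(String(1024))
    bundle_name = Column(String(250))


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    # Two bundles may ship a file with the same name; filename alone is ambiguous.
    session.add_all(
        [
            ImportErr(filename="dags/abc.py", bundle_name="bundle_a"),
            ImportErr(filename="dags/abc.py", bundle_name="bundle_b"),
        ]
    )
    session.flush()

    # Hypothetical visibility set: files from DAGs the current user may read.
    visible = [("dags/abc.py", "bundle_a")]
    rows = session.scalars(
        select(ImportErr).where(tuple_(ImportErr.filename, ImportErr.bundle_name).in_(visible))
    ).all()
    assert [(r.filename, r.bundle_name) for r in rows] == visible

On backends without row-value support the same predicate can be emulated with an or_ of per-pair and_ clauses, but the tuple_ form is what the endpoints in this patch standardize on.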
@@ -98,9 +100,17 @@ def get_import_errors( if not can_read_all_dags: # if the user doesn't have access to all DAGs, only display errors from visible DAGs readable_dag_ids = security.get_readable_dags() - dagfiles_stmt = select(DagModel.fileloc).distinct().where(DagModel.dag_id.in_(readable_dag_ids)) - query = query.where(ParseImportError.filename.in_(dagfiles_stmt)) - count_query = count_query.where(ParseImportError.filename.in_(dagfiles_stmt)) + dagfiles_stmt = ( + select(DagModel.fileloc, DagModel.bundle_name) + .distinct() + .where(DagModel.dag_id.in_(readable_dag_ids)) + ) + query = query.where( + tuple_(ParseImportError.filename, ParseImportError.bundle_name).in_(dagfiles_stmt) + ) + count_query = count_query.where( + tuple_(ParseImportError.filename, ParseImportError.bundle_name).in_(dagfiles_stmt) + ) total_entries = session.scalars(count_query).one() import_errors = session.scalars(query.offset(offset).limit(limit)).all() @@ -109,7 +119,12 @@ def get_import_errors( for import_error in import_errors: # Check if user has read access to all the DAGs defined in the file file_dag_ids = ( - session.query(DagModel.dag_id).filter(DagModel.fileloc == import_error.filename).all() + session.query(DagModel.dag_id) + .filter( + DagModel.fileloc == import_error.filename, + DagModel.bundle_name == import_error.bundle_name, + ) + .all() ) requests: Sequence[IsAuthorizedDagRequest] = [ { diff --git a/airflow/api_connexion/schemas/error_schema.py b/airflow/api_connexion/schemas/error_schema.py index 8f117fb9666b2..237abcc091d41 100644 --- a/airflow/api_connexion/schemas/error_schema.py +++ b/airflow/api_connexion/schemas/error_schema.py @@ -35,6 +35,7 @@ class Meta: import_error_id = auto_field("id", dump_only=True) timestamp = auto_field(format="iso", dump_only=True) filename = auto_field(dump_only=True) + bundle_name = auto_field(dump_only=True) stack_trace = auto_field("stacktrace", dump_only=True) diff --git a/airflow/api_fastapi/core_api/routes/public/import_error.py b/airflow/api_fastapi/core_api/routes/public/import_error.py index 89ad78403a430..01caf9048e2d6 100644 --- a/airflow/api_fastapi/core_api/routes/public/import_error.py +++ b/airflow/api_fastapi/core_api/routes/public/import_error.py @@ -74,6 +74,7 @@ def get_import_errors( "id", "timestamp", "filename", + "bundle_name", "stacktrace", ], ParseImportError, diff --git a/airflow/dag_processing/collection.py b/airflow/dag_processing/collection.py index 6e0b627198995..5712958b55b95 100644 --- a/airflow/dag_processing/collection.py +++ b/airflow/dag_processing/collection.py @@ -241,21 +241,36 @@ def _update_dag_warnings( session.merge(warning_to_add) -def _update_import_errors(files_parsed: set[str], import_errors: dict[str, str], session: Session): +def _update_import_errors( + files_parsed: set[str], bundle_name: str, import_errors: dict[str, str], session: Session +): from airflow.listeners.listener import get_listener_manager # We can remove anything from files parsed in this batch that doesn't have an error. We need to remove old # errors (i.e. 
from files that are removed) separately - session.execute(delete(ParseImportError).where(ParseImportError.filename.in_(list(files_parsed)))) + session.execute( + delete(ParseImportError).where( + ParseImportError.filename.in_(list(files_parsed)), ParseImportError.bundle_name == bundle_name + ) + ) - existing_import_error_files = set(session.scalars(select(ParseImportError.filename))) + existing_import_error_files = set( + session.execute(select(ParseImportError.filename, ParseImportError.bundle_name)) + ) # Add the errors of the processed files for filename, stacktrace in import_errors.items(): - if filename in existing_import_error_files: - session.query(ParseImportError).where(ParseImportError.filename == filename).update( - {"filename": filename, "timestamp": utcnow(), "stacktrace": stacktrace}, + if (filename, bundle_name) in existing_import_error_files: + session.query(ParseImportError).where( + ParseImportError.filename == filename, ParseImportError.bundle_name == bundle_name + ).update( + { + "filename": filename, + "bundle_name": bundle_name, + "timestamp": utcnow(), + "stacktrace": stacktrace, + }, ) # sending notification when an existing dag import error occurs get_listener_manager().hook.on_existing_dag_import_error(filename=filename, stacktrace=stacktrace) @@ -263,13 +278,16 @@ def _update_import_errors(files_parsed: set[str], import_errors: dict[str, str], session.add( ParseImportError( filename=filename, + bundle_name=bundle_name, timestamp=utcnow(), stacktrace=stacktrace, ) ) # sending notification when a new dag import error occurs get_listener_manager().hook.on_new_dag_import_error(filename=filename, stacktrace=stacktrace) - session.query(DagModel).filter(DagModel.fileloc == filename).update({"has_import_errors": True}) + session.query(DagModel).filter( + DagModel.fileloc == filename, DagModel.bundle_name == bundle_name + ).update({"has_import_errors": True}) def update_dag_parsing_results_in_db( @@ -314,7 +332,6 @@ def update_dag_parsing_results_in_db( try: DAG.bulk_write_to_db(bundle_name, bundle_version, dags, session=session) # Write Serialized DAGs to DB, capturing errors - # Write Serialized DAGs to DB, capturing errors for dag in dags: serialize_errors.extend(_serialize_dag_capturing_errors(dag, session)) except OperationalError: @@ -332,6 +349,7 @@ def update_dag_parsing_results_in_db( good_dag_filelocs = {dag.fileloc for dag in dags if dag.fileloc not in import_errors} _update_import_errors( files_parsed=good_dag_filelocs, + bundle_name=bundle_name, import_errors=import_errors, session=session, ) diff --git a/airflow/dag_processing/manager.py b/airflow/dag_processing/manager.py index 0b831d238d1c3..217e3faa5ead2 100644 --- a/airflow/dag_processing/manager.py +++ b/airflow/dag_processing/manager.py @@ -40,7 +40,7 @@ import attrs from setproctitle import setproctitle -from sqlalchemy import delete, select, update +from sqlalchemy import and_, delete, select, update from tabulate import tabulate from uuid6 import uuid7 @@ -756,7 +756,10 @@ def clear_nonexistent_import_errors(self, session=NEW_SESSION): if self._file_paths: query = query.where( - ParseImportError.filename.notin_([f.path for f in self._file_paths]), + and_( + ParseImportError.filename.notin_([f.path for f in self._file_paths]), + ParseImportError.bundle_name.notin_([f.bundle_name for f in self._file_paths]), + ) ) session.execute(query.execution_options(synchronize_session="fetch")) diff --git a/airflow/migrations/versions/0056_3_0_0_add_bundle_name_to_parseimporterror.py 
b/airflow/migrations/versions/0056_3_0_0_add_bundle_name_to_parseimporterror.py new file mode 100644 index 0000000000000..ea110e50b0e45 --- /dev/null +++ b/airflow/migrations/versions/0056_3_0_0_add_bundle_name_to_parseimporterror.py @@ -0,0 +1,50 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +add bundle_name to ParseImportError. + +Revision ID: 03de77aaa4ec +Revises: e39a26ac59f6 +Create Date: 2025-01-08 10:38:02.108760 + +""" + +from __future__ import annotations + +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. +revision = "03de77aaa4ec" +down_revision = "e39a26ac59f6" +branch_labels = None +depends_on = None +airflow_version = "3.0.0" + + +def upgrade(): + """Apply add bundle_name to ParseImportError.""" + with op.batch_alter_table("import_error", schema=None) as batch_op: + batch_op.add_column(sa.Column("bundle_name", sa.String(length=250), nullable=True)) + + +def downgrade(): + """Unapply add bundle_name to ParseImportError.""" + with op.batch_alter_table("import_error", schema=None) as batch_op: + batch_op.drop_column("bundle_name") diff --git a/airflow/models/errors.py b/airflow/models/errors.py index ff05a1385a966..21c2236e2c18b 100644 --- a/airflow/models/errors.py +++ b/airflow/models/errors.py @@ -19,7 +19,7 @@ from sqlalchemy import Column, Integer, String, Text -from airflow.models.base import Base +from airflow.models.base import Base, StringID from airflow.utils.sqlalchemy import UtcDateTime @@ -30,4 +30,5 @@ class ParseImportError(Base): id = Column(Integer, primary_key=True) timestamp = Column(UtcDateTime) filename = Column(String(1024)) + bundle_name = Column(StringID()) stacktrace = Column(Text) diff --git a/airflow/utils/db.py b/airflow/utils/db.py index 1a1eb6f4d3500..ed7eb1090416c 100644 --- a/airflow/utils/db.py +++ b/airflow/utils/db.py @@ -94,7 +94,7 @@ class MappedClassProtocol(Protocol): "2.9.2": "686269002441", "2.10.0": "22ed7efa9da2", "2.10.3": "5f2621c13b39", - "3.0.0": "e39a26ac59f6", + "3.0.0": "03de77aaa4ec", } diff --git a/airflow/www/utils.py b/airflow/www/utils.py index 9c31942457487..7860e3cf7282d 100644 --- a/airflow/www/utils.py +++ b/airflow/www/utils.py @@ -198,14 +198,19 @@ def encode_dag_run( return encoded_dag_run, None -def check_import_errors(fileloc, session): +def check_import_errors(fileloc, bundle_name, session): # Check dag import errors import_errors = session.scalars( - select(ParseImportError).where(ParseImportError.filename == fileloc) + select(ParseImportError).where( + ParseImportError.filename == fileloc, ParseImportError.bundle_name == bundle_name + ) ).all() if import_errors: for import_error in import_errors: - flash(f"Broken DAG: [{import_error.filename}] {import_error.stacktrace}", "dag_import_error") + flash( + f"Broken DAG: 
[{import_error.filename}, Bundle name: {bundle_name}] {import_error.stacktrace}",
+                "dag_import_error",
+            )


 def check_dag_warnings(dag_id, session):
diff --git a/airflow/www/views.py b/airflow/www/views.py
index d2751d18046d4..5b550089cf1bd 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -1019,7 +1019,10 @@ def index(self):
             import_errors = import_errors.where(
                 ParseImportError.filename.in_(
                     select(DagModel.fileloc).distinct().where(DagModel.dag_id.in_(filter_dag_ids))
-                )
+                ),
+                ParseImportError.bundle_name.in_(
+                    select(DagModel.bundle_name).distinct().where(DagModel.dag_id.in_(filter_dag_ids))
+                ),
             )

         import_errors = session.scalars(import_errors)
@@ -2884,10 +2887,10 @@ def grid(self, dag_id: str, session: Session = NEW_SESSION):
         dag = get_airflow_app().dag_bag.get_dag(dag_id, session=session)
         url_serializer = URLSafeSerializer(current_app.config["SECRET_KEY"])
         dag_model = DagModel.get_dagmodel(dag_id, session=session)
-        if not dag:
+        if not dag or not dag_model:
             flash(f'DAG "{dag_id}" seems to be missing from DagBag.', "error")
             return redirect(url_for("Airflow.index"))
-        wwwutils.check_import_errors(dag.fileloc, session)
+        wwwutils.check_import_errors(dag.fileloc, dag_model.bundle_name, session)
         wwwutils.check_dag_warnings(dag.dag_id, session)

         included_events_raw = conf.get("webserver", "audit_view_included_events", fallback="")
diff --git a/docs/apache-airflow/img/airflow_erd.sha256 b/docs/apache-airflow/img/airflow_erd.sha256
index 86afe05d37001..ad7d1b13c19e7 100644
--- a/docs/apache-airflow/img/airflow_erd.sha256
+++ b/docs/apache-airflow/img/airflow_erd.sha256
@@ -1 +1 @@
-00d5d138d0773a6b700ada4650f5c60cc3972afefd3945ea434dea50abfda834
\ No newline at end of file
+79449705d667d8fe382b4b53a0c59bf55bf31eeafc34941c59e6dceccc68d7a7
\ No newline at end of file
diff --git a/docs/apache-airflow/img/airflow_erd.svg b/docs/apache-airflow/img/airflow_erd.svg
index 9fe57986ed3b3..f647786452c6b 100644
--- a/docs/apache-airflow/img/airflow_erd.svg
+++ b/docs/apache-airflow/img/airflow_erd.svg
[~3,500 changed lines of the auto-generated ERD diagram SVG omitted: the substantive change is that the import_error entity gains a bundle_name [VARCHAR(250)] column; the rest is re-rendering churn from regenerating the diagram.]
diff --git a/docs/apache-airflow/migrations-ref.rst b/docs/apache-airflow/migrations-ref.rst
index 62013ff8f799c..5f90476af3570 100644
--- a/docs/apache-airflow/migrations-ref.rst
+++ b/docs/apache-airflow/migrations-ref.rst
@@ -39,7 +39,9 @@ Here's the list of all the Database Migrations that are executed via when you ru
 +-------------------------+------------------+-------------------+--------------------------------------------------------------+
 | Revision ID             | Revises ID       | Airflow Version   | Description                                                  |
 +=========================+==================+===================+==============================================================+
-| ``e39a26ac59f6`` (head) | ``38770795785f`` | ``3.0.0``         | remove pickled data from dagrun table.                       |
+| ``03de77aaa4ec`` (head) | ``e39a26ac59f6`` | ``3.0.0``         | add bundle_name to ParseImportError.
| ++-------------------------+------------------+-------------------+--------------------------------------------------------------+ +| ``e39a26ac59f6`` | ``38770795785f`` | ``3.0.0`` | remove pickled data from dagrun table. | +-------------------------+------------------+-------------------+--------------------------------------------------------------+ | ``38770795785f`` | ``5c9c0231baa2`` | ``3.0.0`` | Add asset reference models. | +-------------------------+------------------+-------------------+--------------------------------------------------------------+ diff --git a/tests/dag_processing/test_collection.py b/tests/dag_processing/test_collection.py index 8ef07514bbfbe..c1a6d71f43b35 100644 --- a/tests/dag_processing/test_collection.py +++ b/tests/dag_processing/test_collection.py @@ -329,9 +329,11 @@ def test_new_import_error_replaces_old(self, session, dag_import_error_listener, Test that existing import error is updated and new record not created for a dag with the same filename """ + bundle_name = "testing" filename = "abc.py" prev_error = ParseImportError( filename=filename, + bundle_name=bundle_name, timestamp=tz.utcnow(), stacktrace="Some error", ) @@ -340,7 +342,7 @@ def test_new_import_error_replaces_old(self, session, dag_import_error_listener, prev_error_id = prev_error.id update_dag_parsing_results_in_db( - bundle_name="testing", + bundle_name=bundle_name, bundle_version=None, dags=[], import_errors={"abc.py": "New error"}, @@ -348,7 +350,11 @@ def test_new_import_error_replaces_old(self, session, dag_import_error_listener, session=session, ) - import_error = session.query(ParseImportError).filter(ParseImportError.filename == filename).one() + import_error = ( + session.query(ParseImportError) + .filter(ParseImportError.filename == filename, ParseImportError.bundle_name == bundle_name) + .one() + ) # assert that the ID of the import error did not change assert import_error.id == prev_error_id @@ -361,9 +367,11 @@ def test_new_import_error_replaces_old(self, session, dag_import_error_listener, def test_remove_error_clears_import_error(self, testing_dag_bundle, session): # Pre-condition: there is an import error for the dag file + bundle_name = "testing" filename = "abc.py" prev_error = ParseImportError( filename=filename, + bundle_name=bundle_name, timestamp=tz.utcnow(), stacktrace="Some error", ) @@ -373,6 +381,7 @@ def test_remove_error_clears_import_error(self, testing_dag_bundle, session): session.add( ParseImportError( filename="def.py", + bundle_name=bundle_name, timestamp=tz.utcnow(), stacktrace="Some error", ) @@ -380,21 +389,21 @@ def test_remove_error_clears_import_error(self, testing_dag_bundle, session): session.flush() # Sanity check of pre-condition - import_errors = set(session.scalars(select(ParseImportError.filename))) - assert import_errors == {"abc.py", "def.py"} + import_errors = set(session.execute(select(ParseImportError.filename, ParseImportError.bundle_name))) + assert import_errors == {("abc.py", bundle_name), ("def.py", bundle_name)} dag = DAG(dag_id="test") dag.fileloc = filename import_errors = {} - update_dag_parsing_results_in_db("testing", None, [dag], import_errors, set(), session) + update_dag_parsing_results_in_db(bundle_name, None, [dag], import_errors, set(), session) dag_model: DagModel = session.get(DagModel, (dag.dag_id,)) assert dag_model.has_import_errors is False - import_errors = set(session.scalars(select(ParseImportError.filename))) + import_errors = set(session.execute(select(ParseImportError.filename, 
ParseImportError.bundle_name))) - assert import_errors == {"def.py"} + assert import_errors == {("def.py", bundle_name)} def test_sync_perm_for_dag_with_dict_access_control(self, session, spy_agency: SpyAgency): """ From dc29d25d88cbe3a2a065b117e56e7daed40d6a39 Mon Sep 17 00:00:00 2001 From: Ephraim Anierobi Date: Mon, 13 Jan 2025 21:30:19 +0100 Subject: [PATCH 2/8] fixup! Add bundle_name to ParseImportError --- airflow/api/common/delete_dag.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/api/common/delete_dag.py b/airflow/api/common/delete_dag.py index fff64a6e53771..2f6076252ba97 100644 --- a/airflow/api/common/delete_dag.py +++ b/airflow/api/common/delete_dag.py @@ -77,7 +77,7 @@ def delete_dag(dag_id: str, keep_records_in_log: bool = True, session: Session = delete(ParseImportError) .where( ParseImportError.filename == dag.fileloc, - ParseImportError.bundle_name == dag.get_bundle_name(session), + ParseImportError.bundle_name == dag.bundle_name, ) .execution_options(synchronize_session="fetch") ) From a2a307845e8aac3d6437fdf913306634bf76ff2e Mon Sep 17 00:00:00 2001 From: Ephraim Anierobi Date: Tue, 14 Jan 2025 14:08:12 +0100 Subject: [PATCH 3/8] fixup! fixup! Add bundle_name to ParseImportError --- airflow/dag_processing/manager.py | 11 +++++++---- airflow/www/views.py | 13 +++++++------ 2 files changed, 14 insertions(+), 10 deletions(-) diff --git a/airflow/dag_processing/manager.py b/airflow/dag_processing/manager.py index 217e3faa5ead2..d0876c6e8098f 100644 --- a/airflow/dag_processing/manager.py +++ b/airflow/dag_processing/manager.py @@ -40,7 +40,7 @@ import attrs from setproctitle import setproctitle -from sqlalchemy import and_, delete, select, update +from sqlalchemy import delete, select, tuple_, update from tabulate import tabulate from uuid6 import uuid7 @@ -756,9 +756,12 @@ def clear_nonexistent_import_errors(self, session=NEW_SESSION): if self._file_paths: query = query.where( - and_( - ParseImportError.filename.notin_([f.path for f in self._file_paths]), - ParseImportError.bundle_name.notin_([f.bundle_name for f in self._file_paths]), + ( + tuple_( + (ParseImportError.filename, ParseImportError.bundle_name).notin_( + [(f.path, f.bundle_name) for f in self._file_paths] + ) + ), ) ) diff --git a/airflow/www/views.py b/airflow/www/views.py index 5b550089cf1bd..e5b8509697859 100644 --- a/airflow/www/views.py +++ b/airflow/www/views.py @@ -74,7 +74,7 @@ from markupsafe import Markup, escape from pendulum.datetime import DateTime from pendulum.parsing.exceptions import ParserError -from sqlalchemy import and_, case, desc, func, inspect, or_, select, union_all +from sqlalchemy import and_, case, desc, func, inspect, or_, select, tuple_, union_all from sqlalchemy.exc import IntegrityError from sqlalchemy.orm import joinedload from wtforms import BooleanField, validators @@ -1017,11 +1017,12 @@ def index(self): if not can_read_all_dags: # if the user doesn't have access to all DAGs, only display errors from visible DAGs import_errors = import_errors.where( - ParseImportError.filename.in_( - select(DagModel.fileloc).distinct().where(DagModel.dag_id.in_(filter_dag_ids)) - ), - ParseImportError.bundle_name.in_( - select(DagModel.bundle_name).distinct().where(DagModel.dag_id.in_(filter_dag_ids)) + tuple_( + (ParseImportError.filename, ParseImportError.bundle_name).in_( + select(DagModel.fileloc, DagModel.bundle_name) + .distinct() + .where(DagModel.dag_id.in_(filter_dag_ids)) + ) ), ) From 47a33df218cc6f3ea0adab4c7b88587eb83fa383 Mon Sep 17 00:00:00 2001 
From: Ephraim Anierobi Date: Tue, 14 Jan 2025 22:02:30 +0100 Subject: [PATCH 4/8] fixup! fixup! fixup! Add bundle_name to ParseImportError --- .../endpoints/import_error_endpoint.py | 6 ++--- airflow/api_connexion/openapi/v1.yaml | 4 ++++ .../core_api/datamodels/import_error.py | 1 + .../core_api/openapi/v1-generated.yaml | 4 ++++ airflow/dag_processing/manager.py | 6 ++--- .../ui/openapi-gen/requests/schemas.gen.ts | 6 ++++- airflow/ui/openapi-gen/requests/types.gen.ts | 1 + airflow/www/static/js/types/api-generated.ts | 2 ++ airflow/www/views.py | 10 ++++----- .../test_import_error_endpoint.py | 20 ++++++++++++----- providers/tests/fab/auth_manager/conftest.py | 22 +++++++++++++++++++ .../endpoints/test_import_error_endpoint.py | 5 +++++ .../schemas/test_error_schema.py | 3 +++ 13 files changed, 71 insertions(+), 19 deletions(-) diff --git a/airflow/api_connexion/endpoints/import_error_endpoint.py b/airflow/api_connexion/endpoints/import_error_endpoint.py index b4e19a1edbcf8..95225c8693f88 100644 --- a/airflow/api_connexion/endpoints/import_error_endpoint.py +++ b/airflow/api_connexion/endpoints/import_error_endpoint.py @@ -100,13 +100,13 @@ def get_import_errors( if not can_read_all_dags: # if the user doesn't have access to all DAGs, only display errors from visible DAGs readable_dag_ids = security.get_readable_dags() - dagfiles_stmt = ( + dagfiles_stmt = session.execute( select(DagModel.fileloc, DagModel.bundle_name) .distinct() .where(DagModel.dag_id.in_(readable_dag_ids)) - ) + ).all() query = query.where( - tuple_(ParseImportError.filename, ParseImportError.bundle_name).in_(dagfiles_stmt) + tuple_(ParseImportError.filename, ParseImportError.bundle_name or None).in_(dagfiles_stmt) ) count_query = count_query.where( tuple_(ParseImportError.filename, ParseImportError.bundle_name).in_(dagfiles_stmt) diff --git a/airflow/api_connexion/openapi/v1.yaml b/airflow/api_connexion/openapi/v1.yaml index 0ead649f4215a..c63fb72f2a9a0 100644 --- a/airflow/api_connexion/openapi/v1.yaml +++ b/airflow/api_connexion/openapi/v1.yaml @@ -3370,6 +3370,10 @@ components: type: string readOnly: true description: The filename + bundle_name: + type: string + readOnly: true + description: The bundle name stack_trace: type: string readOnly: true diff --git a/airflow/api_fastapi/core_api/datamodels/import_error.py b/airflow/api_fastapi/core_api/datamodels/import_error.py index 32c139da1a93a..baf1ffa4fb7f1 100644 --- a/airflow/api_fastapi/core_api/datamodels/import_error.py +++ b/airflow/api_fastapi/core_api/datamodels/import_error.py @@ -31,6 +31,7 @@ class ImportErrorResponse(BaseModel): id: int = Field(alias="import_error_id") timestamp: datetime filename: str + bundle_name: str stacktrace: str = Field(alias="stack_trace") diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml index 14f329ee94a4b..ed26c689aa8cb 100644 --- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml +++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml @@ -8440,6 +8440,9 @@ components: filename: type: string title: Filename + bundle_name: + type: string + title: Bundle Name stack_trace: type: string title: Stack Trace @@ -8448,6 +8451,7 @@ components: - import_error_id - timestamp - filename + - bundle_name - stack_trace title: ImportErrorResponse description: Import Error Response. 
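The manager.py hunk below settles clear_nonexistent_import_errors on a single tuple_(...).notin_(...) predicate, replacing patch 1's and_ of two independent notin_ filters. The distinction matters: the conjunction only matches an error whose filename is absent from every tracked path *and* whose bundle is absent from every tracked bundle, so a stale file inside a still-active bundle would never be cleared. A minimal pure-Python sketch of the two predicate semantics (hypothetical paths and bundle names, not Airflow objects):

# Hypothetical tracked (path, bundle) pairs from the active DAG bundles:
tracked = [("dags/a.py", "bundle_a"), ("dags/b.py", "bundle_b")]
paths = [p for p, _ in tracked]
bundles = [b for _, b in tracked]

# A stale error row: the file was removed from bundle_a, but bundle_a still exists.
stale = ("dags/old.py", "bundle_a")

# Patch 1 (and_ of two notin_ filters): clears only if BOTH parts are untracked.
cleared_by_and = stale[0] not in paths and stale[1] not in bundles
# Patches 3/4 (tuple_(...).notin_(...)): clears if the PAIR is untracked.
cleared_by_tuple = stale not in tracked

assert cleared_by_and is False   # bundle_a is still active, so the stale row survives
assert cleared_by_tuple is True  # the (file, bundle) pair is gone, so it is cleared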
diff --git a/airflow/dag_processing/manager.py b/airflow/dag_processing/manager.py index d0876c6e8098f..96c7fe4f0ed5c 100644 --- a/airflow/dag_processing/manager.py +++ b/airflow/dag_processing/manager.py @@ -757,10 +757,8 @@ def clear_nonexistent_import_errors(self, session=NEW_SESSION): if self._file_paths: query = query.where( ( - tuple_( - (ParseImportError.filename, ParseImportError.bundle_name).notin_( - [(f.path, f.bundle_name) for f in self._file_paths] - ) + tuple_(ParseImportError.filename, ParseImportError.bundle_name).notin_( + [(f.path, f.bundle_name) for f in self._file_paths] ), ) ) diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow/ui/openapi-gen/requests/schemas.gen.ts index 243643bd62d51..080df98714dea 100644 --- a/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -3259,13 +3259,17 @@ export const $ImportErrorResponse = { type: "string", title: "Filename", }, + bundle_name: { + type: "string", + title: "Bundle Name", + }, stack_trace: { type: "string", title: "Stack Trace", }, }, type: "object", - required: ["import_error_id", "timestamp", "filename", "stack_trace"], + required: ["import_error_id", "timestamp", "filename", "bundle_name", "stack_trace"], title: "ImportErrorResponse", description: "Import Error Response.", } as const; diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts index e5a3d549f91cf..c1c53c8843c88 100644 --- a/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow/ui/openapi-gen/requests/types.gen.ts @@ -795,6 +795,7 @@ export type ImportErrorResponse = { import_error_id: number; timestamp: string; filename: string; + bundle_name: string; stack_trace: string; }; diff --git a/airflow/www/static/js/types/api-generated.ts b/airflow/www/static/js/types/api-generated.ts index b3ed0790efd71..9508b8f7bc923 100644 --- a/airflow/www/static/js/types/api-generated.ts +++ b/airflow/www/static/js/types/api-generated.ts @@ -1284,6 +1284,8 @@ export interface components { timestamp?: string; /** @description The filename */ filename?: string; + /** @description The bundle name */ + bundle_name?: string; /** @description The full stackstrace. 
*/ stack_trace?: string; }; diff --git a/airflow/www/views.py b/airflow/www/views.py index e5b8509697859..3beeb66d612b3 100644 --- a/airflow/www/views.py +++ b/airflow/www/views.py @@ -1017,12 +1017,10 @@ def index(self): if not can_read_all_dags: # if the user doesn't have access to all DAGs, only display errors from visible DAGs import_errors = import_errors.where( - tuple_( - (ParseImportError.filename, ParseImportError.bundle_name).in_( - select(DagModel.fileloc, DagModel.bundle_name) - .distinct() - .where(DagModel.dag_id.in_(filter_dag_ids)) - ) + tuple_(ParseImportError.filename, ParseImportError.bundle_name).in_( + select(DagModel.fileloc, DagModel.bundle_name) + .distinct() + .where(DagModel.dag_id.in_(filter_dag_ids)) ), ) diff --git a/providers/tests/fab/auth_manager/api_endpoints/test_import_error_endpoint.py b/providers/tests/fab/auth_manager/api_endpoints/test_import_error_endpoint.py index 19509aa558146..2788e511e3276 100644 --- a/providers/tests/fab/auth_manager/api_endpoints/test_import_error_endpoint.py +++ b/providers/tests/fab/auth_manager/api_endpoints/test_import_error_endpoint.py @@ -18,6 +18,7 @@ import pytest +from airflow.dag_processing.bundles.manager import DagBundlesManager from airflow.models.dag import DagModel from airflow.providers.fab.www.security import permissions from airflow.utils import timezone @@ -60,7 +61,6 @@ def configured_app(minimal_app_for_auth_api): } ] ) - yield app delete_user(app, username="test_single_dag") @@ -123,6 +123,7 @@ def test_should_return_200_with_single_dag_read(self, session): response_data["import_error_id"] = 1 assert response_data == { "filename": "Lorem_ipsum.py", + "bundle_name": None, "import_error_id": 1, "stack_trace": "Lorem ipsum", "timestamp": "2020-06-10T12:00:00+00:00", @@ -149,6 +150,7 @@ def test_should_return_200_redacted_with_single_dag_read_in_dagfile(self, sessio response_data["import_error_id"] = 1 assert response_data == { "filename": "Lorem_ipsum.py", + "bundle_name": None, "import_error_id": 1, "stack_trace": "REDACTED - you do not have read permission on all DAGs in the file", "timestamp": "2020-06-10T12:00:00+00:00", @@ -156,13 +158,16 @@ def test_should_return_200_redacted_with_single_dag_read_in_dagfile(self, sessio class TestGetImportErrorsEndpoint(TestBaseImportError): - def test_get_import_errors_single_dag(self, session): + def test_get_import_errors_single_dag(self, configure_testing_dag_bundle, session): + with configure_testing_dag_bundle("/tmp"): + DagBundlesManager().sync_bundles_to_db() for dag_id in TEST_DAG_IDS: fake_filename = f"/tmp/{dag_id}.py" - dag_model = DagModel(dag_id=dag_id, fileloc=fake_filename) + dag_model = DagModel(dag_id=dag_id, fileloc=fake_filename, bundle_name="testing") session.add(dag_model) importerror = ParseImportError( filename=fake_filename, + bundle_name="testing", stacktrace="Lorem ipsum", timestamp=timezone.parse(self.timestamp, timezone="UTC"), ) @@ -180,6 +185,7 @@ def test_get_import_errors_single_dag(self, session): "import_errors": [ { "filename": "/tmp/test_dag.py", + "bundle_name": "testing", "import_error_id": 1, "stack_trace": "Lorem ipsum", "timestamp": "2020-06-10T12:00:00+00:00", @@ -188,14 +194,17 @@ def test_get_import_errors_single_dag(self, session): "total_entries": 1, } - def test_get_import_errors_single_dag_in_dagfile(self, session): + def test_get_import_errors_single_dag_in_dagfile(self, configure_testing_dag_bundle, session): + with configure_testing_dag_bundle("/tmp"): + DagBundlesManager().sync_bundles_to_db() for dag_id in 
TEST_DAG_IDS: fake_filename = "/tmp/all_in_one.py" - dag_model = DagModel(dag_id=dag_id, fileloc=fake_filename) + dag_model = DagModel(dag_id=dag_id, fileloc=fake_filename, bundle_name="testing") session.add(dag_model) importerror = ParseImportError( filename="/tmp/all_in_one.py", + bundle_name="testing", stacktrace="Lorem ipsum", timestamp=timezone.parse(self.timestamp, timezone="UTC"), ) @@ -213,6 +222,7 @@ def test_get_import_errors_single_dag_in_dagfile(self, session): "import_errors": [ { "filename": "/tmp/all_in_one.py", + "bundle_name": "testing", "import_error_id": 1, "stack_trace": "REDACTED - you do not have read permission on all DAGs in the file", "timestamp": "2020-06-10T12:00:00+00:00", diff --git a/providers/tests/fab/auth_manager/conftest.py b/providers/tests/fab/auth_manager/conftest.py index f26a08d19c3e3..ad23c1e1febb0 100644 --- a/providers/tests/fab/auth_manager/conftest.py +++ b/providers/tests/fab/auth_manager/conftest.py @@ -16,7 +16,10 @@ # under the License. from __future__ import annotations +import json import os +from contextlib import contextmanager +from pathlib import Path import pytest @@ -77,3 +80,22 @@ def dagbag(): parse_and_sync_to_db(os.devnull, include_examples=True) return DagBag(read_dags_from_db=True) + + +@pytest.fixture +def configure_testing_dag_bundle(): + """Configure the testing DAG bundle with the provided path, and disable the DAGs folder bundle.""" + + @contextmanager + def _config_bundle(path_to_parse: Path | str): + bundle_config = [ + { + "name": "testing", + "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle", + "kwargs": {"local_folder": str(path_to_parse), "refresh_interval": 0}, + } + ] + with conf_vars({("dag_bundles", "backends"): json.dumps(bundle_config)}): + yield + + return _config_bundle diff --git a/tests/api_connexion/endpoints/test_import_error_endpoint.py b/tests/api_connexion/endpoints/test_import_error_endpoint.py index 6697c776d9052..4c6d6ec344174 100644 --- a/tests/api_connexion/endpoints/test_import_error_endpoint.py +++ b/tests/api_connexion/endpoints/test_import_error_endpoint.py @@ -90,6 +90,7 @@ def test_response_200(self, session): response_data["import_error_id"] = 1 assert response_data == { "filename": "Lorem_ipsum.py", + "bundle_name": None, "import_error_id": 1, "stack_trace": "Lorem ipsum", "timestamp": "2020-06-10T12:00:00+00:00", @@ -147,12 +148,14 @@ def test_get_import_errors(self, session): "import_errors": [ { "filename": "Lorem_ipsum.py", + "bundle_name": None, "import_error_id": 1, "stack_trace": "Lorem ipsum", "timestamp": "2020-06-10T12:00:00+00:00", }, { "filename": "Lorem_ipsum.py", + "bundle_name": None, "import_error_id": 2, "stack_trace": "Lorem ipsum", "timestamp": "2020-06-10T12:00:00+00:00", @@ -184,12 +187,14 @@ def test_get_import_errors_order_by(self, session): "import_errors": [ { "filename": "Lorem_ipsum1.py", + "bundle_name": None, "import_error_id": 1, # id normalized with self._normalize_import_errors "stack_trace": "Lorem ipsum", "timestamp": "2020-06-09T12:00:00+00:00", }, { "filename": "Lorem_ipsum2.py", + "bundle_name": None, "import_error_id": 2, "stack_trace": "Lorem ipsum", "timestamp": "2020-06-08T12:00:00+00:00", diff --git a/tests/api_connexion/schemas/test_error_schema.py b/tests/api_connexion/schemas/test_error_schema.py index c953925c90d43..29d4e222900ec 100644 --- a/tests/api_connexion/schemas/test_error_schema.py +++ b/tests/api_connexion/schemas/test_error_schema.py @@ -55,6 +55,7 @@ def test_serialize(self, session): 
serialized_data["import_error_id"] = 1 assert serialized_data == { "filename": "lorem.py", + "bundle_name": None, "import_error_id": 1, "stack_trace": "Lorem Ipsum", "timestamp": "2020-06-10T12:02:44+00:00", @@ -86,12 +87,14 @@ def test_serialize(self, session): "import_errors": [ { "filename": "Lorem_ipsum.py", + "bundle_name": None, "import_error_id": 1, "stack_trace": "Lorem ipsum", "timestamp": "2020-06-10T12:02:44+00:00", }, { "filename": "Lorem_ipsum.py", + "bundle_name": None, "import_error_id": 2, "stack_trace": "Lorem ipsum", "timestamp": "2020-06-10T12:02:44+00:00", From 7fd69ca5565f409121ca15d506a8cfe29f617320 Mon Sep 17 00:00:00 2001 From: Ephraim Anierobi Date: Wed, 15 Jan 2025 09:55:47 +0100 Subject: [PATCH 5/8] Fix tests --- .../endpoints/test_import_error_endpoint.py | 20 ++++++++++++---- .../routes/public/test_import_error.py | 7 ++++++ tests/www/views/test_views_home.py | 10 ++++++-- tests_common/pytest_plugin.py | 24 ++++++++++++------- 4 files changed, 46 insertions(+), 15 deletions(-) diff --git a/tests/api_connexion/endpoints/test_import_error_endpoint.py b/tests/api_connexion/endpoints/test_import_error_endpoint.py index 4c6d6ec344174..1950c1cd0b206 100644 --- a/tests/api_connexion/endpoints/test_import_error_endpoint.py +++ b/tests/api_connexion/endpoints/test_import_error_endpoint.py @@ -32,6 +32,7 @@ pytestmark = pytest.mark.db_test TEST_DAG_IDS = ["test_dag", "test_dag2"] +BUNDLE_NAME = "dag_maker" @pytest.fixture(scope="module") @@ -77,6 +78,7 @@ def test_response_200(self, session): filename="Lorem_ipsum.py", stacktrace="Lorem ipsum", timestamp=timezone.parse(self.timestamp, timezone="UTC"), + bundle_name=BUNDLE_NAME, ) session.add(import_error) session.commit() @@ -90,7 +92,7 @@ def test_response_200(self, session): response_data["import_error_id"] = 1 assert response_data == { "filename": "Lorem_ipsum.py", - "bundle_name": None, + "bundle_name": BUNDLE_NAME, "import_error_id": 1, "stack_trace": "Lorem ipsum", "timestamp": "2020-06-10T12:00:00+00:00", @@ -111,6 +113,7 @@ def test_should_raises_401_unauthenticated(self, session): filename="Lorem_ipsum.py", stacktrace="Lorem ipsum", timestamp=timezone.parse(self.timestamp, timezone="UTC"), + bundle_name=BUNDLE_NAME, ) session.add(import_error) session.commit() @@ -133,6 +136,7 @@ def test_get_import_errors(self, session): filename="Lorem_ipsum.py", stacktrace="Lorem ipsum", timestamp=timezone.parse(self.timestamp, timezone="UTC"), + bundle_name=BUNDLE_NAME, ) for _ in range(2) ] @@ -148,14 +152,14 @@ def test_get_import_errors(self, session): "import_errors": [ { "filename": "Lorem_ipsum.py", - "bundle_name": None, + "bundle_name": BUNDLE_NAME, "import_error_id": 1, "stack_trace": "Lorem ipsum", "timestamp": "2020-06-10T12:00:00+00:00", }, { "filename": "Lorem_ipsum.py", - "bundle_name": None, + "bundle_name": BUNDLE_NAME, "import_error_id": 2, "stack_trace": "Lorem ipsum", "timestamp": "2020-06-10T12:00:00+00:00", @@ -170,6 +174,7 @@ def test_get_import_errors_order_by(self, session): filename=f"Lorem_ipsum{i}.py", stacktrace="Lorem ipsum", timestamp=timezone.parse(self.timestamp, timezone="UTC") + timedelta(days=-i), + bundle_name=BUNDLE_NAME, ) for i in range(1, 3) ] @@ -187,14 +192,14 @@ def test_get_import_errors_order_by(self, session): "import_errors": [ { "filename": "Lorem_ipsum1.py", - "bundle_name": None, + "bundle_name": BUNDLE_NAME, "import_error_id": 1, # id normalized with self._normalize_import_errors "stack_trace": "Lorem ipsum", "timestamp": "2020-06-09T12:00:00+00:00", }, { "filename": 
"Lorem_ipsum2.py", - "bundle_name": None, + "bundle_name": BUNDLE_NAME, "import_error_id": 2, "stack_trace": "Lorem ipsum", "timestamp": "2020-06-08T12:00:00+00:00", @@ -209,6 +214,7 @@ def test_order_by_raises_400_for_invalid_attr(self, session): filename="Lorem_ipsum.py", stacktrace="Lorem ipsum", timestamp=timezone.parse(self.timestamp, timezone="UTC"), + bundle_name=BUNDLE_NAME, ) for _ in range(2) ] @@ -229,6 +235,7 @@ def test_should_raises_401_unauthenticated(self, session): filename="Lorem_ipsum.py", stacktrace="Lorem ipsum", timestamp=timezone.parse(self.timestamp, timezone="UTC"), + bundle_name=BUNDLE_NAME, ) for _ in range(2) ] @@ -261,6 +268,7 @@ def test_limit_and_offset(self, url, expected_import_error_ids, session): filename=f"/tmp/file_{i}.py", stacktrace="Lorem ipsum", timestamp=timezone.parse(self.timestamp, timezone="UTC"), + bundle_name=BUNDLE_NAME, ) for i in range(1, 110) ] @@ -279,6 +287,7 @@ def test_should_respect_page_size_limit_default(self, session): filename=f"/tmp/file_{i}.py", stacktrace="Lorem ipsum", timestamp=timezone.parse(self.timestamp, timezone="UTC"), + bundle_name=BUNDLE_NAME, ) for i in range(1, 110) ] @@ -293,6 +302,7 @@ def test_should_return_conf_max_if_req_max_above_conf(self, session): import_errors = [ ParseImportError( filename=f"/tmp/file_{i}.py", + bundle_name=BUNDLE_NAME, stacktrace="Lorem ipsum", timestamp=timezone.parse(self.timestamp, timezone="UTC"), ) diff --git a/tests/api_fastapi/core_api/routes/public/test_import_error.py b/tests/api_fastapi/core_api/routes/public/test_import_error.py index 33a3fbaac745c..38965ab7184c1 100644 --- a/tests/api_fastapi/core_api/routes/public/test_import_error.py +++ b/tests/api_fastapi/core_api/routes/public/test_import_error.py @@ -39,6 +39,7 @@ TIMESTAMP3 = datetime(2024, 6, 15, 3, 0, tzinfo=timezone.utc) IMPORT_ERROR_NON_EXISTED_ID = 9999 IMPORT_ERROR_NON_EXISTED_KEY = "non_existed_key" +BUNDLE_NAME = "dag_maker" class TestImportErrorEndpoint: @@ -56,16 +57,19 @@ def setup(self, session=None) -> dict[str, ParseImportError]: """ self._clear_db() import_error1 = ParseImportError( + bundle_name=BUNDLE_NAME, filename=FILENAME1, stacktrace=STACKTRACE1, timestamp=TIMESTAMP1, ) import_error2 = ParseImportError( + bundle_name=BUNDLE_NAME, filename=FILENAME2, stacktrace=STACKTRACE2, timestamp=TIMESTAMP2, ) import_error3 = ParseImportError( + bundle_name=BUNDLE_NAME, filename=FILENAME3, stacktrace=STACKTRACE3, timestamp=TIMESTAMP3, @@ -90,6 +94,7 @@ class TestGetImportError(TestImportErrorEndpoint): "timestamp": TIMESTAMP1, "filename": FILENAME1, "stack_trace": STACKTRACE1, + "bundle_name": BUNDLE_NAME, }, ), ( @@ -100,6 +105,7 @@ class TestGetImportError(TestImportErrorEndpoint): "timestamp": TIMESTAMP2, "filename": FILENAME2, "stack_trace": STACKTRACE2, + "bundle_name": BUNDLE_NAME, }, ), (IMPORT_ERROR_NON_EXISTED_KEY, 404, {}), @@ -119,6 +125,7 @@ def test_get_import_error( "timestamp": from_datetime_to_zulu_without_ms(expected_body["timestamp"]), "filename": expected_body["filename"], "stack_trace": expected_body["stack_trace"], + "bundle_name": BUNDLE_NAME, } assert response.json() == expected_json diff --git a/tests/www/views/test_views_home.py b/tests/www/views/test_views_home.py index 2411b46467dbe..ae0e2c3e5d719 100644 --- a/tests/www/views/test_views_home.py +++ b/tests/www/views/test_views_home.py @@ -240,7 +240,11 @@ def _broken_dags(session): from airflow.models.errors import ParseImportError for dag_id in TEST_FILTER_DAG_IDS: - session.add(ParseImportError(filename=f"/{dag_id}.py", 
stacktrace="Some Error\nTraceback:\n")) + session.add( + ParseImportError( + filename=f"/{dag_id}.py", bundle_name="dag_maker", stacktrace="Some Error\nTraceback:\n" + ) + ) session.commit() @@ -253,7 +257,9 @@ def _broken_dags_after_working(dag_maker, session): pass # Then create an import error against that file - session.add(ParseImportError(filename=path, stacktrace="Some Error\nTraceback:\n")) + session.add( + ParseImportError(filename=path, bundle_name="dag_maker", stacktrace="Some Error\nTraceback:\n") + ) session.commit() diff --git a/tests_common/pytest_plugin.py b/tests_common/pytest_plugin.py index 0516f8431fe76..48701bcaf0e41 100644 --- a/tests_common/pytest_plugin.py +++ b/tests_common/pytest_plugin.py @@ -832,7 +832,7 @@ def __exit__(self, type, value, traceback): return dag.clear(session=self.session) - dag.sync_to_db(session=self.session) + dag.bulk_write_to_db(self.bundle_name, None, [dag], session=self.session) if dag.access_control: from airflow.www.security_appless import ApplessAirflowSecurityManager @@ -951,13 +951,10 @@ def sync_dagbag_to_db(self): self.dagbag.sync_to_db() return - from airflow.models.dagbundle import DagBundleModel - - if self.session.query(DagBundleModel).filter(DagBundleModel.name == "dag_maker").count() == 0: - self.session.add(DagBundleModel(name="dag_maker")) - self.session.commit() - - self.dagbag.sync_to_db("dag_maker", None) + self.dagbag.sync_to_db( + self.bundle_name, + None, + ) def __call__( self, @@ -966,6 +963,7 @@ def __call__( serialized=want_serialized, activate_assets=want_activate_assets, fileloc=None, + bundle_name=None, session=None, **kwargs, ): @@ -997,6 +995,16 @@ def __call__( self.dag.fileloc = fileloc or request.module.__file__ self.want_serialized = serialized self.want_activate_assets = activate_assets + self.bundle_name = bundle_name or "dag_maker" + if AIRFLOW_V_3_0_PLUS: + from airflow.models.dagbundle import DagBundleModel + + if ( + self.session.query(DagBundleModel).filter(DagBundleModel.name == self.bundle_name).count() + == 0 + ): + self.session.add(DagBundleModel(name=self.bundle_name)) + self.session.commit() return self From 4f5ad41e5f4f27f42fa7fb600406035bce1e3cf0 Mon Sep 17 00:00:00 2001 From: Ephraim Anierobi Date: Wed, 15 Jan 2025 10:05:43 +0100 Subject: [PATCH 6/8] Move migration --- .../versions/0050_3_0_0_add_dagbundlemodel.py | 6 + ...0_0_add_bundle_name_to_parseimporterror.py | 50 -- airflow/utils/db.py | 2 +- docs/apache-airflow/img/airflow_erd.sha256 | 2 +- docs/apache-airflow/img/airflow_erd.svg | 554 +++++++++--------- docs/apache-airflow/migrations-ref.rst | 4 +- 6 files changed, 288 insertions(+), 330 deletions(-) delete mode 100644 airflow/migrations/versions/0056_3_0_0_add_bundle_name_to_parseimporterror.py diff --git a/airflow/migrations/versions/0050_3_0_0_add_dagbundlemodel.py b/airflow/migrations/versions/0050_3_0_0_add_dagbundlemodel.py index 5322be0abe763..16e8e91cbe277 100644 --- a/airflow/migrations/versions/0050_3_0_0_add_dagbundlemodel.py +++ b/airflow/migrations/versions/0050_3_0_0_add_dagbundlemodel.py @@ -56,6 +56,9 @@ def upgrade(): with op.batch_alter_table("dag_run", schema=None) as batch_op: batch_op.add_column(sa.Column("bundle_version", sa.String(length=250), nullable=True)) + with op.batch_alter_table("import_error", schema=None) as batch_op: + batch_op.add_column(sa.Column("bundle_name", sa.String(length=250), nullable=True)) + def downgrade(): with op.batch_alter_table("dag", schema=None) as batch_op: @@ -65,4 +68,7 @@ def downgrade(): with op.batch_alter_table("dag_run", 
schema=None) as batch_op: batch_op.drop_column("bundle_version") + with op.batch_alter_table("import_error", schema=None) as batch_op: + batch_op.drop_column("bundle_name") + op.drop_table("dag_bundle") diff --git a/airflow/migrations/versions/0056_3_0_0_add_bundle_name_to_parseimporterror.py b/airflow/migrations/versions/0056_3_0_0_add_bundle_name_to_parseimporterror.py deleted file mode 100644 index ea110e50b0e45..0000000000000 --- a/airflow/migrations/versions/0056_3_0_0_add_bundle_name_to_parseimporterror.py +++ /dev/null @@ -1,50 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -""" -add bundle_name to ParseImportError. - -Revision ID: 03de77aaa4ec -Revises: e39a26ac59f6 -Create Date: 2025-01-08 10:38:02.108760 - -""" - -from __future__ import annotations - -import sqlalchemy as sa -from alembic import op - -# revision identifiers, used by Alembic. -revision = "03de77aaa4ec" -down_revision = "e39a26ac59f6" -branch_labels = None -depends_on = None -airflow_version = "3.0.0" - - -def upgrade(): - """Apply add bundle_name to ParseImportError.""" - with op.batch_alter_table("import_error", schema=None) as batch_op: - batch_op.add_column(sa.Column("bundle_name", sa.String(length=250), nullable=True)) - - -def downgrade(): - """Unapply add bundle_name to ParseImportError.""" - with op.batch_alter_table("import_error", schema=None) as batch_op: - batch_op.drop_column("bundle_name") diff --git a/airflow/utils/db.py b/airflow/utils/db.py index ed7eb1090416c..1a1eb6f4d3500 100644 --- a/airflow/utils/db.py +++ b/airflow/utils/db.py @@ -94,7 +94,7 @@ class MappedClassProtocol(Protocol): "2.9.2": "686269002441", "2.10.0": "22ed7efa9da2", "2.10.3": "5f2621c13b39", - "3.0.0": "03de77aaa4ec", + "3.0.0": "e39a26ac59f6", } diff --git a/docs/apache-airflow/img/airflow_erd.sha256 b/docs/apache-airflow/img/airflow_erd.sha256 index ad7d1b13c19e7..5e4843e2b1882 100644 --- a/docs/apache-airflow/img/airflow_erd.sha256 +++ b/docs/apache-airflow/img/airflow_erd.sha256 @@ -1 +1 @@ -79449705d667d8fe382b4b53a0c59bf55bf31eeafc34941c59e6dceccc68d7a7 \ No newline at end of file +32350c3c7d1dd29eca4205458eab946ece6628f7f53d30c4e0a8f1ee914f1372 \ No newline at end of file diff --git a/docs/apache-airflow/img/airflow_erd.svg b/docs/apache-airflow/img/airflow_erd.svg index f647786452c6b..663f30f9c959c 100644 --- a/docs/apache-airflow/img/airflow_erd.svg +++ b/docs/apache-airflow/img/airflow_erd.svg @@ -1047,30 +1047,30 @@ task_instance--task_map - + 0..N -1 +1 task_instance--task_map - + 0..N -1 +1 task_instance--task_map - + 0..N -1 +1 task_instance--task_map - + 0..N -1 +1 @@ -1120,30 +1120,30 @@ task_instance--xcom - -0..N -1 + +0..N +1 task_instance--xcom - -0..N -1 + +0..N +1 task_instance--xcom - -0..N -1 + +0..N +1 task_instance--xcom - -0..N -1 + +0..N +1 @@ -1731,41 
+1731,41 @@ deadline - -deadline - -id - - [UUID] - NOT NULL - -callback - - [VARCHAR(500)] - NOT NULL - -callback_kwargs - - [JSON] - -dag_id - - [VARCHAR(250)] - -dagrun_id - - [INTEGER] - -deadline - - [TIMESTAMP] - NOT NULL + +deadline + +id + + [UUID] + NOT NULL + +callback + + [VARCHAR(500)] + NOT NULL + +callback_kwargs + + [JSON] + +dag_id + + [VARCHAR(250)] + +dagrun_id + + [INTEGER] + +deadline + + [TIMESTAMP] + NOT NULL dag--deadline - -0..N + +0..N {0,1} @@ -1778,104 +1778,108 @@ dag_run - -dag_run - -id - - [INTEGER] - NOT NULL - -backfill_id - - [INTEGER] - -clear_number - - [INTEGER] - NOT NULL - -conf - - [JSONB] - -creating_job_id - - [INTEGER] - -dag_id - - [VARCHAR(250)] - NOT NULL - -dag_version_id - - [UUID] - -data_interval_end - - [TIMESTAMP] - -data_interval_start - - [TIMESTAMP] - -end_date - - [TIMESTAMP] - -external_trigger - - [BOOLEAN] - -last_scheduling_decision - - [TIMESTAMP] - -log_template_id - - [INTEGER] - -logical_date - - [TIMESTAMP] - NOT NULL - -queued_at - - [TIMESTAMP] - -run_id - - [VARCHAR(250)] - NOT NULL - -run_type - - [VARCHAR(50)] - NOT NULL - -start_date - - [TIMESTAMP] - -state - - [VARCHAR(50)] - -triggered_by - - [VARCHAR(50)] - -updated_at - - [TIMESTAMP] + +dag_run + +id + + [INTEGER] + NOT NULL + +backfill_id + + [INTEGER] + +bundle_version + + [VARCHAR(250)] + +clear_number + + [INTEGER] + NOT NULL + +conf + + [JSONB] + +creating_job_id + + [INTEGER] + +dag_id + + [VARCHAR(250)] + NOT NULL + +dag_version_id + + [UUID] + +data_interval_end + + [TIMESTAMP] + +data_interval_start + + [TIMESTAMP] + +end_date + + [TIMESTAMP] + +external_trigger + + [BOOLEAN] + +last_scheduling_decision + + [TIMESTAMP] + +log_template_id + + [INTEGER] + +logical_date + + [TIMESTAMP] + NOT NULL + +queued_at + + [TIMESTAMP] + +run_id + + [VARCHAR(250)] + NOT NULL + +run_type + + [VARCHAR(50)] + NOT NULL + +start_date + + [TIMESTAMP] + +state + + [VARCHAR(50)] + +triggered_by + + [VARCHAR(50)] + +updated_at + + [TIMESTAMP] dag_version--dag_run - -0..N + +0..N {0,1} @@ -1975,121 +1979,121 @@ dag_run--dagrun_asset_event - -0..N -1 + +0..N +1 dag_run--task_instance - -0..N -1 + +0..N +1 dag_run--task_instance - -0..N -1 + +0..N +1 dag_run--deadline - -0..N -{0,1} + +0..N +{0,1} backfill_dag_run - -backfill_dag_run - -id - - [INTEGER] - NOT NULL - -backfill_id - - [INTEGER] - NOT NULL - -dag_run_id - - [INTEGER] - -exception_reason - - [VARCHAR(250)] - -logical_date - - [TIMESTAMP] - NOT NULL - -sort_ordinal - - [INTEGER] - NOT NULL + +backfill_dag_run + +id + + [INTEGER] + NOT NULL + +backfill_id + + [INTEGER] + NOT NULL + +dag_run_id + + [INTEGER] + +exception_reason + + [VARCHAR(250)] + +logical_date + + [TIMESTAMP] + NOT NULL + +sort_ordinal + + [INTEGER] + NOT NULL dag_run--backfill_dag_run - -0..N -{0,1} + +0..N +{0,1} dag_run_note - -dag_run_note - -dag_run_id - - [INTEGER] - NOT NULL - -content - - [VARCHAR(1000)] - -created_at - - [TIMESTAMP] - NOT NULL - -updated_at - - [TIMESTAMP] - NOT NULL - -user_id - - [VARCHAR(128)] + +dag_run_note + +dag_run_id + + [INTEGER] + NOT NULL + +content + + [VARCHAR(1000)] + +created_at + + [TIMESTAMP] + NOT NULL + +updated_at + + [TIMESTAMP] + NOT NULL + +user_id + + [VARCHAR(128)] dag_run--dag_run_note - -1 -1 + +1 +1 dag_run--task_reschedule - -0..N -1 + +0..N +1 dag_run--task_reschedule - -0..N -1 + +0..N +1 @@ -2120,9 +2124,9 @@ log_template--dag_run - -0..N -{0,1} + +0..N +{0,1} @@ -2186,16 +2190,16 @@ backfill--dag_run - -0..N -{0,1} + +0..N +{0,1} backfill--backfill_dag_run - -0..N -1 + +0..N +1 @@ -2315,28 
+2319,28 @@ ab_user_role - -ab_user_role - -id - - [INTEGER] - NOT NULL - -role_id - - [INTEGER] - -user_id - - [INTEGER] + +ab_user_role + +id + + [INTEGER] + NOT NULL + +role_id + + [INTEGER] + +user_id + + [INTEGER] ab_user--ab_user_role - -0..N -{0,1} + +0..N +{0,1} @@ -2426,28 +2430,28 @@ ab_permission_view_role - -ab_permission_view_role - -id - - [INTEGER] - NOT NULL - -permission_view_id - - [INTEGER] - -role_id - - [INTEGER] + +ab_permission_view_role + +id + + [INTEGER] + NOT NULL + +permission_view_id + + [INTEGER] + +role_id + + [INTEGER] ab_permission_view--ab_permission_view_role - -0..N -{0,1} + +0..N +{0,1} @@ -2491,16 +2495,16 @@ ab_role--ab_user_role - -0..N -{0,1} + +0..N +{0,1} ab_role--ab_permission_view_role - -0..N -{0,1} + +0..N +{0,1} diff --git a/docs/apache-airflow/migrations-ref.rst b/docs/apache-airflow/migrations-ref.rst index 5f90476af3570..62013ff8f799c 100644 --- a/docs/apache-airflow/migrations-ref.rst +++ b/docs/apache-airflow/migrations-ref.rst @@ -39,9 +39,7 @@ Here's the list of all the Database Migrations that are executed via when you ru +-------------------------+------------------+-------------------+--------------------------------------------------------------+ | Revision ID | Revises ID | Airflow Version | Description | +=========================+==================+===================+==============================================================+ -| ``03de77aaa4ec`` (head) | ``e39a26ac59f6`` | ``3.0.0`` | add bundle_name to ParseImportError. | -+-------------------------+------------------+-------------------+--------------------------------------------------------------+ -| ``e39a26ac59f6`` | ``38770795785f`` | ``3.0.0`` | remove pickled data from dagrun table. | +| ``e39a26ac59f6`` (head) | ``38770795785f`` | ``3.0.0`` | remove pickled data from dagrun table. | +-------------------------+------------------+-------------------+--------------------------------------------------------------+ | ``38770795785f`` | ``5c9c0231baa2`` | ``3.0.0`` | Add asset reference models. 
| +-------------------------+------------------+-------------------+--------------------------------------------------------------+ From bed2fa1ef334bd2070e39153a183d615a1f63851 Mon Sep 17 00:00:00 2001 From: Ephraim Anierobi Date: Wed, 15 Jan 2025 16:57:18 +0100 Subject: [PATCH 7/8] Fix compat tests --- tests_common/pytest_plugin.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/tests_common/pytest_plugin.py b/tests_common/pytest_plugin.py index 48701bcaf0e41..dab785c5b77e4 100644 --- a/tests_common/pytest_plugin.py +++ b/tests_common/pytest_plugin.py @@ -832,7 +832,10 @@ def __exit__(self, type, value, traceback): return dag.clear(session=self.session) - dag.bulk_write_to_db(self.bundle_name, None, [dag], session=self.session) + if AIRFLOW_V_3_0_PLUS: + dag.bulk_write_to_db(self.bundle_name, None, [dag], session=self.session) + else: + dag.sync_to_db(session=self.session) if dag.access_control: from airflow.www.security_appless import ApplessAirflowSecurityManager From 834f7e1779543afb3b883dd2e865e2bc3ecaeb68 Mon Sep 17 00:00:00 2001 From: Ephraim Anierobi Date: Wed, 15 Jan 2025 17:08:52 +0100 Subject: [PATCH 8/8] Fix fab failures --- .../test_import_error_endpoint.py | 36 ++++++++++++------- 1 file changed, 24 insertions(+), 12 deletions(-) diff --git a/providers/tests/fab/auth_manager/api_endpoints/test_import_error_endpoint.py b/providers/tests/fab/auth_manager/api_endpoints/test_import_error_endpoint.py index 2788e511e3276..04bf4182ec359 100644 --- a/providers/tests/fab/auth_manager/api_endpoints/test_import_error_endpoint.py +++ b/providers/tests/fab/auth_manager/api_endpoints/test_import_error_endpoint.py @@ -35,6 +35,7 @@ ] TEST_DAG_IDS = ["test_dag", "test_dag2"] +BUNDLE_NAME = "testing" @pytest.fixture(scope="module") @@ -88,11 +89,14 @@ def _normalize_import_errors(import_errors): class TestGetImportErrorEndpoint(TestBaseImportError): - def test_should_raise_403_forbidden_without_dag_read(self, session): + def test_should_raise_403_forbidden_without_dag_read(self, configure_testing_dag_bundle, session): + with configure_testing_dag_bundle("/tmp"): + DagBundlesManager().sync_bundles_to_db() import_error = ParseImportError( filename="Lorem_ipsum.py", stacktrace="Lorem ipsum", timestamp=timezone.parse(self.timestamp, timezone="UTC"), + bundle_name=BUNDLE_NAME, ) session.add(import_error) session.commit() @@ -103,13 +107,16 @@ def test_should_raise_403_forbidden_without_dag_read(self, session): assert response.status_code == 403 - def test_should_return_200_with_single_dag_read(self, session): - dag_model = DagModel(dag_id=TEST_DAG_IDS[0], fileloc="Lorem_ipsum.py") + def test_should_return_200_with_single_dag_read(self, session, configure_testing_dag_bundle): + with configure_testing_dag_bundle("/tmp"): + DagBundlesManager().sync_bundles_to_db() + dag_model = DagModel(dag_id=TEST_DAG_IDS[0], fileloc="Lorem_ipsum.py", bundle_name=BUNDLE_NAME) session.add(dag_model) import_error = ParseImportError( filename="Lorem_ipsum.py", stacktrace="Lorem ipsum", timestamp=timezone.parse(self.timestamp, timezone="UTC"), + bundle_name=BUNDLE_NAME, ) session.add(import_error) session.commit() @@ -123,20 +130,25 @@ def test_should_return_200_with_single_dag_read(self, session): response_data["import_error_id"] = 1 assert response_data == { "filename": "Lorem_ipsum.py", - "bundle_name": None, + "bundle_name": BUNDLE_NAME, "import_error_id": 1, "stack_trace": "Lorem ipsum", "timestamp": "2020-06-10T12:00:00+00:00", } - def
test_should_return_200_redacted_with_single_dag_read_in_dagfile(self, session): + def test_should_return_200_redacted_with_single_dag_read_in_dagfile( + self, configure_testing_dag_bundle, session + ): + with configure_testing_dag_bundle("/tmp"): + DagBundlesManager().sync_bundles_to_db() for dag_id in TEST_DAG_IDS: - dag_model = DagModel(dag_id=dag_id, fileloc="Lorem_ipsum.py") + dag_model = DagModel(dag_id=dag_id, fileloc="Lorem_ipsum.py", bundle_name=BUNDLE_NAME) session.add(dag_model) import_error = ParseImportError( filename="Lorem_ipsum.py", stacktrace="Lorem ipsum", timestamp=timezone.parse(self.timestamp, timezone="UTC"), + bundle_name=BUNDLE_NAME, ) session.add(import_error) session.commit() @@ -150,7 +162,7 @@ def test_should_return_200_redacted_with_single_dag_read_in_dagfile(self, sessio response_data["import_error_id"] = 1 assert response_data == { "filename": "Lorem_ipsum.py", - "bundle_name": None, + "bundle_name": BUNDLE_NAME, "import_error_id": 1, "stack_trace": "REDACTED - you do not have read permission on all DAGs in the file", "timestamp": "2020-06-10T12:00:00+00:00", @@ -163,11 +175,11 @@ def test_get_import_errors_single_dag(self, configure_testing_dag_bundle, sessio DagBundlesManager().sync_bundles_to_db() for dag_id in TEST_DAG_IDS: fake_filename = f"/tmp/{dag_id}.py" - dag_model = DagModel(dag_id=dag_id, fileloc=fake_filename, bundle_name="testing") + dag_model = DagModel(dag_id=dag_id, fileloc=fake_filename, bundle_name=BUNDLE_NAME) session.add(dag_model) importerror = ParseImportError( filename=fake_filename, - bundle_name="testing", + bundle_name=BUNDLE_NAME, stacktrace="Lorem ipsum", timestamp=timezone.parse(self.timestamp, timezone="UTC"), ) @@ -185,7 +197,7 @@ def test_get_import_errors_single_dag(self, configure_testing_dag_bundle, sessio "import_errors": [ { "filename": "/tmp/test_dag.py", - "bundle_name": "testing", + "bundle_name": BUNDLE_NAME, "import_error_id": 1, "stack_trace": "Lorem ipsum", "timestamp": "2020-06-10T12:00:00+00:00", @@ -204,7 +216,7 @@ def test_get_import_errors_single_dag_in_dagfile(self, configure_testing_dag_bun importerror = ParseImportError( filename="/tmp/all_in_one.py", - bundle_name="testing", + bundle_name=BUNDLE_NAME, stacktrace="Lorem ipsum", timestamp=timezone.parse(self.timestamp, timezone="UTC"), ) @@ -222,7 +234,7 @@ def test_get_import_errors_single_dag_in_dagfile(self, configure_testing_dag_bun "import_errors": [ { "filename": "/tmp/all_in_one.py", - "bundle_name": "testing", + "bundle_name": BUNDLE_NAME, "import_error_id": 1, "stack_trace": "REDACTED - you do not have read permission on all DAGs in the file", "timestamp": "2020-06-10T12:00:00+00:00",
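(Illustrative aside, not part of the patch series.) The recurring shape of these test fixes is ordering: register the bundle, via DagBundlesManager().sync_bundles_to_db(), before inserting DagModel or ParseImportError rows that carry a bundle_name. The sketch below shows why that ordering matters under the assumption, suggested but not confirmed by these patches, that bundle_name references the dag_bundle table; the models here are hypothetical stand-ins, not Airflow's real schema.

    # Hypothetical minimal schema, NOT Airflow's real models; assumes a
    # foreign key from bundle_name to dag_bundle.name.
    from typing import Optional

    from sqlalchemy import ForeignKey, String, create_engine, event
    from sqlalchemy.exc import IntegrityError
    from sqlalchemy.orm import DeclarativeBase, Mapped, Session, mapped_column

    class Base(DeclarativeBase):
        pass

    class Bundle(Base):  # stand-in for DagBundleModel
        __tablename__ = "dag_bundle"

        name: Mapped[str] = mapped_column(String(250), primary_key=True)

    class Dag(Base):  # stand-in for DagModel
        __tablename__ = "dag"

        dag_id: Mapped[str] = mapped_column(String(250), primary_key=True)
        bundle_name: Mapped[Optional[str]] = mapped_column(ForeignKey("dag_bundle.name"))

    engine = create_engine("sqlite://")

    @event.listens_for(engine, "connect")
    def _enforce_fks(dbapi_conn, _record):
        # SQLite only enforces foreign keys when explicitly asked to.
        dbapi_conn.execute("PRAGMA foreign_keys=ON")

    Base.metadata.create_all(engine)

    with Session(engine) as session:
        # Register the bundle first (what sync_bundles_to_db achieves)...
        session.add(Bundle(name="testing"))
        # ...then rows that name it are accepted.
        session.add(Dag(dag_id="d1", bundle_name="testing"))
        session.commit()

        # A row naming an unregistered bundle is rejected, which is the
        # failure mode the fixture-based setup in these tests avoids.
        session.add(Dag(dag_id="d2", bundle_name="unregistered"))
        try:
            session.commit()
        except IntegrityError:
            session.rollback()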