Skip to content

Commit 5779858

Browse files
authored
Add DagBundle information to Serialized DagVersion table (#45976)
* Add DagBundle information to Serialized DagVersion table This will help us keep track of the different versions that we see in DagBundle since DagModel would always have the latest bundle version. * fixup! Add DagBundle information to Serialized DagVersion table * Fix test
1 parent ac5d99c commit 5779858

24 files changed

+1871
-1802
lines changed

airflow/dag_processing/collection.py

+10-2
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,9 @@ def _update_dag_owner_links(dag_owner_links: dict[str, str], dm: DagModel, *, se
177177
)
178178

179179

180-
def _serialize_dag_capturing_errors(dag: MaybeSerializedDAG, session: Session):
180+
def _serialize_dag_capturing_errors(
181+
dag: MaybeSerializedDAG, bundle_name, session: Session, bundle_version: str | None
182+
):
181183
"""
182184
Try to serialize the dag to the DB, but make a note of any errors.
183185
@@ -192,6 +194,8 @@ def _serialize_dag_capturing_errors(dag: MaybeSerializedDAG, session: Session):
192194
# We can't use bulk_write_to_db as we want to capture each error individually
193195
dag_was_updated = SerializedDagModel.write_dag(
194196
dag,
197+
bundle_name=bundle_name,
198+
bundle_version=bundle_version,
195199
min_update_interval=settings.MIN_SERIALIZED_DAG_UPDATE_INTERVAL,
196200
session=session,
197201
)
@@ -333,7 +337,11 @@ def update_dag_parsing_results_in_db(
333337
DAG.bulk_write_to_db(bundle_name, bundle_version, dags, session=session)
334338
# Write Serialized DAGs to DB, capturing errors
335339
for dag in dags:
336-
serialize_errors.extend(_serialize_dag_capturing_errors(dag, session))
340+
serialize_errors.extend(
341+
_serialize_dag_capturing_errors(
342+
dag=dag, bundle_name=bundle_name, bundle_version=bundle_version, session=session
343+
)
344+
)
337345
except OperationalError:
338346
session.rollback()
339347
raise

airflow/migrations/versions/0050_3_0_0_add_dagbundlemodel.py

+9
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import sqlalchemy as sa
3030
from alembic import op
3131

32+
from airflow.migrations.db_types import StringID
3233
from airflow.utils.sqlalchemy import UtcDateTime
3334

3435
revision = "e229247a6cb1"
@@ -59,6 +60,10 @@ def upgrade():
5960
with op.batch_alter_table("import_error", schema=None) as batch_op:
6061
batch_op.add_column(sa.Column("bundle_name", sa.String(length=250), nullable=True))
6162

63+
with op.batch_alter_table("dag_version", schema=None) as batch_op:
64+
batch_op.add_column(sa.Column("bundle_name", StringID(), nullable=False))
65+
batch_op.add_column(sa.Column("bundle_version", StringID()))
66+
6267

6368
def downgrade():
6469
with op.batch_alter_table("dag", schema=None) as batch_op:
@@ -71,4 +76,8 @@ def downgrade():
7176
with op.batch_alter_table("import_error", schema=None) as batch_op:
7277
batch_op.drop_column("bundle_name")
7378

79+
with op.batch_alter_table("dag_version", schema=None) as batch_op:
80+
batch_op.drop_column("bundle_version")
81+
batch_op.drop_column("bundle_name")
82+
7483
op.drop_table("dag_bundle")

airflow/models/dag_version.py

+6
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@ class DagVersion(Base):
4545
version_number = Column(Integer, nullable=False, default=1)
4646
dag_id = Column(StringID(), ForeignKey("dag.dag_id", ondelete="CASCADE"), nullable=False)
4747
dag_model = relationship("DagModel", back_populates="dag_versions")
48+
bundle_name = Column(StringID(), nullable=False)
49+
bundle_version = Column(StringID())
4850
dag_code = relationship(
4951
"DagCode",
5052
back_populates="dag_version",
@@ -77,6 +79,8 @@ def write_dag(
7779
cls,
7880
*,
7981
dag_id: str,
82+
bundle_name: str,
83+
bundle_version: str | None = None,
8084
version_number: int = 1,
8185
session: Session = NEW_SESSION,
8286
) -> DagVersion:
@@ -99,6 +103,8 @@ def write_dag(
99103
dag_version = DagVersion(
100104
dag_id=dag_id,
101105
version_number=version_number,
106+
bundle_name=bundle_name,
107+
bundle_version=bundle_version,
102108
)
103109
log.debug("Writing DagVersion %s to the DB", dag_version)
104110
session.add(dag_version)

airflow/models/serialized_dag.py

+9
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,8 @@ def _sort_serialized_dag_dict(cls, serialized_dag: Any):
165165
def write_dag(
166166
cls,
167167
dag: DAG | LazyDeserializedDAG,
168+
bundle_name: str,
169+
bundle_version: str | None = None,
168170
min_update_interval: int | None = None,
169171
session: Session = NEW_SESSION,
170172
) -> bool:
@@ -201,8 +203,11 @@ def write_dag(
201203
if serialized_dag_hash is not None and serialized_dag_hash == new_serialized_dag.dag_hash:
202204
log.debug("Serialized DAG (%s) is unchanged. Skipping writing to DB", dag.dag_id)
203205
return False
206+
204207
dagv = DagVersion.write_dag(
205208
dag_id=dag.dag_id,
209+
bundle_name=bundle_name,
210+
bundle_version=bundle_version,
206211
session=session,
207212
)
208213
log.debug("Writing Serialized DAG: %s to the DB", dag.dag_id)
@@ -340,6 +345,8 @@ def get(cls, dag_id: str, session: Session = NEW_SESSION) -> SerializedDagModel
340345
@provide_session
341346
def bulk_sync_to_db(
342347
dags: list[DAG] | list[LazyDeserializedDAG],
348+
bundle_name: str,
349+
bundle_version: str | None = None,
343350
session: Session = NEW_SESSION,
344351
) -> None:
345352
"""
@@ -354,6 +361,8 @@ def bulk_sync_to_db(
354361
for dag in dags:
355362
SerializedDagModel.write_dag(
356363
dag=dag,
364+
bundle_name=bundle_name,
365+
bundle_version=bundle_version,
357366
min_update_interval=MIN_SERIALIZED_DAG_UPDATE_INTERVAL,
358367
session=session,
359368
)
+1-1
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
cb858681fdc7a596db20c1c5dbf93812fd011a6df1e0b5322a49a51c8476bb93
1+
eb25e0718c9382cdbb02368c9c3e29c90da06ddaba8e8e92d9fc53417b714039

0 commit comments

Comments
 (0)