Skip to content

Commit 586f1ea

Browse files
authored
do not update DR on TI update after task execution (#45348)
Signed-off-by: Maciej Obuchowski <[email protected]>
1 parent 2cd40ca commit 586f1ea

File tree

2 files changed

+47
-4
lines changed

2 files changed

+47
-4
lines changed

airflow/models/taskinstance.py

+7-3
Original file line numberDiff line numberDiff line change
@@ -247,7 +247,7 @@ def _run_raw_task(
247247
ti.hostname = get_hostname()
248248
ti.pid = os.getpid()
249249
if not test_mode:
250-
TaskInstance.save_to_db(ti=ti, session=session)
250+
TaskInstance.save_to_db(ti=ti, session=session, refresh_dag=False)
251251
actual_start_date = timezone.utcnow()
252252
Stats.incr(f"ti.start.{ti.task.dag_id}.{ti.task.task_id}", tags=ti.stats_tags)
253253
# Same metric with tagging
@@ -1241,7 +1241,7 @@ def _handle_failure(
12411241
)
12421242

12431243
if not test_mode:
1244-
TaskInstance.save_to_db(failure_context["ti"], session)
1244+
TaskInstance.save_to_db(task_instance, session)
12451245

12461246
with Trace.start_span_from_taskinstance(ti=task_instance) as span:
12471247
# ---- error info ----
@@ -3395,7 +3395,11 @@ def fetch_handle_failure_context(
33953395
@staticmethod
33963396
@internal_api_call
33973397
@provide_session
3398-
def save_to_db(ti: TaskInstance | TaskInstancePydantic, session: Session = NEW_SESSION):
3398+
def save_to_db(
3399+
ti: TaskInstance | TaskInstancePydantic, session: Session = NEW_SESSION, refresh_dag: bool = True
3400+
):
3401+
if refresh_dag and isinstance(ti, TaskInstance):
3402+
ti.get_dagrun().refresh_from_db()
33993403
ti = _coalesce_to_orm_ti(ti=ti, session=session)
34003404
ti.updated_at = timezone.utcnow()
34013405
session.merge(ti)

tests/models/test_taskinstance.py

+40-1
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import pickle
2727
import signal
2828
import sys
29+
import time
2930
import urllib
3031
from traceback import format_exception
3132
from typing import cast
@@ -34,6 +35,7 @@
3435
from uuid import uuid4
3536

3637
import pendulum
38+
import psutil
3739
import pytest
3840
import time_machine
3941
from sqlalchemy import select
@@ -83,7 +85,7 @@
8385
from airflow.sensors.base import BaseSensorOperator
8486
from airflow.sensors.python import PythonSensor
8587
from airflow.serialization.serialized_objects import SerializedBaseOperator, SerializedDAG
86-
from airflow.settings import TIMEZONE, TracebackSessionForTests
88+
from airflow.settings import TIMEZONE, TracebackSessionForTests, reconfigure_orm
8789
from airflow.stats import Stats
8890
from airflow.ti_deps.dep_context import DepContext
8991
from airflow.ti_deps.dependencies_deps import REQUEUEABLE_DEPS, RUNNING_DEPS
@@ -3587,6 +3589,43 @@ def test_handle_failure(self, create_dummy_dag, session=None):
35873589
assert "task_instance" in context_arg_3
35883590
mock_on_retry_3.assert_not_called()
35893591

3592+
@provide_session
3593+
def test_handle_failure_does_not_push_stale_dagrun_model(self, dag_maker, create_dummy_dag, session=None):
3594+
session = settings.Session()
3595+
with dag_maker():
3596+
3597+
def method(): ...
3598+
3599+
task = PythonOperator(task_id="mytask", python_callable=method)
3600+
dr = dag_maker.create_dagrun()
3601+
ti = dr.get_task_instance(task.task_id)
3602+
ti.state = State.RUNNING
3603+
3604+
assert dr.state == DagRunState.RUNNING
3605+
3606+
session.merge(ti)
3607+
session.flush()
3608+
session.commit()
3609+
3610+
pid = os.fork()
3611+
if pid:
3612+
process = psutil.Process(pid)
3613+
time.sleep(1)
3614+
3615+
dr.state = DagRunState.SUCCESS
3616+
session.merge(dr)
3617+
session.flush()
3618+
session.commit()
3619+
process.wait(timeout=7)
3620+
else:
3621+
reconfigure_orm(disable_connection_pool=True)
3622+
time.sleep(2)
3623+
ti.handle_failure("should not update related models")
3624+
os._exit(0)
3625+
3626+
dr.refresh_from_db()
3627+
assert dr.state == DagRunState.SUCCESS
3628+
35903629
@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode
35913630
def test_handle_failure_updates_queued_task_updates_state(self, dag_maker):
35923631
session = settings.Session()

0 commit comments

Comments
 (0)