Skip to content

Commit e8f002a

Browse files
committed
introduce fail_policy
1 parent 1613e9e commit e8f002a

File tree

21 files changed

+215
-168
lines changed

21 files changed

+215
-168
lines changed

airflow/decorators/__init__.pyi

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ from airflow.decorators.short_circuit import short_circuit_task
4040
from airflow.decorators.task_group import task_group
4141
from airflow.models.dag import dag
4242
from airflow.providers.cncf.kubernetes.secret import Secret
43+
from airflow.sensors.base import FailPolicy
4344
from airflow.typing_compat import Literal
4445

4546
# Please keep this in sync with __init__.py's __all__.
@@ -708,7 +709,7 @@ class TaskDecoratorCollection:
708709
*,
709710
poke_interval: float = ...,
710711
timeout: float = ...,
711-
soft_fail: bool = False,
712+
fail_policy: FailPolicy = ...,
712713
mode: str = ...,
713714
exponential_backoff: bool = False,
714715
max_wait: timedelta | float | None = None,
@@ -720,7 +721,7 @@ class TaskDecoratorCollection:
720721
:param poke_interval: Time in seconds that the job should wait in
721722
between each try
722723
:param timeout: Time, in seconds before the task times out and fails.
723-
:param soft_fail: Set to true to mark the task as SKIPPED on failure
724+
:param fail_policy: TODO.
724725
:param mode: How the sensor operates.
725726
Options are: ``{ poke | reschedule }``, default is ``poke``.
726727
When set to ``poke`` the sensor is taking up a worker slot for its

airflow/example_dags/example_sensors.py

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323

2424
from airflow.models.dag import DAG
2525
from airflow.operators.bash import BashOperator
26+
from airflow.sensors.base import FailPolicy
2627
from airflow.sensors.bash import BashSensor
2728
from airflow.sensors.filesystem import FileSensor
2829
from airflow.sensors.python import PythonSensor
@@ -68,7 +69,7 @@ def failure_callable():
6869
t2 = TimeSensor(
6970
task_id="timeout_after_second_date_in_the_future",
7071
timeout=1,
71-
soft_fail=True,
72+
fail_policy=FailPolicy.SKIP_ON_TIMEOUT,
7273
target_time=(datetime.datetime.now(tz=datetime.timezone.utc) + datetime.timedelta(hours=1)).time(),
7374
)
7475
# [END example_time_sensors]
@@ -81,15 +82,20 @@ def failure_callable():
8182
t2a = TimeSensorAsync(
8283
task_id="timeout_after_second_date_in_the_future_async",
8384
timeout=1,
84-
soft_fail=True,
85+
fail_policy=FailPolicy.SKIP_ON_TIMEOUT,
8586
target_time=(datetime.datetime.now(tz=datetime.timezone.utc) + datetime.timedelta(hours=1)).time(),
8687
)
8788
# [END example_time_sensors_async]
8889

8990
# [START example_bash_sensors]
9091
t3 = BashSensor(task_id="Sensor_succeeds", bash_command="exit 0")
9192

92-
t4 = BashSensor(task_id="Sensor_fails_after_3_seconds", timeout=3, soft_fail=True, bash_command="exit 1")
93+
t4 = BashSensor(
94+
task_id="Sensor_fails_after_3_seconds",
95+
timeout=3,
96+
fail_policy=FailPolicy.SKIP_ON_TIMEOUT,
97+
bash_command="exit 1",
98+
)
9399
# [END example_bash_sensors]
94100

95101
t5 = BashOperator(task_id="remove_file", bash_command="rm -rf /tmp/temporary_file_for_testing")
@@ -112,13 +118,19 @@ def failure_callable():
112118
t9 = PythonSensor(task_id="success_sensor_python", python_callable=success_callable)
113119

114120
t10 = PythonSensor(
115-
task_id="failure_timeout_sensor_python", timeout=3, soft_fail=True, python_callable=failure_callable
121+
task_id="failure_timeout_sensor_python",
122+
timeout=3,
123+
fail_policy=FailPolicy.SKIP_ON_TIMEOUT,
124+
python_callable=failure_callable,
116125
)
117126
# [END example_python_sensors]
118127

119128
# [START example_day_of_week_sensor]
120129
t11 = DayOfWeekSensor(
121-
task_id="week_day_sensor_failing_on_timeout", timeout=3, soft_fail=True, week_day=WeekDay.MONDAY
130+
task_id="week_day_sensor_failing_on_timeout",
131+
timeout=3,
132+
fail_policy=FailPolicy.SKIP_ON_TIMEOUT,
133+
week_day=WeekDay.MONDAY,
122134
)
123135
# [END example_day_of_week_sensor]
124136

airflow/exceptions.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,10 @@ class AirflowSensorTimeout(AirflowException):
6868
"""Raise when there is a timeout on sensor polling."""
6969

7070

71+
class AirflowPokeFailException(AirflowException):
72+
"""Raise when a sensor must not try to poke again."""
73+
74+
7175
class AirflowRescheduleException(AirflowException):
7276
"""
7377
Raise when the task should be re-scheduled at a later time.

airflow/providers/amazon/provider.yaml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,6 @@ dependencies:
9494
- apache-airflow-providers-common-compat>=1.1.0
9595
- apache-airflow-providers-common-sql>=1.3.1
9696
- apache-airflow-providers-http
97-
- apache-airflow-providers-common-compat>=1.1.0
9897
# We should update minimum version of boto3 and here regularly to avoid `pip` backtracking with the number
9998
# of candidates to consider. Make sure to configure boto3 version here as well as in all the tools below
10099
# in the `devel-dependencies` section to be the same minimum version.

airflow/providers/common/compat/__init__.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,3 +37,9 @@
3737
raise RuntimeError(
3838
f"The package `apache-airflow-providers-common-compat:{__version__}` needs Apache Airflow 2.8.0+"
3939
)
40+
41+
42+
def is_at_least_2_10_0() -> bool:
43+
return packaging.version.parse(
44+
packaging.version.parse(airflow_version).base_version
45+
) >= packaging.version.parse("2.10.0")

airflow/providers/ftp/sensors/ftp.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import re
2222
from typing import TYPE_CHECKING, Sequence
2323

24-
from airflow.exceptions import AirflowSkipException
24+
from airflow.exceptions import AirflowPokeFailException
2525
from airflow.providers.ftp.hooks.ftp import FTPHook, FTPSHook
2626
from airflow.sensors.base import BaseSensorOperator
2727

@@ -83,9 +83,7 @@ def poke(self, context: Context) -> bool:
8383
if (error_code != 550) and (
8484
self.fail_on_transient_errors or (error_code not in self.transient_errors)
8585
):
86-
if self.soft_fail:
87-
raise AirflowSkipException from e
88-
raise e
86+
raise AirflowPokeFailException from e
8987

9088
return False
9189

airflow/providers/http/sensors/http.py

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
from typing import TYPE_CHECKING, Any, Callable, Sequence
2222

2323
from airflow.configuration import conf
24-
from airflow.exceptions import AirflowException, AirflowSkipException
24+
from airflow.exceptions import AirflowException
2525
from airflow.providers.http.hooks.http import HttpHook
2626
from airflow.providers.http.triggers.http import HttpSensorTrigger
2727
from airflow.sensors.base import BaseSensorOperator
@@ -151,10 +151,6 @@ def poke(self, context: Context) -> bool:
151151
except AirflowException as exc:
152152
if str(exc).startswith(self.response_error_codes_allowlist):
153153
return False
154-
# TODO: remove this if block when min_airflow_version is set to higher than 2.7.1
155-
if self.soft_fail:
156-
raise AirflowSkipException from exc
157-
158154
raise exc
159155

160156
return True

airflow/providers/sftp/sensors/sftp.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
from paramiko.sftp import SFTP_NO_SUCH_FILE
2727

2828
from airflow.configuration import conf
29-
from airflow.exceptions import AirflowException
29+
from airflow.exceptions import AirflowException, AirflowPokeFailException
3030
from airflow.providers.sftp.hooks.sftp import SFTPHook
3131
from airflow.providers.sftp.triggers.sftp import SFTPTrigger
3232
from airflow.sensors.base import BaseSensorOperator, PokeReturnValue
@@ -98,7 +98,7 @@ def poke(self, context: Context) -> PokeReturnValue | bool:
9898
self.log.info("Found File %s last modified: %s", actual_file_to_check, mod_time)
9999
except OSError as e:
100100
if e.errno != SFTP_NO_SUCH_FILE:
101-
raise AirflowException from e
101+
raise AirflowPokeFailException from e
102102
continue
103103

104104
if self.newer_than:

airflow/sensors/base.py

Lines changed: 38 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
from __future__ import annotations
1919

2020
import datetime
21+
import enum
2122
import functools
2223
import hashlib
2324
import time
@@ -32,7 +33,7 @@
3233
from airflow.configuration import conf
3334
from airflow.exceptions import (
3435
AirflowException,
35-
AirflowFailException,
36+
AirflowPokeFailException,
3637
AirflowRescheduleException,
3738
AirflowSensorTimeout,
3839
AirflowSkipException,
@@ -109,15 +110,31 @@ def _orig_start_date(
109110
)
110111

111112

113+
class FailPolicy(str, enum.Enum):
114+
"""Class with sensor's fail policies."""
115+
116+
# if poke method raise an exception, sensor will not be skipped on.
117+
NONE = "none"
118+
119+
# If poke method raises an exception, sensor will be skipped on.
120+
SKIP_ON_ANY_ERROR = "skip_on_any_error"
121+
122+
# If poke method raises AirflowSensorTimeout, AirflowTaskTimeout,AirflowPokeFailException or AirflowSkipException
123+
# sensor will be skipped on.
124+
SKIP_ON_TIMEOUT = "skip_on_timeout"
125+
126+
# If poke method raises an exception different from AirflowSensorTimeout, AirflowTaskTimeout,
127+
# AirflowSkipException or AirflowFailException sensor will ignore exception and re-poke until timeout.
128+
IGNORE_ERROR = "ignore_error"
129+
130+
112131
class BaseSensorOperator(BaseOperator, SkipMixin):
113132
"""
114133
Sensor operators are derived from this class and inherit these attributes.
115134
116135
Sensor operators keep executing at a time interval and succeed when
117136
a criteria is met and fail if and when they time out.
118137
119-
:param soft_fail: Set to true to mark the task as SKIPPED on failure.
120-
Mutually exclusive with never_fail.
121138
:param poke_interval: Time that the job should wait in between each try.
122139
Can be ``timedelta`` or ``float`` seconds.
123140
:param timeout: Time elapsed before the task times out and fails.
@@ -145,13 +162,10 @@ class BaseSensorOperator(BaseOperator, SkipMixin):
145162
:param exponential_backoff: allow progressive longer waits between
146163
pokes by using exponential backoff algorithm
147164
:param max_wait: maximum wait interval between pokes, can be ``timedelta`` or ``float`` seconds
148-
:param silent_fail: If true, and poke method raises an exception different from
149-
AirflowSensorTimeout, AirflowTaskTimeout, AirflowSkipException
150-
and AirflowFailException, the sensor will log the error and continue
151-
its execution. Otherwise, the sensor task fails, and it can be retried
152-
based on the provided `retries` parameter.
153-
:param never_fail: If true, and poke method raises an exception, sensor will be skipped.
154-
Mutually exclusive with soft_fail.
165+
:param fail_policy: defines the rule by which sensor skip itself. Options are:
166+
``{ none | skip_on_any_error | skip_on_timeout | ignore_error }``
167+
default is ``none``. Options can be set as string or
168+
using the constants defined in the static class ``airflow.sensors.base.FailPolicy``
155169
"""
156170

157171
ui_color: str = "#e6f1f2"
@@ -166,26 +180,19 @@ def __init__(
166180
*,
167181
poke_interval: timedelta | float = 60,
168182
timeout: timedelta | float = conf.getfloat("sensors", "default_timeout"),
169-
soft_fail: bool = False,
170183
mode: str = "poke",
171184
exponential_backoff: bool = False,
172185
max_wait: timedelta | float | None = None,
173-
silent_fail: bool = False,
174-
never_fail: bool = False,
186+
fail_policy: str = FailPolicy.NONE,
175187
**kwargs,
176188
) -> None:
177189
super().__init__(**kwargs)
178190
self.poke_interval = self._coerce_poke_interval(poke_interval).total_seconds()
179-
self.soft_fail = soft_fail
180191
self.timeout = self._coerce_timeout(timeout).total_seconds()
181192
self.mode = mode
182193
self.exponential_backoff = exponential_backoff
183194
self.max_wait = self._coerce_max_wait(max_wait)
184-
if soft_fail is True and never_fail is True:
185-
raise ValueError("soft_fail and never_fail are mutually exclusive, you can not provide both.")
186-
187-
self.silent_fail = silent_fail
188-
self.never_fail = never_fail
195+
self.fail_policy = fail_policy
189196
self._validate_input_values()
190197

191198
@staticmethod
@@ -282,21 +289,20 @@ def run_duration() -> float:
282289
except (
283290
AirflowSensorTimeout,
284291
AirflowTaskTimeout,
285-
AirflowFailException,
292+
AirflowPokeFailException,
293+
AirflowSkipException,
286294
) as e:
287-
if self.soft_fail:
288-
raise AirflowSkipException("Skipping due to soft_fail is set to True.") from e
289-
elif self.never_fail:
290-
raise AirflowSkipException("Skipping due to never_fail is set to True.") from e
291-
raise e
292-
except AirflowSkipException as e:
295+
if self.fail_policy == FailPolicy.SKIP_ON_TIMEOUT:
296+
raise AirflowSkipException("Skipping due fail_policy set to SKIP_ON_TIMEOUT.") from e
297+
elif self.fail_policy == FailPolicy.SKIP_ON_ANY_ERROR:
298+
raise AirflowSkipException("Skipping due to SKIP_ON_ANY_ERROR is set to True.") from e
293299
raise e
294300
except Exception as e:
295-
if self.silent_fail:
301+
if self.fail_policy == FailPolicy.IGNORE_ERROR:
296302
self.log.error("Sensor poke failed: \n %s", traceback.format_exc())
297303
poke_return = False
298-
elif self.never_fail:
299-
raise AirflowSkipException("Skipping due to never_fail is set to True.") from e
304+
elif self.fail_policy == FailPolicy.SKIP_ON_ANY_ERROR:
305+
raise AirflowSkipException("Skipping due to SKIP_ON_ANY_ERROR is set to True.") from e
300306
else:
301307
raise e
302308

@@ -306,13 +312,13 @@ def run_duration() -> float:
306312
break
307313

308314
if run_duration() > self.timeout:
309-
# If sensor is in soft fail mode but times out raise AirflowSkipException.
315+
# If sensor is in SKIP_ON_TIMEOUT mode but times out it raise AirflowSkipException.
310316
message = (
311317
f"Sensor has timed out; run duration of {run_duration()} seconds exceeds "
312318
f"the specified timeout of {self.timeout}."
313319
)
314320

315-
if self.soft_fail:
321+
if self.fail_policy == FailPolicy.SKIP_ON_TIMEOUT:
316322
raise AirflowSkipException(message)
317323
else:
318324
raise AirflowSensorTimeout(message)
@@ -335,7 +341,7 @@ def resume_execution(self, next_method: str, next_kwargs: dict[str, Any] | None,
335341
try:
336342
return super().resume_execution(next_method, next_kwargs, context)
337343
except (AirflowException, TaskDeferralError) as e:
338-
if self.soft_fail:
344+
if self.fail_policy == FailPolicy.SKIP_ON_ANY_ERROR:
339345
raise AirflowSkipException(str(e)) from e
340346
raise
341347

0 commit comments

Comments
 (0)