diff --git a/airflow/example_dags/example_sensors.py b/airflow/example_dags/example_sensors.py index 9dbe83d6e4c40..1476daf9b74ff 100644 --- a/airflow/example_dags/example_sensors.py +++ b/airflow/example_dags/example_sensors.py @@ -23,6 +23,7 @@ from airflow.models.dag import DAG from airflow.operators.bash import BashOperator +from airflow.sensors.base import FailPolicy from airflow.sensors.bash import BashSensor from airflow.sensors.filesystem import FileSensor from airflow.sensors.python import PythonSensor @@ -68,7 +69,7 @@ def failure_callable(): t2 = TimeSensor( task_id="timeout_after_second_date_in_the_future", timeout=1, - soft_fail=True, + fail_policy=FailPolicy.SKIP_ON_TIMEOUT, target_time=(datetime.datetime.now(tz=datetime.timezone.utc) + datetime.timedelta(hours=1)).time(), ) # [END example_time_sensors] @@ -81,7 +82,7 @@ def failure_callable(): t2a = TimeSensorAsync( task_id="timeout_after_second_date_in_the_future_async", timeout=1, - soft_fail=True, + fail_policy=FailPolicy.SKIP_ON_TIMEOUT, target_time=(datetime.datetime.now(tz=datetime.timezone.utc) + datetime.timedelta(hours=1)).time(), ) # [END example_time_sensors_async] @@ -89,7 +90,12 @@ def failure_callable(): # [START example_bash_sensors] t3 = BashSensor(task_id="Sensor_succeeds", bash_command="exit 0") - t4 = BashSensor(task_id="Sensor_fails_after_3_seconds", timeout=3, soft_fail=True, bash_command="exit 1") + t4 = BashSensor( + task_id="Sensor_fails_after_3_seconds", + timeout=3, + fail_policy=FailPolicy.SKIP_ON_TIMEOUT, + bash_command="exit 1", + ) # [END example_bash_sensors] t5 = BashOperator(task_id="remove_file", bash_command="rm -rf /tmp/temporary_file_for_testing") @@ -112,13 +118,19 @@ def failure_callable(): t9 = PythonSensor(task_id="success_sensor_python", python_callable=success_callable) t10 = PythonSensor( - task_id="failure_timeout_sensor_python", timeout=3, soft_fail=True, python_callable=failure_callable + task_id="failure_timeout_sensor_python", + timeout=3, + fail_policy=FailPolicy.SKIP_ON_TIMEOUT, + python_callable=failure_callable, ) # [END example_python_sensors] # [START example_day_of_week_sensor] t11 = DayOfWeekSensor( - task_id="week_day_sensor_failing_on_timeout", timeout=3, soft_fail=True, week_day=WeekDay.MONDAY + task_id="week_day_sensor_failing_on_timeout", + timeout=3, + fail_policy=FailPolicy.SKIP_ON_TIMEOUT, + week_day=WeekDay.MONDAY, ) # [END example_day_of_week_sensor] diff --git a/airflow/exceptions.py b/airflow/exceptions.py index dc59f91841133..9145626302480 100644 --- a/airflow/exceptions.py +++ b/airflow/exceptions.py @@ -64,6 +64,10 @@ class AirflowSensorTimeout(AirflowException): """Raise when there is a timeout on sensor polling.""" +class AirflowPokeFailException(AirflowException): + """Raise when a sensor must not try to poke again.""" + + class AirflowRescheduleException(AirflowException): """ Raise when the task should be re-scheduled at a later time. @@ -439,6 +443,17 @@ class PodReconciliationError(AirflowException): # type: ignore[no-redef] """Raised when an error is encountered while trying to merge pod configs.""" +class RemovedInAirflow3SoftWarning(DeprecationWarning): + """ + Issued for usage of deprecated features that will be removed in Airflow3. + + Unlike ``RemovedInAirflow3Warning``, this warning does not cause test failures.
+ """ + + deprecated_since: str | None = None + "Indicates the airflow version that started raising this deprecation warning" + + class RemovedInAirflow3Warning(DeprecationWarning): """Issued for usage of deprecated features that will be removed in Airflow3.""" diff --git a/airflow/providers/amazon/__init__.py b/airflow/providers/amazon/__init__.py index 0ebc51a667c15..cbe4e32343b5d 100644 --- a/airflow/providers/amazon/__init__.py +++ b/airflow/providers/amazon/__init__.py @@ -32,8 +32,8 @@ __version__ = "8.27.0" if packaging.version.parse(packaging.version.parse(airflow_version).base_version) < packaging.version.parse( - "2.7.0" + "2.7.1" ): raise RuntimeError( - f"The package `apache-airflow-providers-amazon:{__version__}` needs Apache Airflow 2.7.0+" + f"The package `apache-airflow-providers-amazon:{__version__}` needs Apache Airflow 2.7.1+" ) diff --git a/airflow/providers/amazon/aws/sensors/s3.py b/airflow/providers/amazon/aws/sensors/s3.py index 9c524494cdeb5..cc5aff014588d 100644 --- a/airflow/providers/amazon/aws/sensors/s3.py +++ b/airflow/providers/amazon/aws/sensors/s3.py @@ -33,7 +33,7 @@ if TYPE_CHECKING: from airflow.utils.context import Context -from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning, AirflowSkipException +from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning from airflow.providers.amazon.aws.hooks.s3 import S3Hook from airflow.providers.amazon.aws.triggers.s3 import S3KeysUnchangedTrigger, S3KeyTrigger from airflow.sensors.base import BaseSensorOperator, poke_mode_only @@ -219,9 +219,6 @@ def execute_complete(self, context: Context, event: dict[str, Any]) -> None: if not found_keys: self._defer() elif event["status"] == "error": - # TODO: remove this if block when min_airflow_version is set to higher than 2.7.1 - if self.soft_fail: - raise AirflowSkipException(event["message"]) raise AirflowException(event["message"]) @deprecated(reason="use `hook` property instead.", category=AirflowProviderDeprecationWarning) @@ -342,13 +339,10 @@ def is_keys_unchanged(self, current_objects: set[str]) -> bool: ) return False - # TODO: remove this if block when min_airflow_version is set to higher than 2.7.1 message = ( f"Illegal behavior: objects were deleted in" f" {os.path.join(self.bucket_name, self.prefix)} between pokes." ) - if self.soft_fail: - raise AirflowSkipException(message) raise AirflowException(message) if self.last_activity_time: @@ -411,8 +405,5 @@ def execute_complete(self, context: Context, event: dict[str, Any] | None = None event = validate_execute_complete_event(event) if event and event["status"] == "error": - # TODO: remove this if block when min_airflow_version is set to higher than 2.7.1 - if self.soft_fail: - raise AirflowSkipException(event["message"]) raise AirflowException(event["message"]) return None diff --git a/airflow/providers/amazon/provider.yaml b/airflow/providers/amazon/provider.yaml index 0411ea51789ca..ddcf3297b3e89 100644 --- a/airflow/providers/amazon/provider.yaml +++ b/airflow/providers/amazon/provider.yaml @@ -89,11 +89,10 @@ versions: - 1.0.0 dependencies: - - apache-airflow>=2.7.0 + - apache-airflow>=2.7.1 - apache-airflow-providers-common-compat>=1.1.0 - apache-airflow-providers-common-sql>=1.3.1 - apache-airflow-providers-http - - apache-airflow-providers-common-compat>=1.1.0 # We should update minimum version of boto3 and here regularly to avoid `pip` backtracking with the number # of candidates to consider. 
Make sure to configure boto3 version here as well as in all the tools below # in the `devel-dependencies` section to be the same minimum version. diff --git a/airflow/providers/common/compat/__init__.py b/airflow/providers/common/compat/__init__.py index 449005683d754..e8b25bc688e22 100644 --- a/airflow/providers/common/compat/__init__.py +++ b/airflow/providers/common/compat/__init__.py @@ -37,3 +37,9 @@ raise RuntimeError( f"The package `apache-airflow-providers-common-compat:{__version__}` needs Apache Airflow 2.7.0+" ) + + +def is_at_least_2_10_0() -> bool: + return packaging.version.parse( + packaging.version.parse(airflow_version).base_version + ) >= packaging.version.parse("2.10.0") diff --git a/airflow/providers/ftp/__init__.py b/airflow/providers/ftp/__init__.py index 2ded47467190e..60cfce6714ca6 100644 --- a/airflow/providers/ftp/__init__.py +++ b/airflow/providers/ftp/__init__.py @@ -32,8 +32,8 @@ __version__ = "3.10.0" if packaging.version.parse(packaging.version.parse(airflow_version).base_version) < packaging.version.parse( - "2.7.0" + "2.7.1" ): raise RuntimeError( - f"The package `apache-airflow-providers-ftp:{__version__}` needs Apache Airflow 2.7.0+" + f"The package `apache-airflow-providers-ftp:{__version__}` needs Apache Airflow 2.7.1+" ) diff --git a/airflow/providers/ftp/provider.yaml b/airflow/providers/ftp/provider.yaml index c766fcf237b02..89702c1c73617 100644 --- a/airflow/providers/ftp/provider.yaml +++ b/airflow/providers/ftp/provider.yaml @@ -52,7 +52,7 @@ versions: - 1.0.0 dependencies: - - apache-airflow>=2.7.0 + - apache-airflow>=2.7.1 integrations: - integration-name: File Transfer Protocol (FTP) diff --git a/airflow/providers/ftp/sensors/ftp.py b/airflow/providers/ftp/sensors/ftp.py index 847cf763537dd..d957b06a8391e 100644 --- a/airflow/providers/ftp/sensors/ftp.py +++ b/airflow/providers/ftp/sensors/ftp.py @@ -21,7 +21,7 @@ import re from typing import TYPE_CHECKING, Sequence -from airflow.exceptions import AirflowSkipException +from airflow.exceptions import AirflowSensorTimeout from airflow.providers.ftp.hooks.ftp import FTPHook, FTPSHook from airflow.sensors.base import BaseSensorOperator @@ -83,9 +83,8 @@ def poke(self, context: Context) -> bool: if (error_code != 550) and ( self.fail_on_transient_errors or (error_code not in self.transient_errors) ): - if self.soft_fail: - raise AirflowSkipException from e - raise e + # TODO: replace by AirflowPokeFailException when min_airflow_version is set to at least 2.10.0 + raise AirflowSensorTimeout from e return False diff --git a/airflow/providers/http/__init__.py b/airflow/providers/http/__init__.py index 26166e3218be0..60dcff6984c21 100644 --- a/airflow/providers/http/__init__.py +++ b/airflow/providers/http/__init__.py @@ -32,8 +32,8 @@ __version__ = "4.12.0" if packaging.version.parse(packaging.version.parse(airflow_version).base_version) < packaging.version.parse( - "2.7.0" + "2.7.1" ): raise RuntimeError( - f"The package `apache-airflow-providers-http:{__version__}` needs Apache Airflow 2.7.0+" + f"The package `apache-airflow-providers-http:{__version__}` needs Apache Airflow 2.7.1+" ) diff --git a/airflow/providers/http/provider.yaml b/airflow/providers/http/provider.yaml index 647002c62a78b..7c85c69249be8 100644 --- a/airflow/providers/http/provider.yaml +++ b/airflow/providers/http/provider.yaml @@ -59,7 +59,7 @@ versions: - 1.0.0 dependencies: - - apache-airflow>=2.7.0 + - apache-airflow>=2.7.1 # The 2.26.0 release of requests got rid of the chardet LGPL mandatory dependency, allowing us to # 
release it as a requirement for airflow - requests>=2.27.0,<3 diff --git a/airflow/providers/http/sensors/http.py b/airflow/providers/http/sensors/http.py index 3691764333b64..33b5e1d4defb4 100644 --- a/airflow/providers/http/sensors/http.py +++ b/airflow/providers/http/sensors/http.py @@ -21,7 +21,7 @@ from typing import TYPE_CHECKING, Any, Callable, Sequence from airflow.configuration import conf -from airflow.exceptions import AirflowException, AirflowSkipException +from airflow.exceptions import AirflowException from airflow.providers.http.hooks.http import HttpHook from airflow.providers.http.triggers.http import HttpSensorTrigger from airflow.sensors.base import BaseSensorOperator @@ -151,10 +151,6 @@ def poke(self, context: Context) -> bool: except AirflowException as exc: if str(exc).startswith(self.response_error_codes_allowlist): return False - # TODO: remove this if block when min_airflow_version is set to higher than 2.7.1 - if self.soft_fail: - raise AirflowSkipException from exc - raise exc return True diff --git a/airflow/providers/sftp/__init__.py b/airflow/providers/sftp/__init__.py index 71e76062b15d0..3929232573388 100644 --- a/airflow/providers/sftp/__init__.py +++ b/airflow/providers/sftp/__init__.py @@ -32,8 +32,8 @@ __version__ = "4.10.2" if packaging.version.parse(packaging.version.parse(airflow_version).base_version) < packaging.version.parse( - "2.7.0" + "2.7.1" ): raise RuntimeError( - f"The package `apache-airflow-providers-sftp:{__version__}` needs Apache Airflow 2.7.0+" + f"The package `apache-airflow-providers-sftp:{__version__}` needs Apache Airflow 2.7.1+" ) diff --git a/airflow/providers/sftp/provider.yaml b/airflow/providers/sftp/provider.yaml index b2e8f64992df4..fedb97b92a223 100644 --- a/airflow/providers/sftp/provider.yaml +++ b/airflow/providers/sftp/provider.yaml @@ -64,7 +64,7 @@ versions: - 1.0.0 dependencies: - - apache-airflow>=2.7.0 + - apache-airflow>=2.7.1 - apache-airflow-providers-ssh>=2.1.0 - paramiko>=2.9.0 - asyncssh>=2.12.0 diff --git a/airflow/providers/sftp/sensors/sftp.py b/airflow/providers/sftp/sensors/sftp.py index f56ad9341001d..58f3269945661 100644 --- a/airflow/providers/sftp/sensors/sftp.py +++ b/airflow/providers/sftp/sensors/sftp.py @@ -26,7 +26,7 @@ from paramiko.sftp import SFTP_NO_SUCH_FILE from airflow.configuration import conf -from airflow.exceptions import AirflowException, AirflowSkipException +from airflow.exceptions import AirflowException, AirflowSensorTimeout from airflow.providers.sftp.hooks.sftp import SFTPHook from airflow.providers.sftp.triggers.sftp import SFTPTrigger from airflow.sensors.base import BaseSensorOperator, PokeReturnValue @@ -98,10 +98,8 @@ def poke(self, context: Context) -> PokeReturnValue | bool: self.log.info("Found File %s last modified: %s", actual_file_to_check, mod_time) except OSError as e: if e.errno != SFTP_NO_SUCH_FILE: - # TODO: remove this if block when min_airflow_version is set to higher than 2.7.1 - if self.soft_fail: - raise AirflowSkipException from e - raise e + # TODO: replace by AirflowPokeFailException when min_airflow_version is set to at least 2.10.0 + raise AirflowSensorTimeout from e continue if self.newer_than: diff --git a/airflow/sensors/base.py b/airflow/sensors/base.py index 7df76fae52883..fc6dfe45a232a 100644 --- a/airflow/sensors/base.py +++ b/airflow/sensors/base.py @@ -18,10 +18,12 @@ from __future__ import annotations import datetime +import enum import functools import hashlib import time import traceback +import warnings from datetime import timedelta from 
typing import TYPE_CHECKING, Any, Callable, Iterable @@ -32,11 +34,12 @@ from airflow.configuration import conf from airflow.exceptions import ( AirflowException, - AirflowFailException, + AirflowPokeFailException, AirflowRescheduleException, AirflowSensorTimeout, AirflowSkipException, AirflowTaskTimeout, + RemovedInAirflow3SoftWarning, TaskDeferralError, ) from airflow.executors.executor_loader import ExecutorLoader @@ -51,6 +54,7 @@ # See https://github.com/apache/airflow/issues/16035 from airflow.utils.decorators import apply_defaults # noqa: F401 from airflow.utils.session import NEW_SESSION, provide_session +from airflow.utils.types import NOTSET, ArgNotSet if TYPE_CHECKING: from sqlalchemy.orm.session import Session @@ -114,6 +118,24 @@ def _orig_start_date( ) +class FailPolicy(str, enum.Enum): + """Class with sensor's fail policies.""" + + # If the poke method raises an exception, the sensor will not be skipped. + NONE = "none" + + # If the poke method raises an exception, the sensor will be skipped. + SKIP_ON_ANY_ERROR = "skip_on_any_error" + + # If the poke method raises AirflowSensorTimeout, AirflowTaskTimeout, AirflowPokeFailException + # or AirflowSkipException, the sensor will be skipped. + SKIP_ON_TIMEOUT = "skip_on_timeout" + + # If the poke method raises an exception other than AirflowSensorTimeout, AirflowTaskTimeout, + # AirflowSkipException or AirflowFailException, the sensor will ignore it and re-poke until timeout. + IGNORE_ERROR = "ignore_error" + + class BaseSensorOperator(BaseOperator, SkipMixin): """ Sensor operators are derived from this class and inherit these attributes. @@ -121,8 +143,9 @@ class BaseSensorOperator(BaseOperator, SkipMixin): Sensor operators keep executing at a time interval and succeed when a criteria is met and fail if and when they time out. - :param soft_fail: Set to true to mark the task as SKIPPED on failure. - Mutually exclusive with never_fail. + :param soft_fail: deprecated parameter, replaced by FailPolicy.SKIP_ON_TIMEOUT + (which, unlike soft_fail, does not skip on AirflowFailException). + Mutually exclusive with fail_policy and silent_fail. :param poke_interval: Time that the job should wait in between each try. Can be ``timedelta`` or ``float`` seconds. :param timeout: Time elapsed before the task times out and fails. @@ -150,13 +173,13 @@ class BaseSensorOperator(BaseOperator, SkipMixin): :param exponential_backoff: allow progressive longer waits between pokes by using exponential backoff algorithm :param max_wait: maximum wait interval between pokes, can be ``timedelta`` or ``float`` seconds - :param silent_fail: If true, and poke method raises an exception different from - AirflowSensorTimeout, AirflowTaskTimeout, AirflowSkipException - and AirflowFailException, the sensor will log the error and continue - its execution. Otherwise, the sensor task fails, and it can be retried - based on the provided `retries` parameter. - :param never_fail: If true, and poke method raises an exception, sensor will be skipped. - Mutually exclusive with soft_fail. + :param silent_fail: deprecated parameter with the same effect as FailPolicy.IGNORE_ERROR. + Mutually exclusive with fail_policy and soft_fail. + :param fail_policy: defines the rule by which the sensor skips itself. Options are: + ``{ none | skip_on_any_error | skip_on_timeout | ignore_error }``; + default is ``none``. Options can be set as a string or + using the constants defined in the enum ``airflow.sensors.base.FailPolicy``. + Mutually exclusive with soft_fail and silent_fail.
""" ui_color: str = "#e6f1f2" @@ -176,21 +199,55 @@ def __init__( exponential_backoff: bool = False, max_wait: timedelta | float | None = None, silent_fail: bool = False, - never_fail: bool = False, + fail_policy: str | ArgNotSet = NOTSET, # FailPolicy.NONE, **kwargs, ) -> None: super().__init__(**kwargs) self.poke_interval = self._coerce_poke_interval(poke_interval).total_seconds() - self.soft_fail = soft_fail self.timeout = self._coerce_timeout(timeout).total_seconds() self.mode = mode self.exponential_backoff = exponential_backoff self.max_wait = self._coerce_max_wait(max_wait) - if soft_fail is True and never_fail is True: - raise ValueError("soft_fail and never_fail are mutually exclusive, you can not provide both.") + if soft_fail: + warnings.warn( + "`soft_fail` is deprecated and will be removed in a future version. " + "Please provide fail_policy=FailPolicy.skip_on_timeout instead.", + RemovedInAirflow3SoftWarning, + stacklevel=3, + ) + elif silent_fail: + warnings.warn( + "`silent_fail` is deprecated and will be removed in a future version. " + "Please provide fail_policy=FailPolicy.IGNORE_ERRORS instead.", + RemovedInAirflow3SoftWarning, + stacklevel=3, + ) + if fail_policy != NOTSET: + if sum([soft_fail, silent_fail]) > 0: + raise ValueError( + "fail_policy and deprecated soft_fail and silent_fail parameters are mutually exclusive." + ) + + if fail_policy == FailPolicy.SKIP_ON_TIMEOUT: + soft_fail = True + elif fail_policy == FailPolicy.IGNORE_ERROR: + silent_fail = True + else: + if sum([soft_fail, silent_fail]) > 1: + raise ValueError( + "soft_fail and silent_fail are mutually exclusive, you can not provide more than one." + ) + if soft_fail: + fail_policy = FailPolicy.SKIP_ON_TIMEOUT + elif silent_fail: + fail_policy = FailPolicy.IGNORE_ERROR + else: + fail_policy = FailPolicy.NONE + + self.soft_fail = soft_fail self.silent_fail = silent_fail - self.never_fail = never_fail + self.fail_policy = fail_policy self._validate_input_values() @staticmethod @@ -287,21 +344,20 @@ def run_duration() -> float: except ( AirflowSensorTimeout, AirflowTaskTimeout, - AirflowFailException, + AirflowPokeFailException, + AirflowSkipException, ) as e: - if self.soft_fail: - raise AirflowSkipException("Skipping due to soft_fail is set to True.") from e - elif self.never_fail: - raise AirflowSkipException("Skipping due to never_fail is set to True.") from e - raise e - except AirflowSkipException as e: + if self.fail_policy == FailPolicy.SKIP_ON_TIMEOUT: + raise AirflowSkipException("Skipping due fail_policy set to SKIP_ON_TIMEOUT.") from e + elif self.fail_policy == FailPolicy.SKIP_ON_ANY_ERROR: + raise AirflowSkipException("Skipping due to SKIP_ON_ANY_ERROR is set to True.") from e raise e except Exception as e: - if self.silent_fail: + if self.fail_policy == FailPolicy.IGNORE_ERROR: self.log.error("Sensor poke failed: \n %s", traceback.format_exc()) poke_return = False - elif self.never_fail: - raise AirflowSkipException("Skipping due to never_fail is set to True.") from e + elif self.fail_policy == FailPolicy.SKIP_ON_ANY_ERROR: + raise AirflowSkipException("Skipping due to SKIP_ON_ANY_ERROR is set to True.") from e else: raise e @@ -311,13 +367,13 @@ def run_duration() -> float: break if run_duration() > self.timeout: - # If sensor is in soft fail mode but times out raise AirflowSkipException. + # If sensor is in SKIP_ON_TIMEOUT mode but times out it raise AirflowSkipException. 
message = ( f"Sensor has timed out; run duration of {run_duration()} seconds exceeds " f"the specified timeout of {self.timeout}." ) - if self.soft_fail: + if self.fail_policy == FailPolicy.SKIP_ON_TIMEOUT: raise AirflowSkipException(message) else: raise AirflowSensorTimeout(message) @@ -340,7 +396,7 @@ def resume_execution(self, next_method: str, next_kwargs: dict[str, Any] | None, try: return super().resume_execution(next_method, next_kwargs, context) except (AirflowException, TaskDeferralError) as e: - if self.soft_fail: + if self.fail_policy == FailPolicy.SKIP_ON_ANY_ERROR: raise AirflowSkipException(str(e)) from e raise diff --git a/airflow/sensors/external_task.py b/airflow/sensors/external_task.py index 339ad790564e0..cc84c84ef5661 100644 --- a/airflow/sensors/external_task.py +++ b/airflow/sensors/external_task.py @@ -25,7 +25,7 @@ import attr from airflow.configuration import conf -from airflow.exceptions import AirflowException, AirflowSkipException, RemovedInAirflow3Warning +from airflow.exceptions import AirflowPokeFailException, AirflowSkipException, RemovedInAirflow3Warning from airflow.models.baseoperatorlink import BaseOperatorLink from airflow.models.dag import DagModel from airflow.models.dagbag import DagBag @@ -177,7 +177,7 @@ def __init__( total_states = set(self.allowed_states + self.skipped_states + self.failed_states) if len(total_states) != len(self.allowed_states) + len(self.skipped_states) + len(self.failed_states): - raise AirflowException( + raise ValueError( "Duplicate values provided across allowed_states, skipped_states and failed_states." ) @@ -288,32 +288,18 @@ def poke(self, context: Context, session: Session = NEW_SESSION) -> bool: # Fail if anything in the list has failed. if count_failed > 0: if self.external_task_ids: - if self.soft_fail: - raise AirflowSkipException( - f"Some of the external tasks {self.external_task_ids} " - f"in DAG {self.external_dag_id} failed. Skipping due to soft_fail." - ) - raise AirflowException( + raise AirflowPokeFailException( f"Some of the external tasks {self.external_task_ids} " f"in DAG {self.external_dag_id} failed." ) elif self.external_task_group_id: - if self.soft_fail: - raise AirflowSkipException( - f"The external task_group '{self.external_task_group_id}' " - f"in DAG '{self.external_dag_id}' failed. Skipping due to soft_fail." - ) - raise AirflowException( + raise AirflowPokeFailException( f"The external task_group '{self.external_task_group_id}' " f"in DAG '{self.external_dag_id}' failed." ) else: - if self.soft_fail: - raise AirflowSkipException( - f"The external DAG {self.external_dag_id} failed. Skipping due to soft_fail." - ) - raise AirflowException(f"The external DAG {self.external_dag_id} failed.") + raise AirflowPokeFailException(f"The external DAG {self.external_dag_id} failed.") count_skipped = -1 if self.skipped_states: @@ -366,30 +352,27 @@ def execute_complete(self, context, event=None): if event["status"] == "success": self.log.info("External tasks %s has executed successfully.", self.external_task_ids) elif event["status"] == "skipped": - raise AirflowSkipException("External job has skipped skipping.") + raise AirflowPokeFailException("External job has skipped skipping.") else: - if self.soft_fail: - raise AirflowSkipException("External job has failed skipping.") - else: - raise AirflowException( - "Error occurred while trying to retrieve task status. Please, check the " - "name of executed task and Dag." 
- ) + raise AirflowPokeFailException( + "Error occurred while trying to retrieve task status. Please, check the " + "name of executed task and Dag." + ) def _check_for_existence(self, session) -> None: dag_to_wait = DagModel.get_current(self.external_dag_id, session) if not dag_to_wait: - raise AirflowException(f"The external DAG {self.external_dag_id} does not exist.") + raise AirflowPokeFailException(f"The external DAG {self.external_dag_id} does not exist.") if not os.path.exists(correct_maybe_zipped(dag_to_wait.fileloc)): - raise AirflowException(f"The external DAG {self.external_dag_id} was deleted.") + raise AirflowPokeFailException(f"The external DAG {self.external_dag_id} was deleted.") if self.external_task_ids: refreshed_dag_info = DagBag(dag_to_wait.fileloc).get_dag(self.external_dag_id) for external_task_id in self.external_task_ids: if not refreshed_dag_info.has_task(external_task_id): - raise AirflowException( + raise AirflowPokeFailException( f"The external task {external_task_id} in " f"DAG {self.external_dag_id} does not exist." ) @@ -397,7 +380,7 @@ def _check_for_existence(self, session) -> None: if self.external_task_group_id: refreshed_dag_info = DagBag(dag_to_wait.fileloc).get_dag(self.external_dag_id) if not refreshed_dag_info.has_task_group(self.external_task_group_id): - raise AirflowException( + raise AirflowPokeFailException( f"The external task group '{self.external_task_group_id}' in " f"DAG '{self.external_dag_id}' does not exist." ) diff --git a/airflow/sensors/filesystem.py b/airflow/sensors/filesystem.py index 5d32ab07ad4e7..10c8adf5a8668 100644 --- a/airflow/sensors/filesystem.py +++ b/airflow/sensors/filesystem.py @@ -24,7 +24,7 @@ from typing import TYPE_CHECKING, Any, Sequence from airflow.configuration import conf -from airflow.exceptions import AirflowException +from airflow.exceptions import AirflowPokeFailException from airflow.hooks.filesystem import FSHook from airflow.sensors.base import BaseSensorOperator from airflow.triggers.base import StartTriggerArgs @@ -134,5 +134,5 @@ def execute(self, context: Context) -> None: def execute_complete(self, context: Context, event: bool | None = None) -> None: if not event: - raise AirflowException("%s task failed as %s not found.", self.task_id, self.filepath) + raise AirflowPokeFailException("%s task failed as %s not found.", self.task_id, self.filepath) self.log.info("%s completed successfully as %s found.", self.task_id, self.filepath) diff --git a/airflow/sensors/time_delta.py b/airflow/sensors/time_delta.py index d068fad9bf5a5..5d8ba9329c25f 100644 --- a/airflow/sensors/time_delta.py +++ b/airflow/sensors/time_delta.py @@ -19,7 +19,6 @@ from typing import TYPE_CHECKING, Any, NoReturn -from airflow.exceptions import AirflowSkipException from airflow.sensors.base import BaseSensorOperator from airflow.triggers.temporal import DateTimeTrigger from airflow.utils import timezone @@ -77,12 +76,8 @@ def execute(self, context: Context) -> bool | NoReturn: if timezone.utcnow() > target_dttm: # If the target datetime is in the past, return immediately return True - try: - trigger = DateTimeTrigger(moment=target_dttm, end_from_trigger=self.end_from_trigger) - except (TypeError, ValueError) as e: - if self.soft_fail: - raise AirflowSkipException("Skipping due to soft_fail is set to True.") from e - raise + + trigger = DateTimeTrigger(moment=target_dttm, end_from_trigger=self.end_from_trigger) self.defer(trigger=trigger, method_name="execute_complete") diff --git a/dev/breeze/tests/test_packages.py 
b/dev/breeze/tests/test_packages.py index 228a1ca0dc5ed..af2b6e0b5bbb3 100644 --- a/dev/breeze/tests/test_packages.py +++ b/dev/breeze/tests/test_packages.py @@ -431,7 +431,7 @@ def test_validate_provider_info_with_schema(): @pytest.mark.parametrize( "provider_id, min_version", [ - ("amazon", "2.7.0"), + ("amazon", "2.7.1"), ("common.io", "2.8.0"), ], ) @@ -496,7 +496,7 @@ def test_provider_jinja_context(): "CHANGELOG_RELATIVE_PATH": "../../airflow/providers/amazon", "SUPPORTED_PYTHON_VERSIONS": ["3.8", "3.9", "3.10", "3.11", "3.12"], "PLUGINS": [], - "MIN_AIRFLOW_VERSION": "2.7.0", + "MIN_AIRFLOW_VERSION": "2.7.1", "PROVIDER_REMOVED": False, "PROVIDER_INFO": provider_info, } diff --git a/generated/provider_dependencies.json b/generated/provider_dependencies.json index 85dbd405e8f87..bbaa3008023d5 100644 --- a/generated/provider_dependencies.json +++ b/generated/provider_dependencies.json @@ -29,10 +29,9 @@ "deps": [ "PyAthena>=3.0.10", "apache-airflow-providers-common-compat>=1.1.0", - "apache-airflow-providers-common-compat>=1.1.0", "apache-airflow-providers-common-sql>=1.3.1", "apache-airflow-providers-http", - "apache-airflow>=2.7.0", + "apache-airflow>=2.7.1", "asgiref>=2.3.0", "boto3>=1.34.90", "botocore>=1.34.90", @@ -575,7 +574,7 @@ }, "ftp": { "deps": [ - "apache-airflow>=2.7.0" + "apache-airflow>=2.7.1" ], "devel-deps": [], "plugins": [], @@ -719,7 +718,7 @@ "http": { "deps": [ "aiohttp>=3.9.2", - "apache-airflow>=2.7.0", + "apache-airflow>=2.7.1", "asgiref", "requests>=2.27.0,<3", "requests_toolbelt" @@ -1149,7 +1148,7 @@ "sftp": { "deps": [ "apache-airflow-providers-ssh>=2.1.0", - "apache-airflow>=2.7.0", + "apache-airflow>=2.7.1", "asyncssh>=2.12.0", "paramiko>=2.9.0" ], diff --git a/pyproject.toml b/pyproject.toml index 9e8970bc408b7..e9765f12f4ed5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -545,6 +545,7 @@ forbidden_warnings = [ "airflow.exceptions.RemovedInAirflow3Warning", "airflow.utils.context.AirflowContextDeprecationWarning", "airflow.exceptions.AirflowProviderDeprecationWarning", + # TODO uncomment before airflow3 "airflow.exceptions.RemovedInAirflow3SoftWarning" ] python_files = [ "test_*.py", diff --git a/tests/decorators/test_sensor.py b/tests/decorators/test_sensor.py index 77852f34f7262..b776e25c09eaa 100644 --- a/tests/decorators/test_sensor.py +++ b/tests/decorators/test_sensor.py @@ -24,7 +24,12 @@ from airflow.exceptions import AirflowSensorTimeout from airflow.models import XCom from airflow.sensors.base import PokeReturnValue +from tests.test_utils.compat import ignore_provider_compatibility_error + +with ignore_provider_compatibility_error("2.10.0", __file__): + from airflow.sensors.base import FailPolicy from airflow.utils.state import State +from tests.test_utils.compat import AIRFLOW_V_2_10_PLUS pytestmark = pytest.mark.db_test @@ -141,8 +146,9 @@ def dummy_f(): if ti.task_id == "dummy_f": assert ti.state == State.NONE - def test_basic_sensor_soft_fail(self, dag_maker): - @task.sensor(timeout=0, soft_fail=True) + @pytest.mark.skipif(not AIRFLOW_V_2_10_PLUS, reason="FailPolicy present from Airflow 2.10.0") + def test_basic_sensor_skip_on_timeout(self, dag_maker): + @task.sensor(timeout=0, fail_policy=FailPolicy.SKIP_ON_TIMEOUT) def sensor_f(): return PokeReturnValue(is_done=False, xcom_value="xcom_value") @@ -165,8 +171,9 @@ def dummy_f(): if ti.task_id == "dummy_f": assert ti.state == State.NONE - def test_basic_sensor_soft_fail_returns_bool(self, dag_maker): - @task.sensor(timeout=0, soft_fail=True) + @pytest.mark.skipif(not 
AIRFLOW_V_2_10_PLUS, reason="FailPolicy present from Airflow 2.10.0") + def test_basic_sensor_skip_on_timeout_returns_bool(self, dag_maker): + @task.sensor(timeout=0, fail_policy=FailPolicy.SKIP_ON_TIMEOUT) def sensor_f(): return False diff --git a/tests/providers/amazon/aws/sensors/test_batch.py b/tests/providers/amazon/aws/sensors/test_batch.py index 267aeb998f871..11fe3bfd3cd45 100644 --- a/tests/providers/amazon/aws/sensors/test_batch.py +++ b/tests/providers/amazon/aws/sensors/test_batch.py @@ -28,6 +28,7 @@ BatchSensor, ) from airflow.providers.amazon.aws.triggers.batch import BatchJobTrigger +from tests.test_utils.compat import AIRFLOW_V_2_10_PLUS TASK_ID = "batch_job_sensor" JOB_ID = "8222a1c2-b246-4e19-b1b8-0039bb4407c0" @@ -100,14 +101,25 @@ def test_execute_failure_in_deferrable_mode(self, deferrable_batch_sensor: Batch with pytest.raises(AirflowException): deferrable_batch_sensor.execute_complete(context={}, event={"status": "failure"}) - def test_execute_failure_in_deferrable_mode_with_soft_fail(self, deferrable_batch_sensor: BatchSensor): - """Tests that an AirflowSkipException is raised in case of error event and soft_fail is set to True""" - deferrable_batch_sensor.soft_fail = True + def test_execute_failure_in_deferrable_mode_with_fail_policy(self): + """Tests that an AirflowSkipException is raised in case of error event and fail_policy is set to SKIP_ON_TIMEOUT""" + + args = {} + if AIRFLOW_V_2_10_PLUS: + from airflow.sensors.base import FailPolicy + + args["fail_policy"] = FailPolicy.SKIP_ON_TIMEOUT + else: + args["soft_fail"] = True + deferrable_batch_sensor = BatchSensor( + task_id="task", job_id=JOB_ID, region_name=AWS_REGION, deferrable=True, **args + ) + with pytest.raises(AirflowSkipException): deferrable_batch_sensor.execute_complete(context={}, event={"status": "failure"}) @pytest.mark.parametrize( - "soft_fail, expected_exception", ((False, AirflowException), (True, AirflowSkipException)) + "catch_mode, expected_exception", ((False, AirflowException), (True, AirflowSkipException)) ) @pytest.mark.parametrize( "state, error_message", @@ -123,14 +135,24 @@ def test_execute_failure_in_deferrable_mode_with_soft_fail(self, deferrable_batc def test_fail_poke( self, mock_get_job_description, - batch_sensor: BatchSensor, state, error_message, - soft_fail, + catch_mode, expected_exception, ): + args = {} + if AIRFLOW_V_2_10_PLUS: + from airflow.sensors.base import FailPolicy + + if catch_mode: + args["fail_policy"] = FailPolicy.SKIP_ON_TIMEOUT + else: + args["fail_policy"] = FailPolicy.NONE + else: + args["soft_fail"] = catch_mode + mock_get_job_description.return_value = {"status": state} - batch_sensor.soft_fail = soft_fail + batch_sensor = BatchSensor(task_id="batch_job_sensor", job_id=JOB_ID, **args) with pytest.raises(expected_exception, match=error_message): batch_sensor.poke({}) @@ -203,7 +225,7 @@ def test_poke_invalid( assert "AWS Batch compute environment failed" in str(ctx.value) @pytest.mark.parametrize( - "soft_fail, expected_exception", ((False, AirflowException), (True, AirflowSkipException)) + "catch_mode, expected_exception", ((False, AirflowException), (True, AirflowSkipException)) ) @pytest.mark.parametrize( "compute_env, error_message", @@ -219,14 +241,29 @@ def test_poke_invalid( def test_fail_poke( self, mock_batch_client, - batch_compute_environment_sensor: BatchComputeEnvironmentSensor, compute_env, error_message, - soft_fail, + catch_mode, expected_exception, ): + args = {} + if AIRFLOW_V_2_10_PLUS: + from airflow.sensors.base import FailPolicy + + if 
catch_mode: + args["fail_policy"] = FailPolicy.SKIP_ON_TIMEOUT + else: + args["fail_policy"] = FailPolicy.NONE + else: + args["soft_fail"] = catch_mode + mock_batch_client.describe_compute_environments.return_value = {"computeEnvironments": compute_env} - batch_compute_environment_sensor.soft_fail = soft_fail + batch_compute_environment_sensor = BatchComputeEnvironmentSensor( + task_id="test_batch_compute_environment_sensor", + compute_environment=ENVIRONMENT_NAME, + **args, + ) + with pytest.raises(expected_exception, match=error_message): batch_compute_environment_sensor.poke({}) @@ -300,21 +337,34 @@ def test_poke_invalid(self, mock_batch_client, batch_job_queue_sensor: BatchJobQ assert "AWS Batch job queue failed" in str(ctx.value) @pytest.mark.parametrize( - "soft_fail, expected_exception", ((False, AirflowException), (True, AirflowSkipException)) + "catch_mode, expected_exception", ((False, AirflowException), (True, AirflowSkipException)) ) @pytest.mark.parametrize("job_queue", ([], [{"status": "UNKNOWN_STATUS"}])) @mock.patch.object(BatchClientHook, "client") def test_fail_poke( self, mock_batch_client, - batch_job_queue_sensor: BatchJobQueueSensor, job_queue, - soft_fail, + catch_mode, expected_exception, ): + args = {} + if AIRFLOW_V_2_10_PLUS: + from airflow.sensors.base import FailPolicy + + if catch_mode: + args["fail_policy"] = FailPolicy.SKIP_ON_TIMEOUT + else: + args["fail_policy"] = FailPolicy.NONE + else: + args["soft_fail"] = catch_mode + mock_batch_client.describe_job_queues.return_value = {"jobQueues": job_queue} + batch_job_queue_sensor = BatchJobQueueSensor( + task_id="test_batch_job_queue_sensor", job_queue=JOB_QUEUE, **args + ) batch_job_queue_sensor.treat_non_existing_as_deleted = False - batch_job_queue_sensor.soft_fail = soft_fail + message = "AWS Batch job queue" with pytest.raises(expected_exception, match=message): batch_job_queue_sensor.poke({}) diff --git a/tests/providers/amazon/aws/sensors/test_emr_serverless_application.py b/tests/providers/amazon/aws/sensors/test_emr_serverless_application.py index c35d84e7fa5af..f265eabf782e1 100644 --- a/tests/providers/amazon/aws/sensors/test_emr_serverless_application.py +++ b/tests/providers/amazon/aws/sensors/test_emr_serverless_application.py @@ -23,17 +23,26 @@ from airflow.exceptions import AirflowException, AirflowSkipException from airflow.providers.amazon.aws.sensors.emr import EmrServerlessApplicationSensor +from tests.test_utils.compat import AIRFLOW_V_2_10_PLUS class TestEmrServerlessApplicationSensor: - def setup_method(self): + def setup_method(self, args, optional_arg=None): self.app_id = "vzwemreks" self.job_run_id = "job1234" - self.sensor = EmrServerlessApplicationSensor( - task_id="test_emrcontainer_sensor", - application_id=self.app_id, - aws_conn_id="aws_default", - ) + if optional_arg: + self.sensor = EmrServerlessApplicationSensor( + task_id="test_emrcontainer_sensor", + application_id=self.app_id, + aws_conn_id="aws_default", + **optional_arg, + ) + else: + self.sensor = EmrServerlessApplicationSensor( + task_id="test_emrcontainer_sensor", + application_id=self.app_id, + aws_conn_id="aws_default", + ) def set_get_application_return_value(self, return_value: dict[str, str]): self.mock_hook = MagicMock() @@ -78,8 +87,18 @@ def test_poke_raises_airflow_exception_with_failure_states(self, state): class TestPokeRaisesAirflowSkipException(TestEmrServerlessApplicationSensor): + def setup_method(self, args, optional_arg=None): + optional_arg = {} + if AIRFLOW_V_2_10_PLUS: + from airflow.sensors.base 
import FailPolicy + + optional_arg["fail_policy"] = FailPolicy.SKIP_ON_TIMEOUT + else: + optional_arg["soft_fail"] = True + + super().setup_method(args, optional_arg) + def test_when_state_is_failed_and_soft_fail_is_true_poke_should_raise_skip_exception(self): - self.sensor.soft_fail = True self.set_get_application_return_value( {"application": {"state": "STOPPED", "stateDetails": "mock stopped"}} ) @@ -87,4 +106,3 @@ def test_when_state_is_failed_and_soft_fail_is_true_poke_should_raise_skip_excep self.sensor.poke(None) assert "EMR Serverless application failed: mock stopped" == str(ctx.value) self.assert_get_application_was_called_once_with_app_id() - self.sensor.soft_fail = False diff --git a/tests/providers/amazon/aws/sensors/test_emr_serverless_job.py b/tests/providers/amazon/aws/sensors/test_emr_serverless_job.py index 299efe3fd277e..19dc06141cce1 100644 --- a/tests/providers/amazon/aws/sensors/test_emr_serverless_job.py +++ b/tests/providers/amazon/aws/sensors/test_emr_serverless_job.py @@ -23,18 +23,28 @@ from airflow.exceptions import AirflowException, AirflowSkipException from airflow.providers.amazon.aws.sensors.emr import EmrServerlessJobSensor +from tests.test_utils.compat import AIRFLOW_V_2_10_PLUS class TestEmrServerlessJobSensor: - def setup_method(self): + def setup_method(self, args, optional_arg=None): self.app_id = "vzwemreks" self.job_run_id = "job1234" - self.sensor = EmrServerlessJobSensor( - task_id="test_emrcontainer_sensor", - application_id=self.app_id, - job_run_id=self.job_run_id, - aws_conn_id="aws_default", - ) + if optional_arg: + self.sensor = EmrServerlessJobSensor( + task_id="test_emrcontainer_sensor", + application_id=self.app_id, + job_run_id=self.job_run_id, + aws_conn_id="aws_default", + **optional_arg, + ) + else: + self.sensor = EmrServerlessJobSensor( + task_id="test_emrcontainer_sensor", + application_id=self.app_id, + job_run_id=self.job_run_id, + aws_conn_id="aws_default", + ) def set_get_job_run_return_value(self, return_value: dict[str, str]): self.mock_hook = MagicMock() @@ -81,11 +91,20 @@ def test_poke_raises_airflow_exception_with_specified_states(self, state): class TestPokeRaisesAirflowSkipException(TestEmrServerlessJobSensor): + def setup_method(self, args, optional_arg=None): + optional_arg = {} + if AIRFLOW_V_2_10_PLUS: + from airflow.sensors.base import FailPolicy + + optional_arg["fail_policy"] = FailPolicy.SKIP_ON_TIMEOUT + else: + optional_arg["soft_fail"] = True + + super().setup_method(args, optional_arg) + def test_when_state_is_failed_and_soft_fail_is_true_poke_should_raise_skip_exception(self): - self.sensor.soft_fail = True self.set_get_job_run_return_value({"jobRun": {"state": "FAILED", "stateDetails": "mock failed"}}) with pytest.raises(AirflowSkipException) as ctx: self.sensor.poke(None) assert "EMR Serverless job failed: mock failed" == str(ctx.value) self.assert_get_job_run_was_called_once_with_app_and_run_id() - self.sensor.soft_fail = False diff --git a/tests/providers/amazon/aws/sensors/test_s3.py b/tests/providers/amazon/aws/sensors/test_s3.py index fd70f7134a7e3..170be0d7517f1 100644 --- a/tests/providers/amazon/aws/sensors/test_s3.py +++ b/tests/providers/amazon/aws/sensors/test_s3.py @@ -24,7 +24,7 @@ import time_machine from moto import mock_aws -from airflow.exceptions import AirflowException, AirflowSkipException +from airflow.exceptions import AirflowException from airflow.models import DAG, DagRun, TaskInstance from airflow.models.variable import Variable from airflow.providers.amazon.aws.hooks.s3 import 
S3Hook @@ -274,7 +274,7 @@ def check_fn(files: list) -> bool: ) @pytest.mark.parametrize( - "soft_fail, expected_exception", ((False, AirflowException), (True, AirflowSkipException)) + "soft_fail, expected_exception", ((False, AirflowException), (True, AirflowException)) ) def test_fail_execute_complete(self, soft_fail, expected_exception): op = S3KeySensor( @@ -516,7 +516,7 @@ def test_poke_succeeds_on_upload_complete(self, mock_hook, time_machine): assert self.sensor.poke(dict()) @pytest.mark.parametrize( - "soft_fail, expected_exception", ((False, AirflowException), (True, AirflowSkipException)) + "soft_fail, expected_exception", ((False, AirflowException), (True, AirflowException)) ) def test_fail_is_keys_unchanged(self, soft_fail, expected_exception): op = S3KeysUnchangedSensor(task_id="sensor", bucket_name="test-bucket", prefix="test-prefix/path") @@ -529,7 +529,7 @@ def test_fail_is_keys_unchanged(self, soft_fail, expected_exception): op.is_keys_unchanged(current_objects=current_objects) @pytest.mark.parametrize( - "soft_fail, expected_exception", ((False, AirflowException), (True, AirflowSkipException)) + "soft_fail, expected_exception", ((False, AirflowException), (True, AirflowException)) ) def test_fail_execute_complete(self, soft_fail, expected_exception): op = S3KeysUnchangedSensor(task_id="sensor", bucket_name="test-bucket", prefix="test-prefix/path") diff --git a/tests/providers/ftp/sensors/test_ftp.py b/tests/providers/ftp/sensors/test_ftp.py index 107b2323717bc..21364405f2ad0 100644 --- a/tests/providers/ftp/sensors/test_ftp.py +++ b/tests/providers/ftp/sensors/test_ftp.py @@ -22,9 +22,10 @@ import pytest -from airflow.exceptions import AirflowSkipException +from airflow.exceptions import AirflowSensorTimeout, AirflowSkipException from airflow.providers.ftp.hooks.ftp import FTPHook from airflow.providers.ftp.sensors.ftp import FTPSensor +from tests.test_utils.compat import AIRFLOW_V_2_10_PLUS class TestFTPSensor: @@ -52,10 +53,10 @@ def test_poke_fails_due_error(self, mock_hook): "530: Login authentication failed" ) - with pytest.raises(error_perm) as ctx: + with pytest.raises(AirflowSensorTimeout) as ctx: op.execute(None) - assert "530" in str(ctx.value) + assert "530" in str(ctx.value.__cause__) @mock.patch("airflow.providers.ftp.sensors.ftp.FTPHook", spec=FTPHook) def test_poke_fail_on_transient_error(self, mock_hook): @@ -65,14 +66,27 @@ def test_poke_fail_on_transient_error(self, mock_hook): "434: Host unavailable" ) - with pytest.raises(error_perm) as ctx: + with pytest.raises(AirflowSensorTimeout) as ctx: op.execute(None) - assert "434" in str(ctx.value) + assert "434" in str(ctx.value.__cause__) @mock.patch("airflow.providers.ftp.sensors.ftp.FTPHook", spec=FTPHook) def test_poke_fail_on_transient_error_and_skip(self, mock_hook): - op = FTPSensor(path="foobar.json", ftp_conn_id="bob_ftp", task_id="test_task", soft_fail=True) + args = {} + if AIRFLOW_V_2_10_PLUS: + from airflow.sensors.base import FailPolicy + + args["fail_policy"] = FailPolicy.SKIP_ON_TIMEOUT + else: + args["soft_fail"] = True + + op = FTPSensor( + path="foobar.json", + ftp_conn_id="bob_ftp", + task_id="test_task", + **args, + ) mock_hook.return_value.__enter__.return_value.get_mod_time.side_effect = error_perm( "434: Host unavailable" diff --git a/tests/providers/http/sensors/test_http.py b/tests/providers/http/sensors/test_http.py index 4e95c844058fa..71e732a0d3ce3 100644 --- a/tests/providers/http/sensors/test_http.py +++ b/tests/providers/http/sensors/test_http.py @@ -23,12 +23,18 @@ import 
pytest import requests -from airflow.exceptions import AirflowException, AirflowSensorTimeout, AirflowSkipException, TaskDeferred +from airflow.exceptions import ( + AirflowException, + AirflowSensorTimeout, + AirflowSkipException, + TaskDeferred, +) from airflow.models.dag import DAG from airflow.providers.http.operators.http import HttpOperator from airflow.providers.http.sensors.http import HttpSensor from airflow.providers.http.triggers.http import HttpSensorTrigger from airflow.utils.timezone import datetime +from tests.test_utils.compat import AIRFLOW_V_2_10_PLUS pytestmark = pytest.mark.db_test @@ -66,16 +72,24 @@ def resp_check(_): task.execute(context={}) @patch("airflow.providers.http.hooks.http.requests.Session.send") - def test_poke_exception_with_soft_fail(self, mock_session_send, create_task_of_operator): + def test_poke_exception_with_skip_on_timeout(self, mock_session_send, create_task_of_operator): """ - Exception occurs in poke function should be skipped if soft_fail is True. + Exception occurs in poke function should be skipped if skip_on_timeout. """ response = requests.Response() response.status_code = 200 mock_session_send.return_value = response def resp_check(_): - raise AirflowException("AirflowException raised here!") + raise AirflowSensorTimeout("AirflowSensorTimeout raised here!") + + args = {} + if AIRFLOW_V_2_10_PLUS: + from airflow.sensors.base import FailPolicy + + args["fail_policy"] = FailPolicy.SKIP_ON_TIMEOUT + else: + args["soft_fail"] = True task = create_task_of_operator( HttpSensor, @@ -87,7 +101,7 @@ def resp_check(_): response_check=resp_check, timeout=5, poke_interval=1, - soft_fail=True, + **args, ) with pytest.raises(AirflowSkipException): task.execute(context={}) diff --git a/tests/providers/sftp/sensors/test_sftp.py b/tests/providers/sftp/sensors/test_sftp.py index 25add45e153fb..4c0e1d40611cb 100644 --- a/tests/providers/sftp/sensors/test_sftp.py +++ b/tests/providers/sftp/sensors/test_sftp.py @@ -25,9 +25,10 @@ from paramiko.sftp import SFTP_FAILURE, SFTP_NO_SUCH_FILE from pendulum import datetime as pendulum_datetime, timezone -from airflow.exceptions import AirflowSkipException +from airflow.exceptions import AirflowSensorTimeout from airflow.providers.sftp.sensors.sftp import SFTPSensor from airflow.sensors.base import PokeReturnValue +from tests.test_utils.compat import AIRFLOW_V_2_10_PLUS # Ignore missing args provided by default_args # mypy: disable-error-code="arg-type" @@ -53,14 +54,22 @@ def test_file_absent(self, sftp_hook_mock): assert not output @pytest.mark.parametrize( - "soft_fail, expected_exception", ((False, OSError), (True, AirflowSkipException)) + "catch_mode, expected_exception", ((False, AirflowSensorTimeout), (True, AirflowSensorTimeout)) ) @patch("airflow.providers.sftp.sensors.sftp.SFTPHook") - def test_sftp_failure(self, sftp_hook_mock, soft_fail: bool, expected_exception): + def test_sftp_failure(self, sftp_hook_mock, catch_mode, expected_exception): sftp_hook_mock.return_value.get_mod_time.side_effect = OSError(SFTP_FAILURE, "SFTP failure") - sftp_sensor = SFTPSensor( - task_id="unit_test", path="/path/to/file/1970-01-01.txt", soft_fail=soft_fail - ) + args = {} + if AIRFLOW_V_2_10_PLUS: + from airflow.sensors.base import FailPolicy + + if catch_mode: + args["fail_policy"] = FailPolicy.SKIP_ON_TIMEOUT + else: + args["fail_policy"] = FailPolicy.NONE + else: + args["soft_fail"] = catch_mode + sftp_sensor = SFTPSensor(task_id="unit_test", path="/path/to/file/1970-01-01.txt", **args) context = {"ds": "1970-01-01"} 
with pytest.raises(expected_exception): sftp_sensor.poke(context) diff --git a/tests/sensors/test_base.py b/tests/sensors/test_base.py index 79b88eb40dbe6..7ca2b6cb7c01e 100644 --- a/tests/sensors/test_base.py +++ b/tests/sensors/test_base.py @@ -27,6 +27,7 @@ from airflow.exceptions import ( AirflowException, AirflowFailException, + AirflowPokeFailException, AirflowRescheduleException, AirflowSensorTimeout, AirflowSkipException, @@ -51,7 +52,7 @@ from airflow.providers.celery.executors.celery_kubernetes_executor import CeleryKubernetesExecutor from airflow.providers.cncf.kubernetes.executors.kubernetes_executor import KubernetesExecutor from airflow.providers.cncf.kubernetes.executors.local_kubernetes_executor import LocalKubernetesExecutor -from airflow.sensors.base import BaseSensorOperator, PokeReturnValue, poke_mode_only +from airflow.sensors.base import BaseSensorOperator, FailPolicy, PokeReturnValue, poke_mode_only from airflow.ti_deps.deps.ready_to_reschedule import ReadyToRescheduleDep from airflow.utils import timezone from airflow.utils.session import create_session @@ -95,7 +96,7 @@ def __init__(self, return_value=False, **kwargs): self.return_value = return_value def execute_complete(self, context, event=None): - raise AirflowException("Should be skipped") + raise AirflowException() class DummySensorWithXcomValue(BaseSensorOperator): @@ -178,8 +179,8 @@ def test_fail(self, make_sensor): if ti.task_id == DUMMY_OP: assert ti.state == State.NONE - def test_soft_fail(self, make_sensor): - sensor, dr = make_sensor(False, soft_fail=True) + def test_skip_on_timeout(self, make_sensor): + sensor, dr = make_sensor(False, fail_policy=FailPolicy.SKIP_ON_TIMEOUT) self._run(sensor) tis = dr.get_task_instances() @@ -194,8 +195,8 @@ def test_soft_fail(self, make_sensor): "exception_cls", (ValueError,), ) - def test_soft_fail_with_exception(self, make_sensor, exception_cls): - sensor, dr = make_sensor(False, soft_fail=True) + def test_skip_on_timeout_with_exception(self, make_sensor, exception_cls): + sensor, dr = make_sensor(False, fail_policy=FailPolicy.SKIP_ON_TIMEOUT) sensor.poke = Mock(side_effect=[exception_cls(None)]) with pytest.raises(ValueError): self._run(sensor) @@ -213,11 +214,11 @@ def test_soft_fail_with_exception(self, make_sensor, exception_cls): ( AirflowSensorTimeout, AirflowTaskTimeout, - AirflowFailException, + AirflowPokeFailException, ), ) - def test_soft_fail_with_skip_exception(self, make_sensor, exception_cls): - sensor, dr = make_sensor(False, soft_fail=True) + def test_skip_on_timeout_with_skip_exception(self, make_sensor, exception_cls): + sensor, dr = make_sensor(False, fail_policy=FailPolicy.SKIP_ON_TIMEOUT) sensor.poke = Mock(side_effect=[exception_cls(None)]) self._run(sensor) @@ -231,10 +232,10 @@ def test_soft_fail_with_skip_exception(self, make_sensor, exception_cls): @pytest.mark.parametrize( "exception_cls", - (AirflowSensorTimeout, AirflowTaskTimeout, AirflowFailException, Exception), + (AirflowSensorTimeout, AirflowTaskTimeout, AirflowFailException, AirflowPokeFailException, Exception), ) - def test_never_fail_with_skip_exception(self, make_sensor, exception_cls): - sensor, dr = make_sensor(False, never_fail=True) + def test_skip_on_any_error_with_skip_exception(self, make_sensor, exception_cls): + sensor, dr = make_sensor(False, fail_policy=FailPolicy.SKIP_ON_ANY_ERROR) sensor.poke = Mock(side_effect=[exception_cls(None)]) self._run(sensor) @@ -246,9 +247,12 @@ def test_never_fail_with_skip_exception(self, make_sensor, exception_cls): if ti.task_id == 
DUMMY_OP:
                 assert ti.state == State.NONE

-    def test_soft_fail_with_retries(self, make_sensor):
+    def test_skip_on_timeout_with_retries(self, make_sensor):
         sensor, dr = make_sensor(
-            return_value=False, soft_fail=True, retries=1, retry_delay=timedelta(milliseconds=1)
+            return_value=False,
+            fail_policy=FailPolicy.SKIP_ON_TIMEOUT,
+            retries=1,
+            retry_delay=timedelta(milliseconds=1),
         )

         # first run times out and task instance is skipped
@@ -356,9 +360,13 @@ def _get_tis():
         assert sensor_ti.state == State.FAILED
         assert dummy_ti.state == State.NONE

-    def test_soft_fail_with_reschedule(self, make_sensor, time_machine, session):
+    def test_skip_on_timeout_with_reschedule(self, make_sensor, time_machine, session):
         sensor, dr = make_sensor(
-            return_value=False, poke_interval=10, timeout=5, soft_fail=True, mode="reschedule"
+            return_value=False,
+            poke_interval=10,
+            timeout=5,
+            fail_policy=FailPolicy.SKIP_ON_TIMEOUT,
+            mode="reschedule",
         )

         def _get_tis():
@@ -910,7 +918,7 @@ def test_reschedule_and_retry_timeout_and_silent_fail(self, make_sensor, time_ma
             retries=2,
             retry_delay=timedelta(seconds=3),
             mode="reschedule",
-            silent_fail=True,
+            fail_policy=FailPolicy.IGNORE_ERROR,
         )

         def _get_sensor_ti():
@@ -1110,14 +1118,15 @@ def test_poke_mode_only_bad_poke(self):

 class TestAsyncSensor:
     @pytest.mark.parametrize(
-        "soft_fail, expected_exception",
+        "fail_policy, expected_exception",
         [
-            (True, AirflowSkipException),
-            (False, AirflowException),
+            (FailPolicy.SKIP_ON_TIMEOUT, AirflowException),
+            (FailPolicy.SKIP_ON_ANY_ERROR, AirflowSkipException),
+            (FailPolicy.NONE, AirflowException),
         ],
     )
-    def test_fail_after_resuming_deferred_sensor(self, soft_fail, expected_exception):
-        async_sensor = DummyAsyncSensor(task_id="dummy_async_sensor", soft_fail=soft_fail)
+    def test_fail_after_resuming_deferred_sensor(self, fail_policy, expected_exception):
+        async_sensor = DummyAsyncSensor(task_id="dummy_async_sensor", fail_policy=fail_policy)
         ti = TaskInstance(task=async_sensor)
         ti.next_method = "execute_complete"
         with pytest.raises(expected_exception):
diff --git a/tests/sensors/test_external_task_sensor.py b/tests/sensors/test_external_task_sensor.py
index 58e25a3de0d04..ba92a3842d742 100644
--- a/tests/sensors/test_external_task_sensor.py
+++ b/tests/sensors/test_external_task_sensor.py
@@ -30,7 +30,13 @@
 from airflow import exceptions, settings
 from airflow.decorators import task as task_deco
-from airflow.exceptions import AirflowException, AirflowSensorTimeout, AirflowSkipException, TaskDeferred
+from airflow.exceptions import (
+    AirflowException,
+    AirflowPokeFailException,
+    AirflowSensorTimeout,
+    AirflowSkipException,
+    TaskDeferred,
+)
 from airflow.models import DagBag, DagRun, TaskInstance
 from airflow.models.dag import DAG
 from airflow.models.serialized_dag import SerializedDagModel
@@ -38,6 +44,11 @@
 from airflow.operators.bash import BashOperator
 from airflow.operators.empty import EmptyOperator
 from airflow.operators.python import PythonOperator
+from tests.test_utils.compat import AIRFLOW_V_2_10_PLUS, ignore_provider_compatibility_error
+
+with ignore_provider_compatibility_error("2.10.0", __file__):
+    from airflow.sensors.base import FailPolicy
+
 from airflow.sensors.external_task import (
     ExternalTaskMarker,
     ExternalTaskSensor,
@@ -246,7 +257,7 @@ def test_external_task_group_not_exists_without_check_existence(self):
             dag=self.dag,
             poke_interval=0.1,
         )
-        with pytest.raises(AirflowException, match="Sensor has timed out"):
+        with pytest.raises(AirflowSensorTimeout, match="Sensor has timed out"):
             op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)

     def test_external_task_group_sensor_success(self):
@@ -273,13 +284,13 @@ def test_external_task_group_sensor_failed_states(self):
             dag=self.dag,
         )
         with pytest.raises(
-            AirflowException,
+            AirflowPokeFailException,
             match=f"The external task_group '{TEST_TASK_GROUP_ID}' in DAG '{TEST_DAG_ID}' failed.",
         ):
             op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)

     def test_catch_overlap_allowed_failed_state(self):
-        with pytest.raises(AirflowException):
+        with pytest.raises(ValueError):
             ExternalTaskSensor(
                 task_id="test_external_task_sensor_check",
                 external_dag_id=TEST_DAG_ID,
@@ -323,13 +334,14 @@ def test_external_task_sensor_failed_states_as_success(self, caplog):
         error_message = rf"Some of the external tasks \['{TEST_TASK_ID}'\] in DAG {TEST_DAG_ID} failed\."
         with caplog.at_level(logging.INFO, logger=op.log.name):
             caplog.clear()
-            with pytest.raises(AirflowException, match=error_message):
+            with pytest.raises(AirflowPokeFailException, match=error_message):
                 op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
         assert (
             f"Poking for tasks ['{TEST_TASK_ID}'] in dag {TEST_DAG_ID} on {DEFAULT_DATE.isoformat()} ... "
         ) in caplog.messages

-    def test_external_task_sensor_soft_fail_failed_states_as_skipped(self):
+    @pytest.mark.skipif(not AIRFLOW_V_2_10_PLUS, reason="FailPolicy present from Airflow 2.10.0")
+    def test_external_task_sensor_skip_on_timeout_failed_states_as_skipped(self):
         self.add_time_sensor()
         op = ExternalTaskSensor(
             task_id="test_external_task_sensor_check",
@@ -337,7 +349,7 @@ def test_external_task_sensor_soft_fail_failed_states_as_skipped(self):
             external_task_id=TEST_TASK_ID,
             allowed_states=[State.FAILED],
             failed_states=[State.SUCCESS],
-            soft_fail=True,
+            fail_policy=FailPolicy.SKIP_ON_TIMEOUT,
             dag=self.dag,
         )

@@ -426,7 +438,7 @@ def test_external_task_sensor_failed_states_as_success_mulitple_task_ids(self, c
         )
         with caplog.at_level(logging.INFO, logger=op.log.name):
             caplog.clear()
-            with pytest.raises(AirflowException, match=error_message):
+            with pytest.raises(AirflowPokeFailException, match=error_message):
                 op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
         assert (
             f"Poking for tasks ['{TEST_TASK_ID}', '{TEST_TASK_ID_ALTERNATE}'] "
@@ -467,7 +479,8 @@ def test_external_dag_sensor_log(self, caplog):
             op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
         assert (f"Poking for DAG 'other_dag' on {DEFAULT_DATE.isoformat()} ... ") in caplog.messages

-    def test_external_dag_sensor_soft_fail_as_skipped(self):
+    @pytest.mark.skipif(not AIRFLOW_V_2_10_PLUS, reason="FailPolicy present from Airflow 2.10.0")
+    def test_external_dag_sensor_skip_on_timeout_as_skipped(self):
         other_dag = DAG("other_dag", default_args=self.args, end_date=DEFAULT_DATE, schedule="@once")
         other_dag.create_dagrun(
             run_id="test",
@@ -482,7 +495,7 @@ def test_external_dag_sensor_soft_fail_as_skipped(self):
             external_task_id=None,
             allowed_states=[State.FAILED],
             failed_states=[State.SUCCESS],
-            soft_fail=True,
+            fail_policy=FailPolicy.SKIP_ON_TIMEOUT,
             dag=self.dag,
         )

@@ -589,12 +602,12 @@ def test_external_task_sensor_fn_multiple_execution_dates(self):
             dag=dag,
         )

-        # We need to test for an AirflowException explicitly since
+        # We need to test for an AirflowPokeFailException explicitly since
         # AirflowSensorTimeout is a subclass that will be raised if this does
         # not execute properly.
-        with pytest.raises(AirflowException) as ex_ctx:
+        with pytest.raises(AirflowPokeFailException) as ex_ctx:
             task_chain_with_failure.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
-        assert type(ex_ctx.value) is AirflowException
+        assert type(ex_ctx.value) is AirflowPokeFailException

     def test_external_task_sensor_delta(self):
         self.add_time_sensor()
@@ -829,7 +842,7 @@ def test_external_task_group_with_mapped_tasks_failed_states(self):
             dag=self.dag,
         )
         with pytest.raises(
-            AirflowException,
+            AirflowPokeFailException,
             match=f"The external task_group '{TEST_TASK_GROUP_ID}' in DAG '{TEST_DAG_ID}' failed.",
         ):
             op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
@@ -854,6 +867,7 @@ def test_external_task_group_when_there_is_no_TIs(self):
             ignore_ti_state=True,
         )

+    @pytest.mark.skipif(not AIRFLOW_V_2_10_PLUS, reason="FailPolicy present from Airflow 2.10.0")
     @pytest.mark.parametrize(
         "kwargs, expected_message",
         (
@@ -880,14 +894,14 @@ def test_external_task_group_when_there_is_no_TIs(self):
         ),
     )
     @pytest.mark.parametrize(
-        "soft_fail, expected_exception",
+        "fail_policy, expected_exception",
         (
             (
-                False,
-                AirflowException,
+                FailPolicy.NONE,
+                AirflowPokeFailException,
             ),
             (
-                True,
+                FailPolicy.SKIP_ON_TIMEOUT,
                 AirflowSkipException,
             ),
         ),
     )
@@ -895,7 +909,7 @@ def test_external_task_group_when_there_is_no_TIs(self):
     @mock.patch("airflow.sensors.external_task.ExternalTaskSensor.get_count")
     @mock.patch("airflow.sensors.external_task.ExternalTaskSensor._get_dttm_filter")
     def test_fail_poke(
-        self, _get_dttm_filter, get_count, soft_fail, expected_exception, kwargs, expected_message
+        self, _get_dttm_filter, get_count, fail_policy, expected_exception, kwargs, expected_message
     ):
         _get_dttm_filter.return_value = []
         get_count.return_value = 1
@@ -904,13 +918,16 @@ def test_fail_poke(
             external_dag_id=TEST_DAG_ID,
             allowed_states=["success"],
             dag=self.dag,
-            soft_fail=soft_fail,
+            fail_policy=fail_policy,
             deferrable=False,
             **kwargs,
         )
+        if fail_policy == FailPolicy.SKIP_ON_TIMEOUT:
+            expected_message = "Skipping due fail_policy set to SKIP_ON_TIMEOUT."
         with pytest.raises(expected_exception, match=expected_message):
             op.execute(context={})

+    @pytest.mark.skipif(not AIRFLOW_V_2_10_PLUS, reason="FailPolicy present from Airflow 2.10.0")
     @pytest.mark.parametrize(
         "response_get_current, response_exists, kwargs, expected_message",
         (
@@ -937,15 +954,15 @@ def test_fail_poke(
         ),
     )
     @pytest.mark.parametrize(
-        "soft_fail, expected_exception",
+        "fail_policy, expected_exception",
         (
             (
-                False,
-                AirflowException,
+                FailPolicy.NONE,
+                AirflowPokeFailException,
             ),
             (
-                True,
-                AirflowException,
+                FailPolicy.SKIP_ON_TIMEOUT,
+                AirflowSkipException,
             ),
         ),
     )
@@ -959,7 +976,7 @@ def test_fail__check_for_existence(
         exists,
         get_dag,
         _get_dttm_filter,
-        soft_fail,
+        fail_policy,
         expected_exception,
         response_get_current,
         response_exists,
@@ -978,10 +995,12 @@ def test_fail__check_for_existence(
             external_dag_id=TEST_DAG_ID,
             allowed_states=["success"],
             dag=self.dag,
-            soft_fail=soft_fail,
+            fail_policy=fail_policy,
             check_existence=True,
             **kwargs,
         )
+        if fail_policy == FailPolicy.SKIP_ON_TIMEOUT:
+            expected_message = "Skipping due fail_policy set to SKIP_ON_TIMEOUT."
         with pytest.raises(expected_exception, match=expected_message):
             op.execute(context={})
@@ -1010,7 +1029,7 @@ def test_defer_and_fire_task_state_trigger(self):
         assert isinstance(exc.value.trigger, WorkflowTrigger), "Trigger is not a WorkflowTrigger"

     def test_defer_and_fire_failed_state_trigger(self):
-        """Tests that an AirflowException is raised in case of error event"""
+        """Tests that an AirflowPokeFailException is raised in case of error event"""
         sensor = ExternalTaskSensor(
             task_id=TASK_ID,
             external_task_id=EXTERNAL_TASK_ID,
@@ -1018,13 +1037,13 @@ def test_defer_and_fire_failed_state_trigger(self):
             deferrable=True,
         )

-        with pytest.raises(AirflowException):
+        with pytest.raises(AirflowPokeFailException):
             sensor.execute_complete(
                 context=mock.MagicMock(), event={"status": "error", "message": "test failure message"}
             )

     def test_defer_and_fire_timeout_state_trigger(self):
-        """Tests that an AirflowException is raised in case of timeout event"""
+        """Tests that an AirflowPokeFailException is raised in case of timeout event"""
         sensor = ExternalTaskSensor(
             task_id=TASK_ID,
             external_task_id=EXTERNAL_TASK_ID,
@@ -1032,7 +1051,7 @@ def test_defer_and_fire_timeout_state_trigger(self):
             deferrable=True,
         )

-        with pytest.raises(AirflowException):
+        with pytest.raises(AirflowPokeFailException):
             sensor.execute_complete(
                 context=mock.MagicMock(),
                 event={"status": "timeout", "message": "Dag was not started within 1 minute, assuming fail."},