Skip to content

Commit 75d56f1

Browse files
authored
Refactor Branch Operators to use BaseBranchOperator (#48979)
Instead of using `BranchMixIn`, we should use `BaseBranchOperator` -- otherwise why have that Base class!
1 parent 21b8621 commit 75d56f1

File tree

1 file changed

+11
-15
lines changed
  • providers/standard/src/airflow/providers/standard/operators

1 file changed

+11
-15
lines changed

providers/standard/src/airflow/providers/standard/operators/python.py

Lines changed: 11 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -52,11 +52,11 @@
5252
from airflow.utils.process_utils import execute_in_subprocess
5353

5454
if AIRFLOW_V_3_0_PLUS:
55-
from airflow.providers.standard.operators.branch import BranchMixIn
55+
from airflow.providers.standard.operators.branch import BaseBranchOperator
5656
from airflow.providers.standard.utils.skipmixin import SkipMixin
5757
else:
5858
from airflow.models.skipmixin import SkipMixin
59-
from airflow.operators.branch import BranchMixIn # type: ignore[no-redef]
59+
from airflow.operators.branch import BaseBranchOperator # type: ignore[no-redef]
6060

6161

6262
log = logging.getLogger(__name__)
@@ -233,7 +233,7 @@ def execute_callable(self) -> Any:
233233
return runner.run(*self.op_args, **self.op_kwargs)
234234

235235

236-
class BranchPythonOperator(PythonOperator, BranchMixIn):
236+
class BranchPythonOperator(BaseBranchOperator, PythonOperator):
237237
"""
238238
A workflow can "branch" or follow a path after the execution of this task.
239239
@@ -247,10 +247,8 @@ class BranchPythonOperator(PythonOperator, BranchMixIn):
247247
the DAG run's state to be inferred.
248248
"""
249249

250-
inherits_from_skipmixin = True
251-
252-
def execute(self, context: Context) -> Any:
253-
return self.do_branch(context, super().execute(context))
250+
def choose_branch(self, context: Context) -> str | Iterable[str]:
251+
return PythonOperator.execute(self, context)
254252

255253

256254
class ShortCircuitOperator(PythonOperator, SkipMixin):
@@ -855,7 +853,7 @@ def _iter_serializable_context_keys(self):
855853
yield from self.PENDULUM_SERIALIZABLE_CONTEXT_KEYS
856854

857855

858-
class BranchPythonVirtualenvOperator(PythonVirtualenvOperator, BranchMixIn):
856+
class BranchPythonVirtualenvOperator(BaseBranchOperator, PythonVirtualenvOperator):
859857
"""
860858
A workflow can "branch" or follow a path after the execution of this task in a virtual environment.
861859
@@ -873,10 +871,8 @@ class BranchPythonVirtualenvOperator(PythonVirtualenvOperator, BranchMixIn):
873871
:ref:`howto/operator:BranchPythonVirtualenvOperator`
874872
"""
875873

876-
inherits_from_skipmixin = True
877-
878-
def execute(self, context: Context) -> Any:
879-
return self.do_branch(context, super().execute(context))
874+
def choose_branch(self, context: Context) -> str | Iterable[str]:
875+
return PythonVirtualenvOperator.execute(self, context)
880876

881877

882878
class ExternalPythonOperator(_BasePythonVirtualenvOperator):
@@ -1072,7 +1068,7 @@ def _get_airflow_version_from_target_env(self) -> str | None:
10721068
return None
10731069

10741070

1075-
class BranchExternalPythonOperator(ExternalPythonOperator, BranchMixIn):
1071+
class BranchExternalPythonOperator(BaseBranchOperator, ExternalPythonOperator):
10761072
"""
10771073
A workflow can "branch" or follow a path after the execution of this task.
10781074
@@ -1085,8 +1081,8 @@ class BranchExternalPythonOperator(ExternalPythonOperator, BranchMixIn):
10851081
:ref:`howto/operator:BranchExternalPythonOperator`
10861082
"""
10871083

1088-
def execute(self, context: Context) -> Any:
1089-
return self.do_branch(context, super().execute(context))
1084+
def choose_branch(self, context: Context) -> str | Iterable[str]:
1085+
return ExternalPythonOperator.execute(self, context)
10901086

10911087

10921088
def get_current_context() -> Mapping[str, Any]:

0 commit comments

Comments
 (0)