Skip to content

Commit 73b9a77

Browse files
Added detection of terminated pod during get_or_create_pod (#51158)
Co-authored-by: AutomationDev85 <AutomationDev85>
1 parent 8865dbd commit 73b9a77

File tree

2 files changed

+69
-4
lines changed
  • providers/cncf/kubernetes
    • src/airflow/providers/cncf/kubernetes/operators
    • tests/unit/cncf/kubernetes/operators

2 files changed

+69
-4
lines changed

providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -576,7 +576,26 @@ def get_or_create_pod(self, pod_request_obj: k8s.V1Pod, context: Context) -> k8s
576576
if self.reattach_on_restart:
577577
pod = self.find_pod(pod_request_obj.metadata.namespace, context=context)
578578
if pod:
579-
return pod
579+
# If pod is terminated then delete the pod an create a new as not possible to get xcom
580+
pod_phase = (
581+
pod.status.phase if hasattr(pod, "status") and hasattr(pod.status, "phase") else None
582+
)
583+
if pod_phase and pod_phase not in (PodPhase.SUCCEEDED, PodPhase.FAILED):
584+
return pod
585+
586+
self.log.info(
587+
"Found terminated old matching pod %s with labels %s",
588+
pod.metadata.name,
589+
pod.metadata.labels,
590+
)
591+
592+
# if not required to delete the pod then keep old logic and not automatically create new pod
593+
deleted_pod = self.process_pod_deletion(pod)
594+
if not deleted_pod:
595+
return pod
596+
597+
self.log.info("Deleted pod to handle rerun and create new pod!")
598+
580599
self.log.debug("Starting pod:\n%s", yaml.safe_dump(pod_request_obj.to_dict()))
581600
self.pod_manager.create_pod(pod=pod_request_obj)
582601
return pod_request_obj
@@ -1067,7 +1086,7 @@ def kill_istio_sidecar(self, pod: V1Pod) -> None:
10671086
if self.KILL_ISTIO_PROXY_SUCCESS_MSG not in output_str:
10681087
raise AirflowException("Error while deleting istio-proxy sidecar: %s", output_str)
10691088

1070-
def process_pod_deletion(self, pod: k8s.V1Pod, *, reraise=True):
1089+
def process_pod_deletion(self, pod: k8s.V1Pod, *, reraise=True) -> bool:
10711090
with _optionally_suppress(reraise=reraise):
10721091
if pod is not None:
10731092
should_delete_pod = (self.on_finish_action == OnFinishAction.DELETE_POD) or (
@@ -1080,8 +1099,10 @@ def process_pod_deletion(self, pod: k8s.V1Pod, *, reraise=True):
10801099
if should_delete_pod:
10811100
self.log.info("Deleting pod: %s", pod.metadata.name)
10821101
self.pod_manager.delete_pod(pod)
1083-
else:
1084-
self.log.info("Skipping deleting pod: %s", pod.metadata.name)
1102+
return True
1103+
self.log.info("Skipping deleting pod: %s", pod.metadata.name)
1104+
1105+
return False
10851106

10861107
def _build_find_pod_label_selector(self, context: Context | None = None, *, exclude_checked=True) -> str:
10871108
labels = {

providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -648,6 +648,50 @@ def test_omitted_namespace_no_conn_not_in_k8s(self, mock_find, mock_path):
648648
)
649649
mock_find.assert_called_once_with("default", context=context)
650650

651+
@pytest.mark.parametrize(
652+
"pod_phase",
653+
[
654+
PodPhase.SUCCEEDED,
655+
PodPhase.FAILED,
656+
PodPhase.RUNNING,
657+
],
658+
)
659+
@patch(f"{KPO_MODULE}.PodManager.create_pod")
660+
@patch(f"{KPO_MODULE}.KubernetesPodOperator.process_pod_deletion")
661+
@patch(f"{KPO_MODULE}.KubernetesPodOperator.find_pod")
662+
def test_get_or_create_pod_reattach_terminated(
663+
self, mock_find, mock_process_pod_deletion, mock_create_pod, pod_phase
664+
):
665+
"""Check that get_or_create_pod reattaches to existing pod."""
666+
k = KubernetesPodOperator(
667+
image="ubuntu:16.04",
668+
cmds=["bash", "-cx"],
669+
arguments=["echo 10"],
670+
task_id="task",
671+
name="hello",
672+
log_pod_spec_on_failure=False,
673+
)
674+
k.reattach_on_restart = True
675+
context = create_context(k)
676+
mock_pod_request_obj = MagicMock()
677+
mock_pod_request_obj.to_dict.return_value = {"metadata": {"name": "test-pod"}}
678+
679+
mock_found_pod = MagicMock()
680+
mock_found_pod.status.phase = pod_phase
681+
mock_find.return_value = mock_found_pod
682+
result = k.get_or_create_pod(
683+
pod_request_obj=mock_pod_request_obj,
684+
context=context,
685+
)
686+
if pod_phase == PodPhase.RUNNING:
687+
mock_create_pod.assert_not_called()
688+
mock_process_pod_deletion.assert_not_called()
689+
assert result == mock_found_pod
690+
else:
691+
mock_process_pod_deletion.assert_called_once_with(mock_found_pod)
692+
mock_create_pod.assert_called_once_with(pod=mock_pod_request_obj)
693+
assert result == mock_pod_request_obj
694+
651695
def test_xcom_sidecar_container_image_custom(self):
652696
image = "private.repo/alpine:3.13"
653697
with temp_override_attr(PodDefaults.SIDECAR_CONTAINER, "image", image):

0 commit comments

Comments
 (0)