From 168f76562e617cd8e8a41ec684bfda09db6b5705 Mon Sep 17 00:00:00 2001 From: Kaxil Naik Date: Sat, 11 Jan 2025 22:24:00 +0530 Subject: [PATCH] Minor improvements to TaskSDK's WatchedSubprocess (#45578) Follow-up of https://github.com/apache/airflow/pull/45570 to add some Typehints so my editor (PyCharm) can find `_on_child_started`. Added/Updated docstrings since `WatchedSubprocess` is now a Base class. and removed redundant exception handling for `init_log_file` --- airflow/dag_processing/processor.py | 2 +- .../airflow/sdk/execution_time/supervisor.py | 23 ++++++++++++++----- 2 files changed, 18 insertions(+), 7 deletions(-) diff --git a/airflow/dag_processing/processor.py b/airflow/dag_processing/processor.py index 583b858b585a6..f5623418c917d 100644 --- a/airflow/dag_processing/processor.py +++ b/airflow/dag_processing/processor.py @@ -203,7 +203,7 @@ def start( # type: ignore[override] target: Callable[[], None] = _parse_file_entrypoint, **kwargs, ) -> Self: - proc = super().start(target=target, **kwargs) + proc: Self = super().start(target=target, **kwargs) proc._on_child_started(callbacks, path) return proc diff --git a/task_sdk/src/airflow/sdk/execution_time/supervisor.py b/task_sdk/src/airflow/sdk/execution_time/supervisor.py index b4415e5728f33..002b156cee2db 100644 --- a/task_sdk/src/airflow/sdk/execution_time/supervisor.py +++ b/task_sdk/src/airflow/sdk/execution_time/supervisor.py @@ -286,16 +286,28 @@ def exit(n: int) -> NoReturn: @attrs.define(kw_only=True) class WatchedSubprocess: + """ + Base class for managing subprocesses in Airflow's TaskSDK. + + This class handles common functionalities required for subprocess management, such as + socket handling, process monitoring, and request handling. + """ + id: UUID pid: int + """The process ID of the child process""" + stdin: BinaryIO """The handle connected to stdin of the child process""" decoder: TypeAdapter + """The decoder to use for incoming messages from the child process.""" _process: psutil.Process _requests_fd: int + """File descriptor for request handling.""" + _num_open_sockets: int = 4 _exit_code: int | None = attrs.field(default=None, init=False) @@ -308,7 +320,7 @@ def start( logger: FilteringBoundLogger | None = None, **constructor_kwargs, ) -> Self: - """Fork and start a new subprocess to execute the given task.""" + """Fork and start a new subprocess with the specified target function.""" # Create socketpairs/"pipes" to connect to the stdin and out from the subprocess child_stdin, feed_stdin = mkpipe(remote_read=True) child_stdout, read_stdout = mkpipe() @@ -538,6 +550,7 @@ def _check_subprocess_exit(self, raise_on_timeout: bool = False) -> int | None: @attrs.define(kw_only=True) class ActivitySubprocess(WatchedSubprocess): client: Client + """The HTTP client to use for communication with the API server.""" _terminal_state: str | None = attrs.field(default=None, init=False) _final_state: str | None = attrs.field(default=None, init=False) @@ -568,7 +581,8 @@ def start( # type: ignore[override] logger: FilteringBoundLogger | None = None, **kwargs, ) -> Self: - proc = super().start(id=what.id, client=client, target=target, logger=logger, **kwargs) + """Fork and start a new subprocess to execute the given task.""" + proc: Self = super().start(id=what.id, client=client, target=target, logger=logger, **kwargs) # Tell the task process what it needs to do! proc._on_child_started(what, path) return proc @@ -908,10 +922,7 @@ def supervise( # If we are told to write logs to a file, redirect the task logger to it. from airflow.sdk.log import init_log_file, logging_processors - try: - log_file = init_log_file(log_path) - except OSError as e: - log.warning("OSError while changing ownership of the log file. ", e) + log_file = init_log_file(log_path) pretty_logs = False if pretty_logs: