Skip to content

Commit 8449378

Browse files
kaxil authored and dauinh committed
Minor improvements to TaskSDK's WatchedSubprocess (apache#45578)
Follow-up to apache#45570 that adds type hints so my editor (PyCharm) can find `_on_child_started`. Also added/updated docstrings, since `WatchedSubprocess` is now a base class, and removed redundant exception handling for `init_log_file`.
1 parent f954f0a commit 8449378

File tree

2 files changed

+18
-7
lines changed

2 files changed

+18
-7
lines changed

airflow/dag_processing/processor.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,7 @@ def start( # type: ignore[override]
203203
target: Callable[[], None] = _parse_file_entrypoint,
204204
**kwargs,
205205
) -> Self:
206-
proc = super().start(target=target, **kwargs)
206+
proc: Self = super().start(target=target, **kwargs)
207207
proc._on_child_started(callbacks, path)
208208
return proc
209209

task_sdk/src/airflow/sdk/execution_time/supervisor.py

+17-6
Original file line numberDiff line numberDiff line change
@@ -286,16 +286,28 @@ def exit(n: int) -> NoReturn:
286286

287287
@attrs.define(kw_only=True)
288288
class WatchedSubprocess:
289+
"""
290+
Base class for managing subprocesses in Airflow's TaskSDK.
291+
292+
This class handles common functionalities required for subprocess management, such as
293+
socket handling, process monitoring, and request handling.
294+
"""
295+
289296
id: UUID
290297

291298
pid: int
299+
"""The process ID of the child process"""
300+
292301
stdin: BinaryIO
293302
"""The handle connected to stdin of the child process"""
294303

295304
decoder: TypeAdapter
305+
"""The decoder to use for incoming messages from the child process."""
296306

297307
_process: psutil.Process
298308
_requests_fd: int
309+
"""File descriptor for request handling."""
310+
299311
_num_open_sockets: int = 4
300312
_exit_code: int | None = attrs.field(default=None, init=False)
301313

@@ -308,7 +320,7 @@ def start(
308320
logger: FilteringBoundLogger | None = None,
309321
**constructor_kwargs,
310322
) -> Self:
311-
"""Fork and start a new subprocess to execute the given task."""
323+
"""Fork and start a new subprocess with the specified target function."""
312324
# Create socketpairs/"pipes" to connect to the stdin and out from the subprocess
313325
child_stdin, feed_stdin = mkpipe(remote_read=True)
314326
child_stdout, read_stdout = mkpipe()
@@ -538,6 +550,7 @@ def _check_subprocess_exit(self, raise_on_timeout: bool = False) -> int | None:
538550
@attrs.define(kw_only=True)
539551
class ActivitySubprocess(WatchedSubprocess):
540552
client: Client
553+
"""The HTTP client to use for communication with the API server."""
541554

542555
_terminal_state: str | None = attrs.field(default=None, init=False)
543556
_final_state: str | None = attrs.field(default=None, init=False)
@@ -568,7 +581,8 @@ def start( # type: ignore[override]
568581
logger: FilteringBoundLogger | None = None,
569582
**kwargs,
570583
) -> Self:
571-
proc = super().start(id=what.id, client=client, target=target, logger=logger, **kwargs)
584+
"""Fork and start a new subprocess to execute the given task."""
585+
proc: Self = super().start(id=what.id, client=client, target=target, logger=logger, **kwargs)
572586
# Tell the task process what it needs to do!
573587
proc._on_child_started(what, path)
574588
return proc
@@ -908,10 +922,7 @@ def supervise(
908922
# If we are told to write logs to a file, redirect the task logger to it.
909923
from airflow.sdk.log import init_log_file, logging_processors
910924

911-
try:
912-
log_file = init_log_file(log_path)
913-
except OSError as e:
914-
log.warning("OSError while changing ownership of the log file. ", e)
925+
log_file = init_log_file(log_path)
915926

916927
pretty_logs = False
917928
if pretty_logs:

0 commit comments

Comments
 (0)