Skip to content

Commit

Permalink
Minor improvements to TaskSDK's WatchedSubprocess (#45578)
Browse files Browse the repository at this point in the history
Follow-up to #45570: adds type hints so that my editor (PyCharm) can resolve `_on_child_started`.

Added/updated docstrings, since `WatchedSubprocess` is now a base class.

Also removed the redundant exception handling around `init_log_file`.
  • Loading branch information
kaxil authored Jan 11, 2025
1 parent 6844cce commit 168f765
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 7 deletions.
2 changes: 1 addition & 1 deletion airflow/dag_processing/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ def start( # type: ignore[override]
target: Callable[[], None] = _parse_file_entrypoint,
**kwargs,
) -> Self:
proc = super().start(target=target, **kwargs)
proc: Self = super().start(target=target, **kwargs)
proc._on_child_started(callbacks, path)
return proc

Expand Down
23 changes: 17 additions & 6 deletions task_sdk/src/airflow/sdk/execution_time/supervisor.py
Original file line number Diff line number Diff line change
Expand Up @@ -286,16 +286,28 @@ def exit(n: int) -> NoReturn:

@attrs.define(kw_only=True)
class WatchedSubprocess:
"""
Base class for managing subprocesses in Airflow's TaskSDK.
This class handles common functionalities required for subprocess management, such as
socket handling, process monitoring, and request handling.
"""

id: UUID

pid: int
"""The process ID of the child process"""

stdin: BinaryIO
"""The handle connected to stdin of the child process"""

decoder: TypeAdapter
"""The decoder to use for incoming messages from the child process."""

_process: psutil.Process
_requests_fd: int
"""File descriptor for request handling."""

_num_open_sockets: int = 4
_exit_code: int | None = attrs.field(default=None, init=False)

Expand All @@ -308,7 +320,7 @@ def start(
logger: FilteringBoundLogger | None = None,
**constructor_kwargs,
) -> Self:
"""Fork and start a new subprocess to execute the given task."""
"""Fork and start a new subprocess with the specified target function."""
# Create socketpairs/"pipes" to connect to the stdin and out from the subprocess
child_stdin, feed_stdin = mkpipe(remote_read=True)
child_stdout, read_stdout = mkpipe()
Expand Down Expand Up @@ -538,6 +550,7 @@ def _check_subprocess_exit(self, raise_on_timeout: bool = False) -> int | None:
@attrs.define(kw_only=True)
class ActivitySubprocess(WatchedSubprocess):
client: Client
"""The HTTP client to use for communication with the API server."""

_terminal_state: str | None = attrs.field(default=None, init=False)
_final_state: str | None = attrs.field(default=None, init=False)
Expand Down Expand Up @@ -568,7 +581,8 @@ def start( # type: ignore[override]
logger: FilteringBoundLogger | None = None,
**kwargs,
) -> Self:
proc = super().start(id=what.id, client=client, target=target, logger=logger, **kwargs)
"""Fork and start a new subprocess to execute the given task."""
proc: Self = super().start(id=what.id, client=client, target=target, logger=logger, **kwargs)
# Tell the task process what it needs to do!
proc._on_child_started(what, path)
return proc
Expand Down Expand Up @@ -908,10 +922,7 @@ def supervise(
# If we are told to write logs to a file, redirect the task logger to it.
from airflow.sdk.log import init_log_file, logging_processors

try:
log_file = init_log_file(log_path)
except OSError as e:
log.warning("OSError while changing ownership of the log file. ", e)
log_file = init_log_file(log_path)

pretty_logs = False
if pretty_logs:
Expand Down

0 comments on commit 168f765

Please sign in to comment.