@@ -27,7 +27,7 @@
from airflow.exceptions import AirflowException

if TYPE_CHECKING:
-    from airflow.logging_config import RemoteLogIO
+    from airflow.logging_config import RemoteLogIO, RemoteLogStreamIO

LOG_LEVEL: str = conf.get_mandatory_value("logging", "LOGGING_LEVEL").upper()

@@ -119,7 +119,7 @@
##################

REMOTE_LOGGING: bool = conf.getboolean("logging", "remote_logging")
-REMOTE_TASK_LOG: RemoteLogIO | None = None
+REMOTE_TASK_LOG: RemoteLogIO | RemoteLogStreamIO | None = None
DEFAULT_REMOTE_CONN_ID: str | None = None


airflow-core/src/airflow/logging/remote.py (15 changes: 12 additions & 3 deletions)
@@ -18,13 +18,13 @@
from __future__ import annotations

import os
-from typing import TYPE_CHECKING, Protocol
+from typing import TYPE_CHECKING, Protocol, runtime_checkable

if TYPE_CHECKING:
import structlog.typing

from airflow.sdk.types import RuntimeTaskInstanceProtocol as RuntimeTI
-    from airflow.utils.log.file_task_handler import LogMessages, LogSourceInfo
+    from airflow.utils.log.file_task_handler import LogResponse, StreamingLogResponse


class RemoteLogIO(Protocol):
@@ -44,6 +44,15 @@ def upload(self, path: os.PathLike | str, ti: RuntimeTI) -> None:
"""Upload the given log path to the remote storage."""
...

-    def read(self, relative_path: str, ti: RuntimeTI) -> tuple[LogSourceInfo, LogMessages | None]:
+    def read(self, relative_path: str, ti: RuntimeTI) -> LogResponse:
"""Read logs from the given remote log path."""
...


+@runtime_checkable
+class RemoteLogStreamIO(RemoteLogIO, Protocol):
+    """Interface for remote task loggers with stream-based read support."""
+
+    def stream(self, relative_path: str, ti: RuntimeTI) -> StreamingLogResponse:
+        """Stream-based read interface for reading logs from the given remote log path."""
+        ...
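
For orientation (not part of the diff): below is a minimal sketch of what a provider-side implementation of the new RemoteLogStreamIO protocol could look like. The class is hypothetical and backed by an in-memory dict rather than a real object store; only the three method signatures mirror the protocols above.

# Hypothetical sketch of a stream-capable remote log IO -- not code from this PR.
from __future__ import annotations

import os
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from airflow.sdk.types import RuntimeTaskInstanceProtocol as RuntimeTI
    from airflow.utils.log.file_task_handler import LogResponse, StreamingLogResponse


class InMemoryRemoteLogIO:  # hypothetical class; real providers talk to S3/OSS/GCS/etc.
    """Toy remote store keyed by relative path, used only to illustrate the protocol."""

    def __init__(self) -> None:
        self._blobs: dict[str, str] = {}

    def upload(self, path: os.PathLike | str, ti: RuntimeTI) -> None:
        # Copy the local log file into the "remote" store under its file name.
        local = os.fspath(path)
        with open(local) as f:
            self._blobs[os.path.basename(local)] = f.read()

    def read(self, relative_path: str, ti: RuntimeTI) -> LogResponse:
        # Legacy whole-message read: (sources, messages-or-None).
        sources = [f"memory://{relative_path}"]
        body = self._blobs.get(relative_path)
        return sources, ([body] if body is not None else None)

    def stream(self, relative_path: str, ti: RuntimeTI) -> StreamingLogResponse:
        # Stream-based read: (sources, list of generators of raw log lines).
        sources = [f"memory://{relative_path}"]

        def _lines():
            yield from self._blobs.get(relative_path, "").splitlines(keepends=True)

        return sources, [_lines()]

Because RemoteLogStreamIO is declared @runtime_checkable, isinstance(obj, RemoteLogStreamIO) succeeds for any object that provides upload, read, and stream; the check is structural and does not verify the signatures.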
airflow-core/src/airflow/utils/log/file_task_handler.py (20 changes: 12 additions & 8 deletions)
@@ -68,16 +68,15 @@

# These types are similar, but have distinct names to make processing them less error prone
LogMessages: TypeAlias = list[str]
"""The legacy format of log messages before 3.0.2"""
"""The legacy format of log messages before 3.0.4"""
LogSourceInfo: TypeAlias = list[str]
"""Information _about_ the log fetching process for display to a user"""
RawLogStream: TypeAlias = Generator[str, None, None]
"""Raw log stream, containing unparsed log lines."""
-LegacyLogResponse: TypeAlias = tuple[LogSourceInfo, LogMessages]
+LogResponse: TypeAlias = tuple[LogSourceInfo, LogMessages | None]
"""Legacy log response, containing source information and log messages."""
-LogResponse: TypeAlias = tuple[LogSourceInfo, list[RawLogStream]]
-LogResponseWithSize: TypeAlias = tuple[LogSourceInfo, list[RawLogStream], int]
-"""Log response, containing source information, stream of log lines, and total log size."""
+StreamingLogResponse: TypeAlias = tuple[LogSourceInfo, list[RawLogStream]]
+"""Streaming log response, containing source information, stream of log lines."""
StructuredLogStream: TypeAlias = Generator["StructuredLogMessage", None, None]
"""Structured log stream, containing structured log messages."""
LogHandlerOutputStream: TypeAlias = (
@@ -856,7 +855,7 @@ def _init_file(self, ti, *, identifier: str | None = None):
@staticmethod
def _read_from_local(
worker_log_path: Path,
-    ) -> LogResponse:
+    ) -> StreamingLogResponse:
sources: LogSourceInfo = []
log_streams: list[RawLogStream] = []
paths = sorted(worker_log_path.parent.glob(worker_log_path.name + "*"))
@@ -873,7 +872,7 @@ def _read_from_logs_server(
self,
ti: TaskInstance | TaskInstanceHistory,
worker_log_rel_path: str,
-    ) -> LogResponse:
+    ) -> StreamingLogResponse:
sources: LogSourceInfo = []
log_streams: list[RawLogStream] = []
try:
@@ -911,7 +910,7 @@ def _read_from_logs_server(
logger.exception("Could not read served logs")
return sources, log_streams

-    def _read_remote_logs(self, ti, try_number, metadata=None) -> LegacyLogResponse | LogResponse:
+    def _read_remote_logs(self, ti, try_number, metadata=None) -> LogResponse | StreamingLogResponse:
"""
Implement in subclasses to read from the remote service.

@@ -936,5 +935,10 @@ def _read_remote_logs(self, ti, try_number, metadata=None) -> LegacyLogResponse
# This living here is not really a good plan, but it just about works for now.
# Ideally we move all the read+combine logic in to TaskLogReader and out of the task handler.
path = self._render_filename(ti, try_number)
+        if stream_method := getattr(remote_io, "stream", None):
+            # Use .stream interface if provider's RemoteIO supports it
+            sources, logs = stream_method(path, ti)
+            return sources, logs or []
+        # Fallback to .read interface
sources, logs = remote_io.read(path, ti)
return sources, logs or []
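
For reference (not part of the diff): the handler above probes for stream support with getattr, but since the protocol is @runtime_checkable an isinstance check works as well. A sketch of how calling code might branch on the capability and drain the streams; the helper below is illustrative only and assumes the module path of the file shown earlier in this diff.

# Illustrative helper -- shows the shapes of LogResponse vs StreamingLogResponse.
from __future__ import annotations

from airflow.logging.remote import RemoteLogIO, RemoteLogStreamIO


def collect_remote_log_lines(remote_io: RemoteLogIO, path: str, ti) -> list[str]:
    """Return all remote log lines for `path`, preferring the streaming interface."""
    if isinstance(remote_io, RemoteLogStreamIO):
        # StreamingLogResponse: (LogSourceInfo, list[RawLogStream])
        _sources, streams = remote_io.stream(path, ti)
        lines: list[str] = []
        for stream in streams or []:
            lines.extend(stream)  # each entry is a generator of raw, unparsed log lines
        return lines
    # LogResponse: (LogSourceInfo, LogMessages | None) -- legacy whole-message read
    _sources, messages = remote_io.read(path, ti)
    return list(messages or [])

Whether Airflow's log reader actually combines streams this way is outside this diff; the point is only that the streaming variant hands back lazy generators instead of fully materialised log strings.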
@@ -24,7 +24,7 @@

import pytest

-from airflow.providers.alibaba.cloud.log.oss_task_handler import OSSRemoteLogIO, OSSTaskHandler  # noqa: F401
+from airflow.providers.alibaba.cloud.log.oss_task_handler import OSSTaskHandler
from airflow.utils.state import TaskInstanceState
from airflow.utils.timezone import datetime
