100 changes: 69 additions & 31 deletions airflow-core/src/airflow/utils/log/file_task_handler.py
@@ -23,11 +23,12 @@
import io
import logging
import os
import time
from collections.abc import Callable, Generator, Iterator
from contextlib import suppress
from datetime import datetime
from enum import Enum
from itertools import chain, islice
from itertools import chain
from pathlib import Path
from types import GeneratorType
from typing import IO, TYPE_CHECKING, TypedDict, cast
@@ -40,7 +41,6 @@
from airflow.configuration import conf
from airflow.executors.executor_loader import ExecutorLoader
from airflow.utils.helpers import parse_template_string, render_template
from airflow.utils.log.log_stream_accumulator import LogStreamAccumulator
from airflow.utils.log.logging_mixin import SetContextPropagate
from airflow.utils.log.non_caching_file_handler import NonCachingRotatingFileHandler
from airflow.utils.session import NEW_SESSION, provide_session
@@ -65,6 +65,9 @@
"""
HEAP_DUMP_SIZE = 5000
HALF_HEAP_DUMP_SIZE = HEAP_DUMP_SIZE // 2
# Time-based flushing configuration
FLUSH_INTERVAL_SECONDS = 2.0 # Flush logs every 2 seconds if there are any records
MIN_RECORDS_FOR_TIME_FLUSH = 1 # Minimum records to trigger time-based flush

# These types are similar, but have distinct names to make processing them less error prone
LogMessages: TypeAlias = list[str]
@@ -103,7 +106,7 @@ class LogMetadata(TypedDict):
"""Metadata about the log fetching process, including `end_of_log` and `log_pos`."""

end_of_log: bool
log_pos: NotRequired[int]
log_pos: NotRequired[int] # Remove a
# the following attributes are used for Elasticsearch and OpenSearch log handlers
offset: NotRequired[str | int]
# Ensure a string here. Large offset numbers will get JSON.parsed incorrectly
@@ -198,6 +201,41 @@ def _parse_timestamp(line: str):
return pendulum.parse(timestamp_str.strip("[]"))


def stream_file_until_close(
file_path: Path, poll_interval: float = 0.1, idle_timeout: float = 10.0
) -> RawLogStream:
"""
Stream lines from a file until the writer appears to have closed it (no growth for ``idle_timeout`` seconds).

:param file_path: Path to the file to stream.
:param poll_interval: How often (in seconds) to check for new data.
:param idle_timeout: How long (in seconds) to wait with no growth before stopping.
"""
with open(file_path) as f:
last_size = os.stat(file_path).st_size
idle_start = None

while True:
line = f.readline()
if line:
yield line
idle_start = None
last_size = os.stat(file_path).st_size
else:
time.sleep(poll_interval)
st = os.stat(file_path)

# If file stopped growing, start idle timer
if st.st_size == last_size:
if idle_start is None:
idle_start = time.time()
elif time.time() - idle_start >= idle_timeout:
break # no growth for too long, assume writer closed
else:
idle_start = None
last_size = st.st_size


def _stream_lines_by_chunk(
log_io: IO[str],
) -> RawLogStream:
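The polling loop in stream_file_until_close only ends once the file has stopped growing for idle_timeout seconds, so callers can iterate over it while the task is still writing. A minimal usage sketch (not part of this diff; the temporary file and the writer thread are invented for illustration, and stream_file_until_close is assumed to be in scope from this module):

import threading
import time
from pathlib import Path
from tempfile import TemporaryDirectory

def demo_tail() -> None:
    with TemporaryDirectory() as tmp:
        log_path = Path(tmp) / "task.log"  # hypothetical log file
        log_path.write_text("line 1\n")

        def writer() -> None:
            # Simulate a task process appending a few lines, then going quiet.
            for i in range(2, 5):
                time.sleep(0.2)
                with log_path.open("a") as f:
                    f.write(f"line {i}\n")

        threading.Thread(target=writer, daemon=True).start()

        # The generator stops on its own once the file has not grown for idle_timeout seconds.
        for line in stream_file_until_close(log_path, poll_interval=0.05, idle_timeout=1.0):
            print(line, end="")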
@@ -357,6 +395,9 @@ def _interleave_logs(*log_streams: RawLogStream) -> StructuredLogStream:
By yielding HALF_HEAP_DUMP_SIZE records when heap size exceeds HEAP_DUMP_SIZE, we can reduce the chance of messing up the global order.
Since there are multiple log streams, we can't guarantee that the records are in global order.

Additionally, time-based flushing is implemented to prevent frontend delays when only a few
records arrive over a long interval.

e.g.

log_stream1: ----------
@@ -374,15 +415,33 @@ def _interleave_logs(*log_streams: RawLogStream) -> StructuredLogStream:
parsed_log_streams: dict[int, ParsedLogStream] = {
idx: _log_stream_to_parsed_log_stream(log_stream) for idx, log_stream in enumerate(log_streams)
}
last_flush_time = time.time()

# keep adding records from logs until all logs are empty
last_log_container: list[StructuredLogMessage | None] = [None]
while parsed_log_streams:
_add_log_from_parsed_log_streams_to_heap(heap, parsed_log_streams)

# yield HALF_HEAP_DUMP_SIZE records when heap size exceeds HEAP_DUMP_SIZE
if len(heap) >= HEAP_DUMP_SIZE:
current_time = time.time()
time_since_last_flush = current_time - last_flush_time

# Check if we should flush based on heap size or time interval
should_flush_by_size = len(heap) >= HEAP_DUMP_SIZE
should_flush_by_time = (
len(heap) >= MIN_RECORDS_FOR_TIME_FLUSH and time_since_last_flush >= FLUSH_INTERVAL_SECONDS
)

if should_flush_by_size:
# Size-based flush: yield HALF_HEAP_DUMP_SIZE records when heap size exceeds HEAP_DUMP_SIZE
yield from _flush_logs_out_of_heap(heap, HALF_HEAP_DUMP_SIZE, last_log_container)
last_flush_time = current_time
elif should_flush_by_time:
# Time-based flush: yield the buffered records (capped at half the heap) to prevent frontend delays
flush_count = min(
len(heap), HALF_HEAP_DUMP_SIZE
) # Don't flush more than half to maintain ordering
yield from _flush_logs_out_of_heap(heap, flush_count, last_log_container)
last_flush_time = current_time

# yield remaining records
yield from _flush_logs_out_of_heap(heap, len(heap), last_log_container)
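A standalone sketch of the flush decision introduced above, reduced to the heap length so the size-based and time-based paths are easy to compare (the should_flush helper is invented here for illustration and is not part of the PR):

import time

HEAP_DUMP_SIZE = 5000
HALF_HEAP_DUMP_SIZE = HEAP_DUMP_SIZE // 2
FLUSH_INTERVAL_SECONDS = 2.0
MIN_RECORDS_FOR_TIME_FLUSH = 1

def should_flush(heap_len: int, last_flush_time: float, now: float | None = None) -> int:
    """Return how many records to flush now; 0 means keep buffering."""
    now = time.time() if now is None else now
    if heap_len >= HEAP_DUMP_SIZE:
        # Size-based flush: drain half the heap to keep memory bounded
        # while leaving room to preserve global ordering.
        return HALF_HEAP_DUMP_SIZE
    if heap_len >= MIN_RECORDS_FOR_TIME_FLUSH and now - last_flush_time >= FLUSH_INTERVAL_SECONDS:
        # Time-based flush: push what is buffered (capped at half the heap)
        # so a slow trickle of records still reaches the frontend promptly.
        return min(heap_len, HALF_HEAP_DUMP_SIZE)
    return 0

# Three buffered records, last flushed 5 seconds ago -> flush all three.
assert should_flush(3, last_flush_time=0.0, now=5.0) == 3
# A heap past the size threshold flushes half regardless of timing.
assert should_flush(6000, last_flush_time=0.0, now=0.1) == HALF_HEAP_DUMP_SIZE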
@@ -586,16 +645,11 @@ def _read(
:param metadata: log metadata,
can be used for streaming log reading and auto-tailing.
The following attributes are used:
log_pos: (absolute) Char position to which the log
which was retrieved in previous calls, this
part will be skipped and only following test
returned to be added to tail.
:return: log message as a string and metadata.
The following attributes are used in metadata:
end_of_log: Boolean, True if end of log is reached or False
if further calls might get more log text.
This is determined by the status of the TaskInstance
log_pos: (absolute) Char position to which the log is retrieved
"""
# Task instance here might be different from task instance when
# initializing the handler. Thus explicitly getting log location
@@ -613,10 +667,7 @@ def _read(
remote_logs = []
elif isinstance(logs, list) and isinstance(logs[0], str):
# If the logs are in legacy format, convert them to a generator of log lines
remote_logs = [
# We don't need to use the log_pos here, as we are using the metadata to track the position
_get_compatible_log_stream(cast("list[str]", logs))
]
remote_logs = [_get_compatible_log_stream(cast("list[str]", logs))]
elif isinstance(logs, list) and _is_logs_stream_like(logs[0]):
# If the logs are already in a stream-like format, we can use them directly
remote_logs = cast("list[RawLogStream]", logs)
@@ -669,23 +720,10 @@ def _read(
TaskInstanceState.DEFERRED,
)

with LogStreamAccumulator(out_stream, HEAP_DUMP_SIZE) as stream_accumulator:
log_pos = stream_accumulator.total_lines
out_stream = stream_accumulator.stream
return chain(header, out_stream), {
"end_of_log": end_of_log,
}

# skip log stream until the last position
if metadata and "log_pos" in metadata:
islice(out_stream, metadata["log_pos"])
else:
# first time reading log, add messages before interleaved log stream
out_stream = chain(header, out_stream)

return out_stream, {
"end_of_log": end_of_log,
"log_pos": log_pos,
}

@staticmethod
def _get_pod_namespace(ti: TaskInstance | TaskInstanceHistory):
pod_override = ti.executor_config.get("pod_override")
@@ -857,7 +895,7 @@ def _read_from_local(
for path in paths:
sources.append(os.fspath(path))
# Read the log file and yield lines
log_streams.append(_stream_lines_by_chunk(open(path, encoding="utf-8")))
log_streams.append(stream_file_until_close(path))
return sources, log_streams

def _read_from_logs_server(
2 changes: 1 addition & 1 deletion airflow-core/src/airflow/utils/log/log_reader.py
@@ -93,7 +93,7 @@ def read_log_stream(
if try_number is None:
try_number = ti.try_number

for key in ("end_of_log", "max_offset", "offset", "log_pos"):
for key in ("end_of_log", "max_offset", "offset"):
# https://mypy.readthedocs.io/en/stable/typed_dict.html#supported-operations
metadata.pop(key, None) # type: ignore[misc]
empty_iterations = 0