Skip to content
Merged
Show file tree
Hide file tree
Changes from 69 commits
Commits
Show all changes
73 commits
Select commit Hold shift + click to select a range
035bcfb
Add note for new usage of LogMetadata
jason810496 Apr 11, 2025
9a63542
Add _stream_parsed_lines_by_chunk
jason810496 Apr 11, 2025
59aa97b
Refactor _read_from_local/logs_server as return stream
jason810496 Apr 11, 2025
a92c654
Refactor _interleave_logs with K-Way Merge
jason810496 Apr 11, 2025
f079c67
Add _get_compatible_log_stream
jason810496 Apr 12, 2025
f942872
Refactor _read method to return stream with compatible interface
jason810496 Apr 12, 2025
0eec562
Refactor log_reader to adapt stream
jason810496 Apr 13, 2025
fbb75d9
Fix _read_from_local open closed file error
jason810496 Apr 13, 2025
c611644
Refactor LogReader by yielding in batch
jason810496 Apr 13, 2025
491bbeb
Add ndjson header to get_log openapi schema
jason810496 Apr 14, 2025
6e2fca9
Fix _add_log_from_parsed_log_streams_to_heap
jason810496 Apr 18, 2025
4957d24
Fix _interleave_logs dedupe logic
jason810496 Apr 18, 2025
efbea8b
Refactor test_log_handlers
jason810496 Apr 20, 2025
fbaf5d7
Move test_log_handlers utils to test_common
jason810496 Apr 21, 2025
952969a
Fix unit/celery/log_handlers test
jason810496 Apr 22, 2025
764e26c
Fix mypy-providers static check
jason810496 Apr 22, 2025
21c83e6
Fix _get_compatible_log_stream
jason810496 Apr 23, 2025
8b4f681
Fix amazon task_handler test
jason810496 Apr 23, 2025
688ef5f
Fix wask task handler test
jason810496 Apr 23, 2025
97f9900
Fix elasticsearch task handler test
jason810496 Apr 23, 2025
3493093
Fix opensearch task handler test
jason810496 Apr 23, 2025
f979dfb
Fix TaskLogReader buffer
jason810496 Apr 23, 2025
951a45f
Fix test_log_reader
jason810496 Apr 23, 2025
ffa2896
Fix CloudWatchRemoteLogIO.read mypy
jason810496 Apr 23, 2025
7df8fd5
Fix test_gcs_task_handler
jason810496 Apr 23, 2025
3483c8d
Fix core_api test_log
jason810496 Apr 23, 2025
0f0e51c
Fix CloudWatchRemoteLogIO._event_to_str dt format
jason810496 Apr 24, 2025
15ccaf0
Fix TestCloudRemoteLogIO.test_log_message
jason810496 Apr 24, 2025
875e3cb
Fix es/os task_hander convert_list_to_stream
jason810496 Apr 24, 2025
8e5032a
Fix compact tests
jason810496 Apr 25, 2025
523ea11
Refactor es,os task handler for 3.0 compact
jason810496 Apr 28, 2025
fe7be22
Fix compat for RedisTaskHandler
jason810496 Apr 29, 2025
bc7248e
Fix ruff format for test_cloudwatch_task_handler after rebase
jason810496 Apr 29, 2025
44c62ca
Fix 2.10 compat TestCloudwatchTaskHandler
jason810496 Apr 29, 2025
5eea026
Fix 3.0 compat test for celery, wasb
jason810496 Apr 29, 2025
284e615
Fix 3.0 compat test for gcs
jason810496 May 1, 2025
55d70bd
Fix 3.0 compat test for cloudwatch, s3
jason810496 May 1, 2025
8b6ff7d
Set get_log API default response format to JSON
jason810496 May 9, 2025
d9403f8
Remove "first_time_read" key in log metadata
jason810496 May 20, 2025
9c860ea
Remove "<source>_log_pos" key in log metadata
jason810496 May 21, 2025
99e8258
Add LogStreamCounter for backward compatibility
jason810496 May 22, 2025
3d7e6cd
Remove "first_time_read" with backward "log_pos" for tests
jason810496 May 22, 2025
9cd5579
Fix RedisTaskHandler compatibility
jason810496 May 23, 2025
f4c3414
Fix chores in self review
jason810496 May 23, 2025
75afa30
Fine-tune HEAP_DUMP_SIZE
jason810496 May 25, 2025
2aefa34
Replace get_compatible_output_log_stream with iter
jason810496 May 28, 2025
f9d3c61
Remove buffer in log_reader
jason810496 May 28, 2025
64001cc
Fix log_id not found compact for es_task_handler
jason810496 May 28, 2025
c97bc9a
Fix review comments
jason810496 Jun 9, 2025
a3dedd7
Refactor LogStreamAccumulator._capture method
jason810496 Jun 9, 2025
a0e08b7
Merge branch 'main' into refactor/rework-webserver-oom-for-large-log-…
jason810496 Jun 17, 2025
01ed704
Fix type hint, joinedload for ti.dag_run after merge
jason810496 Jun 17, 2025
9e968c2
Replace _sort_key as _create_sort_key
jason810496 Jun 18, 2025
ca730ab
Add _flush_logs_out_of_heap common util
jason810496 Jun 18, 2025
2e90c24
Fix review nits
jason810496 Jun 18, 2025
c8515aa
Merge branch 'main' into refactor/rework-webserver-oom-for-large-log-…
jason810496 Jun 18, 2025
46f21c8
Fix mypy errors after merge
jason810496 Jun 19, 2025
97ab0de
Fix redis task handler test
jason810496 Jun 19, 2025
cea98d7
Merge branch 'main' into refactor/rework-webserver-oom-for-large-log-…
jason810496 Jun 20, 2025
1e35b8b
Refactor _capture logic in LogStreamAccumulator
jason810496 Jun 22, 2025
97e2d20
Add comments for ingore LogMetadata TypeDict
jason810496 Jun 22, 2025
b9dfb2f
Add comment for offset; Fix commet for LogMessages
jason810496 Jun 22, 2025
c80dc51
Refactor with from_iterable, islice
jason810496 Jun 22, 2025
363b68f
Fix nits in test
jason810496 Jun 22, 2025
50ad9ba
Refactor test_utils
jason810496 Jun 22, 2025
f09e9c0
Add comment for lazy initialization
jason810496 Jun 23, 2025
3efe90f
Fix error handling for _stream_lines_by_chunk
jason810496 Jun 24, 2025
a22624e
Merge branch 'main' into refactor/rework-webserver-oom-for-large-log-…
jason810496 Jul 4, 2025
3e29986
Fix mypy error after merge
jason810496 Jul 4, 2025
5c33ac3
Fix final review nits
jason810496 Jul 8, 2025
97ecf91
Merge branch 'main' into refactor/rework-webserver-oom-for-large-log-…
jason810496 Jul 9, 2025
339fe50
Fix mypy error
jason810496 Jul 9, 2025
514a546
Merge branch 'main' into refactor/rework-webserver-oom-for-large-log-…
jason810496 Jul 9, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@
import contextlib
import textwrap

from fastapi import Depends, HTTPException, Request, Response, status
from fastapi import Depends, HTTPException, Request, status
from fastapi.responses import StreamingResponse
from itsdangerous import BadSignature, URLSafeSerializer
from pydantic import PositiveInt
from sqlalchemy.orm import joinedload
Expand Down Expand Up @@ -120,12 +121,17 @@ def get_log(
)
ti = session.scalar(query)
if ti is None:
query = select(TaskInstanceHistory).where(
TaskInstanceHistory.task_id == task_id,
TaskInstanceHistory.dag_id == dag_id,
TaskInstanceHistory.run_id == dag_run_id,
TaskInstanceHistory.map_index == map_index,
TaskInstanceHistory.try_number == try_number,
query = (
select(TaskInstanceHistory)
.where(
TaskInstanceHistory.task_id == task_id,
TaskInstanceHistory.dag_id == dag_id,
TaskInstanceHistory.run_id == dag_run_id,
TaskInstanceHistory.map_index == map_index,
TaskInstanceHistory.try_number == try_number,
)
.options(joinedload(TaskInstanceHistory.dag_run))
# we need to joinedload the dag_run, since FileTaskHandler._render_filename needs ti.dag_run
)
ti = session.scalar(query)

Expand All @@ -138,24 +144,27 @@ def get_log(
with contextlib.suppress(TaskNotFound):
ti.task = dag.get_task(ti.task_id)

if accept == Mimetype.JSON or accept == Mimetype.ANY: # default
logs, metadata = task_log_reader.read_log_chunks(ti, try_number, metadata)
encoded_token = None
if accept == Mimetype.NDJSON: # only specified application/x-ndjson will return streaming response
# LogMetadata(TypedDict) is used as type annotation for log_reader; added ignore to suppress mypy error
log_stream = task_log_reader.read_log_stream(ti, try_number, metadata) # type: ignore[arg-type]
headers = None
if not metadata.get("end_of_log", False):
encoded_token = URLSafeSerializer(request.app.state.secret_key).dumps(metadata)
return TaskInstancesLogResponse.model_construct(continuation_token=encoded_token, content=logs)
# text/plain, or something else we don't understand. Return raw log content

# We need to exhaust the iterator before we can generate the continuation token.
# We could improve this by making it a streaming/async response, and by then setting the header using
# HTTP Trailers
logs = "".join(task_log_reader.read_log_stream(ti, try_number, metadata))
headers = None
if not metadata.get("end_of_log", False):
headers = {
"Airflow-Continuation-Token": URLSafeSerializer(request.app.state.secret_key).dumps(metadata)
}
return Response(media_type="application/x-ndjson", content=logs, headers=headers)
headers = {
"Airflow-Continuation-Token": URLSafeSerializer(request.app.state.secret_key).dumps(metadata)
}
return StreamingResponse(media_type="application/x-ndjson", content=log_stream, headers=headers)

# application/json, or something else we don't understand.
# Return JSON format, which will be more easily for users to debug.

# LogMetadata(TypedDict) is used as type annotation for log_reader; added ignore to suppress mypy error
structured_log_stream, out_metadata = task_log_reader.read_log_chunks(ti, try_number, metadata) # type: ignore[arg-type]
encoded_token = None
if not out_metadata.get("end_of_log", False):
encoded_token = URLSafeSerializer(request.app.state.secret_key).dumps(out_metadata)
return TaskInstancesLogResponse.model_construct(
continuation_token=encoded_token, content=list(structured_log_stream)
)


@task_instances_log_router.get(
Expand Down
Loading