 import contextlib
 import textwrap

-from fastapi import Depends, HTTPException, Request, Response, status
+from fastapi import Depends, HTTPException, Request, status
+from fastapi.responses import StreamingResponse
 from itsdangerous import BadSignature, URLSafeSerializer
 from pydantic import NonNegativeInt
 from sqlalchemy.orm import joinedload
@@ -119,12 +120,17 @@ def get_log(
     )
     ti = session.scalar(query)
     if ti is None:
-        query = select(TaskInstanceHistory).where(
-            TaskInstanceHistory.task_id == task_id,
-            TaskInstanceHistory.dag_id == dag_id,
-            TaskInstanceHistory.run_id == dag_run_id,
-            TaskInstanceHistory.map_index == map_index,
-            TaskInstanceHistory.try_number == try_number,
+        query = (
+            select(TaskInstanceHistory)
+            .where(
+                TaskInstanceHistory.task_id == task_id,
+                TaskInstanceHistory.dag_id == dag_id,
+                TaskInstanceHistory.run_id == dag_run_id,
+                TaskInstanceHistory.map_index == map_index,
+                TaskInstanceHistory.try_number == try_number,
+            )
+            .options(joinedload(TaskInstanceHistory.dag_run))
+            # we need to joinedload the dag_run, since FileTaskHandler._render_filename needs ti.dag_run
         )
         ti = session.scalar(query)

@@ -137,21 +143,24 @@ def get_log(
     with contextlib.suppress(TaskNotFound):
         ti.task = dag.get_task(ti.task_id)

-    if accept == Mimetype.JSON or accept == Mimetype.ANY:  # default
-        logs, metadata = task_log_reader.read_log_chunks(ti, try_number, metadata)
-        encoded_token = None
+    if accept == Mimetype.NDJSON:  # only specified application/x-ndjson will return streaming response
+        # LogMetadata(TypedDict) is used as type annotation for log_reader; added ignore to suppress mypy error
+        log_stream = task_log_reader.read_log_stream(ti, try_number, metadata)  # type: ignore[arg-type]
+        headers = None
         if not metadata.get("end_of_log", False):
-            encoded_token = URLSafeSerializer(request.app.state.secret_key).dumps(metadata)
-        return TaskInstancesLogResponse.model_construct(continuation_token=encoded_token, content=logs)
-    # text/plain, or something else we don't understand. Return raw log content
-
-    # We need to exhaust the iterator before we can generate the continuation token.
-    # We could improve this by making it a streaming/async response, and by then setting the header using
-    # HTTP Trailers
-    logs = "".join(task_log_reader.read_log_stream(ti, try_number, metadata))
-    headers = None
-    if not metadata.get("end_of_log", False):
-        headers = {
-            "Airflow-Continuation-Token": URLSafeSerializer(request.app.state.secret_key).dumps(metadata)
-        }
-    return Response(media_type="application/x-ndjson", content=logs, headers=headers)
+            headers = {
+                "Airflow-Continuation-Token": URLSafeSerializer(request.app.state.secret_key).dumps(metadata)
+            }
+        return StreamingResponse(media_type="application/x-ndjson", content=log_stream, headers=headers)
+
+    # application/json, or something else we don't understand.
+    # Return the JSON format, which is easier for users to debug.
+
+    # LogMetadata(TypedDict) is used as type annotation for log_reader; added ignore to suppress mypy error
+    structured_log_stream, out_metadata = task_log_reader.read_log_chunks(ti, try_number, metadata)  # type: ignore[arg-type]
+    encoded_token = None
+    if not out_metadata.get("end_of_log", False):
+        encoded_token = URLSafeSerializer(request.app.state.secret_key).dumps(out_metadata)
+    return TaskInstancesLogResponse.model_construct(
+        continuation_token=encoded_token, content=list(structured_log_stream)
+    )
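
Taken together, the new branch means content negotiation for this endpoint now works in two ways: Accept: application/x-ndjson streams log lines and carries any continuation token in the Airflow-Continuation-Token response header, while other Accept values such as application/json get a TaskInstancesLogResponse body whose content and continuation_token fields hold the same information. Below is a minimal client sketch, assuming a local Airflow instance at http://localhost:8080, placeholder dag/run/task identifiers, and no auth; none of those details come from this change.

# Illustrative sketch only; base URL, identifiers, and the missing auth are placeholder assumptions.
import requests

BASE = "http://localhost:8080/api/v2"  # assumed API base URL of a local Airflow instance
LOG_URL = f"{BASE}/dags/example_dag/dagRuns/example_run/taskInstances/example_task/logs/1"

# NDJSON path: the endpoint returns a StreamingResponse, so read it incrementally.
with requests.get(LOG_URL, headers={"Accept": "application/x-ndjson"}, stream=True) as resp:
    for line in resp.iter_lines():
        if line:
            print(line.decode())
    # Present only while the log is still being written (end_of_log is False).
    token = resp.headers.get("Airflow-Continuation-Token")

# JSON path: the structured response carries the continuation token in the body instead of a header.
resp = requests.get(LOG_URL, headers={"Accept": "application/json"})
body = resp.json()
print(body["content"], body["continuation_token"])

Keying the streaming behavior to an explicit application/x-ndjson Accept header keeps application/json (and unspecified Accept) on the buffered JSON path, so clients that parse the response body keep working unchanged.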