 import contextlib
 import textwrap

-from fastapi import Depends, HTTPException, Request, Response, status
+from fastapi import Depends, HTTPException, Request, status
+from fastapi.responses import StreamingResponse
 from itsdangerous import BadSignature, URLSafeSerializer
 from pydantic import NonNegativeInt, PositiveInt
 from sqlalchemy.orm import joinedload
@@ -120,12 +121,17 @@ def get_log(
     )
     ti = session.scalar(query)
     if ti is None:
-        query = select(TaskInstanceHistory).where(
-            TaskInstanceHistory.task_id == task_id,
-            TaskInstanceHistory.dag_id == dag_id,
-            TaskInstanceHistory.run_id == dag_run_id,
-            TaskInstanceHistory.map_index == map_index,
-            TaskInstanceHistory.try_number == try_number,
+        query = (
+            select(TaskInstanceHistory)
+            .where(
+                TaskInstanceHistory.task_id == task_id,
+                TaskInstanceHistory.dag_id == dag_id,
+                TaskInstanceHistory.run_id == dag_run_id,
+                TaskInstanceHistory.map_index == map_index,
+                TaskInstanceHistory.try_number == try_number,
+            )
+            .options(joinedload(TaskInstanceHistory.dag_run))
+            # we need to joinedload the dag_run, since FileTaskHandler._render_filename needs ti.dag_run
         )
         ti = session.scalar(query)

@@ -138,24 +144,27 @@ def get_log(
     with contextlib.suppress(TaskNotFound):
         ti.task = dag.get_task(ti.task_id)

-    if accept == Mimetype.JSON or accept == Mimetype.ANY:  # default
-        logs, metadata = task_log_reader.read_log_chunks(ti, try_number, metadata)
-        encoded_token = None
+    if accept == Mimetype.NDJSON:  # only requests specifying application/x-ndjson get a streaming response
+        # LogMetadata(TypedDict) is used as type annotation for log_reader; added ignore to suppress mypy error
+        log_stream = task_log_reader.read_log_stream(ti, try_number, metadata)  # type: ignore[arg-type]
+        headers = None
         if not metadata.get("end_of_log", False):
-            encoded_token = URLSafeSerializer(request.app.state.secret_key).dumps(metadata)
-        return TaskInstancesLogResponse.model_construct(continuation_token=encoded_token, content=logs)
-    # text/plain, or something else we don't understand. Return raw log content
-
-    # We need to exhaust the iterator before we can generate the continuation token.
-    # We could improve this by making it a streaming/async response, and by then setting the header using
-    # HTTP Trailers
-    logs = "".join(task_log_reader.read_log_stream(ti, try_number, metadata))
-    headers = None
-    if not metadata.get("end_of_log", False):
-        headers = {
-            "Airflow-Continuation-Token": URLSafeSerializer(request.app.state.secret_key).dumps(metadata)
-        }
-    return Response(media_type="application/x-ndjson", content=logs, headers=headers)
+            headers = {
+                "Airflow-Continuation-Token": URLSafeSerializer(request.app.state.secret_key).dumps(metadata)
+            }
+        return StreamingResponse(media_type="application/x-ndjson", content=log_stream, headers=headers)
+
+    # application/json, or something else we don't understand.
+    # Return the JSON format, which is easier for users to debug.
+
+    # LogMetadata(TypedDict) is used as type annotation for log_reader; added ignore to suppress mypy error
+    structured_log_stream, out_metadata = task_log_reader.read_log_chunks(ti, try_number, metadata)  # type: ignore[arg-type]
+    encoded_token = None
+    if not out_metadata.get("end_of_log", False):
+        encoded_token = URLSafeSerializer(request.app.state.secret_key).dumps(out_metadata)
+    return TaskInstancesLogResponse.model_construct(
+        continuation_token=encoded_token, content=list(structured_log_stream)
+    )


 @task_instances_log_router.get(
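
For illustration, a minimal client-side sketch of consuming the streaming branch added above: it requests application/x-ndjson, prints each NDJSON line as it arrives, and keeps polling while an Airflow-Continuation-Token header is present. The URL and the idea of resending the token as a "token" query parameter are assumptions for the example, not part of this diff.

import requests  # assumes the requests library is available on the client side

# Hypothetical log endpoint; the real path depends on the deployment and API version.
url = "http://localhost:8080/api/v2/dags/my_dag/dagRuns/my_run/taskInstances/my_task/logs/1"
headers = {"Accept": "application/x-ndjson"}  # selects the StreamingResponse branch
params: dict[str, str] = {}

while True:
    with requests.get(url, headers=headers, params=params, stream=True) as resp:
        resp.raise_for_status()
        for line in resp.iter_lines():  # one NDJSON record per line
            if line:
                print(line.decode())
        token = resp.headers.get("Airflow-Continuation-Token")
    if not token:
        break  # no continuation token means end_of_log was reached
    params = {"token": token}  # assumption: the token is resent as a query parameter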