Skip to content

Commit 9ad6d82

Browse files
committed
Fix es/os task_hander convert_list_to_stream
1 parent 44c110c commit 9ad6d82

File tree

2 files changed

+22
-16
lines changed

2 files changed

+22
-16
lines changed

providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -46,10 +46,7 @@
4646
from airflow.providers.elasticsearch.log.es_response import ElasticSearchResponse, Hit
4747
from airflow.providers.elasticsearch.version_compat import AIRFLOW_V_3_0_PLUS
4848
from airflow.utils import timezone
49-
from airflow.utils.log.file_task_handler import (
50-
FileTaskHandler,
51-
convert_list_to_stream,
52-
)
49+
from airflow.utils.log.file_task_handler import FileTaskHandler
5350
from airflow.utils.log.logging_mixin import ExternalLoggingMixin, LoggingMixin
5451
from airflow.utils.module_loading import import_string
5552
from airflow.utils.session import create_session
@@ -355,9 +352,12 @@ def _read(
355352
"Otherwise, the logs for this task instance may have been removed."
356353
)
357354
if AIRFLOW_V_3_0_PLUS:
358-
from airflow.utils.log.file_task_handler import StructuredLogMessage
355+
from airflow.utils.log.file_task_handler import (
356+
StructuredLogMessage,
357+
get_compatible_output_log_stream,
358+
)
359359

360-
return convert_list_to_stream(
360+
return get_compatible_output_log_stream(
361361
[
362362
StructuredLogMessage(
363363
event=missing_log_message,
@@ -384,7 +384,10 @@ def concat_logs(hits: list[Hit]) -> str:
384384

385385
if logs_by_host:
386386
if AIRFLOW_V_3_0_PLUS:
387-
from airflow.utils.log.file_task_handler import StructuredLogMessage
387+
from airflow.utils.log.file_task_handler import (
388+
StructuredLogMessage,
389+
get_compatible_output_log_stream,
390+
)
388391

389392
header = [
390393
StructuredLogMessage(
@@ -397,7 +400,7 @@ def concat_logs(hits: list[Hit]) -> str:
397400
message = header + [
398401
StructuredLogMessage(event=concat_logs(hits)) for hits in logs_by_host.values()
399402
] # type: ignore[misc]
400-
message = convert_list_to_stream(message) # type: ignore[assignment]
403+
message = get_compatible_output_log_stream(message) # type: ignore[assignment]
401404
else:
402405
message = [
403406
(host, concat_logs(hits)) # type: ignore[misc]

providers/opensearch/src/airflow/providers/opensearch/log/os_task_handler.py

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -37,10 +37,7 @@
3737
from airflow.providers.opensearch.log.os_response import Hit, OpensearchResponse
3838
from airflow.providers.opensearch.version_compat import AIRFLOW_V_3_0_PLUS
3939
from airflow.utils import timezone
40-
from airflow.utils.log.file_task_handler import (
41-
FileTaskHandler,
42-
convert_list_to_stream,
43-
)
40+
from airflow.utils.log.file_task_handler import FileTaskHandler
4441
from airflow.utils.log.logging_mixin import ExternalLoggingMixin, LoggingMixin
4542
from airflow.utils.module_loading import import_string
4643
from airflow.utils.session import create_session
@@ -387,9 +384,12 @@ def _read(
387384
"Otherwise, the logs for this task instance may have been removed."
388385
)
389386
if AIRFLOW_V_3_0_PLUS:
390-
from airflow.utils.log.file_task_handler import StructuredLogMessage
387+
from airflow.utils.log.file_task_handler import (
388+
StructuredLogMessage,
389+
get_compatible_output_log_stream,
390+
)
391391

392-
return convert_list_to_stream(
392+
return get_compatible_output_log_stream(
393393
[
394394
StructuredLogMessage(
395395
event=missing_log_message,
@@ -416,7 +416,10 @@ def concat_logs(hits: list[Hit]):
416416

417417
if logs_by_host:
418418
if AIRFLOW_V_3_0_PLUS:
419-
from airflow.utils.log.file_task_handler import StructuredLogMessage
419+
from airflow.utils.log.file_task_handler import (
420+
StructuredLogMessage,
421+
get_compatible_output_log_stream,
422+
)
420423

421424
header = [
422425
StructuredLogMessage(
@@ -429,7 +432,7 @@ def concat_logs(hits: list[Hit]):
429432
message = header + [
430433
StructuredLogMessage(event=concat_logs(hits)) for hits in logs_by_host.values()
431434
]
432-
message = convert_list_to_stream(message) # type: ignore[assignment]
435+
message = get_compatible_output_log_stream(message) # type: ignore[assignment]
433436
else:
434437
message = [(host, concat_logs(hits)) for host, hits in logs_by_host.items()] # type: ignore[misc]
435438
else:

0 commit comments

Comments
 (0)