Skip to content

Commit 2f5a202

Browse files
committed
Fix compact tests
1 parent ed63ca1 commit 2f5a202

File tree

6 files changed

+37
-15
lines changed

6 files changed

+37
-15
lines changed

devel-common/src/tests_common/test_utils/file_task_handler.py

+5-2
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,10 @@
2424

2525
import pendulum
2626

27-
from airflow.utils.log.file_task_handler import StructuredLogMessage
27+
from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
2828

2929
if TYPE_CHECKING:
30-
from airflow.utils.log.file_task_handler import ParsedLog
30+
from airflow.utils.log.file_task_handler import ParsedLog, StructuredLogMessage
3131

3232

3333
def events(logs: Iterable[StructuredLogMessage], skip_source_info=True) -> list[str]:
@@ -60,6 +60,9 @@ def mock_parsed_logs_factory(
6060
Create a list of ParsedLog objects with the specified start datetime and count.
6161
Each ParsedLog object contains a timestamp and a list of StructuredLogMessage objects.
6262
"""
63+
if AIRFLOW_V_3_0_PLUS:
64+
from airflow.utils.log.file_task_handler import StructuredLogMessage
65+
6366
parsed_logs: list[ParsedLog] = []
6467
for i in range(count):
6568
timestamp: datetime = start_datetime + pendulum.duration(seconds=i)

providers/amazon/tests/unit/amazon/aws/log/test_cloudwatch_task_handler.py

+2-4
Original file line numberDiff line numberDiff line change
@@ -260,10 +260,8 @@ def test_read(self, monkeypatch):
260260
],
261261
)
262262
if AIRFLOW_V_2_10_PLUS:
263-
monkeypatch.setattr(
264-
self.cloudwatch_task_handler,
265-
"_read_from_logs_server",
266-
lambda ti, worker_log_rel_path, log_metadata: ([], []),
263+
self.cloudwatch_task_handler._read_from_logs_server = mock.Mock(
264+
return_value=([], []),
267265
)
268266
msg_template = textwrap.dedent("""
269267
INFO - ::group::Log message source details

providers/amazon/tests/unit/amazon/aws/log/test_s3_task_handler.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -261,7 +261,7 @@ def test_read(self):
261261
actual = log[0][0][-1]
262262
assert f"*** Found logs in s3:\n*** * {expected_s3_uri}\n" in actual
263263
assert actual.endswith("Line 4")
264-
assert metadata == [{"end_of_log": True, "first_time_read": 33}]
264+
assert metadata == [{"end_of_log": True, "log_pos": 33}]
265265

266266
def test_read_when_s3_log_missing(self):
267267
ti = copy.copy(self.ti)

providers/celery/tests/unit/celery/log_handlers/test_log_handlers.py

+10-4
Original file line numberDiff line numberDiff line change
@@ -81,10 +81,16 @@ def test__read_for_celery_executor_fallbacks_to_worker(self, create_task_instanc
8181
fth = FileTaskHandler("")
8282

8383
fth._read_from_logs_server = mock.Mock()
84-
fth._read_from_logs_server.return_value = (
85-
["this message"],
86-
[convert_list_to_stream(["this", "log", "content"])],
87-
)
84+
85+
# compact with 2.x and 3.x
86+
if AIRFLOW_V_3_0_PLUS:
87+
fth._read_from_logs_server.return_value = (
88+
["this message"],
89+
[convert_list_to_stream(["this", "log", "content"])],
90+
)
91+
else:
92+
fth._read_from_logs_server.return_value = ["this message"], ["this\nlog\ncontent"]
93+
8894
logs, metadata = fth._read(ti=ti, try_number=1)
8995
fth._read_from_logs_server.assert_called_once()
9096

providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py

+9-2
Original file line numberDiff line numberDiff line change
@@ -55,11 +55,18 @@
5555
from datetime import datetime
5656

5757
from airflow.models.taskinstance import TaskInstance, TaskInstanceKey
58+
from airflow.typing_compat import TypeAlias
5859

5960
if AIRFLOW_V_3_0_PLUS:
60-
from airflow.utils.log.file_task_handler import LogHandlerOutputStream
61+
from collections.abc import Generator
62+
from itertools import chain
63+
from typing import Union
6164

62-
EsLogMsgType = LogHandlerOutputStream
65+
from airflow.utils.log.file_task_handler import StructuredLogMessage
66+
67+
StructuredLogStream: TypeAlias = Generator[StructuredLogMessage, None, None]
68+
69+
EsLogMsgType = Union[StructuredLogStream, chain[StructuredLogMessage]]
6370
else:
6471
EsLogMsgType = list[tuple[str, str]] # type: ignore[misc]
6572

providers/opensearch/src/airflow/providers/opensearch/log/os_task_handler.py

+10-2
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import sys
2323
import time
2424
from collections import defaultdict
25+
from collections.abc import Generator
2526
from datetime import datetime
2627
from operator import attrgetter
2728
from typing import TYPE_CHECKING, Any, Callable, Literal
@@ -44,12 +45,19 @@
4445

4546
if TYPE_CHECKING:
4647
from airflow.models.taskinstance import TaskInstance, TaskInstanceKey
48+
from airflow.typing_compat import TypeAlias
4749

4850

4951
if AIRFLOW_V_3_0_PLUS:
50-
from airflow.utils.log.file_task_handler import LogHandlerOutputStream
52+
from collections.abc import Generator
53+
from itertools import chain
54+
from typing import Union
5155

52-
OsLogMsgType = LogHandlerOutputStream
56+
from airflow.utils.log.file_task_handler import StructuredLogMessage
57+
58+
StructuredLogStream: TypeAlias = Generator[StructuredLogMessage, None, None]
59+
60+
OsLogMsgType = Union[StructuredLogStream, chain[StructuredLogMessage]]
5361
else:
5462
OsLogMsgType = list[tuple[str, str]] # type: ignore[misc]
5563

0 commit comments

Comments
 (0)