Skip to content

Commit 4e1bcb4

Browse files
committed
Fix 3.0 compact
1 parent 17704ca commit 4e1bcb4

File tree

7 files changed

+31
-14
lines changed

7 files changed

+31
-14
lines changed

devel-common/src/tests_common/test_utils/version_compat.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,4 +34,4 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]:
3434

3535
AIRFLOW_V_2_10_PLUS = get_base_airflow_version_tuple() >= (2, 10, 0)
3636
AIRFLOW_V_3_0_PLUS = get_base_airflow_version_tuple() >= (3, 0, 0)
37-
[].sort()
37+
AIRFLOW_V_3_0 = get_base_airflow_version_tuple() == (3, 0, 0)

providers/celery/tests/unit/celery/log_handlers/test_log_handlers.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@
4040
from tests_common.test_utils.file_task_handler import (
4141
convert_list_to_stream,
4242
)
43-
from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
43+
from tests_common.test_utils.version_compat import AIRFLOW_V_3_0, AIRFLOW_V_3_0_PLUS
4444

4545
pytestmark = pytest.mark.db_test
4646

@@ -95,10 +95,13 @@ def test__read_for_celery_executor_fallbacks_to_worker(self, create_task_instanc
9595
fth._read_from_logs_server.assert_called_once()
9696

9797
if AIRFLOW_V_3_0_PLUS:
98-
assert metadata == {"end_of_log": False, "first_time_read": False}
9998
logs = list(logs)
10099
assert logs[0].sources == ["this message"]
101100
assert [x.event for x in logs[-3:]] == ["this", "log", "content"]
101+
if AIRFLOW_V_3_0:
102+
assert metadata == {"end_of_log": False, "first_time_read": False}
103+
else:
104+
assert metadata == {"end_of_log": False, "log_pos": 3}
102105
else:
103106
assert "*** this message\n" in logs
104107
assert logs.endswith("this\nlog\ncontent")

providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,10 @@
4444
ElasticsearchJSONFormatter,
4545
)
4646
from airflow.providers.elasticsearch.log.es_response import ElasticSearchResponse, Hit
47-
from airflow.providers.elasticsearch.version_compat import AIRFLOW_V_3_0_PLUS
47+
from airflow.providers.elasticsearch.version_compat import (
48+
AIRFLOW_V_3_0,
49+
AIRFLOW_V_3_0_PLUS,
50+
)
4851
from airflow.utils import timezone
4952
from airflow.utils.log.file_task_handler import FileTaskHandler
5053
from airflow.utils.log.logging_mixin import ExternalLoggingMixin, LoggingMixin
@@ -358,7 +361,7 @@ def _read(
358361
"If your task started recently, please wait a moment and reload this page. "
359362
"Otherwise, the logs for this task instance may have been removed."
360363
)
361-
if AIRFLOW_V_3_0_PLUS:
364+
if AIRFLOW_V_3_0_PLUS and not AIRFLOW_V_3_0:
362365
from airflow.utils.log.file_task_handler import (
363366
StructuredLogMessage,
364367
get_compatible_output_log_stream,
@@ -390,7 +393,7 @@ def concat_logs(hits: list[Hit]) -> str:
390393
return "\n".join(self._format_msg(hits[i]) for i in range(log_range))
391394

392395
if logs_by_host:
393-
if AIRFLOW_V_3_0_PLUS:
396+
if AIRFLOW_V_3_0_PLUS and not AIRFLOW_V_3_0:
394397
from airflow.utils.log.file_task_handler import (
395398
StructuredLogMessage,
396399
get_compatible_output_log_stream,

providers/elasticsearch/src/airflow/providers/elasticsearch/version_compat.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,3 +34,4 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]:
3434

3535
AIRFLOW_V_2_10_PLUS = get_base_airflow_version_tuple() >= (2, 10, 0)
3636
AIRFLOW_V_3_0_PLUS = get_base_airflow_version_tuple() >= (3, 0, 0)
37+
AIRFLOW_V_3_0 = get_base_airflow_version_tuple() == (3, 0, 0)

providers/microsoft/azure/tests/unit/microsoft/azure/log/test_wasb_task_handler.py

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232

3333
from tests_common.test_utils.config import conf_vars
3434
from tests_common.test_utils.db import clear_db_dags, clear_db_runs
35-
from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
35+
from tests_common.test_utils.version_compat import AIRFLOW_V_3_0, AIRFLOW_V_3_0_PLUS
3636

3737
pytestmark = pytest.mark.db_test
3838

@@ -122,10 +122,16 @@ def test_wasb_read(self, mock_hook_cls, ti):
122122
assert logs[0].sources == ["https://wasb-container.blob.core.windows.net/abc/hello.log"]
123123
assert logs[1].event == "::endgroup::"
124124
assert logs[2].event == "Log line"
125-
assert metadata == {
126-
"end_of_log": True,
127-
"first_time_read": False,
128-
}
125+
if AIRFLOW_V_3_0:
126+
assert metadata == {
127+
"end_of_log": True,
128+
"log_pos": 1,
129+
}
130+
else:
131+
assert metadata == {
132+
"end_of_log": True,
133+
"first_time_read": False,
134+
}
129135
else:
130136
assert logs[0][0][0] == "localhost"
131137
assert (

providers/opensearch/src/airflow/providers/opensearch/log/os_task_handler.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,10 @@
3636
from airflow.models import DagRun
3737
from airflow.providers.opensearch.log.os_json_formatter import OpensearchJSONFormatter
3838
from airflow.providers.opensearch.log.os_response import Hit, OpensearchResponse
39-
from airflow.providers.opensearch.version_compat import AIRFLOW_V_3_0_PLUS
39+
from airflow.providers.opensearch.version_compat import (
40+
AIRFLOW_V_3_0,
41+
AIRFLOW_V_3_0_PLUS,
42+
)
4043
from airflow.utils import timezone
4144
from airflow.utils.log.file_task_handler import FileTaskHandler
4245
from airflow.utils.log.logging_mixin import ExternalLoggingMixin, LoggingMixin
@@ -391,7 +394,7 @@ def _read(
391394
"If your task started recently, please wait a moment and reload this page. "
392395
"Otherwise, the logs for this task instance may have been removed."
393396
)
394-
if AIRFLOW_V_3_0_PLUS:
397+
if AIRFLOW_V_3_0_PLUS and not AIRFLOW_V_3_0: # over 3.0.0
395398
from airflow.utils.log.file_task_handler import (
396399
StructuredLogMessage,
397400
get_compatible_output_log_stream,
@@ -423,7 +426,7 @@ def concat_logs(hits: list[Hit]):
423426
return "\n".join(self._format_msg(hits[i]) for i in range(log_range))
424427

425428
if logs_by_host:
426-
if AIRFLOW_V_3_0_PLUS:
429+
if AIRFLOW_V_3_0_PLUS and not AIRFLOW_V_3_0: # over 3.0.0
427430
from airflow.utils.log.file_task_handler import (
428431
StructuredLogMessage,
429432
get_compatible_output_log_stream,

providers/opensearch/src/airflow/providers/opensearch/version_compat.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,3 +34,4 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]:
3434

3535
AIRFLOW_V_2_10_PLUS = get_base_airflow_version_tuple() >= (2, 10, 0)
3636
AIRFLOW_V_3_0_PLUS = get_base_airflow_version_tuple() >= (3, 0, 0)
37+
AIRFLOW_V_3_0 = get_base_airflow_version_tuple() == (3, 0, 0)

0 commit comments

Comments
 (0)