Resolve OOM When Reading Large Logs in Webserver #49470

Merged

Conversation

jason810496
Member

related issue: #45079
related PR: #45129
related discussion on slack: https://apache-airflow.slack.com/archives/CCZRF2U5A/p1736767159693839

Why

In short, this PR aims to eliminate OOM issues by:

  • Replacing full log sorting with a K-Way Merge
  • Making the entire log reading path streamable (using yield generators instead of returning a list of strings)
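
To illustrate the two points above, here is a minimal sketch of a lazy K-way merge over already-sorted log sources. It is not the PR's `_interleave_logs` implementation: the record shape `(timestamp, message)`, the helper names, and the use of `heapq.merge` are assumptions chosen for illustration.

```python
import heapq
from collections.abc import Iterable, Iterator

# Hypothetical record shape for this sketch: (ISO-8601 timestamp, message).
LogRecord = tuple[str, str]


def interleave_sorted_logs(*sources: Iterable[LogRecord]) -> Iterator[LogRecord]:
    """Lazily merge log sources that are each already sorted by timestamp.

    heapq.merge holds only one pending record per source, so peak memory
    grows with the number of sources rather than with the total log size.
    """
    yield from heapq.merge(*sources, key=lambda record: record[0])


def read_source(records: list[LogRecord]) -> Iterator[LogRecord]:
    """Stand-in for a generator that streams records from one log source."""
    yield from records


worker_log = read_source(
    [
        ("2025-07-08T17:53:15", "task started"),
        ("2025-07-08T17:53:17", "task finished"),
    ]
)
trigger_log = read_source([("2025-07-08T17:53:16", "trigger fired")])

merged = list(interleave_sorted_logs(worker_log, trigger_log))
```

Sorting the concatenated logs would need every line in memory at once; the merge only ever compares the head of each stream, which is why it combines naturally with generators.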

More detailed reasoning is already described in the linked issue.

Due to too many conflicts with the old PR (#45129), this PR reworks the changes on top of the latest FileTaskHandler.

What

This PR ports the original changes from #45129 to the current version of FileTaskHandler with the following updates:

  • Fixed line-splitting error when reading in chunks using buffered line-splitting with a carry-over
  • Adopted the new log metadata structure
  • Introduced buffering for the log reader
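
The carry-over idea in the first bullet can be sketched as follows. This is an illustrative version only, not the code from the PR: the function name `split_lines_from_chunks` and the `chunk_size` default are hypothetical.

```python
import io
from collections.abc import Iterator
from typing import TextIO


def split_lines_from_chunks(stream: TextIO, chunk_size: int = 1024) -> Iterator[str]:
    """Yield complete lines from a stream that is read in fixed-size chunks.

    A chunk boundary can fall in the middle of a line, so the trailing
    partial line is carried over and prepended to the next chunk.
    """
    carry = ""
    while True:
        chunk = stream.read(chunk_size)
        if not chunk:
            break
        # The last element of the split is the (possibly empty) partial line.
        *complete_lines, carry = (carry + chunk).split("\n")
        yield from complete_lines
    if carry:
        # The stream ended without a trailing newline.
        yield carry


sample = io.StringIO("first line\nsecond line\nthird")
lines = list(split_lines_from_chunks(sample, chunk_size=4))
```

Without the carry-over, a 4-character chunk size would emit fragments such as `firs` and `t li` as separate lines.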

Note: Recent Changes in FileTaskHandler

@jason810496 jason810496 self-assigned this Apr 20, 2025
@boring-cyborg boring-cyborg bot added area:API Airflow's REST/HTTP API area:logging area:UI Related to UI/UX. For Frontend Developers. labels Apr 20, 2025
@jason810496 jason810496 removed the area:UI Related to UI/UX. For Frontend Developers. label Apr 20, 2025
@jason810496 jason810496 force-pushed the refactor/rework-webserver-oom-for-large-log-read branch 10 times, most recently from d47a49b to 2f5a202 Compare April 25, 2025 13:00
@jason810496 jason810496 force-pushed the refactor/rework-webserver-oom-for-large-log-read branch 2 times, most recently from 4e1bcb4 to 358f231 Compare April 26, 2025 13:22
@jason810496 jason810496 marked this pull request as ready for review April 27, 2025 05:44
@jason810496 jason810496 force-pushed the refactor/rework-webserver-oom-for-large-log-read branch from 358f231 to 9900b2e Compare April 27, 2025 05:44
@potiuk
Member

potiuk commented Jul 8, 2025

I only have one comment: we already check that new providers work with the last released Airflow versions, since our compatibility checks test that. This is very cool.

But since that change also touches core, I am not 100% sure that the old providers will work with the "upcoming" version of Airflow. We do not have such "forward-looking" checks, but I can imagine that someone will get Airflow 3.1 and want to downgrade (say) the google provider to the version released now, and that should also work. It's difficult to do all the mental juggling to see whether it will work, because you need to connect codebases coming from different branches, but it should be relatively easy to test.

  1. rm -rf dist/*
  2. breeze release-management prepare-airflow-distributions
  3. breeze release-management prepare-task-sdk-distributions
  4. breeze start-airflow --mount-sources tests --use-airflow-version wheel
  5. In Breeze, run pip install apache-airflow-providers-google
  6. Configure logging to use one of the provider's handlers
  7. Restart everything and check that it works

Is it possible to make such test(s) - just to be sure @jason810496 ?

@jason810496
Member Author

Is it possible to make such test(s) - just to be sure @jason810496 ?

Sure, I will test this with a real remote logging setup and update here.

@potiuk
Member

potiuk commented Jul 8, 2025

Sure, I will test this with a real remote logging setup and update here.

If you have any problems, let me know. It could be that this recipe needs a few fixes and adjustments after recent changes ;)

@jason810496
Member Author

jason810496 commented Jul 8, 2025

I just tested the following setups with breeze k8s and everything works smoothly! To be sure, I rebuilt the k8s image before running the manual tests.

I will continue testing the remote logging setup tomorrow as well.

Without remote logging

  1. KubernetesExecutor (with no log persistence set up): works well; logs are shown only while the TI is in the running state (as expected)
  2. CeleryExecutor: works well; logs are shown whether the TI is running or succeeded, which means fetching from serve_logs still works after the refactor

With remote logging setup:

I set up Google Cloud Storage for remote logging.

  1. KubernetesExecutor: works well; logs are now shown even after the TI has succeeded
  2. CeleryExecutor: works well; logs are shown as expected and are really read from GCS
Screenshot 2025-07-09 at 12 35 25 PM

If you have any problems, let me know. It could be that this recipe needs a few fixes and adjustments after recent changes ;)

All good until the last start-airflow command.

I haven't dug into the root cause so far; I just used breeze k8s to test the behavior as a workaround (IMO, with breeze k8s the behavior is more realistic, and it is required for testing the KubernetesExecutor setup).

breeze start-airflow --mount-sources tests --use-airflow-version wheel

Here is the full traceback:

PostgreSQL: OK.  


Starting Airflow

[2025-07-08T17:53:15.198+0000] {configuration.py:1249} WARNING - No module named 'airflow.providers'
[2025-07-08T17:53:15.199+0000] {cli_parser.py:81} WARNING - cannot load CLI commands from auth manager: The object could not be loaded. Please check "auth_manager" key in "core" section. Current value: "airflow.api_fastapi.auth.managers.simple.simple_auth_manager.SimpleAuthManager".
[2025-07-08T17:53:15.200+0000] {cli_parser.py:82} WARNING - Auth manager is not configured and api-server will not be able to start.
DB: postgresql+psycopg2://postgres:***@postgres/airflow
Performing upgrade to the metadata database postgresql+psycopg2://postgres:***@postgres/airflow
Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 10, in <module>
    sys.exit(main())
  File "/usr/local/lib/python3.10/site-packages/airflow/__main__.py", line 55, in main
    args.func(args)
  File "/usr/local/lib/python3.10/site-packages/airflow/cli/cli_config.py", line 48, in command
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/airflow/utils/cli.py", line 113, in wrapper
    return f(*args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/airflow/utils/providers_configuration_loader.py", line 56, in wrapped_function
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/airflow/cli/commands/db_command.py", line 197, in migratedb
    run_db_migrate_command(args, db.upgradedb, _REVISION_HEADS_MAP)
  File "/usr/local/lib/python3.10/site-packages/airflow/cli/commands/db_command.py", line 125, in run_db_migrate_command
    command(
  File "/usr/local/lib/python3.10/site-packages/airflow/utils/session.py", line 101, in wrapper
    return func(*args, session=session, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/airflow/utils/db.py", line 942, in upgradedb
    import_all_models()
  File "/usr/local/lib/python3.10/site-packages/airflow/models/__init__.py", line 59, in import_all_models
    __getattr__(name)
  File "/usr/local/lib/python3.10/site-packages/airflow/models/__init__.py", line 81, in __getattr__
    val = import_string(f"{path}.{name}")
  File "/usr/local/lib/python3.10/site-packages/airflow/utils/module_loading.py", line 61, in import_string
    module = import_module(module_path)
  File "/usr/local/lib/python3.10/importlib/__init__.py", line 126, in import_module
    return _bootstrap._gcd_import(name[level:], package, level)
  File "<frozen importlib._bootstrap>", line 1050, in _gcd_import
  File "<frozen importlib._bootstrap>", line 1027, in _find_and_load
  File "<frozen importlib._bootstrap>", line 1006, in _find_and_load_unlocked
  File "<frozen importlib._bootstrap>", line 688, in _load_unlocked
  File "<frozen importlib._bootstrap_external>", line 883, in exec_module
  File "<frozen importlib._bootstrap>", line 241, in _call_with_frames_removed
  File "/usr/local/lib/python3.10/site-packages/airflow/models/dag.py", line 65, in <module>
    from airflow.assets.evaluation import AssetEvaluator
  File "/usr/local/lib/python3.10/site-packages/airflow/assets/evaluation.py", line 34, in <module>
    from airflow.sdk.definitions.asset.decorators import MultiAssetDefinition
  File "/usr/local/lib/python3.10/site-packages/airflow/sdk/definitions/asset/decorators.py", line 25, in <module>
    from airflow.providers.standard.operators.python import PythonOperator
ModuleNotFoundError: No module named 'airflow.providers'
Failed to run 'airflow db migrate'.
This could be because you are installing old airflow version
Attempting to run deprecated 'airflow db init' instead.
[2025-07-08T17:53:17.482+0000] {configuration.py:1249} WARNING - No module named 'airflow.providers'
[2025-07-08T17:53:17.483+0000] {cli_parser.py:81} WARNING - cannot load CLI commands from auth manager: The object could not be loaded. Please check "auth_manager" key in "core" section. Current value: "airflow.api_fastapi.auth.managers.simple.simple_auth_manager.SimpleAuthManager".
[2025-07-08T17:53:17.483+0000] {cli_parser.py:82} WARNING - Auth manager is not configured and api-server will not be able to start.
Usage: airflow db [-h] COMMAND ...

Database operations

Positional Arguments:
  COMMAND
    check           Check if the database can be reached
    check-migrations
                    Check if migration have finished
    clean           Purge old records in metastore tables
    downgrade       Downgrade the schema of the metadata database.
    drop-archived   Drop archived tables created through the db clean command
    export-archived
                    Export archived data from the archive tables
    migrate         Migrates the metadata database to the latest version
    reset           Burn down and rebuild the metadata database
    shell           Runs a shell to access the database

Options:
  -h, --help        show this help message and exit

airflow db command error: argument COMMAND: invalid choice: 'init' (choose from 'check', 'check-migrations', 'clean', 'downgrade', 'drop-archived', 'export-archived', 'migrate', 'reset', 'shell'), see help above.

Error: check_environment returned 2. Exiting.

Error 2 returned

@jason810496
Member Author

Hi @potiuk, I tested the following scenarios:

  • Kubernetes or Celery Executor
  • Without remote logging or with Google Cloud Storage as remote logging

That gives 4 combinations, and all of them work well!

@jason810496 jason810496 force-pushed the refactor/rework-webserver-oom-for-large-log-read branch from 7f6bc14 to 61db0f6 Compare July 9, 2025 10:14
@jason810496 jason810496 force-pushed the refactor/rework-webserver-oom-for-large-log-read branch from 61db0f6 to 339fe50 Compare July 9, 2025 13:20
@bbovenzi
Contributor

bbovenzi commented Jul 9, 2025

#protm for whenever this gets merged

@jason810496
Member Author

Just merged to latest main again to resolve conflict with #52581

@jason810496
Member Author

Hi @potiuk,
Just wanted to check, do you think this is good to merge?
Or is there any additional testing I should perform?
Thanks!

@potiuk potiuk added the backport-to-v3-0-test Mark PR with this label to backport to v3-0-test branch label Jul 10, 2025
@potiuk
Member

potiuk commented Jul 10, 2025

Hi @potiuk, I tested the following scenarios:

Kubernetes or Celery Executor
Without remote logging or with Google Cloud Storage as remote logging
That gives 4 combinations, and all of them work well!

Fantastic! Thanks! All good!

@potiuk potiuk merged commit ee54fe9 into apache:main Jul 10, 2025
103 checks passed
@potiuk
Member

potiuk commented Jul 10, 2025

🎉 🎉 🎉 🎉 🎉 🎉 🎉 🎉


Backport failed to create: v3-0-test. View the failure log Run details

Status Branch Result
v3-0-test Commit Link

You can attempt to backport this manually by running:

cherry_picker ee54fe9 v3-0-test

This should apply the commit to the v3-0-test branch and leave the commit in conflict state marking
the files that need manual conflict resolution.

After you have resolved the conflicts, you can continue the backport process by running:

cherry_picker --continue

@potiuk
Member

potiuk commented Jul 10, 2025

I marked it for backport, but (as expected) the automatic cherry-pick failed. You might want to try fixing the conflicts and see whether it can be merged to v3-0-test ^^ (instructions above).

@Lee-W
Member

Lee-W commented Jul 10, 2025

Congrats 🎉

@n-badtke-cg

n-badtke-cg commented Jul 10, 2025

How can I get a container image based on the new main branch, including this fix, so that I can test it in a Kubernetes cluster? We already have a Dockerfile that uses a released Airflow container image as a base.

@jason810496
Member Author

Thanks everyone for your help reviewing this PR along the way!
This PR has spanned over half a year, from before the Airflow 3.0 release to the final release, and it’s even been rewritten twice!

I marked it for backport, but (as expected) the automatic cherry-pick failed. You might want to try fixing the conflicts and see whether it can be merged to v3-0-test ^^ (instructions above).

Will do! I’ll continue working on resolving the conflicts and push the fix to the v3-0-test branch.

How can I get a container image based on the new main branch, including this fix, so that I can test it in a Kubernetes cluster? We already have a Dockerfile that uses a released Airflow container image as a base.

The fastest way I can think of is:

  1. Install Breeze using uv tool install -e ./dev/breeze
  2. Then run breeze k8s build-k8s-image --rebuild-base-image to build a Docker image based on the latest main branch.

@kaxil
Member

kaxil commented Jul 10, 2025

#protm

HsiuChuanHsu pushed a commit to HsiuChuanHsu/airflow that referenced this pull request Jul 10, 2025
* Add note for new usage of LogMetadata

* Add _stream_parsed_lines_by_chunk

* Refactor _read_from_local/logs_server as return stream

* Refactor _interleave_logs with K-Way Merge

* Add _get_compatible_log_stream

* Refactor _read method to return stream with compatible interface

- Add compatible interface for executor, remote_logs
- Refactor skip log_pos with skip for each log source

* Refactor log_reader to adapt stream

* Fix _read_from_local open closed file error

* Refactor LogReader by yielding in batch

* Add ndjson header to get_log openapi schema

* Fix _add_log_from_parsed_log_streams_to_heap
- Add comparator for StructuredLogMessage
- Refactor parsed_log_streams from list to dict for removing empty logs

* Fix _interleave_logs dedupe logic
- should check the current logs with default timestamp

* Refactor test_log_handlers
- Fix events utils
- Add convert_list_to_stream, mock_parsed_logs_factory utils
- Fix the following test after refactoring FileTaskHandler
    - test_file_task_handler_when_ti_value_is_invalid
    - test_file_task_handler
    - test_file_task_handler_running
    - test_file_task_handler_rotate_size_limit
    - test__read_when_local
    - test__read_served_logs_checked_when_done_and_no_local_or_remote_logs
    - test_interleave_interleaves
    - test_interleave_logs_correct_ordering
    - test_interleave_logs_correct_dedupe
- Add new test for refactoring FileTaskHandler
    - test__stream_lines_by_chunk
    - test__log_stream_to_parsed_log_stream
    - test__sort_key
    - test__is_sort_key_with_default_timestamp
    - test__is_logs_stream_like
    - test__add_log_from_parsed_log_streams_to_heap

* Move test_log_handlers utils to test_common

* Fix unit/celery/log_handlers test

* Fix mypy-providers static check

* Fix _get_compatible_log_stream
- sequential yield instead of parallel yield for all log_stream

* Fix amazon task_handler test

* Fix wasb task handler test

* Fix elasticsearch task handler test

* Fix opensearch task handler test

* Fix TaskLogReader buffer
- don't concat buffer with empty str, yield directly from buffer

* Fix test_log_reader

* Fix CloudWatchRemoteLogIO.read mypy

* Fix test_gcs_task_handler

* Fix core_api test_log

* Fix CloudWatchRemoteLogIO._event_to_str dt format

* Fix TestCloudRemoteLogIO.test_log_message

* Fix es/os task_handler convert_list_to_stream

* Fix compat tests

* Refactor es,os task handler for 3.0 compat

* Fix compat for RedisTaskHandler

* Fix ruff format for test_cloudwatch_task_handler after rebase

* Fix 2.10 compat TestCloudwatchTaskHandler

* Fix 3.0 compat test for celery, wasb

Fix wasb test, spelling

* Fix 3.0 compat test for gcs

* Fix 3.0 compat test for cloudwatch, s3

* Set get_log API default response format to JSON

* Remove "first_time_read" key in log metadata

* Remove "<source>_log_pos" key in log metadata

* Add LogStreamCounter for backward compatibility

* Remove "first_time_read" with backward "log_pos" for tests

- test_log_reader
- test_log_handlers
- test_cloudwatch_task_handler
- test_s3_task_handler
- celery test_log_handler
- test_gcs_task_handler
- test_wasb_task_handler
- fix redis_task_handler
- fix log_pos

* Fix RedisTaskHandler compatibility

* Fix chores in self review

- Fix typo in _read_from_logs_server
- Remove unused parameters in _stream_lines_by_chunk
- read_log_stream
    - Fix doc string by removing outdated note
    - Only add buffer for full_download
- Add test ndjson format for get_log API

* Fine-tune HEAP_DUMP_SIZE

* Replace get_compatible_output_log_stream with iter

* Remove buffer in log_reader

* Fix log_id not found compat for es_task_handler

* Fix review comments
- rename LogStreamCounter as LogStreamAccumulator
- simplify for-yield with yield-from in log_reader
- add type annotation for LogStreamAccumulator

* Refactor LogStreamAccumulator._capture method
- use itertools.islice to get chunk

* Fix type hint, joinedload for ti.dag_run after merge

* Replace _sort_key as _create_sort_key

* Add _flush_logs_out_of_heap common util

* Fix review nits

 - _is_logs_stream_like
    - add type annotation
    - reduce to 1 isinstance call
- construct log_streams in _get_compatible_log_stream inline
- use TypedDict for LogMetadata
- remove len(logs) to check empty
- revert typo of self.log_handler.read in log_reader
- log_stream_accumulator
    - refactor flush logic
    - make total_lines a property
    - make stream a property

* Fix mypy errors after merge

* Fix redis task handler test

* Refactor _capture logic in LogStreamAccumulator

* Add comments for ignore LogMetadata TypedDict

* Add comment for offset; Fix comment for LogMessages

* Refactor with from_iterable, islice

* Fix nits in test

- refactor structured_logs fixtures in TestLogStreamAccumulator
- use f-string in test_file_task_handler
- assert actual value of _create_sort_key
- add details comments in test__add_log_from_parsed_log_streams_to_heap

* Refactor test_utils

* Add comment for lazy initialization

* Fix error handling for _stream_lines_by_chunk

* Fix mypy error after merge

* Fix final review nits

* Fix mypy error
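
The commit list above mentions yielding in batches and taking chunks with itertools.islice. A minimal sketch of that general technique, assuming a hypothetical helper name `yield_in_batches` (this is not the PR's `LogStreamAccumulator` or log reader code):

```python
from collections.abc import Iterable, Iterator
from itertools import islice


def yield_in_batches(stream: Iterable[str], batch_size: int) -> Iterator[list[str]]:
    """Group a lazy stream of log lines into lists of at most batch_size lines.

    islice pulls only batch_size items at a time, so at most one batch
    is held in memory regardless of how long the stream is.
    """
    iterator = iter(stream)
    while batch := list(islice(iterator, batch_size)):
        yield batch


# Hypothetical input: a generator standing in for a streamed log.
batches = list(yield_in_batches((f"line {i}" for i in range(5)), batch_size=2))
```

Calling `iter(stream)` once up front matters: passing a list directly to `islice` inside the loop would restart from the first element on every iteration.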
@jscheffl
Contributor

Wow, what a long story to get this merged! Impressive! Looking forward to testing this.

As it is a major change, I'd recommend NOT backporting to 3.0. We had a couple of RCs and voting is still ongoing. I think it is good to have this in 3.1 as a feature increment. It is not really a bug fix that helps stability, and I rather see the risk of breakage in a minor patch.

jason810496 added a commit to jason810496/airflow that referenced this pull request Jul 11, 2025
(cherry picked from commit ee54fe9)
@potiuk
Member

potiuk commented Jul 11, 2025

As it is a major change, I'd recommend NOT backporting to 3.0. We had a couple of RCs and voting is still ongoing. I think it is good to have this in 3.1 as a feature increment. It is not really a bug fix that helps stability, and I rather see the risk of breakage in a minor patch.

Actually, maybe: since 3.0.3 is about to be released, if we are able to test it in the next few weeks, I would be in favor of trying to backport it for 3.0.4. A number of users have experienced this issue, and we could even take an interesting approach: release 3.0.4 b1 with only that change applied and ask the users who had this problem to run it (at their own risk, of course). We have never done that before other than for X.0.0, but there is nothing stopping us from doing it, and then we could give it a few weeks of testing. This way we might be able to put it in the hands of users in 3.0.4, possibly a month before 3.1.0 (and 3.1.0 will likely also have other changes that might cause other issues).

I think this is one of those cases where, while massive, the change is essentially a bug fix, not a new feature, and it has quite an impact.

But it is not a must, of course. Just one of the possibilities I see.

kaxil pushed a commit to jason810496/airflow that referenced this pull request Jul 11, 2025
(cherry picked from commit ee54fe9)
jason810496 added a commit to jason810496/airflow that referenced this pull request Jul 14, 2025
* Add note for new usage of LogMetadata

* Add _stream_parsed_lines_by_chunk

* Refactor _read_from_local/logs_server as return stream

* Refactor _interleave_logs with K-Way Merge

* Add _get_compatible_log_stream

* Refactor _read method to return stream with compatible interface

- Add compatible interface for executor, remote_logs
- Refactor skip log_pos with skip for each log source

* Refactor log_reader to adapt stream

* Fix _read_from_local open closed file error

* Refactor LogReader by yielding in batch

* Add ndjson header to get_log openapi schema

* Fix _add_log_from_parsed_log_streams_to_heap
- Add comparator for StructuredLogMessage
- Refactor parsed_log_streams from list to dict for removing empty logs

* Fix _interleave_logs dedupe logic
- should check the current logs with default timestamp

* Refactor test_log_handlers
- Fix events utils
- Add convert_list_to_stream, mock_parsed_logs_factory utils
- Fix the following test after refactoring FileTaskHandler
    - test_file_task_handler_when_ti_value_is_invalid
    - test_file_task_handler
    - test_file_task_handler_running
    - test_file_task_handler_rotate_size_limit
    - test__read_when_local
    - test__read_served_logs_checked_when_done_and_no_local_or_remote_logs
    - test_interleave_interleaves
    - test_interleave_logs_correct_ordering
    - test_interleave_logs_correct_dedupe
- Add new test for refactoring FileTaskHandler
    - test__stream_lines_by_chunk
    - test__log_stream_to_parsed_log_stream
    - test__sort_key
    - test__is_sort_key_with_default_timestamp
    - test__is_logs_stream_like
    - test__add_log_from_parsed_log_streams_to_heap

* Move test_log_handlers utils to test_common

* Fix unit/celery/log_handlers test

* Fix mypy-providers static check

* Fix _get_compatible_log_stream
- sequential yield instead of parallel yield for all log_stream

* Fix amazon task_handler test

* Fix wasb task handler test

* Fix elasticsearch task handler test

* Fix opensearch task handler test

* Fix TaskLogReader buffer
- don't concat buffer with empty str, yield directly from buffer

* Fix test_log_reader

* Fix CloudWatchRemoteLogIO.read mypy

* Fix test_gcs_task_handler

* Fix core_api test_log

* Fix CloudWatchRemoteLogIO._event_to_str dt format

* Fix TestCloudRemoteLogIO.test_log_message

* Fix es/os task_handler convert_list_to_stream

* Fix compat tests

* Refactor es,os task handler for 3.0 compat

* Fix compat for RedisTaskHandler

* Fix ruff format for test_cloudwatch_task_handler after rebase

* Fix 2.10 compat TestCloudwatchTaskHandler

* Fix 3.0 compat test for celery, wasb

Fix wasb test, spelling

* Fix 3.0 compat test for gcs

* Fix 3.0 compat test for cloudwatch, s3

* Set get_log API default response format to JSON

* Remove "first_time_read" key in log metadata

* Remove "<source>_log_pos" key in log metadata

* Add LogStreamCounter for backward compatibility

* Remove "first_time_read" with backward "log_pos" for tests

- test_log_reader
- test_log_handlers
- test_cloudwatch_task_handler
- test_s3_task_handler
- celery test_log_handler
- test_gcs_task_handler
- test_wasb_task_handler
- fix redis_task_handler
- fix log_pos

* Fix RedisTaskHandler compatibility

* Fix chores in self review

- Fix typo in _read_from_logs_server
- Remove unused parameters in _stream_lines_by_chunk
- read_log_stream
    - Fix docstring by removing outdated note
    - Only add buffer for full_download
- Add test ndjson format for get_log API

* Fine-tune HEAP_DUMP_SIZE

* Replace get_compatible_output_log_stream with iter

* Remove buffer in log_reader

* Fix log_id not found compat for es_task_handler

* Fix review comments
- rename LogStreamCounter as LogStreamAccumulator
- simplify for-yield with yield-from in log_reader
- add type annotation for LogStreamAccumulator
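
The for-yield to yield-from simplification listed above amounts to the following equivalence. Both function names are hypothetical and are not taken from `log_reader`.

```python
from typing import Iterable, Iterator


def read_with_loop(stream: Iterable[str]) -> Iterator[str]:
    # Explicit loop that re-yields every item.
    for line in stream:
        yield line


def read_with_yield_from(stream: Iterable[str]) -> Iterator[str]:
    # Delegates iteration to the inner iterable; same items, less code.
    yield from stream


sample = ["a", "b", "c"]
loop_result = list(read_with_loop(sample))
delegated_result = list(read_with_yield_from(sample))
```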

* Refactor LogStreamAccumulator._capture method
- use itertools.islice to get chunk

* Fix type hint, joinedload for ti.dag_run after merge

* Replace _sort_key with _create_sort_key

* Add _flush_logs_out_of_heap common util

* Fix review nits

 - _is_logs_stream_like
    - add type annotation
    - reduce to 1 isinstance call
- construct log_streams in _get_compatible_log_stream inline
- use TypedDict for LogMetadata
- remove len(logs) to check empty
- revert typo of self.log_handler.read in log_reader
- log_stream_accumulator
    - refactor flush logic
    - make total_lines a property
    - make stream a property

* Fix mypy errors after merge

* Fix redis task handler test

* Refactor _capture logic in LogStreamAccumulator

* Add comments for ignoring LogMetadata TypedDict

* Add comment for offset; Fix comment for LogMessages

* Refactor with from_iterable, islice

* Fix nits in test

- refactor structured_logs fixtures in TestLogStreamAccumulator
- use f-string in test_file_task_handler
- assert actual value of _create_sort_key
- add detailed comments in test__add_log_from_parsed_log_streams_to_heap

* Refactor test_utils

* Add comment for lazy initialization

* Fix error handling for _stream_lines_by_chunk
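
Reading a file in fixed-size chunks while still emitting whole lines needs a carry-over buffer for the partial line at the end of each chunk. The sketch below shows that idea under simplifying assumptions (text mode, `\n` separators only); `split_lines_with_carry` is a hypothetical name and this is not the code of `_stream_lines_by_chunk`.

```python
import io
from typing import Iterator, TextIO


def split_lines_with_carry(fp: TextIO, chunk_size: int = 8) -> Iterator[str]:
    """Yield complete lines while reading fp chunk_size characters at a time."""
    carry = ""  # partial line left over from the previous chunk
    while True:
        chunk = fp.read(chunk_size)
        if not chunk:
            break
        # Everything before the last separator is a complete line;
        # the final piece may be incomplete, so it is carried forward.
        *lines, carry = (carry + chunk).split("\n")
        yield from lines
    if carry:
        # The file did not end with a newline; emit the trailing line.
        yield carry


lines = list(split_lines_with_carry(io.StringIO("first line\nsecond\nthird"), 4))
```

Without the carry-over, a line that straddles a chunk boundary would be emitted as two fragments.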

* Fix mypy error after merge

* Fix final review nits

* Fix mypy error

(cherry picked from commit ee54fe9)
Labels
area:API Airflow's REST/HTTP API area:logging backport-to-v3-0-test Mark PR with this label to backport to v3-0-test branch
Development

Successfully merging this pull request may close these issues.