Description
Related context: #44753 (comment)
TL;DR
After conducting some research and implementing a POC, I would like to propose a potential solution. However, this solution requires changes to `airflow.utils.log.file_task_handler.FileTaskHandler`. If the solution is accepted, it will necessitate modifications to the 10 providers that extend the `FileTaskHandler` class.
Main Concept for Refactoring
The proposed solution focuses on:
- Returning a generator instead of loading the entire file content at once.
- Leveraging a heap to merge logs incrementally, rather than sorting entire chunks.
The POC for this refactoring shows a 90% reduction in memory usage with similar processing times!
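As a toy illustration of the second point, `heapq.merge` consumes already-sorted iterables lazily, so the combined output can be produced line by line; the sample log lines below are invented for the example and are not Airflow code.

```python
import heapq

# Two already-ordered sources; only one pending line per source is buffered.
local_log = iter([
    "2025-01-01T00:00:00 task started",
    "2025-01-01T00:00:02 doing work",
])
remote_log = iter([
    "2025-01-01T00:00:01 uploading",
    "2025-01-01T00:00:03 task finished",
])

for line in heapq.merge(local_log, remote_log):
    print(line)  # lines come out in timestamp order without sorting one big list
```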
Experiment Details
- Log size: 830 MB
- Approximately 8,670,000 lines
Main Root Causes of OOM

`_interleave_logs` function in `airflow.utils.log.file_task_handler`:
- Extends all log strings into the `records` list.
- Sorts the entire `records` list.
- Yields lines with deduplication.

`_read` method in `airflow.utils.log.file_task_handler.FileTaskHandler`:
- Joins all aggregated logs into a single string using `"\n".join(_interleave_logs(all_log_sources))`.

Methods that use `_read`:
- `_read_from_local`, `_read_from_logs_server`, and `_read_remote_logs` (implemented by providers) read the entire log content and return it as a string instead of a generator; a simplified sketch of this pattern is shown below.
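The sketch is illustrative only, not the actual Airflow implementation: the real `_interleave_logs` also parses timestamps and handles metadata, and the function names here are invented for the example.

```python
def interleave_logs_eager(*log_sources: str):
    """Sketch of the current approach: everything is materialized before sorting."""
    records: list[str] = []
    for source in log_sources:
        # Every source is fully loaded and split, so all lines sit in memory at once.
        records.extend(source.splitlines())
    # Sorting requires the complete list to be held in memory.
    records.sort()
    last = None
    for line in records:
        if line != last:  # deduplicate consecutive duplicates
            yield line
        last = line


def read_eager(all_log_sources: list[str]) -> str:
    # Joining collapses everything back into one giant string, which is roughly
    # where the memory peak in the flame graph shows up.
    return "\n".join(interleave_logs_eager(*all_log_sources))
```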
Proposed Refactoring Solution
The main concept includes:
- Returning a generator for reading each log source (local or external) instead of the whole file content as a string.
- Merging logs with a k-way merge instead of sorting (see the sketch below):
  - Since each log source is already sorted, merge them incrementally using `heapq` over streams of log lines.
  - Return a stream of the merged result.
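The sketch assumes each source already yields lines in timestamp order; the helper names `stream_local_log` and `interleave_log_streams` are illustrative, not a final API.

```python
import heapq
from typing import Iterable, Iterator


def stream_local_log(path: str) -> Iterator[str]:
    # Each source becomes a lazy line stream instead of one big string.
    with open(path, encoding="utf-8") as f:
        for line in f:
            yield line.rstrip("\n")


def interleave_log_streams(*sources: Iterable[str]) -> Iterator[str]:
    """K-way merge of already-sorted log streams via a heap.

    Only one pending line per source is held in memory at a time, so peak
    usage no longer scales with the total log size. In practice a key that
    extracts the parsed timestamp could be passed to heapq.merge.
    """
    last = None
    for line in heapq.merge(*sources):
        if line != last:  # keep the consecutive-duplicate filtering
            yield line
        last = line
```

Callers (for example the log reader behind the webserver API) would then iterate over the merged stream and flush it chunk by chunk instead of building one response string.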
Breaking Changes in This Solution
- Interface of the `read` method in `FileTaskHandler`: it will now return a generator instead of a string (a rough before/after illustration is shown below).
- Interfaces of `read_log_chunks` and `read_log_stream` in `TaskLogReader`: adjustments to support the generator-based approach.
- Methods that use `_read`: `_read_from_local`, `_read_from_logs_server`, and `_read_remote_logs` (there are 10 providers that implement this method).
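The signatures in this illustration are simplified and hypothetical; the real `read`/`_read` methods take a task instance, try number, and metadata, and also return metadata alongside the logs.

```python
from typing import Iterator


# Before: the whole log is materialized as a single string.
def read_before(handler, ti, try_number) -> str:
    logs, _metadata = handler._read(ti, try_number)
    return logs  # potentially hundreds of MB held in memory at once


# After: the handler hands back a lazy stream of lines, which
# TaskLogReader.read_log_chunks / read_log_stream would consume incrementally.
def read_after(handler, ti, try_number) -> Iterator[str]:
    log_stream, _metadata = handler._read(ti, try_number)
    yield from log_stream
```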
Experimental Environment
- Setup: Docker Compose without memory limits.
- Memory profiling: memray (an example setup is sketched below)
- Log size: 830 MB, about 8,670,000 lines
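This is a hypothetical harness showing how such a run could be wired up with memray's Python API; it is not the exact script used for the numbers reported here.

```python
from memray import Tracker


def read_large_task_log() -> None:
    # Placeholder for the code path under test, i.e. reading the 830 MB /
    # ~8,670,000-line task log through the log handler.
    ...


if __name__ == "__main__":
    with Tracker("log_read_profile.bin"):
        read_large_task_log()
    # Afterwards, `memray flamegraph log_read_profile.bin` renders the flame graph.
```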
Benchmark Metrics
- Original implementation:
  - Memory usage: average 3 GB, peaking at 4 GB when returning the final stream.
  - Processing time: ~60 seconds.
  - Memory flame graph
- POC (refactored implementation):
  - Memory usage: average 300 MB.
  - Processing time: ~60 seconds.
  - Memory flame graph
Summary
Feel free to share any feedback! I believe we should have more discussions before adopting this solution, as it involves breaking changes to the FileTaskHandler interface and requires refactoring in 10 providers as well.
Related issues
TODO Tasks
- [Resolve OOM When Reading Large Logs in Webserver] Refactor to Use K-Way Merge for Log Streams Instead of Sorting Entire Log Records #45129
- Refactor providers that implement `FileTaskHandler`
  - `_read_remote_logs` method:
    - Amazon AWS S3
    - Google Cloud GCS
    - Microsoft Azure WASB
    - Apache HDFS
    - Amazon AWS Cloud Watch (replace `_read` with `_read_remote_logs` in "Rework remote task log handling for the structlog era." #48491)
  - `_read` method:
    - Alibaba Cloud
    - Amazon AWS Cloud Watch
    - Elasticsearch
    - OpenSearch
    - Redis
- Refactor executors that implement `get_task_log`:
  - CeleryKubernetesExecutor
  - KubernetesExecutor
  - LocalKubernetesExecutor
- Remove the compatibility utility
- Add a pagination interface for the APIs
Are you willing to submit a PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct

