Fix CloudwatchTaskHandler display error #54054
base: main
Just tested the fixed CloudwatchTaskHandler with different airflow-core version setups, and all of them display correctly. I set up the following airflow-core version matrix against the fixed CloudwatchTaskHandler.
After the fix, the log should be formatted correctly on the frontend; screenshot below.
ashb
left a comment
This still loads everything into memory for Cloudwatch, doesn't it? We call io.read(), which does [x for x in self.stream()]. I wonder if the read code in file_task_handler should check for an io.stream function and use that in preference to io.read?
That might be a bit complex, but either way I think we need a bit more thought here so that we don't undo all the changes you made for paging/OOM of logs.
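To make the suggestion above concrete, here is a minimal sketch of "prefer stream over read when the IO object offers it". The `EagerIO` and `StreamingIO` classes and the `read_remote` helper are hypothetical stand-ins, not the real `file_task_handler` or remote IO code; only the `(messages, logs)` return shape is taken from this thread.

```python
from collections.abc import Iterable


class EagerIO:
    """Hypothetical remote IO that only implements read() (everything in memory)."""

    def read(self, relative_path: str, ti: object) -> tuple[list[str], list[str]]:
        return ["eager"], ["line 1", "line 2"]


class StreamingIO(EagerIO):
    """Hypothetical remote IO that additionally implements stream()."""

    def stream(self, relative_path: str, ti: object) -> tuple[list[str], Iterable[str]]:
        # A generator: lines are produced lazily instead of being collected first.
        return ["streamed"], (f"line {i}" for i in (1, 2))


def read_remote(io: object, relative_path: str, ti: object) -> tuple[list[str], Iterable[str]]:
    """Use io.stream when it exists, otherwise fall back to io.read."""
    stream = getattr(io, "stream", None)
    if callable(stream):
        return stream(relative_path, ti)
    return io.read(relative_path, ti)
```

With this shape the caller never forces a streaming IO through `read()`, so the memory savings from paging are kept, while IO objects without `stream` keep working unchanged.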
Yes, I'm still WIP locally on setting up the stream-based read with backward compatibility.
I had considered a compat mechanism in #49470 by peeking the type of the
So we just need to make sure that a provider running with airflow-core 3.0.3 or later returns a stream-based result.

# in `version_compat` module
# or maybe just use `get_base_airflow_version_tuple() >= (3, 0, 3)` directly in provider FileTaskHandler
STREAM_BASED_READ = get_base_airflow_version_tuple() >= (3, 0, 3)

# in provider FileTaskHandler
if STREAM_BASED_READ:
    return messages, log_streams  # generator of StructuredLogMessage or str
else:
    return messages, log_list  # list of str (not memory efficient, but only airflow-core 3.0.3 and later can support stream-based read)
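For illustration, the gating idea above can be sketched in a self-contained form. `version_tuple` and `read_logs` are hypothetical helpers standing in for `get_base_airflow_version_tuple()` and the provider handler, and the cutoff is a parameter because the exact version is still being discussed in this thread.

```python
import re
from collections.abc import Iterable


def version_tuple(version: str) -> tuple[int, ...]:
    """Hypothetical stand-in: parse '3.0.4rc1' into (3, 0, 4)."""
    return tuple(int(part) for part in re.findall(r"\d+", version)[:3])


def read_logs(
    lines: list[str],
    core_version: str,
    min_stream_version: tuple[int, int, int] = (3, 0, 4),  # assumed cutoff
) -> tuple[list[str], Iterable[str]]:
    """Return a generator on newer cores and a plain list on older ones."""
    messages = ["sketch: reading from in-memory lines"]
    if version_tuple(core_version) >= min_stream_version:
        return messages, (line for line in lines)  # lazy, stream-based
    return messages, list(lines)  # legacy shape, fully materialized
```

Comparing integer tuples rather than version strings matters here: as strings, "3.0.10" sorts before "3.0.3", while `(3, 0, 10) > (3, 0, 3)` holds as expected.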
7e5785d to
de079f2
Compare
| # These types are similar, but have distinct names to make processing them less error prone | ||
| LogMessages: TypeAlias = list[str] | ||
| """The legacy format of log messages before 3.0.2""" | ||
| """The legacy format of log messages before 3.0.3""" |
I thought it was 3.0.4
Yeah, the OOM log PR only got released in 3.0.4
| class DateTimeEncoder(json.JSONEncoder): | ||
| """Custom JSON encoder to handle datetime serialization.""" | ||
|
|
||
| def default(self, obj): |
| def default(self, obj): | |
| def default(self, obj: object) -> str: |
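A minimal sketch of what the typed `default` could look like in full. The body is an assumption, since the diff only shows the signature: ISO 8601 output via `isoformat()` is one common choice, not necessarily what this PR does.

```python
import json
from datetime import datetime, timezone


class DateTimeEncoder(json.JSONEncoder):
    """Custom JSON encoder to handle datetime serialization."""

    def default(self, obj: object) -> str:
        if isinstance(obj, datetime):
            # Assumed format: ISO 8601 string
            return obj.isoformat()
        # Anything else falls through to the base class, which raises TypeError
        return super().default(obj)


payload = {"event": "started", "timestamp": datetime(2025, 1, 1, 12, 0, tzinfo=timezone.utc)}
encoded = json.dumps(payload, cls=DateTimeEncoder)
```

Note that `-> str` is only accurate because the non-datetime branch always raises in the base class; if more types get handled later, the annotation may need widening.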
| def read(self, relative_path, ti: RuntimeTI) -> LegacyLogResponse: | ||
| messages, logs = self.stream(relative_path, ti) | ||
|
|
||
| return messages, [ |
Might be worth making it a function. This list comprehension is not that easy to comprehend.
| logs = [(self._event_to_str(event) for event in events)] | ||
| else: | ||
| logs = ["\n".join(self._event_to_str(event) for event in events)] | ||
| except Exception as e: |
Not sure whether it's possible to list all the possible exceptions instead of using Exception.
| logs: list[str] | list[Generator[str, None, None]] | ||
| try: | ||
| events = self.io.get_cloudwatch_logs(stream_name, task_instance) | ||
| if SUPPORT_STREAM_BASED_READ: |
Read on CloudwatchTaskHandler should only ever be called for old Airflow Core versions (newer versions use CloudwatchRemoteIO directly instead). Do we need this here?
de079f2 to
4832d37
Compare
Are the changes to this file backward compatible with Airflow 2.10? PRs that change both core and providers may hide compatibility issues.
Yes, and I agree.
I will test the CloudWatchHandler with Airflow 2.10 as well later on.
ffa3797 to
5a775df
Compare
- .stream method should return gen[str], but it returned gen[dict] in the previous fix
- Making _parse_cloudwatch_log_event return str instead of dict fixes the problem
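The gen[str] versus gen[dict] distinction in the commit message above can be sketched as follows. `event_to_str` is a hypothetical stand-in for `_parse_cloudwatch_log_event`, and the JSON layout it emits is an assumption; the `timestamp` and `message` keys are the ones CloudWatch log events carry.

```python
import json
from collections.abc import Iterable, Iterator


def event_to_str(event: dict) -> str:
    """Hypothetical stand-in: serialize one CloudWatch event to a JSON string."""
    # Assumed output layout, for illustration only
    return json.dumps({"timestamp": event["timestamp"], "event": event["message"]})


def stream_events(events: Iterable[dict]) -> Iterator[str]:
    """Yield one str per event, so consumers expecting gen[str] never see a dict."""
    for event in events:
        yield event_to_str(event)


sample_events = [{"timestamp": 1, "message": "task started"}]
lines = list(stream_events(sample_events))
```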
30a2528 to
847a238
Compare








related: #49470
Why
As noted in #49470 (comment), PR #49470 unintentionally broke the display format of CloudWatchHandler.
What
This PR fixes the display issue for CloudWatchHandler and ensures it displays properly across different airflow-core versions.