-
Notifications
You must be signed in to change notification settings - Fork 518
fix(worker): add retryIfFailed method [python] [DRAFT] #3521
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR adds retry functionality and pause support to the Worker class, along with cleanup improvements. The main changes include adding a retry_if_failed method for retrying failed operations, implementing a pause method to pause job processing, and updating the close method to emit events and set state flags.
- Added
retry_if_failedmethod for retrying coroutine functions with configurable delays and max retries - Implemented
pausemethod to pause worker and wait for active jobs to complete - Enhanced
closemethod to setclosedflag and emit 'closed' event
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| if not self.paused: | ||
| self.paused = True |
Copilot
AI
Oct 29, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The self.paused attribute is not initialized in the __init__ method. This will raise an AttributeError when pause() is called for the first time or when retry_if_failed() checks self.paused at line 238. Add self.paused = False in the __init__ method around line 56 where other state flags are initialized.
| retry += 1 | ||
| if retry >= max_retries: | ||
| raise err | ||
| await asyncio.sleep(delay_in_ms / 1000.0) |
Copilot
AI
Oct 29, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The delay is applied even after the last retry fails before re-raising the exception. The sleep at line 245 should only occur if there will be another retry attempt. Consider moving the sleep to before the retry increment or adding a condition to skip it when retry + 1 >= max_retries.
| await asyncio.sleep(delay_in_ms / 1000.0) | |
| else: | |
| await asyncio.sleep(delay_in_ms / 1000.0) |
| finally: | ||
| self.jobs.remove((job, token)) | ||
|
|
||
| async def retry_if_failed(self, fn, opts=None): |
Copilot
AI
Oct 29, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Mixing implicit and explicit returns may indicate an error, as implicit returns always return None.
Why
Enter your explanation here.
How
Enter the implementation details here.
Additional Notes (Optional)
Any extra info here.