-
Notifications
You must be signed in to change notification settings - Fork 333
Add concurrency policy #3267
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add concurrency policy #3267
Conversation
Thank you for opening this pull request! 🙌 These tips will help get your PR across the finish line:
|
@thomasjhuang Can you sign off on these commits? And can someone else let the CI run? |
167ea31
to
57e7827
Compare
Sorry about that, added the signoff |
flytekit/models/concurrency.py
Outdated
|
||
|
||
class ConcurrencyLimitBehavior(object): | ||
SKIP = _launch_plan_idl.ConcurrencyLimitBehavior.SKIP |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I encountered some errors in my local execution, According to the latest flyteidl, should we use SKIP = _launch_plan_idl.ConcurrencyLimitBehavior.CONCURRENCY_LIMIT_BEHAVIOR_SKIP
here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yea I need to update this according to the merged PR in flyte. Thanks for the callout
168a830
to
6b353ee
Compare
LGTM |
77059fa
to
d9dce8e
Compare
4da0014
to
012276c
Compare
@kumare3 @davidmirror-ops These test failures are a little odd to me, they seem transient? would a rerun suffice here? |
Hmm weird how did the event loop vanish |
I fixed the failed test in https://github.com/flyteorg/flytekit/pull/3265/files#r2354426173 |
012276c
to
55bf831
Compare
Ah got it, thanks I've rebased and pushed again. |
|
||
|
||
def test_concurrency_limit_behavior(): | ||
assert ConcurrencyLimitBehavior.SKIP == _launch_plan_idl.ConcurrencyLimitBehavior.SKIP |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
assert ConcurrencyLimitBehavior.SKIP == _launch_plan_idl.ConcurrencyLimitBehavior.SKIP | |
assert ConcurrencyLimitBehavior.SKIP == _launch_plan_idl.CONCURRENCY_LIMIT_BEHAVIOR_SKIP |
@thomasjhuang
All _launch_plan_idl.ConcurrencyLimitBehavior.SKIP
below should be changed to _launch_plan_idl.CONCURRENCY_LIMIT_BEHAVIOR_SKIP
. This can solve the error AttributeError: Enum ConcurrencyLimitBehavior has no value defined for name 'SKIP'
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated, I noticed the issue yesterday as well. Thanks for the callout!
a029dab
to
a0f6004
Compare
@kumare3 @davidmirror-ops @popojk @machichima Looks like all checks have passed now! Thanks for the help :) |
Thank you @thomasjhuang! |
flytekit/core/launch_plan.py
Outdated
:param auto_activate: If set to True, the launch plan will be activated automatically on registration. | ||
:param concurrency: Defines execution concurrency limits and policy when limit is reached | ||
Default is False. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is the docstring here being cut-off?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, but I don't see the issue with this line?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
:param auto_activate: If set to True, the launch plan will be activated automatically on registration. | |
:param concurrency: Defines execution concurrency limits and policy when limit is reached | |
Default is False. | |
:param auto_activate: If set to True, the launch plan will be activated automatically on registration. | |
Default is False. | |
:param concurrency: Defines execution concurrency limits and policy when limit is reached. |
Oh I mean "Default is False" is for auto_activate
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see! fixed it, thanks!
Head branch was pushed to by a user without write access
71f92e4
to
68d189a
Compare
Signed-off-by: thomasjhuang <[email protected]>
Signed-off-by: thomasjhuang <[email protected]>
Signed-off-by: thomasjhuang <[email protected]>
Signed-off-by: thomasjhuang <[email protected]>
Signed-off-by: thomasjhuang <[email protected]>
Co-authored-by: Nary Yeh <[email protected]> Signed-off-by: thomasjhuang <[email protected]>
68d189a
to
e1ba971
Compare
Signed-off-by: thomasjhuang <[email protected]>
Signed-off-by: thomasjhuang <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. Thanks!!
Congrats on merging your first pull request! 🎉 |
* Add concurrency policy Signed-off-by: thomasjhuang <[email protected]> * Update to latest idl Signed-off-by: thomasjhuang <[email protected]> * Reformat with ruff Signed-off-by: thomasjhuang <[email protected]> * Change reference Signed-off-by: thomasjhuang <[email protected]> * Update test Signed-off-by: thomasjhuang <[email protected]> * Update tests/flytekit/unit/models/test_concurrency.py Co-authored-by: Nary Yeh <[email protected]> Signed-off-by: thomasjhuang <[email protected]> * Update comment Signed-off-by: thomasjhuang <[email protected]> * Fix lint Signed-off-by: thomasjhuang <[email protected]> --------- Signed-off-by: thomasjhuang <[email protected]> Co-authored-by: Nary Yeh <[email protected]> Signed-off-by: Atharva <[email protected]>
Tracking issue
This is associated to flyte change flyteorg/flyte#6475, and RFC flyteorg/flyte#5659
Why are the changes needed?
These are the necessary changes needed to implement the ConcurrencyPolicy within flytekit. Namely, it will allow users to manage concurrency control via
LaunchPlan.create()
orLaunchPlan.get_or_create()
How was this patch tested?
Unit tests are added, local testing is also done.
Screenshots
Check all the applicable boxes
Related PRs
Docs link
Summary by Bito
This pull request introduces a new concurrency policy feature in the Flytekit library, enhancing concurrency control through LaunchPlan methods. It updates existing models and launch plan parameters to incorporate these settings, improving overall functionality. Comprehensive unit tests have been added to ensure reliability and system integrity.