-
Notifications
You must be signed in to change notification settings - Fork 309
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Flytekit: Rename map_task to map, replace min_successes and min_success_ratio with tolerance, rename max_parallelism to concurrency #3107
base: master
Are you sure you want to change the base?
Conversation
- Rename map_task to map for simpler API - Replace min_successes/min_success_ratio with tolerance parameter - Rename max_parallelism to concurrency for consistency
Code Review Agent Run #d47fe6Actionable Suggestions - 13
Additional Suggestions - 10
Review Details
|
Changelist by BitoThis pull request implements the following key changes.
|
@@ -1,7 +1,7 @@ | |||
import tempfile | |||
from pathlib import Path | |||
|
|||
from flytekit import FlyteDirectory, FlyteFile, map_task, task, workflow | |||
from flytekit import FlyteDirectory, FlyteFile, map, task, workflow |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider if replacing map_task
with map
is intentional as they might have different functionality in the Flyte framework. map_task
is typically used for task parallelization while map
might have different semantics.
Code suggestion
Check the AI-generated fix before applying
from flytekit import FlyteDirectory, FlyteFile, map, task, workflow | |
from flytekit import FlyteDirectory, FlyteFile, map_task, task, workflow |
Code Review Run #d47fe6
Is this a valid issue, or was it incorrectly flagged by the Agent?
- it was incorrectly flagged
@@ -26,6 +26,6 @@ def list_dir(dir: FlyteDirectory) -> list[FlyteFile]: | |||
def wf() -> list[str]: | |||
tmpdir = setup() | |||
files = list_dir(dir=tmpdir) | |||
return map_task(read_file)(file=files) | |||
return map(read_file)(file=files) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider using map_task
instead of map
for task mapping operations in Flytekit workflows. The map
function may not provide the same task-level parallelization and execution guarantees as map_task
.
Code suggestion
Check the AI-generated fix before applying
return map(read_file)(file=files) | |
return map_task(read_file)(file=files) |
Code Review Run #d47fe6
Is this a valid issue, or was it incorrectly flagged by the Agent?
- it was incorrectly flagged
@@ -414,7 +414,7 @@ def create_sd() -> StructuredDataset: | |||
def test_map_over_notebook_task(): | |||
@workflow | |||
def wf(a: float) -> typing.List[float]: | |||
return map_task(nb_sub_task)(a=[a, a]) | |||
return map(nb_sub_task)(a=[a, a]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider using map_task
instead of map
for mapping over notebook tasks. The map
function may not handle notebook task specific requirements correctly.
Code suggestion
Check the AI-generated fix before applying
return map(nb_sub_task)(a=[a, a]) | |
return map_task(nb_sub_task)(a=[a, a]) |
Code Review Run #d47fe6
Is this a valid issue, or was it incorrectly flagged by the Agent?
- it was incorrectly flagged
from flytekit._version import __version__ | ||
from flytekit.configuration import Config | ||
from flytekit.core.array_node_map_task import map_task | ||
from flytekit.core.array_node_map_task import map |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider keeping both map_task
and map
imports to maintain backward compatibility. The alias is defined later but importing directly as map
may break existing code that uses map_task
.
Code suggestion
Check the AI-generated fix before applying
from flytekit.core.array_node_map_task import map | |
from flytekit.core.array_node_map_task import map_task |
Code Review Run #d47fe6
Is this a valid issue, or was it incorrectly flagged by the Agent?
- it was incorrectly flagged
@@ -369,11 +370,12 @@ def _raw_execute(self, **kwargs) -> Any: | |||
return outputs | |||
|
|||
|
|||
def map_task( | |||
def map( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider keeping the original function name map_task
instead of renaming to map
as it could conflict with Python's built-in map
function and cause confusion. The original name was more descriptive of the function's purpose.
Code suggestion
Check the AI-generated fix before applying
def map( | |
def map_task( |
Code Review Run #d47fe6
Is this a valid issue, or was it incorrectly flagged by the Agent?
- it was incorrectly flagged
@@ -63,7 +63,7 @@ def say_hello(name: str) -> str: | |||
|
|||
@workflow | |||
def wf() -> List[str]: | |||
return map_task(say_hello)(name=["abc", "def"]) | |||
return map(say_hello)(name=["abc", "def"]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider if using map()
instead of map_task()
is intentional as it changes the behavior from using Flyte's map task functionality to Python's built-in map()
.
Code suggestion
Check the AI-generated fix before applying
return map(say_hello)(name=["abc", "def"]) | |
return map_task(say_hello)(name=["abc", "def"]) |
Code Review Run #d47fe6
Is this a valid issue, or was it incorrectly flagged by the Agent?
- it was incorrectly flagged
@@ -575,7 +575,7 @@ def say_hello(name: str) -> str: | |||
for index, map_input_str in enumerate(list_strs): | |||
monkeypatch.setenv("BATCH_JOB_ARRAY_INDEX_VAR_NAME", "name") | |||
monkeypatch.setenv("name", str(index)) | |||
t = map_task(say_hello) | |||
t = map(say_hello) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider if using map()
instead of map_task()
is intentional as this could change the behavior of task mapping functionality.
Code suggestion
Check the AI-generated fix before applying
t = map(say_hello) | |
t = map_task(say_hello) |
Code Review Run #d47fe6
Is this a valid issue, or was it incorrectly flagged by the Agent?
- it was incorrectly flagged
@@ -410,7 +410,7 @@ def test_serialization_metadata(serialization_settings): | |||
def t1(a: int) -> int: | |||
return a + 1 | |||
|
|||
arraynode_maptask = map_task(t1, metadata=TaskMetadata(retries=2)) | |||
arraynode_maptask = map(t1, metadata=TaskMetadata(retries=2)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider if changing from map_task
to map
could impact backward compatibility. The function name change from map_task
to map
may affect existing code that imports and uses the original function name.
Code suggestion
Check the AI-generated fix before applying
arraynode_maptask = map(t1, metadata=TaskMetadata(retries=2)) | |
# Maintain both for backward compatibility | |
arraynode_maptask = map_task(t1, metadata=TaskMetadata(retries=2)) |
Code Review Run #d47fe6
Is this a valid issue, or was it incorrectly flagged by the Agent?
- it was incorrectly flagged
t1 = map(say_hello, **kwargs1) | ||
t2 = map(say_hello, **kwargs2) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider if replacing map_task
with map
is intentional as this changes the function being called which could affect functionality. The map_task
decorator appears to be imported but not used after this change.
Code suggestion
Check the AI-generated fix before applying
t1 = map(say_hello, **kwargs1) | |
t2 = map(say_hello, **kwargs2) | |
t1 = map_task(say_hello, **kwargs1) | |
t2 = map_task(say_hello, **kwargs2) |
Code Review Run #d47fe6
Is this a valid issue, or was it incorrectly flagged by the Agent?
- it was incorrectly flagged
@@ -316,7 +316,7 @@ def test_bounded_inputs_vars_order(serialization_settings): | |||
def task1(a: int, b: float, c: str) -> str: | |||
return f"{a} - {b} - {c}" | |||
|
|||
mt = map_task(functools.partial(task1, c=1.0, b="hello", a=1)) | |||
mt = map(functools.partial(task1, c=1.0, b="hello", a=1)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider using map_task()
instead of map()
as it appears to be the intended function based on the test context and imports. Using map()
could lead to unexpected behavior since it's a built-in Python function.
Code suggestion
Check the AI-generated fix before applying
mt = map(functools.partial(task1, c=1.0, b="hello", a=1)) | |
mt = map_task(functools.partial(task1, c=1.0, b="hello", a=1)) |
Code Review Run #d47fe6
Is this a valid issue, or was it incorrectly flagged by the Agent?
- it was incorrectly flagged
@@ -492,7 +492,7 @@ def test_supported_node_type(): | |||
def test_task(): | |||
... | |||
|
|||
map_task(test_task) | |||
map(test_task) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The function call has been changed from map_task(test_task)
to map(test_task)
. This could potentially cause confusion with Python's built-in map()
function. Consider using the imported map_task
decorator/function to maintain clarity and avoid potential naming conflicts.
Code suggestion
Check the AI-generated fix before applying
map(test_task) | |
map_task(test_task) |
Code Review Run #d47fe6
Is this a valid issue, or was it incorrectly flagged by the Agent?
- it was incorrectly flagged
@@ -533,7 +533,7 @@ def consume_directories(dirs: List[FlyteDirectory]): | |||
for path_info, other_info in d.crawl(): | |||
print(path_info) | |||
|
|||
mt = map_task(generate_directory, min_success_ratio=0.1) | |||
mt = map(generate_directory, min_success_ratio=0.1) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider if using map()
instead of map_task()
is intentional as it may change the expected behavior. The map_task()
function is typically used for array node map tasks in Flytekit.
Code suggestion
Check the AI-generated fix before applying
mt = map(generate_directory, min_success_ratio=0.1) | |
mt = map_task(generate_directory, min_success_ratio=0.1) |
Code Review Run #d47fe6
Is this a valid issue, or was it incorrectly flagged by the Agent?
- it was incorrectly flagged
@@ -575,7 +575,7 @@ def say_hello(name: str) -> str: | |||
for index, map_input_str in enumerate(list_strs): | |||
monkeypatch.setenv("BATCH_JOB_ARRAY_INDEX_VAR_NAME", "name") | |||
monkeypatch.setenv("name", str(index)) | |||
t = map_task(say_hello) | |||
t = map(say_hello) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider using map_task
instead of map
as it appears to be the intended decorator based on the imports and test context. The map
function could be confused with Python's built-in map
function.
Code suggestion
Check the AI-generated fix before applying
t = map(say_hello) | |
t = map_task(say_hello) |
Code Review Run #d47fe6
Is this a valid issue, or was it incorrectly flagged by the Agent?
- it was incorrectly flagged
Signed-off-by: Chih Tsung Lu <[email protected]>
Signed-off-by: lu00122 <[email protected]> Signed-off-by: Chih Tsung Lu <[email protected]>
Code Review Agent Run #99b31dActionable Suggestions - 8
Additional Suggestions - 10
Review Details
|
@@ -315,7 +316,7 @@ def test_bounded_inputs_vars_order(serialization_settings): | |||
def task1(a: int, b: float, c: str) -> str: | |||
return f"{a} - {b} - {c}" | |||
|
|||
mt = map_task(functools.partial(task1, c=1.0, b="hello", a=1)) | |||
mt = map(functools.partial(task1, c=1.0, b="hello", a=1)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The function call parameters c=1.0, b="hello", a=1
appear to have mismatched types with the task definition. The task expects a: int, b: float, c: str
but receives c
as float, b
as string, and a
as int. Consider adjusting the parameter types to match the task signature.
Code suggestion
Check the AI-generated fix before applying
mt = map(functools.partial(task1, c=1.0, b="hello", a=1)) | |
mt = map(functools.partial(task1, c="1.0", b=1.0, a=1)) |
Code Review Run #99b31d
Is this a valid issue, or was it incorrectly flagged by the Agent?
- it was incorrectly flagged
@@ -1551,7 +1551,7 @@ def _execute( | |||
annotations=options.annotations, | |||
raw_output_data_config=options.raw_output_data_config, | |||
auth_role=None, | |||
max_parallelism=options.max_parallelism, | |||
concurrency=options.concurrency, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider verifying if renaming max_parallelism
to concurrency
maintains backward compatibility. This change could potentially break existing code that relies on the max_parallelism
parameter.
Code suggestion
Check the AI-generated fix before applying
concurrency=options.concurrency, | |
concurrency=options.max_parallelism if hasattr(options, 'max_parallelism') | |
else options.concurrency, | |
# TODO: Remove max_parallelism support in next major version | |
# Deprecated in favor of concurrency parameter |
Code Review Run #99b31d
Is this a valid issue, or was it incorrectly flagged by the Agent?
- it was incorrectly flagged
@@ -273,7 +273,7 @@ def t1(a: str) -> str: | |||
|
|||
@workflow | |||
def my_wf(a: typing.List[str]) -> typing.List[str]: | |||
mappy = map_task(t1) | |||
mappy = map(t1) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider using map_task
instead of map
as it appears to be the intended function based on the test context. The map
function may not provide the same task mapping functionality needed for workflow testing.
Code suggestion
Check the AI-generated fix before applying
mappy = map(t1) | |
mappy = map_task(t1) |
Code Review Run #99b31d
Is this a valid issue, or was it incorrectly flagged by the Agent?
- it was incorrectly flagged
@@ -726,7 +726,7 @@ def t1(x: int, y: int) -> int: | |||
|
|||
@workflow | |||
def w() -> int: | |||
return map_task(partial(t1, y=2))(x=[1, 2, 3]) | |||
return map(partial(t1, y=2))(x=[1, 2, 3]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider using map_task
instead of map
as it appears to be testing map task functionality based on the test name and context.
Code suggestion
Check the AI-generated fix before applying
return map(partial(t1, y=2))(x=[1, 2, 3]) | |
return map_task(partial(t1, y=2))(x=[1, 2, 3]) |
Code Review Run #99b31d
Is this a valid issue, or was it incorrectly flagged by the Agent?
- it was incorrectly flagged
m1 = map(functools.partial(task1, c=param_c))(a=param_a, b=param_b) | ||
m2 = map(functools.partial(task2, c=param_c))(a=param_a, b=param_b) | ||
m3 = map(functools.partial(task3, c=param_c))(a=param_a, b=param_b) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider using map_task
instead of map
for consistency with the test name and module being tested (test_array_node_map_task.py
). The test appears to be validating array node map task functionality.
Code suggestion
Check the AI-generated fix before applying
m1 = map(functools.partial(task1, c=param_c))(a=param_a, b=param_b) | |
m2 = map(functools.partial(task2, c=param_c))(a=param_a, b=param_b) | |
m3 = map(functools.partial(task3, c=param_c))(a=param_a, b=param_b) | |
m1 = map_task(functools.partial(task1, c=param_c))(a=param_a, b=param_b) | |
m2 = map_task(functools.partial(task2, c=param_c))(a=param_a, b=param_b) | |
m3 = map_task(functools.partial(task3, c=param_c))(a=param_a, b=param_b) |
Code Review Run #99b31d
Is this a valid issue, or was it incorrectly flagged by the Agent?
- it was incorrectly flagged
map(test_dynamic) | ||
|
||
@eager | ||
def test_eager(): | ||
... | ||
|
||
with pytest.raises(ValueError): | ||
map_task(test_eager) | ||
map(test_eager) | ||
|
||
@workflow | ||
def test_wf(): | ||
... | ||
|
||
with pytest.raises(ValueError): | ||
map_task(test_wf) | ||
map(test_wf) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider using map_task()
instead of map()
to avoid confusion with Python's built-in map()
function. The change from map_task()
to map()
could lead to confusion.
Code suggestion
Check the AI-generated fix before applying
- map(test_dynamic)
+ map_task(test_dynamic)
@@ -468,1 +468,1 @@
- map(test_eager)
+ map_task(test_eager)
@@ -475,1 +475,1 @@
- map(test_wf)
+ map_task(test_wf)
Code Review Run #99b31d
Is this a valid issue, or was it incorrectly flagged by the Agent?
- it was incorrectly flagged
@@ -326,7 +336,8 @@ def get_or_create( | |||
labels, | |||
annotations, | |||
raw_output_data_config, | |||
max_parallelism, | |||
concurrency=concurrency, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The 'init' method has too many parameters (14 > 5) and is missing docstring and return type annotation.
Code suggestion
Check the AI-generated fix before applying
- def __init__(
- self,
- name: str,
- workflow: _annotated_workflow.WorkflowBase,
- parameters: _interface_models.ParameterMap,
- fixed_inputs: _literal_models.LiteralMap,
- schedule: Optional[_schedule_model.Schedule] = None,
- notifications: Optional[List[_common_models.Notification]] = None,
- labels: Optional[_common_models.Labels] = None,
- annotations: Optional[_common_models.Annotations] = None,
- raw_output_data_config: Optional[_common_models.RawOutputDataConfig] = None,
- max_parallelism: Optional[int] = None,
- security_context: Optional[security.SecurityContext] = None,
- trigger: Optional[LaunchPlanTriggerBase] = None,
- overwrite_cache: Optional[bool] = None,
- auto_activate: bool = False,
- ):
+ @dataclass
+ class Config:
+ """Configuration for LaunchPlan initialization."""
+ name: str
+ workflow: _annotated_workflow.WorkflowBase
+ parameters: _interface_models.ParameterMap
+ fixed_inputs: _literal_models.LiteralMap
+ schedule: _schedule_model.Schedule | None = None
+ notifications: list[_common_models.Notification] | None = None
+ labels: _common_models.Labels | None = None
+ annotations: _common_models.Annotations | None = None
+ raw_output_data_config: _common_models.RawOutputDataConfig | None = None
+ max_parallelism: int | None = None
+ security_context: security.SecurityContext | None = None
+ trigger: LaunchPlanTriggerBase | None = None
+ overwrite_cache: bool | None = None
+ auto_activate: bool = False
+
+ def __init__(self, config: Config) -> None:
Code Review Run #99b31d
Is this a valid issue, or was it incorrectly flagged by the Agent?
- it was incorrectly flagged
Tracking issue
Related to flyteorg/flyte#6139
Why are the changes needed?
The current Flytekit has several areas that could be improved for a better developer experience:
map_task
name is unnecessarily verbose when imported via the recommendedimport flytekit as fl
min_successes
andmin_success_ratio
) are powerful but overly verbosemap_task
'sconcurrency
parameterWhat changes were proposed in this pull request?
Rename
map_task
tomap
map
, it's acceptable since we recommend usingimport flytekit as fl
Simplify failure tolerance parameters
min_successes
andmin_success_ratio
tolerance
parameter that accepts bothfloat
andint
typesStandardize parallelism parameter
max_parallelism
argument in workflow and LaunchPlanconcurrency
parameter to matchmap_task
's parameterKnown issue
The changes introduce the concurrency field in Flytekit, which is not currently defined in flyteidl's LaunchPlanSpec
<img width="1561" alt="valueError" src="https://github.com/user-attachments/assets/e794e7d0-6393-4009-a320-988fdd1769cb" />
Code to Address the Issue:
The following code handles the transition between the concurrency and max_parallelism fields:
How was this patch tested?
Ran tests with the command: make test
Setup process
Screenshots
Check all the applicable boxes
Related PRs
Docs link
Summary by Bito
This PR implements API improvements in Flytekit by renaming map_task to map, introducing a tolerance parameter to replace min_successes/min_success_ratio, and standardizing parallelism control by replacing max_parallelism with concurrency. The changes include comprehensive deprecation warnings and backward compatibility handling.Unit tests added: True
Estimated effort to review (1-5, lower is better): 5