Skip to content

Conversation

machichima
Copy link
Member

@machichima machichima commented May 17, 2025

Tracking issue

flyteorg/flyte#6277

Why are the changes needed?

Currently, we cannot use ContainerTask in map task. This PR is trying to make this integration works.

What changes were proposed in this pull request?

  • Make ContainerTask work with map task in local and remote
  • Add unit test for running ContainerTask with map task locally

How was this patch tested?

unit test & example code below:

Execute following code locally and remotly should pass:

  • local: pyflyte run containertask_maptask.py wf
  • remote: pyflyte run --remote containertask_maptask.py wf
# containertask_maptask.py
import functools

from flytekit import ContainerTask, kwtypes, map_task, workflow

calculate_ellipse_area_python_template_style = ContainerTask(
    name="calculate_ellipse_area_python_template_style",
    input_data_dir="/var/inputs",
    output_data_dir="/var/outputs",
    inputs=kwtypes(a=float, b=float),
    outputs=kwtypes(area=float),
    image="ghcr.io/flyteorg/rawcontainers-python:v2",
    command=[
        "python",
        "calculate-ellipse-area.py",
        "{{.inputs.a}}",
        "{{.inputs.b}}",
        "/var/outputs",
    ],
)


@workflow
def nomap_wf(a: float = 0.5, b: float = 0.4) -> float:
    res = calculate_ellipse_area_python_template_style(a, b)
    return res


@workflow
def wf(a: list[float] = [3.0, 4.0, 5.0], b: float = 4.0) -> list[float]:
    partial_task = functools.partial(calculate_ellipse_area_python_template_style, b=b)
    res = map_task(partial_task)(a=a)
    return res

Setup process

Screenshots

  • Result in remote

image

  • Result in local

image

Check all the applicable boxes

  • I updated the documentation accordingly.
  • All new and existing tests passed.
  • All commits are signed-off.

Related PRs

Docs link

Summary by Bito

This pull request enhances map task functionality by integrating ContainerTask support for both local and remote execution. It includes significant refactoring of the execute method to return native Python types, improving usability and maintainability. Comprehensive unit tests validate this integration and confirm expected behavior, ensuring expected functionality in both environments.

@machichima machichima changed the title 6277 map container [Core feature] map_task to support ContainerTask May 17, 2025
Copy link

codecov bot commented May 17, 2025

Codecov Report

Attention: Patch coverage is 52.94118% with 8 lines in your changes missing coverage. Please review.

Project coverage is 76.36%. Comparing base (3c6b61d) to head (2b9387d).

Files with missing lines Patch % Lines
flytekit/core/array_node_map_task.py 44.44% 3 Missing and 2 partials ⚠️
flytekit/core/container_task.py 57.14% 1 Missing and 2 partials ⚠️

❗ There is a different number of reports uploaded between BASE (3c6b61d) and HEAD (2b9387d). Click for more details.

HEAD has 40 uploads less than BASE
Flag BASE (3c6b61d) HEAD (2b9387d)
41 1
Additional details and impacted files
@@            Coverage Diff             @@
##           master    #3249      +/-   ##
==========================================
- Coverage   83.35%   76.36%   -7.00%     
==========================================
  Files         347      215     -132     
  Lines       28791    22528    -6263     
  Branches     2960     2966       +6     
==========================================
- Hits        23999    17203    -6796     
- Misses       3956     4507     +551     
+ Partials      836      818      -18     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

monkeypatch.undo()


@pytest.mark.skipif(
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Modification in this file before this line is simply caused by formatter

-e
Signed-off-by: machichima <[email protected]>
# tasks that rely on user code defined in the container. This should be encapsulated by the auto container
# parent class
container._args = prefix_with_fast_execute(settings, container.args)
container._args = prefix_with_fast_execute(settings, container.args or [])
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

args for ContainerTask is None, adding this to prevent error

try:
o = self._run_task.execute(**single_instance_inputs)
# For Container task, it will return the LiteralMap. We need to convert it to native
# type here.
Copy link
Contributor

@wild-endeavor wild-endeavor May 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@machichima can you add comment please that this is only for local execution for container tasks? this code doesn't run for backend cluster runs of container task.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I make ContainerTask return Python type instead of LiteralMap. So this part is not needed anymore.
Thanks!

-e
Signed-off-by: machichima <[email protected]>
@machichima machichima force-pushed the 6277-map-container branch from 2c2bd66 to 0f6dd70 Compare May 24, 2025 13:17
@machichima machichima requested a review from wild-endeavor May 25, 2025 09:35
@machichima
Copy link
Member Author

machichima commented May 25, 2025

I think let ConatinerTask return python type break tests for plugins. Will have a look

Update: I think it's not my problem. Merging the master branch and fixed

@machichima machichima force-pushed the 6277-map-container branch from a63386e to f15415b Compare June 3, 2025 21:49
@flyte-bot
Copy link
Contributor

Bito Review Skipped - No Changes Detected

Bito didn't review this pull request because we did not detect any changes in the pull request to review.

1 similar comment
@flyte-bot
Copy link
Contributor

Bito Review Skipped - No Changes Detected

Bito didn't review this pull request because we did not detect any changes in the pull request to review.

Comment on lines 212 to 213
if isinstance(self._run_task, ContainerTask):
return self.python_function_task.get_container(settings)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:
We can modify self.prepare_target, do nothing for container task

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed! Thanks

Copy link
Member

@Future-Outlier Future-Outlier left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add an intergration test to make sure it works well in remote execution?

Comment on lines +199 to +202
if isinstance(self._run_task, ContainerTask):
yield
return

Copy link
Member

@Future-Outlier Future-Outlier Jun 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not just return directly?
update: we need to return a generator, please ignore my comment.

@machichima
Copy link
Member Author

Can we add an intergration test to make sure it works well in remote execution?

Added! Thanks

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants