Skip to content
56 changes: 35 additions & 21 deletions examples/productionizing/productionizing/customizing_resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#
# For either a request or limit, refer to the {py:class}`flytekit:flytekit.Resources` documentation.
#
#
# The following attributes can be specified for a `Resource`.
#
# 1. `cpu`
Expand Down Expand Up @@ -85,51 +86,64 @@ def my_workflow(x: typing.List[int]) -> int:
#
# ## Using `with_overrides`
#
# You can use the `with_overrides` method to override the resources allocated to the tasks dynamically.
# Let's understand how the resources can be initialized with an example.
# Tasks can also have task_config which provides configuration for a specific task types. For task_config, refer to the {py:func}`flytekit:flytekit.task` documentation.
#
# You can use the `with_overrides` method to override the resources and task_config allocated to the tasks dynamically.
# Let's understand how the resources can be initialized and override with an example.

# %% [markdown]
# Import the dependencies.
# %%
import typing # noqa: E402

from flytekit import Resources, task, workflow # noqa: E402
from flytekit import Resources, task, workflow, dynamic # noqa: E402
from flytekitplugins.kftensorflow import PS, Chief, TfJob, Worker # noqa: E402


# %% [markdown]
# Define a task and configure the resources to be allocated to it.
# You can use tasks decorated with memory and storage hints like regular tasks in a workflow.
# You can use tasks decorated with memory and storage hints like regular tasks in a workflow, or configuration for an {py:class}`flytekitplugins:flytekitplugins.kftensorflow.TfJob` that can run distributed TensorFlow training on Kubernetes.
# %%
@task(requests=Resources(cpu="1", mem="200Mi"), limits=Resources(cpu="2", mem="350Mi"))
def count_unique_numbers_1(x: typing.List[int]) -> int:
s = set()
for i in x:
s.add(i)
return len(s)

@task(
task_config=TfJob(
num_workers=1,
num_ps_replicas=1,
num_chief_replicas=1,
),
requests=Resources(cpu="1", mem="200Mi"),
limits=Resources(cpu="2", mem="350Mi"),
)
def run_tfjob() -> str:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This task shouldn't just return a string — it should actually showcase a TF operation.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please explain it? If I understand correctly, are you suggesting that I should providing a detailed description of the entire TensorFlow training code, as this might overshadow the importance of discussing the with_overrides method? or can I provide a link to this file that explains the TensorFlow processing, allowing us to focus on how to override the task_conf and how to run it? I want to emphasize that our primary focus here is on demonstrating the usage of overrides, or what my thought that is wrong?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right. Could you then include this as part of a note but not as a code block? Not a code block because we aren't including a full-fledged code snippet.


return "hello world"

# %% [markdown]
# Define a task that computes the square of a number.
# The `with_overrides` method overrides the old resource allocations.
# %%
@task
def square_1(x: int) -> int:
return x * x
@workflow
def my_run() -> str:
return run_tfjob().with_overrides(limits=Resources(cpu="6", mem="500Mi"))


# %% [markdown]
# The `with_overrides` method overrides the old resource allocations.
# Or you can use `@dynamic` to generate tasks at runtime with any custom configurations you want.
# %%
@dynamic
def dynamic_run(num_workers: int) -> str:
return run_tfjob().with_overrides(task_config=TfJob(
num_workers=num_workers,
num_ps_replicas=1,
num_chief_replicas=1))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Include a new line.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please format the code using black and isort.

@workflow
def my_pipeline(x: typing.List[int]) -> int:
return square_1(x=count_unique_numbers_1(x=x)).with_overrides(limits=Resources(cpu="6", mem="500Mi"))

def start_dynamic_run(new_num_workers: int) -> str:
return dynamic_run(num_workers=new_num_workers)

# %% [markdown]
# You can execute the workflow locally.
# %%
if __name__ == "__main__":
print(count_unique_numbers_1(x=[1, 1, 2]))
print(my_pipeline(x=[1, 1, 2]))
print(f"Running my_run(): {my_run()}")
print(f"Running dynamic_run(num_workers=4): {start_dynamic_run(new_num_workers=4)}")

# %% [markdown]
# You can see the memory allocation below. The memory limit is `500Mi` rather than `350Mi`, and the
Expand Down