#
# ## Using `with_overrides`
#
# ### Override resources
# You can use the `with_overrides` method to dynamically override the resources allocated to a task.
# Let's walk through an example to see how resources can be set and then overridden.
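# %% [markdown]
# The full task and workflow definitions are elided above; the sketch below shows
# roughly what they might look like. The task bodies, the `square_1` helper, and the
# default resource values are assumptions for illustration; only the `cpu="6"`/`mem="500Mi"`
# override comes from the discussion that follows.
# %%
import typing

from flytekit import Resources, task, workflow


@task(requests=Resources(cpu="2", mem="200Mi"), limits=Resources(cpu="3", mem="350Mi"))
def count_unique_numbers_1(x: typing.List[int]) -> int:
    # Count the distinct values in the input list.
    return len(set(x))


@task
def square_1(x: int) -> int:
    # Hypothetical downstream task that squares the count.
    return x * x


@workflow
def my_pipeline(x: typing.List[int]) -> int:
    # Override the limits declared on the task at invocation time.
    return square_1(x=count_unique_numbers_1(x=x).with_overrides(limits=Resources(cpu="6", mem="500Mi")))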

print(count_unique_numbers_1(x=[1, 1, 2]))
print(my_pipeline(x=[1, 1, 2]))

# %% [markdown]
# You can see the memory allocation below. The memory limit is `500Mi` rather than `350Mi`, and the
# CPU limit is 4 rather than the 6 specified using `with_overrides`, most likely because a
# platform-level CPU quota caps the limit at 4.
# Figure: resource allocated using the "with_overrides" method.
#
# ### Override task_config
# Another way to use the `with_overrides` method is to override the `task_config`.
# The following example uses TensorFlow training to show how a `TfJob` can be
# initialized and then overridden.
#
# For `task_config`, refer to the {py:func}`flytekit:flytekit.task` documentation.
#
# To create a TensorFlow task, add the {py:class}`flytekitplugins:flytekitplugins.kftensorflow.TfJob` config to the Flyte task. This plugin runs distributed TensorFlow training on Kubernetes.
# %%
from typing import NamedTuple

import tensorflow as tf
from flytekit import Resources, dynamic, task, workflow
from flytekitplugins.kftensorflow import PS, Chief, TfJob, Worker

TrainingOutputs = NamedTuple(
    "TrainingOutputs",
    [
        ("model", tf.keras.Model),
        ("accuracy", float),
        ("loss", float),
    ],
)


@task(
    task_config=TfJob(worker=Worker(replicas=1), ps=PS(replicas=1), chief=Chief(replicas=1)),
    cache_version="1.0",
    cache=True,
    requests=Resources(cpu="1", mem="2048Mi"),
    limits=Resources(cpu="1", mem="2048Mi"),
)
def train_model() -> TrainingOutputs:
    # Load MNIST and scale pixel values to [0, 1].
    (X_train, y_train), (X_test, y_test) = tf.keras.datasets.mnist.load_data()
    X_train, X_test = X_train / 255.0, X_test / 255.0

    # Build and compile the model under a MirroredStrategy scope so that the
    # variables are replicated across the available devices.
    strategy = tf.distribute.MirroredStrategy()
    with strategy.scope():
        model = tf.keras.models.Sequential(
            [
                tf.keras.layers.Flatten(input_shape=(28, 28)),
                tf.keras.layers.Dense(128, activation="relu"),
                tf.keras.layers.Dropout(0.2),
                tf.keras.layers.Dense(10),
            ]
        )
        model.compile(
            optimizer="adam", loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True), metrics=["accuracy"]
        )

    BATCH_SIZE = 64
    NUM_EPOCHS = 5
    model.fit(X_train, y_train, epochs=NUM_EPOCHS, batch_size=BATCH_SIZE)
    test_loss, test_accuracy = model.evaluate(X_test, y_test, verbose=2)

    return TrainingOutputs(model=model, accuracy=test_accuracy, loss=test_loss)


# %% [markdown]
# You can use `@dynamic` to generate tasks at runtime with any custom configuration you want,
# and the `with_overrides` method replaces the configuration that was set on the task.
# Here we override the worker replica count.
# %%
@workflow
def my_tensorflow_workflow() -> TrainingOutputs:
    return train_model()


@dynamic
def dynamic_run(new_worker: int) -> TrainingOutputs:
    return train_model().with_overrides(
        task_config=TfJob(worker=Worker(replicas=new_worker), ps=PS(replicas=1), chief=Chief(replicas=1))
    )


# %% [markdown]
# You can execute the workflow locally.
# %%
if __name__ == "__main__":
    print(my_tensorflow_workflow())
    print(dynamic_run(new_worker=4))