Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions flytekit/core/base_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

"""

from __future__ import annotations

import asyncio
import collections
import datetime
Expand Down Expand Up @@ -127,6 +129,7 @@
pod_template_name (Optional[str]): The name of an existing PodTemplate resource in the cluster which will be used for this task.
generates_deck (bool): Indicates whether the task will generate a Deck URI.
is_eager (bool): Indicates whether the task should be treated as eager.
execution_mode(Optional[Any]): Indicates whether the task should be treated as dynamic workflow.
"""

cache: bool = False
Expand All @@ -140,6 +143,9 @@
pod_template_name: Optional[str] = None
generates_deck: bool = False
is_eager: bool = False
execution_mode: Optional[Any] = (
None # set type as Any due to circular import, the actual type will be PythonFunctionTask.ExecutionBehavior once class init
)

def __post_init__(self):
if self.timeout:
Expand All @@ -155,6 +161,10 @@
raise ValueError(
f"Cache ignore input vars are specified ``cache_ignore_input_vars={self.cache_ignore_input_vars}`` but ``cache`` is not enabled."
)
if self.execution_mode is None:
from flytekit.core.python_function_task import PythonFunctionTask

Check warning on line 165 in flytekit/core/base_task.py

View check run for this annotation

Codecov / codecov/patch

flytekit/core/base_task.py#L165

Added line #L165 was not covered by tests

self.execution_mode = PythonFunctionTask.ExecutionBehavior.DEFAULT

Check warning on line 167 in flytekit/core/base_task.py

View check run for this annotation

Codecov / codecov/patch

flytekit/core/base_task.py#L167

Added line #L167 was not covered by tests

@property
def retry_strategy(self) -> _literal_models.RetryStrategy:
Expand All @@ -179,6 +189,7 @@
cache_serializable=self.cache_serialize,
generates_deck=self.generates_deck,
pod_template_name=self.pod_template_name,
execution_mode=self.execution_mode,
cache_ignore_input_vars=self.cache_ignore_input_vars,
is_eager=self.is_eager,
)
Expand Down
17 changes: 17 additions & 0 deletions flytekit/core/python_function_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
from enum import Enum
from typing import Any, Callable, Iterable, List, Optional, Tuple, TypeVar, Union, cast

from flyteidl.core import tasks_pb2

from flytekit.configuration import ImageConfig, ImageSpec, SerializationSettings
from flytekit.core import launch_plan as _annotated_launch_plan
from flytekit.core.base_task import Task, TaskMetadata, TaskResolverMixin
Expand Down Expand Up @@ -127,6 +129,21 @@
DYNAMIC = 2
EAGER = 3

def to_flyte_idl(self) -> tasks_pb2.TaskMetadata.ExecutionMode:
"""convert ExecutionBehavior into flyteidl type"""
return {

Check warning on line 134 in flytekit/core/python_function_task.py

View check run for this annotation

Codecov / codecov/patch

flytekit/core/python_function_task.py#L134

Added line #L134 was not covered by tests
self.__class__.DEFAULT.value: tasks_pb2.TaskMetadata.ExecutionMode.DEFAULT,
self.__class__.DYNAMIC.value: tasks_pb2.TaskMetadata.ExecutionMode.DYNAMIC,
}[self.value]

@classmethod
def from_flyte_idl(cls, execution_mode: tasks_pb2.TaskMetadata.ExecutionMode):
"""convert flyteidl type into ExecutionBehavior"""
return {

Check warning on line 142 in flytekit/core/python_function_task.py

View check run for this annotation

Codecov / codecov/patch

flytekit/core/python_function_task.py#L142

Added line #L142 was not covered by tests
tasks_pb2.TaskMetadata.ExecutionMode.DEFAULT: cls.DEFAULT,
tasks_pb2.TaskMetadata.ExecutionMode.DYNAMIC: cls.DYNAMIC,
}[execution_mode]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing fallback for unknown enum values

The from_flyte_idl method doesn't handle potential new execution modes that might be added to the protobuf definition in the future. If a new mode is added to tasks_pb2.TaskMetadata.ExecutionMode but not handled here, it would result in a KeyError.

Code suggestion
Check the AI-generated fix before applying
Suggested change
}[execution_mode]
}.get(execution_mode, cls.DEFAULT) # Default to DEFAULT mode for unknown enum values

Code Review Run #0d112d


Should Bito avoid suggestions like this for future reviews? (Manage Rules)

  • Yes, avoid them


def __init__(
self,
task_config: T,
Expand Down
1 change: 1 addition & 0 deletions flytekit/core/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,7 @@ def wrapper(fn: Callable[P, FuncOut]) -> PythonFunctionTask[T]:
interruptible=interruptible,
deprecated=deprecated,
timeout=timeout,
execution_mode=execution_mode,
)

if inspect.iscoroutinefunction(fn):
Expand Down
19 changes: 19 additions & 0 deletions flytekit/models/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@
cache_serializable,
pod_template_name,
cache_ignore_input_vars,
execution_mode,
is_eager: bool = False,
generates_deck: bool = False,
):
Expand Down Expand Up @@ -221,6 +222,13 @@
self._cache_ignore_input_vars = cache_ignore_input_vars
self._is_eager = is_eager
self._generates_deck = generates_deck
self._execution_mode = execution_mode

Check warning on line 225 in flytekit/models/task.py

View check run for this annotation

Codecov / codecov/patch

flytekit/models/task.py#L225

Added line #L225 was not covered by tests

def __post_init__(self):
if self.execution_mode is None:
from flytekit.core.python_function_task import PythonFunctionTask

Check warning on line 229 in flytekit/models/task.py

View check run for this annotation

Codecov / codecov/patch

flytekit/models/task.py#L229

Added line #L229 was not covered by tests

self.execution_mode = PythonFunctionTask.ExecutionBehavior.DEFAULT

Check warning on line 231 in flytekit/models/task.py

View check run for this annotation

Codecov / codecov/patch

flytekit/models/task.py#L231

Added line #L231 was not covered by tests
Comment on lines +228 to +231
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing property getter for execution_mode attribute

The __post_init__ method is setting a default value for execution_mode if it's None, but there's no corresponding property getter/setter for execution_mode. This means that self.execution_mode in line 228 will raise an AttributeError since it's trying to access a property that doesn't exist (the actual attribute is _execution_mode). Consider adding a property getter/setter for execution_mode or directly accessing self._execution_mode.

Code suggestion
Check the AI-generated fix before applying
Suggested change
if self.execution_mode is None:
from flytekit.core.python_function_task import PythonFunctionTask
self.execution_mode = PythonFunctionTask.ExecutionBehavior.DEFAULT
if self._execution_mode is None:
from flytekit.core.python_function_task import PythonFunctionTask
self._execution_mode = PythonFunctionTask.ExecutionBehavior.DEFAULT

Code Review Run #0d112d


Should Bito avoid suggestions like this for future reviews? (Manage Rules)

  • Yes, avoid them


@property
def is_eager(self):
Expand Down Expand Up @@ -318,6 +326,13 @@
"""
return self._cache_ignore_input_vars

@property
def execution_mode(self):
"""
The execution mode is either default or dynamic
"""
return self._execution_mode

Check warning on line 334 in flytekit/models/task.py

View check run for this annotation

Codecov / codecov/patch

flytekit/models/task.py#L334

Added line #L334 was not covered by tests

def to_flyte_idl(self):
"""
:rtype: flyteidl.admin.task_pb2.TaskMetadata
Expand All @@ -334,6 +349,7 @@
cache_ignore_input_vars=self.cache_ignore_input_vars,
is_eager=self.is_eager,
generates_deck=BoolValue(value=self.generates_deck),
mode=self.execution_mode.to_flyte_idl(),
)
if self.timeout:
tm.timeout.FromTimedelta(self.timeout)
Expand All @@ -345,6 +361,8 @@
:param flyteidl.core.task_pb2.TaskMetadata pb2_object:
:rtype: TaskMetadata
"""
from flytekit.core.python_function_task import PythonFunctionTask

Check warning on line 364 in flytekit/models/task.py

View check run for this annotation

Codecov / codecov/patch

flytekit/models/task.py#L364

Added line #L364 was not covered by tests

return cls(
discoverable=pb2_object.discoverable,
runtime=RuntimeMetadata.from_flyte_idl(pb2_object.runtime),
Expand All @@ -358,6 +376,7 @@
cache_ignore_input_vars=pb2_object.cache_ignore_input_vars,
is_eager=pb2_object.is_eager,
generates_deck=pb2_object.generates_deck.value if pb2_object.HasField("generates_deck") else False,
execution_mode=PythonFunctionTask.ExecutionBehavior.from_flyte_idl(pb2_object.mode),
)


Expand Down
5 changes: 4 additions & 1 deletion tests/flytekit/common/parameterizers.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from flytekit.models import interface, literals, security, task, types
from flytekit.models.core import identifier
from flytekit.models.core import types as _core_types
from flytekit.core.python_function_task import PythonFunctionTask

LIST_OF_SCALAR_LITERAL_TYPES = [
types.LiteralType(simple=types.SimpleType.BINARY),
Expand Down Expand Up @@ -125,8 +126,9 @@
cache_serializable,
pod_template_name,
cache_ignore_input_vars,
execution_mode,
)
for discoverable, runtime_metadata, timeout, retry_strategy, interruptible, discovery_version, deprecated, cache_serializable, pod_template_name, cache_ignore_input_vars in product(
for discoverable, runtime_metadata, timeout, retry_strategy, interruptible, discovery_version, deprecated, cache_serializable, pod_template_name, cache_ignore_input_vars, execution_mode in product(
[True, False],
LIST_OF_RUNTIME_METADATA,
[timedelta(days=i) for i in range(3)],
Expand All @@ -137,6 +139,7 @@
[True, False],
["A", "B"],
[()],
[PythonFunctionTask.ExecutionBehavior.DEFAULT]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing comma in product function parameters

The product function is now including execution_mode as a parameter, but the corresponding list in the product call is missing a comma at the end of line 142. This could lead to syntax errors or unexpected behavior when the code is executed.

Code suggestion
Check the AI-generated fix before applying
Suggested change
[PythonFunctionTask.ExecutionBehavior.DEFAULT]
[PythonFunctionTask.ExecutionBehavior.DEFAULT],

Code Review Run #0d112d


Should Bito avoid suggestions like this for future reviews? (Manage Rules)

  • Yes, avoid them

)
]

Expand Down
16 changes: 16 additions & 0 deletions tests/flytekit/unit/core/test_python_function_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from flytekit.core.tracker import isnested, istestfunction
from flytekit.image_spec.image_spec import ImageBuildEngine, ImageSpec
from flytekit.tools.translator import get_serializable_task
from flyteidl.core import tasks_pb2
from tests.flytekit.unit.core import tasks


Expand Down Expand Up @@ -284,3 +285,18 @@ def foo3(x: int, y: str) -> int:
return x

assert foo3.python_interface.inputs_with_defaults == {"x": (int, None), "y": (str, None)}

def test_execution_behavior_conversion():
"""Test conversion between ExecutionBehavior and flyteidl ExecutionMode"""

# Test conversion for DEFAULT mode
assert (PythonFunctionTask.ExecutionBehavior.DEFAULT.to_flyte_idl() ==
tasks_pb2.TaskMetadata.ExecutionMode.DEFAULT)
assert (PythonFunctionTask.ExecutionBehavior.from_flyte_idl(tasks_pb2.TaskMetadata.ExecutionMode.DEFAULT) ==
PythonFunctionTask.ExecutionBehavior.DEFAULT)

# Test conversion for DYNAMIC mode
assert (PythonFunctionTask.ExecutionBehavior.DYNAMIC.to_flyte_idl() ==
tasks_pb2.TaskMetadata.ExecutionMode.DYNAMIC)
assert (PythonFunctionTask.ExecutionBehavior.from_flyte_idl(tasks_pb2.TaskMetadata.ExecutionMode.DYNAMIC) ==
PythonFunctionTask.ExecutionBehavior.DYNAMIC)
4 changes: 3 additions & 1 deletion tests/flytekit/unit/models/test_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from flytekit.extras.accelerators import T4
from flytekit.models import literals, task, types
from flytekit.models.core import identifier
from flytekit.core.python_function_task import PythonFunctionTask
from tests.flytekit.common import parameterizers


Expand Down Expand Up @@ -74,7 +75,7 @@ def test_task_metadata():
"This is deprecated!",
True,
"A",
(),
()
)

assert obj.discoverable is True
Expand All @@ -88,6 +89,7 @@ def test_task_metadata():
assert obj.discovery_version == "0.1.1b0"
assert obj.pod_template_name == "A"
assert obj == task.TaskMetadata.from_flyte_idl(obj.to_flyte_idl())
assert obj.mode == PythonFunctionTask.ExecutionBehavior.DEFAULT


@pytest.mark.parametrize(
Expand Down
Loading