Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions flytekit/core/base_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ class TaskMetadata(object):
pod_template_name: Optional[str] = None
generates_deck: bool = False
is_eager: bool = False
execution_mode: "PythonFunctionTask.ExecutionBehavior" = None
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Undefined name in type annotation

There's an undefined name PythonFunctionTask in the TaskMetadata class. This class is likely imported from another module but the import is missing. Consider adding the appropriate import or using a string literal for the type annotation.

Code suggestion
Check the AI-generated fix before applying
Suggested change
execution_mode: "PythonFunctionTask.ExecutionBehavior" = None
execution_mode: "PythonFunctionTask.ExecutionBehavior" = None

Code Review Run #0d112d


Should Bito avoid suggestions like this for future reviews? (Manage Rules)

  • Yes, avoid them


def __post_init__(self):
if self.timeout:
Expand All @@ -155,6 +156,10 @@ def __post_init__(self):
raise ValueError(
f"Cache ignore input vars are specified ``cache_ignore_input_vars={self.cache_ignore_input_vars}`` but ``cache`` is not enabled."
)
if self.execution_mode is None:
from flytekit.core.python_function_task import PythonFunctionTask

self.execution_mode = PythonFunctionTask.ExecutionBehavior.DEFAULT

@property
def retry_strategy(self) -> _literal_models.RetryStrategy:
Expand All @@ -179,6 +184,7 @@ def to_taskmetadata_model(self) -> _task_model.TaskMetadata:
cache_serializable=self.cache_serialize,
generates_deck=self.generates_deck,
pod_template_name=self.pod_template_name,
execution_mode=self.execution_mode,
cache_ignore_input_vars=self.cache_ignore_input_vars,
is_eager=self.is_eager,
)
Expand Down
16 changes: 16 additions & 0 deletions flytekit/core/python_function_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from enum import Enum
from typing import Any, Callable, Iterable, List, Optional, Tuple, TypeVar, Union, cast

from flyteidl.core import tasks_pb2
from flytekit.configuration import ImageConfig, ImageSpec, SerializationSettings
from flytekit.core import launch_plan as _annotated_launch_plan
from flytekit.core.base_task import Task, TaskMetadata, TaskResolverMixin
Expand Down Expand Up @@ -127,6 +128,21 @@ class ExecutionBehavior(Enum):
DYNAMIC = 2
EAGER = 3

def to_flyte_idl(self) -> tasks_pb2.TaskMetadata.ExecutionMode:
"""convert ExecutionBehavior into flyteidl type"""
return {
self.DEFAULT: tasks_pb2.TaskMetadata.ExecutionMode.DEFAULT,
self.DYNAMIC: tasks_pb2.TaskMetadata.ExecutionMode.DYNAMIC,
}[self]

@classmethod
def from_flyte_idl(cls, execution_mode: tasks_pb2.TaskMetadata.ExecutionMode):
"""convert flyteidl type into ExecutionBehavior"""
return {
tasks_pb2.TaskMetadata.ExecutionMode.DEFAULT: cls.DEFAULT,
tasks_pb2.TaskMetadata.ExecutionMode.DYNAMIC: cls.DYNAMIC,
}[execution_mode]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing fallback for unknown enum values

The from_flyte_idl method doesn't handle potential new execution modes that might be added to the protobuf definition in the future. If a new mode is added to tasks_pb2.TaskMetadata.ExecutionMode but not handled here, it would result in a KeyError.

Code suggestion
Check the AI-generated fix before applying
Suggested change
}[execution_mode]
}.get(execution_mode, cls.DEFAULT) # Default to DEFAULT mode for unknown enum values

Code Review Run #0d112d


Should Bito avoid suggestions like this for future reviews? (Manage Rules)

  • Yes, avoid them


def __init__(
self,
task_config: T,
Expand Down
1 change: 1 addition & 0 deletions flytekit/core/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,7 @@ def wrapper(fn: Callable[P, FuncOut]) -> PythonFunctionTask[T]:
interruptible=interruptible,
deprecated=deprecated,
timeout=timeout,
execution_mode=execution_mode,
)

if inspect.iscoroutinefunction(fn):
Expand Down
19 changes: 19 additions & 0 deletions flytekit/models/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ def __init__(
cache_serializable,
pod_template_name,
cache_ignore_input_vars,
execution_mode,
is_eager: bool = False,
generates_deck: bool = False,
):
Expand Down Expand Up @@ -221,6 +222,13 @@ def __init__(
self._cache_ignore_input_vars = cache_ignore_input_vars
self._is_eager = is_eager
self._generates_deck = generates_deck
self._execution_mode = execution_mode

def __post_init__(self):
if self.execution_mode is None:
from flytekit.core.python_function_task import PythonFunctionTask

self.execution_mode = PythonFunctionTask.ExecutionBehavior.DEFAULT
Comment on lines +228 to +231
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing property getter for execution_mode attribute

The __post_init__ method is setting a default value for execution_mode if it's None, but there's no corresponding property getter/setter for execution_mode. This means that self.execution_mode in line 228 will raise an AttributeError since it's trying to access a property that doesn't exist (the actual attribute is _execution_mode). Consider adding a property getter/setter for execution_mode or directly accessing self._execution_mode.

Code suggestion
Check the AI-generated fix before applying
Suggested change
if self.execution_mode is None:
from flytekit.core.python_function_task import PythonFunctionTask
self.execution_mode = PythonFunctionTask.ExecutionBehavior.DEFAULT
if self._execution_mode is None:
from flytekit.core.python_function_task import PythonFunctionTask
self._execution_mode = PythonFunctionTask.ExecutionBehavior.DEFAULT

Code Review Run #0d112d


Should Bito avoid suggestions like this for future reviews? (Manage Rules)

  • Yes, avoid them


@property
def is_eager(self):
Expand Down Expand Up @@ -318,6 +326,13 @@ def cache_ignore_input_vars(self):
"""
return self._cache_ignore_input_vars

@property
def execution_mode(self):
"""
The execution mode is either default or dynamic
"""
return self._execution_mode

def to_flyte_idl(self):
"""
:rtype: flyteidl.admin.task_pb2.TaskMetadata
Expand All @@ -334,6 +349,7 @@ def to_flyte_idl(self):
cache_ignore_input_vars=self.cache_ignore_input_vars,
is_eager=self.is_eager,
generates_deck=BoolValue(value=self.generates_deck),
mode=self.execution_mode.to_flyte_idl(),
)
if self.timeout:
tm.timeout.FromTimedelta(self.timeout)
Expand All @@ -345,6 +361,8 @@ def from_flyte_idl(cls, pb2_object: _core_task.TaskMetadata):
:param flyteidl.core.task_pb2.TaskMetadata pb2_object:
:rtype: TaskMetadata
"""
from flytekit.core.python_function_task import PythonFunctionTask

return cls(
discoverable=pb2_object.discoverable,
runtime=RuntimeMetadata.from_flyte_idl(pb2_object.runtime),
Expand All @@ -358,6 +376,7 @@ def from_flyte_idl(cls, pb2_object: _core_task.TaskMetadata):
cache_ignore_input_vars=pb2_object.cache_ignore_input_vars,
is_eager=pb2_object.is_eager,
generates_deck=pb2_object.generates_deck.value if pb2_object.HasField("generates_deck") else False,
execution_mode=PythonFunctionTask.ExecutionBehavior.from_flyte_idl(pb2_object.mode),
)


Expand Down
5 changes: 4 additions & 1 deletion tests/flytekit/common/parameterizers.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from flytekit.models import interface, literals, security, task, types
from flytekit.models.core import identifier
from flytekit.models.core import types as _core_types
from flytekit.core.python_function_task import PythonFunctionTask

LIST_OF_SCALAR_LITERAL_TYPES = [
types.LiteralType(simple=types.SimpleType.BINARY),
Expand Down Expand Up @@ -125,8 +126,9 @@
cache_serializable,
pod_template_name,
cache_ignore_input_vars,
execution_mode,
)
for discoverable, runtime_metadata, timeout, retry_strategy, interruptible, discovery_version, deprecated, cache_serializable, pod_template_name, cache_ignore_input_vars in product(
for discoverable, runtime_metadata, timeout, retry_strategy, interruptible, discovery_version, deprecated, cache_serializable, pod_template_name, cache_ignore_input_vars, execution_mode in product(
[True, False],
LIST_OF_RUNTIME_METADATA,
[timedelta(days=i) for i in range(3)],
Expand All @@ -137,6 +139,7 @@
[True, False],
["A", "B"],
[()],
[PythonFunctionTask.ExecutionBehavior.DEFAULT]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing comma in product function parameters

The product function is now including execution_mode as a parameter, but the corresponding list in the product call is missing a comma at the end of line 142. This could lead to syntax errors or unexpected behavior when the code is executed.

Code suggestion
Check the AI-generated fix before applying
Suggested change
[PythonFunctionTask.ExecutionBehavior.DEFAULT]
[PythonFunctionTask.ExecutionBehavior.DEFAULT],

Code Review Run #0d112d


Should Bito avoid suggestions like this for future reviews? (Manage Rules)

  • Yes, avoid them

)
]

Expand Down
16 changes: 16 additions & 0 deletions tests/flytekit/unit/core/test_python_function_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from flytekit.core.tracker import isnested, istestfunction
from flytekit.image_spec.image_spec import ImageBuildEngine, ImageSpec
from flytekit.tools.translator import get_serializable_task
from flyteidl.core import tasks_pb2
from tests.flytekit.unit.core import tasks


Expand Down Expand Up @@ -284,3 +285,18 @@ def foo3(x: int, y: str) -> int:
return x

assert foo3.python_interface.inputs_with_defaults == {"x": (int, None), "y": (str, None)}

def test_execution_behavior_conversion():
"""Test conversion between ExecutionBehavior and flyteidl ExecutionMode"""

# Test conversion for DEFAULT mode
assert (PythonFunctionTask.ExecutionBehavior.DEFAULT.to_flyte_idl() ==
tasks_pb2.TaskMetadata.ExecutionMode.DEFAULT)
assert (PythonFunctionTask.ExecutionBehavior.from_flyte_idl(tasks_pb2.TaskMetadata.ExecutionMode.DEFAULT) ==
PythonFunctionTask.ExecutionBehavior.DEFAULT)

# Test conversion for DYNAMIC mode
assert (PythonFunctionTask.ExecutionBehavior.DYNAMIC.to_flyte_idl() ==
tasks_pb2.TaskMetadata.ExecutionMode.DYNAMIC)
assert (PythonFunctionTask.ExecutionBehavior.from_flyte_idl(tasks_pb2.TaskMetadata.ExecutionMode.DYNAMIC) ==
PythonFunctionTask.ExecutionBehavior.DYNAMIC)
4 changes: 3 additions & 1 deletion tests/flytekit/unit/models/test_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from flytekit.extras.accelerators import T4
from flytekit.models import literals, task, types
from flytekit.models.core import identifier
from flytekit.core.python_function_task import PythonFunctionTask
from tests.flytekit.common import parameterizers


Expand Down Expand Up @@ -74,7 +75,7 @@ def test_task_metadata():
"This is deprecated!",
True,
"A",
(),
()
)

assert obj.discoverable is True
Expand All @@ -88,6 +89,7 @@ def test_task_metadata():
assert obj.discovery_version == "0.1.1b0"
assert obj.pod_template_name == "A"
assert obj == task.TaskMetadata.from_flyte_idl(obj.to_flyte_idl())
assert obj.mode == PythonFunctionTask.ExecutionBehavior.DEFAULT


@pytest.mark.parametrize(
Expand Down
Loading