-
Notifications
You must be signed in to change notification settings - Fork 333
[flyte deck] Streaming Decks #2779
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 77 commits
01182b4
9616fc3
98ae7c7
4e92bb0
4df18b5
ebb4d4e
99522d9
c19d67d
67cd829
06da3df
6d99d69
b805cd7
4c97758
7b3574a
9b60564
18c994f
39f39d1
aabcbbb
9ca43f3
ed56352
b559fc9
fc5578f
7139468
e0aee9e
3727588
ce3ee15
d066231
f9387ce
f14c3fa
c33a909
8666c60
93580d6
bcaaabd
a321700
6464fae
884943c
d4b5b96
406227c
1e77f54
d70a2d5
7fc6393
6980140
c87a342
f609760
473ae11
008fe52
f0b9028
d48efa9
6b55930
b5912fb
a681ccd
048fdff
7bcf15e
b6c41c3
be02f9f
cf83e06
e137328
a59a56e
b8383be
dc6d203
41d8760
b71cc19
b5976fe
0c1a5a3
d082456
4a8c68f
b58527b
2764ed4
90372db
d8c408c
5cacf11
c447793
c5cc967
1d6417a
b0cd1ae
974b882
ea8b6e0
a642c9d
e9aef35
04775d7
34a4146
f30382b
b649f8f
0565282
c0fd23a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -42,12 +42,14 @@ | |
|
||
from flyteidl.core import artifact_id_pb2 as art_id | ||
from flyteidl.core import tasks_pb2 | ||
from google.protobuf.wrappers_pb2 import BoolValue | ||
|
||
from flytekit.configuration import LocalConfig, SerializationSettings | ||
from flytekit.core.artifact_utils import ( | ||
idl_partitions_from_dict, | ||
idl_time_partition_from_datetime, | ||
) | ||
from flytekit.core.constants import ENABLE_DECK | ||
from flytekit.core.context_manager import ( | ||
ExecutionParameters, | ||
ExecutionState, | ||
|
@@ -115,21 +117,18 @@ class TaskMetadata(object): | |
|
||
See the :std:ref:`IDL <idl:protos/docs/core/core:taskmetadata>` for the protobuf definition. | ||
|
||
Args: | ||
cache (bool): Indicates if caching should be enabled. See :std:ref:`Caching <cookbook:caching>` | ||
cache_serialize (bool): Indicates if identical (ie. same inputs) instances of this task should be executed in serial when caching is enabled. See :std:ref:`Caching <cookbook:caching>` | ||
cache_version (str): Version to be used for the cached value | ||
cache_ignore_input_vars (Tuple[str, ...]): Input variables that should not be included when calculating hash for cache | ||
interruptible (Optional[bool]): Indicates that this task can be interrupted and/or scheduled on nodes with | ||
lower QoS guarantees that can include pre-emption. This can reduce the monetary cost executions incur at the | ||
cost of performance penalties due to potential interruptions | ||
deprecated (str): Can be used to provide a warning message for deprecated task. Absence or empty str indicates | ||
that the task is active and not deprecated | ||
Attributes: | ||
cache (bool): Indicates if caching should be enabled. See :std:ref:`Caching <cookbook:caching>`. | ||
cache_serialize (bool): Indicates if identical (i.e. same inputs) instances of this task should be executed in serial when caching is enabled. See :std:ref:`Caching <cookbook:caching>`. | ||
cache_version (str): Version to be used for the cached value. | ||
cache_ignore_input_vars (Tuple[str, ...]): Input variables that should not be included when calculating hash for cache. | ||
interruptible (Optional[bool]): Indicates that this task can be interrupted and/or scheduled on nodes with lower QoS guarantees that can include pre-emption. | ||
deprecated (str): Can be used to provide a warning message for a deprecated task. An absence or empty string indicates that the task is active and not deprecated. | ||
retries (int): for retries=n; n > 0, on failures of this task, the task will be retried at-least n number of times. | ||
timeout (Optional[Union[datetime.timedelta, int]]): the max amount of time for which one execution of this task | ||
should be executed for. The execution will be terminated if the runtime exceeds the given timeout | ||
(approximately) | ||
pod_template_name (Optional[str]): the name of existing PodTemplate resource in the cluster which will be used in this task. | ||
timeout (Optional[Union[datetime.timedelta, int]]): The maximum duration for which one execution of this task should run. The execution will be terminated if the runtime exceeds this timeout. | ||
pod_template_name (Optional[str]): The name of an existing PodTemplate resource in the cluster which will be used for this task. | ||
generates_deck (bool): Indicates whether the task will generate a Deck URI. | ||
is_eager (bool): Indicates whether the task should be treated as eager. | ||
""" | ||
|
||
cache: bool = False | ||
|
@@ -141,6 +140,7 @@ class TaskMetadata(object): | |
retries: int = 0 | ||
timeout: Optional[Union[datetime.timedelta, int]] = None | ||
pod_template_name: Optional[str] = None | ||
generates_deck: bool = False | ||
is_eager: bool = False | ||
|
||
def __post_init__(self): | ||
|
@@ -179,6 +179,7 @@ def to_taskmetadata_model(self) -> _task_model.TaskMetadata: | |
discovery_version=self.cache_version, | ||
deprecated_error_message=self.deprecated, | ||
cache_serializable=self.cache_serialize, | ||
generates_deck=BoolValue(value=self.generates_deck), | ||
pod_template_name=self.pod_template_name, | ||
cache_ignore_input_vars=self.cache_ignore_input_vars, | ||
is_eager=self.is_eager, | ||
|
@@ -720,8 +721,11 @@ def dispatch_execute( | |
may be none | ||
* ``DynamicJobSpec`` is returned when a dynamic workflow is executed | ||
""" | ||
if DeckField.TIMELINE.value in self.deck_fields and ctx.user_space_params is not None: | ||
ctx.user_space_params.decks.append(ctx.user_space_params.timeline_deck) | ||
if self.enable_deck and ctx.user_space_params is not None: | ||
ctx.user_space_params.builder().add_attr(ENABLE_DECK, True) | ||
|
||
if DeckField.TIMELINE.value in self.deck_fields: | ||
ctx.user_space_params.decks.append(ctx.user_space_params.timeline_deck) | ||
eapolinario marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
eapolinario marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
||
# Invoked before the task is executed | ||
new_user_params = self.pre_execute(ctx.user_space_params) | ||
|
||
|
@@ -827,8 +831,19 @@ def disable_deck(self) -> bool: | |
""" | ||
If true, this task will not output deck html file | ||
""" | ||
warnings.warn( | ||
"`disable_deck` is deprecated and will be removed in the future.\n" "Please use `enable_deck` instead.", | ||
DeprecationWarning, | ||
) | ||
return self._disable_deck | ||
|
||
@property | ||
def enable_deck(self) -> bool: | ||
""" | ||
If true, this task will output deck html file | ||
""" | ||
return not self._disable_deck | ||
|
||
@property | ||
def deck_fields(self) -> List[DeckField]: | ||
""" | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,6 +5,7 @@ | |
from string import Template | ||
from typing import Optional | ||
|
||
from flytekit.core.constants import ENABLE_DECK | ||
from flytekit.core.context_manager import ExecutionParameters, ExecutionState, FlyteContext, FlyteContextManager | ||
from flytekit.loggers import logger | ||
from flytekit.tools.interactive import ipython_check | ||
|
@@ -41,10 +42,6 @@ class Deck: | |
scatter plots or Markdown text. In addition, users can create new decks to render | ||
their data with custom renderers. | ||
|
||
.. warning:: | ||
|
||
This feature is in beta. | ||
|
||
Comment on lines
-44
to
-47
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nice. |
||
.. code-block:: python | ||
|
||
iris_df = px.data.iris() | ||
|
@@ -86,6 +83,17 @@ def name(self) -> str: | |
def html(self) -> str: | ||
return self._html | ||
|
||
@staticmethod | ||
def publish(): | ||
params = FlyteContextManager.current_context().user_space_params | ||
task_name = params.task_id.name | ||
|
||
if not params.has_attr(ENABLE_DECK) or not params.enable_deck: | ||
logger.warning("Deck is disabled for this task, please don't call Deck.publish()") | ||
|
||
return | ||
|
||
_output_deck(task_name=task_name, new_user_params=params) | ||
|
||
|
||
class TimeLineDeck(Deck): | ||
""" | ||
|
@@ -148,7 +156,8 @@ def generate_time_table(data: dict) -> str: | |
|
||
|
||
def _get_deck( | ||
new_user_params: ExecutionParameters, ignore_jupyter: bool = False | ||
new_user_params: ExecutionParameters, | ||
ignore_jupyter: bool = False, | ||
) -> typing.Union[str, "IPython.core.display.HTML"]: # type:ignore | ||
""" | ||
Get flyte deck html string | ||
|
@@ -176,11 +185,12 @@ def _get_deck( | |
|
||
def _output_deck(task_name: str, new_user_params: ExecutionParameters): | ||
ctx = FlyteContext.current_context() | ||
|
||
local_dir = ctx.file_access.get_random_local_directory() | ||
local_path = f"{local_dir}{os.sep}{DECK_FILE_NAME}" | ||
try: | ||
with open(local_path, "w", encoding="utf-8") as f: | ||
f.write(_get_deck(new_user_params, ignore_jupyter=True)) | ||
f.write(_get_deck(new_user_params=new_user_params, ignore_jupyter=True)) | ||
logger.info(f"{task_name} task creates flyte deck html to file://{local_path}") | ||
if ctx.execution_state.mode == ExecutionState.Mode.TASK_EXECUTION: | ||
fs = ctx.file_access.get_filesystem_for_path(new_user_params.output_metadata_prefix) | ||
|
@@ -197,6 +207,7 @@ def _output_deck(task_name: str, new_user_params: ExecutionParameters): | |
def get_deck_template() -> Template: | ||
root = os.path.dirname(os.path.abspath(__file__)) | ||
templates_dir = os.path.join(root, "html", "template.html") | ||
|
||
with open(templates_dir, "r") as f: | ||
template_content = f.read() | ||
return Template(template_content) |
Uh oh!
There was an error while loading. Please reload this page.