Skip to content

Commit c9ca6d9

Browse files
Future-Outlierpingsutw
authored andcommitted
[flyte deck] Streaming Decks (flyteorg#2779)
* [Flyte Decl] Streaming Decks Signed-off-by: Future-Outlier <[email protected]> * print Signed-off-by: Future-Outlier <[email protected]> * sleep more Signed-off-by: Future-Outlier <[email protected]> * add dummy deck Signed-off-by: Future-Outlier <[email protected]> * nit Signed-off-by: Future-Outlier <[email protected]> * dummy deck Signed-off-by: Future-Outlier <[email protected]> * update Signed-off-by: Future-Outlier <[email protected]> * nit Signed-off-by: Future-Outlier <[email protected]> * test Signed-off-by: Future-Outlier <[email protected]> * return html Signed-off-by: Future-Outlier <[email protected]> * Change Deck Signed-off-by: Future-Outlier <[email protected]> * fix Signed-off-by: Future-Outlier <[email protected]> * fix recursion error Signed-off-by: Future-Outlier <[email protected]> * remove redundant code Signed-off-by: Future-Outlier <[email protected]> * add dummy deck to deck init Signed-off-by: Future-Outlier <[email protected]> * Better Dummy Deck Logic Signed-off-by: Future-Outlier <[email protected]> * Deck Publish Signed-off-by: Future-Outlier <[email protected]> * litn Signed-off-by: Future-Outlier <[email protected]> * remove dummy deck Signed-off-by: Future-Outlier <[email protected]> * nit Signed-off-by: Future-Outlier <[email protected]> * use auto refresh tab, 5 seconds as interval Signed-off-by: Future-Outlier <[email protected]> * revert Signed-off-by: Future-Outlier <[email protected]> * test setDynamicTabs Signed-off-by: Future-Outlier <[email protected]> * change interval time Signed-off-by: Future-Outlier <[email protected]> * test Signed-off-by: Future-Outlier <[email protected]> * revert Signed-off-by: Future-Outlier <[email protected]> * test Signed-off-by: Future-Outlier <[email protected]> * nit Signed-off-by: Future-Outlier <[email protected]> * try dynamic containers Signed-off-by: Future-Outlier <[email protected]> * try dynamic containers v2 Signed-off-by: Future-Outlier <[email protected]> * try dynamic containers v3 Signed-off-by: Future-Outlier <[email protected]> * debug Signed-off-by: Future-Outlier <[email protected]> * update Signed-off-by: Future-Outlier <[email protected]> * nit Signed-off-by: Future-Outlier <[email protected]> * Refresh Botton Signed-off-by: Future-Outlier <[email protected]> * fix Signed-off-by: Future-Outlier <[email protected]> * lint Signed-off-by: Future-Outlier <[email protected]> * test new refresh Signed-off-by: Future-Outlier <[email protected]> * lint Signed-off-by: Future-Outlier <[email protected]> * Revert back html code, collaborating with Lyon Signed-off-by: Future-Outlier <[email protected]> * lint Signed-off-by: Future-Outlier <[email protected]> * nit Signed-off-by: Future-Outlier <[email protected]> * nit Signed-off-by: Future-Outlier <[email protected]> * update Signed-off-by: Future-Outlier <[email protected]> * better code Signed-off-by: Future-Outlier <[email protected]> * update Signed-off-by: Future-Outlier <[email protected]> * some notes for giving user params builder deck enabled Signed-off-by: Future-Outlier <[email protected]> * update Signed-off-by: Future-Outlier <[email protected]> * raise error when disabled deck and called Deck.publish() Signed-off-by: Future-Outlier <[email protected]> * lint Signed-off-by: Future-Outlier <[email protected]> * lint Signed-off-by: Future-Outlier <[email protected]> * update Signed-off-by: Future-Outlier <[email protected]> * static method by YEE Signed-off-by: Future-Outlier <[email protected]> * make Deck.publish more like a wrapper by moving enable deck checking to _output_deck Signed-off-by: Future-Outlier <[email protected]> * lint Signed-off-by: Future-Outlier <[email protected]> * print monodocs err Signed-off-by: Future-Outlier <[email protected]> * Fix monodocs Signed-off-by: Future-Outlier <[email protected]> * use builder Signed-off-by: Future-Outlier <[email protected]> * add translator test for deck serialization settings Signed-off-by: Future-Outlier <[email protected]> * update Signed-off-by: Future-Outlier <[email protected]> * fix Signed-off-by: Future-Outlier <[email protected]> * test Signed-off-by: Future-Outlier <[email protected]> * update Signed-off-by: Future-Outlier <[email protected]> * remove blank Signed-off-by: Future-Outlier <[email protected]> * update kevin's advice Signed-off-by: Future-Outlier <[email protected]> Co-authored-by: pingsutw <[email protected]> * master-branch-idl Signed-off-by: Future-Outlier <[email protected]> --------- Signed-off-by: Future-Outlier <[email protected]> Co-authored-by: pingsutw <[email protected]> Signed-off-by: Umer Ahmad <[email protected]>
1 parent 67281e1 commit c9ca6d9

File tree

9 files changed

+145
-30
lines changed

9 files changed

+145
-30
lines changed

flytekit/bin/entrypoint.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -324,7 +324,7 @@ def _dispatch_execute(
324324
logger.info(f"Engine folder written successfully to the output prefix {output_prefix}")
325325

326326
if task_def is not None and not getattr(task_def, "disable_deck", True):
327-
_output_deck(task_def.name.split(".")[-1], ctx.user_space_params)
327+
_output_deck(task_name=task_def.name.split(".")[-1], new_user_params=ctx.user_space_params)
328328

329329
logger.debug("Finished _dispatch_execute")
330330

flytekit/core/base_task.py

Lines changed: 30 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -115,21 +115,18 @@ class TaskMetadata(object):
115115
116116
See the :std:ref:`IDL <idl:protos/docs/core/core:taskmetadata>` for the protobuf definition.
117117
118-
Args:
119-
cache (bool): Indicates if caching should be enabled. See :std:ref:`Caching <cookbook:caching>`
120-
cache_serialize (bool): Indicates if identical (ie. same inputs) instances of this task should be executed in serial when caching is enabled. See :std:ref:`Caching <cookbook:caching>`
121-
cache_version (str): Version to be used for the cached value
122-
cache_ignore_input_vars (Tuple[str, ...]): Input variables that should not be included when calculating hash for cache
123-
interruptible (Optional[bool]): Indicates that this task can be interrupted and/or scheduled on nodes with
124-
lower QoS guarantees that can include pre-emption. This can reduce the monetary cost executions incur at the
125-
cost of performance penalties due to potential interruptions
126-
deprecated (str): Can be used to provide a warning message for deprecated task. Absence or empty str indicates
127-
that the task is active and not deprecated
118+
Attributes:
119+
cache (bool): Indicates if caching should be enabled. See :std:ref:`Caching <cookbook:caching>`.
120+
cache_serialize (bool): Indicates if identical (i.e. same inputs) instances of this task should be executed in serial when caching is enabled. See :std:ref:`Caching <cookbook:caching>`.
121+
cache_version (str): Version to be used for the cached value.
122+
cache_ignore_input_vars (Tuple[str, ...]): Input variables that should not be included when calculating hash for cache.
123+
interruptible (Optional[bool]): Indicates that this task can be interrupted and/or scheduled on nodes with lower QoS guarantees that can include pre-emption.
124+
deprecated (str): Can be used to provide a warning message for a deprecated task. An absence or empty string indicates that the task is active and not deprecated.
128125
retries (int): for retries=n; n > 0, on failures of this task, the task will be retried at-least n number of times.
129-
timeout (Optional[Union[datetime.timedelta, int]]): the max amount of time for which one execution of this task
130-
should be executed for. The execution will be terminated if the runtime exceeds the given timeout
131-
(approximately)
132-
pod_template_name (Optional[str]): the name of existing PodTemplate resource in the cluster which will be used in this task.
126+
timeout (Optional[Union[datetime.timedelta, int]]): The maximum duration for which one execution of this task should run. The execution will be terminated if the runtime exceeds this timeout.
127+
pod_template_name (Optional[str]): The name of an existing PodTemplate resource in the cluster which will be used for this task.
128+
generates_deck (bool): Indicates whether the task will generate a Deck URI.
129+
is_eager (bool): Indicates whether the task should be treated as eager.
133130
"""
134131

135132
cache: bool = False
@@ -141,6 +138,7 @@ class TaskMetadata(object):
141138
retries: int = 0
142139
timeout: Optional[Union[datetime.timedelta, int]] = None
143140
pod_template_name: Optional[str] = None
141+
generates_deck: bool = False
144142
is_eager: bool = False
145143

146144
def __post_init__(self):
@@ -179,6 +177,7 @@ def to_taskmetadata_model(self) -> _task_model.TaskMetadata:
179177
discovery_version=self.cache_version,
180178
deprecated_error_message=self.deprecated,
181179
cache_serializable=self.cache_serialize,
180+
generates_deck=self.generates_deck,
182181
pod_template_name=self.pod_template_name,
183182
cache_ignore_input_vars=self.cache_ignore_input_vars,
184183
is_eager=self.is_eager,
@@ -720,11 +719,15 @@ def dispatch_execute(
720719
may be none
721720
* ``DynamicJobSpec`` is returned when a dynamic workflow is executed
722721
"""
723-
if DeckField.TIMELINE.value in self.deck_fields and ctx.user_space_params is not None:
724-
ctx.user_space_params.decks.append(ctx.user_space_params.timeline_deck)
722+
725723
# Invoked before the task is executed
726724
new_user_params = self.pre_execute(ctx.user_space_params)
727725

726+
if self.enable_deck and ctx.user_space_params is not None:
727+
if DeckField.TIMELINE.value in self.deck_fields:
728+
ctx.user_space_params.decks.append(ctx.user_space_params.timeline_deck)
729+
new_user_params = ctx.user_space_params.with_enable_deck(enable_deck=True).build()
730+
728731
# Create another execution context with the new user params, but let's keep the same working dir
729732
with FlyteContextManager.with_context(
730733
ctx.with_execution_state(
@@ -827,8 +830,19 @@ def disable_deck(self) -> bool:
827830
"""
828831
If true, this task will not output deck html file
829832
"""
833+
warnings.warn(
834+
"`disable_deck` is deprecated and will be removed in the future.\n" "Please use `enable_deck` instead.",
835+
DeprecationWarning,
836+
)
830837
return self._disable_deck
831838

839+
@property
840+
def enable_deck(self) -> bool:
841+
"""
842+
If true, this task will output deck html file
843+
"""
844+
return not self._disable_deck
845+
832846
@property
833847
def deck_fields(self) -> List[DeckField]:
834848
"""

flytekit/core/context_manager.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@ class Builder(object):
9494
logging: Optional[_logging.Logger] = None
9595
task_id: typing.Optional[_identifier.Identifier] = None
9696
output_metadata_prefix: Optional[str] = None
97+
enable_deck: bool = False
9798

9899
def __init__(self, current: typing.Optional[ExecutionParameters] = None):
99100
self.stats = current.stats if current else None
@@ -107,6 +108,7 @@ def __init__(self, current: typing.Optional[ExecutionParameters] = None):
107108
self.raw_output_prefix = current.raw_output_prefix if current else None
108109
self.task_id = current.task_id if current else None
109110
self.output_metadata_prefix = current.output_metadata_prefix if current else None
111+
self.enable_deck = current.enable_deck if current else False
110112

111113
def add_attr(self, key: str, v: typing.Any) -> ExecutionParameters.Builder:
112114
self.attrs[key] = v
@@ -126,6 +128,7 @@ def build(self) -> ExecutionParameters:
126128
raw_output_prefix=self.raw_output_prefix,
127129
task_id=self.task_id,
128130
output_metadata_prefix=self.output_metadata_prefix,
131+
enable_deck=self.enable_deck,
129132
**self.attrs,
130133
)
131134

@@ -147,6 +150,11 @@ def with_task_sandbox(self) -> Builder:
147150
b.working_dir = task_sandbox_dir
148151
return b
149152

153+
def with_enable_deck(self, enable_deck: bool) -> Builder:
154+
b = self.new_builder(self)
155+
b.enable_deck = enable_deck
156+
return b
157+
150158
def builder(self) -> Builder:
151159
return ExecutionParameters.Builder(current=self)
152160

@@ -162,6 +170,7 @@ def __init__(
162170
checkpoint=None,
163171
decks=None,
164172
task_id: typing.Optional[_identifier.Identifier] = None,
173+
enable_deck: bool = False,
165174
**kwargs,
166175
):
167176
"""
@@ -190,6 +199,7 @@ def __init__(
190199
self._decks = decks
191200
self._task_id = task_id
192201
self._timeline_deck = None
202+
self._enable_deck = enable_deck
193203

194204
@property
195205
def stats(self) -> taggable.TaggableStats:
@@ -298,6 +308,13 @@ def timeline_deck(self) -> "TimeLineDeck": # type: ignore
298308
self._timeline_deck = time_line_deck
299309
return time_line_deck
300310

311+
@property
312+
def enable_deck(self) -> bool:
313+
"""
314+
Returns whether deck is enabled or not
315+
"""
316+
return self._enable_deck
317+
301318
def __getattr__(self, attr_name: str) -> typing.Any:
302319
"""
303320
This houses certain task specific context. For example in Spark, it houses the SparkSession, etc

flytekit/deck/deck.py

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -41,10 +41,6 @@ class Deck:
4141
scatter plots or Markdown text. In addition, users can create new decks to render
4242
their data with custom renderers.
4343
44-
.. warning::
45-
46-
This feature is in beta.
47-
4844
.. code-block:: python
4945
5046
iris_df = px.data.iris()
@@ -86,6 +82,19 @@ def name(self) -> str:
8682
def html(self) -> str:
8783
return self._html
8884

85+
@staticmethod
86+
def publish():
87+
params = FlyteContextManager.current_context().user_space_params
88+
task_name = params.task_id.name
89+
90+
if not params.enable_deck:
91+
logger.warning(
92+
f"Attempted to call publish() in task '{task_name}', but Flyte decks will not be generated because enable_deck is currently set to False."
93+
)
94+
return
95+
96+
_output_deck(task_name=task_name, new_user_params=params)
97+
8998

9099
class TimeLineDeck(Deck):
91100
"""
@@ -148,7 +157,8 @@ def generate_time_table(data: dict) -> str:
148157

149158

150159
def _get_deck(
151-
new_user_params: ExecutionParameters, ignore_jupyter: bool = False
160+
new_user_params: ExecutionParameters,
161+
ignore_jupyter: bool = False,
152162
) -> typing.Union[str, "IPython.core.display.HTML"]: # type:ignore
153163
"""
154164
Get flyte deck html string
@@ -176,11 +186,12 @@ def _get_deck(
176186

177187
def _output_deck(task_name: str, new_user_params: ExecutionParameters):
178188
ctx = FlyteContext.current_context()
189+
179190
local_dir = ctx.file_access.get_random_local_directory()
180191
local_path = f"{local_dir}{os.sep}{DECK_FILE_NAME}"
181192
try:
182193
with open(local_path, "w", encoding="utf-8") as f:
183-
f.write(_get_deck(new_user_params, ignore_jupyter=True))
194+
f.write(_get_deck(new_user_params=new_user_params, ignore_jupyter=True))
184195
logger.info(f"{task_name} task creates flyte deck html to file://{local_path}")
185196
if ctx.execution_state.mode == ExecutionState.Mode.TASK_EXECUTION:
186197
fs = ctx.file_access.get_filesystem_for_path(new_user_params.output_metadata_prefix)
@@ -197,6 +208,7 @@ def _output_deck(task_name: str, new_user_params: ExecutionParameters):
197208
def get_deck_template() -> Template:
198209
root = os.path.dirname(os.path.abspath(__file__))
199210
templates_dir = os.path.join(root, "html", "template.html")
211+
200212
with open(templates_dir, "r") as f:
201213
template_content = f.read()
202214
return Template(template_content)

flytekit/models/task.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from flyteidl.core import tasks_pb2 as _core_task
99
from google.protobuf import json_format as _json_format
1010
from google.protobuf import struct_pb2 as _struct
11+
from google.protobuf.wrappers_pb2 import BoolValue
1112
from kubernetes.client import ApiClient
1213

1314
from flytekit.models import common as _common
@@ -184,6 +185,7 @@ def __init__(
184185
pod_template_name,
185186
cache_ignore_input_vars,
186187
is_eager: bool = False,
188+
generates_deck: bool = False,
187189
):
188190
"""
189191
Information needed at runtime to determine behavior such as whether or not outputs are discoverable, timeouts,
@@ -203,6 +205,7 @@ def __init__(
203205
receive deprecation warnings.
204206
:param bool cache_serializable: Whether or not caching operations are executed in serial. This means only a
205207
single instance over identical inputs is executed, other concurrent executions wait for the cached results.
208+
:param bool generates_deck: Whether the task will generate a Deck URI.
206209
:param pod_template_name: The name of the existing PodTemplate resource which will be used in this task.
207210
:param cache_ignore_input_vars: Input variables that should not be included when calculating hash for cache.
208211
:param is_eager:
@@ -218,6 +221,7 @@ def __init__(
218221
self._pod_template_name = pod_template_name
219222
self._cache_ignore_input_vars = cache_ignore_input_vars
220223
self._is_eager = is_eager
224+
self._generates_deck = generates_deck
221225

222226
@property
223227
def is_eager(self):
@@ -299,6 +303,14 @@ def pod_template_name(self):
299303
"""
300304
return self._pod_template_name
301305

306+
@property
307+
def generates_deck(self) -> bool:
308+
"""
309+
Whether the task will generate a Deck.
310+
:rtype: bool
311+
"""
312+
return self._generates_deck
313+
302314
@property
303315
def cache_ignore_input_vars(self):
304316
"""
@@ -322,6 +334,7 @@ def to_flyte_idl(self):
322334
pod_template_name=self.pod_template_name,
323335
cache_ignore_input_vars=self.cache_ignore_input_vars,
324336
is_eager=self.is_eager,
337+
generates_deck=BoolValue(value=self.generates_deck),
325338
)
326339
if self.timeout:
327340
tm.timeout.FromTimedelta(self.timeout)
@@ -345,6 +358,7 @@ def from_flyte_idl(cls, pb2_object: _core_task.TaskMetadata):
345358
pod_template_name=pb2_object.pod_template_name,
346359
cache_ignore_input_vars=pb2_object.cache_ignore_input_vars,
347360
is_eager=pb2_object.is_eager,
361+
generates_deck=pb2_object.generates_deck.value if pb2_object.HasField("generates_deck") else False,
348362
)
349363

350364

flytekit/tools/translator.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -185,11 +185,13 @@ def get_serializable_task(
185185
entity.reset_command_fn()
186186

187187
entity_config = entity.get_config(settings) or {}
188-
189188
extra_config = {}
190189

191-
if hasattr(entity, "task_function") and isinstance(entity.task_function, ClassDecorator):
192-
extra_config = entity.task_function.get_extra_config()
190+
if hasattr(entity, "task_function"):
191+
if isinstance(entity.task_function, ClassDecorator):
192+
extra_config = entity.task_function.get_extra_config()
193+
if entity.enable_deck:
194+
entity.metadata.generates_deck = True
193195

194196
merged_config = {**entity_config, **extra_config}
195197

pydoclint-errors-baseline.txt

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,6 @@ flytekit/core/base_sql_task.py
3737
DOC301: Class `SQLTask`: __init__() should not have a docstring; please combine it with the docstring of the class
3838
--------------------
3939
flytekit/core/base_task.py
40-
DOC601: Class `TaskMetadata`: Class docstring contains fewer class attributes than actual class attributes. (Please read https://jsh9.github.io/pydoclint/checking_class_attributes.html on how to correctly document class attributes.)
41-
DOC603: Class `TaskMetadata`: Class docstring attributes are different from actual class attributes. (Or could be other formatting issues: https://jsh9.github.io/pydoclint/violation_codes.html#notes-on-doc103 ). Attributes in the class definition but not in the docstring: [cache: bool, cache_ignore_input_vars: Tuple[str, ...], cache_serialize: bool, cache_version: str, deprecated: str, interruptible: Optional[bool], is_eager: bool, pod_template_name: Optional[str], retries: int, timeout: Optional[Union[datetime.timedelta, int]]]. (Please read https://jsh9.github.io/pydoclint/checking_class_attributes.html on how to correctly document class attributes.)
4240
DOC301: Class `PythonTask`: __init__() should not have a docstring; please combine it with the docstring of the class
4341
DOC001: Function/method `post_execute`: Potential formatting errors in docstring. Error message: Expected a colon in 'rval is returned value from call to execute'. (Note: DOC001 could trigger other unrelated violations under this function/method too. Please fix the docstring formatting first.)
4442
DOC101: Method `PythonTask.post_execute`: Docstring contains fewer arguments than in function signature.

tests/flytekit/unit/deck/test_deck.py

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
import flytekit
99
from flytekit import Deck, FlyteContextManager, task
10+
1011
from flytekit.deck import DeckField, MarkdownRenderer, SourceCodeRenderer, TopFrameRenderer
1112
from flytekit.deck.deck import _output_deck
1213
from flytekit.deck.renderer import PythonDependencyRenderer
@@ -258,3 +259,43 @@ def test_python_dependency_renderer():
258259

259260
# Assert that the button of copy
260261
assert 'button onclick="copyTable()"' in result
262+
263+
def test_enable_deck_in_task():
264+
@task(enable_deck=True)
265+
def t1():
266+
ctx = FlyteContextManager.current_context()
267+
assert ctx.user_space_params.enable_deck == True
268+
return
269+
270+
ctx = FlyteContextManager.current_context()
271+
assert ctx.user_space_params.enable_deck == False
272+
273+
t1()
274+
275+
ctx = FlyteContextManager.current_context()
276+
assert ctx.user_space_params.enable_deck == False
277+
278+
def test_disable_deck_in_task():
279+
@task(disable_deck=True)
280+
def t1():
281+
ctx = FlyteContextManager.current_context()
282+
assert ctx.user_space_params.enable_deck == False
283+
return
284+
285+
ctx = FlyteContextManager.current_context()
286+
assert ctx.user_space_params.enable_deck == False
287+
t1()
288+
ctx = FlyteContextManager.current_context()
289+
assert ctx.user_space_params.enable_deck == False
290+
291+
@task
292+
def t2():
293+
ctx = FlyteContextManager.current_context()
294+
assert ctx.user_space_params.enable_deck == False
295+
return
296+
297+
ctx = FlyteContextManager.current_context()
298+
assert ctx.user_space_params.enable_deck == False
299+
t2()
300+
ctx = FlyteContextManager.current_context()
301+
assert ctx.user_space_params.enable_deck == False

0 commit comments

Comments
 (0)