-
Notifications
You must be signed in to change notification settings - Fork 333
[flyte deck] Streaming Decks #2779
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 70 commits
01182b4
9616fc3
98ae7c7
4e92bb0
4df18b5
ebb4d4e
99522d9
c19d67d
67cd829
06da3df
6d99d69
b805cd7
4c97758
7b3574a
9b60564
18c994f
39f39d1
aabcbbb
9ca43f3
ed56352
b559fc9
fc5578f
7139468
e0aee9e
3727588
ce3ee15
d066231
f9387ce
f14c3fa
c33a909
8666c60
93580d6
bcaaabd
a321700
6464fae
884943c
d4b5b96
406227c
1e77f54
d70a2d5
7fc6393
6980140
c87a342
f609760
473ae11
008fe52
f0b9028
d48efa9
6b55930
b5912fb
a681ccd
048fdff
7bcf15e
b6c41c3
be02f9f
cf83e06
e137328
a59a56e
b8383be
dc6d203
41d8760
b71cc19
b5976fe
0c1a5a3
d082456
4a8c68f
b58527b
2764ed4
90372db
d8c408c
5cacf11
c447793
c5cc967
1d6417a
b0cd1ae
974b882
ea8b6e0
a642c9d
e9aef35
04775d7
34a4146
f30382b
b649f8f
0565282
c0fd23a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -41,10 +41,6 @@ class Deck: | |
scatter plots or Markdown text. In addition, users can create new decks to render | ||
their data with custom renderers. | ||
|
||
.. warning:: | ||
|
||
This feature is in beta. | ||
|
||
Comment on lines
-44
to
-47
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nice. |
||
.. code-block:: python | ||
|
||
iris_df = px.data.iris() | ||
|
@@ -86,6 +82,12 @@ def name(self) -> str: | |
def html(self) -> str: | ||
return self._html | ||
|
||
@staticmethod | ||
def publish(): | ||
params = FlyteContextManager.current_context().user_space_params | ||
task_name = params.task_id.name | ||
_output_deck(task_name=task_name, new_user_params=params) | ||
pingsutw marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
|
||
class TimeLineDeck(Deck): | ||
""" | ||
|
@@ -148,7 +150,8 @@ def generate_time_table(data: dict) -> str: | |
|
||
|
||
def _get_deck( | ||
new_user_params: ExecutionParameters, ignore_jupyter: bool = False | ||
new_user_params: ExecutionParameters, | ||
ignore_jupyter: bool = False, | ||
) -> typing.Union[str, "IPython.core.display.HTML"]: # type:ignore | ||
""" | ||
Get flyte deck html string | ||
|
@@ -176,11 +179,17 @@ def _get_deck( | |
|
||
def _output_deck(task_name: str, new_user_params: ExecutionParameters): | ||
ctx = FlyteContext.current_context() | ||
params = ctx.user_space_params | ||
|
||
if not params.has_attr("ENABLE_DECK") or not params.enable_deck: | ||
|
||
logger.warning("Deck is disabled for this task, please don't call Deck.publish()") | ||
|
||
return | ||
|
||
local_dir = ctx.file_access.get_random_local_directory() | ||
local_path = f"{local_dir}{os.sep}{DECK_FILE_NAME}" | ||
try: | ||
with open(local_path, "w", encoding="utf-8") as f: | ||
f.write(_get_deck(new_user_params, ignore_jupyter=True)) | ||
f.write(_get_deck(new_user_params=new_user_params, ignore_jupyter=True)) | ||
logger.info(f"{task_name} task creates flyte deck html to file://{local_path}") | ||
if ctx.execution_state.mode == ExecutionState.Mode.TASK_EXECUTION: | ||
fs = ctx.file_access.get_filesystem_for_path(new_user_params.output_metadata_prefix) | ||
|
@@ -197,6 +206,7 @@ def _output_deck(task_name: str, new_user_params: ExecutionParameters): | |
def get_deck_template() -> Template: | ||
root = os.path.dirname(os.path.abspath(__file__)) | ||
templates_dir = os.path.join(root, "html", "template.html") | ||
|
||
with open(templates_dir, "r") as f: | ||
template_content = f.read() | ||
return Template(template_content) |
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -180,6 +180,7 @@ def __init__( | |||||
pod_template_name, | ||||||
cache_ignore_input_vars, | ||||||
is_eager: bool = False, | ||||||
generates_deck: bool = False, | ||||||
): | ||||||
""" | ||||||
Information needed at runtime to determine behavior such as whether or not outputs are discoverable, timeouts, | ||||||
|
@@ -199,6 +200,7 @@ def __init__( | |||||
receive deprecation warnings. | ||||||
:param bool cache_serializable: Whether or not caching operations are executed in serial. This means only a | ||||||
single instance over identical inputs is executed, other concurrent executions wait for the cached results. | ||||||
:param bool generates_deck: Whether the task will generate a Deck URI. | ||||||
pingsutw marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
:param pod_template_name: The name of the existing PodTemplate resource which will be used in this task. | ||||||
:param cache_ignore_input_vars: Input variables that should not be included when calculating hash for cache. | ||||||
:param is_eager: | ||||||
|
@@ -214,6 +216,7 @@ def __init__( | |||||
self._pod_template_name = pod_template_name | ||||||
self._cache_ignore_input_vars = cache_ignore_input_vars | ||||||
self._is_eager = is_eager | ||||||
self._generates_deck = generates_deck | ||||||
|
||||||
@property | ||||||
def is_eager(self): | ||||||
|
@@ -295,6 +298,14 @@ def pod_template_name(self): | |||||
""" | ||||||
return self._pod_template_name | ||||||
|
||||||
@property | ||||||
def generates_deck(self) -> bool: | ||||||
""" | ||||||
Whether the task will generate a Deck. | ||||||
:rtype: bool | ||||||
""" | ||||||
return self._generates_deck | ||||||
|
||||||
@property | ||||||
def cache_ignore_input_vars(self): | ||||||
""" | ||||||
|
@@ -315,6 +326,7 @@ def to_flyte_idl(self): | |||||
discovery_version=self.discovery_version, | ||||||
deprecated_error_message=self.deprecated_error_message, | ||||||
cache_serializable=self.cache_serializable, | ||||||
generates_deck=self.generates_deck, | ||||||
pod_template_name=self.pod_template_name, | ||||||
cache_ignore_input_vars=self.cache_ignore_input_vars, | ||||||
is_eager=self.is_eager, | ||||||
|
@@ -338,6 +350,7 @@ def from_flyte_idl(cls, pb2_object: _core_task.TaskMetadata): | |||||
discovery_version=pb2_object.discovery_version, | ||||||
deprecated_error_message=pb2_object.deprecated_error_message, | ||||||
cache_serializable=pb2_object.cache_serializable, | ||||||
generates_deck=pb2_object.generates_deck, | ||||||
|
generates_deck=pb2_object.generates_deck, | |
generates_deck=pb2_object.generates_deck if pb2_object.HasField("generates_deck") else None, |
Code Review Run #077cc1
Is this a valid issue, or was it incorrectly flagged by the Agent?
- it was incorrectly flagged
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -12,6 +12,7 @@ | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
from flytekit.models.core import identifier as identifier_models | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
from flytekit.models.task import Resources as resource_model | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
from flytekit.tools.translator import get_serializable, Options | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
from google.protobuf.wrappers_pb2 import BoolValue | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
default_img = Image(name="default", fqn="test", tag="tag") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
serialization_settings = flytekit.configuration.SerializationSettings( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -93,14 +94,41 @@ def t1(a: int) -> typing.NamedTuple("OutputsBC", t1_int_output=int, c=str): | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
def t2(a: str, b: str) -> str: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return b + a | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
ssettings = ( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
settings = ( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
serialization_settings.new_builder() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
.with_fast_serialization_settings(FastSerializationSettings(enabled=True)) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
.build() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
task_spec = get_serializable(OrderedDict(), ssettings, t1) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
task_spec = get_serializable(OrderedDict(), settings, t1) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
assert "pyflyte-fast-execute" in task_spec.template.container.args | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
def test_deck(): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
from flytekit.deck import Deck | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
@task(enable_deck=False) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
def t_no_deck(): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
pass | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
@task(enable_deck=True) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
def t_deck(): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Deck.publish() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
deck_settings = ( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
serialization_settings.new_builder() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
.with_fast_serialization_settings(FastSerializationSettings(enabled=True)) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
.build() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
deck_task_spec = get_serializable(OrderedDict(), deck_settings, t_deck) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
assert deck_task_spec.template.metadata.generates_deck == BoolValue(value=True) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
no_deck_settings = ( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
serialization_settings.new_builder() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
.with_fast_serialization_settings(FastSerializationSettings(enabled=True)) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
.build() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
no_deck_task_spec = get_serializable(OrderedDict(), no_deck_settings, t_no_deck) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
deck_settings = ( | |
serialization_settings.new_builder() | |
.with_fast_serialization_settings(FastSerializationSettings(enabled=True)) | |
.build() | |
) | |
deck_task_spec = get_serializable(OrderedDict(), deck_settings, t_deck) | |
assert deck_task_spec.template.metadata.generates_deck == BoolValue(value=True) | |
no_deck_settings = ( | |
serialization_settings.new_builder() | |
.with_fast_serialization_settings(FastSerializationSettings(enabled=True)) | |
.build() | |
) | |
no_deck_task_spec = get_serializable(OrderedDict(), no_deck_settings, t_no_deck) | |
settings = serialization_settings.new_builder() | |
.with_fast_serialization_settings(FastSerializationSettings(enabled=True)) | |
.build() | |
deck_task_spec = get_serializable(OrderedDict(), settings, t_deck) | |
assert deck_task_spec.template.metadata.generates_deck == BoolValue(value=True) |
Code Review Run #077cc1
Is this a valid issue, or was it incorrectly flagged by the Agent?
- it was incorrectly flagged
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider consolidating the test cases by using @pytest.mark.parametrize
to test both enable_deck=True
and enable_deck=False
scenarios. This would make the test more maintainable and reduce code duplication.
Code suggestion
Check the AI-generated fix before applying
@task(enable_deck=False) | |
def t_no_deck(): | |
pass | |
@task(enable_deck=True) | |
def t_deck(): | |
Deck.publish() | |
deck_settings = ( | |
serialization_settings.new_builder() | |
.with_fast_serialization_settings(FastSerializationSettings(enabled=True)) | |
.build() | |
) | |
deck_task_spec = get_serializable(OrderedDict(), deck_settings, t_deck) | |
assert deck_task_spec.template.metadata.generates_deck == BoolValue(value=True) | |
no_deck_settings = ( | |
serialization_settings.new_builder() | |
.with_fast_serialization_settings(FastSerializationSettings(enabled=True)) | |
.build() | |
) | |
no_deck_task_spec = get_serializable(OrderedDict(), no_deck_settings, t_no_deck) | |
assert no_deck_task_spec.template.metadata.generates_deck == BoolValue(value=False) | |
@pytest.mark.parametrize('enable_deck,expected', [(True, True), (False, False)]) | |
def test_deck_settings(enable_deck, expected): | |
@task(enable_deck=enable_deck) | |
def t_deck(): | |
if enable_deck: | |
Deck.publish() | |
settings = ( | |
serialization_settings.new_builder() | |
.with_fast_serialization_settings(FastSerializationSettings(enabled=True)) | |
.build() |
Code Review Run #077cc1
Is this a valid issue, or was it incorrectly flagged by the Agent?
- it was incorrectly flagged
Uh oh!
There was an error while loading. Please reload this page.