-
Notifications
You must be signed in to change notification settings - Fork 124
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Signed-off-by: Future-Outlier <[email protected]>
- Loading branch information
1 parent
bfe15cf
commit 7a5196a
Showing
2 changed files
with
141 additions
and
485 deletions.
There are no files selected for viewing
314 changes: 57 additions & 257 deletions
314
examples/data_types_and_io/data_types_and_io/attribute_access.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,286 +1,86 @@ | ||
from dataclasses import dataclass, field | ||
from typing import Dict, List | ||
from flytekit.types.file import FlyteFile | ||
from flytekit import task, workflow, ImageSpec | ||
from enum import Enum | ||
from dataclasses import dataclass | ||
|
||
image_spec = ImageSpec( | ||
registry="ghcr.io/flyteorg", | ||
) | ||
from flytekit import task, workflow | ||
|
||
class Status(Enum): | ||
PENDING = "pending" | ||
APPROVED = "approved" | ||
REJECTED = "rejected" | ||
|
||
|
||
@dataclass | ||
class InnerDC: | ||
a: int = -1 | ||
b: float = 2.1 | ||
c: str = "Hello, Flyte" | ||
d: bool = False | ||
e: List[int] = field(default_factory=lambda: [0, 1, 2, -1, -2]) | ||
f: List[FlyteFile] = field( | ||
default_factory=lambda: [ | ||
FlyteFile( | ||
"https://raw.githubusercontent.com/flyteorg/flytesnacks/refs/heads/master/" | ||
"examples/data_types_and_io/data_types_and_io/attribute_access.py" | ||
) | ||
] | ||
) | ||
g: List[List[int]] = field(default_factory=lambda: [[0], [1], [-1]]) | ||
h: List[Dict[int, bool]] = field( | ||
default_factory=lambda: [{0: False}, {1: True}, {-1: True}] | ||
) | ||
i: Dict[int, bool] = field( | ||
default_factory=lambda: {0: False, 1: True, -1: False} | ||
) | ||
j: Dict[int, FlyteFile] = field( | ||
default_factory=lambda: { | ||
0: FlyteFile( | ||
"https://raw.githubusercontent.com/flyteorg/flytesnacks/refs/heads/master/" | ||
"examples/data_types_and_io/data_types_and_io/attribute_access.py" | ||
), | ||
1: FlyteFile( | ||
"https://raw.githubusercontent.com/flyteorg/flytesnacks/refs/heads/master/" | ||
"examples/data_types_and_io/data_types_and_io/attribute_access.py" | ||
), | ||
-1: FlyteFile( | ||
"https://raw.githubusercontent.com/flyteorg/flytesnacks/refs/heads/master/" | ||
"examples/data_types_and_io/data_types_and_io/attribute_access.py" | ||
), | ||
} | ||
) | ||
k: Dict[int, List[int]] = field( | ||
default_factory=lambda: {0: [0, 1, -1]} | ||
) | ||
l: Dict[int, Dict[int, int]] = field( | ||
default_factory=lambda: {1: {-1: 0}} | ||
) | ||
m: dict = field(default_factory=lambda: {"key": "value"}) | ||
n: FlyteFile = field( | ||
default_factory=lambda: FlyteFile( | ||
"https://raw.githubusercontent.com/flyteorg/flytesnacks/refs/heads/master/" | ||
"examples/data_types_and_io/data_types_and_io/attribute_access.py" | ||
) | ||
) | ||
enum_status: Status = field(default=Status.PENDING) | ||
|
||
|
||
@dataclass | ||
class DC: | ||
a: int = -1 | ||
b: float = 2.1 | ||
c: str = "Hello, Flyte" | ||
d: bool = False | ||
e: List[int] = field(default_factory=lambda: [0, 1, 2, -1, -2]) | ||
f: List[FlyteFile] = field( | ||
default_factory=lambda: [ | ||
FlyteFile( | ||
"https://raw.githubusercontent.com/flyteorg/flytesnacks/refs/heads/master/" | ||
"examples/data_types_and_io/data_types_and_io/attribute_access.py" | ||
) | ||
] | ||
) | ||
g: List[List[int]] = field(default_factory=lambda: [[0], [1], [-1]]) | ||
h: List[Dict[int, bool]] = field( | ||
default_factory=lambda: [{0: False}, {1: True}, {-1: True}] | ||
) | ||
i: Dict[int, bool] = field( | ||
default_factory=lambda: {0: False, 1: True, -1: False} | ||
) | ||
j: Dict[int, FlyteFile] = field( | ||
default_factory=lambda: { | ||
0: FlyteFile( | ||
"https://raw.githubusercontent.com/flyteorg/flytesnacks/refs/heads/master/" | ||
"examples/data_types_and_io/data_types_and_io/attribute_access.py" | ||
), | ||
1: FlyteFile( | ||
"https://raw.githubusercontent.com/flyteorg/flytesnacks/refs/heads/master/" | ||
"examples/data_types_and_io/data_types_and_io/attribute_access.py" | ||
), | ||
-1: FlyteFile( | ||
"https://raw.githubusercontent.com/flyteorg/flytesnacks/refs/heads/master/" | ||
"examples/data_types_and_io/data_types_and_io/attribute_access.py" | ||
), | ||
} | ||
) | ||
k: Dict[int, List[int]] = field( | ||
default_factory=lambda: {0: [0, 1, -1]} | ||
) | ||
l: Dict[int, Dict[int, int]] = field( | ||
default_factory=lambda: {1: {-1: 0}} | ||
) | ||
m: dict = field(default_factory=lambda: {"key": "value"}) | ||
n: FlyteFile = field( | ||
default_factory=lambda: FlyteFile( | ||
"https://raw.githubusercontent.com/flyteorg/flytesnacks/refs/heads/master/" | ||
"examples/data_types_and_io/data_types_and_io/attribute_access.py" | ||
) | ||
) | ||
inner_dc: InnerDC = field(default_factory=lambda: InnerDC()) | ||
enum_status: Status = field(default=Status.PENDING) | ||
|
||
|
||
@task(container_image=image_spec) | ||
def t_dc(dc: DC) -> DC: | ||
return dc | ||
@task | ||
def print_message(message: str): | ||
print(message) | ||
return | ||
|
||
|
||
@task(container_image=image_spec) | ||
def t_inner(inner_dc: InnerDC) -> InnerDC: | ||
assert isinstance(inner_dc, InnerDC), "inner_dc is not of type InnerDC" | ||
# Access an output list using index notation | ||
@task | ||
def list_task() -> list[str]: | ||
return ["apple", "banana"] | ||
|
||
# f: List[FlyteFile] | ||
for ff in inner_dc.f: | ||
assert isinstance(ff, FlyteFile), "Expected FlyteFile" | ||
with open(ff, "r") as f: | ||
print(f.read()) | ||
|
||
# j: Dict[int, FlyteFile] | ||
for _, ff in inner_dc.j.items(): | ||
assert isinstance(ff, FlyteFile), "Expected FlyteFile in j" | ||
with open(ff, "r") as f: | ||
print(f.read()) | ||
@workflow | ||
def list_wf(): | ||
items = list_task() | ||
first_item = items[0] | ||
print_message(message=first_item) | ||
|
||
# n: FlyteFile | ||
assert isinstance(inner_dc.n, FlyteFile), "n is not FlyteFile" | ||
with open(inner_dc.n, "r") as f: | ||
print(f.read()) | ||
|
||
assert inner_dc.enum_status == Status.PENDING, "enum_status does not match" | ||
# Access the output dictionary by specifying the key | ||
@task | ||
def dict_task() -> dict[str, str]: | ||
return {"fruit": "banana"} | ||
|
||
return inner_dc | ||
|
||
@workflow | ||
def dict_wf(): | ||
fruit_dict = dict_task() | ||
print_message(message=fruit_dict["fruit"]) | ||
|
||
@task(container_image=image_spec) | ||
def t_test_all_attributes( | ||
a: int, | ||
b: float, | ||
c: str, | ||
d: bool, | ||
e: List[int], | ||
f: List[FlyteFile], | ||
g: List[List[int]], | ||
h: List[Dict[int, bool]], | ||
i: Dict[int, bool], | ||
j: Dict[int, FlyteFile], | ||
k: Dict[int, List[int]], | ||
l: Dict[int, Dict[int, int]], | ||
m: dict, | ||
n: FlyteFile, | ||
enum_status: Status, | ||
): | ||
|
||
# Strict type checks for simple types | ||
assert isinstance(a, int), f"a is not int, it's {type(a)}" | ||
assert a == -1 | ||
assert isinstance(b, float), f"b is not float, it's {type(b)}" | ||
assert isinstance(c, str), f"c is not str, it's {type(c)}" | ||
assert isinstance(d, bool), f"d is not bool, it's {type(d)}" | ||
# Directly access an attribute of a dataclass | ||
@dataclass | ||
class Fruit: | ||
name: str | ||
|
||
# Strict type checks for List[int] | ||
assert isinstance(e, list) and all( | ||
isinstance(i, int) for i in e | ||
), "e is not List[int]" | ||
|
||
# Strict type checks for List[FlyteFile] | ||
assert isinstance(f, list) and all( | ||
isinstance(i, FlyteFile) for i in f | ||
), "f is not List[FlyteFile]" | ||
@task | ||
def dataclass_task() -> Fruit: | ||
return Fruit(name="banana") | ||
|
||
# Strict type checks for List[List[int]] | ||
assert isinstance(g, list) and all( | ||
isinstance(i, list) and all(isinstance(j, int) for j in i) for i in g | ||
), "g is not List[List[int]]" | ||
|
||
# Strict type checks for List[Dict[int, bool]] | ||
assert isinstance(h, list) and all( | ||
isinstance(i, dict) | ||
and all(isinstance(k, int) and isinstance(v, bool) for k, v in i.items()) | ||
for i in h | ||
), "h is not List[Dict[int, bool]]" | ||
@workflow | ||
def dataclass_wf(): | ||
fruit_instance = dataclass_task() | ||
print_message(message=fruit_instance.name) | ||
|
||
# Strict type checks for Dict[int, bool] | ||
assert isinstance(i, dict) and all( | ||
isinstance(k, int) and isinstance(v, bool) for k, v in i.items() | ||
), "i is not Dict[int, bool]" | ||
|
||
# Strict type checks for Dict[int, FlyteFile] | ||
assert isinstance(j, dict) and all( | ||
isinstance(k, int) and isinstance(v, FlyteFile) for k, v in j.items() | ||
), "j is not Dict[int, FlyteFile]" | ||
# Combinations of list, dict and dataclass also work effectively | ||
@task | ||
def advance_task() -> (dict[str, list[str]], list[dict[str, str]], dict[str, Fruit]): | ||
return {"fruits": ["banana"]}, [{"fruit": "banana"}], {"fruit": Fruit(name="banana")} | ||
|
||
# Strict type checks for Dict[int, List[int]] | ||
assert isinstance(k, dict) and all( | ||
isinstance(k, int) | ||
and isinstance(v, list) | ||
and all(isinstance(i, int) for i in v) | ||
for k, v in k.items() | ||
), "k is not Dict[int, List[int]]" | ||
|
||
# Strict type checks for Dict[int, Dict[int, int]] | ||
assert isinstance(l, dict) and all( | ||
isinstance(k, int) | ||
and isinstance(v, dict) | ||
and all( | ||
isinstance(sub_k, int) and isinstance(sub_v, int) | ||
for sub_k, sub_v in v.items() | ||
) | ||
for k, v in l.items() | ||
), "l is not Dict[int, Dict[int, int]]" | ||
@task | ||
def print_list(fruits: list[str]): | ||
print(fruits) | ||
|
||
# Strict type check for a generic dict | ||
assert isinstance(m, dict), "m is not dict" | ||
|
||
# Strict type check for FlyteFile | ||
assert isinstance(n, FlyteFile), "n is not FlyteFile" | ||
@task | ||
def print_dict(fruit_dict: dict[str, str]): | ||
print(fruit_dict) | ||
|
||
# Strict type check for Enum | ||
assert isinstance(enum_status, Status), "enum_status is not Status" | ||
|
||
print("All attributes passed strict type checks.") | ||
@workflow | ||
def advanced_workflow(): | ||
dictionary_list, list_dict, dict_dataclass = advance_task() | ||
print_message(message=dictionary_list["fruits"][0]) | ||
print_message(message=list_dict[0]["fruit"]) | ||
print_message(message=dict_dataclass["fruit"].name) | ||
|
||
print_list(fruits=dictionary_list["fruits"]) | ||
print_dict(fruit_dict=list_dict[0]) | ||
|
||
@workflow | ||
def wf(dc: DC): | ||
new_dc = t_dc(dc=dc) | ||
t_inner(new_dc.inner_dc) | ||
t_test_all_attributes( | ||
a=new_dc.a, | ||
b=new_dc.b, | ||
c=new_dc.c, | ||
d=new_dc.d, | ||
e=new_dc.e, | ||
f=new_dc.f, | ||
g=new_dc.g, | ||
h=new_dc.h, | ||
i=new_dc.i, | ||
j=new_dc.j, | ||
k=new_dc.k, | ||
l=new_dc.l, | ||
m=new_dc.m, | ||
n=new_dc.n, | ||
enum_status=new_dc.enum_status, | ||
) | ||
t_test_all_attributes( | ||
a=new_dc.inner_dc.a, | ||
b=new_dc.inner_dc.b, | ||
c=new_dc.inner_dc.c, | ||
d=new_dc.inner_dc.d, | ||
e=new_dc.inner_dc.e, | ||
f=new_dc.inner_dc.f, | ||
g=new_dc.inner_dc.g, | ||
h=new_dc.inner_dc.h, | ||
i=new_dc.inner_dc.i, | ||
j=new_dc.inner_dc.j, | ||
k=new_dc.inner_dc.k, | ||
l=new_dc.inner_dc.l, | ||
m=new_dc.inner_dc.m, | ||
n=new_dc.inner_dc.n, | ||
enum_status=new_dc.inner_dc.enum_status, | ||
) | ||
|
||
# Run the workflows locally | ||
if __name__ == "__main__": | ||
wf(dc=DC()) | ||
list_wf() | ||
dict_wf() | ||
dataclass_wf() | ||
advanced_workflow() |
Oops, something went wrong.