Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 9 additions & 91 deletions content/user-guide/flyte-2/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,56 +33,10 @@ domain-specific language (DSL).

{{< tabs "flyte-2-python" >}}
{{< tab "Sync Python" >}}
{{< markdown >}}

```python
import flyte

env = flyte.TaskEnvironment("hello_world")

@env.task
def hello_world(name: str) -> str:
return f"Hello, {name}!"

@env.task
def main(name: str) -> str:
for i in range(10):
hello_world(name)
return "Done"

if __name__ == "__main__":
flyte.init()
flyte.run(main, name="World")
```

{{< /markdown >}}
{{< code file="/external/unionai-examples/v2/user-guide/flyte-2/sync.py" lang="python" >}}
{{< /tab >}}
{{< tab "Async Python" >}}
{{< markdown >}}

```python
import flyte

env = flyte.TaskEnvironment("hello_world")

@env.task
async def hello_world(name: str) -> str:
return f"Hello, {name}!"

@env.task
async def main(name: str) -> str:
results = []
for i in range(10):
results.append(hello_world(name))
await asyncio.gather(*results)
return "Done"

if __name__ == "__main__":
flyte.init()
flyte.run(main, name="World")
```

{{< /markdown >}}
{{< code file="/external/unionai-examples/v2/user-guide/flyte-2/async.py" lang="python" >}}
{{< /tab >}}
{{< /tabs >}}

Expand Down Expand Up @@ -115,21 +69,7 @@ Tasks are defined within environments, which encapsulate the context and resourc
Flyte tasks support caching via `@env.task(cache=...)`, but tracing with `@flyte.trace` augments task level-caching
even further enabling reproducibility and recovery at the sub-task function level.

```python
@flyte.trace
async def call_llm(prompt: str) -> str:
return ...

@env.task
def finalize_output(output: str) -> str:
return ...

@env.task(cache=flyte.Cache(behavior="auto"))
async def main(prompt: str) -> str:
output = await call_llm(prompt)
output = await finalize_output(output)
return output
```
{{< code file="/external/unionai-examples/v2/user-guide/flyte-2/trace.py" lang="python" >}}

Here the `call_llm` function is called in the same container as `main` that serves as an automated checkpoint with full
observability in the UI. If the task run fails, the workflow is able to recover and replay from where it left off.
Expand All @@ -145,21 +85,7 @@ Flyte 2 provides full management of the workflow lifecycle through a standardize

You can also fetch and run remote (previously deployed) tasks within the course of a running workflow.

```python
import flyte.remote

env = flyte.TaskEnvironment(name="root")

# get remote tasks that were previously deployed
torch_task = flyte.remote.Task.get("torch_env.torch_task", auto_version="latest")
spark_task = flyte.remote.Task.get("spark_env.spark_task", auto_version="latest")

@env.task
def main() -> flyte.File:
dataset = await spark_task(value)
model = await torch_task(dataset)
return model
```
{{< code file="/external/unionai-examples/v2/user-guide/flyte-2/remote.py" lang="python" >}}

## Native Notebook support

Expand All @@ -174,20 +100,13 @@ Author and run workflows and fetch workflow metadata (I/O and logs) directly fro

Schedule tasks in milliseconds with reusable containers, which massively increases the throughput of containerized tasks.

```python
env = flyte.TaskEnvironment(
name="reusable",
resources=flyte.Resources(memory="500Mi", cpu=1),
reusable=flyte.ReusePolicy(
replicas=4, # Min of 2 replacas are needed to ensure no-starvation of tasks.
idle_ttl=300,
),
image=flyte.Image.from_debian_base().with_pip_packages("unionai-reuse==0.1.3"),
)
```
{{< /markdown >}}
{{< code file="/external/unionai-examples/v2/user-guide/flyte-2/reusable.py" lang="python" >}}
{{< markdown >}}

Coupled with multi-cluster, multi-cloud, and multi-region support, Flyte 2 can scale to handle even the most demanding
workflows.

{{< /markdown >}}
{{< /variant >}}

Expand All @@ -197,5 +116,4 @@ New UI with a streamlined and user-friendly experience for authoring and managin

![New UI](https://raw.githubusercontent.com/unionai/unionai-docs-static/main/images/user-guide/v2ui.png)

This UI improves the visualization of workflow execution and monitoring, simplifying access to logs, metadata, and other
important information.
This UI improves the visualization of workflow execution and monitoring, simplifying access to logs, metadata, and other important information.
71 changes: 4 additions & 67 deletions content/user-guide/flyte-2/async.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,29 +59,7 @@ Python's asynchronous programming capabilities have evolved significantly:

Consider this pattern for parallel data processing:

```python
import asyncio
import flyte

env = flyte.TaskEnvironment("data_pipeline")

@env.task
async def process_chunk(chunk_id: int, data: str) -> str:
# This could be any computational work - CPU or I/O bound
await asyncio.sleep(1) # Simulating work
return f"Processed chunk {chunk_id}: {data}"

@env.task
async def parallel_pipeline(data_chunks: List[str]) -> List[str]:
# Create coroutines for all chunks
tasks = []
for i, chunk in enumerate(data_chunks):
tasks.append(process_chunk(i, chunk))

# Execute all chunks in parallel
results = await asyncio.gather(*tasks)
return results
```
{{< code file="/external/unionai-examples/v2/user-guide/flyte-2/async/async.py" lang="python" >}}

In standard Python, this would provide concurrency benefits primarily for I/O-bound operations.
In Flyte 2, the orchestrator schedules each `process_chunk` task on separate Kubernetes pods or configured plugins, achieving true parallelism for any type of work.
Expand All @@ -106,22 +84,7 @@ The Flyte platform handles the complex orchestration while you express paralleli

Recognizing that many existing codebases use synchronous functions, Flyte 2 provides seamless backward compatibility:

```python
@env.task
def legacy_computation(x: int) -> int:
# Existing synchronous function works unchanged
return x * x + 2 * x + 1

@env.task
async def modern_workflow(numbers: List[int]) -> List[int]:
# Call sync tasks from async context using .aio()
tasks = []
for num in numbers:
tasks.append(legacy_computation.aio(num))

results = await asyncio.gather(*tasks)
return results
```
{{< code file="/external/unionai-examples/v2/user-guide/flyte-2/async/calling_sync_from_async.py" lang="python" >}}

Under the hood, Flyte automatically "asyncifies" synchronous functions, wrapping them to participate seamlessly in the async execution model.
You don't need to rewrite existing code—just leverage the `.aio()` method when calling sync tasks from async contexts.
Expand All @@ -133,37 +96,11 @@ The new `flyte.map` can be used either in synchronous or asynchronous contexts,

{{< tabs "whats-new-map-function" >}}
{{< tab "Sync Map" >}}
{{< markdown >}}
```python
@env.task
def sync_map_example(n: int) -> List[str]:
# Synchronous version for easier migration
results = []
for result in flyte.map(process_item, range(n)):
if isinstance(result, Exception):
raise result
results.append(result)
return results
```
{{< /markdown >}}
{{< code file="/external/unionai-examples/v2/user-guide/flyte-2/async/sync_map.py" lang="python" >}}
{{< /tab >}}

{{< tab "Async Map" >}}
{{< markdown >}}
```python
@env.task
async def async_map_example(n: int) -> List[str]:
# Async version using flyte.map
results = []
async for result in flyte.map.aio(process_item, range(n)):
if isinstance(result, Exception):
raise result
results.append(result)
return results
```
{{< /markdown >}}
{{< code file="/external/unionai-examples/v2/user-guide/flyte-2/async/async_map.py" lang="python" >}}
{{< /tab >}}

{{< /tabs >}}

The `flyte.map` function provides:
Expand Down
30 changes: 2 additions & 28 deletions content/user-guide/flyte-2/pure-python.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ Flyte 2 introduces a new way of writing workflows that is based on pure Python,
{{< tabs "whats-new-dsl-to-python" >}}
{{< tab "Flyte 1" >}}
{{< markdown >}}

```python
import flytekit

Expand Down Expand Up @@ -47,34 +48,7 @@ def main(data: list[float]) -> float:
{{< /markdown >}}
{{< /tab >}}
{{< tab "Flyte 2" >}}
{{< markdown >}}

```python
import flyte

env = flyte.TaskEnvironment(
"hello_world",
image=flyte.Image.from_debian_base().with_pip_packages(...),
)

@env.task
def mean(data: list[float]) -> float:
return sum(list) / len(list)

@env.task
def main(data: list[float]) -> float:
output = mean(data)

# ✅ performing trivial operations in a workflow is allowed
output = output / 100

# ✅ if/else is allowed
if output < 0:
raise ValueError("Output cannot be negative")

return output
```
{{< /markdown >}}
{{< code file="/external/unionai-examples/v2/user-guide/flyte-2/pure-python/flyte_2.py" lang="python" >}}
{{< /tab >}}
{{< /tabs >}}

Expand Down
28 changes: 2 additions & 26 deletions content/user-guide/getting-started/running.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,7 @@ This deploys your code to the configured Union/Flyte instance and runs it immedi

To run your workflow remotely from Python, use [`flyte.run()`](../../api-reference/flyte-sdk/packages/flyte#run) by itself, like this:

```python
# hello.py

... # Your sub-task definitions here

@env.task
def main(name: str):
... # The main task logic here

if __name__ == "__main__":
flyte.init_from_config()
flyte.run(main, name="Ada")
```
{{< code file="/external/unionai-examples/v2/user-guide/getting-started/running/run_from_python.py" lang="python" >}}

This is the approach we use throughout our examples in this guide.
We execute the script, thus invoking the `flyte.run()` function, with the top-level task as a parameter.
Expand All @@ -58,19 +46,7 @@ flyte run --local hello.py main

To run your workflow locally from Python, you chain [`flyte.with_runcontext()`](../../api-reference/flyte-sdk/packages/flyte#with_runcontext) with [`flyte.run()`](../../api-reference/flyte-sdk/packages/flyte#run) and specify the run `mode="local"`, like this:

```python
# hello.py

... # Your other task definitions here

@env.task
def main(name: str):
... # The main task logic here

if __name__ == "__main__":
flyte.init_from_config()
flyte.with_runcontext(mode="local").run(main)
```
{{< code file="/external/unionai-examples/v2/user-guide/getting-started/running/run_local_from_python.py" lang="python" >}}

Running your workflow locally is useful for testing and debugging, as it allows you to run your code without deploying it to a remote instance.
It also lets you quickly iterate on your code without the overhead of deployment.
Expand Down
Loading