diff --git a/content/user-guide/flyte-2/_index.md b/content/user-guide/flyte-2/_index.md
index bccc1921..fda3fcdb 100644
--- a/content/user-guide/flyte-2/_index.md
+++ b/content/user-guide/flyte-2/_index.md
@@ -33,56 +33,10 @@ domain-specific language (DSL).
 
 {{< tabs "flyte-2-python" >}}
 {{< tab "Sync Python" >}}
-{{< markdown >}}
-
-```python
-import flyte
-
-env = flyte.TaskEnvironment("hello_world")
-
-@env.task
-def hello_world(name: str) -> str:
-    return f"Hello, {name}!"
-
-@env.task
-def main(name: str) -> str:
-    for i in range(10):
-        hello_world(name)
-    return "Done"
-
-if __name__ == "__main__":
-    flyte.init()
-    flyte.run(main, name="World")
-```
-
-{{< /markdown >}}
+{{< code file="/external/unionai-examples/v2/user-guide/flyte-2/sync.py" lang="python" >}}
 {{< /tab >}}
 {{< tab "Async Python" >}}
-{{< markdown >}}
-
-```python
-import flyte
-
-env = flyte.TaskEnvironment("hello_world")
-
-@env.task
-async def hello_world(name: str) -> str:
-    return f"Hello, {name}!"
-
-@env.task
-async def main(name: str) -> str:
-    results = []
-    for i in range(10):
-        results.append(hello_world(name))
-    await asyncio.gather(*results)
-    return "Done"
-
-if __name__ == "__main__":
-    flyte.init()
-    flyte.run(main, name="World")
-```
-
-{{< /markdown >}}
+{{< code file="/external/unionai-examples/v2/user-guide/flyte-2/async.py" lang="python" >}}
 {{< /tab >}}
 {{< /tabs >}}
@@ -115,21 +69,7 @@ Tasks are defined within environments, which encapsulate the context and resourc
 
 Flyte tasks support caching via `@env.task(cache=...)`, but tracing with `@flyte.trace` augments task-level caching even further, enabling reproducibility and recovery at the sub-task function level.
 
-```python
-@flyte.trace
-async def call_llm(prompt: str) -> str:
-    return ...
-
-@env.task
-def finalize_output(output: str) -> str:
-    return ...
-
-@env.task(cache=flyte.Cache(behavior="auto"))
-async def main(prompt: str) -> str:
-    output = await call_llm(prompt)
-    output = await finalize_output(output)
-    return output
-```
+{{< code file="/external/unionai-examples/v2/user-guide/flyte-2/trace.py" lang="python" >}}
 
 Here the `call_llm` function is called in the same container as `main` and serves as an automated checkpoint with full observability in the UI. If the task run fails, the workflow is able to recover and replay from where it left off.
@@ -145,21 +85,7 @@ Flyte 2 provides full management of the workflow lifecycle through a standardize
 
 You can also fetch and run remote (previously deployed) tasks in the course of a running workflow.
 
-```python
-import flyte.remote
-
-env = flyte.TaskEnvironment(name="root")
-
-# get remote tasks that were previously deployed
-torch_task = flyte.remote.Task.get("torch_env.torch_task", auto_version="latest")
-spark_task = flyte.remote.Task.get("spark_env.spark_task", auto_version="latest")
-
-@env.task
-def main() -> flyte.File:
-    dataset = await spark_task(value)
-    model = await torch_task(dataset)
-    return model
-```
+{{< code file="/external/unionai-examples/v2/user-guide/flyte-2/remote.py" lang="python" >}}
 
 ## Native Notebook support
@@ -174,20 +100,13 @@ Author and run workflows and fetch workflow metadata (I/O and logs) directly fro
 
 Schedule tasks in milliseconds with reusable containers, which massively increases the throughput of containerized tasks.
 
-```python
-env = flyte.TaskEnvironment(
-    name="reusable",
-    resources=flyte.Resources(memory="500Mi", cpu=1),
-    reusable=flyte.ReusePolicy(
-        replicas=4,  # Min of 2 replacas are needed to ensure no-starvation of tasks.
- idle_ttl=300, - ), - image=flyte.Image.from_debian_base().with_pip_packages("unionai-reuse==0.1.3"), -) -``` +{{< /markdown >}} +{{< code file="/external/unionai-examples/v2/user-guide/flyte-2/reusable.py" lang="python" >}} +{{< markdown >}} Coupled with multi-cluster, multi-cloud, and multi-region support, Flyte 2 can scale to handle even the most demanding workflows. + {{< /markdown >}} {{< /variant >}} @@ -197,5 +116,4 @@ New UI with a streamlined and user-friendly experience for authoring and managin ![New UI](https://raw.githubusercontent.com/unionai/unionai-docs-static/main/images/user-guide/v2ui.png) -This UI improves the visualization of workflow execution and monitoring, simplifying access to logs, metadata, and other -important information. +This UI improves the visualization of workflow execution and monitoring, simplifying access to logs, metadata, and other important information. diff --git a/content/user-guide/flyte-2/async.md b/content/user-guide/flyte-2/async.md index c89694e1..b68158d2 100644 --- a/content/user-guide/flyte-2/async.md +++ b/content/user-guide/flyte-2/async.md @@ -59,29 +59,7 @@ Python's asynchronous programming capabilities have evolved significantly: Consider this pattern for parallel data processing: -```python -import asyncio -import flyte - -env = flyte.TaskEnvironment("data_pipeline") - -@env.task -async def process_chunk(chunk_id: int, data: str) -> str: - # This could be any computational work - CPU or I/O bound - await asyncio.sleep(1) # Simulating work - return f"Processed chunk {chunk_id}: {data}" - -@env.task -async def parallel_pipeline(data_chunks: List[str]) -> List[str]: - # Create coroutines for all chunks - tasks = [] - for i, chunk in enumerate(data_chunks): - tasks.append(process_chunk(i, chunk)) - - # Execute all chunks in parallel - results = await asyncio.gather(*tasks) - return results -``` +{{< code file="/external/unionai-examples/v2/user-guide/flyte-2/async/async.py" lang="python" >}} In standard Python, this would provide concurrency benefits primarily for I/O-bound operations. In Flyte 2, the orchestrator schedules each `process_chunk` task on separate Kubernetes pods or configured plugins, achieving true parallelism for any type of work. @@ -106,22 +84,7 @@ The Flyte platform handles the complex orchestration while you express paralleli Recognizing that many existing codebases use synchronous functions, Flyte 2 provides seamless backward compatibility: -```python -@env.task -def legacy_computation(x: int) -> int: - # Existing synchronous function works unchanged - return x * x + 2 * x + 1 - -@env.task -async def modern_workflow(numbers: List[int]) -> List[int]: - # Call sync tasks from async context using .aio() - tasks = [] - for num in numbers: - tasks.append(legacy_computation.aio(num)) - - results = await asyncio.gather(*tasks) - return results -``` +{{< code file="/external/unionai-examples/v2/user-guide/flyte-2/async/calling_sync_from_async.py" lang="python" >}} Under the hood, Flyte automatically "asyncifies" synchronous functions, wrapping them to participate seamlessly in the async execution model. You don't need to rewrite existing code—just leverage the `.aio()` method when calling sync tasks from async contexts. 
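+
+As a minimal sketch of the pattern (the `square` task and its environment here are hypothetical, not part of the published examples), calling a sync task from an async one looks like this:
+
+```python
+import asyncio
+
+import flyte
+
+env = flyte.TaskEnvironment("aio_sketch")
+
+@env.task
+def square(x: int) -> int:
+    # An ordinary synchronous task
+    return x * x
+
+@env.task
+async def fan_out(numbers: list[int]) -> list[int]:
+    # .aio() returns an awaitable, so sync tasks can be gathered like async ones
+    return await asyncio.gather(*[square.aio(n) for n in numbers])
+```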
@@ -133,37 +96,11 @@ The new `flyte.map` can be used either in synchronous or asynchronous contexts, {{< tabs "whats-new-map-function" >}} {{< tab "Sync Map" >}} -{{< markdown >}} -```python -@env.task -def sync_map_example(n: int) -> List[str]: - # Synchronous version for easier migration - results = [] - for result in flyte.map(process_item, range(n)): - if isinstance(result, Exception): - raise result - results.append(result) - return results -``` -{{< /markdown >}} +{{< code file="/external/unionai-examples/v2/user-guide/flyte-2/async/sync_map.py" lang="python" >}} {{< /tab >}} - {{< tab "Async Map" >}} -{{< markdown >}} -```python -@env.task -async def async_map_example(n: int) -> List[str]: - # Async version using flyte.map - results = [] - async for result in flyte.map.aio(process_item, range(n)): - if isinstance(result, Exception): - raise result - results.append(result) - return results -``` -{{< /markdown >}} +{{< code file="/external/unionai-examples/v2/user-guide/flyte-2/async/async_map.py" lang="python" >}} {{< /tab >}} - {{< /tabs >}} The `flyte.map` function provides: diff --git a/content/user-guide/flyte-2/pure-python.md b/content/user-guide/flyte-2/pure-python.md index 17d18a37..7eb992ff 100644 --- a/content/user-guide/flyte-2/pure-python.md +++ b/content/user-guide/flyte-2/pure-python.md @@ -19,6 +19,7 @@ Flyte 2 introduces a new way of writing workflows that is based on pure Python, {{< tabs "whats-new-dsl-to-python" >}} {{< tab "Flyte 1" >}} {{< markdown >}} + ```python import flytekit @@ -47,34 +48,7 @@ def main(data: list[float]) -> float: {{< /markdown >}} {{< /tab >}} {{< tab "Flyte 2" >}} -{{< markdown >}} - -```python -import flyte - -env = flyte.TaskEnvironment( - "hello_world", - image=flyte.Image.from_debian_base().with_pip_packages(...), -) - -@env.task -def mean(data: list[float]) -> float: - return sum(list) / len(list) - -@env.task -def main(data: list[float]) -> float: - output = mean(data) - - # ✅ performing trivial operations in a workflow is allowed - output = output / 100 - - # ✅ if/else is allowed - if output < 0: - raise ValueError("Output cannot be negative") - - return output -``` -{{< /markdown >}} +{{< code file="/external/unionai-examples/v2/user-guide/flyte-2/pure-python/flyte_2.py" lang="python" >}} {{< /tab >}} {{< /tabs >}} diff --git a/content/user-guide/getting-started/running.md b/content/user-guide/getting-started/running.md index 95564faa..30870129 100644 --- a/content/user-guide/getting-started/running.md +++ b/content/user-guide/getting-started/running.md @@ -26,19 +26,7 @@ This deploys your code to the configured Union/Flyte instance and runs it immedi To run your workflow remotely from Python, use [`flyte.run()`](../../api-reference/flyte-sdk/packages/flyte#run) by itself, like this: -```python -# hello.py - -... # Your sub-task definitions here - -@env.task -def main(name: str): - ... # The main task logic here - -if __name__ == "__main__": - flyte.init_from_config() - flyte.run(main, name="Ada") -``` +{{< code file="/external/unionai-examples/v2/user-guide/getting-started/running/run_from_python.py" lang="python" >}} This is the approach we use throughout our examples in this guide. We execute the script, thus invoking the `flyte.run()` function, with the top-level task as a parameter. 
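+
+Since the entry point is an ordinary Python script, kicking off the remote run is a plain script execution (assuming, as in the snippet above, that the file is saved as `hello.py`):
+
+```shell
+python hello.py
+```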
@@ -58,19 +46,7 @@ flyte run --local hello.py main To run your workflow locally from Python, you chain [`flyte.with_runcontext()`](../../api-reference/flyte-sdk/packages/flyte#with_runcontext) with [`flyte.run()`](../../api-reference/flyte-sdk/packages/flyte#run) and specify the run `mode="local"`, like this: -```python -# hello.py - -... # Your other task definitions here - -@env.task -def main(name: str): - ... # The main task logic here - -if __name__ == "__main__": - flyte.init_from_config() - flyte.with_runcontext(mode="local").run(main) -``` +{{< code file="/external/unionai-examples/v2/user-guide/getting-started/running/run_local_from_python.py" lang="python" >}} Running your workflow locally is useful for testing and debugging, as it allows you to run your code without deploying it to a remote instance. It also lets you quickly iterate on your code without the overhead of deployment. diff --git a/content/user-guide/task-configuration/_index.md b/content/user-guide/task-configuration/_index.md index e84162f3..bf996f9d 100644 --- a/content/user-guide/task-configuration/_index.md +++ b/content/user-guide/task-configuration/_index.md @@ -14,13 +14,7 @@ Flyte manages the spinning up of the containers, the execution of the code, and In **Getting started** we demonstrated the simplest possible case, a `TaskEnvironment` with only a `name` parameter, and an `env.task` decorator, with no parameters: -```python -env = flyte.TaskEnvironment(name="hello_world") - -@env.task -async def say_hello(data: str, lt: List[int]) -> str: - ... -``` +{{< code file="/external/unionai-examples/v2/user-guide/task-configuration/task_env.py" lang="python" >}} > [!NOTE] > Notice how the `TaskEnvironment` is assigned to the variable `env` and then that variable is @@ -50,52 +44,7 @@ For shared parameters, the more specific level will override the more general on Here is an example of how these levels work together, showing each level with all available parameters: -```python -import flyte - -# Level 1: TaskEnvironment - Base configuration -env = flyte.TaskEnvironment( - name="data_processing_env", - image=flyte.Image.from_debian_base(), - resources=flyte.Resources(cpu=1, memory="512Mi"), - env_vars={"MY_VAR": "value"}, - secrets=flyte.Secret(key="my_api_key", as_env_var="MY_API_KEY"), - cache="disable", - pod_template=my_pod_template_spec, - reusable=flyte.ReusePolicy(replicas=2, idle_ttl=300), - depends_on=[another_env], - description="My task environment", - plugin_config=my_plugin_config -) - -# Level 2: Decorator - Override some environment settings -@env.task( - short_name="process", - secrets=flyte.Secret(key="my_api_key_2", as_env_var="MY_API_KEY"), - cache="auto" - pod_template=my_pod_template_spec_2, - report=True, - max_inline_io_bytes=100 * 1024 - retries=3, - timeout=60 - docs="This task processes data and generates a report." 
-)
-async def process_data(data_path: str) -> str:
-    return f"Processed {data_path}"
-
-@env.task
-async def main() -> str:
-    result = await process_data.override(
-        resources=flyte.Resources(cpu=4, memory="2Gi"),
-        env_vars={"MY_VAR": "new_value"},
-        secrets=flyte.Secret(key="my_api_key_3", as_env_var="MY_API_KEY"),
-        cache="enable",
-        max_inline_io_bytes=100 * 1024,
-        retries=3,
-        timeout=60
-    )("input.csv")
-    return result
-```
+{{< code file="/external/unionai-examples/v2/user-guide/task-configuration/config_levels.py" lang="python" >}}
 
 ### Parameter interaction
 
@@ -136,13 +85,7 @@ The full set of parameters available for configuring a task environment, task de
 
 The fully qualified name is always the `TaskEnvironment` name (the one above) followed by a period and then the task function name (the name of the Python function being decorated). For example:
 
-    ```python
-    env = flyte.TaskEnvironment(name="my_env")
-
-    @env.task
-    async def my_task(data: str) -> str:
-        ...
-    ```
+    {{< code file="/external/unionai-examples/v2/user-guide/task-configuration/task_env_name.py" lang="python" >}}
 
 Here, the name of the TaskEnvironment is `my_env` and the fully qualified name of the task is `my_env.my_task`.
 The `TaskEnvironment` name and the fully qualified task name are both fixed and cannot be overridden.
 
@@ -248,22 +191,9 @@ When a `TaskEnvironment` has `reusable` set, then `resources`, `env_vars`, and `
 
 explicit `reusable="off"` in the same `task.override()` invocation.
 
 For example:
 
-```python
-env = flyte.TaskEnvironment(
-    name="my_env",
-    resources=Resources(cpu=1),
-    reusable=flyte.ReusePolicy(replicas=2, idle_ttl=300)
-)
-
-@env.task
-async def my_task(data: str) -> str:
-    ...
-
-@env.task
-async def main_workflow() -> str:
-    # `my_task.override(resources=Resources(cpu=4))` will fail. Instead use:
-    result = await my_task.override(reusable="off", resources=Resources(cpu=4))
-```
+{{< /markdown >}}
+{{< code file="/external/unionai-examples/v2/user-guide/task-configuration/reusable.py" lang="python" >}}
+{{< markdown >}}
 
 Additionally, `secrets` can only be overridden at the `@env.task` decorator level if the `TaskEnvironment` (`env`) does not have `reusable` set.
 
diff --git a/content/user-guide/task-configuration/caching.md b/content/user-guide/task-configuration/caching.md
index 97c93c03..09f968f0 100644
--- a/content/user-guide/task-configuration/caching.md
+++ b/content/user-guide/task-configuration/caching.md
@@ -36,11 +36,8 @@ Flyte 2 supports three main cache behaviors:
 
 ### `"auto"` - Automatic versioning
 
-```python
-@env.task(cache=Cache(behavior="auto"))
-async def auto_versioned_task(data: str) -> str:
-    return transform_data(data)
-```
+{{< code file="/external/unionai-examples/v2/user-guide/task-configuration/caching/caching.py" fragment="auto" lang="python" >}}
+
 
 With `behavior="auto"`, the cache version is automatically generated based on the function's source code.
 If you change the function implementation, the cache is automatically invalidated.
@@ -51,22 +48,14 @@ If you change the function implementation, the cache is automatically invalidate
 
 You can also use the direct string shorthand:
 
-```python
-@env.task(cache="auto")
-async def auto_cached_task(x: int) -> int:
-    return transform_data(data)
-```
+{{< code file="/external/unionai-examples/v2/user-guide/task-configuration/caching/caching.py" fragment="auto-shorthand" lang="python" >}}
 
 ### `"override"`
 
 With `behavior="override"`, you can specify a custom cache key in the `version_override` parameter.
Since the cache key is fixed as part of the code, it can be manually changed when you need to invalidate the cache. -```python -@env.task(cache=Cache(behavior="override", version_override="v1.2")) -async def manually_versioned_task(data: str) -> str: - return transform_data(data) -``` +{{< code file="/external/unionai-examples/v2/user-guide/task-configuration/caching/caching.py" fragment="override" lang="python" >}} - **When to use**: When you need explicit control over cache invalidation. - **Cache invalidation**: Manual, by changing `version_override`. @@ -77,11 +66,7 @@ async def manually_versioned_task(data: str) -> str: To explicitly disable caching, use the `"disable"` behavior. **This is the default behavior.** -```python -@env.task(cache=Cache(behavior="disable")) -async def always_fresh_task(data: str) -> str: - return get_current_timestamp() + data -``` +{{< code file="/external/unionai-examples/v2/user-guide/task-configuration/caching/caching.py" fragment="disable" lang="python" >}} - **When to use**: Non-deterministic functions, side effects, or always-fresh data. - **Cache invalidation**: N/A - never cached. @@ -89,11 +74,7 @@ async def always_fresh_task(data: str) -> str: You can also use the direct string shorthand: -```python -@env.task(cache="disable") -async def auto_cached_task(x: int) -> int: - return transform_data(data) -``` +{{< code file="/external/unionai-examples/v2/user-guide/task-configuration/caching/caching.py" fragment="disable-shorthand" lang="python" >}} ## Advanced caching configuration @@ -101,17 +82,8 @@ async def auto_cached_task(x: int) -> int: Sometimes you want to cache based on some inputs but not others: -```python -@env.task(cache=Cache( - behavior="override", - version_override="v1", - ignored_inputs=("debug_flag", "logging_level") -)) -async def selective_caching(data: str, debug_flag: bool, logging_level: str) -> str: - if debug_flag: - print(f"Debug: processing {data}") - return process_data(data) -``` + +{{< code file="/external/unionai-examples/v2/user-guide/task-configuration/caching/caching.py" fragment="ignored" lang="python" >}} **This is useful for**: - Debug flags that don't affect computation @@ -122,15 +94,7 @@ async def selective_caching(data: str, debug_flag: bool, logging_level: str) -> Cache serialization ensures that only one instance of a task runs at a time for identical inputs: -```python -@env.task(cache=Cache( - behavior="auto", - serialize=True -)) -async def expensive_model_training(dataset: str, params: dict) -> str: - model = train_large_model(dataset, params) - return save_model(model) -``` +{{< code file="/external/unionai-examples/v2/user-guide/task-configuration/caching/caching.py" fragment="serialize" lang="python" >}} **When to use serialization**: - Very expensive computations (model training, large data processing) @@ -147,15 +111,7 @@ async def expensive_model_training(dataset: str, params: dict) -> str: Use `salt` to vary cache keys without changing function logic: -```python -@env.task(cache=Cache( - behavior="override", - version_override="v1", - salt="experiment_2024_q4" -)) -async def experimental_analysis(data: str) -> dict: - return run_analysis(data) -``` +{{< code file="/external/unionai-examples/v2/user-guide/task-configuration/caching/caching.py" fragment="salt" lang="python" >}} **`salt` is useful for**: - A/B testing with identical code. @@ -170,39 +126,13 @@ For `behavior="auto"`, Flyte uses cache policies to generate version hashes. 
The default `FunctionBodyPolicy` generates cache versions from the function's source code: -```python -from flyte import Cache -from flyte._cache import FunctionBodyPolicy - -@env.task(cache=Cache( - behavior="auto", - # policies=[FunctionBodyPolicy()] <-- This is the default. Does not actually need to be specified. -)) -async def code_sensitive_task(data: str) -> str: - return data.upper() -``` +{{< code file="/external/unionai-examples/v2/user-guide/task-configuration/caching/caching.py" fragment="policy" lang="python" >}} ### Custom cache policies You can implement custom cache policies by following the `CachePolicy` protocol: -```python -from flyte._cache import CachePolicy, VersionParameters - -class DatasetVersionPolicy(CachePolicy): - def get_version(self, salt: str, params: VersionParameters) -> str: - # Generate version based on custom logic - dataset_version = get_dataset_version() - return f"{salt}_{dataset_version}" - -@env.task(cache=Cache( - behavior="auto", - policies=[DatasetVersionPolicy()] -)) -async def dataset_dependent_task(data: str) -> str: - # Cache invalidated when dataset version changes - return process_with_current_dataset(data) -``` +{{< code file="/external/unionai-examples/v2/user-guide/task-configuration/caching/caching.py" fragment="custom-policy" lang="python" >}} ## Caching configuration at different levels @@ -214,57 +144,25 @@ You can configure caching at the `TaskEnvironment` level. This will set the default cache behavior for all tasks defined using that environment. For example: -```python -env = flyte.TaskEnvironment( - name="cached_environment", - cache=Cache(behavior="auto") # Default for all tasks -) - -@env.task # Inherits auto caching from environment -async def inherits_caching(data: str) -> str: - return process_data(data) -``` +{{< code file="/external/unionai-examples/v2/user-guide/task-configuration/caching/caching.py" fragment="env-level" lang="python" >}} ### `@env.task` decorator level By setting the cache parameter in the `@env.task` decorator, you can override the environment's default cache behavior for specific tasks: -```python -@env.task(cache=Cache(behavior="disable")) # Override environment default -async def overrides_caching(data: str) -> str: - return get_timestamp() -``` +{{< code file="/external/unionai-examples/v2/user-guide/task-configuration/caching/caching.py" fragment="decorator-level" lang="python" >}} ### `task.override` level By setting the cache parameter in the `task.override` method, you can override the cache behavior for specific task invocations: -```python -@env.task -async def main(data: str) -> str : - return override_caching_on_call(data).override(cache=Cache(behavior="disable")) -``` +{{< code file="/external/unionai-examples/v2/user-guide/task-configuration/caching/caching.py" fragment="override-level" lang="python" >}} ## Runtime cache control You can also force cache invalidation for a specific run: -```python -# Disable caching for this specific execution -run = flyte.with_runcontext(overwrite_cache=True).run(my_cached_task, data="test") -``` - -## Checking cache status - -Tasks can access cache information through the execution context: - -```python -@env.task(cache=Cache(behavior="auto")) -async def cache_aware_task(data: str) -> str: - ctx = flyte.ctx() - # Access cache-related context if needed - return process_data(data) -``` +{{< code file="/external/unionai-examples/v2/user-guide/task-configuration/caching/cache_control.py" fragment="cache-control" lang="python" >}} ## Project and domain cache 
isolation @@ -276,102 +174,10 @@ Caches are automatically isolated by: When running locally, Flyte maintains a local cache: -```python -# Local execution uses ~/.flyte/local-cache/ -flyte.init() # Local mode -result = flyte.run(my_cached_task, data="test") -``` +{{< code file="/external/unionai-examples/v2/user-guide/task-configuration/caching/local_cache.py" fragment="local-cache" lang="python" >}} Local cache behavior: - Stored in `~/.flyte/local-cache/` directory - No project/domain isolation (since running locally) - Can be cleared with `flyte local-cache clear` - Disabled by setting `FLYTE_LOCAL_CACHE_ENABLED=false` - -## Best practices - -### When to enable caching - -**Good candidates for caching**: -- Pure functions with deterministic outputs. -- Expensive computations (model training, data processing). -- External API calls with stable responses. -- File processing operations. - -**Avoid caching for**: -- Functions with side effects (logging, file writes) -- Non-deterministic functions (random generation, timestamps). -- Functions that interact with external state. -- Very fast operations where caching overhead exceeds benefits. - -### Cache configuration strategies - -1. **Start with `behavior="auto"`** for most tasks -2. **Use `behavior="override"`** for stable production workflows -3. **Add `serialize=True`** for expensive, parallelizable operations -4. **Configure `ignored_inputs`** for debug/metadata parameters -5. **Use environment-level defaults** to reduce configuration duplication - -### Performance considerations - -- **Cache hit performance**: Retrieving cached results is orders of magnitude faster than re-execution -- **Cache miss overhead**: Minimal overhead for cache key generation and lookup -- **Storage costs**: Cached outputs consume storage space in your Flyte backend -- **Network transfer**: Large cached objects may have network transfer costs - -### Debugging cached executions - -When troubleshooting cache behavior: - -1. **Check cache configuration**: Verify `behavior`, `version_override`, and `ignored_inputs` -2. **Review function changes**: Auto-cached functions invalidate on source code changes -3. **Inspect input variations**: Even small input changes create new cache keys -4. **Use cache override**: Force fresh execution with `overwrite_cache=True` -5. 
**Monitor cache hits**: Check execution logs for cache hit/miss information - -## Example: Complete caching workflow - -Here's a comprehensive example showing different caching patterns: - -```python -import flyte -from flyte import Cache - -env = flyte.TaskEnvironment(name="ml_pipeline", cache=Cache(behavior="auto")) - -@env.task # Uses environment default (auto caching) -async def load_dataset(dataset_name: str) -> dict: - # Cached - expensive data loading - return load_large_dataset(dataset_name) - -@env.task(cache=Cache(behavior="override", version_override="v2.1")) -async def preprocess_data(raw_data: dict, config: dict) -> dict: - # Manual versioning for stable preprocessing logic - return apply_preprocessing(raw_data, config) - -@env.task(cache=Cache( - behavior="auto", - serialize=True, - ignored_inputs=("experiment_name",) -)) -async def train_model(data: dict, hyperparams: dict, experiment_name: str) -> str: - # Serialized expensive training, ignoring experiment name - model = train_neural_network(data, hyperparams) - return save_model(model, experiment_name) - -@env.task(cache=Cache(behavior="disable")) -async def generate_report(model_path: str, timestamp: str) -> str: - # Never cached - always generates fresh reports - return create_timestamped_report(model_path, timestamp) - -@env.task -async def ml_pipeline(dataset_name: str, config: dict, hyperparams: dict) -> str: - # Orchestrator task - benefits from sub-task caching - raw_data = await load_dataset(dataset_name) - processed_data = await preprocess_data(raw_data, config) - model_path = await train_model(processed_data, hyperparams, "exp_001") - report = await generate_report(model_path, "2024-01-15") - return report -``` - -This pipeline efficiently caches expensive operations while ensuring fresh outputs where needed, demonstrating the flexibility and power of Flyte's caching system. diff --git a/content/user-guide/task-configuration/container-images.md b/content/user-guide/task-configuration/container-images.md index 3fe79b1a..1a2d76a3 100644 --- a/content/user-guide/task-configuration/container-images.md +++ b/content/user-guide/task-configuration/container-images.md @@ -15,12 +15,7 @@ If a `TaskEnvironment` does not specify an `image`, it will use the default Flyt You can directly reference an image by URL in the `image` parameter, like this: -```python -env = flyte.TaskEnvironment( - name="my_task_env", - image="docker.io/myorg/myimage" -) -``` +{{< code file="/external/unionai-examples/v2/user-guide/task-configuration/container-images/direct_image.py" fragment="direct-image" lang="python" >}} This works well if you have a pre-built image available in a public registry like Docker Hub or in a private registry that your Union/Flyte instance can access. @@ -68,41 +63,7 @@ This image is itself based on the official Python Docker image (specifically `py Starting there, you can layer additional features onto your image. For example: -```python -import flyte - -# Define the task environment -env = flyte.TaskEnvironment( - name="my_env", - image = ( - flyte.Image.from_debian_base( - name="my-image" - python_version=(3, 13), - registry="ghcr.io/my_gh_org" # Only needed for local builds - ) - .with_apt_packages("git", "curl") - .with_pip_packages("numpy", "pandas", "scikit-learn") - .with_env_vars({"MY_CONFIG": "production"}) - ) -) - - -# Supporting task definitions -... - -# Main task definition -@env.task -def main(x_list: list[int] = list(range(10))) -> float: - ... 
- -# Init and run -if __name__ == "__main__": - flyte.init_from_config("config.yaml") - run = flyte.run(main, x_list=list(range(10))) - print(run.name) - print(run.url) - run.wait(run) -``` +{{< code file="/external/unionai-examples/v2/user-guide/task-configuration/container-images/from_debian_base.py" fragment="from-debian-base" lang="python" >}} > [!NOTE] > The `registry` parameter is only needed if you are building the image locally. It is not required when using the Union backend `ImageBuilder`. @@ -114,49 +75,7 @@ Another common technique for defining an image is to use [`uv` inline script met The `from_uv_script` method starts with the default Flyte image and adds the dependencies specified in the `uv` metadata. For example: -```python -# /// script -# requires-python = ">=3.13" -# dependencies = [ -# "flyte", -# "numpy", -# "pandas", -# "scikit-learn" -# ] -# /// - -... - -env = flyte.TaskEnvironment( - name="my_env", - image=flyte.Image.from_uv_script( - __file__, - name="my_image", - registry="ghcr.io/my_gh_org" # Only needed for local builds - ) -) - -# Supporting task definitions -... - -# Main task definition -@env.task -def main(x_list: list[int] = list(range(10))) -> float: - ... - -# Init and run -if __name__ == "__main__": - # Init for remote run on backend - flyte.init_from_config("config.yaml") - - # Init for local run - # flyte.init() - - run = flyte.run(main, x_list=list(range(10))) - print(run.name) - print(run.url) - run.wait(run) -``` +{{< code file="/external/unionai-examples/v2/user-guide/task-configuration/container-images/from_uv_script.py" fragment="from-uv-script" lang="python" >}} The advantage of this approach is that the dependencies used when running a script locally and when running it on the Flyte/Union backend are always the same (as long as you use `uv` to run your scripts locally). This means you can develop and test your scripts in a consistent environment, reducing the chances of encountering issues when deploying to the backend. 
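+
+Under that assumption, local iteration stays a single command (the file name here is illustrative); `uv` reads the inline metadata and runs the script in a matching environment:
+
+```shell
+uv run my_script.py
+```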
diff --git a/content/user-guide/task-configuration/pod-templates.md b/content/user-guide/task-configuration/pod-templates.md index da0f6d48..00f36ddd 100644 --- a/content/user-guide/task-configuration/pod-templates.md +++ b/content/user-guide/task-configuration/pod-templates.md @@ -25,74 +25,13 @@ The `pod_template` parameter accepts either a string reference or a `PodTemplate Here's a complete example showing how to use pod templates with a `TaskEnvironment`: -```python -# /// script -# requires-python = "==3.12" -# dependencies = [ -# "kubernetes", -# ] -# /// - -from kubernetes.client import ( - V1Container, - V1EnvVar, - V1LocalObjectReference, - V1PodSpec, -) - -import flyte - -# Create a custom pod template -pod_template = flyte.PodTemplate( - primary_container_name="primary", # Name of the main container - labels={"lKeyA": "lValA"}, # Custom pod labels - annotations={"aKeyA": "aValA"}, # Custom pod annotations - pod_spec=V1PodSpec( # Kubernetes pod specification - containers=[ - V1Container( - name="primary", - env=[V1EnvVar(name="hello", value="world")] # Environment variables - ) - ], - image_pull_secrets=[ # Access to private registries - V1LocalObjectReference(name="regcred-test") - ], - ), -) - -# Use the pod template in a TaskEnvironment -env = flyte.TaskEnvironment( - name="hello_world", - pod_template=pod_template, # Apply the custom pod template - image=flyte.Image.from_uv_script(__file__, name="flyte", pre=True), -) - -@env.task -async def say_hello(data: str) -> str: - return f"Hello {data}" - -@env.task -async def say_hello_nested(data: str = "default string") -> str: - return await say_hello(data=data) - -if __name__ == "__main__": - flyte.init_from_config() - result = flyte.run(say_hello_nested, data="hello world") - print(result.url) -``` +{{< code file="/external/unionai-examples/v2/user-guide/task-configuration/pod-templates/pod_template.py" fragment="pod-template" lang="python" >}} ## PodTemplate components The `PodTemplate` class provides the following parameters for customizing your pod configuration: -```python -pod_template = flyte.PodTemplate( - primary_container_name: str = "primary", - pod_spec: Optional[V1PodSpec] = None, - labels: Optional[Dict[str, str]] = None, - annotations: Optional[Dict[str, str]] = None -) -``` +{{< code file="/external/unionai-examples/v2/user-guide/task-configuration/pod-templates/pod_template_params.py" fragment="pod-template-params" lang="python" >}} ### Parameters @@ -103,5 +42,3 @@ pod_template = flyte.PodTemplate( - **`labels`** (`Optional[Dict[str, str]]`): Key-value pairs used for organizing and selecting pods. Labels are used by Kubernetes selectors and can be queried to filter and manage pods. - **`annotations`** (`Optional[Dict[str, str]]`): Additional metadata attached to the pod that doesn't affect pod scheduling or selection. Annotations are typically used for storing non-identifying information like deployment revisions, contact information, or configuration details. 
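+
+As a minimal sketch (the label and annotation values are illustrative), a template that only needs labels and annotations can omit the pod spec entirely and rely on the defaults described above:
+
+```python
+import flyte
+
+# primary_container_name defaults to "primary"; unset fields are simply not applied
+pod_template = flyte.PodTemplate(
+    labels={"team": "ml-platform"},
+    annotations={"deployed-by": "docs-example"},
+)
+```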
- - diff --git a/content/user-guide/task-configuration/resources.md b/content/user-guide/task-configuration/resources.md index 8d9d7a6b..0e03340e 100644 --- a/content/user-guide/task-configuration/resources.md +++ b/content/user-guide/task-configuration/resources.md @@ -20,15 +20,7 @@ If neither `TaskEnvironment` nor the task decorator specifies `resources`, the d The `Resources` dataclass provides the following initialization parameters: -```python -resources = Resources( - cpu: Union[int, float, str, Tuple[Union[int, float, str], Union[int, float, str]], None] = None, - memory: Union[str, Tuple[str, str], None] = None, - gpu: Union[str, int, Device, None] = None, # Accelerators string, count, or Device object - disk: Union[str, None] = None, - shm: Union[str, Literal["auto"], None] = None -) -``` +{{< code file="/external/unionai-examples/v2/user-guide/task-configuration/resources/resources.py" fragment="params" lang="python" >}} Each parameter is optional and allows you to specify different types of resources: @@ -44,46 +36,11 @@ Each parameter is optional and allows you to specify different types of resource Here's a complete example of defining a TaskEnvironment with resource specifications for a machine learning training workload: -```python -import flyte - -# Define a TaskEnvironment for ML training tasks -ml_training_env = flyte.TaskEnvironment( - name="ml-training", - resources=flyte.Resources( - cpu=("2", "8"), # Request 2 cores, allow up to 8 cores for scaling - memory=("8Gi", "32Gi"), # Request 8 GiB, allow up to 32 GiB for large datasets - gpu="A100:2", # 2 NVIDIA A100 GPUs for training - disk="50Gi", # 50 GiB ephemeral storage for checkpoints - shm="8Gi" # 8 GiB shared memory for efficient data loading - ) -) - -# Use the environment for tasks -@ml_training_env.task -async def train_model(dataset_path: str) -> str: - # This task will run with flexible resource allocation - return "model_trained_successfully" -``` +{{< code file="/external/unionai-examples/v2/user-guide/task-configuration/resources/resources.py" fragment="task-env" lang="python" >}} ### Usage in a task-specific override -```python -from flyte import task - -# Override resources for specific tasks -@task( - resources=flyte.Resources( - cpu="16", - memory="64Gi", - gpu="H100:2", - disk="50Gi", - shm="8Gi" - ) -) -async def heavy_training_task() -> str: - return "heavy_model_trained" -``` +{{< code file="/external/unionai-examples/v2/user-guide/task-configuration/resources/resources.py" fragment="override" lang="python" >}} ## Resource types @@ -91,36 +48,13 @@ async def heavy_training_task() -> str: CPU can be specified in several formats: -```python -# String formats (Kubernetes-style) -flyte.Resources(cpu="500m") # 500 milliCPU (0.5 cores) -flyte.Resources(cpu="2") # 2 CPU cores -flyte.Resources(cpu="1.5") # 1.5 CPU cores - -# Numeric formats -flyte.Resources(cpu=1) # 1 CPU core -flyte.Resources(cpu=0.5) # 0.5 CPU cores - -# Request and limit ranges -flyte.Resources(cpu=("1", "2")) # Request 1 core, limit to 2 cores -flyte.Resources(cpu=(1, 4)) # Request 1 core, limit to 4 cores -``` +{{< code file="/external/unionai-examples/v2/user-guide/task-configuration/resources/resources.py" fragment="cpu" lang="python" >}} ### Memory resources Memory specifications follow Kubernetes conventions: -```python -# Standard memory units -flyte.Resources(memory="512Mi") # 512 MiB -flyte.Resources(memory="1Gi") # 1 GiB -flyte.Resources(memory="2Gi") # 2 GiB -flyte.Resources(memory="500M") # 500 MB (decimal) 
-flyte.Resources(memory="1G") # 1 GB (decimal) - -# Request and limit ranges -flyte.Resources(memory=("1Gi", "4Gi")) # Request 1 GiB, limit to 4 GiB -``` +{{< code file="/external/unionai-examples/v2/user-guide/task-configuration/resources/resources.py" fragment="memory" lang="python" >}} ### GPU resources @@ -128,41 +62,13 @@ Flyte supports various GPU types and configurations: #### Simple GPU allocation -```python -# Basic GPU count -flyte.Resources(gpu=1) # 1 GPU (any available type) -flyte.Resources(gpu=4) # 4 GPUs - -# Specific GPU types with quantity -flyte.Resources(gpu="T4:1") # 1 NVIDIA T4 GPU -flyte.Resources(gpu="A100:2") # 2 NVIDIA A100 GPUs -flyte.Resources(gpu="H100:8") # 8 NVIDIA H100 GPUs -``` +{{< code file="/external/unionai-examples/v2/user-guide/task-configuration/resources/resources.py" fragment="gpu" lang="python" >}} #### Advanced GPU configuration You can also use the `GPU` helper class for more detailed configurations: -```python -# Using the GPU helper function -gpu_config = flyte.GPU(device="A100", quantity=2) -flyte.Resources(gpu=gpu_config) - -# GPU with memory partitioning (A100 only) -partitioned_gpu = flyte.GPU( - device="A100", - quantity=1, - partition="1g.5gb" # 1/7th of A100 with 5GB memory -) -flyte.Resources(gpu=partitioned_gpu) - -# A100 80GB with partitioning -large_partition = flyte.GPU( - device="A100 80G", - quantity=1, - partition="7g.80gb" # Full A100 80GB -) -``` +{{< code file="/external/unionai-examples/v2/user-guide/task-configuration/resources/resources.py" fragment="advanced-gpu" lang="python" >}} #### Supported GPU types - **T4**: Entry-level training and inference @@ -176,30 +82,13 @@ large_partition = flyte.GPU( You can also define custom devices if your infrastructure supports them: -```python -# Custom device configuration -custom_device = flyte.Device( - device="custom_accelerator", - quantity=2, - partition="large" -) - -resources = flyte.Resources(gpu=custom_device) -``` +{{< code file="/external/unionai-examples/v2/user-guide/task-configuration/resources/resources.py" fragment="custom" lang="python" >}} ### TPU resources For Google Cloud TPU workloads you can specify TPU resources using the `TPU` helper class: -```python -# TPU v5p configuration -tpu_config = flyte.TPU(device="V5P", partition="2x2x1") -flyte.Resources(gpu=tpu_config) # Note: TPUs use the gpu parameter - -# TPU v6e configuration -tpu_v6e = flyte.TPU(device="V6E", partition="4x4") -flyte.Resources(gpu=tpu_v6e) -``` +{{< code file="/external/unionai-examples/v2/user-guide/task-configuration/resources/resources.py" fragment="tpu" lang="python" >}} ### Storage resources @@ -210,23 +99,10 @@ These resources are essential for tasks that need temporary storage for processi Ephemeral disk storage provides temporary space for your tasks to store intermediate files, downloaded datasets, model checkpoints, and other temporary data. This storage is automatically cleaned up when the task completes. 
-```python -flyte.Resources(disk="10Gi") # 10 GiB ephemeral storage -flyte.Resources(disk="100Gi") # 100 GiB ephemeral storage -flyte.Resources(disk="1Ti") # 1 TiB for large-scale data processing - -# Common use cases -flyte.Resources(disk="50Gi") # ML model training with checkpoints -flyte.Resources(disk="200Gi") # Large dataset preprocessing -flyte.Resources(disk="500Gi") # Video/image processing workflows -``` +{{< code file="/external/unionai-examples/v2/user-guide/task-configuration/resources/resources.py" fragment="disk" lang="python" >}} #### Shared memory Shared memory (`/dev/shm`) is a high-performance, RAM-based storage area that can be shared between processes within the same container. It's particularly useful for machine learning workflows that need fast data loading and inter-process communication. -```python -flyte.Resources(shm="1Gi") # 1 GiB shared memory (/dev/shm) -flyte.Resources(shm="auto") # Auto-sized shared memory -flyte.Resources(shm="16Gi") # Large shared memory for distributed training -``` +{{< code file="/external/unionai-examples/v2/user-guide/task-configuration/resources/resources.py" fragment="shm" lang="python" >}} diff --git a/content/user-guide/task-configuration/reusable-containers.md b/content/user-guide/task-configuration/reusable-containers.md index 5c13cbfd..6d3685a4 100644 --- a/content/user-guide/task-configuration/reusable-containers.md +++ b/content/user-guide/task-configuration/reusable-containers.md @@ -47,50 +47,17 @@ When you configure a `TaskEnvironment` with a `ReusePolicy`, the system does the Enable container reuse by adding a `ReusePolicy` to your `TaskEnvironment`: -```python -import flyte - -# Currently required to enable resuable containers -reusable_image = flyte.Image.from_debian_base().with_pip_packages("unionai-reuse>=0.1.3") - -env = flyte.TaskEnvironment( - name="reusable-env", - resources=flyte.Resources(memory="1Gi", cpu="500m"), - reusable=flyte.ReusePolicy( - replicas=2, # Create 2 container instances - concurrency=1, # Process 1 task per container at a time - scaledown_ttl=timedelta(minutes=10), # Individual containers shut down after 5 minutes of inactivity - idle_ttl=timedelta(hours=1) # Entire environment shuts down after 30 minutes of no tasks - ), - image=reusable_image # Use the container image augmented with the unionai-reuse library. 
-)
-
-@env.task
-async def compute_task(x: int) -> int:
-    return x * x
-
-@env.task
-async def main() -> list[int]:
-    # These tasks will reuse containers from the pool
-    results = []
-    for i in range(10):
-        result = await compute_task(i)
-        results.append(result)
-    return results
-```
+{{< /markdown >}}
+{{< code file="/external/unionai-examples/v2/user-guide/task-configuration/reusable-containers/example.py" fragment="first-example" lang="python" >}}
+{{< markdown >}}
 
 ## `ReusePolicy` parameters
 
 The `ReusePolicy` class controls how containers are managed in a reusable environment:
 
-```python
-flyte.ReusePolicy(
-    replicas: typing.Union[int, typing.Tuple[int, int]],
-    concurrency: int,
-    scaledown_ttl: typing.Union[int, datetime.timedelta],
-    idle_ttl: typing.Union[int, datetime.timedelta]
-)
-```
+{{< /markdown >}}
+{{< code file="/external/unionai-examples/v2/user-guide/task-configuration/reusable-containers/example.py" fragment="policy-params" lang="python" >}}
+{{< markdown >}}
 
 ### `replicas`: Container pool size
 
 Controls the number of container instances in the reusable pool:
 
@@ -104,23 +71,9 @@ Controls the number of container instances in the reusable pool:
   - If demand drops again, container 4 will also be shut down after another `scaledown_ttl` period expires.
 - **Resource impact**: Each replica consumes the full resources defined in `TaskEnvironment.resources`.
 
-```python
-# Fixed pool size
-reuse_policy = flyte.ReusePolicy(
-    replicas=3,
-    concurrency=1,
-    scaledown_ttl=timedelta(minutes=10),
-    idle_ttl=timedelta(hours=1)
-)
-
-# Auto-scaling pool
-reuse_policy = flyte.ReusePolicy(
-    replicas=(1, 10),
-    concurrency=1,
-    scaledown_ttl=timedelta(minutes=10),
-    idle_ttl=timedelta(hours=1)
-)
-```
+{{< /markdown >}}
+{{< code file="/external/unionai-examples/v2/user-guide/task-configuration/reusable-containers/example.py" fragment="replicas" lang="python" >}}
+{{< markdown >}}
 
 ### `concurrency`: Tasks per container
 
 Controls how many tasks can execute simultaneously within a single container:
 
 - **Higher concurrency**: `concurrency=5` allows 5 tasks to run simultaneously in each container.
 - **Total capacity**: `replicas × concurrency` = maximum concurrent tasks across the entire pool.
 
-```python
-# Sequential processing (default)
-sequential_policy = flyte.ReusePolicy(
-    replicas=2,
-    concurrency=1,  # One task per container
-    scaledown_ttl=timedelta(minutes=10),
-    idle_ttl=timedelta(hours=1)
-)
-
-# Concurrent processing
-concurrent_policy = flyte.ReusePolicy(
-    replicas=2,
-    concurrency=5,  # 5 tasks per container = 10 total concurrent tasks
-    scaledown_ttl=timedelta(minutes=10),
-    idle_ttl=timedelta(hours=1)
-)
-```
+{{< /markdown >}}
+{{< code file="/external/unionai-examples/v2/user-guide/task-configuration/reusable-containers/example.py" fragment="concurrency" lang="python" >}}
+{{< markdown >}}
 
 ### `idle_ttl` vs `scaledown_ttl`: Container lifecycle
 
 These parameters work together to manage container lifecycle at different levels:
 
 - **Purpose**: Prevents resource waste from inactive containers.
 - **Typical values**: 5-30 minutes for most workloads.
-
-```python
-from datetime import timedelta
-
-lifecycle_policy = flyte.ReusePolicy(
-    replicas=3,
-    concurrency=2,
-    scaledown_ttl=timedelta(minutes=10),  # Individual containers shut down after 10 minutes of inactivity
-    idle_ttl=timedelta(hours=1)  # Entire environment shuts down after 1 hour of no tasks
-)
-```
+{{< /markdown >}}
+{{< code file="/external/unionai-examples/v2/user-guide/task-configuration/reusable-containers/example.py" fragment="ttl" lang="python" >}}
+{{< markdown >}}
 
 ## Understanding parameter relationships
 
 The four `ReusePolicy` parameters work together to control different aspects of container management:
 
-```python
-reuse_policy = flyte.ReusePolicy(
-    replicas=4,  # Infrastructure: How many containers?
-    concurrency=3,  # Throughput: How many tasks per container?
-    scaledown_ttl=timedelta(minutes=10),  # Individual: When do idle containers shut down?
-    idle_ttl=timedelta(hours=1)  # Environment: When does the whole pool shut down?
-)
-# Total capacity: 4 × 3 = 12 concurrent tasks
-# Individual containers shut down after 10 minutes of inactivity
-# Entire environment shuts down after 1 hour of no tasks
-```
+{{< /markdown >}}
+{{< code file="/external/unionai-examples/v2/user-guide/task-configuration/reusable-containers/example.py" fragment="param-relationship" lang="python" >}}
+{{< markdown >}}
 
 ### Key relationships
 
@@ -201,7 +124,6 @@ reuse_policy = flyte.ReusePolicy(
 - **Cost efficiency**: Higher `concurrency` reduces container overhead, more `replicas` provides better isolation
 - **Lifecycle management**: `scaledown_ttl` manages individual containers, `idle_ttl` manages the environment
-
 ## Simple example
 
 Here is a simple but complete example of reuse with concurrency.
 
 First, import the needed modules and set up logging:
 
 {{< /markdown >}}
-{{< code file="/external/unionai-examples/v2/user-guide/task-configuration/reusable-containers/reuse-concurrency.py" fragment="import" lang="python" >}}
+{{< code file="/external/unionai-examples/v2/user-guide/task-configuration/reusable-containers/reuse_concurrency.py" fragment="import" lang="python" >}}
 {{< markdown >}}
 
 Next, we set up the reusable task environment.
Note that, currently, the image used for a reusable environment requires an extra package to be installed: {{< /markdown >}} -{{< code file="/external/unionai-examples/v2/user-guide/task-configuration/reusable-containers/reuse-concurrency.py" fragment="env" lang="python" >}} +{{< code file="/external/unionai-examples/v2/user-guide/task-configuration/reusable-containers/reuse_concurrency.py" fragment="env" lang="python" >}} {{< markdown >}} Now, we define the `reuse_concurrency` task (the main driver task of the workflow) and the `noop` task that will be executed multiple times reusing the same containers: {{< /markdown >}} -{{< code file="/external/unionai-examples/v2/user-guide/task-configuration/reusable-containers/reuse-concurrency.py" fragment="tasks" lang="python" >}} +{{< code file="/external/unionai-examples/v2/user-guide/task-configuration/reusable-containers/reuse_concurrency.py" fragment="tasks" lang="python" >}} {{< markdown >}} Finally, we deploy and run the workflow programmatically, so all you have to do is execute `python reuse_concurrency.py` to see it in action: {{< /markdown >}} -{{< code file="/external/unionai-examples/v2/user-guide/task-configuration/reusable-containers/reuse-concurrency.py" fragment="run" lang="python" >}} - +{{< code file="/external/unionai-examples/v2/user-guide/task-configuration/reusable-containers/reuse_concurrency.py" fragment="run" lang="python" >}} {{< /variant >}} diff --git a/content/user-guide/task-configuration/secrets.md b/content/user-guide/task-configuration/secrets.md index 86f31cdf..7681a7fe 100644 --- a/content/user-guide/task-configuration/secrets.md +++ b/content/user-guide/task-configuration/secrets.md @@ -83,20 +83,7 @@ To use a literal string secret, specify it in the `TaskEnvironment` along with t You can then access it using `os.getenv()` in your task code. For example: -```python -env = flyte.TaskEnvironment( - name="my_task_env", - secrets=[ - flyte.Secret(key="MY_SECRET_KEY", as_env_var="MY_SECRET_ENV_VAR"), - ] -) - -@env.task -def t1(): - my_secret_value = os.getenv("MY_SECRET_ENV_VAR") - # Do something with the secret - ... -``` +{{< code file="/external/unionai-examples/v2/user-guide/task-configuration/secrets/secrets.py" fragment="literal" lang="python" >}} ## Using a file secret @@ -106,22 +93,7 @@ The file will be mounted at `/etc/flyte/secrets/`. For example: - -```python -env = flyte.TaskEnvironment( - name="my_task_env", - secrets=[ - flyte.Secret(key="MY_SECRET_KEY", mount="/etc/flyte/secrets"), - ] -) - -@env.task -def t1(): - with open("/etc/flyte/secrets/MY_SECRET_KEY", "r") as f: - my_secret_file_content = f.read() - # Do something with the secret file content - ... -``` +{{< code file="/external/unionai-examples/v2/user-guide/task-configuration/secrets/secrets.py" fragment="file" lang="python" >}} > [!NOTE] > Currently, to access a file secret you must specify a `mount` parameter value of `"/etc/flyte/secrets"`. 
diff --git a/content/user-guide/task-programming/dataclasses-and-structures.md b/content/user-guide/task-programming/dataclasses-and-structures.md index 7a31e3f3..385aaf3a 100644 --- a/content/user-guide/task-programming/dataclasses-and-structures.md +++ b/content/user-guide/task-programming/dataclasses-and-structures.md @@ -17,98 +17,13 @@ This makes them ideal for configuration objects, metadata, and smaller structure This example demonstrates how dataclasses and Pydantic models work together as materialized data types, showing nested structures and batch processing patterns: -```python -import asyncio -from dataclasses import dataclass -from typing import List - -from pydantic import BaseModel - -import flyte - -env = flyte.TaskEnvironment(name="ex-mixed-structures") - - -@dataclass -class InferenceRequest: - feature_a: float - feature_b: float - - -@dataclass -class BatchRequest: - requests: List[InferenceRequest] - batch_id: str = "default" - - -class PredictionSummary(BaseModel): - predictions: List[float] - average: float - count: int - batch_id: str - - -@env.task -async def predict_one(request: InferenceRequest) -> float: - """ - A dummy linear model: prediction = 2 * feature_a + 3 * feature_b + bias(=1.0) - """ - return 2.0 * request.feature_a + 3.0 * request.feature_b + 1.0 - - -@env.task -async def process_batch(batch: BatchRequest) -> PredictionSummary: - """ - Processes a batch of inference requests and returns summary statistics. - """ - # Process all requests concurrently - tasks = [predict_one(request=req) for req in batch.requests] - predictions = await asyncio.gather(*tasks) - - # Calculate statistics - average = sum(predictions) / len(predictions) if predictions else 0.0 - - return PredictionSummary( - predictions=predictions, - average=average, - count=len(predictions), - batch_id=batch.batch_id - ) - - -@env.task -async def summarize_results(summary: PredictionSummary) -> str: - """ - Creates a text summary from the prediction results. 
-    """
-    return (
-        f"Batch {summary.batch_id}: "
-        f"Processed {summary.count} predictions, "
-        f"average value: {summary.average:.2f}"
-    )
-
-
-if __name__ == "__main__":
-    flyte.init_from_config()
-
-    batch_req = BatchRequest(
-        requests=[
-            InferenceRequest(feature_a=1.0, feature_b=2.0),
-            InferenceRequest(feature_a=3.0, feature_b=4.0),
-            InferenceRequest(feature_a=5.0, feature_b=6.0),
-        ],
-        batch_id="demo_batch_001"
-    )
-
-    run = flyte.run(summarize_results, flyte.run(process_batch, batch_req))
-    print(f"Run URL: {run.url}")
-```
+{{< code file="/external/unionai-examples/v2/user-guide/task-programming/dataclasses-and-structures/example.py" lang="python" >}}
 
 ## Running the example
 
 You can run the example with Python like this:
 
-```bash
+```shell
 python dataclasses-and-structures.py
 ```
@@ -118,7 +33,7 @@ It uses the example `BatchRequest` dataclass defined in the code as input to the
 
 Alternatively, you can run the example using the `flyte` CLI:
 
-```bash
+```shell
 flyte run dataclass_example.py process_batch --batch '{"requests": [{"feature_a": 1, "feature_b": 2}, {"feature_a": 3, "feature_b": 4}], "batch_id": "test_batch"}'
 ```
 
 Here, the `--batch` argument is used because it matches the name of the paramete
 
 You could then take the output of that run (a `PredictionSummary` Pydantic model) and pass it to the `summarize_results` task like this:
 
-```bash
+```shell
 flyte run dataclass_example.py summarize_results --summary '{"predictions": [8.0, 18.0], "average": 13.0, "count": 2, "batch_id": "test_batch"}'
 ```
diff --git a/content/user-guide/task-programming/dataframes.md b/content/user-guide/task-programming/dataframes.md
index 9117ad76..b8ffccfd 100644
--- a/content/user-guide/task-programming/dataframes.md
+++ b/content/user-guide/task-programming/dataframes.md
@@ -10,170 +10,38 @@ By default, return values in Python are materialized - meaning the actual data i
 
 To avoid downloading large datasets into memory, Flyte 2 exposes [`flyte.io.dataframe`](../../api-reference/flyte-sdk/packages/flyte.io#flyteiodataframe): a thin, uniform wrapper type for DataFrame-style objects that allows you to pass a reference to the data, rather than the fully materialized contents.
 
-The `flyte.io.DataFrame` type provides serialization support for common engines like `pandas`, `polars`, `pyarrow`, `dask`, etc.; enabling you to move data between different DataFrame backends. 
+The `flyte.io.DataFrame` type provides serialization support for common engines like `pandas`, `polars`, `pyarrow`, `dask`, etc., enabling you to move data between different DataFrame backends.
 
 ## Constructing a flyte.io.DataFrame
 
-
 Use the `from_df` method to create a `flyte.io.DataFrame` from a native object:
 
-```python
-@env.task
-async def create_flyte_dataframe() -> Annotated[flyte.io.DataFrame, "csv"]:
-    pd_df = pd.DataFrame(ADDL_EMPLOYEE_DATA)
-    fdf = flyte.io.DataFrame.from_df(pd_df)
-    return fdf
-```
+{{< code file="/external/unionai-examples/v2/user-guide/task-programming/dataframes/dataframes.py" fragment="from_df" lang="python" >}}
 
 ## Declaring DataFrame inputs and outputs
 
 To declare a task that returns a native pandas DataFrame, you can use `pd.DataFrame` directly in the signature; the SDK will treat the return as a DataFrame-type output and upload it at task completion.
-```python -@env.task -async def create_raw_dataframe() -> pd.DataFrame: -``` +{{< code file="/external/unionai-examples/v2/user-guide/task-programming/dataframes/dataframes.py" fragment="native" lang="python" >}} To use the unified `flyte.io.DataFrame` wrapper (recommended when you want to be explicit about the DataFrame type and storage format), use an `Annotated` type where the second argument encodes format or other lightweight hints. -## Example - -```python -from typing import Annotated -import pandas as pd -import flyte.io - -def my_task() -> Annotated[flyte.io.DataFrame, "parquet"]: - # create a pandas DataFrame and convert it to a flyte DataFrame - df = pd.DataFrame(...) - return flyte.io.DataFrame.from_df(df) -``` +{{< code file="/external/unionai-examples/v2/user-guide/task-programming/dataframes/dataframes.py" fragment="annotated" lang="python" >}} ## Reading a DataFrame value inside a task When a task receives a `flyte.io.DataFrame`, you can open it and request a concrete backend representation. For example, to download as a pandas DataFrame: -```python -# Download all data at once -downloaded = await flyte_dataframe.open(pd.DataFrame).all() -# or in synchronous contexts: downloaded = flyte_dataframe.open(pd.DataFrame).all() -``` +{{< code file="/external/unionai-examples/v2/user-guide/task-programming/dataframes/dataframes.py" fragment="download" lang="python" >}} The `open(...)` call delegates to the DataFrame handler for the stored format and converts to the requested in-memory type. You can also leverage Flyte to automatically download and convert the dataframe between types when needed: - -```python -@env.task -async def get_employee_data(raw_dataframe: pd.DataFrame, flyte_dataframe: pd.DataFrame) -> pd.DataFrame: - """ - This task takes two dataframes as input. We'll pass one raw pandas dataframe, and one flyte.io.DataFrame. - Flyte automatically converts the flyte.io.DataFrame to a pandas DataFrame. The actual download and conversion - happens only when we access the data (in this case, when we do the merge).""" - joined_df = raw_dataframe.merge(flyte_dataframe, on="employee_id", how="inner") - - return joined_df - -``` +{{< code file="/external/unionai-examples/v2/user-guide/task-programming/dataframes/dataframes.py" fragment="automatic" lang="python" >}} ## Example — full usage -The following example ([code](https://github.com/flyteorg/flyte-sdk/blob/main/examples/basics/dataframe_usage.py)) demonstrates creating a raw pandas DataFrame, wrapping a pandas DataFrame into `flyte.io.DataFrame`, joining them inside another task, and running locally. 
- -```python -from typing import Annotated - -import numpy as np -import pandas as pd - -import flyte.io - -# Create task environment with required dependencies -img = flyte.Image.from_debian_base() -img = img.with_pip_packages("pandas", "pyarrow") - -env = flyte.TaskEnvironment( - "dataframe_usage", - image=img, - resources=flyte.Resources(cpu="1", memory="2Gi"), -) - -BASIC_EMPLOYEE_DATA = { - "employee_id": range(1001, 1009), - "name": ["Alice", "Bob", "Charlie", "Diana", "Ethan", "Fiona", "George", "Hannah"], - "department": ["HR", "Engineering", "Engineering", "Marketing", "Finance", "Finance", "HR", "Engineering"], - "hire_date": pd.to_datetime( - ["2018-01-15", "2019-03-22", "2020-07-10", "2017-11-01", "2021-06-05", "2018-09-13", "2022-01-07", "2020-12-30"] - ), -} - -ADDL_EMPLOYEE_DATA = { - "employee_id": range(1001, 1009), - "salary": [55000, 75000, 72000, 50000, 68000, 70000, np.nan, 80000], - "bonus_pct": [0.05, 0.10, 0.07, 0.04, np.nan, 0.08, 0.03, 0.09], - "full_time": [True, True, True, False, True, True, False, True], - "projects": [ - ["Recruiting", "Onboarding"], - ["Platform", "API"], - ["API", "Data Pipeline"], - ["SEO", "Ads"], - ["Budget", "Forecasting"], - ["Auditing"], - [], - ["Platform", "Security", "Data Pipeline"], - ], -} - - -@env.task -async def create_raw_dataframe() -> pd.DataFrame: - """ - This task creates a raw pandas DataFrame with basic employee information. - This is the most basic use-case of how to pass dataframes (of all kinds, not just pandas). Create the dataframe - as normal, and return it. Note that the output signature is of the dataframe library type. - Uploading of the actual bits of the dataframe (which for pandas is serialized to parquet) happens at the - end of the task, the TypeEngine uploads from memory to blob store. - """ - return pd.DataFrame(BASIC_EMPLOYEE_DATA) - - -@env.task -async def create_flyte_dataframe() -> Annotated[flyte.io.DataFrame, "csv"]: - """ - This task creates a Flyte DataFrame with compensation and project data. - Because there's no generic type in Python that means any dataframe type, Flyte ships with its own. The - flyte.io.DataFrame class is a thin wrapper around the various dataframe libraries (pandas, pyarrow, dask, etc). - """ - pd_df = pd.DataFrame(ADDL_EMPLOYEE_DATA) - - fdf = flyte.io.DataFrame.from_df(pd_df) - return fdf - - -@env.task -async def get_employee_data(raw_dataframe: pd.DataFrame, flyte_dataframe: pd.DataFrame) -> pd.DataFrame: - """ - This task takes two dataframes as input. We'll pass one raw pandas dataframe, and one flyte.io.DataFrame. - Flyte automatically converts the flyte.io.DataFrame to a pandas DataFrame. 
The actual download and conversion - happens only when we access the data (in this case, when we do the merge).""" - joined_df = raw_dataframe.merge(flyte_dataframe, on="employee_id", how="inner") - - return joined_df - - -if __name__ == "__main__": - import flyte.git - flyte.init_from_config(flyte.git.config_from_root()) - # Get the data sources - - raw_df = flyte.with_runcontext(mode="local").run(create_raw_dataframe) - flyte_df = flyte.with_runcontext(mode="local").run(create_flyte_dataframe) - - # Pass both to get_employee_data - Flyte auto-converts flyte.io.DataFrame to pd.DataFrame - run = flyte.with_runcontext(mode="local").run( - get_employee_data, - raw_dataframe=raw_df.outputs(), - flyte_dataframe=flyte_df.outputs(), - ) - print("Results:", run.outputs()) -``` \ No newline at end of file +The following example ([code](https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-programming/dataframes/full_usage.py)) demonstrates creating a raw pandas DataFrame, wrapping a pandas DataFrame into `flyte.io.DataFrame`, joining them inside another task, and running locally. + +{{< code file="/external/unionai-examples/v2/user-guide/task-programming/dataframes/full_usage.py" lang="python" >}} diff --git a/content/user-guide/task-programming/fanout.md b/content/user-guide/task-programming/fanout.md index 21035f5f..847a6988 100644 --- a/content/user-guide/task-programming/fanout.md +++ b/content/user-guide/task-programming/fanout.md @@ -28,37 +28,7 @@ In Flyte terminology, each individual task execution is called an "action"—thi Here's a basic fanout example: -```python -# fanout.py - -import asyncio - -import flyte - -env = flyte.TaskEnvironment("large_fanout") - - -@env.task -async def my_task(x: int) -> int: - return x - - -@env.task -async def main(r: int): - results = [] - for i in range(r): - results.append(my_task(x=i)) - result = await asyncio.gather(*results) - - return result - - -if __name__ == "__main__": - flyte.init_from_config("config.yaml") - run = flyte.run(main, r=50) - print(run.url) - run.wait(run) -``` +{{< code file="/external/unionai-examples/v2/user-guide/task-programming/fanout/fanout.py" fragment="basic" lang="python" >}} ## Fanout execution patterns @@ -66,123 +36,36 @@ if __name__ == "__main__": The most common fanout pattern is to collect task invocations and execute them in parallel using `asyncio.gather()`: -```python -@env.task -async def parallel_fanout_example(n: int) -> List[str]: - results = [] - - # Collect all task invocations first - for i in range(n): - results.append(my_async_task(i)) - - # Execute all tasks in parallel - final_results = await asyncio.gather(*results) - - return final_results -``` +{{< code file="/external/unionai-examples/v2/user-guide/task-programming/fanout/fanout.py" fragment="parallel" lang="python" >}} ### Sequential execution You can also implement fanout with sequential execution when you need to process tasks one at a time in order: -```python -@env.task -async def sequential_fanout_example(n: int) -> List[str]: - results = [] - - # Execute tasks one at a time in sequence - for i in range(n): - result = await my_async_task(i) # Await each task individually - results.append(result) - - return results -``` +{{< code file="/external/unionai-examples/v2/user-guide/task-programming/fanout/fanout.py" fragment="sequential" lang="python" >}} ### Mixed patterns You can combine both parallel and sequential patterns within the same workflow: -```python -@env.task -async def mixed_fanout_example(n: int) -> 
Tuple[List[str], List[str]]:
-    # First: parallel execution
-    parallel_tasks = []
-    for i in range(n):
-        parallel_tasks.append(fast_task(i))
-    parallel_results = await asyncio.gather(*parallel_tasks)
-
-    # Second: sequential execution using results from parallel phase
-    sequential_results = []
-    for result in parallel_results:
-        processed = await slow_processing_task(result)
-        sequential_results.append(processed)
-
-    return parallel_results, sequential_results
-```
+{{< code file="/external/unionai-examples/v2/user-guide/task-programming/fanout/fanout.py" fragment="mixed" lang="python" >}}

## Multi-phase fanout workflows

Complex workflows often involve multiple fanout phases:

-```python
-@env.task
-async def multi_phase_workflow(data_size: int) -> List[int]:
-    # First phase: data preprocessing
-    preprocessed = []
-    for i in range(data_size):
-        preprocessed.append(preprocess_task(i))
-    phase1_results = await asyncio.gather(*preprocessed)
-
-    # Second phase: main processing
-    processed = []
-    for result in phase1_results:
-        processed.append(process_task(result))
-    phase2_results = await asyncio.gather(*processed)
-
-    # Third phase: postprocessing
-    postprocessed = []
-    for result in phase2_results:
-        postprocessed.append(postprocess_task(result))
-    final_results = await asyncio.gather(*postprocessed)
-
-    return final_results
-```
+{{< code file="/external/unionai-examples/v2/user-guide/task-programming/fanout/fanout.py" fragment="multi-phase" lang="python" >}}

## Best practices for fanout

-1. **Use appropriate parallelism**: Balance between parallelism and resource constraints
-   ```python
-   # For very large fanouts, consider batching
-   batch_size = 100
-   for i in range(0, total_items, batch_size):
-       batch = items[i:i + batch_size]
-       batch_results = []
-       for item in batch:
-           batch_results.append(process_task(item))
-       await asyncio.gather(*batch_results)
-   ```
+1. **Use appropriate parallelism**: Balance between parallelism and resource constraints
+   {{< code file="/external/unionai-examples/v2/user-guide/task-programming/fanout/fanout.py" fragment="batching" lang="python" >}}

2. **Handle errors gracefully**: Use error handling strategies for large fanouts
-   ```python
-   # Use return_exceptions=True to handle failures gracefully
-   results = await asyncio.gather(*tasks, return_exceptions=True)
-   for i, result in enumerate(results):
-       if isinstance(result, Exception):
-           logger.error(f"Task {i} failed: {result}")
-   ```
+   {{< code file="/external/unionai-examples/v2/user-guide/task-programming/fanout/fanout.py" fragment="errors" lang="python" >}}

3. 
**Consider memory usage**: Large fanouts can consume significant memory
-   ```python
-   # Process in chunks to manage memory
-   chunk_size = 1000
-   all_results = []
-   for chunk_start in range(0, total_size, chunk_size):
-       chunk_tasks = []
-       for i in range(chunk_start, min(chunk_start + chunk_size, total_size)):
-           chunk_tasks.append(my_task(i))
-       chunk_results = await asyncio.gather(*chunk_tasks)
-       all_results.extend(chunk_results)
-   ```
+   {{< code file="/external/unionai-examples/v2/user-guide/task-programming/fanout/fanout.py" fragment="memory" lang="python" >}}

## When to use fanout

diff --git a/content/user-guide/task-programming/files-and-directories.md b/content/user-guide/task-programming/files-and-directories.md
index 1ad8e950..74bdbbd7 100644
--- a/content/user-guide/task-programming/files-and-directories.md
+++ b/content/user-guide/task-programming/files-and-directories.md
@@ -25,29 +25,7 @@ The `File` and `Dir` classes provide both `sync` and `async` methods to interact

The examples below show the basic use cases of uploading files and directories created locally, and using them as inputs to a task.

-```python
-import asyncio
-import tempfile
-from pathlib import Path
-
-import flyte
-from flyte.io import Dir, File
-
-env = flyte.TaskEnvironment(name="files-and-folders")
-
-
-@env.task
-async def write_file(name: str) -> File:
-
-    # Create a file and write some content to it
-    with open("test.txt", "w") as f:
-        f.write(f"hello world {name}")
-
-    # Upload the file using flyte
-    uploaded_file_obj = await File.from_local("test.txt")
-    return uploaded_file_obj
-
-```
+{{< code file="/external/unionai-examples/v2/user-guide/task-programming/files-and-directories/file_and_dir.py" fragment="write-file" lang="python" >}}

The upload happens when the [`from_local`](../../api-reference/flyte-sdk/packages/flyte.io#from_local) method is called. Because the upload would otherwise block execution, `from_local` is implemented as an `async` function.

@@ -57,57 +35,10 @@ This is a slightly more complicated task that calls the task above to produce `F

These are assembled into a directory and the `Dir` object is returned, also via invoking `from_local`.

-```python
-@env.task
-async def write_and_check_files() -> Dir:
-    coros = []
-    for name in ["Alice", "Bob", "Eve"]:
-        coros.append(write_file(name=name))
-
-    vals = await asyncio.gather(*coros)
-    temp_dir = tempfile.mkdtemp()
-    for file in vals:
-        async with file.open() as fh:
-            contents = fh.read()
-            print(f"File {file.path} contents: {contents}")
-        new_file = Path(temp_dir) / file.name
-        with open(new_file, "wb") as out:  # noqa: ASYNC230
-            out.write(contents)
-    print(f"Files written to {temp_dir}")
-
-    # walk the directory and ls
-    for path in Path(temp_dir).iterdir():
-        print(f"File: {path.name}")
-
-    my_dir = await Dir.from_local(temp_dir)
-    return my_dir
-```
+{{< code file="/external/unionai-examples/v2/user-guide/task-programming/files-and-directories/file_and_dir.py" fragment="write-and-check-files" lang="python" >}}

Finally, these tasks show how to use an offloaded type as an input. Helper functions like `walk` and `open` have been added to the objects and do what you might expect. 
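
As a quick, self-contained illustration (the environment and task names here are illustrative, not from the example file), a task that takes a `File` input can open and read it directly; the offloaded bytes are only fetched from blob storage when the handle is read:

```python
import flyte
from flyte.io import File

env = flyte.TaskEnvironment(name="file-reader")


@env.task
async def read_file(f: File) -> str:
    # `open` yields a file-like handle; reading it pulls the
    # offloaded bytes down from blob storage.
    async with f.open() as fh:
        contents = fh.read()
    return contents.decode("utf-8")
```
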
-```python -@env.task -async def check_dir(my_dir: Dir): - print(f"Dir {my_dir.path} contents:") - async for file in my_dir.walk(): - print(f"File: {file.name}") - async with file.open() as fh: - contents = fh.read() - print(f"Contents: {contents.decode('utf-8')}") - - -@env.task -async def create_and_check_dir(): - my_dir = await write_and_check_files() - await check_dir(my_dir=my_dir) - - -if __name__ == "__main__": - flyte.init_from_config() - run = flyte.run(create_and_check_dir) - print(run.name) - print(run.url) - run.wait(run) -``` +{{< code file="/external/unionai-examples/v2/user-guide/task-programming/files-and-directories/file_and_dir.py" fragment="create-and-check-dir" lang="python" >}} diff --git a/content/user-guide/task-programming/grouping-actions.md b/content/user-guide/task-programming/grouping-actions.md index 84050d81..a7207d6d 100644 --- a/content/user-guide/task-programming/grouping-actions.md +++ b/content/user-guide/task-programming/grouping-actions.md @@ -32,13 +32,7 @@ Groups solve this by: Groups are declared using the [`flyte.group`](../../api-reference/flyte-sdk/packages/flyte#group) context manager. Any task invocations that occur within the `with flyte.group()` block are automatically associated with that group: -```python -with flyte.group("my-group-name"): - # All task invocations here belong to "my-group-name" - result1 = await task_a(data) - result2 = await task_b(data) - result3 = await task_c(data) -``` +{{< code file="/external/unionai-examples/v2/user-guide/task-programming/grouping-actions/grouping.py" fragment="simple" lang="python" >}} The key points about groups: @@ -56,159 +50,49 @@ The key points about groups: Group related sequential operations that logically belong together: -```python -@env.task -async def data_pipeline(raw_data: str) -> str: - with flyte.group("data-validation"): - validated_data = await validate_schema(raw_data) - validated_data = await check_data_quality(validated_data) - validated_data = await remove_duplicates(validated_data) - - with flyte.group("feature-engineering"): - features = await extract_features(validated_data) - features = await normalize_features(features) - features = await select_features(features) - - with flyte.group("model-training"): - model = await train_model(features) - model = await validate_model(model) - final_model = await save_model(model) - - return final_model -``` +{{< code file="/external/unionai-examples/v2/user-guide/task-programming/grouping-actions/grouping.py" fragment="sequential" lang="python" >}} ### Parallel processing with groups Groups work well with parallel execution patterns: -```python -@env.task -async def parallel_processing_example(n: int) -> List[str]: - results = [] - - with flyte.group("parallel-processing"): - # Collect all task invocations first - for i in range(n): - results.append(my_async_task(i)) - - # Execute all tasks in parallel - final_results = await asyncio.gather(*results) - - return final_results -``` +{{< code file="/external/unionai-examples/v2/user-guide/task-programming/grouping-actions/grouping.py" fragment="parallel" lang="python" >}} ### Multi-phase workflows Use groups to organize different phases of complex workflows: -```python -@env.task -async def multi_phase_workflow(data_size: int) -> List[int]: - # First phase: data preprocessing - preprocessed = [] - with flyte.group("preprocessing"): - for i in range(data_size): - preprocessed.append(preprocess_task(i)) - phase1_results = await asyncio.gather(*preprocessed) - - # Second phase: main 
processing - processed = [] - with flyte.group("main-processing"): - for result in phase1_results: - processed.append(process_task(result)) - phase2_results = await asyncio.gather(*processed) - - # Third phase: postprocessing - postprocessed = [] - with flyte.group("postprocessing"): - for result in phase2_results: - postprocessed.append(postprocess_task(result)) - final_results = await asyncio.gather(*postprocessed) - - return final_results -``` +{{< code file="/external/unionai-examples/v2/user-guide/task-programming/grouping-actions/grouping.py" fragment="multi" lang="python" >}} ### Nested groups Groups can be nested to create hierarchical organization: -```python -@env.task -async def hierarchical_example(): - with flyte.group("machine-learning-pipeline"): - with flyte.group("data-preparation"): - cleaned_data = await clean_data(raw_data) - split_data = await split_dataset(cleaned_data) - - with flyte.group("model-experiments"): - with flyte.group("hyperparameter-tuning"): - best_params = await tune_hyperparameters(split_data) - - with flyte.group("model-training"): - model = await train_final_model(split_data, best_params) -``` +{{< code file="/external/unionai-examples/v2/user-guide/task-programming/grouping-actions/grouping.py" fragment="nested" lang="python" >}} ### Conditional grouping Groups can be used with conditional logic: -```python -@env.task -async def conditional_processing(use_advanced_features: bool): - base_result = await basic_processing() - - if use_advanced_features: - with flyte.group("advanced-features"): - enhanced_result = await advanced_processing(base_result) - optimized_result = await optimize_result(enhanced_result) - return optimized_result - else: - with flyte.group("basic-features"): - simple_result = await simple_processing(base_result) - return simple_result -``` +{{< code file="/external/unionai-examples/v2/user-guide/task-programming/grouping-actions/grouping.py" fragment="conditional" lang="python" >}} ## Best practices for groups 1. **Meaningful names**: Use descriptive group names that indicate the purpose or phase - ```python - with flyte.group("feature-extraction"): - with flyte.group("model-training"): - with flyte.group("hyperparameter-sweep"): - ``` + + {{< code file="/external/unionai-examples/v2/user-guide/task-programming/grouping-actions/grouping.py" fragment="meaningful" lang="python" >}} 2. **Logical boundaries**: Group related operations together, but don't make groups too large - ```python - # Good: Group by logical phase - with flyte.group("data-validation"): - # All validation tasks - with flyte.group("feature-engineering"): - # All feature engineering tasks - ``` + {{< code file="/external/unionai-examples/v2/user-guide/task-programming/grouping-actions/grouping.py" fragment="logical" lang="python" >}} 3. **Consistent patterns**: Use consistent naming conventions across your workflows - ```python - # Use consistent prefixes or patterns - with flyte.group("phase-1-preprocessing"): - with flyte.group("phase-2-training"): - with flyte.group("phase-3-evaluation"): - ``` + + {{< code file="/external/unionai-examples/v2/user-guide/task-programming/grouping-actions/grouping.py" fragment="consistent" lang="python" >}} 4. 
**Appropriate granularity**: Balance between too many small groups and too few large groups - ```python - # Too granular - avoid - with flyte.group("step-1"): - task_a() - with flyte.group("step-2"): - task_b() - - # Better - logical grouping - with flyte.group("data-preparation"): - task_a() - task_b() - task_c() - ``` + + {{< code file="/external/unionai-examples/v2/user-guide/task-programming/grouping-actions/grouping.py" fragment="granularity" lang="python" >}} 5. **UI consideration**: Remember that groups are primarily for organization and visualization—they don't affect execution performance diff --git a/content/user-guide/task-programming/reports.md b/content/user-guide/task-programming/reports.md index 79fa9341..6d4f585a 100644 --- a/content/user-guide/task-programming/reports.md +++ b/content/user-guide/task-programming/reports.md @@ -38,27 +38,7 @@ Check (test) if implicit flush is performed at the end of the task execution. ## A simple example -```python -import flyte -import flyte.report - -env = flyte.TaskEnvironment(name="reports_example") - - -@env.task(report=True) -async def task1(): - await flyte.report.replace.aio("

<p>The quick, brown fox jumps over a lazy dog.</p>
") - tab2 = flyte.report.get_tab("Tab 2") - tab2.log.aio("

<p>The quick, brown dog jumps over a lazy fox.</p>
") - await flyte.report.flush.aio() - - -if __name__ == "__main__": - flyte.init_from_config("config.yaml") - r = flyte.run(task1) - print(r.name) - print(r.url) -``` +{{< code file="/external/unionai-examples/v2/user-guide/task-programming/reports/simple.py" lang="python" >}} Here we define a task `task1` that logs some HTML content to the default tab and creates a new tab named "Tab 2" where it logs additional HTML content. The `flush` method is called to send the report to the backend. @@ -85,7 +65,7 @@ def get_html_content(): """ ``` -We exclude it here due to length. You can find it in the [source file](https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-programming/reports/globe_visualization.py). +(We exclude it here due to length. You can find it in the [source file](https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-programming/reports/globe_visualization.py)). Finally, we run the workflow: @@ -120,8 +100,8 @@ DATA_PROCESSING_DASHBOARD_HTML = """ """ ``` -We exclude it here due to length. You can find it in the [source file]( -https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-programming/reports/streaming_reports.py). +(We exclude it here due to length. You can find it in the [source file]( +https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-programming/reports/streaming_reports.py)). Finally, we define the task that renders the report (`data_processing_dashboard`), the driver task of the workflow (`main`), and the run logic: diff --git a/content/user-guide/task-programming/traces.md b/content/user-guide/task-programming/traces.md index 65770a2c..bbbeb50d 100644 --- a/content/user-guide/task-programming/traces.md +++ b/content/user-guide/task-programming/traces.md @@ -17,32 +17,7 @@ Flyte differentiates between tasks and traced functions: - **Tasks** (`@env.task`): The orchestration layer that manages workflow execution, caching, and resources. - **Traced functions** (`@flyte.trace`) = Helper functions that perform specific operations and create checkpoints. -```python -import flyte - -env = flyte.TaskEnvironment("my-app") - -# Traced helper functions - the main use case -@flyte.trace -async def call_llm(prompt: str) -> str: - """Helper function for LLM calls - traced for observability.""" - response = await llm_client.chat(prompt) - return response - -@flyte.trace -async def process_data(data: str) -> dict: - """Helper function for data processing - traced for checkpointing.""" - return await expensive_computation(data) - -# Tasks orchestrate traced helper functions -@env.task -async def research_workflow(topic: str) -> dict: - # Task coordinates the workflow - llm_result = await call_llm(f"Generate research plan for: {topic}") - processed_data = await process_data(llm_result) - - return {"topic": topic, "result": processed_data} -``` +{{< code file="/external/unionai-examples/v2/user-guide/task-programming/traces/traces.py" fragment="tasks-vs-traced" lang="python" >}} This division has the following benefits: @@ -56,26 +31,7 @@ This division has the following benefits: Traces only function within task execution contexts. 
They either fail or do nothing when called outside tasks: -```python -@flyte.trace -def sync_function(x: int) -> int: - return x * 2 - -@flyte.trace -async def async_function(x: int) -> int: - return x * 2 - -# ❌ Outside task context - neither work -sync_function(5) # Fails -await async_function(5) # Fails - -# ✅ Within task context - both work and are traced -@env.task -async def my_task(): - result1 = sync_function(5) # ✅ Traced! (Coming soon) - result2 = await async_function(5) # ✅ Traced! - return result1 + result2 -``` +{{< code file="/external/unionai-examples/v2/user-guide/task-programming/traces/traces.py" fragment="context" lang="python" >}} > [!NOTE] > Tracing of synchronous functions (within a task context) is coming soon. @@ -100,76 +56,13 @@ The trace decorator works with: - **Generator functions**: Functions that `yield` values. - **Async generators**: Functions that `async yield` values. -```python -@flyte.trace -def sync_process(data: str) -> str: - """Synchronous data processing.""" - return data.upper() - -@flyte.trace -async def async_api_call(url: str) -> dict: - """Asynchronous API call.""" - response = await http_client.get(url) - return response.json() - -@flyte.trace -def stream_data(items: list[str]): - """Generator function for streaming.""" - for item in items: - yield f"Processing: {item}" - -@flyte.trace -async def async_stream_llm(prompt: str): - """Async generator for streaming LLM responses.""" - async for chunk in llm_client.stream(prompt): - yield chunk -``` +{{< code file="/external/unionai-examples/v2/user-guide/task-programming/traces/traces.py" fragment="function-types" lang="python" >}} ## Task Orchestration Pattern The typical Flyte workflow follows this pattern: -```python -# Helper functions - traced for observability -@flyte.trace -async def search_web(query: str) -> list[dict]: - """Search the web and return results.""" - results = await search_api.query(query) - return results - -@flyte.trace -async def summarize_content(content: str) -> str: - """Summarize content using LLM.""" - summary = await llm_client.summarize(content) - return summary - -@flyte.trace -async def extract_insights(summaries: list[str]) -> dict: - """Extract insights from summaries.""" - insights = await analysis_service.extract_insights(summaries) - return insights - -# Task - orchestrates the traced helper functions -@env.task -async def research_pipeline(topic: str) -> dict: - """Main research pipeline task.""" - - # Each helper function creates a checkpoint - search_results = await search_web(f"research on {topic}") - - summaries = [] - for result in search_results: - summary = await summarize_content(result["content"]) - summaries.append(summary) - - final_insights = await extract_insights(summaries) - - return { - "topic": topic, - "insights": final_insights, - "sources_count": len(search_results) - } -``` +{{< code file="/external/unionai-examples/v2/user-guide/task-programming/traces/traces.py" fragment="pattern" lang="python" >}} **Benefits of this pattern:** - If `search_web` succeeds but `summarize_content` fails, resumption skips the search step @@ -192,40 +85,7 @@ Understanding how traces work with Flyte's other execution features: Lets use better typing for all of these examples, we have the opportunity to make this right for our users --> -```python -@env.task # Task-level caching enabled by default -async def data_pipeline(dataset_id: str) -> dict: - # 1. 
If this exact task with these inputs ran before, - # the entire task result is returned from cache - - # 2. If not cached, execution begins and each traced function - # creates checkpoints for resumption - cleaned_data = await traced_data_cleaning(dataset_id) # Checkpoint 1 - features = await traced_feature_extraction(cleaned_data) # Checkpoint 2 - model_results = await traced_model_training(features) # Checkpoint 3 - - # 3. If workflow fails at step 3, resumption will: - # - Skip traced_data_cleaning (checkpointed) - # - Skip traced_feature_extraction (checkpointed) - # - Re-run only traced_model_training - - return {"dataset_id": dataset_id, "accuracy": model_results["accuracy"]} - -@flyte.trace -async def traced_data_cleaning(dataset_id: str) -> list: - """Creates checkpoint after successful execution.""" - return await expensive_cleaning_process(dataset_id) - -@flyte.trace -async def traced_feature_extraction(data: list) -> dict: - """Creates checkpoint after successful execution.""" - return await expensive_feature_process(data) - -@flyte.trace -async def traced_model_training(features: dict) -> dict: - """Creates checkpoint after successful execution.""" - return await expensive_training_process(features) -``` +{{< code file="/external/unionai-examples/v2/user-guide/task-programming/traces/traces.py" fragment="caching-vs-checkpointing" lang="python" >}} ### Execution Flow @@ -246,27 +106,7 @@ Traces capture comprehensive execution information for debugging and monitoring: Add more examples of error handling with traces --> -```python -@flyte.trace -async def risky_api_call(endpoint: str, data: dict) -> dict: - """API call that might fail - traces capture errors.""" - try: - response = await api_client.post(endpoint, json=data) - return response.json() - except Exception as e: - # Error is automatically captured in trace - logger.error(f"API call failed: {e}") - raise - -@env.task -async def error_handling_workflow(): - try: - result = await risky_api_call("/process", {"invalid": "data"}) - return {"status": "success", "result": result} - except Exception as e: - # The error is recorded in the trace for debugging - return {"status": "error", "message": str(e)} -``` +{{< code file="/external/unionai-examples/v2/user-guide/task-programming/traces/traces.py" fragment="error-handling" lang="python" >}} **What traces capture:** - **Execution time**: Duration of each function call @@ -288,34 +128,7 @@ Use `@flyte.trace` for: ### Recommended Architecture -```python -# ✅ Traced helper functions for specific operations -@flyte.trace -async def call_llm(prompt: str, model: str) -> str: - """LLM interaction - traced for observability.""" - return await llm_client.chat(prompt, model=model) - -@flyte.trace -async def search_database(query: str) -> list[dict]: - """Database operation - traced for checkpointing.""" - return await db.search(query) - -@flyte.trace -async def process_results(data: list[dict]) -> dict: - """Data processing - traced for error tracking.""" - return await expensive_analysis(data) - -# ✅ Tasks that orchestrate traced functions -@env.task -async def research_workflow(topic: str) -> dict: - """Main workflow - coordinates traced operations.""" - search_results = await search_database(f"research: {topic}") - analysis_prompt = f"Analyze these results: {search_results}" - llm_analysis = await call_llm(analysis_prompt, "gpt-4") - final_results = await process_results([{"analysis": llm_analysis}]) - - return final_results -``` +{{< code 
file="/external/unionai-examples/v2/user-guide/task-programming/traces/traces.py" fragment="recommended" lang="python" >}} ### Performance Considerations diff --git a/external/unionai-examples b/external/unionai-examples index 27279c17..1db28dba 160000 --- a/external/unionai-examples +++ b/external/unionai-examples @@ -1 +1 @@ -Subproject commit 27279c17837486893afb672fa7ba2e55c2e2033a +Subproject commit 1db28dbad7ba06f7900505374299311c5c2f4e8d