Skip to content

Conversation

kumare3
Copy link
Contributor

@kumare3 kumare3 commented Jul 31, 2025

This PR adds a new module to Flyte that enables compiling Python functions into executable DAGs (Directed Acyclic Graphs) using a simple, intuitive interface. The core of this system is the use of Promises to symbolically represent the flow of data between tasks, allowing for automatic dependency tracking and parallelization.

Key Components

  • Promises:
    A Promise object represents a future value produced by a task. As the user writes their workflow as a Python function, function parameters and intermediate results become Promise objects. These are tracked during compilation to construct the DAG and its data dependencies.

  • DAG Compilation (dsl.compile):
    The compile function takes a standard Python function (written as a workflow) and replaces its parameters with Promise objects. As the function executes (with Promises instead of actual values), every task call is intercepted, nodes and edges are added, and the final DAG structure is constructed using networkx.

  • DAG, Nodes, and Edges:

    • DAGNode: Represents a task or a special node (Start/End).
    • DAGEdge: Represents a dependency (data flow) between nodes.
    • DAG: Wraps the entire computational graph, with methods to add tasks, dependencies, and visualize the graph.
  • Testing:
    Comprehensive tests demonstrate correct compilation, error detection (such as calling non-task functions), and expected node/edge counts in various workflow structures.

How Compilation Works

  1. Mock Promises: The function’s parameters are replaced with Promise objects.
  2. Task Call Interception: Calls to TaskTemplate instances (i.e., tasks) are monkey-patched so that instead of executing, they register themselves in the DAG and return Promises for their outputs.
  3. Dependency Tracking: Whenever a task receives a Promise as input, a dependency edge is created from the producing node to the consuming node.
  4. Error Handling: If a function attempts to operate on a Promise with a non-task function, an error is raised, ensuring that only decorated tasks can be part of the compiled DAG.

Example Usage

Suppose you have two tasks, add and double, and want to define a workflow that chains them:

import flyte
import flyte.dsl as dsl

env = flyte.TaskEnvironment(name="example_dag")

@env.task
def add(a: int, b: int) -> int:
    return a + b

@env.task
def double(x: int) -> int:
    return x * 2

def my_workflow(x: int, y: int) -> int:
    sum_result = add(x, y)
    doubled = double(sum_result)
    return add(sum_result, doubled)

compiled_dag = dsl.compile(my_workflow)
  • What happens here?
    • dsl.compile replaces x and y with Promises.
    • Each call to add or double registers a node in the DAG, with dependencies based on Promises.
    • The resulting compiled_dag contains the entire workflow structure, which can be visualized or executed.

Error Detection

If the workflow calls a function that is not a decorated Flyte task, compilation fails with an informative error—ensuring full compatibility and reusability.

Visualization

The DAG supports conversion to DOT/Graphviz for visualization, so users can inspect their workflow structure.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant