Skip to content

Commit

Permalink
fix: Enhance vertex runnability logic with loop detection (#6309)
Browse files Browse the repository at this point in the history
* feat: add is_loop property to Vertex class for detecting looping outputs

* feat: improve vertex runnability logic for graph traversal

- Update `is_vertex_runnable` to handle loop vertices more robustly
- Modify `are_all_predecessors_fulfilled` to better manage cycle dependencies
- Change adjacency maps to use sets for more efficient predecessor/successor tracking

* refactor: change graph adjacency maps from lists to sets for improved performance

- Update graph data structures to use sets instead of lists for predecessor, successor, and parent-child maps
- Modify type hints and method signatures to reflect the change from list to set
- Improve graph traversal and vertex tracking efficiency by using set operations
  • Loading branch information
ogabrielluiz authored Feb 13, 2025
1 parent f3ddbcf commit f5a2c1c
Show file tree
Hide file tree
Showing 7 changed files with 88 additions and 41 deletions.
32 changes: 17 additions & 15 deletions src/backend/base/langflow/graph/graph/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,10 +116,10 @@ def __init__(

self.top_level_vertices: list[str] = []
self.vertex_map: dict[str, Vertex] = {}
self.predecessor_map: dict[str, list[str]] = defaultdict(list)
self.successor_map: dict[str, list[str]] = defaultdict(list)
self.predecessor_map: dict[str, set[str]] = defaultdict(set)
self.successor_map: dict[str, set[str]] = defaultdict(set)
self.in_degree_map: dict[str, int] = defaultdict(int)
self.parent_child_map: dict[str, list[str]] = defaultdict(list)
self.parent_child_map: dict[str, set[str]] = defaultdict(set)
self._run_queue: deque[str] = deque()
self._first_layer: list[str] = []
self._lock = asyncio.Lock()
Expand Down Expand Up @@ -469,10 +469,10 @@ def _add_edge(self, edge: EdgeData) -> None:
self.add_edge(edge)
source_id = edge["data"]["sourceHandle"]["id"]
target_id = edge["data"]["targetHandle"]["id"]
self.predecessor_map[target_id].append(source_id)
self.successor_map[source_id].append(target_id)
self.predecessor_map[target_id].add(source_id)
self.successor_map[source_id].add(target_id)
self.in_degree_map[target_id] += 1
self.parent_child_map[source_id].append(target_id)
self.parent_child_map[source_id].add(target_id)

def add_node(self, node: NodeData) -> None:
self._vertices.append(node)
Expand Down Expand Up @@ -1554,7 +1554,7 @@ async def process(
logger.debug("Graph processing complete")
return self

def find_next_runnable_vertices(self, vertex_successors_ids: list[str]) -> list[str]:
def find_next_runnable_vertices(self, vertex_successors_ids: set[str]) -> list[str]:
next_runnable_vertices = set()
for v_id in sorted(vertex_successors_ids):
if not self.is_vertex_runnable(v_id):
Expand Down Expand Up @@ -1692,7 +1692,7 @@ def get_all_successors(self, vertex: Vertex, *, recursive=True, flat=True, visit

def get_successors(self, vertex: Vertex) -> list[Vertex]:
"""Returns the successors of a vertex."""
return [self.get_vertex(target_id) for target_id in self.successor_map.get(vertex.id, [])]
return [self.get_vertex(target_id) for target_id in self.successor_map.get(vertex.id, set())]

def get_vertex_neighbors(self, vertex: Vertex) -> dict[Vertex, int]:
"""Returns the neighbors of a vertex."""
Expand Down Expand Up @@ -1952,7 +1952,8 @@ def sort_layer_by_avg_build_time(vertices_ids: list[str]) -> list[str]:
def is_vertex_runnable(self, vertex_id: str) -> bool:
"""Returns whether a vertex is runnable."""
is_active = self.get_vertex(vertex_id).is_active()
return self.run_manager.is_vertex_runnable(vertex_id, is_active=is_active)
is_loop = self.get_vertex(vertex_id).is_loop
return self.run_manager.is_vertex_runnable(vertex_id, is_active=is_active, is_loop=is_loop)

def build_run_map(self) -> None:
"""Builds the run map for the graph.
Expand Down Expand Up @@ -1984,7 +1985,8 @@ def find_runnable_predecessors(predecessor: Vertex) -> None:
return
visited.add(predecessor_id)
is_active = self.get_vertex(predecessor_id).is_active()
if self.run_manager.is_vertex_runnable(predecessor_id, is_active=is_active):
is_loop = self.get_vertex(predecessor_id).is_loop
if self.run_manager.is_vertex_runnable(predecessor_id, is_active=is_active, is_loop=is_loop):
runnable_vertices.append(predecessor_id)
else:
for pred_pred_id in self.run_manager.run_predecessors.get(predecessor_id, []):
Expand Down Expand Up @@ -2029,13 +2031,13 @@ def build_in_degree(self, edges: list[CycleEdge]) -> dict[str, int]:
return in_degree

@staticmethod
def build_adjacency_maps(edges: list[CycleEdge]) -> tuple[dict[str, list[str]], dict[str, list[str]]]:
def build_adjacency_maps(edges: list[CycleEdge]) -> tuple[dict[str, set[str]], dict[str, set[str]]]:
"""Returns the adjacency maps for the graph."""
predecessor_map: dict[str, list[str]] = defaultdict(list)
successor_map: dict[str, list[str]] = defaultdict(list)
predecessor_map: dict[str, set[str]] = defaultdict(set)
successor_map: dict[str, set[str]] = defaultdict(set)
for edge in edges:
predecessor_map[edge.target_id].append(edge.source_id)
successor_map[edge.source_id].append(edge.target_id)
predecessor_map[edge.target_id].add(edge.source_id)
successor_map[edge.source_id].add(edge.target_id)
return predecessor_map, successor_map

def __to_dict(self) -> dict[str, dict[str, list[str]]]:
Expand Down
14 changes: 11 additions & 3 deletions src/backend/base/langflow/graph/graph/constants.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,14 @@
from __future__ import annotations

from typing import TYPE_CHECKING

from langflow.graph.schema import CHAT_COMPONENTS
from langflow.utils.lazy_load import LazyLoadDictBase

if TYPE_CHECKING:
from langflow.graph.vertex.base import Vertex
from langflow.graph.vertex.vertex_types import CustomComponentVertex


class Finish:
def __bool__(self) -> bool:
Expand All @@ -22,7 +30,7 @@ def __init__(self) -> None:
self._types = _import_vertex_types

@property
def vertex_type_map(self):
def vertex_type_map(self) -> dict[str, type[Vertex]]:
return self.all_types_dict

def _build_dict(self):
Expand All @@ -32,15 +40,15 @@ def _build_dict(self):
"Custom": ["Custom Tool", "Python Function"],
}

def get_type_dict(self):
def get_type_dict(self) -> dict[str, type[Vertex]]:
types = self._types()
return {
"CustomComponent": types.CustomComponentVertex,
"Component": types.ComponentVertex,
**dict.fromkeys(CHAT_COMPONENTS, types.InterfaceVertex),
}

def get_custom_component_vertex_type(self):
def get_custom_component_vertex_type(self) -> type[CustomComponentVertex]:
return self._types().CustomComponentVertex


Expand Down
33 changes: 28 additions & 5 deletions src/backend/base/langflow/graph/graph/runnable_vertices_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,18 +48,41 @@ def update_run_state(self, run_predecessors: dict, vertices_to_run: set) -> None
self.vertices_to_run.update(vertices_to_run)
self.build_run_map(self.run_predecessors, self.vertices_to_run)

def is_vertex_runnable(self, vertex_id: str, *, is_active: bool) -> bool:
"""Determines if a vertex is runnable."""
def is_vertex_runnable(self, vertex_id: str, *, is_active: bool, is_loop: bool = False) -> bool:
"""Determines if a vertex is runnable based on its active state and predecessor fulfillment."""
if not is_active:
return False
if vertex_id in self.vertices_being_run:
return False
if vertex_id not in self.vertices_to_run:
return False
return self.are_all_predecessors_fulfilled(vertex_id) or vertex_id in self.cycle_vertices

def are_all_predecessors_fulfilled(self, vertex_id: str) -> bool:
return not any(self.run_predecessors.get(vertex_id, []))
return self.are_all_predecessors_fulfilled(vertex_id, is_loop=is_loop)

def are_all_predecessors_fulfilled(self, vertex_id: str, *, is_loop: bool) -> bool:
    """Determine whether every predecessor of a vertex has been fulfilled.

    A vertex is considered ready when either:

    1. It has no pending predecessors recorded in ``run_predecessors``.
    2. It is a cycle vertex and is either a loop vertex itself or none of
       its pending predecessors are cycle vertices (a pending cycle
       predecessor would be a circular dependency).

    Args:
        vertex_id: The ID of the vertex to check.
        is_loop: Whether the vertex is a loop.

    Returns:
        True if all predecessor conditions are met, False otherwise.
    """
    pending = self.run_predecessors.get(vertex_id)
    if not pending:
        # Missing entry or empty collection: nothing left to wait for.
        return True

    if vertex_id not in self.cycle_vertices:
        # Non-cycle vertices must wait for every pending predecessor.
        return False

    # ``run_predecessors`` is typed as a plain ``dict``, so its values may be
    # lists as well as sets; normalise to a set before the overlap check
    # instead of assuming ``pending`` has set methods.
    return is_loop or set(pending).isdisjoint(self.cycle_vertices)

def remove_from_predecessors(self, vertex_id: str) -> None:
"""Removes a vertex from the predecessor list of its successors."""
Expand Down
16 changes: 8 additions & 8 deletions src/backend/base/langflow/graph/graph/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -461,8 +461,8 @@ def find_cycle_vertices(edges):
def layered_topological_sort(
vertices_ids: set[str],
in_degree_map: dict[str, int],
successor_map: dict[str, list[str]],
predecessor_map: dict[str, list[str]],
successor_map: dict[str, set[str]],
predecessor_map: dict[str, set[str]],
start_id: str | None = None,
cycle_vertices: set[str] | None = None,
is_input_vertex: Callable[[str], bool] | None = None, # noqa: ARG001
Expand Down Expand Up @@ -780,8 +780,8 @@ def get_sorted_vertices(
start_component_id: str | None = None,
graph_dict: dict[str, Any] | None = None,
in_degree_map: dict[str, int] | None = None,
successor_map: dict[str, list[str]] | None = None,
predecessor_map: dict[str, list[str]] | None = None,
successor_map: dict[str, set[str]] | None = None,
predecessor_map: dict[str, set[str]] | None = None,
is_input_vertex: Callable[[str], bool] | None = None,
get_vertex_predecessors: Callable[[str], list[str]] | None = None,
get_vertex_successors: Callable[[str], list[str]] | None = None,
Expand Down Expand Up @@ -826,18 +826,18 @@ def get_sorted_vertices(
successor_map = {}
for vertex_id in vertices_ids:
if get_vertex_successors is not None:
successor_map[vertex_id] = get_vertex_successors(vertex_id)
successor_map[vertex_id] = set(get_vertex_successors(vertex_id))
else:
successor_map[vertex_id] = []
successor_map[vertex_id] = set()

# Build predecessor_map if not provided
if predecessor_map is None:
predecessor_map = {}
for vertex_id in vertices_ids:
if get_vertex_predecessors is not None:
predecessor_map[vertex_id] = get_vertex_predecessors(vertex_id)
predecessor_map[vertex_id] = set(get_vertex_predecessors(vertex_id))
else:
predecessor_map[vertex_id] = []
predecessor_map[vertex_id] = set()

# If we have a stop component, we need to filter out all vertices
# that are not predecessors of the stop component
Expand Down
14 changes: 11 additions & 3 deletions src/backend/base/langflow/graph/vertex/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ def __init__(
self.is_state = False
self.is_input = any(input_component_name in self.id for input_component_name in INPUT_COMPONENTS)
self.is_output = any(output_component_name in self.id for output_component_name in OUTPUT_COMPONENTS)
self._is_loop = None
self.has_session_id = None
self.custom_component = None
self.has_external_input = False
Expand All @@ -78,7 +79,7 @@ def __init__(
self.built_object: Any = UnbuiltObject()
self.built_result: Any = None
self.built = False
self._successors_ids: list[str] | None = None
self._successors_ids: set[str] | None = None
self.artifacts: dict[str, Any] = {}
self.artifacts_raw: dict[str, Any] = {}
self.artifacts_type: dict[str, str] = {}
Expand Down Expand Up @@ -109,6 +110,13 @@ def __init__(
output["name"] for output in self.outputs if isinstance(output, dict) and "name" in output
]

@property
def is_loop(self) -> bool:
    """Return whether any output of this vertex allows looping.

    The result is computed on first access from ``self.outputs`` and cached
    in ``self._is_loop``; later accesses return the cached value.
    """
    if self._is_loop is None:
        # Entries of ``self.outputs`` are not guaranteed to be dicts (the
        # output-name extraction in ``__init__`` guards the same way), so
        # skip anything that cannot carry an ``allows_loop`` flag.
        self._is_loop = any(
            isinstance(output, dict) and output.get("allows_loop", False)
            for output in self.outputs
        )
    return self._is_loop

def set_input_value(self, name: str, value: Any) -> None:
if self.custom_component is None:
msg = f"Vertex {self.id} does not have a component instance."
Expand Down Expand Up @@ -199,8 +207,8 @@ def successors(self) -> list[Vertex]:
return self.graph.get_successors(self)

@property
def successors_ids(self) -> list[str]:
return self.graph.successor_map.get(self.id, [])
def successors_ids(self) -> set[str]:
return self.graph.successor_map.get(self.id, set())

def __getstate__(self):
state = self.__dict__.copy()
Expand Down
2 changes: 1 addition & 1 deletion src/backend/base/langflow/graph/vertex/vertex_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -466,7 +466,7 @@ def __init__(self, data: NodeData, graph):
self.is_state = False

@property
def successors_ids(self) -> list[str]:
def successors_ids(self) -> set[str]:
if self._successors_ids is None:
self.is_state = False
return super().successors_ids
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,9 @@ def test_is_vertex_runnable(data):
manager = RunnableVerticesManager.from_dict(data)
vertex_id = "A"
is_active = True
is_loop = False

result = manager.is_vertex_runnable(vertex_id, is_active=is_active)
result = manager.is_vertex_runnable(vertex_id, is_active=is_active, is_loop=is_loop)

assert result is False

Expand All @@ -100,8 +101,9 @@ def test_is_vertex_runnable__wrong_is_active(data):
manager = RunnableVerticesManager.from_dict(data)
vertex_id = "A"
is_active = False
is_loop = False

result = manager.is_vertex_runnable(vertex_id, is_active=is_active)
result = manager.is_vertex_runnable(vertex_id, is_active=is_active, is_loop=is_loop)

assert result is False

Expand All @@ -110,8 +112,9 @@ def test_is_vertex_runnable__wrong_vertices_to_run(data):
manager = RunnableVerticesManager.from_dict(data)
vertex_id = "D"
is_active = True
is_loop = False

result = manager.is_vertex_runnable(vertex_id, is_active=is_active)
result = manager.is_vertex_runnable(vertex_id, is_active=is_active, is_loop=is_loop)

assert result is False

Expand All @@ -120,26 +123,29 @@ def test_is_vertex_runnable__wrong_run_predecessors(data):
manager = RunnableVerticesManager.from_dict(data)
vertex_id = "C"
is_active = True
is_loop = False

result = manager.is_vertex_runnable(vertex_id, is_active=is_active)
result = manager.is_vertex_runnable(vertex_id, is_active=is_active, is_loop=is_loop)

assert result is False


def test_are_all_predecessors_fulfilled(data):
manager = RunnableVerticesManager.from_dict(data)
vertex_id = "A"
is_loop = False

result = manager.are_all_predecessors_fulfilled(vertex_id)
result = manager.are_all_predecessors_fulfilled(vertex_id, is_loop=is_loop)

assert result is True


def test_are_all_predecessors_fulfilled__wrong(data):
manager = RunnableVerticesManager.from_dict(data)
vertex_id = "D"
is_loop = False

result = manager.are_all_predecessors_fulfilled(vertex_id)
result = manager.are_all_predecessors_fulfilled(vertex_id, is_loop=is_loop)

assert result is False

Expand Down

0 comments on commit f5a2c1c

Please sign in to comment.