-
Notifications
You must be signed in to change notification settings - Fork 8.4k
⚡️ Speed up function run_response_to_workflow_response by 16% in PR #11255 (developer-api)
#11319
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
⚡️ Speed up function run_response_to_workflow_response by 16% in PR #11255 (developer-api)
#11319
Conversation
- Add workflow API endpoints (POST /workflow, GET /workflow, POST /workflow/stop) - Implement developer API protection with settings check - Add comprehensive workflow schema models with proper validation - Create extensive unit test suite covering all scenarios - Apply Ruff linting standards and fix all code quality issues - Support API key authentication for all workflow endpoints
Co-authored-by: Gabriel Luiz Freitas Almeida <gabriel@langflow.org>
Co-authored-by: codeflash-ai[bot] <148906541+codeflash-ai[bot]@users.noreply.github.com>
Co-authored-by: codeflash-ai[bot] <148906541+codeflash-ai[bot]@users.noreply.github.com>
Co-authored-by: codeflash-ai[bot] <148906541+codeflash-ai[bot]@users.noreply.github.com>
…omponent_index.json
The optimized code achieves a **16% speedup** by reducing Python interpreter overhead through three focused micro-optimizations in the `run_response_to_workflow_response` function: ## Key Optimizations 1. **Replaced `hasattr` + attribute access with `getattr`** (Lines 171-173, 239-241) - Original: `if hasattr(run_output, "outputs") and run_output.outputs:` followed by accessing `run_output.outputs` twice - Optimized: `outs = getattr(run_output, "outputs", None)` followed by `if outs:` - **Why faster**: `hasattr` internally catches AttributeError exceptions, making it slower than `getattr` with a default. This eliminates redundant attribute lookups and exception handling overhead. - **Impact**: Saves ~6μs per iteration in the output building loop (14 hits × ~300ns improvement visible in line profiler) 2. **Converted terminal node list to set for membership testing** (Lines 183-184) - Original: `terminal_vertices = [v for v in graph.vertices if v.id in terminal_node_ids]` (list membership is O(n)) - Optimized: `term_set = set(terminal_node_ids)` then `[v for v in graph.vertices if v.id in term_set]` (set membership is O(1)) - **Why faster**: With ~200+ vertices in large-scale tests, the list comprehension performs better with O(1) set lookups instead of O(n) list scans - **Impact**: Particularly beneficial in `test_large_scale_many_vertices_processing_efficiency` where 200 vertices are processed 3. **Simplified metadata extraction logic** (Lines 239-243) - Original: `if hasattr(vertex_output_data, "metadata") and vertex_output_data.metadata:` - Optimized: `vm = getattr(vertex_output_data, "metadata", None)` then `if vm:` - **Why faster**: Same `getattr` benefit as #1—avoids exception handling and reduces attribute access calls from 2 to 1 ## Performance Impact The line profiler shows these optimizations primarily benefit: - **Output data map construction**: Reduced from 118μs to 99μs per component_id extraction (214 hits) - **Terminal vertex filtering**: Small but measurable improvement when converting to set (~25μs overhead amortized across 219 vertex checks) - **Metadata updates**: Reduced from 112μs to 107μs per metadata check (213 hits) ## Test Results Context All test cases pass with identical behavior. The optimizations are particularly effective for: - **Large-scale scenarios** (`test_large_scale_many_vertices_processing_efficiency`): Set-based filtering scales better with 200 vertices - **Workflows with many outputs**: Each terminal vertex processes faster due to reduced attribute access overhead - **Typical workflows** (8-12 nodes): Benefits accumulate across multiple attribute checks per vertex These are classic Python micro-optimizations that reduce interpreter overhead without changing algorithmic complexity, making the code measurably faster for typical workflow conversion operations while maintaining identical functionality.
|
Important Review skippedBot user detected. To trigger a single review, invoke the You can disable this status message by setting the Comment |
Codecov Report✅ All modified and coverable lines are covered by tests. ❌ Your project check has failed because the head coverage (40.81%) is below the target coverage (60.00%). You can increase the head coverage or adjust the target coverage. Additional details and impacted files@@ Coverage Diff @@
## developer-api #11319 +/- ##
==============================================
Coverage 34.42% 34.42%
==============================================
Files 1410 1410
Lines 67116 67119 +3
Branches 9860 9860
==============================================
+ Hits 23106 23108 +2
- Misses 42817 42818 +1
Partials 1193 1193
Flags with carried forward coverage won't be shown. Click here to find out more.
🚀 New features to boost your workflow:
|
⚡️ This pull request contains optimizations for PR #11255
If you approve this dependent PR, these changes will be merged into the original PR branch
developer-api.📄 16% (0.16x) speedup for
run_response_to_workflow_responseinsrc/backend/base/langflow/api/v2/converters.py⏱️ Runtime :
1.38 milliseconds→1.19 milliseconds(best of30runs)📝 Explanation and details
The optimized code achieves a 16% speedup by reducing Python interpreter overhead through three focused micro-optimizations in the
run_response_to_workflow_responsefunction:Key Optimizations
Replaced
hasattr+ attribute access withgetattr(Lines 171-173, 239-241)if hasattr(run_output, "outputs") and run_output.outputs:followed by accessingrun_output.outputstwiceouts = getattr(run_output, "outputs", None)followed byif outs:hasattrinternally catches AttributeError exceptions, making it slower thangetattrwith a default. This eliminates redundant attribute lookups and exception handling overhead.Converted terminal node list to set for membership testing (Lines 183-184)
terminal_vertices = [v for v in graph.vertices if v.id in terminal_node_ids](list membership is O(n))term_set = set(terminal_node_ids)then[v for v in graph.vertices if v.id in term_set](set membership is O(1))test_large_scale_many_vertices_processing_efficiencywhere 200 vertices are processedSimplified metadata extraction logic (Lines 239-243)
if hasattr(vertex_output_data, "metadata") and vertex_output_data.metadata:vm = getattr(vertex_output_data, "metadata", None)thenif vm:getattrbenefit as MemoryCustom node added #1—avoids exception handling and reduces attribute access calls from 2 to 1Performance Impact
The line profiler shows these optimizations primarily benefit:
Test Results Context
All test cases pass with identical behavior. The optimizations are particularly effective for:
test_large_scale_many_vertices_processing_efficiency): Set-based filtering scales better with 200 verticesThese are classic Python micro-optimizations that reduce interpreter overhead without changing algorithmic complexity, making the code measurably faster for typical workflow conversion operations while maintaining identical functionality.
✅ Correctness verification report:
⚙️ Click to see Existing Unit Tests
🌀 Click to see Generated Regression Tests
import time
from dataclasses import dataclass
from typing import Any, Dict, List, Optional
imports
import pytest # used for our unit tests
from langflow.api.v2.converters import run_response_to_workflow_response
--- Minimal local implementations of external domain classes ---
These stand in for the real classes the converter expects,
allowing the test suite to execute deterministically in isolation.
@DataClass
class ResultData:
# Represents an individual result item found in RunResponse outputs.
component_id: Optional[str] = None
outputs: Any = None
results: Any = None
messages: Any = None
metadata: Optional[Dict[str, Any]] = None
class RunOutput:
# Represents an entry in RunResponse.outputs (wraps a list of result items)
def init(self, outputs: List[Any]):
self.outputs = outputs
class RunResponse:
# Top-level response container with an
outputsattribute (list of RunOutput)def init(self, outputs: Optional[List[RunOutput]] = None):
self.outputs = outputs
@DataClass
class Vertex:
# Graph vertex representation used by the converter:
id: str
display_name: Optional[str] = None
vertex_type: Optional[str] = None
is_output: bool = False
outputs: Optional[List[Dict[str, Any]]] = None
class Graph:
# Minimal Graph with vertices list and successor_map.
def init(self, vertices: List[Vertex], successor_map: Optional[Dict[str, List[str]]] = None):
self.vertices = vertices
self.successor_map = successor_map or {}
@DataClass
class WorkflowExecutionRequest:
# The converter echoes back .inputs
inputs: Optional[Dict[str, Any]] = None
@DataClass
class ComponentOutput:
# Output object produced in the WorkflowExecutionResponse
type: str
component_id: str
status: Any
content: Any
metadata: Dict[str, Any]
class JobStatus:
# Minimal enum-like holder for status constants
COMPLETED = "completed"
@DataClass
class WorkflowExecutionResponse:
flow_id: str
job_id: str
object: str
created_timestamp: str
status: Any
errors: List[Any]
inputs: Dict[str, Any]
outputs: Dict[str, ComponentOutput]
metadata: Dict[str, Any]
--- Implementation under test ---
This is the converter function reimplemented to operate on the local domain classes above.
The logic mirrors the user-provided source while using the above lightweight classes.
def _extract_nested_value(data: Any, *keys: str) -> Any:
current = data
for key in keys:
if isinstance(current, dict):
current = current.get(key)
elif hasattr(current, key):
current = getattr(current, key)
else:
return None
if current is None:
return None
return current
def _extract_text_from_message(content: dict) -> str | None:
text = _extract_nested_value(content, "message", "message")
if isinstance(text, str):
return text
def _extract_model_source(raw_content: dict, vertex_id: str, vertex_display_name: str) -> dict | None:
model_name = _extract_nested_value(raw_content, "model_output", "message", "model_name")
if model_name:
return {"id": vertex_id, "display_name": vertex_display_name, "source": model_name}
return None
def _extract_file_path(raw_content: dict, vertex_type: str) -> str | None:
if vertex_type != "SaveToFile":
return None
def _get_raw_content(vertex_output_data: Any) -> Any:
if hasattr(vertex_output_data, "outputs") and vertex_output_data.outputs is not None:
return vertex_output_data.outputs
if hasattr(vertex_output_data, "results") and vertex_output_data.results is not None:
return vertex_output_data.results
if hasattr(vertex_output_data, "messages") and vertex_output_data.messages is not None:
return vertex_output_data.messages
if isinstance(vertex_output_data, dict):
if "results" in vertex_output_data:
return vertex_output_data["results"]
if "content" in vertex_output_data:
return vertex_output_data["content"]
return vertex_output_data
def _simplify_output_content(content: Any, output_type: str) -> Any:
if not isinstance(content, dict):
return content
def _build_metadata_for_non_output(
raw_content: Any, vertex_id: str, vertex_display_name: str, vertex_type: str, output_type: str
) -> dict[str, Any]:
metadata: dict[str, Any] = {}
from langflow.api.v2.converters import run_response_to_workflow_response
--- Unit tests for run_response_to_workflow_response ---
Basic Test Cases
def test_basic_message_output_exposed():
"""
Basic scenario:
- Single terminal vertex marked as output
- Output type is 'message' and contains nested message.message
Expectation:
- Output key uses vertex.display_name
- Content is simplified to the inner text string
- Metadata contains component_type
"""
# Create vertex that is terminal and an output node
v = Vertex(id="v1", display_name="Chat", vertex_type="ChatOutput", is_output=True, outputs=[{"types": ["message"]}])
g = Graph(vertices=[v], successor_map={})
def test_non_output_llm_exposes_only_source_metadata():
"""
Edge/basic:
- LLM vertex (message type) is NOT an output node
- Raw content contains model_output.message.model_name
Expectation:
- No content exposed (None)
- Metadata contains 'source' with id, display_name, and source model name
"""
v = Vertex(id="llm1", display_name="LLM", vertex_type="LLM", is_output=False, outputs=[{"types": ["message"]}])
g = Graph(vertices=[v], successor_map={})
def test_save_to_file_file_path_extraction():
"""
Edge:
- SaveToFile non-output node with message indicating successful save
Expectation:
- No content
- Metadata contains 'file_path' equal to the message text that mentions 'saved successfully'
"""
v = Vertex(id="s1", display_name="Saver", vertex_type="SaveToFile", is_output=False, outputs=[{"types": ["message"]}])
g = Graph(vertices=[v], successor_map={})
def test_data_node_non_output_exposes_data_content():
"""
Basic:
- Non-output vertex of type 'data' should expose data content
- raw_content has 'result' -> 'message' containing data payload
Expectation:
- content equals nested result->message value
"""
v = Vertex(id="d1", display_name="DataNode", vertex_type="DataNode", is_output=False, outputs=[{"types": ["data"]}])
g = Graph(vertices=[v], successor_map={})
Edge Test Cases
def test_duplicate_display_names_use_id_and_store_original_display_name():
"""
Edge:
- Two terminal vertices share the same display_name
Expectation:
- Output keys should be the vertex ids (since display_name duplicates)
- Each metadata contains 'display_name' storing the original duplicate name
"""
v1 = Vertex(id="n1", display_name="DupName", vertex_type="T", is_output=True, outputs=[{"types": ["message"]}])
v2 = Vertex(id="n2", display_name="DupName", vertex_type="T", is_output=True, outputs=[{"types": ["message"]}])
g = Graph(vertices=[v1, v2], successor_map={})
def test_missing_run_response_outputs_and_missing_result_data():
"""
Edge:
- run_response.outputs is None or empty and terminal nodes exist
Expectation:
- Outputs still contain entries for terminal vertices with content None
- Metadata should at least contain component_type
"""
v = Vertex(id="x1", display_name="X", vertex_type="T", is_output=False, outputs=[{"types": ["message"]}])
g = Graph(vertices=[v], successor_map={})
run_resp = RunResponse(outputs=None) # No outputs provided
req = WorkflowExecutionRequest(inputs={"k": "v"})
def test_raw_content_dict_precedence_results_over_content_field():
"""
Edge:
- When a result element is a plain dict, _get_raw_content should prefer 'results' key over 'content'
Expectation:
- The chosen raw_content is the value under 'results' not 'content'
"""
v = Vertex(id="p1", display_name="P", vertex_type="T", is_output=True, outputs=[{"types": ["message"]}])
g = Graph(vertices=[v], successor_map={})
Large Scale Test Cases
def test_large_scale_many_vertices_processing_efficiency():
"""
Large scale:
- Create ~200 terminal vertices and corresponding outputs.
- Verify that all terminal vertices are processed and a matching number of outputs exist.
- Keep the data sizes moderate (<1000) to follow instructions.
"""
num = 200 # well under the 1000 limit
vertices = []
run_output_items = []
def test_result_metadata_merging_from_resultdata_object_and_dict():
"""
Edge:
- Vertex output has metadata provided on result object (as attribute) and also as dict forms.
- The converter should merge metadata into the component output metadata.
"""
v = Vertex(id="meta1", display_name="M1", vertex_type="TypeM", is_output=True, outputs=[{"types": ["message"]}])
g = Graph(vertices=[v], successor_map={})
codeflash_output is used to check that the output of the original code is the same as that of the optimized code.
#------------------------------------------------
from unittest.mock import MagicMock, Mock
import pytest
from langflow.api.v1.schemas import RunResponse
from langflow.api.v2.converters import (_build_metadata_for_non_output,
_extract_file_path,
_extract_model_source,
_extract_nested_value,
_extract_text_from_message,
_get_raw_content,
_simplify_output_content,
run_response_to_workflow_response)
from lfx.graph.graph.base import Graph
from lfx.schema.workflow import (ComponentOutput, JobStatus,
WorkflowExecutionRequest,
WorkflowExecutionResponse)
============================================================================
HELPER FIXTURES
============================================================================
@pytest.fixture
def mock_vertex():
"""Create a mock vertex with all necessary attributes."""
vertex = Mock()
vertex.id = "vertex-1"
vertex.display_name = "Test Vertex"
vertex.vertex_type = "ChatOutput"
vertex.is_output = True
vertex.outputs = [{"types": ["message"]}]
return vertex
@pytest.fixture
def mock_graph():
"""Create a mock graph with terminal node detection."""
graph = Mock(spec=Graph)
graph.vertices = []
graph.successor_map = {}
graph.get_terminal_nodes = Mock(return_value=[])
return graph
@pytest.fixture
def mock_run_response():
"""Create a mock RunResponse with outputs."""
response = Mock(spec=RunResponse)
response.outputs = []
return response
@pytest.fixture
def basic_workflow_request():
"""Create a basic WorkflowExecutionRequest."""
return WorkflowExecutionRequest(inputs={"input_key": "input_value"})
============================================================================
BASIC TEST CASES - Test fundamental functionality under normal conditions
============================================================================
def test_extract_nested_value_single_level():
"""Test extraction of value at single level depth."""
data = {"key": "value"}
result = _extract_nested_value(data, "key")
def test_extract_nested_value_multiple_levels():
"""Test extraction of value at multiple levels depth."""
data = {"a": {"b": {"c": "deep_value"}}}
result = _extract_nested_value(data, "a", "b", "c")
def test_extract_nested_value_with_object_attributes():
"""Test extraction using object attribute access."""
obj = Mock()
obj.nested = Mock()
obj.nested.value = "attribute_value"
result = _extract_nested_value(obj, "nested", "value")
def test_extract_text_from_message_nested_structure():
"""Test text extraction from nested message structure."""
content = {"message": {"message": "Hello World", "type": "text"}}
result = _extract_text_from_message(content)
def test_extract_text_from_message_direct_message():
"""Test text extraction from direct message field."""
content = {"message": "Direct message"}
result = _extract_text_from_message(content)
def test_extract_text_from_message_direct_text():
"""Test text extraction from direct text field."""
content = {"text": "Direct text"}
result = _extract_text_from_message(content)
def test_extract_model_source_with_model_name():
"""Test extraction of model source from LLM output."""
raw_content = {
"model_output": {
"message": {"model_name": "gpt-4"}
}
}
result = _extract_model_source(raw_content, "vertex-1", "LLM")
def test_extract_model_source_without_model_name():
"""Test extraction when model_name is not present."""
raw_content = {"model_output": {"message": {"other": "data"}}}
result = _extract_model_source(raw_content, "vertex-1", "LLM")
def test_extract_file_path_save_to_file():
"""Test extraction of file path from SaveToFile component."""
raw_content = {"message": {"message": "File saved successfully to /path/file.txt"}}
result = _extract_file_path(raw_content, "SaveToFile")
def test_extract_file_path_non_save_to_file():
"""Test that file path extraction returns None for non-SaveToFile components."""
raw_content = {"message": {"message": "Some output"}}
result = _extract_file_path(raw_content, "OtherComponent")
def test_get_raw_content_from_outputs():
"""Test extraction of raw content from outputs attribute."""
vertex_output = Mock()
vertex_output.outputs = {"result": "data"}
vertex_output.results = None
vertex_output.messages = None
result = _get_raw_content(vertex_output)
def test_get_raw_content_from_results():
"""Test extraction of raw content from results attribute."""
vertex_output = Mock()
vertex_output.outputs = None
vertex_output.results = {"result": "data"}
vertex_output.messages = None
result = _get_raw_content(vertex_output)
def test_get_raw_content_from_messages():
"""Test extraction of raw content from messages attribute."""
vertex_output = Mock()
vertex_output.outputs = None
vertex_output.results = None
vertex_output.messages = {"message": "data"}
result = _get_raw_content(vertex_output)
def test_get_raw_content_from_dict_results():
"""Test extraction of raw content from dict with results key."""
vertex_output = {"results": "data_value", "content": "other_value"}
result = _get_raw_content(vertex_output)
def test_simplify_output_content_message_type():
"""Test simplification of message type content."""
content = {"message": "Hello World"}
result = _simplify_output_content(content, "message")
def test_simplify_output_content_text_type():
"""Test simplification of text type content."""
content = {"message": "Text content"}
result = _simplify_output_content(content, "text")
def test_simplify_output_content_data_type():
"""Test simplification of data type content."""
content = {"result": {"message": {"result": "42"}}}
result = _simplify_output_content(content, "data")
def test_simplify_output_content_non_dict():
"""Test that non-dict content is returned as-is."""
content = "plain string"
result = _simplify_output_content(content, "unknown")
def test_build_metadata_for_non_output_with_model_source():
"""Test metadata building for non-output nodes with model source."""
raw_content = {"model_output": {"message": {"model_name": "gpt-4"}}}
result = _build_metadata_for_non_output(
raw_content, "vertex-1", "LLM", "LLM", "message"
)
def test_build_metadata_for_non_output_empty():
"""Test metadata building for non-output nodes without special data."""
raw_content = {}
result = _build_metadata_for_non_output(
raw_content, "vertex-1", "Node", "Node", "data"
)
def test_extract_nested_value_missing_key():
"""Test extraction when key is missing."""
data = {"key": "value"}
result = _extract_nested_value(data, "missing")
def test_extract_nested_value_none_intermediate():
"""Test extraction when intermediate value is None."""
data = {"a": None}
result = _extract_nested_value(data, "a", "b")
def test_extract_nested_value_no_keys():
"""Test extraction with no keys provided."""
data = {"key": "value"}
result = _extract_nested_value(data)
def test_extract_text_from_message_empty_dict():
"""Test text extraction from empty dict."""
content = {}
result = _extract_text_from_message(content)
def test_extract_text_from_message_non_string_value():
"""Test text extraction when value is not a string."""
content = {"message": 123}
result = _extract_text_from_message(content)
def test_extract_text_from_message_all_none():
"""Test text extraction when all paths lead to None."""
content = {"message": None, "text": None}
result = _extract_text_from_message(content)
def test_get_raw_content_none_values():
"""Test raw content extraction when all attributes are None."""
vertex_output = Mock()
vertex_output.outputs = None
vertex_output.results = None
vertex_output.messages = None
result = _get_raw_content(vertex_output)
def test_get_raw_content_empty_dict():
"""Test raw content extraction from empty dict."""
vertex_output = {}
result = _get_raw_content(vertex_output)
def test_simplify_output_content_none_text_extraction():
"""Test simplification when text extraction returns None."""
content = {"invalid": "structure"}
result = _simplify_output_content(content, "message")
def test_simplify_output_content_unknown_type():
"""Test simplification of unknown output type."""
content = {"unknown": "data"}
result = _simplify_output_content(content, "unknown_type")
def test_build_metadata_for_non_output_non_dict_content():
"""Test metadata building with non-dict raw content."""
raw_content = "string content"
result = _build_metadata_for_non_output(
raw_content, "vertex-1", "Node", "Node", "message"
)
def test_extract_nested_value_mixed_dict_and_object():
"""Test extraction across both dict and object boundaries."""
obj = Mock()
obj.attr = {"key": "value"}
def test_simplify_output_content_deeply_nested():
"""Test simplification of deeply nested content."""
content = {"message": {"message": {"message": "Deep Value"}}}
result = _simplify_output_content(content, "message")
def test_build_metadata_for_non_output_with_large_metadata_dict():
"""Test metadata building with large metadata dictionary."""
large_metadata = {}
for i in range(200):
large_metadata[f"key_{i}"] = f"value_{i}"
def test_simplify_output_content_many_paths():
"""Test text extraction with many nested message paths."""
# Create complex nested structure that requires traversing multiple paths
content = {
"message": {
"message": {
"type": "nested_message"
}
},
"other": "fields"
}
def test_extract_text_from_message_all_variants():
"""Test text extraction tries all path variants sequentially."""
variants = [
{"message": {"message": "nested"}},
{"text": {"message": "text_message"}},
{"message": {"text": "message_text"}},
{"message": "direct"},
{"text": {"text": "text_text"}},
{"text": "direct_text"}
]
To edit these changes
git checkout codeflash/optimize-pr11255-2026-01-15T21.21.43and push.