feat: Implement initial Langflow Assistant API with streaming support #11372
Conversation
Important: Review skipped. Auto incremental reviews are disabled on this repository. Please check the settings in the CodeRabbit UI. You can disable this status message in the settings.

Walkthrough

This pull request introduces the Langflow Agentic API feature with router endpoints, request/response schemas, helper utilities for code extraction and validation, and services for flow execution with validation and streaming support. It includes comprehensive unit test coverage and integrates the agentic router into the main API while enriching provider metadata.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Client
    participant Router as /assist<br/>Endpoint
    participant FlowService as Flow Execution<br/>Service
    participant Validator as Code<br/>Validator
    participant RetryLogic as Retry<br/>Handler

    Client->>Router: POST /assist with AssistantRequest
    Router->>FlowService: execute_flow_with_validation()
    loop Attempt 1 to max_retries
        FlowService->>FlowService: execute_flow_file()
        FlowService->>FlowService: extract_python_code()
        FlowService->>Validator: validate_component_code(code)
        alt Validation Success
            Validator-->>FlowService: ValidationResult(is_valid=true)
            FlowService-->>Router: Return result with class_name
            Router-->>Client: Success response
        else Validation Fails
            Validator-->>FlowService: ValidationResult(is_valid=false, error)
            FlowService->>RetryLogic: Check retries remaining
            alt Retries Available
                RetryLogic->>FlowService: Inject error context into next input
                FlowService->>FlowService: execute_flow_file() [RETRY]
            else No Retries Left
                RetryLogic-->>FlowService: Return last result with validation_error
                FlowService-->>Router: Return with error details
                Router-->>Client: Error response
            end
        end
    end
```
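The validation-retry loop in the diagram above can be sketched as follows. This is a minimal illustration with stand-in `execute_flow`, `extract_code`, and `validate` callables, not the real Langflow services:

```python
# Minimal sketch of the validation-retry loop; execute_flow, extract_code,
# and validate are hypothetical stand-ins for the real Langflow services.
def run_with_validation(execute_flow, extract_code, validate, prompt, max_retries=3):
    """Run a flow, validate the extracted code, and retry with error context."""
    max_retries = max(0, max_retries)  # guard against negative retry counts
    result = {"validated": False, "error": "no attempts made", "attempts": 0}
    for attempt in range(1, max_retries + 2):  # initial try + max_retries retries
        raw_output = execute_flow(prompt)
        code = extract_code(raw_output)
        is_valid, error = validate(code)
        if is_valid:
            return {"validated": True, "code": code, "attempts": attempt}
        # Inject the validation error into the next attempt's input.
        prompt = f"{prompt}\n\nPrevious attempt failed with: {error}"
        result = {"validated": False, "error": error, "attempts": attempt}
    return result
```

The key property is that the loop always runs at least once, so the result dict is always assigned even when retries are exhausted or the retry count is invalid.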
```mermaid
sequenceDiagram
    participant Client
    participant Router as /assist/stream<br/>Endpoint
    participant EventQueue as Event<br/>Queue
    participant FlowRunner as Flow<br/>Runner
    participant SSEFormatter as SSE<br/>Formatter

    Client->>Router: POST /assist/stream (async)
    Router->>EventQueue: Create async queue
    Router->>FlowRunner: Launch background flow runner
    par FlowRunner Background Task
        FlowRunner->>FlowRunner: execute_flow_file(model injected)
        FlowRunner->>EventQueue: Queue token events
        FlowRunner->>FlowRunner: extract_python_code()
        FlowRunner->>EventQueue: Queue progress: extracting_code
        FlowRunner->>EventQueue: Queue progress: validating
        alt Code Valid
            FlowRunner->>EventQueue: Queue progress: validated
            FlowRunner->>EventQueue: Queue complete event
        else Code Invalid & Retries
            FlowRunner->>EventQueue: Queue progress: validation_failed
            FlowRunner->>EventQueue: Queue progress: retrying
            FlowRunner->>FlowRunner: Retry with error context
        end
    and EventQueue Consumption
        Router->>EventQueue: Consume events (streaming)
        EventQueue-->>SSEFormatter: Get next event
        SSEFormatter-->>Router: Format as SSE data line
        Router-->>Client: Stream SSE chunk
        Note over Router,Client: Repeat until completion
    end
```
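The streaming diagram pairs a background producer with an SSE-formatting consumer over an asyncio queue. A minimal sketch of that pattern, using hypothetical event shapes rather than Langflow's actual formatters:

```python
import asyncio
import json


def format_sse(event: dict) -> str:
    """Format one event as a Server-Sent Events data line."""
    return f"data: {json.dumps(event)}\n\n"


async def flow_runner(queue: asyncio.Queue) -> None:
    """Background task: push token/progress events, then a None sentinel."""
    for token in ["def", " MyComponent", ":"]:
        await queue.put({"type": "token", "content": token})
    await queue.put({"type": "progress", "stage": "validating"})
    await queue.put({"type": "complete"})
    await queue.put(None)  # sentinel: no more events


async def stream_events() -> list:
    """Consumer loop: drain the queue and format each event for the client."""
    queue: asyncio.Queue = asyncio.Queue()
    runner = asyncio.create_task(flow_runner(queue))
    chunks = []
    while (event := await queue.get()) is not None:
        chunks.append(format_sse(event))
    await runner
    return chunks


chunks = asyncio.run(stream_events())
```

A real endpoint would yield each chunk from a `StreamingResponse` generator instead of collecting them; the sentinel keeps the consumer from waiting forever after the producer finishes.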
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes

Important: Pre-merge checks failed. Please resolve all errors before merging. Addressing warnings is optional.

❌ Failed checks (1 error, 2 warnings)
✅ Passed checks (4 passed)
Codecov Report

❌ Your patch check has failed because the patch coverage (16.66%) is below the target coverage (40.00%). You can increase the patch coverage or adjust the target coverage.

Additional details and impacted files

```diff
@@            Coverage Diff             @@
##             main   #11372      +/-   ##
==========================================
- Coverage   34.51%   34.50%   -0.01%
==========================================
  Files        1414     1414
  Lines       67202    67214     +12
  Branches     9910     9914      +4
==========================================
+ Hits        23193    23194      +1
- Misses      42793    42804     +11
  Partials     1216     1216
```
Flags with carried forward coverage won't be shown.
Actionable comments posted: 10
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/lfx/src/lfx/base/models/unified_models.py (1)
1086-1155: Avoid defaulting to disabled-provider placeholders.
When line 1088 finds no user default, the fallback to `options[0]` can select `__enable_provider_*__` entries (metadata `is_disabled_provider`), which later fail model instantiation. Skip placeholders when choosing a default and leave the value unset if no selectable models exist.
🛠️ Proposed fix

```diff
-        options = cached.get("options", [])
-        if options:
+        options = cached.get("options", [])
+        if options:
+            selectable_options = [
+                opt
+                for opt in options
+                if not opt.get("metadata", {}).get("is_disabled_provider")
+            ]
+            if selectable_options:
+                options = selectable_options
+            else:
+                options = []
```
🤖 Fix all issues with AI agents
In `@src/backend/base/langflow/agentic/api/router.py`:
- Around line 242-244: The current logger.info call exposes raw assistant input
via input_preview (derived from request.input_value) when logging execution of
LANGFLOW_ASSISTANT_FLOW; replace this by not including raw content—either
downgrade to logger.debug and keep a redacted preview, or keep info level but
log only metadata such as len(request.input_value) and provider/model_name;
update the call that uses input_preview and logger.info to instead reference
safe fields (e.g. input_length = len(request.input_value or "") and/or a
constant "<redacted>") and use logger.debug if you must include any snippet.
- Around line 118-130: In get_unified_models_detailed() the display_name is
incorrectly read from the top-level model dict causing fallback to model_name;
instead extract display_name from metadata by replacing display_name =
model.get("display_name", model_name) with display_name =
metadata.get("display_name", model_name) inside the loop that builds model_list
(the block referencing model_name, display_name, metadata, is_deprecated,
is_not_supported, and appending to model_list).
- Around line 40-69: The execute_named_flow handler currently builds
flow_filename from the user-controlled flow_name which allows path traversal;
before constructing flow_filename (or passing flow_name to the executor),
validate and sanitize flow_name in execute_named_flow: reject any path
separators or traversal tokens (e.g., "..", "/" or "\"), or enforce a strict
whitelist pattern (e.g., only alphanumeric, hyphen, underscore) and return a
400/HTTPException for invalid names; alternatively normalize with
pathlib/os.path and ensure the resolved name equals the basename to prevent
escaping the flows directory—apply this check just before the line that sets
flow_filename and session_id.
- Around line 7-35: Ruff I001 is triggered by unsorted imports in this module;
reorder the imports into standard groups (stdlib, third-party, application) and
sort each group alphabetically so tools accept the file. Specifically, ensure
uuid is in the stdlib group; APIRouter, HTTPException, StreamingResponse, and
other fastapi imports are grouped and alphabetized; group lfx.* and langflow.*
imports (get_model_provider_variable_mapping, get_unified_models_detailed,
logger, CurrentActiveUser, DbSession, get_variable_service, CREDENTIAL_TYPE,
DatabaseVariableService, AssistantRequest, LANGFLOW_ASSISTANT_FLOW,
MAX_VALIDATION_RETRIES, execute_flow_with_validation,
execute_flow_with_validation_streaming, execute_flow_file, DEFAULT_MODELS,
PREFERRED_PROVIDERS, check_api_key, get_enabled_providers_for_user) into the
application/local group and sort them alphabetically within that group, then run
the formatter/ruff autofix to confirm the I001 error is resolved.
In `@src/backend/base/langflow/agentic/helpers/error_handling.py`:
- Around line 16-30: The function extract_friendly_error can raise
AttributeError when error_msg is None; add a guard at the start of
extract_friendly_error to handle None (or non-str) inputs by converting None to
an empty string or returning a safe default via _truncate_error_message, then
use that sanitized value for error_lower and subsequent checks; update
references to ERROR_PATTERNS and the final _truncate_error_message call to
operate on the sanitized string so the function never calls .lower() on None.
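A condensed sketch of the suggested guard, with a hypothetical pattern table and truncation fallback standing in for the real `ERROR_PATTERNS` and `_truncate_error_message`:

```python
# Simplified stand-ins for the module's real ERROR_PATTERNS and
# _truncate_error_message; only the None guard is the point here.
ERROR_PATTERNS = [
    (("rate limit", "429"), "Rate limit reached. Please try again shortly."),
    (("invalid api key", "unauthorized"), "Invalid API key. Check your credentials."),
]


def extract_friendly_error(error_msg):
    """Convert technical API errors into user-friendly messages; None-safe."""
    if not error_msg:  # handles None and "" without calling .lower() on None
        return ""
    error_lower = error_msg.lower()
    for patterns, friendly_message in ERROR_PATTERNS:
        if any(pattern in error_lower for pattern in patterns):
            return friendly_message
    return error_msg[:200]  # simplified truncation fallback
```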
In `@src/backend/base/langflow/agentic/services/assistant_service.py`:
- Around line 32-103: Clamp negative max_retries to zero before the retry loop:
in execute_flow_with_validation, normalize the incoming max_retries parameter
(e.g., max_retries = max(0, max_retries)) or raise on invalid values so the
while loop always has a defined iteration plan and result is always assigned;
apply the same guard to the other related function that uses max_retries (the
similar retry/streaming function referenced in lines 106-135) to prevent
zero-iteration/undefined-result paths.
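The clamp can be illustrated in isolation; `attempt_fn` is a stand-in for the real flow-execution call:

```python
# Illustration of clamping negative max_retries so the retry loop always
# runs at least once; attempt_fn stands in for the real flow execution.
def execute_with_retries(attempt_fn, max_retries: int) -> dict:
    """Always run at least one attempt, so the result is always assigned."""
    max_retries = max(0, max_retries)  # a negative value no longer skips the loop
    result: dict = {}
    for attempt in range(max_retries + 1):
        result = attempt_fn(attempt)
        if result.get("validated"):
            break
    return result
```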
- Around line 3-13: Sort and wrap the imports to satisfy Ruff I001 and E501:
reorder imports into stdlib (asyncio, collections.abc.AsyncGenerator),
third-party (fastapi.HTTPException, lfx.log.logger), then local package imports,
and break long multi-member imports onto multiple lines using parentheses so
each line stays under the max line length; specifically split the long import of
format_complete_event, format_error_event, format_progress_event,
format_token_event and similarly split execute_flow_file,
execute_flow_file_streaming, extract_response_text across multiple lines,
keeping names like extract_python_code, extract_friendly_error,
validate_component_code unchanged but grouped correctly. Ensure import groups
are separated by a blank line and run the formatter/ruff afterwards.
In `@src/backend/base/langflow/agentic/services/flow_executor.py`:
- Around line 211-212: Replace the blocking await event_queue.put(None) in the
finally block with a non-blocking event_queue.put_nowait(None) and catch
asyncio.QueueFull (e.g., except asyncio.QueueFull: pass or log) so shutdown
cannot deadlock; ensure asyncio.QueueFull is referenced/imported and keep the
change at the spot where event_queue is used in the finally block of the flow
executor.
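A minimal sketch of the non-blocking sentinel push described above, suitable for a `finally` block:

```python
import asyncio


def signal_shutdown(event_queue: asyncio.Queue) -> None:
    """Push the None sentinel without blocking; safe to call from a finally block."""
    try:
        event_queue.put_nowait(None)
    except asyncio.QueueFull:
        # Queue is bounded and full; drop the sentinel rather than deadlock.
        pass
```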
In `@src/backend/tests/unit/agentic/helpers/test_error_handling.py`:
- Around line 212-215: The test test_should_handle_none_like_behavior_gracefully
currently only asserts empty-string behavior; either rename it or actually test
None — implement both: update extract_friendly_error to safely handle None by
returning "" when the input is None (avoid AttributeError), and modify or add a
unit test (test_should_handle_none_like_behavior_gracefully) to call
extract_friendly_error(None) and assert it returns "" so the test name matches
behavior; use the existing test function name and the extract_friendly_error
symbol to locate both changes.
In `@src/backend/tests/unit/agentic/services/test_provider_service.py`:
- Around line 192-203: The test currently patches isinstance to return False
causing get_enabled_providers_for_user to return early; change the patch for
langflow.agentic.services.provider_service.isinstance to return True (or remove
that patch) so the isinstance check passes, allowing the mocked
DatabaseVariableService (mock_db) and its AsyncMock get_all returning [] to
exercise the "no credentials" branch inside get_enabled_providers_for_user; keep
the existing patches for get_variable_service and DatabaseVariableService and
ensure mock_service.get_all remains an AsyncMock(return_value=[]).
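The intended "no credentials" test path can be sketched against a simplified stand-in; the real function, signatures, and patch targets differ:

```python
import asyncio
from unittest.mock import AsyncMock

# Hypothetical simplification of get_enabled_providers_for_user's
# "no credentials" branch; the real implementation and mocks differ.
async def get_enabled_providers(variable_service) -> list:
    credentials = await variable_service.get_all()
    if not credentials:
        return []
    return sorted({c["provider"] for c in credentials})


mock_service = AsyncMock()
mock_service.get_all = AsyncMock(return_value=[])
result = asyncio.run(get_enabled_providers(mock_service))
```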
🧹 Nitpick comments (9)
src/backend/base/langflow/agentic/helpers/sse.py (1)
35-42: Truthiness checks will exclude explicit empty strings.
The conditions `if message`, `if error`, etc., will exclude both `None` and empty strings `""`. If an explicit empty string is a valid value that should be included in the output, consider using `if message is not None` instead. If empty strings should be excluded (current behavior), this is fine as-is.
src/backend/base/langflow/agentic/helpers/error_handling.py (1)
38-42: Boundary condition excludes exactly 10-character parts.
The condition `MIN_MEANINGFUL_PART_LENGTH < len(stripped)` excludes parts with exactly 10 characters. If this is intentional, consider adding a brief comment. Otherwise, use `>=` for inclusive comparison.
src/backend/tests/unit/agentic/helpers/test_code_extraction.py (1)
32-128: Add per-test docstrings for clarity.
Class docstrings exist, but individual tests lack docstrings. Consider adding short per-test docstrings for consistency and intent clarity. As per coding guidelines, ...
src/backend/tests/unit/agentic/services/test_flow_executor.py (1)
17-45: Add per-test docstrings for clarity.
Individual test functions don't include docstrings. Consider adding brief ones for intent clarity and consistency. As per coding guidelines, ...
src/backend/tests/unit/agentic/services/test_provider_service.py (2)
18-226: Add per-test docstrings for clarity.
Individual tests don't include docstrings; adding short ones would improve intent and readability. As per coding guidelines, ...
119-169: Drop `pytest.mark.asyncio` in favor of auto mode.
This repo uses pytest-asyncio auto detection, so the explicit decorators are unnecessary and inconsistent with the configured convention. Based on learnings, ...
Also applies to: 174-204
src/backend/tests/unit/agentic/helpers/test_validation.py (1)
56-262: Add per-test docstrings for clarity and consistency.
As per coding guidelines, each test should have a short docstring stating intent; this suite currently relies only on class-level docstrings.
src/backend/tests/unit/agentic/services/test_assistant_service.py (1)
99-541: Remove redundant `@pytest.mark.asyncio` decorators throughout the file.
The project configures pytest-asyncio with `asyncio_mode = "auto"`, which automatically detects and runs async tests without explicit decorators. These decorators are unnecessary and should be removed for consistency across the file. Apply the removal to all async test methods in this file (13+ instances).
🔧 Example removal pattern

```diff
-    @pytest.mark.asyncio
     async def test_should_return_validated_true_for_valid_code(self):
```

src/backend/base/langflow/agentic/api/router.py (1)
171-257: Extract shared provider/model resolution logic for assist endpoints.
`assist` and `assist_stream` duplicate the same provider/model/API-key selection and validation. This is drift-prone; consider a small helper that returns `(provider, model_name, api_key_name, api_key, global_vars, session_id)` used by both handlers.
Also applies to: 259-343
```python
import uuid

from fastapi import APIRouter, HTTPException
from fastapi.responses import StreamingResponse
from lfx.base.models.unified_models import (
    get_model_provider_variable_mapping,
    get_unified_models_detailed,
)
from lfx.log.logger import logger

from langflow.api.utils.core import CurrentActiveUser, DbSession
from langflow.services.deps import get_variable_service
from langflow.services.variable.constants import CREDENTIAL_TYPE
from langflow.services.variable.service import DatabaseVariableService

from langflow.agentic.api.schemas import AssistantRequest
from langflow.agentic.services.assistant_service import (
    LANGFLOW_ASSISTANT_FLOW,
    MAX_VALIDATION_RETRIES,
    execute_flow_with_validation,
    execute_flow_with_validation_streaming,
)
from langflow.agentic.services.flow_executor import execute_flow_file
from langflow.agentic.services.provider_service import (
    DEFAULT_MODELS,
    PREFERRED_PROVIDERS,
    check_api_key,
    get_enabled_providers_for_user,
)
```
Fix import ordering to satisfy Ruff I001 (CI failure).
Ruff is failing on Line 7 for unsorted imports; please reorder or run the formatter to unblock CI.
🛠️ Suggested import reordering
import uuid
from fastapi import APIRouter, HTTPException
from fastapi.responses import StreamingResponse
from lfx.base.models.unified_models import (
get_model_provider_variable_mapping,
get_unified_models_detailed,
)
from lfx.log.logger import logger
+from langflow.agentic.api.schemas import AssistantRequest
+from langflow.agentic.services.assistant_service import (
+ LANGFLOW_ASSISTANT_FLOW,
+ MAX_VALIDATION_RETRIES,
+ execute_flow_with_validation,
+ execute_flow_with_validation_streaming,
+)
+from langflow.agentic.services.flow_executor import execute_flow_file
+from langflow.agentic.services.provider_service import (
+ DEFAULT_MODELS,
+ PREFERRED_PROVIDERS,
+ check_api_key,
+ get_enabled_providers_for_user,
+)
from langflow.api.utils.core import CurrentActiveUser, DbSession
from langflow.services.deps import get_variable_service
from langflow.services.variable.constants import CREDENTIAL_TYPE
from langflow.services.variable.service import DatabaseVariableService
-
-from langflow.agentic.api.schemas import AssistantRequest
-from langflow.agentic.services.assistant_service import (
- LANGFLOW_ASSISTANT_FLOW,
- MAX_VALIDATION_RETRIES,
- execute_flow_with_validation,
- execute_flow_with_validation_streaming,
-)
-from langflow.agentic.services.flow_executor import execute_flow_file
-from langflow.agentic.services.provider_service import (
- DEFAULT_MODELS,
- PREFERRED_PROVIDERS,
- check_api_key,
- get_enabled_providers_for_user,
-)
🧰 Tools
🪛 GitHub Actions: Ruff Style Check
[error] 7-7: I001 Import block is un-sorted or un-formatted. Ruff check failed: imports in this file are not properly sorted/formatted.
🪛 GitHub Check: Ruff Style Check (3.13)
[failure] 7-35: Ruff (I001)
src/backend/base/langflow/agentic/api/router.py:7:1: I001 Import block is un-sorted or un-formatted
🤖 Prompt for AI Agents
In `@src/backend/base/langflow/agentic/api/router.py` around lines 7 - 35, Ruff
I001 is triggered by unsorted imports in this module; reorder the imports into
standard groups (stdlib, third-party, application) and sort each group
alphabetically so tools accept the file. Specifically, ensure uuid is in the
stdlib group; APIRouter, HTTPException, StreamingResponse, and other fastapi
imports are grouped and alphabetized; group lfx.* and langflow.* imports
(get_model_provider_variable_mapping, get_unified_models_detailed, logger,
CurrentActiveUser, DbSession, get_variable_service, CREDENTIAL_TYPE,
DatabaseVariableService, AssistantRequest, LANGFLOW_ASSISTANT_FLOW,
MAX_VALIDATION_RETRIES, execute_flow_with_validation,
execute_flow_with_validation_streaming, execute_flow_file, DEFAULT_MODELS,
PREFERRED_PROVIDERS, check_api_key, get_enabled_providers_for_user) into the
application/local group and sort them alphabetically within that group, then run
the formatter/ruff autofix to confirm the I001 error is resolved.
```python
@router.post("/execute/{flow_name}")
async def execute_named_flow(
    flow_name: str,
    request: AssistantRequest,
    current_user: CurrentActiveUser,
    session: DbSession,
) -> dict:
    """Execute a named flow from the flows directory."""
    variable_service = get_variable_service()
    user_id = current_user.id

    global_vars = {
        "USER_ID": str(user_id),
        "FLOW_ID": request.flow_id,
    }

    if request.component_id:
        global_vars["COMPONENT_ID"] = request.component_id
    if request.field_name:
        global_vars["FIELD_NAME"] = request.field_name

    try:
        openai_key = await variable_service.get_variable(user_id, "OPENAI_API_KEY", "", session)
        global_vars["OPENAI_API_KEY"] = openai_key
    except (ValueError, HTTPException):
        logger.debug("OPENAI_API_KEY not configured, continuing without it")

    flow_filename = f"{flow_name}.json"
    # Generate unique session_id per request to isolate memory
    session_id = str(uuid.uuid4())
```
Validate flow_name to prevent path traversal.
Line 67 builds a filename directly from a user-controlled path segment. A value like ../secrets could escape the flows directory once resolved in the executor. Reject path separators and traversal tokens before building the filename.
🔒 Proposed guard
`@router.post`("/execute/{flow_name}")
async def execute_named_flow(
@@
- flow_filename = f"{flow_name}.json"
+ if "/" in flow_name or "\\" in flow_name or ".." in flow_name:
+ raise HTTPException(status_code=400, detail="Invalid flow name")
+ flow_filename = f"{flow_name}.json"📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
Original:

```python
@router.post("/execute/{flow_name}")
async def execute_named_flow(
    flow_name: str,
    request: AssistantRequest,
    current_user: CurrentActiveUser,
    session: DbSession,
) -> dict:
    """Execute a named flow from the flows directory."""
    variable_service = get_variable_service()
    user_id = current_user.id
    global_vars = {
        "USER_ID": str(user_id),
        "FLOW_ID": request.flow_id,
    }
    if request.component_id:
        global_vars["COMPONENT_ID"] = request.component_id
    if request.field_name:
        global_vars["FIELD_NAME"] = request.field_name
    try:
        openai_key = await variable_service.get_variable(user_id, "OPENAI_API_KEY", "", session)
        global_vars["OPENAI_API_KEY"] = openai_key
    except (ValueError, HTTPException):
        logger.debug("OPENAI_API_KEY not configured, continuing without it")
    flow_filename = f"{flow_name}.json"
    # Generate unique session_id per request to isolate memory
    session_id = str(uuid.uuid4())
```

Suggested:

```python
@router.post("/execute/{flow_name}")
async def execute_named_flow(
    flow_name: str,
    request: AssistantRequest,
    current_user: CurrentActiveUser,
    session: DbSession,
) -> dict:
    """Execute a named flow from the flows directory."""
    variable_service = get_variable_service()
    user_id = current_user.id
    global_vars = {
        "USER_ID": str(user_id),
        "FLOW_ID": request.flow_id,
    }
    if request.component_id:
        global_vars["COMPONENT_ID"] = request.component_id
    if request.field_name:
        global_vars["FIELD_NAME"] = request.field_name
    try:
        openai_key = await variable_service.get_variable(user_id, "OPENAI_API_KEY", "", session)
        global_vars["OPENAI_API_KEY"] = openai_key
    except (ValueError, HTTPException):
        logger.debug("OPENAI_API_KEY not configured, continuing without it")
    if "/" in flow_name or "\\" in flow_name or ".." in flow_name:
        raise HTTPException(status_code=400, detail="Invalid flow name")
    flow_filename = f"{flow_name}.json"
    # Generate unique session_id per request to isolate memory
    session_id = str(uuid.uuid4())
```
🤖 Prompt for AI Agents
In `@src/backend/base/langflow/agentic/api/router.py` around lines 40 - 69, The
execute_named_flow handler currently builds flow_filename from the
user-controlled flow_name which allows path traversal; before constructing
flow_filename (or passing flow_name to the executor), validate and sanitize
flow_name in execute_named_flow: reject any path separators or traversal tokens
(e.g., "..", "/" or "\"), or enforce a strict whitelist pattern (e.g., only
alphanumeric, hyphen, underscore) and return a 400/HTTPException for invalid
names; alternatively normalize with pathlib/os.path and ensure the resolved name
equals the basename to prevent escaping the flows directory—apply this check
just before the line that sets flow_filename and session_id.
```python
    for model in models:
        model_name = model.get("model_name")
        display_name = model.get("display_name", model_name)
        metadata = model.get("metadata", {})

        is_deprecated = metadata.get("deprecated", False)
        is_not_supported = metadata.get("not_supported", False)

        if not is_deprecated and not is_not_supported:
            model_list.append({
                "name": model_name,
                "display_name": display_name,
            })
```
Display name lookup misses metadata.
get_unified_models_detailed() nests display_name in the metadata field, so the current lookup always falls back to model_name. Pull display_name from metadata instead.
✅ Fix display name extraction
- for model in models:
- model_name = model.get("model_name")
- display_name = model.get("display_name", model_name)
- metadata = model.get("metadata", {})
+ for model in models:
+ model_name = model.get("model_name")
+ metadata = model.get("metadata", {})
+ display_name = metadata.get("display_name", model_name)🤖 Prompt for AI Agents
In `@src/backend/base/langflow/agentic/api/router.py` around lines 118 - 130, In
get_unified_models_detailed() the display_name is incorrectly read from the
top-level model dict causing fallback to model_name; instead extract
display_name from metadata by replacing display_name = model.get("display_name",
model_name) with display_name = metadata.get("display_name", model_name) inside
the loop that builds model_list (the block referencing model_name, display_name,
metadata, is_deprecated, is_not_supported, and appending to model_list).
```python
    input_preview = request.input_value[:50] if request.input_value else "None"
    logger.info(f"Executing {LANGFLOW_ASSISTANT_FLOW} with {provider}/{model_name}, input: {input_preview}...")
```
Avoid logging raw assistant input at info level.
Line 242 logs a prompt preview that may contain secrets/PII. Prefer logging only length or redacting content, or downgrade to debug.
🔐 Safer logging
- input_preview = request.input_value[:50] if request.input_value else "None"
- logger.info(f"Executing {LANGFLOW_ASSISTANT_FLOW} with {provider}/{model_name}, input: {input_preview}...")
+ input_len = len(request.input_value) if request.input_value else 0
+ logger.info(
+ f"Executing {LANGFLOW_ASSISTANT_FLOW} with {provider}/{model_name}, input_length={input_len} chars..."
Original:

```python
    input_preview = request.input_value[:50] if request.input_value else "None"
    logger.info(f"Executing {LANGFLOW_ASSISTANT_FLOW} with {provider}/{model_name}, input: {input_preview}...")
```

Suggested:

```python
    input_len = len(request.input_value) if request.input_value else 0
    logger.info(
        f"Executing {LANGFLOW_ASSISTANT_FLOW} with {provider}/{model_name}, input_length={input_len} chars..."
    )
```
🤖 Prompt for AI Agents
In `@src/backend/base/langflow/agentic/api/router.py` around lines 242 - 244, The
current logger.info call exposes raw assistant input via input_preview (derived
from request.input_value) when logging execution of LANGFLOW_ASSISTANT_FLOW;
replace this by not including raw content—either downgrade to logger.debug and
keep a redacted preview, or keep info level but log only metadata such as
len(request.input_value) and provider/model_name; update the call that uses
input_preview and logger.info to instead reference safe fields (e.g.
input_length = len(request.input_value or "") and/or a constant "<redacted>")
and use logger.debug if you must include any snippet.
```python
def extract_friendly_error(error_msg: str) -> str:
    """Convert technical API errors into user-friendly messages."""
    error_lower = error_msg.lower()

    for patterns, friendly_message in ERROR_PATTERNS:
        if any(pattern in error_lower or pattern in error_msg for pattern in patterns):
            return friendly_message

    if "model" in error_lower and ("not found" in error_lower or "does not exist" in error_lower):
        return "Model not available. Please select a different model."

    if "content" in error_lower and any(term in error_lower for term in ["filter", "policy", "safety"]):
        return "Request blocked by content policy. Please modify your prompt."

    return _truncate_error_message(error_msg)
```
Missing None guard could cause AttributeError.
If error_msg is None, calling error_msg.lower() will raise AttributeError. Consider adding a guard at the start of the function, or documenting that the function requires a non-None string.
Proposed fix
def extract_friendly_error(error_msg: str) -> str:
"""Convert technical API errors into user-friendly messages."""
+ if not error_msg:
+ return error_msg or ""
error_lower = error_msg.lower()🤖 Prompt for AI Agents
In `@src/backend/base/langflow/agentic/helpers/error_handling.py` around lines 16
- 30, The function extract_friendly_error can raise AttributeError when
error_msg is None; add a guard at the start of extract_friendly_error to handle
None (or non-str) inputs by converting None to an empty string or returning a
safe default via _truncate_error_message, then use that sanitized value for
error_lower and subsequent checks; update references to ERROR_PATTERNS and the
final _truncate_error_message call to operate on the sanitized string so the
function never calls .lower() on None.
```python
import asyncio
from collections.abc import AsyncGenerator

from fastapi import HTTPException
from lfx.log.logger import logger

from langflow.agentic.helpers.code_extraction import extract_python_code
from langflow.agentic.helpers.error_handling import extract_friendly_error
from langflow.agentic.helpers.sse import format_complete_event, format_error_event, format_progress_event, format_token_event
from langflow.agentic.helpers.validation import validate_component_code
from langflow.agentic.services.flow_executor import execute_flow_file, execute_flow_file_streaming, extract_response_text
```
Fix Ruff I001/E501 on imports (CI failure).
CI reports unsorted imports and long lines at Lines 11 and 13. Wrap the multi-imports and re-run the formatter.
🛠️ Import formatting
```diff
 import asyncio
 from collections.abc import AsyncGenerator
 from fastapi import HTTPException
 from lfx.log.logger import logger
 from langflow.agentic.helpers.code_extraction import extract_python_code
 from langflow.agentic.helpers.error_handling import extract_friendly_error
-from langflow.agentic.helpers.sse import format_complete_event, format_error_event, format_progress_event, format_token_event
+from langflow.agentic.helpers.sse import (
+    format_complete_event,
+    format_error_event,
+    format_progress_event,
+    format_token_event,
+)
 from langflow.agentic.helpers.validation import validate_component_code
-from langflow.agentic.services.flow_executor import execute_flow_file, execute_flow_file_streaming, extract_response_text
+from langflow.agentic.services.flow_executor import (
+    execute_flow_file,
+    execute_flow_file_streaming,
+    extract_response_text,
+)
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```python
import asyncio
from collections.abc import AsyncGenerator
from fastapi import HTTPException
from lfx.log.logger import logger
from langflow.agentic.helpers.code_extraction import extract_python_code
from langflow.agentic.helpers.error_handling import extract_friendly_error
from langflow.agentic.helpers.sse import (
    format_complete_event,
    format_error_event,
    format_progress_event,
    format_token_event,
)
from langflow.agentic.helpers.validation import validate_component_code
from langflow.agentic.services.flow_executor import (
    execute_flow_file,
    execute_flow_file_streaming,
    extract_response_text,
)
```
🧰 Tools
🪛 GitHub Check: Ruff Style Check (3.13)
[failure] 13-13: Ruff (E501)
src/backend/base/langflow/agentic/services/assistant_service.py:13:121: E501 Line too long (121 > 120)
[failure] 11-11: Ruff (E501)
src/backend/base/langflow/agentic/services/assistant_service.py:11:121: E501 Line too long (125 > 120)
[failure] 3-13: Ruff (I001)
src/backend/base/langflow/agentic/services/assistant_service.py:3:1: I001 Import block is un-sorted or un-formatted
🤖 Prompt for AI Agents
In `@src/backend/base/langflow/agentic/services/assistant_service.py` around lines
3 - 13, Sort and wrap the imports to satisfy Ruff I001 and E501: reorder imports
into stdlib (asyncio, collections.abc.AsyncGenerator), third-party
(fastapi.HTTPException, lfx.log.logger), then local package imports, and break
long multi-member imports onto multiple lines using parentheses so each line
stays under the max line length; specifically split the long import of
format_complete_event, format_error_event, format_progress_event,
format_token_event and similarly split execute_flow_file,
execute_flow_file_streaming, extract_response_text across multiple lines,
keeping names like extract_python_code, extract_friendly_error,
validate_component_code unchanged but grouped correctly. Ensure import groups
are separated by a blank line and run the formatter/ruff afterwards.
```python
async def execute_flow_with_validation(
    flow_filename: str,
    input_value: str,
    global_variables: dict[str, str],
    *,
    max_retries: int = MAX_VALIDATION_RETRIES,
    user_id: str | None = None,
    session_id: str | None = None,
    provider: str | None = None,
    model_name: str | None = None,
    api_key_var: str | None = None,
) -> dict:
    """Execute flow and validate the generated component code.

    If the response contains Python code, it validates the code.
    If validation fails, re-executes the flow with error context.
    Continues until valid code is generated or max retries reached.
    """
    current_input = input_value
    attempt = 0

    while attempt <= max_retries:
        attempt += 1
        logger.info(f"Component generation attempt {attempt}/{max_retries + 1}")

        result = await execute_flow_file(
            flow_filename=flow_filename,
            input_value=current_input,
            global_variables=global_variables,
            verbose=True,
            user_id=user_id,
            session_id=session_id,
            provider=provider,
            model_name=model_name,
            api_key_var=api_key_var,
        )

        response_text = extract_response_text(result)
        code = extract_python_code(response_text)

        if not code:
            logger.debug("No Python code found in response, returning as-is")
            return result

        logger.info("Validating generated component code...")
        validation = validate_component_code(code)

        if validation.is_valid:
            logger.info(f"Component '{validation.class_name}' validated successfully!")
            return {
                **result,
                "validated": True,
                "class_name": validation.class_name,
                "component_code": code,
                "validation_attempts": attempt,
            }

        logger.warning(f"Validation failed (attempt {attempt}): {validation.error}")

        if attempt > max_retries:
            logger.error(f"Max retries ({max_retries}) reached. Returning last result with error.")
            return {
                **result,
                "validated": False,
                "validation_error": validation.error,
                "validation_attempts": attempt,
            }

        current_input = VALIDATION_RETRY_TEMPLATE.format(error=validation.error, code=code)
        logger.info("Retrying with error context...")

    return result
```
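VALIDATION_RETRY_TEMPLATE itself is not shown in this diff; a hypothetical shape that would satisfy the `.format(error=..., code=...)` call above (the real constant lives elsewhere in the service module and may word its instructions differently):

```python
# Hypothetical template; only the {error} and {code} placeholders are implied
# by the .format() call in the retry loop.
VALIDATION_RETRY_TEMPLATE = (
    "The previously generated component failed validation.\n"
    "Error:\n{error}\n\n"
    "Failing code:\n{code}\n\n"
    "Fix the error and return the complete corrected component."
)

current_input = VALIDATION_RETRY_TEMPLATE.format(
    error="NameError: name 'Component' is not defined",
    code="class MyComponent(Component): ...",
)
```

Feeding the error and the failing code back as the next input is what gives the model the context needed to self-correct on the retry attempt.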
Clamp max_retries to avoid zero-iteration paths.
If a client sends a negative max_retries, Line 53’s loop never runs and result is undefined; the streaming generator yields nothing. Clamp to >= 0 (or validate upstream) in both functions.
✅ Suggested guard
```diff
 async def execute_flow_with_validation(
@@
-    current_input = input_value
+    max_retries = max(0, max_retries)
+    current_input = input_value
@@
 async def execute_flow_with_validation_streaming(
@@
-    current_input = input_value
-    total_attempts = max_retries + 1
+    max_retries = max(0, max_retries)
+    current_input = input_value
+    total_attempts = max_retries + 1
```

Also applies to: 106-135
🤖 Prompt for AI Agents
In `@src/backend/base/langflow/agentic/services/assistant_service.py` around lines
32 - 103, Clamp negative max_retries to zero before the retry loop: in
execute_flow_with_validation, normalize the incoming max_retries parameter
(e.g., max_retries = max(0, max_retries)) or raise on invalid values so the
while loop always has a defined iteration plan and result is always assigned;
apply the same guard to the other related function that uses max_retries (the
similar retry/streaming function referenced in lines 106-135) to prevent
zero-iteration/undefined-result paths.
```python
        finally:
            await event_queue.put(None)
```
🧩 Analysis chain
🏁 Script executed:

```shell
cat -n src/backend/base/langflow/agentic/services/flow_executor.py | head -220 | tail -50
```

Repository: langflow-ai/langflow

Length of output: 184
🌐 Web query:
asyncio.Queue put_nowait vs await put deadlock QueueFull
💡 Result:
short answer
- await queue.put(item) is a coroutine that blocks the current task until a free slot exists (it will not raise QueueFull). Use it when you want the producer to wait for consumers. [1][2]
- queue.put_nowait(item) is non‑blocking; if the queue is full it raises asyncio.QueueFull immediately. Use it when you must not block and can handle/drop/retry items. [1][2]
Why you see hangs/deadlocks
- If a producer uses await put() but there are no active consumers (or consumers are themselves blocked waiting on the producer), the producer will wait forever and your program appears to hang (classic producer/consumer deadlock). [2][4]
- put_nowait avoids blocking, but if used without handling QueueFull you’ll get exceptions instead of a hang. [1][2]
Practical fixes
- Ensure consumers are running (start consumer tasks before producers). [4]
- Use put_nowait + try/except QueueFull to drop/backpressure or retry later. [1]
- Increase or remove maxsize (maxsize<=0 => unlimited). [3]
- If crossing threads, do queue operations on the event loop thread (e.g., asyncio.run_coroutine_threadsafe or schedule a coroutine that does await put()). [4]
Sources
[1] Python asyncio.Queue docs (put / put_nowait).
[2] Asyncio Queue cheat‑sheet / tutorial (explanation & deadlock examples).
[3] Python asyncio queues docs (maxsize behavior).
[4] StackOverflow: asyncio blocking with queues (practical thread/ordering fixes).
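The difference described above is easy to demonstrate with a bounded queue; this self-contained snippet shows `put_nowait()` raising `QueueFull` immediately where `await put()` would have blocked:

```python
import asyncio


async def main() -> list:
    # Bounded queue: `await q.put(...)` blocks when full; `q.put_nowait(...)`
    # raises asyncio.QueueFull instead, which is why the non-blocking form is
    # safer inside a finally block with no guaranteed consumer.
    q: asyncio.Queue = asyncio.Queue(maxsize=1)
    q.put_nowait("first")          # fills the queue
    dropped = []
    try:
        q.put_nowait("second")     # queue is full -> raises immediately
    except asyncio.QueueFull:
        dropped.append("second")   # handle instead of hanging
    return [await q.get(), dropped[0]]


result = asyncio.run(main())
```

Handling `QueueFull` turns a potential shutdown deadlock into, at worst, a dropped completion signal.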
🏁 Script executed:

```shell
sed -n '200,225p' src/backend/base/langflow/agentic/services/flow_executor.py
```

Repository: langflow-ai/langflow

Length of output: 184
Avoid blocking on the completion signal; use non-blocking queue operations in the finally block.
await event_queue.put(None) violates the coding guidelines for src/backend/**/*.py, which require queue.put_nowait() for non-blocking queue operations. In a finally block, blocking operations can deadlock if the queue reaches capacity with no active consumers. Handle asyncio.QueueFull to keep shutdown non-blocking.
Suggested fix
```diff
-        finally:
-            await event_queue.put(None)
+        finally:
+            try:
+                event_queue.put_nowait(None)
+            except asyncio.QueueFull:
+                logger.warning("Event queue full; dropping completion signal")
```

📝 Committable suggestion
+ logger.warning("Event queue full; dropping completion signal")📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```python
        finally:
            try:
                event_queue.put_nowait(None)
            except asyncio.QueueFull:
                logger.warning("Event queue full; dropping completion signal")
```
🤖 Prompt for AI Agents
In `@src/backend/base/langflow/agentic/services/flow_executor.py` around lines 211
- 212, Replace the blocking await event_queue.put(None) in the finally block
with a non-blocking event_queue.put_nowait(None) and catch asyncio.QueueFull
(e.g., except asyncio.QueueFull: pass or log) so shutdown cannot deadlock;
ensure asyncio.QueueFull is referenced/imported and keep the change at the spot
where event_queue is used in the finally block of the flow executor.
```python
    def test_should_handle_none_like_behavior_gracefully(self):
        # Empty string should not raise
        result = extract_friendly_error("")
        assert result == ""
```
Test name suggests None handling but only tests empty string.
The test is named test_should_handle_none_like_behavior_gracefully but only tests an empty string. As noted in the implementation, passing None would raise AttributeError. Consider either:

- Renaming it to test_should_handle_empty_string_gracefully, or
- Adding a separate test for None (which would currently fail)
🤖 Prompt for AI Agents
In `@src/backend/tests/unit/agentic/helpers/test_error_handling.py` around lines
212 - 215, The test test_should_handle_none_like_behavior_gracefully currently
only asserts empty-string behavior; either rename it or actually test None —
implement both: update extract_friendly_error to safely handle None by returning
"" when the input is None (avoid AttributeError), and modify or add a unit test
(test_should_handle_none_like_behavior_gracefully) to call
extract_friendly_error(None) and assert it returns "" so the test name matches
behavior; use the existing test function name and the extract_friendly_error
symbol to locate both changes.
```python
        with patch("langflow.agentic.services.provider_service.get_variable_service") as mock_get:
            mock_service = MagicMock()
            mock_service.get_all = AsyncMock(return_value=[])

            # Make it look like DatabaseVariableService
            with patch("langflow.agentic.services.provider_service.DatabaseVariableService") as mock_db:
                mock_db.return_value = mock_service
                mock_get.return_value = mock_service

                # Need to make isinstance check pass
                with patch("langflow.agentic.services.provider_service.isinstance", return_value=False):
                    result, status = await get_enabled_providers_for_user(user_id, mock_session)
```
This test never reaches the “no credentials” branch.
isinstance is forced to False, so the function returns early and duplicates the previous test. Make the isinstance check pass so get_all() returns [] and exercises the intended path.
✅ Suggested fix

```diff
-                # Need to make isinstance check pass
-                with patch("langflow.agentic.services.provider_service.isinstance", return_value=False):
+                # Make isinstance check pass
+                with patch("langflow.agentic.services.provider_service.isinstance", return_value=True):
                     result, status = await get_enabled_providers_for_user(user_id, mock_session)
```

🤖 Prompt for AI Agents
In `@src/backend/tests/unit/agentic/services/test_provider_service.py` around
lines 192 - 203, The test currently patches isinstance to return False causing
get_enabled_providers_for_user to return early; change the patch for
langflow.agentic.services.provider_service.isinstance to return True (or remove
that patch) so the isinstance check passes, allowing the mocked
DatabaseVariableService (mock_db) and its AsyncMock get_all returning [] to
exercise the "no credentials" branch inside get_enabled_providers_for_user; keep
the existing patches for get_variable_service and DatabaseVariableService and
ensure mock_service.get_all remains an AsyncMock(return_value=[]).
```python
        if any(pattern in error_lower or pattern in error_msg for pattern in patterns):
            return friendly_message
```
⚡️Codeflash found 71% (0.71x) speedup for extract_friendly_error in src/backend/base/langflow/agentic/helpers/error_handling.py
⏱️ Runtime : 630 microseconds → 369 microseconds (best of 89 runs)
📝 Explanation and details
The optimized code achieves a 70% speedup (from 630μs to 369μs) by eliminating redundant string operations in the pattern matching loop of extract_friendly_error.
Key Optimization:
The original code used a combined condition with any():
```python
if any(pattern in error_lower or pattern in error_msg for pattern in patterns):
```

This performs two substring searches per pattern - one against error_lower and one against the original error_msg. The line profiler shows this line consuming 58.8% of total time (3.36ms).
The optimized version restructures this to a nested loop:
```python
for pattern in patterns:
    if pattern in error_lower:
        return friendly_message
```

This performs only one substring search per pattern against the already-lowercased error_lower, eliminating half the string operations. While this increases line hits (from 1046 to 2588 for the inner check), the total time drops significantly because:
- Eliminates redundant `pattern in error_msg` checks - the original checked both the lowercased and original strings, but all patterns in `ERROR_PATTERNS` are already lowercase
- Better early-exit behavior - returns immediately upon first match instead of completing the `any()` generator
- Reduces time spent in pattern matching from 58.8% to 53.3% (29.3% + 24%) with faster per-hit execution (420ns vs 3213ns)
Performance characteristics from tests:
- Most effective for errors that don't match early patterns, as it avoids unnecessary dual-string searches through all pattern sets
- The optimization benefits all test cases uniformly since pattern matching is the dominant operation
- Particularly valuable for the "large scale" tests with long messages, where reducing string operations has compounding effects
The _truncate_error_message function remains unchanged and is not a bottleneck (only ~23% of runtime).
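The two strategies can be compared directly with `timeit`; a rough standalone sketch using a trimmed pattern table (absolute timings will vary by machine, so the only asserted property is that both variants return the same result):

```python
import timeit

ERROR_PATTERNS = [
    (["rate_limit", "429"], "rate limited"),
    (["authentication", "api_key", "unauthorized", "401"], "auth failed"),
    (["timeout", "timed out"], "timed out"),
]
msg = "unknown failure " * 20          # worst case: no pattern matches
msg_lower = msg.lower()


def combined():
    # Original style: two substring searches per pattern via any().
    for patterns, friendly in ERROR_PATTERNS:
        if any(p in msg_lower or p in msg for p in patterns):
            return friendly
    return msg


def early_exit():
    # Optimized style: one search per pattern against the lowercased string.
    for patterns, friendly in ERROR_PATTERNS:
        for p in patterns:
            if p in msg_lower:
                return friendly
    return msg


assert combined() == early_exit()      # identical results for lowercase patterns
t_combined = timeit.timeit(combined, number=10_000)
t_early = timeit.timeit(early_exit, number=10_000)
```

Dropping the `p in msg` half of the condition is safe only because every pattern in the table is already lowercase, which is exactly the invariant the explanation above relies on.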
✅ Correctness verification report:
| Test | Status |
|---|---|
| ⚙️ Existing Unit Tests | ✅ 32 Passed |
| 🌀 Generated Regression Tests | ✅ 71 Passed |
| ⏪ Replay Tests | 🔘 None Found |
| 🔎 Concolic Coverage Tests | 🔘 None Found |
| 📊 Tests Coverage | 100.0% |
⚙️ Click to see Existing Unit Tests
🌀 Click to see Generated Regression Tests
import pytest # used for our unit tests
from langflow.agentic.helpers.error_handling import extract_friendly_error
# function to test
MAX_ERROR_MESSAGE_LENGTH = 150
MIN_MEANINGFUL_PART_LENGTH = 10
ERROR_PATTERNS: list[tuple[list[str], str]] = [
(["rate_limit", "429"], "Rate limit exceeded. Please wait a moment and try again."),
(["authentication", "api_key", "unauthorized", "401"], "Authentication failed. Check your API key."),
(["quota", "billing", "insufficient"], "API quota exceeded. Please check your account billing."),
(["timeout", "timed out"], "Request timed out. Please try again."),
(["connection", "network"], "Connection error. Please check your network and try again."),
(["500", "internal server error"], "Server error. Please try again later."),
]
from langflow.agentic.helpers.error_handling import extract_friendly_error
def _truncate_error_message(error_msg: str) -> str:
"""Truncate long error messages, preserving meaningful content."""
if len(error_msg) <= MAX_ERROR_MESSAGE_LENGTH:
return error_msg
if ":" in error_msg:
for part in error_msg.split(":"):
stripped = part.strip()
if MIN_MEANINGFUL_PART_LENGTH < len(stripped) < MAX_ERROR_MESSAGE_LENGTH:
return stripped
return f"{error_msg[:MAX_ERROR_MESSAGE_LENGTH]}..."
def test_rate_limit_pattern_basic():
# Pattern includes "429" and "rate_limit" - should match and return the friendly message
msg = "Error: 429 rate_limit exceeded for user"
codeflash_output = extract_friendly_error(msg); result = codeflash_output
def test_authentication_pattern_case_insensitive():
# Ensure matching is case-insensitive and that API key/unauthorized triggers the auth message
msg = "AUTHENTICATION failed: API_KEY unauthorized access detected"
codeflash_output = extract_friendly_error(msg); result = codeflash_output
def test_server_error_numeric_and_text_patterns():
# Either numeric "500" or phrase "internal server error" should map to server-friendly message
msg1 = "500 Server Error: unexpected condition"
msg2 = "Internal Server Error occurred while processing"
codeflash_output = extract_friendly_error(msg1)
codeflash_output = extract_friendly_error(msg2)
def test_timeout_and_connection_patterns():
# Ensure both 'timeout' and 'connection' style errors are caught by patterns
codeflash_output = extract_friendly_error("Request timed out after 30s")
codeflash_output = extract_friendly_error("Network connection failed while sending request")
def test_model_not_found_variants():
# Both "not found" and "does not exist" variants should be detected when 'model' is present
m1 = "Error: Model gpt-999 not found in registry"
m2 = "The requested model does not exist: gpt-unknown"
codeflash_output = extract_friendly_error(m1)
codeflash_output = extract_friendly_error(m2)
def test_content_policy_triggers_filter_policy_safety():
# When 'content' and a policy-related term appear, return the content policy friendly message
examples = [
"Content filter blocked the request due to policy",
"Request content failed safety checks and was denied",
"The content policy prevented this action"
]
for ex in examples:
codeflash_output = extract_friendly_error(ex)
def test_short_message_no_pattern_returns_original():
# If nothing matches and message is short (< MAX), it should be returned unchanged
short = "An unknown error occurred."
codeflash_output = extract_friendly_error(short)
def test_exact_max_length_message_unchanged():
# A message whose length equals MAX_ERROR_MESSAGE_LENGTH must be returned unchanged (<=)
msg = "x" * MAX_ERROR_MESSAGE_LENGTH
codeflash_output = extract_friendly_error(msg)
def test_truncate_with_meaningful_colon_part_preferred_over_raw_truncate():
# When the long message contains colon-separated parts, and one part is of meaningful length
meaningful = "This part contains a meaningful description of the error that should be shown"
# Construct message: first part short, second part is meaningful, trailing garbage to exceed MAX
msg = "Header: " + meaningful + ": " + ("Z" * 200)
# The function should pick the meaningful part stripped (no surrounding spaces)
codeflash_output = extract_friendly_error(msg)
def test_truncate_no_meaningful_parts_truncates_with_ellipsis():
# All colon-separated parts are exactly MIN_MEANINGFUL_PART_LENGTH (10) -> not acceptable because condition is strict '>'
short_part = "x" * MIN_MEANINGFUL_PART_LENGTH
# create many parts to exceed MAX length without having any acceptable meaningful part
parts = [short_part] * 20 # combined length will be > MAX_ERROR_MESSAGE_LENGTH
msg = ":".join(parts)
expected = msg[:MAX_ERROR_MESSAGE_LENGTH] + "..."
codeflash_output = extract_friendly_error(msg)
def test_truncate_long_single_no_colon_truncates():
# Long message without any colon should be truncated to MAX length plus ellipsis
msg = "a" * 300
codeflash_output = extract_friendly_error(msg)
def test_large_scale_many_parts_meaningful_late():
# Build a message with many small parts (each too short) then one late meaningful part
small = "s" * 5 # length 5, less than MIN_MEANINGFUL_PART_LENGTH
parts = [small] * 199
meaningful = "M" * 50 # length 50, acceptable (10 < 50 < 150)
parts.append(meaningful)
# add trailing content to make overall string long
parts.append("T" * 30)
msg = ":".join(parts)
# There are 201 parts - well under 1000. The function should iterate and find the meaningful one.
codeflash_output = extract_friendly_error(msg)
def test_pattern_detection_prefers_patterns_over_truncation():
# If a message contains both a known pattern and is long, patterns should trigger before truncation logic
long_but_rate_limited = "Rate_Limit: " + ("x" * 200) + " 429"
# Even though message is long, the presence of 'rate_limit' or '429' should trigger friendly message
codeflash_output = extract_friendly_error(long_but_rate_limited)
# codeflash_output is used to check that the output of the original code is the same as that of the optimized code.
#------------------------------------------------
import pytest
from langflow.agentic.helpers.error_handling import extract_friendly_error
class TestExtractFriendlyErrorBasic:
"""Basic test cases for extract_friendly_error function."""
def test_rate_limit_error_lowercase(self):
"""Test detection of rate limit error with lowercase patterns."""
error_msg = "You have exceeded your rate limit. Please try again later."
codeflash_output = extract_friendly_error(error_msg); result = codeflash_output
def test_rate_limit_error_429_code(self):
"""Test detection of 429 HTTP status code for rate limiting."""
error_msg = "HTTP 429: Too many requests"
codeflash_output = extract_friendly_error(error_msg); result = codeflash_output
def test_authentication_error_api_key(self):
"""Test detection of authentication errors related to API keys."""
error_msg = "Invalid API key provided"
codeflash_output = extract_friendly_error(error_msg); result = codeflash_output
def test_authentication_error_unauthorized_401(self):
"""Test detection of 401 unauthorized HTTP status code."""
error_msg = "401: Unauthorized access"
codeflash_output = extract_friendly_error(error_msg); result = codeflash_output
def test_quota_exceeded_error(self):
"""Test detection of quota exceeded errors."""
error_msg = "Your API quota has been exceeded"
codeflash_output = extract_friendly_error(error_msg); result = codeflash_output
def test_billing_insufficient_error(self):
"""Test detection of billing-related errors."""
error_msg = "Insufficient billing credits"
codeflash_output = extract_friendly_error(error_msg); result = codeflash_output
def test_timeout_error(self):
"""Test detection of timeout errors."""
error_msg = "Request timed out after 30 seconds"
codeflash_output = extract_friendly_error(error_msg); result = codeflash_output
def test_timed_out_variant(self):
"""Test detection of 'timed out' variant."""
error_msg = "The operation timed out"
codeflash_output = extract_friendly_error(error_msg); result = codeflash_output
def test_connection_error(self):
"""Test detection of connection errors."""
error_msg = "Failed to establish connection"
codeflash_output = extract_friendly_error(error_msg); result = codeflash_output
def test_network_error(self):
"""Test detection of network errors."""
error_msg = "Network unreachable"
codeflash_output = extract_friendly_error(error_msg); result = codeflash_output
def test_500_server_error(self):
"""Test detection of 500 internal server error."""
error_msg = "500: Internal server error"
codeflash_output = extract_friendly_error(error_msg); result = codeflash_output
def test_internal_server_error_verbose(self):
"""Test detection of verbose internal server error."""
error_msg = "Internal server error occurred"
codeflash_output = extract_friendly_error(error_msg); result = codeflash_output
def test_model_not_found_error(self):
"""Test detection of model not found errors."""
error_msg = "Model gpt-5 is not found"
codeflash_output = extract_friendly_error(error_msg); result = codeflash_output
def test_model_does_not_exist_error(self):
"""Test detection of model does not exist errors."""
error_msg = "The requested model does not exist"
codeflash_output = extract_friendly_error(error_msg); result = codeflash_output
def test_content_filter_policy_error(self):
"""Test detection of content policy filter errors."""
error_msg = "Request blocked by content filter policy"
codeflash_output = extract_friendly_error(error_msg); result = codeflash_output
def test_content_safety_error(self):
"""Test detection of content safety errors."""
error_msg = "Content safety filter triggered"
codeflash_output = extract_friendly_error(error_msg); result = codeflash_output
def test_unknown_error_short_message(self):
"""Test handling of unknown, short error messages."""
error_msg = "Something went wrong"
codeflash_output = extract_friendly_error(error_msg); result = codeflash_output
def test_unknown_error_within_max_length(self):
"""Test handling of unknown errors within max length."""
error_msg = "An unexpected error occurred in the system"
codeflash_output = extract_friendly_error(error_msg); result = codeflash_output
class TestExtractFriendlyErrorEdgeCases:
"""Edge case tests for extract_friendly_error function."""
def test_empty_string_error(self):
"""Test handling of empty string error message."""
codeflash_output = extract_friendly_error(""); result = codeflash_output
def test_whitespace_only_error(self):
"""Test handling of whitespace-only error message."""
codeflash_output = extract_friendly_error(" "); result = codeflash_output
def test_case_insensitive_matching_uppercase(self):
"""Test that pattern matching is case-insensitive with uppercase."""
error_msg = "RATE_LIMIT EXCEEDED"
codeflash_output = extract_friendly_error(error_msg); result = codeflash_output
def test_case_insensitive_matching_mixed_case(self):
"""Test that pattern matching is case-insensitive with mixed case."""
error_msg = "Authentication FAILED: Invalid API_KEY"
codeflash_output = extract_friendly_error(error_msg); result = codeflash_output
def test_pattern_in_middle_of_word(self):
"""Test pattern matching within larger words."""
error_msg = "Operation deprecated_401_unauthorized"
codeflash_output = extract_friendly_error(error_msg); result = codeflash_output
def test_multiple_matching_patterns_first_wins(self):
"""Test that when multiple patterns match, the first one in the list is used."""
error_msg = "Rate limit exceeded: 429"
codeflash_output = extract_friendly_error(error_msg); result = codeflash_output
def test_model_error_case_insensitive(self):
"""Test model not found detection with various cases."""
error_msg = "MODEL not found: claude-3"
codeflash_output = extract_friendly_error(error_msg); result = codeflash_output
def test_content_filter_various_combinations(self):
"""Test content filter detection with different keyword combinations."""
error_msg = "Content filter policy violation"
codeflash_output = extract_friendly_error(error_msg); result = codeflash_output
def test_content_safety_various_combinations(self):
"""Test content safety detection with different keyword combinations."""
error_msg = "Safety policy content blocked"
codeflash_output = extract_friendly_error(error_msg); result = codeflash_output
def test_truncation_with_single_colon(self):
"""Test error message truncation with a single colon separator."""
# Create a message that exceeds MAX_ERROR_MESSAGE_LENGTH (150)
long_msg = "A" * 100 + ": " + "B" * 100
codeflash_output = extract_friendly_error(long_msg); result = codeflash_output
def test_truncation_with_multiple_colons(self):
"""Test error message truncation with multiple colon separators."""
long_msg = "Very Long Error Start" + ": " + "A" * 200 + ": " + "B" * 50
codeflash_output = extract_friendly_error(long_msg); result = codeflash_output
def test_truncation_no_colon_exceeds_max_length(self):
"""Test truncation of long message without colons."""
long_msg = "A" * 200
codeflash_output = extract_friendly_error(long_msg); result = codeflash_output
def test_truncation_preserves_meaningful_content(self):
"""Test that truncation preserves meaningful parts between colons."""
long_msg = "Error Start: " + "X" * 50 + ": " + "Y" * 200
codeflash_output = extract_friendly_error(long_msg); result = codeflash_output
def test_very_short_colon_separated_part_ignored(self):
"""Test that very short colon-separated parts are ignored in truncation."""
long_msg = "A" * 200 + ": " + "B" * 5 + ": " + "C" * 200
codeflash_output = extract_friendly_error(long_msg); result = codeflash_output
def test_exact_max_length_message(self):
"""Test message exactly at MAX_ERROR_MESSAGE_LENGTH."""
msg = "A" * 150
codeflash_output = extract_friendly_error(msg); result = codeflash_output
def test_one_char_over_max_length(self):
"""Test message one character over MAX_ERROR_MESSAGE_LENGTH."""
msg = "A" * 151
codeflash_output = extract_friendly_error(msg); result = codeflash_output
def test_special_characters_in_error_message(self):
"""Test handling of special characters in error message."""
error_msg = "Error: !@#$%^&*()_+-=[]{}|;:',.<>?/"
codeflash_output = extract_friendly_error(error_msg); result = codeflash_output
def test_unicode_characters_in_error_message(self):
"""Test handling of unicode characters in error message."""
error_msg = "Error: \u2018Model\u2019 not found \ud83d\ude00"
codeflash_output = extract_friendly_error(error_msg); result = codeflash_output
def test_newline_in_error_message(self):
"""Test handling of newlines in error message."""
error_msg = "Error line 1\nrate limit exceeded\nError line 3"
codeflash_output = extract_friendly_error(error_msg); result = codeflash_output
def test_tab_in_error_message(self):
"""Test handling of tab characters in error message."""
error_msg = "Error:\trate_limit\texceeded"
codeflash_output = extract_friendly_error(error_msg); result = codeflash_output
class TestExtractFriendlyErrorLargeScale:
"""Large scale test cases for extract_friendly_error function."""
def test_very_long_error_message_truncation(self):
"""Test truncation of very long error message."""
long_msg = "A" * 500
codeflash_output = extract_friendly_error(long_msg); result = codeflash_output
def test_very_long_message_with_meaningful_part(self):
"""Test extraction of meaningful part from very long message."""
long_msg = "Very long prefix: " + "X" * 300 + ": " + "Y" * 100
codeflash_output = extract_friendly_error(long_msg); result = codeflash_output
def test_many_colon_separators(self):
"""Test message with many colon separators."""
msg = ": ".join(["Part" + str(i) * 20 for i in range(20)])
codeflash_output = extract_friendly_error(msg); result = codeflash_output
def test_long_message_pattern_matching_priority(self):
"""Test that pattern matching takes priority over truncation."""
long_msg = "A" * 200 + " rate_limit " + "B" * 200
codeflash_output = extract_friendly_error(long_msg); result = codeflash_output
def test_long_message_with_model_error(self):
"""Test model error detection in very long message."""
long_msg = "A" * 100 + " model not found " + "B" * 100
codeflash_output = extract_friendly_error(long_msg); result = codeflash_output
def test_long_message_with_content_filter(self):
"""Test content filter detection in very long message."""
long_msg = "A" * 100 + " content filter policy " + "B" * 100
codeflash_output = extract_friendly_error(long_msg); result = codeflash_output
def test_bulk_error_processing_consistency(self):
"""Test that processing multiple errors maintains consistency."""
errors = [
"Rate limit exceeded",
"Invalid API key",
"Connection failed",
"Unknown error: " + "A" * 200
]
results = [extract_friendly_error(err) for err in errors]
def test_repeated_pattern_in_long_message(self):
"""Test message with pattern repeated many times."""
long_msg = " rate_limit " * 50
codeflash_output = extract_friendly_error(long_msg); result = codeflash_output
def test_alternating_patterns_long_message(self):
"""Test message with alternating patterns."""
parts = ["rate_limit", "authentication", "timeout"]
long_msg = " ".join(parts * 30)
codeflash_output = extract_friendly_error(long_msg); result = codeflash_output
def test_long_message_all_uppercase(self):
"""Test very long message in all uppercase."""
long_msg = ("A" * 50 + " RATE_LIMIT " + "B" * 50) * 3
codeflash_output = extract_friendly_error(long_msg); result = codeflash_output
def test_colon_separated_many_parts(self):
"""Test message with many colon-separated parts."""
parts = ["Start"] + [("Part_" + str(i)) * 15 for i in range(50)] + ["End"]
long_msg = ": ".join(parts)
codeflash_output = extract_friendly_error(long_msg); result = codeflash_output
def test_performance_with_large_batch_of_different_errors(self):
"""Test performance with a large batch of different error types."""
errors = []
# Create 100 unique error messages
for i in range(100):
errors.append(f"Error {i}: " + "A" * (50 + i % 50))
results = [extract_friendly_error(err) for err in errors]
def test_extremely_long_no_colon_separator(self):
"""Test handling of extremely long message without any colon."""
msg = "Error message with no structure" + " A" * 100
codeflash_output = extract_friendly_error(msg); result = codeflash_output
def test_message_with_null_like_content(self):
"""Test message containing null-like patterns."""
error_msg = "null" * 50 + " rate_limit " + "None" * 50
codeflash_output = extract_friendly_error(error_msg); result = codeflash_output
def test_numeric_heavy_message(self):
"""Test message with many numbers."""
msg = "Error 429 429 429 rate_limit 429 429 429"
codeflash_output = extract_friendly_error(msg); result = codeflash_output
# codeflash_output is used to check that the output of the original code is the same as that of the optimized code.

To test or edit this optimization locally:

git merge codeflash/optimize-pr11372-2026-01-20T19.51.30
The optimization collapses the original per-pattern loop into a single any() check:

    if any(pattern in error_lower or pattern in error_msg for pattern in patterns):
        return friendly_message

replacing:

    for pattern in patterns:
        if pattern in error_lower:
            return friendly_message
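For orientation, here is a minimal sketch of what extract_friendly_error plausibly looks like, reverse-engineered from the tests above. The pattern table, the friendly messages, and the colon-part heuristic are illustrative assumptions; only the priorities (pattern match first, then length cap, then truncation) are implied by the tests.

```python
MAX_ERROR_MESSAGE_LENGTH = 150  # assumed cap, matching the truncation tests above

# Illustrative pattern table (the real one in error_handling.py may differ):
# each entry maps raw-error substrings to a short user-facing message.
FRIENDLY_PATTERNS = [
    (("rate_limit", "rate limit"), "The model provider is rate limiting requests. Please retry shortly."),
    (("invalid api key", "authentication"), "Authentication with the model provider failed. Check your API key."),
    (("model not found", "model_not_found"), "The selected model is unavailable. Try a different model."),
    (("content filter", "content_filter"), "The response was blocked by the provider's content filter."),
]


def extract_friendly_error(error_msg: str) -> str:
    """Map a raw provider error onto a short, user-friendly message."""
    error_lower = error_msg.lower()
    # Pattern matching takes priority over truncation
    # (see test_long_message_pattern_matching_priority).
    for patterns, friendly_message in FRIENDLY_PATTERNS:
        if any(pattern in error_lower for pattern in patterns):
            return friendly_message
    if len(error_msg) <= MAX_ERROR_MESSAGE_LENGTH:
        return error_msg
    # For long unmatched messages, prefer the last reasonably sized
    # colon-separated part; very short parts are ignored
    # (see test_very_short_colon_separated_part_ignored).
    parts = [p.strip() for p in error_msg.split(": ")]
    for part in reversed(parts):
        if 10 < len(part) <= MAX_ERROR_MESSAGE_LENGTH:
            return part
    return error_msg[:MAX_ERROR_MESSAGE_LENGTH] + "..."
```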
This pull request introduces the initial implementation of the Langflow Assistant API, including new endpoints, request/response schemas, and a set of helper modules for code extraction, validation, error handling, and Server-Sent Events (SSE). The changes establish a modular and extensible backend for assistant-related features, supporting both standard and streaming interactions, provider configuration checks, and robust component code validation.
API Endpoints and Routing
Added router.py, which provides endpoints for flow execution, assistant chat (with and without streaming), and configuration checks. The router handles provider/model selection, user/session management, and error handling, and delegates business logic to service modules. It is wired into the API through the package __init__.py.

Schemas and Data Validation
Request and response schemas are defined in schemas.py.

Helper Modules
Code extraction utilities live in code_extraction.py, component code validation in validation.py, friendly error mapping in error_handling.py, and Server-Sent Events support in sse.py.

Module Structure and Documentation
Added documentation to the helpers and services packages to clarify package responsibilities. [1] [2]

Summary by CodeRabbit
Release Notes