fix: add better stream error handling to openai responses api #11467
base: main
Conversation
Walkthrough

The changes enhance error propagation in streaming flows by broadening exception handling in the API endpoint, adding streaming error event support, and introducing OpenAI-compatible error chunk generation. New integration tests validate error formatting in streaming and pre-streaming scenarios.
Sequence Diagram

sequenceDiagram
participant Client
participant API as Streaming API
participant FlowEngine as Flow Engine
participant EventMgr as Event Manager
participant ErrorHandler as Error Handler
participant Formatter as OpenAI Formatter
Client->>API: Stream request
API->>FlowEngine: Start flow execution
FlowEngine->>FlowEngine: Process stream
FlowEngine-->>EventMgr: Error occurs (exception)
EventMgr->>EventMgr: Fire on_error event
EventMgr->>ErrorHandler: Route error event
ErrorHandler->>Formatter: Create error chunk
Note over Formatter: finish_reason="error"<br/>Extract error_message
Formatter-->>API: Return error chunk
API->>Client: Yield error chunk JSON
API->>Client: Yield DONE signal
API-->>Client: Close stream
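Read as code, the path in the diagram amounts to a generator that catches failures, converts them into an OpenAI-style chunk with finish_reason="error", and still terminates the stream. The sketch below is only an illustration under assumed names (stream_flow, the chunk fields, and the SSE framing are not taken from the Langflow source):

```python
# Minimal sketch of the error path in the diagram above. All names and field
# layouts here are illustrative assumptions, not the actual Langflow code.
import json
from collections.abc import AsyncIterator


async def stream_flow(run_flow) -> AsyncIterator[str]:
    """Yield SSE lines; on failure, emit an error chunk, then always yield [DONE]."""
    try:
        async for chunk in run_flow():
            yield f"data: {json.dumps(chunk)}\n\n"
    except Exception as exc:  # broad catch so the client always sees the failure
        error_chunk = {
            "choices": [
                {
                    "index": 0,
                    "delta": {"content": f"Error: {exc}"},
                    "finish_reason": "error",  # instead of a custom error event
                }
            ],
        }
        yield f"data: {json.dumps(error_chunk)}\n\n"
    # Always terminate the stream so SDK clients stop waiting.
    yield "data: [DONE]\n\n"
```

In a FastAPI backend, a generator like this would typically be wrapped in a StreamingResponse with media_type="text/event-stream".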
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~25 minutes

Important: Pre-merge checks failed. Please resolve all errors before merging. Addressing warnings is optional.

❌ Failed checks (1 warning, 1 inconclusive)
✅ Passed checks (5 passed)
Codecov Report

❌ Patch coverage is
Additional details and impacted files

@@ Coverage Diff @@

|          |   main | #11467 |    +/- |
|----------|--------|--------|--------|
| Coverage | 34.84% | 34.83% | -0.02% |
| Files    |   1420 |   1420 |        |
| Lines    |  68190 |  68194 |     +4 |
| Branches |   9979 |   9979 |        |
| Hits     |  23762 |  23756 |     -6 |
| Misses   |  43203 |  43213 |    +10 |
| Partials |   1225 |   1225 |        |
Flags with carried forward coverage won't be shown.
Actionable comments posted: 3
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/backend/base/langflow/api/v1/endpoints.py (1)
372-386: Handle cancellation separately and fix Ruff BLE001.

Catching `Exception` here treats client disconnects (`asyncio.CancelledError`) as errors, and Ruff enforces BLE001 to avoid blind exception catches. Re-raise cancellations and mark the broad catch as intentional.

✅ Suggested fix

- except Exception as e:  # Catch ALL exceptions to ensure errors are propagated in streaming
-     await logger.aerror(f"Error running flow: {e}")
-     event_manager.on_error(data={"error": str(e)})
+ except asyncio.CancelledError:
+     raise
+ except Exception as e:  # noqa: BLE001
+     await logger.aerror(f"Error running flow: {e}")
+     event_manager.on_error(data={"error": str(e)})
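For context, a self-contained sketch of the suggested ordering follows; the function name and collaborators are illustrative. Note that since Python 3.8, asyncio.CancelledError derives from BaseException, so a plain `except Exception` will not normally catch it; the explicit branch mainly documents intent and protects code paths that catch more broadly.

```python
# Illustrative sketch of the suggested exception ordering; not the actual
# endpoint code. run_flow, event_manager, and logger are assumed collaborators.
import asyncio


async def run_flow_and_report(run_flow, event_manager, logger):
    try:
        await run_flow()
    except asyncio.CancelledError:
        # Client disconnect / task cancellation: propagate, don't report as a flow error.
        raise
    except Exception as e:  # noqa: BLE001 - intentional: stream the error to the client
        await logger.aerror(f"Error running flow: {e}")
        event_manager.on_error(data={"error": str(e)})
```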
🤖 Fix all issues with AI agents
In `@src/backend/tests/integration/test_openai_error_propagation.py`:
- Around line 11-36: There are whitespace-only blank lines in the test block
that declare invalid_flow_id, headers, payload, response and response_data which
trigger Ruff W293; remove those trailing/blank-only lines (or run the project
formatter) so there are no lines containing only spaces or tabs in that test
function (look for the block using invalid_flow_id, headers, payload, response,
response_data and trim any blank/whitespace-only lines).
- Around line 45-60: The test test_openai_streaming_runtime_error_format
currently never triggers a runtime failure; patch the flow used by the
simple_api_test fixture to raise an exception during execution (use monkeypatch
to stub a component method such as ChatOutput.execute or TextInput.process to
raise an exception) before calling the streaming endpoint with client; then
assert the streamed chunks include a content chunk with finish_reason="error"
and the expected error payload to verify real error propagation.
- Around line 71-80: Replace the current synchronous parsing that reads
response.text with the streaming pattern used elsewhere: call client.stream(...)
to get the response and iterate asynchronously over response.aiter_lines(),
collecting lines that start with "data: " (excluding "data: [DONE]"), parse each
data payload with json.loads and append to the existing chunks list; update the
test that currently uses the response variable and the for line in response.text
loop to use the async stream/aiter_lines approach so the test exercises SSE
streaming behavior.
| """Test that pre-streaming errors (e.g., invalid flow ID) return proper error format. | ||
|
|
||
| Errors that occur before streaming starts (validation errors, flow not found, etc.) | ||
| return a JSON error response, not a streaming response. | ||
| """ | ||
| invalid_flow_id = "00000000-0000-0000-0000-000000000000" | ||
|
|
||
| headers = {"x-api-key": created_api_key.api_key} | ||
| payload = { | ||
| "model": invalid_flow_id, | ||
| "input": "test input", | ||
| "stream": True, # Even with stream=True, pre-streaming errors return JSON | ||
| } | ||
|
|
||
| response = await client.post( | ||
| "api/v1/responses", | ||
| json=payload, | ||
| headers=headers, | ||
| ) | ||
|
|
||
| # Should return 200 with error in response body | ||
| assert response.status_code == 200 | ||
|
|
||
| # Parse the response | ||
| response_data = response.json() | ||
|
|
Remove whitespace‑only blank lines (Ruff W293).
CI is failing on W293; trim trailing whitespace or run the formatter to clear these lines.
Also applies to: 47-62
🧰 Tools
🪛 GitHub Check: Ruff Style Check (3.13)
[failure] 36-36: Ruff (W293)
src/backend/tests/integration/test_openai_error_propagation.py:36:1: W293 Blank line contains whitespace
[failure] 33-33: Ruff (W293)
src/backend/tests/integration/test_openai_error_propagation.py:33:1: W293 Blank line contains whitespace
[failure] 30-30: Ruff (W293)
src/backend/tests/integration/test_openai_error_propagation.py:30:1: W293 Blank line contains whitespace
[failure] 24-24: Ruff (W293)
src/backend/tests/integration/test_openai_error_propagation.py:24:1: W293 Blank line contains whitespace
[failure] 17-17: Ruff (W293)
src/backend/tests/integration/test_openai_error_propagation.py:17:1: W293 Blank line contains whitespace
[failure] 12-12: Ruff (W293)
src/backend/tests/integration/test_openai_error_propagation.py:12:1: W293 Blank line contains whitespace
@pytest.mark.integration
async def test_openai_streaming_runtime_error_format(client: AsyncClient, created_api_key, simple_api_test):
    """Test that runtime errors during streaming are properly formatted.

    This test verifies the fix for the bug where error events during flow execution
    were not being propagated to clients using the OpenAI SDK. The fix ensures errors
    are sent as content chunks with finish_reason="error" instead of custom error events.

    Note: This test validates the error chunk format. Runtime errors during actual
    flow execution will be formatted the same way.
    """
    headers = {"x-api-key": created_api_key.api_key}
    payload = {
        "model": str(simple_api_test["id"]),
        "input": "test input",
        "stream": True,
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Find the simple_api_test fixture definition
rg -n "def simple_api_test" -A 15 src/backend/tests/Repository: langflow-ai/langflow
Length of output: 46
🏁 Script executed:
#!/bin/bash
# Look at the full test implementation
cat -n src/backend/tests/integration/test_openai_error_propagation.py
Length of output: 5992
🏁 Script executed:
#!/bin/bash
# Search for any error-related setup in the test file
rg -n "error|raise|monkeypatch" src/backend/tests/integration/test_openai_error_propagation.pyRepository: langflow-ai/langflow
Length of output: 1631
🏁 Script executed:
#!/bin/bash
# Search for simple_api_test fixture in all test files
find src/backend/tests -name "conftest.py" -o -name "*.py" | xargs rg -n "def simple_api_test" -B 2 -A 10Repository: langflow-ai/langflow
Length of output: 48
🏁 Script executed:
#!/bin/bash
# Also search for simple_api_test as a fixture marker
rg -n "@pytest.fixture" -A 5 src/backend/tests | grep -A 5 "simple_api_test"Repository: langflow-ai/langflow
Length of output: 1194
🏁 Script executed:
#!/bin/bash
# Get the full simple_api_test fixture implementation
sed -n '765,777p' src/backend/tests/conftest.py
Length of output: 720
🏁 Script executed:
#!/bin/bash
# Also check what json_simple_api_test contains
rg -n "SIMPLE_API_TEST" src/backend/tests/conftest.py -B 2 -A 2Repository: langflow-ai/langflow
Length of output: 539
🏁 Script executed:
#!/bin/bash
# Find where SIMPLE_API_TEST is defined
rg -r "SIMPLE_API_TEST\s*=" src/backend/testsRepository: langflow-ai/langflow
Length of output: 4391
🏁 Script executed:
#!/bin/bash
# Find the SimpleAPITest.json file
find src/backend/tests -name "SimpleAPITest.json" -o -name "*SimpleAPI*"Repository: langflow-ai/langflow
Length of output: 105
🏁 Script executed:
#!/bin/bash
# Look for data files directory
find src/backend/tests -type d -name "data" -o -name "json"
Length of output: 119
🏁 Script executed:
#!/bin/bash
# Examine the SimpleAPITest.json to understand the flow
cat src/backend/tests/data/SimpleAPITest.json | head -100
Length of output: 3135
🏁 Script executed:
#!/bin/bash
# Check the full size and structure
wc -l src/backend/tests/data/SimpleAPITest.json
jq '.data | keys' src/backend/tests/data/SimpleAPITest.json 2>/dev/null || echo "File may not be valid JSON or jq not available"
Length of output: 148
🏁 Script executed:
#!/bin/bash
# Extract and examine the nodes in SimpleAPITest.json
jq '.data.nodes[] | {id, type: .data.type}' src/backend/tests/data/SimpleAPITest.json
Length of output: 224
🏁 Script executed:
#!/bin/bash
# Check if there are any component references that might fail or error
jq '.data.nodes[] | select(.data.type | contains("Error") or contains("Exception"))' src/backend/tests/data/SimpleAPITest.json
Length of output: 46
🏁 Script executed:
#!/bin/bash
# Look at the actual test more carefully - does it have any assertions checking for errors?
grep -n "finish_reason" src/backend/tests/integration/test_openai_error_propagation.py -B 3 -A 3Repository: langflow-ai/langflow
Length of output: 2163
Add error scenario to test actual finish_reason="error" handling during streaming.
The test validates chunk format structure but never triggers a runtime error. The simple_api_test fixture uses a basic flow (ChatInput → TextInput → ChatOutput) that completes successfully without errors. Use monkeypatch to force a failure during flow execution (e.g., mock a component method to raise an exception) and verify that an error chunk with finish_reason="error" is properly sent to the client. This ensures the error propagation fix actually works in practice, not just theoretically.
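A possible shape for that test, sketched under assumptions: the monkeypatch target is taken from the suggestion above and may not match the real component path, and the chunk layout mirrors the error-chunk format described in this PR rather than verified schema code.

```python
# Hedged sketch only: the patch target and chunk layout are assumptions based on
# the review comment and PR description, not verified against the codebase.
import json

import pytest
from httpx import AsyncClient


@pytest.mark.integration
async def test_streaming_runtime_error_is_reported(
    client: AsyncClient, created_api_key, simple_api_test, monkeypatch
):
    def boom(*_args, **_kwargs):
        raise RuntimeError("forced failure for error-propagation test")

    # Hypothetical target; point this at whichever component method the flow executes.
    monkeypatch.setattr("langflow.components.outputs.ChatOutput.execute", boom, raising=False)

    headers = {"x-api-key": created_api_key.api_key}
    payload = {"model": str(simple_api_test["id"]), "input": "test input", "stream": True}

    chunks = []
    async with client.stream("POST", "api/v1/responses", json=payload, headers=headers) as response:
        assert response.status_code == 200
        async for line in response.aiter_lines():
            if line.startswith("data: ") and not line.startswith("data: [DONE]"):
                try:
                    chunks.append(json.loads(line[6:]))
                except json.JSONDecodeError:
                    continue

    # At least one chunk should carry the failure in OpenAI-compatible form.
    assert any(
        choice.get("finish_reason") == "error"
        for chunk in chunks
        for choice in chunk.get("choices", [])
    )
```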
🧰 Tools
🪛 GitHub Check: Ruff Style Check (3.13)
[failure] 52-52: Ruff (W293)
src/backend/tests/integration/test_openai_error_propagation.py:52:1: W293 Blank line contains whitespace
[failure] 48-48: Ruff (W293)
src/backend/tests/integration/test_openai_error_propagation.py:48:1: W293 Blank line contains whitespace
    # Parse the streaming response
    chunks = []
    for line in response.text.split("\n"):
        if line.startswith("data: ") and not line.startswith("data: [DONE]"):
            data_str = line[6:]
            try:
                chunk_data = json.loads(data_str)
                chunks.append(chunk_data)
            except json.JSONDecodeError:
                pass
🧩 Analysis chain
🏁 Script executed:
cat -n src/backend/tests/integration/test_openai_error_propagation.py | head -150
Length of output: 5992
🏁 Script executed:
cat -n src/backend/tests/conftest.py | head -200
Length of output: 10200
🏁 Script executed:
grep -n "def client" src/backend/tests/conftest.py | head -20Repository: langflow-ai/langflow
Length of output: 93
🏁 Script executed:
sed -n '406,450p' src/backend/tests/conftest.py
Length of output: 1927
🏁 Script executed:
rg -A 10 "aiter_lines" src/backend/tests --type pyRepository: langflow-ai/langflow
Length of output: 19670
🏁 Script executed:
rg -B 5 "client.stream" src/backend/tests --type py | head -50Repository: langflow-ai/langflow
Length of output: 1084
🏁 Script executed:
rg -B 3 -A 10 "async with client.stream" src/backend/tests/unit/test_endpoints.py
Length of output: 1323
Use streaming iteration (client.stream + response.aiter_lines()) instead of response.text for both streaming tests.
response.text buffers the entire response body and doesn't exercise SSE streaming behavior. Use the streaming pattern for consistency with other streaming endpoint tests in the codebase.
♻️ Suggested pattern (apply to both streaming tests)
- response = await client.post(
- "api/v1/responses",
- json=payload,
- headers=headers,
- )
-
- assert response.status_code == 200
-
- # Parse the streaming response
- chunks = []
- for line in response.text.split("\n"):
+ chunks = []
+ async with client.stream(
+ "POST",
+ "api/v1/responses",
+ json=payload,
+ headers=headers,
+ ) as response:
+ assert response.status_code == 200
+ async for line in response.aiter_lines():
if line.startswith("data: ") and not line.startswith("data: [DONE]"):
data_str = line[6:]
try:
chunk_data = json.loads(data_str)
chunks.append(chunk_data)
except json.JSONDecodeError:
pass🤖 Prompt for AI Agents
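If both tests adopt this, the pattern could live in one small helper; a sketch follows (the helper name is made up, the endpoint path is the one used in the tests above):

```python
# Sketch of a shared helper for the suggested client.stream + aiter_lines()
# pattern; the helper name is illustrative, not existing project code.
import json

from httpx import AsyncClient


async def collect_sse_chunks(client: AsyncClient, payload: dict, headers: dict) -> list[dict]:
    """POST to the responses endpoint and return the parsed SSE data chunks."""
    chunks: list[dict] = []
    async with client.stream("POST", "api/v1/responses", json=payload, headers=headers) as response:
        assert response.status_code == 200
        async for line in response.aiter_lines():
            if line.startswith("data: [DONE]"):
                break  # end-of-stream marker
            if line.startswith("data: "):
                try:
                    chunks.append(json.loads(line[6:]))
                except json.JSONDecodeError:
                    continue  # skip keep-alives or partial lines
    return chunks
```

Each test would then assert on the returned chunks, e.g. that the final chunk's choices carry finish_reason="stop" or "error".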
Actionable comments posted: 0
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/backend/base/langflow/api/v1/endpoints.py (1)
383-387: Fix lint failure and preserve cancellation semantics.

Ruff flags the blind `Exception` catch (BLE001). Additionally, `asyncio.CancelledError` should not be converted into an error event; it should propagate normally while still allowing the finally block to execute. Explicitly re-raise `asyncio.CancelledError` and suppress BLE001 for the broad catch.

✅ Suggested fix

+ except asyncio.CancelledError:
+     raise
  except Exception as e:  # noqa: BLE001 - intentional to propagate streaming errors
      await logger.aerror(f"Error running flow: {e}")
      event_manager.on_error(data={"error": str(e)})
🧹 Nitpick comments (1)
src/backend/tests/integration/test_openai_error_propagation.py (1)
63-97: Use streaming iteration for SSE assertions.
`response.text` buffers the full response and doesn't validate streaming behavior. Prefer `async for line in response.aiter_lines()` and stop on `[DONE]` to mirror real SSE consumption.

♻️ Suggested pattern

- for line in response.text.split("\n"):
+ async for line in response.aiter_lines():
      if line.startswith("data: ") and not line.startswith("data: [DONE]"):
          data_str = line[6:]
          try:
              chunk_data = json.loads(data_str)
              chunks.append(chunk_data)
          except json.JSONDecodeError:
              pass
+     if line.startswith("data: [DONE]"):
+         break

As per coding guidelines, streaming endpoints should be validated using line-by-line iteration.
Also applies to: 121-134
This pull request improves error handling and OpenAI-compatible streaming error propagation in the API, ensuring that runtime and streaming errors are surfaced to clients in a way that the OpenAI SDK can properly parse and handle. It introduces a new error chunk format for streaming errors, updates event handling, and adds integration tests to verify correct error propagation and response formatting.
Error handling and streaming improvements:
- Broadened exception handling in `run_flow_generator` to catch all exceptions, ensuring that all errors are properly propagated during streaming.
- Streaming errors are now emitted as chunks with `finish_reason="error"` using the new `create_openai_error_chunk` utility, so OpenAI SDK clients receive errors in a parseable format. [1] [2]
- Added `finish_reason="stop"` to successful streaming completions for OpenAI compatibility.
Schema and utility enhancements:
- Added `finish_reason` to the `OpenAIResponsesStreamChunk` schema and implemented the `create_openai_error_chunk` utility for creating error chunks in the OpenAI streaming format (a hedged sketch of these additions follows this list). [1] [2] [3] [4]
- Added an `"on_error"` event to the event manager to support error streaming.
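As a rough illustration of those two additions, here is a hedged Pydantic sketch; only `finish_reason` and the utility name come from this PR, the other fields are assumptions:

```python
# Hedged sketch of the described schema/utility; field names other than
# finish_reason are assumptions and may not match the real Langflow models.
from pydantic import BaseModel


class StreamDelta(BaseModel):
    content: str | None = None


class StreamChoice(BaseModel):
    index: int = 0
    delta: StreamDelta
    finish_reason: str | None = None  # "stop" on success, "error" on failure


class OpenAIResponsesStreamChunk(BaseModel):
    id: str
    object: str = "response.chunk"
    model: str
    choices: list[StreamChoice]


def create_openai_error_chunk(error_message: str, model: str) -> OpenAIResponsesStreamChunk:
    """Wrap an error message in an OpenAI-compatible chunk with finish_reason='error'."""
    return OpenAIResponsesStreamChunk(
        id="chunk-error",
        model=model,
        choices=[
            StreamChoice(
                delta=StreamDelta(content=f"Error: {error_message}"),
                finish_reason="error",
            )
        ],
    )
```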
finish_reason="stop".Summary by CodeRabbit
Bug Fixes
Tests