Skip to content

Commit fb10261

Browse files
Cristhianzlclaudeautofix-ci[bot]
authored
feat: Add robust startup retry logic and port conflict resolution (#10347)
* fix mcp timeout and settings * ✨ (service.py): add new methods to read process output, ensure port availability, log startup error details, and normalize config values 🐛 (service.py): fix handling of process output, error messages, and port availability to improve error handling and logging during startup * ✨ (service.py): add tracking of which project is using which port to prevent conflicts 🐛 (service.py): check if another project is using the same port before starting a new project to avoid conflicts 🐛 (service.py): handle releasing port and cleaning up port tracking when stopping a project to prevent memory leaks 🐛 (service.py): re-check port availability before each retry when starting a project to prevent race conditions 🐛 (service.py): register the port used by a project to prevent other projects from using the same port 🐛 (McpServerTab.tsx): invalidate MCP project data and composer URL queries to refresh auth settings and OAuth server info 🐛 (McpServerTab.tsx): clear waiting state if the auth type is not OAuth * fix killing port on tab changes * add not persist oauth on failure start * add oath check on available port * add improvements mcp service * fix: suppress SSE streaming warnings and handle connection closed errors - Add warning filters for ResourceWarning from anyio streams - Handle SSE connection closed errors gracefully - Prevent log spam from normal SSE lifecycle events 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <[email protected]> * add host and port to uvx command, safe redact value * fix error messages state not been displayed * fix mypy errors * [autofix.ci] apply automated fixes * add the correct auth test --------- Co-authored-by: Claude <[email protected]> Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
1 parent b31b2e9 commit fb10261

File tree

10 files changed

+2230
-199
lines changed

10 files changed

+2230
-199
lines changed

src/backend/base/langflow/api/v1/mcp_projects.py

Lines changed: 44 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -385,6 +385,9 @@ async def update_project_mcp_settings(
385385
should_start_composer = False
386386
should_stop_composer = False
387387

388+
# Store original auth settings in case we need to rollback
389+
original_auth_settings = project.auth_settings
390+
388391
# Update project-level auth settings with encryption
389392
if "auth_settings" in request.model_fields_set and request.auth_settings is not None:
390393
auth_result = handle_auth_settings_update(
@@ -396,8 +399,6 @@ async def update_project_mcp_settings(
396399
should_start_composer = auth_result["should_start_composer"]
397400
should_stop_composer = auth_result["should_stop_composer"]
398401

399-
session.add(project)
400-
401402
# Query flows in the project
402403
flows = (await session.exec(select(Flow).where(Flow.folder_id == project_id))).all()
403404
flows_to_update = {x.id: x for x in request.settings}
@@ -416,13 +417,17 @@ async def update_project_mcp_settings(
416417
session.add(flow)
417418
updated_flows.append(flow)
418419

419-
await session.commit()
420-
421420
response: dict[str, Any] = {
422421
"message": f"Updated MCP settings for {len(updated_flows)} flows and project auth settings"
423422
}
424423

424+
# Handle MCP Composer start/stop before committing auth settings
425425
if should_handle_mcp_composer:
426+
# Get MCP Composer service once for all branches
427+
mcp_composer_service: MCPComposerService = cast(
428+
MCPComposerService, get_service(ServiceType.MCP_COMPOSER_SERVICE)
429+
)
430+
426431
if should_start_composer:
427432
await logger.adebug(
428433
f"Auth settings changed to OAuth for project {project.name} ({project_id}), "
@@ -434,26 +439,37 @@ async def update_project_mcp_settings(
434439
auth_config = await _get_mcp_composer_auth_config(project)
435440
await get_or_start_mcp_composer(auth_config, project.name, project_id)
436441
composer_sse_url = await get_composer_sse_url(project)
442+
# Clear any previous error on success
443+
mcp_composer_service.clear_last_error(str(project_id))
437444
response["result"] = {
438445
"project_id": str(project_id),
439446
"sse_url": composer_sse_url,
440447
"uses_composer": True,
441448
}
442449
except MCPComposerError as e:
450+
# Don't rollback auth settings - persist them so UI can show the error
451+
await logger.awarning(f"MCP Composer failed to start for project {project_id}: {e.message}")
452+
# Store the error message so it can be retrieved via composer-url endpoint
453+
mcp_composer_service.set_last_error(str(project_id), e.message)
443454
response["result"] = {
444455
"project_id": str(project_id),
445456
"uses_composer": True,
446457
"error_message": e.message,
447458
}
448459
except Exception as e:
449-
# Unexpected errors
450-
await logger.aerror(f"Failed to get mcp composer URL for project {project_id}: {e}")
460+
# Rollback auth settings on unexpected errors
461+
await logger.aerror(
462+
f"Unexpected error starting MCP Composer for project {project_id}, "
463+
f"rolling back auth settings: {e}"
464+
)
465+
project.auth_settings = original_auth_settings
451466
raise HTTPException(status_code=500, detail=str(e)) from e
452467
else:
453-
# This shouldn't happen - we determined we should start composer but now we can't use it
468+
# OAuth is set but MCP Composer is disabled - save settings but return error
454469
await logger.aerror(
455470
f"PATCH: OAuth set but MCP Composer is disabled in settings for project {project_id}"
456471
)
472+
# Don't rollback - keep the auth settings so they can be used when composer is enabled
457473
response["result"] = {
458474
"project_id": str(project_id),
459475
"uses_composer": False,
@@ -464,10 +480,9 @@ async def update_project_mcp_settings(
464480
f"Auth settings changed from OAuth for project {project.name} ({project_id}), "
465481
"stopping MCP Composer"
466482
)
467-
mcp_composer_service: MCPComposerService = cast(
468-
MCPComposerService, get_service(ServiceType.MCP_COMPOSER_SERVICE)
469-
)
470483
await mcp_composer_service.stop_project_composer(str(project_id))
484+
# Clear any error when user explicitly disables OAuth
485+
mcp_composer_service.clear_last_error(str(project_id))
471486

472487
# Provide the direct SSE URL since we're no longer using composer
473488
sse_url = await get_project_sse_url(project_id)
@@ -480,6 +495,10 @@ async def update_project_mcp_settings(
480495
"uses_composer": False,
481496
}
482497

498+
# Only commit if composer started successfully (or wasn't needed)
499+
session.add(project)
500+
await session.commit()
501+
483502
return response
484503

485504
except Exception as e:
@@ -731,7 +750,22 @@ async def get_project_composer_url(
731750
"""
732751
try:
733752
project = await verify_project_access(project_id, current_user)
753+
754+
# Check if there's a recent error from a failed OAuth attempt
755+
mcp_composer_service: MCPComposerService = cast(
756+
MCPComposerService, get_service(ServiceType.MCP_COMPOSER_SERVICE)
757+
)
758+
last_error = mcp_composer_service.get_last_error(str(project_id))
759+
734760
if not should_use_mcp_composer(project):
761+
# If there's a recent error, return it even though OAuth is not currently active
762+
# This happens when OAuth was attempted but rolled back due to an error
763+
if last_error:
764+
return {
765+
"project_id": str(project_id),
766+
"uses_composer": False,
767+
"error_message": last_error,
768+
}
735769
return {
736770
"project_id": str(project_id),
737771
"uses_composer": False,

src/backend/base/langflow/main.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,10 @@
4747
# Ignore Pydantic deprecation warnings from Langchain
4848
warnings.filterwarnings("ignore", category=PydanticDeprecatedSince20)
4949

50+
# Suppress ResourceWarning from anyio streams (SSE connections)
51+
warnings.filterwarnings("ignore", category=ResourceWarning, message=".*MemoryObjectReceiveStream.*")
52+
warnings.filterwarnings("ignore", category=ResourceWarning, message=".*MemoryObjectSendStream.*")
53+
5054
_tasks: list[asyncio.Task] = []
5155

5256
MAX_PORT = 65535
@@ -458,6 +462,17 @@ async def exception_handler(_request: Request, exc: Exception):
458462
status_code=exc.status_code,
459463
content={"message": str(exc.detail)},
460464
)
465+
466+
# Suppress known SSE streaming errors that are harmless
467+
exc_str = str(exc)
468+
if "Unexpected message" in exc_str and "http.response.start" in exc_str:
469+
# This is a known issue with SSE connections being closed
470+
await logger.adebug(f"SSE connection closed: {exc_str}")
471+
return JSONResponse(
472+
status_code=200,
473+
content={"message": "Connection closed"},
474+
)
475+
461476
await logger.aerror(f"unhandled error: {exc}", exc_info=exc)
462477

463478
await log_exception_to_telemetry(exc, "handler")

0 commit comments

Comments
 (0)