Skip to content

Commit 3d749ea

Browse files
authored
Merge branch 'main' into job-status-apis
2 parents 0bc3eca + 1d82026 commit 3d749ea

File tree

15 files changed

+1317
-46
lines changed

15 files changed

+1317
-46
lines changed

docker/build_and_push.Dockerfile

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,9 @@ RUN apt-get update \
3333
npm \
3434
# gcc
3535
gcc \
36+
curl \
37+
&& curl -fsSL https://deb.nodesource.com/setup_20.x | bash - \
38+
&& apt-get install -y nodejs \
3639
&& apt-get clean \
3740
&& rm -rf /var/lib/apt/lists/*
3841

@@ -75,7 +78,7 @@ FROM python:3.12.3-slim AS runtime
7578
RUN apt-get update \
7679
&& apt-get upgrade -y \
7780
&& apt-get install -y curl git libpq5 gnupg \
78-
&& curl -fsSL https://deb.nodesource.com/setup_18.x | bash - \
81+
&& curl -fsSL https://deb.nodesource.com/setup_20.x | bash - \
7982
&& apt-get install -y nodejs \
8083
&& apt-get clean \
8184
&& rm -rf /var/lib/apt/lists/* \

src/backend/base/langflow/api/v1/endpoints.py

Lines changed: 145 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from __future__ import annotations
22

33
import asyncio
4+
import json
45
import time
56
from collections.abc import AsyncGenerator
67
from http import HTTPStatus
@@ -44,12 +45,18 @@
4445
from langflow.interface.initialize.loading import update_params_with_load_from_db_fields
4546
from langflow.processing.process import process_tweaks, run_graph_internal
4647
from langflow.schema.graph import Tweaks
47-
from langflow.services.auth.utils import api_key_security, get_current_active_user, get_webhook_user
48+
from langflow.services.auth.utils import (
49+
api_key_security,
50+
get_current_active_user,
51+
get_current_user_for_sse,
52+
get_webhook_user,
53+
)
4854
from langflow.services.cache.utils import save_uploaded_file
4955
from langflow.services.database.models.flow.model import Flow, FlowRead
5056
from langflow.services.database.models.flow.utils import get_all_webhook_components_in_flow
5157
from langflow.services.database.models.user.model import User, UserRead
5258
from langflow.services.deps import get_session_service, get_settings_service, get_telemetry_service
59+
from langflow.services.event_manager import create_webhook_event_manager, webhook_event_manager
5360
from langflow.services.telemetry.schema import RunPayload
5461
from langflow.utils.compression import compress_response
5562
from langflow.utils.version import get_version_info
@@ -59,6 +66,9 @@
5966

6067
router = APIRouter(tags=["Base"])
6168

69+
# SSE Constants
70+
SSE_HEARTBEAT_TIMEOUT_SECONDS = 30.0
71+
6272

6373
async def parse_input_request_from_body(http_request: Request) -> SimplifiedAPIRequest:
6474
"""Parse SimplifiedAPIRequest from HTTP request body.
@@ -195,6 +205,13 @@ async def simple_run_flow(
195205
raise ValueError(str(exc)) from exc
196206

197207

208+
def _get_vertex_ids_from_flow(flow: Flow) -> list[str]:
209+
"""Extract vertex IDs from flow data."""
210+
if not flow.data or not flow.data.get("nodes"):
211+
return []
212+
return [node.get("id") for node in flow.data.get("nodes", []) if node.get("id")]
213+
214+
198215
async def simple_run_flow_task(
199216
flow: Flow,
200217
input_request: SimplifiedAPIRequest,
@@ -205,17 +222,54 @@ async def simple_run_flow_task(
205222
telemetry_service=None,
206223
start_time: float | None = None,
207224
run_id: str | None = None,
225+
emit_events: bool = False,
226+
flow_id: str | None = None,
208227
):
209-
"""Run a flow task as a BackgroundTask, therefore it should not throw exceptions."""
228+
"""Run a flow task as a BackgroundTask, therefore it should not throw exceptions.
229+
230+
Args:
231+
flow: The flow to execute
232+
input_request: The simplified API request
233+
stream: Whether to stream results
234+
api_key_user: The user executing the flow
235+
event_manager: Event manager for streaming
236+
telemetry_service: Service for logging telemetry
237+
start_time: Start time for duration calculation
238+
run_id: Unique ID for this run
239+
emit_events: Whether to emit events to webhook_event_manager (for UI feedback)
240+
flow_id: Flow ID for event emission (required if emit_events=True)
241+
"""
242+
should_emit = emit_events and flow_id
243+
244+
# Create an EventManager that forwards events to webhook SSE if we should emit
245+
webhook_em = None
246+
if should_emit and event_manager is None and flow_id is not None:
247+
webhook_em = create_webhook_event_manager(flow_id, run_id)
248+
249+
# Use provided event_manager or the webhook one
250+
effective_event_manager = event_manager or webhook_em
251+
210252
try:
253+
if should_emit and flow_id is not None:
254+
vertex_ids = _get_vertex_ids_from_flow(flow)
255+
await webhook_event_manager.emit(
256+
flow_id,
257+
"vertices_sorted",
258+
{"ids": vertex_ids, "to_run": vertex_ids, "run_id": run_id},
259+
)
260+
211261
result = await simple_run_flow(
212262
flow=flow,
213263
input_request=input_request,
214264
stream=stream,
215265
api_key_user=api_key_user,
216-
event_manager=event_manager,
266+
event_manager=effective_event_manager,
217267
run_id=run_id,
218268
)
269+
270+
if should_emit and flow_id is not None:
271+
await webhook_event_manager.emit(flow_id, "end", {"run_id": run_id, "success": True})
272+
219273
if telemetry_service and start_time is not None:
220274
await telemetry_service.log_package_run(
221275
RunPayload(
@@ -230,6 +284,10 @@ async def simple_run_flow_task(
230284

231285
except Exception as exc: # noqa: BLE001
232286
await logger.aexception(f"Error running flow {flow.id} task")
287+
288+
if should_emit and flow_id is not None:
289+
await webhook_event_manager.emit(flow_id, "end", {"run_id": run_id, "success": False, "error": str(exc)})
290+
233291
if telemetry_service and start_time is not None:
234292
await telemetry_service.log_package_run(
235293
RunPayload(
@@ -608,23 +666,82 @@ async def simplified_run_flow_session(
608666
)
609667

610668

669+
@router.get("/webhook-events/{flow_id_or_name}")
670+
async def webhook_events_stream(
671+
flow_id_or_name: str, # noqa: ARG001 - Used by get_flow_by_id_or_endpoint_name dependency
672+
flow: Annotated[Flow, Depends(get_flow_by_id_or_endpoint_name)],
673+
request: Request,
674+
):
675+
"""Server-Sent Events (SSE) endpoint for real-time webhook build updates.
676+
677+
When a flow is open in the UI, this endpoint provides live feedback
678+
of webhook execution progress, similar to clicking "Play" in the UI.
679+
680+
Authentication: Requires user to be logged in (via cookie) or provide API key.
681+
The user must own the flow to subscribe to its events.
682+
"""
683+
# Authenticate user via cookie or API key
684+
user = await get_current_user_for_sse(request)
685+
686+
# Verify user owns the flow
687+
if str(flow.user_id) != str(user.id):
688+
raise HTTPException(
689+
status_code=HTTPStatus.FORBIDDEN,
690+
detail="Access denied: You can only subscribe to events for flows you own",
691+
)
692+
693+
async def event_generator() -> AsyncGenerator[str, None]:
694+
"""Generate SSE events from the webhook event manager."""
695+
flow_id_str = str(flow.id)
696+
queue = await webhook_event_manager.subscribe(flow_id_str)
697+
698+
try:
699+
# Send initial connection event
700+
yield f"event: connected\ndata: {json.dumps({'flow_id': flow_id_str, 'flow_name': flow.name})}\n\n"
701+
702+
while True:
703+
if await request.is_disconnected():
704+
break
705+
706+
try:
707+
event = await asyncio.wait_for(queue.get(), timeout=SSE_HEARTBEAT_TIMEOUT_SECONDS)
708+
event_type = event["event"]
709+
event_data = json.dumps(event["data"])
710+
yield f"event: {event_type}\ndata: {event_data}\n\n"
711+
except asyncio.TimeoutError:
712+
yield f"event: heartbeat\ndata: {json.dumps({'timestamp': time.time()})}\n\n"
713+
714+
except asyncio.CancelledError:
715+
pass
716+
finally:
717+
await webhook_event_manager.unsubscribe(flow_id_str, queue)
718+
719+
return StreamingResponse(
720+
event_generator(),
721+
media_type="text/event-stream",
722+
headers={
723+
"Cache-Control": "no-cache",
724+
"Connection": "keep-alive",
725+
"X-Accel-Buffering": "no",
726+
},
727+
)
728+
729+
611730
@router.post("/webhook/{flow_id_or_name}", response_model=dict, status_code=HTTPStatus.ACCEPTED) # noqa: RUF100, FAST003
612731
async def webhook_run_flow(
613732
flow_id_or_name: str,
614733
flow: Annotated[Flow, Depends(get_flow_by_id_or_endpoint_name)],
615734
request: Request,
616-
background_tasks: BackgroundTasks,
617735
):
618736
"""Run a flow using a webhook request.
619737
620738
Args:
621-
flow_id_or_name (str): The flow ID or endpoint name.
622-
flow (Flow): The flow to be executed.
623-
request (Request): The incoming HTTP request.
624-
background_tasks (BackgroundTasks): The background tasks manager.
739+
flow_id_or_name: The flow ID or endpoint name (used by dependency).
740+
flow: The flow to be executed.
741+
request: The incoming HTTP request.
625742
626743
Returns:
627-
dict: A dictionary containing the status of the task.
744+
A dictionary containing the status of the task.
628745
629746
Raises:
630747
HTTPException: If the flow is not found or if there is an error processing the request.
@@ -662,17 +779,28 @@ async def webhook_run_flow(
662779
session_id=None,
663780
)
664781

782+
# Check if there are UI listeners connected via SSE
783+
flow_id_str = str(flow.id)
784+
has_ui_listeners = webhook_event_manager.has_listeners(flow_id_str)
785+
665786
await logger.adebug("Starting background task")
666787
run_id = str(uuid4())
667-
background_tasks.add_task(
668-
simple_run_flow_task,
669-
flow=flow,
670-
input_request=input_request,
671-
api_key_user=webhook_user,
672-
telemetry_service=telemetry_service,
673-
start_time=start_time,
674-
run_id=run_id,
788+
789+
# Use asyncio.create_task to run in same event loop (needed for SSE)
790+
background_task = asyncio.create_task(
791+
simple_run_flow_task(
792+
flow=flow,
793+
input_request=input_request,
794+
api_key_user=webhook_user,
795+
telemetry_service=telemetry_service,
796+
start_time=start_time,
797+
run_id=run_id,
798+
emit_events=has_ui_listeners,
799+
flow_id=flow_id_str,
800+
)
675801
)
802+
# Fire-and-forget: log exceptions but don't block
803+
background_task.add_done_callback(lambda t: t.exception() if not t.cancelled() else None)
676804
except Exception as exc:
677805
error_msg = str(exc)
678806
raise HTTPException(status_code=500, detail=error_msg) from exc

src/backend/base/langflow/services/auth/utils.py

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -333,6 +333,46 @@ async def get_current_user_for_websocket(
333333
)
334334

335335

336+
async def get_current_user_for_sse(request: Request) -> User | UserRead:
337+
"""Authenticate user for SSE endpoints.
338+
339+
Similar to websocket authentication, accepts either:
340+
- Cookie authentication (access_token_lf)
341+
- API key authentication (x-api-key query param)
342+
343+
Args:
344+
request: The FastAPI request object
345+
346+
Returns:
347+
User or UserRead: The authenticated user
348+
349+
Raises:
350+
HTTPException: If authentication fails
351+
"""
352+
# Try cookie authentication first
353+
token = request.cookies.get("access_token_lf")
354+
if token:
355+
try:
356+
async with session_scope() as db:
357+
user = await get_current_user_by_jwt(token, db)
358+
if user:
359+
return user
360+
except HTTPException:
361+
pass
362+
363+
# Try API key authentication
364+
api_key = request.query_params.get("x-api-key") or request.headers.get("x-api-key")
365+
if api_key:
366+
user_read = await ws_api_key_security(api_key)
367+
if user_read:
368+
return user_read
369+
370+
raise HTTPException(
371+
status_code=status.HTTP_403_FORBIDDEN,
372+
detail="Missing or invalid credentials (cookie or API key).",
373+
)
374+
375+
336376
async def get_current_active_user(current_user: Annotated[User, Depends(get_current_user)]):
337377
if not current_user.is_active:
338378
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Inactive user")

0 commit comments

Comments
 (0)