Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 22 additions & 9 deletions api/services/trigger/webhook_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -816,10 +816,18 @@ class Cache(BaseModel):
not_found_in_cache.append(node_id)
continue

with Session(db.engine) as session:
try:
# lock the concurrent webhook trigger creation
redis_client.lock(f"{cls.__WEBHOOK_NODE_CACHE_KEY__}:apps:{app.id}:lock", timeout=10)
lock_key = f"{cls.__WEBHOOK_NODE_CACHE_KEY__}:apps:{app.id}:lock"
lock = redis_client.lock(lock_key, timeout=10)
lock_acquired = False

try:
# acquire the lock with blocking and timeout
lock_acquired = lock.acquire(blocking=True, blocking_timeout=10)
if not lock_acquired:
logger.warning("Failed to acquire lock for webhook sync, app %s", app.id)
raise RuntimeError("Failed to acquire lock for webhook trigger synchronization")

with Session(db.engine) as session:
# fetch the non-cached nodes from DB
all_records = session.scalars(
select(WorkflowWebhookTrigger).where(
Expand Down Expand Up @@ -854,11 +862,16 @@ class Cache(BaseModel):
session.delete(nodes_id_in_db[node_id])
redis_client.delete(f"{cls.__WEBHOOK_NODE_CACHE_KEY__}:{node_id}")
session.commit()
except Exception:
logger.exception("Failed to sync webhook relationships for app %s", app.id)
raise
finally:
redis_client.delete(f"{cls.__WEBHOOK_NODE_CACHE_KEY__}:apps:{app.id}:lock")
except Exception:
logger.exception("Failed to sync webhook relationships for app %s", app.id)
raise
finally:
# release the lock only if it was acquired
if lock_acquired:
try:
lock.release()
except Exception:
logger.exception("Failed to release lock for webhook sync, app %s", app.id)

@classmethod
def generate_webhook_id(cls) -> str:
Expand Down
3 changes: 2 additions & 1 deletion api/services/workflow_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import time
import uuid
from collections.abc import Callable, Generator, Mapping, Sequence
from datetime import UTC, datetime
from typing import Any, cast

from sqlalchemy import exists, select
Expand Down Expand Up @@ -1032,7 +1033,7 @@ def _setup_variable_pool(
system_variable = SystemVariable(
user_id=user_id,
app_id=workflow.app_id,
timestamp=int(naive_utc_now().timestamp()),
timestamp=int(datetime.now(UTC).timestamp()),
workflow_id=workflow.id,
files=files or [],
workflow_execution_id=str(uuid.uuid4()),
Expand Down