From 39db56af1e70a9373c401f878e0ce8aef6d594e5 Mon Sep 17 00:00:00 2001 From: Matvey Kukuy Date: Sun, 5 Jan 2025 18:52:39 +0100 Subject: [PATCH 1/6] feat: workflow-benchmarking --- keep/api/routes/workflows.py | 57 ++++++++++++++++++++++++++++++++++++ 1 file changed, 57 insertions(+) diff --git a/keep/api/routes/workflows.py b/keep/api/routes/workflows.py index 7a5912996..e643afe10 100644 --- a/keep/api/routes/workflows.py +++ b/keep/api/routes/workflows.py @@ -1,6 +1,7 @@ import datetime import logging import os +import time from typing import Any, Dict, List, Optional import validators @@ -26,6 +27,7 @@ get_session, get_workflow, get_workflow_by_name, + get_workflow_executions_count, ) from keep.api.core.db import get_workflow_executions as get_workflow_executions_db from keep.api.models.alert import AlertDto, IncidentDto @@ -748,3 +750,58 @@ def get_workflow_execution_status( event_type=event_type, ) return workflow_execution_dto + + +@router.post( + "/{workflow_id}/benchmark", + description="Benchmark a workflow", +) +def benchmark_workflow( + workflow_id: str, + authenticated_entity: AuthenticatedEntity = Depends( + IdentityManagerFactory.get_auth_verifier(["write:workflows"]) + ), +) -> str: + if os.environ.get("KEEP_WF_BENCHMARK_ENABLED", "").lower() != "true": + return HTTPException(status_code=405, detail="Workflow benchmarking is not avaliable") + + TOTAL_WORKFLOWS_TO_RUN = 5000 + tenant_id = authenticated_entity.tenant_id + created_by = authenticated_entity.email + workflowmanager = WorkflowManager.get_instance() + + event_body = {"tenant_id": tenant_id, "id": "benchmark", "name": "benchmark"} + event_class = AlertDto + event = event_class(**event_body) + + time_start = datetime.datetime.now() + + result_lines = [] + result_lines.append(f"Started {TOTAL_WORKFLOWS_TO_RUN} workflows") + + def get_total_workflow_executions_count(tenant_id): + wf_executions_before_benchmark = get_workflow_executions_count(tenant_id) + return wf_executions_before_benchmark.get("success", 0) + + total_wf_executions_before_benchmark = get_total_workflow_executions_count(tenant_id) + + for _ in range(1, TOTAL_WORKFLOWS_TO_RUN): + workflowmanager.scheduler.handle_manual_event_workflow( + workflow_id, tenant_id, created_by, event + ) + + result_lines.append(f"Scheduling took {datetime.datetime.now() - time_start}") + execution_time_start = datetime.datetime.now() + + while get_total_workflow_executions_count(tenant_id) - \ + total_wf_executions_before_benchmark < TOTAL_WORKFLOWS_TO_RUN - 1: + + time.sleep(1) + result_lines.append(f"Time: {datetime.datetime.now() - time_start}, workflows: {get_total_workflow_executions_count(tenant_id) - total_wf_executions_before_benchmark}") + print("\n".join(result_lines)) + + result_lines.append(f"Finished {TOTAL_WORKFLOWS_TO_RUN} workflows in {datetime.datetime.now() - time_start}") + result_lines.append(f"After scheduling finished: {datetime.datetime.now() - execution_time_start}") + result_lines.append(f"WF/second: {TOTAL_WORKFLOWS_TO_RUN / (datetime.datetime.now() - execution_time_start).total_seconds()}") + + return "\r\n".join(result_lines) From bb360667c564c647abbed63ceddde33c6016e59f Mon Sep 17 00:00:00 2001 From: Matvey Kukuy Date: Sun, 5 Jan 2025 18:58:33 +0100 Subject: [PATCH 2/6] 1000 --- keep/api/routes/workflows.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/keep/api/routes/workflows.py b/keep/api/routes/workflows.py index e643afe10..fd7f5877c 100644 --- a/keep/api/routes/workflows.py +++ b/keep/api/routes/workflows.py @@ -765,7 +765,7 @@ def benchmark_workflow( if os.environ.get("KEEP_WF_BENCHMARK_ENABLED", "").lower() != "true": return HTTPException(status_code=405, detail="Workflow benchmarking is not avaliable") - TOTAL_WORKFLOWS_TO_RUN = 5000 + TOTAL_WORKFLOWS_TO_RUN = 1000 tenant_id = authenticated_entity.tenant_id created_by = authenticated_entity.email workflowmanager = WorkflowManager.get_instance() From d865bbd7171bd45b06b0f3bc62795f6cbcd8d4aa Mon Sep 17 00:00:00 2001 From: Matvey Kukuy Date: Mon, 6 Jan 2025 18:44:09 +0100 Subject: [PATCH 3/6] Fix --- keep/api/routes/workflows.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/keep/api/routes/workflows.py b/keep/api/routes/workflows.py index fd7f5877c..5e9a2d0c8 100644 --- a/keep/api/routes/workflows.py +++ b/keep/api/routes/workflows.py @@ -761,7 +761,7 @@ def benchmark_workflow( authenticated_entity: AuthenticatedEntity = Depends( IdentityManagerFactory.get_auth_verifier(["write:workflows"]) ), -) -> str: +): if os.environ.get("KEEP_WF_BENCHMARK_ENABLED", "").lower() != "true": return HTTPException(status_code=405, detail="Workflow benchmarking is not avaliable") From 5dfa9d33b7791ee17542951d5fb590e98094a308 Mon Sep 17 00:00:00 2001 From: Matvey Kukuy Date: Mon, 6 Jan 2025 19:35:58 +0100 Subject: [PATCH 4/6] Fix --- keep/api/routes/workflows.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/keep/api/routes/workflows.py b/keep/api/routes/workflows.py index 5e9a2d0c8..f8f752e05 100644 --- a/keep/api/routes/workflows.py +++ b/keep/api/routes/workflows.py @@ -770,9 +770,7 @@ def benchmark_workflow( created_by = authenticated_entity.email workflowmanager = WorkflowManager.get_instance() - event_body = {"tenant_id": tenant_id, "id": "benchmark", "name": "benchmark"} - event_class = AlertDto - event = event_class(**event_body) + event = AlertDto({"tenant_id": tenant_id, "id": "benchmark", "name": "benchmark"}) time_start = datetime.datetime.now() From 5960a4809a5d09a29166cb8654fae073eaddd971 Mon Sep 17 00:00:00 2001 From: Matvey Kukuy Date: Mon, 6 Jan 2025 22:37:40 +0100 Subject: [PATCH 5/6] Exclude from schema --- keep/api/routes/workflows.py | 1 + 1 file changed, 1 insertion(+) diff --git a/keep/api/routes/workflows.py b/keep/api/routes/workflows.py index f8f752e05..633e058be 100644 --- a/keep/api/routes/workflows.py +++ b/keep/api/routes/workflows.py @@ -755,6 +755,7 @@ def get_workflow_execution_status( @router.post( "/{workflow_id}/benchmark", description="Benchmark a workflow", + include_in_schema=False, ) def benchmark_workflow( workflow_id: str, From 88e85cafaae184905267f10b08dea34066f07d9e Mon Sep 17 00:00:00 2001 From: Matvey Kukuy Date: Mon, 6 Jan 2025 22:38:44 +0100 Subject: [PATCH 6/6] Fix --- keep/api/routes/workflows.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/keep/api/routes/workflows.py b/keep/api/routes/workflows.py index 633e058be..2d2891445 100644 --- a/keep/api/routes/workflows.py +++ b/keep/api/routes/workflows.py @@ -763,6 +763,10 @@ def benchmark_workflow( IdentityManagerFactory.get_auth_verifier(["write:workflows"]) ), ): + """ + Should be removed once the WF optimization work is done, or exposed to customers to allow them + to benchmark workflows on their environments. + """ if os.environ.get("KEEP_WF_BENCHMARK_ENABLED", "").lower() != "true": return HTTPException(status_code=405, detail="Workflow benchmarking is not avaliable")