diff --git a/backend/prompt_studio/prompt_studio_core_v2/constants.py b/backend/prompt_studio/prompt_studio_core_v2/constants.py index 559fe4f7c..9dad3bb82 100644 --- a/backend/prompt_studio/prompt_studio_core_v2/constants.py +++ b/backend/prompt_studio/prompt_studio_core_v2/constants.py @@ -22,6 +22,7 @@ class ToolStudioPromptKeys: CREATED_BY = "created_by" TOOL_ID = "tool_id" RUN_ID = "run_id" + EXECUTION_ID = "execution_id" NUMBER = "Number" FLOAT = "Float" PG_VECTOR = "Postgres pg_vector" diff --git a/backend/sample.env b/backend/sample.env index 58c2ef5d3..4cbd469e7 100644 --- a/backend/sample.env +++ b/backend/sample.env @@ -75,9 +75,9 @@ PROMPT_STUDIO_FILE_PATH=/app/prompt-studio-data # Structure Tool Image (Runs prompt studio exported tools) # https://hub.docker.com/r/unstract/tool-structure -STRUCTURE_TOOL_IMAGE_URL="docker:unstract/tool-structure:0.0.69" +STRUCTURE_TOOL_IMAGE_URL="docker:unstract/tool-structure:0.0.70" STRUCTURE_TOOL_IMAGE_NAME="unstract/tool-structure" -STRUCTURE_TOOL_IMAGE_TAG="0.0.69" +STRUCTURE_TOOL_IMAGE_TAG="0.0.70" # Feature Flags EVALUATION_SERVER_IP=unstract-flipt diff --git a/backend/usage_v2/migrations/0003_usage_usage_executi_4deb35_idx.py b/backend/usage_v2/migrations/0003_usage_usage_executi_4deb35_idx.py new file mode 100644 index 000000000..6a645fcf5 --- /dev/null +++ b/backend/usage_v2/migrations/0003_usage_usage_executi_4deb35_idx.py @@ -0,0 +1,19 @@ +# Generated by Django 4.2.1 on 2025-04-03 08:37 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ("usage_v2", "0002_alter_usage_run_id"), + ] + + operations = [ + migrations.AddIndex( + model_name="usage", + index=models.Index( + fields=["execution_id"], name="usage_executi_4deb35_idx" + ), + ), + ] diff --git a/backend/usage_v2/models.py b/backend/usage_v2/models.py index a8db04490..9885f746e 100644 --- a/backend/usage_v2/models.py +++ b/backend/usage_v2/models.py @@ -79,4 +79,5 @@ class Meta: db_table = "usage" indexes = [ models.Index(fields=["run_id"]), + models.Index(fields=["execution_id"]), ] diff --git a/backend/workflow_manager/workflow_v2/execution.py b/backend/workflow_manager/workflow_v2/execution.py index b009e5076..ba5f3e363 100644 --- a/backend/workflow_manager/workflow_v2/execution.py +++ b/backend/workflow_manager/workflow_v2/execution.py @@ -14,6 +14,7 @@ from unstract.workflow_execution.dto import WorkflowDto from unstract.workflow_execution.enums import ExecutionType, LogComponent, LogState from unstract.workflow_execution.exceptions import StopExecution +from usage_v2.helper import UsageHelper from utils.local_context import StateStore from utils.user_context import UserContext from workflow_manager.file_execution.models import WorkflowFileExecution @@ -289,15 +290,21 @@ def publish_initial_workflow_logs(self, total_files: int) -> None: ) def publish_final_workflow_logs( - self, total_files: int, successful_files: int, failed_files: int + self, + total_files: int, + successful_files: int, + failed_files: int, ) -> None: """Publishes the final logs for the workflow. 
 
         Returns:
             None
         """
+        self.publish_average_cost_log(total_files=successful_files)
+
+        # Avoid associating the final logs with a file execution
         self.file_execution_id = None
+
         self.publish_update_log(LogState.END_WORKFLOW, "1", LogComponent.STATUS_BAR)
         self.publish_update_log(
             LogState.SUCCESS, "Executed successfully", LogComponent.WORKFLOW
@@ -307,6 +314,32 @@ def publish_final_workflow_logs(
             f"{successful_files} successfully executed and {failed_files} error(s)"
         )
 
+    def publish_average_cost_log(self, total_files: int) -> None:
+        """Publishes the average cost per successfully processed file."""
+        try:
+            execution = WorkflowExecution.objects.get(pk=self.execution_id)
+            total_cost = execution.get_aggregated_usage_cost
+
+            if total_cost is not None and total_files:
+                average_cost = round(total_cost / total_files, 5)
+                self.publish_log(
+                    message=(
+                        f"The average cost per file for execution "
+                        f"'{self.execution_id}' is '${average_cost}'"
+                    )
+                )
+        except Exception as e:
+            logger.warning(
+                f"Unable to get aggregated cost for '{self.execution_id}': {str(e)}"
+            )
+
+    def log_total_cost_per_file(self, run_id, file_name):
+        cost_dict = UsageHelper.get_aggregated_token_count(run_id=run_id)
+        cost = round(cost_dict.get("cost_in_dollars", 0), 5)
+
+        # Log the total cost for a particular file executed in the workflow
+        self.publish_log(message=f"Total cost for file '{file_name}' is '${cost}'")
+
     def publish_initial_tool_execution_logs(
         self, current_file_idx: int, total_files: int, file_name: str
     ) -> None:
diff --git a/backend/workflow_manager/workflow_v2/models/execution.py b/backend/workflow_manager/workflow_v2/models/execution.py
index e3537b570..dcf3043e3 100644
--- a/backend/workflow_manager/workflow_v2/models/execution.py
+++ b/backend/workflow_manager/workflow_v2/models/execution.py
@@ -6,8 +6,11 @@
 from api_v2.models import APIDeployment
 from django.core.exceptions import ObjectDoesNotExist
 from django.db import models
+from django.db.models import Sum
 from pipeline_v2.models import Pipeline
 from tags.models import Tag
+from usage_v2.constants import UsageKeys
+from usage_v2.models import Usage
 from utils.common_utils import CommonUtils
 from utils.models.base_model import BaseModel
 from workflow_manager.workflow_v2.enums import ExecutionStatus
@@ -157,6 +160,37 @@ def pretty_execution_time(self) -> str:
         )
         return str(timedelta(seconds=time_in_secs)).split(".")[0]
 
+    @property
+    def get_aggregated_usage_cost(self) -> Optional[float]:
+        """Retrieve the aggregated usage cost for this execution.
+
+        Sums ``cost_in_dollars`` across all ``Usage`` records that reference
+        this execution's ``execution_id``.
+
+        Returns:
+            Optional[float]: The total cost in dollars if usage data exists,
+                else None.
+ """ + # Aggregate the cost for the given execution_id + queryset = Usage.objects.filter(execution_id=self.id) + + if queryset.exists(): + result = queryset.aggregate(cost_in_dollars=Sum(UsageKeys.COST_IN_DOLLARS)) + total_cost = result.get(UsageKeys.COST_IN_DOLLARS) + else: + # Handle the case where no usage data is found for the given execution_id + logger.warning( + f"Usage data not found for the specified execution_id: {self.id}" + ) + return None + + logger.debug( + f"Cost aggregated successfully for execution_id: {self.id}" + f", Total cost: {total_cost}" + ) + + return total_cost + def __str__(self) -> str: return ( f"Workflow execution: {self.id} (" diff --git a/backend/workflow_manager/workflow_v2/workflow_helper.py b/backend/workflow_manager/workflow_v2/workflow_helper.py index adcfd5e3f..1417ff2ec 100644 --- a/backend/workflow_manager/workflow_v2/workflow_helper.py +++ b/backend/workflow_manager/workflow_v2/workflow_helper.py @@ -212,12 +212,15 @@ def process_input_files( else: execution_service.update_execution(ExecutionStatus.COMPLETED) + workflow_execution = execution_service.get_execution_instance() + execution_service.publish_final_workflow_logs( total_files=total_files, successful_files=successful_files, failed_files=failed_files, ) - return execution_service.get_execution_instance() + + return workflow_execution @staticmethod def _process_file( @@ -275,6 +278,11 @@ def _process_file( use_file_history=execution_service.use_file_history, file_execution_id=file_execution_id, ) + + execution_service.log_total_cost_per_file( + run_id=file_execution_id, file_name=file_name + ) + execution_service.publish_update_log( LogState.SUCCESS, f"{file_name}'s output is processed successfully", diff --git a/prompt-service/src/unstract/prompt_service/constants.py b/prompt-service/src/unstract/prompt_service/constants.py index 1eadb430d..2f007d731 100644 --- a/prompt-service/src/unstract/prompt_service/constants.py +++ b/prompt-service/src/unstract/prompt_service/constants.py @@ -7,6 +7,7 @@ class PromptServiceContants: OUTPUTS = "outputs" TOOL_ID = "tool_id" RUN_ID = "run_id" + EXECUTION_ID = "execution_id" FILE_NAME = "file_name" FILE_HASH = "file_hash" NAME = "name" diff --git a/prompt-service/src/unstract/prompt_service/main.py b/prompt-service/src/unstract/prompt_service/main.py index 397c75b98..e418e6f5a 100644 --- a/prompt-service/src/unstract/prompt_service/main.py +++ b/prompt-service/src/unstract/prompt_service/main.py @@ -107,6 +107,7 @@ def prompt_processor() -> Any: prompts = payload.get(PSKeys.OUTPUTS, []) tool_id: str = payload.get(PSKeys.TOOL_ID, "") run_id: str = payload.get(PSKeys.RUN_ID, "") + execution_id: str = payload.get(PSKeys.EXECUTION_ID, "") file_hash = payload.get(PSKeys.FILE_HASH) file_path = payload.get(PSKeys.FILE_PATH) doc_name = str(payload.get(PSKeys.FILE_NAME, "")) @@ -193,7 +194,7 @@ def prompt_processor() -> Any: ) try: - usage_kwargs = {"run_id": run_id} + usage_kwargs = {"run_id": run_id, "execution_id": execution_id} adapter_instance_id = output[PSKeys.LLM] llm = LLM( tool=util, diff --git a/tools/structure/src/config/properties.json b/tools/structure/src/config/properties.json index c11ff1368..9cbe68691 100644 --- a/tools/structure/src/config/properties.json +++ b/tools/structure/src/config/properties.json @@ -2,7 +2,7 @@ "schemaVersion": "0.0.1", "displayName": "Structure Tool", "functionName": "structure_tool", - "toolVersion": "0.0.69", + "toolVersion": "0.0.70", "description": "This is a template tool which can answer set of input prompts 
designed in the Prompt Studio", "input": { "description": "File that needs to be indexed and parsed for answers" diff --git a/tools/structure/src/constants.py b/tools/structure/src/constants.py index ecac9f1d3..32b097a4f 100644 --- a/tools/structure/src/constants.py +++ b/tools/structure/src/constants.py @@ -77,3 +77,4 @@ class SettingsKeys: TOOL = "tool" METRICS = "metrics" INDEXING = "indexing" + EXECUTION_ID = "execution_id" diff --git a/tools/structure/src/main.py b/tools/structure/src/main.py index 70f983bea..84cbaf46e 100644 --- a/tools/structure/src/main.py +++ b/tools/structure/src/main.py @@ -108,6 +108,7 @@ def run( # TODO : Resolve and pass log events ID payload = { SettingsKeys.RUN_ID: self.file_execution_id, + SettingsKeys.EXECUTION_ID: self.execution_id, SettingsKeys.TOOL_SETTINGS: tool_settings, SettingsKeys.OUTPUTS: outputs, SettingsKeys.TOOL_ID: tool_id, @@ -122,6 +123,7 @@ def run( usage_kwargs: dict[Any, Any] = dict() usage_kwargs[UsageKwargs.RUN_ID] = self.file_execution_id usage_kwargs[UsageKwargs.FILE_NAME] = self.source_file_name + usage_kwargs[UsageKwargs.EXECUTION_ID] = self.execution_id if tool_settings[SettingsKeys.ENABLE_SINGLE_PASS_EXTRACTION]: index.index(
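
Reviewer note, not part of the patch: the two new log lines draw their numbers from different keys. log_total_cost_per_file aggregates by the file-level run_id through UsageHelper.get_aggregated_token_count, while publish_average_cost_log sums cost_in_dollars over Usage rows keyed by the execution-level execution_id (the lookup served by the new usage_executi_4deb35_idx index). The sketch below shows that execution-level average in isolation, assuming the Usage model's execution_id and cost_in_dollars fields from this change set; the standalone function and its name are illustrative, not code added by this PR.

from typing import Optional

from django.db.models import Sum

from usage_v2.models import Usage


def average_cost_per_file(
    execution_id: str, successful_files: int
) -> Optional[float]:
    """Illustrative only: mirrors WorkflowExecution.get_aggregated_usage_cost
    combined with the rounding done in publish_average_cost_log."""
    if not successful_files:
        # No successfully processed files means no meaningful average
        return None

    # Sum the dollar cost of every Usage row recorded for this execution;
    # this is the filter the new execution_id index accelerates
    total_cost = Usage.objects.filter(execution_id=execution_id).aggregate(
        total=Sum("cost_in_dollars")
    )["total"]

    if total_cost is None:
        # No usage was recorded for the execution
        return None

    # The workflow log rounds to five decimal places before publishing
    return round(total_cost / successful_files, 5)

The same rounding appears in log_total_cost_per_file, so the per-file and per-execution figures in the workflow logs stay comparable.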