Skip to content

Cost logs for API Deployments #1167

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 42 commits into from
Apr 3, 2025
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
0919c34
Added logs for cost per file
pk-zipstack Mar 5, 2025
dccb7a0
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Mar 5, 2025
b29a261
Fixed pre-sommit issues
pk-zipstack Mar 5, 2025
12937cb
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Mar 5, 2025
f52b1e9
fixed pre commit issues
pk-zipstack Mar 5, 2025
d0061a0
Update backend/workflow_manager/workflow_v2/execution.py
pk-zipstack Mar 5, 2025
4fabd40
Update backend/workflow_manager/workflow_v2/execution.py
pk-zipstack Mar 5, 2025
2c83d08
Made changes to log the total cost in get_aggregated_cost function
pk-zipstack Mar 5, 2025
6fdbe5b
added error handling to prevent TypeError and ZeroDivisionError
pk-zipstack Mar 5, 2025
d4755ed
Removed devision by zero error for publish_average_cost_log
pk-zipstack Mar 6, 2025
a819e84
Updated the way the error logs are displayed to the user.
pk-zipstack Mar 6, 2025
a3ee996
Merge branch 'main' into UN-1799-Cost-log-for-ETL
pk-zipstack Mar 6, 2025
8a138cb
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Mar 6, 2025
4108df8
Moved publish_average_cost_log inside publish_final_workflow_logs
pk-zipstack Mar 7, 2025
3394549
Update backend/workflow_manager/workflow_v2/execution.py
pk-zipstack Mar 11, 2025
e745bd7
Merge branch 'main' into UN-1799-Cost-log-for-ETL
pk-zipstack Mar 11, 2025
f940d33
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Mar 11, 2025
7fc7b3e
Updated execution.py to fix line too long issue
pk-zipstack Mar 11, 2025
1f33197
Converted log type "error" to "warning" in publish_average_cost_log
pk-zipstack Mar 11, 2025
56dcf64
Moved get_aggregated_cost to WorkflowExectuion class as a property
pk-zipstack Mar 11, 2025
965a3b7
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Mar 11, 2025
fad083a
Update backend/workflow_manager/workflow_v2/models/execution.py
pk-zipstack Mar 12, 2025
dce0255
Update backend/workflow_manager/workflow_v2/models/execution.py
pk-zipstack Mar 12, 2025
899638e
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Mar 12, 2025
9f77f90
Merge branch 'main' into UN-1799-Cost-log-for-ETL
pk-zipstack Mar 14, 2025
061db21
removed NoneType error handling in execution.py
pk-zipstack Mar 14, 2025
948942c
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Mar 14, 2025
f48b23d
Merge branch 'main' into UN-1799-Cost-log-for-ETL
pk-zipstack Mar 14, 2025
6d62694
Merge branch 'main' into UN-1799-Cost-log-for-ETL
pk-zipstack Mar 18, 2025
550019d
Merge branch 'main' into UN-1799-Cost-log-for-ETL
pk-zipstack Mar 18, 2025
8e5fd12
Updated tool structure version
pk-zipstack Mar 18, 2025
1a065cf
Update sample.env
pk-zipstack Mar 18, 2025
471e6b9
Merge branch 'main' into UN-1799-Cost-log-for-ETL
pk-zipstack Mar 18, 2025
e71386c
Added .exists() for getting aggregated cost in execution.py
pk-zipstack Apr 3, 2025
babad72
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Apr 3, 2025
64729ad
fixed line to long in execution.py
pk-zipstack Apr 3, 2025
94e3289
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Apr 3, 2025
82c423b
Removed redundant rounding of cost in exexution.py
pk-zipstack Apr 3, 2025
b6d95b8
Added indexing to execution_id in Usage model
pk-zipstack Apr 3, 2025
9f9323a
Merge branch 'main' into UN-1799-Cost-log-for-ETL
pk-zipstack Apr 3, 2025
3ac473c
Deleted a file
pk-zipstack Apr 3, 2025
d7d65c0
Updated structure tool version to 0.0.70
pk-zipstack Apr 3, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions backend/prompt_studio/prompt_studio_core_v2/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ class ToolStudioPromptKeys:
CREATED_BY = "created_by"
TOOL_ID = "tool_id"
RUN_ID = "run_id"
EXECUTION_ID = "execution_id"
NUMBER = "Number"
FLOAT = "Float"
PG_VECTOR = "Postgres pg_vector"
Expand Down
5 changes: 3 additions & 2 deletions backend/sample.env
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,10 @@ PROMPT_STUDIO_FILE_PATH=/app/prompt-studio-data

# Structure Tool Image (Runs prompt studio exported tools)
# https://hub.docker.com/r/unstract/tool-structure
STRUCTURE_TOOL_IMAGE_URL="docker:unstract/tool-structure:0.0.67"

STRUCTURE_TOOL_IMAGE_URL="docker:unstract/tool-structure:0.0.68"
STRUCTURE_TOOL_IMAGE_NAME="unstract/tool-structure"
STRUCTURE_TOOL_IMAGE_TAG="0.0.67"
STRUCTURE_TOOL_IMAGE_TAG="0.0.68"

# Feature Flags
EVALUATION_SERVER_IP=unstract-flipt
Expand Down
42 changes: 41 additions & 1 deletion backend/usage_v2/helper.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import logging
from datetime import datetime
from typing import Any
from typing import Any, Optional

from django.db.models import QuerySet, Sum
from rest_framework.exceptions import APIException
Expand Down Expand Up @@ -65,6 +65,46 @@ def get_aggregated_token_count(run_id: str) -> dict:
logger.error(f"An unexpected error occurred for run_id {run_id}: {str(e)}")
raise APIException("Error while aggregating token counts")

@staticmethod
def get_aggregated_cost(execution_id: str) -> Optional[float]:
"""Retrieve aggregated cost for the given execution_id.

Args:
execution_id (str): The identifier for the
total cost of a particular execution.

Returns:
Optional[float]: The total cost in dollars if available, else None.

Raises:
APIException: For unexpected errors during database operations.
"""
try:
# Aggregate the cost for the given execution_id
total_cost = Usage.objects.filter(execution_id=execution_id).aggregate(
cost_in_dollars=Sum(UsageKeys.COST_IN_DOLLARS)
)[UsageKeys.COST_IN_DOLLARS]

logger.debug(
f"Cost aggregated successfully for execution_id: {execution_id}"
f", Total cost: {total_cost}"
)

return total_cost

except Usage.DoesNotExist:
# Handle the case where no usage data is found for the given execution_id
logger.warning(
f"Usage data not found for the specified execution_id: {execution_id}"
)
return None
except Exception as e:
# Handle any other exceptions that might occur during the execution
logger.error(
f"An unexpected error occurred for execution {execution_id}: {str(e)}"
)
raise APIException("Error while aggregating cost")

@staticmethod
def aggregate_usage_metrics(queryset: QuerySet) -> dict[str, Any]:
"""
Expand Down
32 changes: 32 additions & 0 deletions backend/workflow_manager/workflow_v2/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from unstract.workflow_execution.dto import WorkflowDto
from unstract.workflow_execution.enums import ExecutionType, LogComponent, LogState
from unstract.workflow_execution.exceptions import StopExecution
from usage_v2.helper import UsageHelper
from utils.local_context import StateStore
from utils.user_context import UserContext
from workflow_manager.file_execution.models import WorkflowFileExecution
Expand Down Expand Up @@ -305,6 +306,37 @@ def publish_final_workflow_logs(
f"{successful_files} successfully executed and {failed_files} error(s)"
)

def publish_average_cost_log(self, execution_id, total_files):

try:
total_cost = UsageHelper.get_aggregated_cost(execution_id)
average_cost = round(total_cost / total_files, 5)
self.publish_log(
message=(
f"The average cost per file for execution '{execution_id}' "
f"is '${average_cost:}'. Total cost: '${total_cost:}'"
)
)
except TypeError as e:
self.publish_log(
message=(
f"Unable to calculate cost for execution '{execution_id}'. "
f"Cost data may be unavailable or incomplete."
)
)

logger.error(
f"Error calculating cost for execution '{execution_id}': "
f"{str(e)}. Total cost: {total_cost}, Total files: {total_files}"
)

def log_total_cost_per_file(self, run_id, file_name):
cost_dict = UsageHelper.get_aggregated_token_count(run_id=run_id)
cost = round(cost_dict.get("cost_in_dollars", 0), 5)

# Log the total cost for a particular file executed in the workflow
self.publish_log(message=f"Total cost for file '{file_name}' is '${cost}'")

def publish_initial_tool_execution_logs(
self, current_file_idx: int, total_files: int, file_name: str
) -> None:
Expand Down
13 changes: 12 additions & 1 deletion backend/workflow_manager/workflow_v2/workflow_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,12 +212,18 @@ def process_input_files(
else:
execution_service.update_execution(ExecutionStatus.COMPLETED)

workflow_execution = execution_service.get_execution_instance()
execution_service.publish_average_cost_log(
execution_id=workflow_execution.id, total_files=successful_files
)

execution_service.publish_final_workflow_logs(
total_files=total_files,
successful_files=successful_files,
failed_files=failed_files,
)
return execution_service.get_execution_instance()

return workflow_execution

@staticmethod
def _process_file(
Expand Down Expand Up @@ -274,6 +280,11 @@ def _process_file(
error=error,
use_file_history=execution_service.use_file_history,
)

execution_service.log_total_cost_per_file(
run_id=file_execution_id, file_name=file_name
)

execution_service.publish_update_log(
LogState.SUCCESS,
f"{file_name}'s output is processed successfully",
Expand Down
1 change: 1 addition & 0 deletions prompt-service/src/unstract/prompt_service/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ class PromptServiceContants:
OUTPUTS = "outputs"
TOOL_ID = "tool_id"
RUN_ID = "run_id"
EXECUTION_ID = "execution_id"
FILE_NAME = "file_name"
FILE_HASH = "file_hash"
NAME = "name"
Expand Down
3 changes: 2 additions & 1 deletion prompt-service/src/unstract/prompt_service/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ def prompt_processor() -> Any:
prompts = payload.get(PSKeys.OUTPUTS, [])
tool_id: str = payload.get(PSKeys.TOOL_ID, "")
run_id: str = payload.get(PSKeys.RUN_ID, "")
execution_id: str = payload.get(PSKeys.EXECUTION_ID, "")
file_hash = payload.get(PSKeys.FILE_HASH)
file_path = payload.get(PSKeys.FILE_PATH)
doc_name = str(payload.get(PSKeys.FILE_NAME, ""))
Expand Down Expand Up @@ -192,7 +193,7 @@ def prompt_processor() -> Any:
)

try:
usage_kwargs = {"run_id": run_id}
usage_kwargs = {"run_id": run_id, "execution_id": execution_id}
adapter_instance_id = output[PSKeys.LLM]
llm = LLM(
tool=util,
Expand Down
2 changes: 1 addition & 1 deletion tools/structure/src/config/properties.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"schemaVersion": "0.0.1",
"displayName": "Structure Tool",
"functionName": "structure_tool",
"toolVersion": "0.0.67",
"toolVersion": "0.0.68",
"description": "This is a template tool which can answer set of input prompts designed in the Prompt Studio",
"input": {
"description": "File that needs to be indexed and parsed for answers"
Expand Down
1 change: 1 addition & 0 deletions tools/structure/src/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,3 +77,4 @@ class SettingsKeys:
TOOL = "tool"
METRICS = "metrics"
INDEXING = "indexing"
EXECUTION_ID = "execution_id"
2 changes: 2 additions & 0 deletions tools/structure/src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ def run(
# TODO : Resolve and pass log events ID
payload = {
SettingsKeys.RUN_ID: self.file_execution_id,
SettingsKeys.EXECUTION_ID: self.execution_id,
SettingsKeys.TOOL_SETTINGS: tool_settings,
SettingsKeys.OUTPUTS: outputs,
SettingsKeys.TOOL_ID: tool_id,
Expand All @@ -122,6 +123,7 @@ def run(
usage_kwargs: dict[Any, Any] = dict()
usage_kwargs[UsageKwargs.RUN_ID] = self.file_execution_id
usage_kwargs[UsageKwargs.FILE_NAME] = self.source_file_name
usage_kwargs[UsageKwargs.EXECUTION_ID] = self.execution_id

process_text: Optional[Callable[[str], str]] = None
try:
Expand Down