Skip to content

Cost logs for API Deployments #1167

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 42 commits into from
Apr 3, 2025
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
0919c34
Added logs for cost per file
pk-zipstack Mar 5, 2025
dccb7a0
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Mar 5, 2025
b29a261
Fixed pre-commit issues
pk-zipstack Mar 5, 2025
12937cb
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Mar 5, 2025
f52b1e9
fixed pre commit issues
pk-zipstack Mar 5, 2025
d0061a0
Update backend/workflow_manager/workflow_v2/execution.py
pk-zipstack Mar 5, 2025
4fabd40
Update backend/workflow_manager/workflow_v2/execution.py
pk-zipstack Mar 5, 2025
2c83d08
Made changes to log the total cost in get_aggregated_cost function
pk-zipstack Mar 5, 2025
6fdbe5b
added error handling to prevent TypeError and ZeroDivisionError
pk-zipstack Mar 5, 2025
d4755ed
Removed division by zero error for publish_average_cost_log
pk-zipstack Mar 6, 2025
a819e84
Updated the way the error logs are displayed to the user.
pk-zipstack Mar 6, 2025
a3ee996
Merge branch 'main' into UN-1799-Cost-log-for-ETL
pk-zipstack Mar 6, 2025
8a138cb
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Mar 6, 2025
4108df8
Moved publish_average_cost_log inside publish_final_workflow_logs
pk-zipstack Mar 7, 2025
3394549
Update backend/workflow_manager/workflow_v2/execution.py
pk-zipstack Mar 11, 2025
e745bd7
Merge branch 'main' into UN-1799-Cost-log-for-ETL
pk-zipstack Mar 11, 2025
f940d33
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Mar 11, 2025
7fc7b3e
Updated execution.py to fix line too long issue
pk-zipstack Mar 11, 2025
1f33197
Converted log type "error" to "warning" in publish_average_cost_log
pk-zipstack Mar 11, 2025
56dcf64
Moved get_aggregated_cost to WorkflowExecution class as a property
pk-zipstack Mar 11, 2025
965a3b7
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Mar 11, 2025
fad083a
Update backend/workflow_manager/workflow_v2/models/execution.py
pk-zipstack Mar 12, 2025
dce0255
Update backend/workflow_manager/workflow_v2/models/execution.py
pk-zipstack Mar 12, 2025
899638e
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Mar 12, 2025
9f77f90
Merge branch 'main' into UN-1799-Cost-log-for-ETL
pk-zipstack Mar 14, 2025
061db21
removed NoneType error handling in execution.py
pk-zipstack Mar 14, 2025
948942c
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Mar 14, 2025
f48b23d
Merge branch 'main' into UN-1799-Cost-log-for-ETL
pk-zipstack Mar 14, 2025
6d62694
Merge branch 'main' into UN-1799-Cost-log-for-ETL
pk-zipstack Mar 18, 2025
550019d
Merge branch 'main' into UN-1799-Cost-log-for-ETL
pk-zipstack Mar 18, 2025
8e5fd12
Updated tool structure version
pk-zipstack Mar 18, 2025
1a065cf
Update sample.env
pk-zipstack Mar 18, 2025
471e6b9
Merge branch 'main' into UN-1799-Cost-log-for-ETL
pk-zipstack Mar 18, 2025
e71386c
Added .exists() for getting aggregated cost in execution.py
pk-zipstack Apr 3, 2025
babad72
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Apr 3, 2025
64729ad
fixed line too long in execution.py
pk-zipstack Apr 3, 2025
94e3289
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Apr 3, 2025
82c423b
Removed redundant rounding of cost in execution.py
pk-zipstack Apr 3, 2025
b6d95b8
Added indexing to execution_id in Usage model
pk-zipstack Apr 3, 2025
9f9323a
Merge branch 'main' into UN-1799-Cost-log-for-ETL
pk-zipstack Apr 3, 2025
3ac473c
Deleted a file
pk-zipstack Apr 3, 2025
d7d65c0
Updated structure tool version to 0.0.70
pk-zipstack Apr 3, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 1 addition & 41 deletions backend/usage_v2/helper.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import logging
from datetime import datetime
from typing import Any, Optional
from typing import Any

from django.db.models import QuerySet, Sum
from rest_framework.exceptions import APIException
Expand Down Expand Up @@ -65,46 +65,6 @@ def get_aggregated_token_count(run_id: str) -> dict:
logger.error(f"An unexpected error occurred for run_id {run_id}: {str(e)}")
raise APIException("Error while aggregating token counts")

@staticmethod
def get_aggregated_cost(execution_id: str) -> Optional[float]:
    """Return the summed usage cost recorded for one execution.

    Args:
        execution_id (str): Identifier of the execution whose ``Usage``
            rows are aggregated.

    Returns:
        Optional[float]: The sum of the cost column over the matching
            rows, or None when no rows match (the aggregate of an empty
            queryset is None).

    Raises:
        APIException: If the aggregation query fails for any reason.
    """
    try:
        # Use a dedicated alias so the result lookup cannot drift away from
        # the keyword passed to aggregate().
        total_cost = Usage.objects.filter(execution_id=execution_id).aggregate(
            total_cost=Sum(UsageKeys.COST_IN_DOLLARS)
        )["total_cost"]
    except Exception as e:
        # Handle any exception raised while querying the database
        logger.error(
            f"An unexpected error occurred for execution {execution_id}: {str(e)}"
        )
        raise APIException("Error while aggregating cost") from e

    # filter().aggregate() never raises Usage.DoesNotExist; missing usage
    # data shows up as a None sum instead, so check for it explicitly.
    if total_cost is None:
        logger.warning(
            f"Usage data not found for the specified execution_id: {execution_id}"
        )
        return None

    logger.debug(
        f"Cost aggregated successfully for execution_id: {execution_id}"
        f", Total cost: {total_cost}"
    )
    return total_cost

@staticmethod
def aggregate_usage_metrics(queryset: QuerySet) -> dict[str, Any]:
"""
Expand Down
32 changes: 20 additions & 12 deletions backend/workflow_manager/workflow_v2/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -290,15 +290,24 @@ def publish_initial_workflow_logs(self, total_files: int) -> None:
)

def publish_final_workflow_logs(
self, total_files: int, successful_files: int, failed_files: int
self,
total_files: int,
successful_files: int,
failed_files: int,
execution_id: str,
) -> None:
"""Publishes the final logs for the workflow.

Returns:
None
"""
self.publish_average_cost_log(
execution_id=execution_id, total_files=successful_files
)

# To not associate final logs with a file execution
self.file_execution_id = None

self.publish_update_log(LogState.END_WORKFLOW, "1", LogComponent.STATUS_BAR)
self.publish_update_log(
LogState.SUCCESS, "Executed successfully", LogComponent.WORKFLOW
Expand All @@ -311,25 +320,24 @@ def publish_final_workflow_logs(
def publish_average_cost_log(self, execution_id, total_files):
    """Publish the average and total usage cost of an execution as a log.

    This is best-effort: when the cost cannot be computed, a notice is
    published and a warning is logged, and no exception propagates to the
    caller.

    Args:
        execution_id: Primary key of the ``WorkflowExecution`` to report on.
        total_files: Number of files the total cost is averaged over.

    Returns:
        None
    """
    try:
        execution: WorkflowExecution = WorkflowExecution.objects.get(
            pk=execution_id
        )
        total_cost = execution.get_aggregated_usage_cost
        # total_cost may be None (TypeError) and total_files may be 0
        # (ZeroDivisionError); both mean the average is unavailable.
        average_cost = round(total_cost / total_files, 5)
    except (TypeError, ZeroDivisionError, WorkflowExecution.DoesNotExist) as e:
        self.publish_log(
            message=(
                f"Unable to calculate cost for execution '{execution_id}'. "
                f"Cost data may be unavailable or incomplete."
            )
        )
        logger.warning(
            f"Error calculating cost for execution '{execution_id}' "
            f"of '{total_files}' files : "
            f"{str(e)}.\nContinuing execution"
        )
        return

    self.publish_log(
        message=(
            f"The average cost per file for execution '{execution_id}' "
            f"is '${average_cost:}'. Total cost: '${total_cost:}'"
        )
    )

def log_total_cost_per_file(self, run_id, file_name):
Expand Down
42 changes: 42 additions & 0 deletions backend/workflow_manager/workflow_v2/models/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,11 @@
from api_v2.models import APIDeployment
from django.core.exceptions import ObjectDoesNotExist
from django.db import models
from django.db.models import Sum
from pipeline_v2.models import Pipeline
from tags.models import Tag
from usage_v2.constants import UsageKeys
from usage_v2.models import Usage
from utils.common_utils import CommonUtils
from utils.models.base_model import BaseModel
from workflow_manager.workflow_v2.enums import ExecutionStatus
Expand Down Expand Up @@ -157,6 +160,45 @@ def pretty_execution_time(self) -> str:
)
return str(timedelta(seconds=time_in_secs)).split(".")[0]

@property
def get_aggregated_usage_cost(self) -> Optional[float]:
    """Return the summed usage cost recorded for this execution.

    The cost column of every ``Usage`` row whose ``execution_id`` equals
    this execution's ``id`` is summed in the database.

    Returns:
        Optional[float]: The total cost if usage rows exist, else None.
            None is also returned when the query fails; the failure is
            logged as a warning rather than raised.
    """
    try:
        # Use a dedicated alias so the result lookup cannot drift away from
        # the keyword passed to aggregate().
        total_cost = Usage.objects.filter(execution_id=self.id).aggregate(
            total_cost=Sum(UsageKeys.COST_IN_DOLLARS)
        )["total_cost"]
    except Exception as e:
        logger.warning(
            f"An unexpected error occurred for execution {self.id}: {str(e)}"
        )
        return None

    # filter().aggregate() never raises Usage.DoesNotExist; missing usage
    # data shows up as a None sum instead, so check for it explicitly.
    if total_cost is None:
        logger.warning(
            f"Usage data not found for the specified execution_id: {self.id}"
        )
        return None

    logger.debug(
        f"Cost aggregated successfully for execution_id: {self.id}"
        f", Total cost: {total_cost}"
    )
    return total_cost

def __str__(self) -> str:
return (
f"Workflow execution: {self.id} ("
Expand Down
4 changes: 1 addition & 3 deletions backend/workflow_manager/workflow_v2/workflow_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,14 +213,12 @@ def process_input_files(
execution_service.update_execution(ExecutionStatus.COMPLETED)

workflow_execution = execution_service.get_execution_instance()
execution_service.publish_average_cost_log(
execution_id=workflow_execution.id, total_files=successful_files
)

execution_service.publish_final_workflow_logs(
total_files=total_files,
successful_files=successful_files,
failed_files=failed_files,
execution_id=workflow_execution.id,
)

return workflow_execution
Expand Down