From 8cc26f7a6c8f3d280cee417929baab06d9c26635 Mon Sep 17 00:00:00 2001
From: Marco Donadoni
Date: Fri, 25 Aug 2023 11:08:35 +0200
Subject: [PATCH] quota: improve performance of quota updater

Improve the performance of the quota updater by "expunging" all workflows
before starting to update quotas, as the Workflow table does not need to be
modified, thus making commits much faster.

Also refactor the various update functions to make their behaviour consistent
with each other.

Partially addresses #193
---
 reana_db/cli.py      |  30 +++++-----
 reana_db/models.py   |   2 +
 reana_db/utils.py    | 129 +++++++++++++++++++++++++++----------------
 tests/test_models.py |  65 ++++++++++++++++++++++
 4 files changed, 162 insertions(+), 64 deletions(-)

diff --git a/reana_db/cli.py b/reana_db/cli.py
index e620d61..442d1d0 100644
--- a/reana_db/cli.py
+++ b/reana_db/cli.py
@@ -15,17 +15,16 @@
 import click
 from alembic import command
 from alembic import config as alembic_config

-from sqlalchemy.orm import defer
 from reana_commons.config import REANA_LOG_FORMAT, REANA_LOG_LEVEL

 from reana_db.database import init_db
-from reana_db.models import Resource, ResourceType, Workflow
+from reana_db.models import Resource, ResourceType
 from reana_db.utils import (
-    Timer,
-    store_workflow_disk_quota,
     update_users_cpu_quota,
     update_users_disk_quota,
+    update_workflows_cpu_quota,
+    update_workflows_disk_quota,
 )

 # Set up logging for CLI commands
@@ -253,22 +252,19 @@ def _resource_usage_update(resource: ResourceType) -> None:
     """Update users resource quota usage."""
     try:
         if resource == ResourceType.disk:
+            update_workflows_disk_quota()
             update_users_disk_quota()
-
-            # logs and reana_specification are not loaded to avoid consuming
-            # huge amounts of memory
-            workflows = Workflow.query.options(
-                defer(Workflow.logs), defer(Workflow.reana_specification)
-            ).all()
-            timer = Timer("Workflow disk quota usage update", total=len(workflows))
-            for workflow in workflows:
-                store_workflow_disk_quota(workflow)
-                timer.count_event()
+            click.secho(
+                "Disk quota usage updated successfully for all users and workflows.",
+                fg="green",
+            )
         elif resource == ResourceType.cpu:
+            update_workflows_cpu_quota()
             update_users_cpu_quota()
-        click.secho(
-            f"Users {resource.name} quota usage updated successfully.", fg="green"
-        )
+            click.secho(
+                "CPU quota usage updated successfully for all users and workflows.",
+                fg="green",
+            )
     except Exception as e:
         click.secho(
             f"[ERROR]: An error occurred when updating users {resource.name} quota usage: {repr(e)}",
diff --git a/reana_db/models.py b/reana_db/models.py
index fee027a..4e434c1 100644
--- a/reana_db/models.py
+++ b/reana_db/models.py
@@ -60,6 +60,7 @@
 from reana_db.utils import (
     build_workspace_path,
     store_workflow_disk_quota,
+    update_users_cpu_quota,
     update_users_disk_quota,
     update_workflow_cpu_quota,
     update_workspace_retention_rules,
@@ -768,6 +769,7 @@ def _update_cpu_quota(workflow):
     try:
         update_workflow_cpu_quota(workflow=workflow)
+        update_users_cpu_quota(user=workflow.owner)
     except Exception as e:
         logging.error(f"Failed to update cpu quota: \n{e}\nContinuing...")

diff --git a/reana_db/utils.py b/reana_db/utils.py
index fdecf4d..e575921 100644
--- a/reana_db/utils.py
+++ b/reana_db/utils.py
@@ -13,7 +13,7 @@
 from typing import Optional
 from uuid import UUID

-from sqlalchemy import inspect
+from sqlalchemy import inspect, func
 from sqlalchemy.orm import defer
 from reana_commons.utils import get_disk_usage
 from reana_commons.errors import REANAMissingWorkspaceError
@@ -310,40 +310,36 @@ def update_users_disk_quota(
     update policy.
     """
     from reana_db.config import DEFAULT_QUOTA_RESOURCES
+    from reana_db.database import Session
     from reana_db.models import Resource, ResourceType, User, UserResource

     if not override_policy_checks and should_skip_quota_update(ResourceType.disk):
         return

+    disk_resource = get_default_quota_resource(ResourceType.disk.name)
+
     users = [user] if user else User.query.all()
     timer = Timer("User disk quota usage update", total=len(users))
     for u in users:
-        disk_resource = Resource.query.filter_by(
-            name=DEFAULT_QUOTA_RESOURCES["disk"]
-        ).one_or_none()
-
-        if disk_resource:
-            from .database import Session
-
-            user_resource_quota = UserResource.query.filter_by(
-                user_id=u.id_, resource_id=disk_resource.id_
-            ).first()
-            if bytes_to_sum is not None:
-                updated_quota_usage = user_resource_quota.quota_used + bytes_to_sum
-                if updated_quota_usage < 0:
-                    logging.warning(
-                        f"Disk quota consumption of user {u.id_} would become negative: "
-                        f"{user_resource_quota.quota_used} [original usage] + {bytes_to_sum} [delta] "
-                        f"-> {updated_quota_usage} [new usage]. Setting the new usage to zero."
-                    )
-                    user_resource_quota.quota_used = 0
-                else:
-                    user_resource_quota.quota_used = updated_quota_usage
+        user_resource_quota = UserResource.query.filter_by(
+            user_id=u.id_, resource_id=disk_resource.id_
+        ).one()
+        if bytes_to_sum is not None:
+            updated_quota_usage = user_resource_quota.quota_used + bytes_to_sum
+            if updated_quota_usage < 0:
+                logging.warning(
+                    f"Disk quota consumption of user {u.id_} would become negative: "
+                    f"{user_resource_quota.quota_used} [original usage] + {bytes_to_sum} [delta] "
+                    f"-> {updated_quota_usage} [new usage]. Setting the new usage to zero."
+                )
+                user_resource_quota.quota_used = 0
             else:
-                workspace_path = u.get_user_workspace()
-                disk_usage_bytes = get_disk_usage_or_zero(workspace_path)
-                user_resource_quota.quota_used = disk_usage_bytes
-            Session.commit()
+                user_resource_quota.quota_used = updated_quota_usage
+        else:
+            workspace_path = u.get_user_workspace()
+            disk_usage_bytes = get_disk_usage_or_zero(workspace_path)
+            user_resource_quota.quota_used = disk_usage_bytes
+        Session.commit()
         timer.count_event()

@@ -360,11 +356,15 @@ def update_workflow_cpu_quota(workflow) -> int:
         WorkflowResource,
     )

+    if should_skip_quota_update(ResourceType.cpu):
+        return
+
+    cpu_resource = get_default_quota_resource(ResourceType.cpu.name)
+
     terminated_at = workflow.run_finished_at or workflow.run_stopped_at
     if workflow.run_started_at and terminated_at:
         cpu_time = terminated_at - workflow.run_started_at
         cpu_milliseconds = int(cpu_time.total_seconds() * 1000)
-        cpu_resource = get_default_quota_resource(ResourceType.cpu.name)
         # WorkflowResource might exist already if the cluster
         # follows a combined termination + periodic policy (eg. created
         # by the status listener, revisited by the cronjob)
@@ -379,23 +379,40 @@ def update_workflow_cpu_quota(workflow) -> int:
                 resource_id=cpu_resource.id_,
                 quota_used=cpu_milliseconds,
             )
-        user_resource_quota = UserResource.query.filter_by(
-            user_id=workflow.owner_id, resource_id=cpu_resource.id_
-        ).first()
-        user_resource_quota.quota_used += cpu_milliseconds
         Session.add(workflow_resource)
         Session.commit()
         return cpu_milliseconds
     return 0


+def update_workflows_cpu_quota() -> None:
+    """Update the CPU quotas of all workflows in a more efficient way."""
+    from reana_db.database import Session
+    from reana_db.models import Workflow
+
+    # logs and reana_specification are not loaded to avoid consuming
+    # huge amounts of memory
+    workflows = Workflow.query.options(
+        defer(Workflow.logs), defer(Workflow.reana_specification)
+    ).all()
+    # We expunge all the workflows, as they will not be modified when updating the quotas.
+    # This makes `Session.commit()` much faster
+    for workflow in workflows:
+        Session.expunge(workflow)
+    timer = Timer("Workflow CPU quota usage update", total=len(workflows))
+    for workflow in workflows:
+        update_workflow_cpu_quota(workflow)
+        timer.count_event()
+
+
 def update_users_cpu_quota(user=None) -> None:
     """Update users CPU quota usage.

-    :param user: User whose CPU quota will be updated. If None, applies to all users.
+    User CPU quotas will be calculated from workflow CPU quotas,
+    so the latter should be updated before the former.
+
+    :param user: User whose CPU quota will be updated. If None, applies to all users.
     :type user: reana_db.models.User
-
     """
     from reana_db.database import Session
     from reana_db.models import (
@@ -404,12 +421,14 @@ def update_users_cpu_quota(user=None) -> None:
         UserResource,
         UserToken,
         UserTokenStatus,
-        Workflow,
+        WorkflowResource,
     )

     if should_skip_quota_update(ResourceType.cpu):
         return

+    cpu_resource = get_default_quota_resource(ResourceType.cpu.name)
+
     if user:
         users = [user]
     else:
@@ -419,25 +438,21 @@
             .all()
         )
     timer_user = Timer("User CPU quota usage update", total=len(users))
-    timer_workflow = Timer("Workflow CPU quota usage update")
     for user in users:
-        cpu_milliseconds = 0
-        # logs and reana_specification are not loaded to avoid consuming
-        # huge amounts of memory
-        for workflow in user.workflows.options(
-            defer(Workflow.logs),
-            defer(Workflow.reana_specification),
-        ):
-            cpu_milliseconds += update_workflow_cpu_quota(workflow=workflow)
-            timer_workflow.count_event()
-        cpu_resource = get_default_quota_resource(ResourceType.cpu.name)
+        cpu_milliseconds = (
+            Session.query(func.sum(WorkflowResource.quota_used))
+            .filter(WorkflowResource.resource_id == cpu_resource.id_)
+            .join(user.workflows.subquery())
+            .scalar()
+        )
+        if not cpu_milliseconds:
+            cpu_milliseconds = 0
         user_resource_quota = UserResource.query.filter_by(
             user_id=user.id_, resource_id=cpu_resource.id_
         ).first()
         user_resource_quota.quota_used = cpu_milliseconds
         Session.commit()
         timer_user.count_event()
-        timer_workflow.log_progress()


 def update_workspace_retention_rules(rules, status) -> None:
@@ -534,7 +549,7 @@ def store_workflow_disk_quota(
                 workflow.workspace_path
             )
             Session.commit()
-    elif inspect(workflow).persistent:
+    else:
         workflow_resource = WorkflowResource(
             workflow_id=workflow.id_,
             resource_id=disk_resource.id_,
@@ -544,3 +559,23 @@ def store_workflow_disk_quota(
         Session.commit()

     return workflow_resource
+
+
+def update_workflows_disk_quota() -> None:
+    """Update the disk quotas of all workflows in a more efficient way."""
+    from reana_db.database import Session
+    from reana_db.models import Workflow
+
+    # logs and reana_specification are not loaded to avoid consuming
+    # huge amounts of memory
+    workflows = Workflow.query.options(
+        defer(Workflow.logs), defer(Workflow.reana_specification)
+    ).all()
+    # We expunge all the workflows, as they will not be modified when updating the quotas.
+    # This makes `Session.commit()` much faster
+    for workflow in workflows:
+        Session.expunge(workflow)
+    timer = Timer("Workflow disk quota usage update", total=len(workflows))
+    for workflow in workflows:
+        store_workflow_disk_quota(workflow)
+        timer.count_event()
diff --git a/tests/test_models.py b/tests/test_models.py
index d954d3b..98a2998 100644
--- a/tests/test_models.py
+++ b/tests/test_models.py
@@ -30,6 +30,9 @@
 from reana_db.utils import (
     get_default_quota_resource,
     update_users_cpu_quota,
+    update_workflows_cpu_quota,
+    update_workflows_disk_quota,
+    update_users_disk_quota,
     update_workspace_retention_rules,
 )

@@ -274,6 +277,10 @@ def test_access_token(db, session, new_user):
     "reana_db.models.WORKFLOW_TERMINATION_QUOTA_UPDATE_POLICY",
     [ResourceType.cpu.name, ResourceType.disk.name],
 )
+@mock.patch(
+    "reana_db.utils.WORKFLOW_TERMINATION_QUOTA_UPDATE_POLICY",
+    [ResourceType.cpu.name, ResourceType.disk.name],
+)
 def test_workflow_cpu_quota_usage_update(db, session, run_workflow):
     """Test quota usage update once workflow is finished/stopped/failed."""
     time_elapsed_seconds = 0.5
@@ -352,6 +359,9 @@ def test_all_users_cpu_quota_usage_update(
     with mock.patch(
         "reana_db.models.WORKFLOW_TERMINATION_QUOTA_UPDATE_POLICY",
         workflow_termination_quota_update_policy,
+    ), mock.patch(
+        "reana_db.utils.WORKFLOW_TERMINATION_QUOTA_UPDATE_POLICY",
+        workflow_termination_quota_update_policy,
     ), mock.patch(
         "reana_db.utils.PERIODIC_RESOURCE_QUOTA_UPDATE_POLICY",
         periodic_update,
@@ -367,6 +377,7 @@ def test_all_users_cpu_quota_usage_update(
         else:
             assert quota_cpu_usage == 0

+        update_workflows_cpu_quota()
         update_users_cpu_quota()

         quota_cpu_usage = new_user.get_quota_usage()["cpu"]["usage"]["raw"]
@@ -376,6 +387,60 @@ def test_all_users_cpu_quota_usage_update(
             assert quota_cpu_usage == 0


+@pytest.mark.parametrize(
+    "workflow_termination_quota_update_policy, periodic_update",
+    [
+        ([], True),
+        ([], False),
+        ([ResourceType.disk.name], False),
+        ([ResourceType.disk.name], True),
+    ],
+)
+def test_all_users_disk_quota_usage_update(
+    db,
+    session,
+    new_user,
+    run_workflow,
+    workflow_termination_quota_update_policy,
+    periodic_update,
+):
+    """Test disk periodic update cronjob functionality."""
+    num_workflows = 2
+    dir_size = 128
+    with mock.patch(
+        "reana_db.models.WORKFLOW_TERMINATION_QUOTA_UPDATE_POLICY",
+        workflow_termination_quota_update_policy,
+    ), mock.patch(
+        "reana_db.utils.WORKFLOW_TERMINATION_QUOTA_UPDATE_POLICY",
+        workflow_termination_quota_update_policy,
+    ), mock.patch(
+        "reana_db.utils.PERIODIC_RESOURCE_QUOTA_UPDATE_POLICY",
+        periodic_update,
+    ), mock.patch(
+        "reana_db.utils.get_disk_usage", return_value=[{"size": {"raw": str(dir_size)}}]
+    ):
+        workflows = [run_workflow() for n in range(num_workflows)]
+
+        quota_disk_usage = new_user.get_quota_usage()["disk"]["usage"]["raw"]
+        if workflow_termination_quota_update_policy:
+            for wf in workflows:
+                assert wf.resources[0].quota_used == dir_size
+            assert quota_disk_usage == dir_size
+        else:
+            assert quota_disk_usage == 0
+
+        update_workflows_disk_quota()
+        update_users_disk_quota()
+
+        quota_disk_usage = new_user.get_quota_usage()["disk"]["usage"]["raw"]
+        if workflow_termination_quota_update_policy or periodic_update:
+            for wf in workflows:
+                assert wf.resources[0].quota_used == dir_size
+            assert quota_disk_usage == dir_size
+        else:
+            assert quota_disk_usage == 0
+
+
 @pytest.mark.parametrize(
     "unit, value, human_readable_string",
     [
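Note (illustrative, outside the patch payload): the sketch below shows, under stated assumptions, the expunge-before-commit pattern that the commit message describes, using plain SQLAlchemy with a made-up in-memory model; the Workflow class, column names, and engine URL here are hypothetical stand-ins, not code from reana-db. Detaching objects that will only be read means later Session.commit() calls no longer have to flush or re-check them.

# Minimal standalone sketch of expunging read-only objects before committing.
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class Workflow(Base):  # hypothetical stand-in for an ORM-mapped workflow row
    __tablename__ = "workflow"
    id_ = Column(Integer, primary_key=True)
    name = Column(String)


engine = create_engine("sqlite:///:memory:")
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add_all([Workflow(name=f"wf-{i}") for i in range(3)])
    session.commit()

    workflows = session.query(Workflow).all()
    # The workflows themselves will not be modified, so detach them from the
    # session: subsequent commits no longer include them in the unit of work.
    for workflow in workflows:
        session.expunge(workflow)

    for workflow in workflows:
        # ... read already-loaded workflow attributes and update other tables ...
        session.commit()  # faster: no Workflow rows to flush or re-check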