Skip to content

Commit

Permalink
quota: improve performance of quota updater
Browse files Browse the repository at this point in the history
Improve the performance of the quota updater by "expunging" all
workflows before starting to update quotas, as the Workflow table does
not need to be modified, thus making commits much faster. Also refactor
the various update functions to make their behaviour consistent with
each other.

Partially addresses #193
  • Loading branch information
mdonadoni committed Sep 1, 2023
1 parent b05061a commit 8cc26f7
Show file tree
Hide file tree
Showing 4 changed files with 162 additions and 64 deletions.
30 changes: 13 additions & 17 deletions reana_db/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,16 @@
import click
from alembic import command
from alembic import config as alembic_config
from sqlalchemy.orm import defer
from reana_commons.config import REANA_LOG_FORMAT, REANA_LOG_LEVEL

Check warning on line 18 in reana_db/cli.py

View check run for this annotation

Codecov / codecov/patch

reana_db/cli.py#L18

Added line #L18 was not covered by tests


from reana_db.database import init_db
from reana_db.models import Resource, ResourceType, Workflow
from reana_db.models import Resource, ResourceType

Check warning on line 22 in reana_db/cli.py

View check run for this annotation

Codecov / codecov/patch

reana_db/cli.py#L22

Added line #L22 was not covered by tests
from reana_db.utils import (
Timer,
store_workflow_disk_quota,
update_users_cpu_quota,
update_users_disk_quota,
update_workflows_cpu_quota,
update_workflows_disk_quota,
)

# Set up logging for CLI commands
Expand Down Expand Up @@ -253,22 +252,19 @@ def _resource_usage_update(resource: ResourceType) -> None:
"""Update users resource quota usage."""
try:
if resource == ResourceType.disk:
update_workflows_disk_quota()

Check warning on line 255 in reana_db/cli.py

View check run for this annotation

Codecov / codecov/patch

reana_db/cli.py#L255

Added line #L255 was not covered by tests
update_users_disk_quota()

# logs and reana_specification are not loaded to avoid consuming
# huge amounts of memory
workflows = Workflow.query.options(
defer(Workflow.logs), defer(Workflow.reana_specification)
).all()
timer = Timer("Workflow disk quota usage update", total=len(workflows))
for workflow in workflows:
store_workflow_disk_quota(workflow)
timer.count_event()
click.secho(

Check warning on line 257 in reana_db/cli.py

View check run for this annotation

Codecov / codecov/patch

reana_db/cli.py#L257

Added line #L257 was not covered by tests
"Disk quota usage updated successfully for all users and workflows.",
fg="green",
)
elif resource == ResourceType.cpu:
update_workflows_cpu_quota()

Check warning on line 262 in reana_db/cli.py

View check run for this annotation

Codecov / codecov/patch

reana_db/cli.py#L262

Added line #L262 was not covered by tests
update_users_cpu_quota()
click.secho(
f"Users {resource.name} quota usage updated successfully.", fg="green"
)
click.secho(

Check warning on line 264 in reana_db/cli.py

View check run for this annotation

Codecov / codecov/patch

reana_db/cli.py#L264

Added line #L264 was not covered by tests
"CPU quota usage updated successfully for all users and workflows.",
fg="green",
)
except Exception as e:
click.secho(
f"[ERROR]: An error occurred when updating users {resource.name} quota usage: {repr(e)}",
Expand Down
2 changes: 2 additions & 0 deletions reana_db/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
from reana_db.utils import (
build_workspace_path,
store_workflow_disk_quota,
update_users_cpu_quota,
update_users_disk_quota,
update_workflow_cpu_quota,
update_workspace_retention_rules,
Expand Down Expand Up @@ -768,6 +769,7 @@ def _update_cpu_quota(workflow):

try:
update_workflow_cpu_quota(workflow=workflow)
update_users_cpu_quota(user=workflow.owner)
except Exception as e:
logging.error(f"Failed to update cpu quota: \n{e}\nContinuing...")

Expand Down
129 changes: 82 additions & 47 deletions reana_db/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from typing import Optional
from uuid import UUID

from sqlalchemy import inspect
from sqlalchemy import inspect, func

Check warning on line 16 in reana_db/utils.py

View check run for this annotation

Codecov / codecov/patch

reana_db/utils.py#L16

Added line #L16 was not covered by tests
from sqlalchemy.orm import defer
from reana_commons.utils import get_disk_usage
from reana_commons.errors import REANAMissingWorkspaceError
Expand Down Expand Up @@ -310,40 +310,36 @@ def update_users_disk_quota(
update policy.
"""
from reana_db.config import DEFAULT_QUOTA_RESOURCES
from reana_db.database import Session
from reana_db.models import Resource, ResourceType, User, UserResource

if not override_policy_checks and should_skip_quota_update(ResourceType.disk):
return

disk_resource = get_default_quota_resource(ResourceType.disk.name)

users = [user] if user else User.query.all()
timer = Timer("User disk quota usage update", total=len(users))
for u in users:
disk_resource = Resource.query.filter_by(
name=DEFAULT_QUOTA_RESOURCES["disk"]
).one_or_none()

if disk_resource:
from .database import Session

user_resource_quota = UserResource.query.filter_by(
user_id=u.id_, resource_id=disk_resource.id_
).first()
if bytes_to_sum is not None:
updated_quota_usage = user_resource_quota.quota_used + bytes_to_sum
if updated_quota_usage < 0:
logging.warning(
f"Disk quota consumption of user {u.id_} would become negative: "
f"{user_resource_quota.quota_used} [original usage] + {bytes_to_sum} [delta] "
f"-> {updated_quota_usage} [new usage]. Setting the new usage to zero."
)
user_resource_quota.quota_used = 0
else:
user_resource_quota.quota_used = updated_quota_usage
user_resource_quota = UserResource.query.filter_by(
user_id=u.id_, resource_id=disk_resource.id_
).one()
if bytes_to_sum is not None:
updated_quota_usage = user_resource_quota.quota_used + bytes_to_sum
if updated_quota_usage < 0:
logging.warning(

Check warning on line 330 in reana_db/utils.py

View check run for this annotation

Codecov / codecov/patch

reana_db/utils.py#L328-L330

Added lines #L328 - L330 were not covered by tests
f"Disk quota consumption of user {u.id_} would become negative: "
f"{user_resource_quota.quota_used} [original usage] + {bytes_to_sum} [delta] "
f"-> {updated_quota_usage} [new usage]. Setting the new usage to zero."
)
user_resource_quota.quota_used = 0

Check warning on line 335 in reana_db/utils.py

View check run for this annotation

Codecov / codecov/patch

reana_db/utils.py#L335

Added line #L335 was not covered by tests
else:
workspace_path = u.get_user_workspace()
disk_usage_bytes = get_disk_usage_or_zero(workspace_path)
user_resource_quota.quota_used = disk_usage_bytes
Session.commit()
user_resource_quota.quota_used = updated_quota_usage

Check warning on line 337 in reana_db/utils.py

View check run for this annotation

Codecov / codecov/patch

reana_db/utils.py#L337

Added line #L337 was not covered by tests
else:
workspace_path = u.get_user_workspace()
disk_usage_bytes = get_disk_usage_or_zero(workspace_path)
user_resource_quota.quota_used = disk_usage_bytes
Session.commit()
timer.count_event()


Expand All @@ -360,11 +356,15 @@ def update_workflow_cpu_quota(workflow) -> int:
WorkflowResource,
)

if should_skip_quota_update(ResourceType.cpu):
return

cpu_resource = get_default_quota_resource(ResourceType.cpu.name)

terminated_at = workflow.run_finished_at or workflow.run_stopped_at
if workflow.run_started_at and terminated_at:
cpu_time = terminated_at - workflow.run_started_at
cpu_milliseconds = int(cpu_time.total_seconds() * 1000)
cpu_resource = get_default_quota_resource(ResourceType.cpu.name)
# WorkflowResource might exist already if the cluster
# follows a combined termination + periodic policy (eg. created
# by the status listener, revisited by the cronjob)
Expand All @@ -379,23 +379,40 @@ def update_workflow_cpu_quota(workflow) -> int:
resource_id=cpu_resource.id_,
quota_used=cpu_milliseconds,
)
user_resource_quota = UserResource.query.filter_by(
user_id=workflow.owner_id, resource_id=cpu_resource.id_
).first()
user_resource_quota.quota_used += cpu_milliseconds
Session.add(workflow_resource)
Session.commit()
return cpu_milliseconds
return 0


def update_workflows_cpu_quota() -> None:
    """Refresh the CPU quota usage of every workflow stored in the database.

    All workflows are fetched in one query, detached from the session and
    then handed one at a time to ``update_workflow_cpu_quota``.
    """
    from reana_db.database import Session
    from reana_db.models import Workflow

    # Leave out the ``logs`` and ``reana_specification`` columns: loading
    # them for every workflow would consume huge amounts of memory.
    lightweight_query = Workflow.query.options(
        defer(Workflow.logs), defer(Workflow.reana_specification)
    )
    all_workflows = lightweight_query.all()

    # The workflow rows themselves are not modified by the quota update, so
    # they are detached up front; this makes `Session.commit()` much faster.
    for wf in all_workflows:
        Session.expunge(wf)

    progress = Timer("Workflow CPU quota usage update", total=len(all_workflows))
    for wf in all_workflows:
        update_workflow_cpu_quota(wf)
        progress.count_event()


def update_users_cpu_quota(user=None) -> None:
"""Update users CPU quota usage.
:param user: User whose CPU quota will be updated. If None, applies to all users.
User CPU quotas will be calculated from workflow CPU quotas,
so the latter should be updated before the former.
:param user: User whose CPU quota will be updated. If None, applies to all users.
:type user: reana_db.models.User
"""
from reana_db.database import Session
from reana_db.models import (
Expand All @@ -404,12 +421,14 @@ def update_users_cpu_quota(user=None) -> None:
UserResource,
UserToken,
UserTokenStatus,
Workflow,
WorkflowResource,
)

if should_skip_quota_update(ResourceType.cpu):
return

cpu_resource = get_default_quota_resource(ResourceType.cpu.name)

if user:
users = [user]
else:
Expand All @@ -419,25 +438,21 @@ def update_users_cpu_quota(user=None) -> None:
.all()
)
timer_user = Timer("User CPU quota usage update", total=len(users))
timer_workflow = Timer("Workflow CPU quota usage update")
for user in users:
cpu_milliseconds = 0
# logs and reana_specification are not loaded to avoid consuming
# huge amounts of memory
for workflow in user.workflows.options(
defer(Workflow.logs),
defer(Workflow.reana_specification),
):
cpu_milliseconds += update_workflow_cpu_quota(workflow=workflow)
timer_workflow.count_event()
cpu_resource = get_default_quota_resource(ResourceType.cpu.name)
cpu_milliseconds = (
Session.query(func.sum(WorkflowResource.quota_used))
.filter(WorkflowResource.resource_id == cpu_resource.id_)
.join(user.workflows.subquery())
.scalar()
)
if not cpu_milliseconds:
cpu_milliseconds = 0
user_resource_quota = UserResource.query.filter_by(
user_id=user.id_, resource_id=cpu_resource.id_
).first()
user_resource_quota.quota_used = cpu_milliseconds
Session.commit()
timer_user.count_event()
timer_workflow.log_progress()


def update_workspace_retention_rules(rules, status) -> None:
Expand Down Expand Up @@ -534,7 +549,7 @@ def store_workflow_disk_quota(
workflow.workspace_path
)
Session.commit()
elif inspect(workflow).persistent:
else:
workflow_resource = WorkflowResource(
workflow_id=workflow.id_,
resource_id=disk_resource.id_,
Expand All @@ -544,3 +559,23 @@ def store_workflow_disk_quota(
Session.commit()

return workflow_resource


def update_workflows_disk_quota() -> None:
    """Refresh the disk quota usage of every workflow stored in the database.

    All workflows are fetched in one query, detached from the session and
    then handed one at a time to ``store_workflow_disk_quota``.
    """
    from reana_db.database import Session
    from reana_db.models import Workflow

    # Leave out the ``logs`` and ``reana_specification`` columns: loading
    # them for every workflow would consume huge amounts of memory.
    lightweight_query = Workflow.query.options(
        defer(Workflow.logs), defer(Workflow.reana_specification)
    )
    all_workflows = lightweight_query.all()

    # The workflow rows themselves are not modified by the quota update, so
    # they are detached up front; this makes `Session.commit()` much faster.
    for wf in all_workflows:
        Session.expunge(wf)

    progress = Timer("Workflow disk quota usage update", total=len(all_workflows))
    for wf in all_workflows:
        store_workflow_disk_quota(wf)
        progress.count_event()
65 changes: 65 additions & 0 deletions tests/test_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@
from reana_db.utils import (
get_default_quota_resource,
update_users_cpu_quota,
update_workflows_cpu_quota,
update_workflows_disk_quota,
update_users_disk_quota,
update_workspace_retention_rules,
)

Expand Down Expand Up @@ -274,6 +277,10 @@ def test_access_token(db, session, new_user):
"reana_db.models.WORKFLOW_TERMINATION_QUOTA_UPDATE_POLICY",
[ResourceType.cpu.name, ResourceType.disk.name],
)
@mock.patch(
"reana_db.utils.WORKFLOW_TERMINATION_QUOTA_UPDATE_POLICY",
[ResourceType.cpu.name, ResourceType.disk.name],
)
def test_workflow_cpu_quota_usage_update(db, session, run_workflow):
"""Test quota usage update once workflow is finished/stopped/failed."""
time_elapsed_seconds = 0.5
Expand Down Expand Up @@ -352,6 +359,9 @@ def test_all_users_cpu_quota_usage_update(
with mock.patch(
"reana_db.models.WORKFLOW_TERMINATION_QUOTA_UPDATE_POLICY",
workflow_termination_quota_update_policy,
), mock.patch(
"reana_db.utils.WORKFLOW_TERMINATION_QUOTA_UPDATE_POLICY",
workflow_termination_quota_update_policy,
), mock.patch(
"reana_db.utils.PERIODIC_RESOURCE_QUOTA_UPDATE_POLICY",
periodic_update,
Expand All @@ -367,6 +377,7 @@ def test_all_users_cpu_quota_usage_update(
else:
assert quota_cpu_usage == 0

update_workflows_cpu_quota()
update_users_cpu_quota()

quota_cpu_usage = new_user.get_quota_usage()["cpu"]["usage"]["raw"]
Expand All @@ -376,6 +387,60 @@ def test_all_users_cpu_quota_usage_update(
assert quota_cpu_usage == 0


@pytest.mark.parametrize(
    "workflow_termination_quota_update_policy, periodic_update",
    [
        ([], True),
        ([], False),
        ([ResourceType.disk.name], False),
        ([ResourceType.disk.name], True),
    ],
)
def test_all_users_disk_quota_usage_update(
    db,
    session,
    new_user,
    run_workflow,
    workflow_termination_quota_update_policy,
    periodic_update,
):
    """Check disk quota usage before and after the periodic update functions run.

    Each parametrized case patches the termination and periodic update
    policies, runs two workflows and verifies the disk usage recorded for
    the workflows and for their owner.
    """
    num_workflows = 2
    dir_size = 128

    def check_disk_usage(workflows, expect_updated):
        # Read the user's usage first, then inspect the workflows, so the
        # order of database accesses stays the same in both checks.
        usage = new_user.get_quota_usage()["disk"]["usage"]["raw"]
        if not expect_updated:
            assert usage == 0
            return
        for wf in workflows:
            assert wf.resources[0].quota_used == dir_size
        assert usage == dir_size

    with mock.patch(
        "reana_db.models.WORKFLOW_TERMINATION_QUOTA_UPDATE_POLICY",
        workflow_termination_quota_update_policy,
    ), mock.patch(
        "reana_db.utils.WORKFLOW_TERMINATION_QUOTA_UPDATE_POLICY",
        workflow_termination_quota_update_policy,
    ), mock.patch(
        "reana_db.utils.PERIODIC_RESOURCE_QUOTA_UPDATE_POLICY",
        periodic_update,
    ), mock.patch(
        "reana_db.utils.get_disk_usage", return_value=[{"size": {"raw": str(dir_size)}}]
    ):
        workflows = [run_workflow() for _ in range(num_workflows)]

        # Right after the runs, usage is only recorded when the termination
        # policy covers the disk resource.
        check_disk_usage(workflows, workflow_termination_quota_update_policy)

        update_workflows_disk_quota()
        update_users_disk_quota()

        # After the explicit update, usage is recorded if either policy
        # enables disk quota updates.
        check_disk_usage(
            workflows, workflow_termination_quota_update_policy or periodic_update
        )


@pytest.mark.parametrize(
"unit, value, human_readable_string",
[
Expand Down

0 comments on commit 8cc26f7

Please sign in to comment.