Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

quota: improve performance of quota updater #200

Merged
merged 2 commits into from
Sep 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 19 additions & 13 deletions reana_db/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,28 @@

"""REANA DB command line."""

import logging

Check warning on line 11 in reana_db/cli.py

View check run for this annotation

Codecov / codecov/patch

reana_db/cli.py#L11

Added line #L11 was not covered by tests
import os
import sys

import click
from alembic import command
from alembic import config as alembic_config
from sqlalchemy.orm import defer
from reana_commons.config import REANA_LOG_FORMAT, REANA_LOG_LEVEL

Check warning on line 18 in reana_db/cli.py

View check run for this annotation

Codecov / codecov/patch

reana_db/cli.py#L18

Added line #L18 was not covered by tests


from reana_db.database import init_db
from reana_db.models import Resource, ResourceType, Workflow
from reana_db.models import Resource, ResourceType

Check warning on line 22 in reana_db/cli.py

View check run for this annotation

Codecov / codecov/patch

reana_db/cli.py#L22

Added line #L22 was not covered by tests
from reana_db.utils import (
store_workflow_disk_quota,
update_users_cpu_quota,
update_users_disk_quota,
update_workflows_cpu_quota,
update_workflows_disk_quota,
)

# Set up logging for CLI commands
logging.basicConfig(level=REANA_LOG_LEVEL, format=REANA_LOG_FORMAT)

Check warning on line 31 in reana_db/cli.py

View check run for this annotation

Codecov / codecov/patch

reana_db/cli.py#L31

Added line #L31 was not covered by tests


@click.group()
def cli():
Expand Down Expand Up @@ -246,19 +252,19 @@
"""Update users resource quota usage."""
try:
if resource == ResourceType.disk:
update_workflows_disk_quota()

Check warning on line 255 in reana_db/cli.py

View check run for this annotation

Codecov / codecov/patch

reana_db/cli.py#L255

Added line #L255 was not covered by tests
update_users_disk_quota()

# logs and reana_specification are not loaded to avoid consuming
# huge amounts of memory
for workflow in Workflow.query.options(
defer(Workflow.logs), defer(Workflow.reana_specification)
).all():
store_workflow_disk_quota(workflow)
click.secho(

Check warning on line 257 in reana_db/cli.py

View check run for this annotation

Codecov / codecov/patch

reana_db/cli.py#L257

Added line #L257 was not covered by tests
"Disk quota usage updated successfully for all users and workflows.",
fg="green",
)
elif resource == ResourceType.cpu:
update_workflows_cpu_quota()

Check warning on line 262 in reana_db/cli.py

View check run for this annotation

Codecov / codecov/patch

reana_db/cli.py#L262

Added line #L262 was not covered by tests
update_users_cpu_quota()
click.secho(
f"Users {resource.name} quota usage updated successfully.", fg="green"
)
click.secho(

Check warning on line 264 in reana_db/cli.py

View check run for this annotation

Codecov / codecov/patch

reana_db/cli.py#L264

Added line #L264 was not covered by tests
"CPU quota usage updated successfully for all users and workflows.",
fg="green",
)
except Exception as e:
click.secho(
f"[ERROR]: An error occurred when updating users {resource.name} quota usage: {repr(e)}",
Expand Down
2 changes: 2 additions & 0 deletions reana_db/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
from reana_db.utils import (
build_workspace_path,
store_workflow_disk_quota,
update_users_cpu_quota,
update_users_disk_quota,
update_workflow_cpu_quota,
update_workspace_retention_rules,
Expand Down Expand Up @@ -768,6 +769,7 @@ def _update_cpu_quota(workflow):

try:
update_workflow_cpu_quota(workflow=workflow)
update_users_cpu_quota(user=workflow.owner)
except Exception as e:
logging.error(f"Failed to update cpu quota: \n{e}\nContinuing...")

Expand Down
196 changes: 149 additions & 47 deletions reana_db/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from typing import Optional
from uuid import UUID

from sqlalchemy import inspect
from sqlalchemy import inspect, func

Check warning on line 16 in reana_db/utils.py

View check run for this annotation

Codecov / codecov/patch

reana_db/utils.py#L16

Added line #L16 was not covered by tests
from sqlalchemy.orm import defer
from reana_commons.utils import get_disk_usage
from reana_commons.errors import REANAMissingWorkspaceError
Expand Down Expand Up @@ -206,6 +206,65 @@
return workflow


class Timer:
    """Timer to time events and log periodic progress.

    The timer starts when the instance is created. Each call to
    ``count_event`` increments the event counter and, depending on
    ``periodic_delta`` and ``total``, writes a progress line with
    ``logging.info``.
    """

    def __init__(self, name=None, total=None, periodic_delta=100) -> None:
        """Initialise new Timer.

        :param name: Optional label prepended to every progress message.
        :param total: Optional expected number of events. It is used to
            estimate the total time and to log once all events are counted.
        :param periodic_delta: Progress is logged every ``periodic_delta``
            events. A falsy value (``0`` or ``None``) disables the
            intermediate messages, so that progress is only logged when
            ``count`` reaches ``total``.
        """
        self.name = name
        self.total = total
        self.periodic_delta = periodic_delta
        self.count = 0
        self.start = datetime.now()

    def elapsed(self) -> float:
        """Elapsed time since the creation of the Timer, in seconds."""
        diff = datetime.now() - self.start
        return diff.total_seconds()

    def estimated_total(self) -> float:
        """Estimated total time, in seconds.

        The estimate extrapolates the elapsed time to ``total`` events.
        Zero is returned when ``total`` is not set or no event has been
        counted yet, as no estimate can be made in those cases.
        """
        if not self.total or not self.count:
            return 0.0
        return self.elapsed() * self.total / self.count

    def per_event(self) -> float:
        """Time per event, in seconds (zero if no event was counted yet)."""
        if self.count == 0:
            return 0.0
        return self.elapsed() / self.count

    def log_progress(self) -> None:
        """Log progress of events."""
        progress = ""
        if self.name:
            progress = f"{self.name} "
        progress += f"progress: {self.count}"
        if self.total:
            progress += f"/{self.total}"
        progress += (
            f" elapsed: {self.elapsed():.3f}s"
            f" est.total: {self.estimated_total():.3f}s"
            f" per event: {self.per_event():.3f}s"
        )
        logging.info(progress)

    def log_periodic_progress(self) -> None:
        """Periodically log progress of events.

        Progress is logged periodically after a given amount of events
        and when all the events are completed.
        """
        if self.count == self.total:
            self.log_progress()
            return
        # A falsy ``periodic_delta`` disables the intermediate messages
        # instead of making the modulo below fail (``ZeroDivisionError`` for
        # ``0``, ``TypeError`` for ``None``).
        if self.periodic_delta and self.count % self.periodic_delta == 0:
            self.log_progress()

    def count_event(self) -> None:
        """Count a new event and log the progress if it is due."""
        self.count += 1
        self.log_periodic_progress()


def get_default_quota_resource(resource_type):
"""
Get default quota resource by given resource type.
Expand Down Expand Up @@ -251,40 +310,37 @@
update policy.
"""
from reana_db.config import DEFAULT_QUOTA_RESOURCES
from reana_db.database import Session
from reana_db.models import Resource, ResourceType, User, UserResource

if not override_policy_checks and should_skip_quota_update(ResourceType.disk):
return

users = [user] if user else User.query.all()
disk_resource = get_default_quota_resource(ResourceType.disk.name)

users = [user] if user else User.query.all()
timer = Timer("User disk quota usage update", total=len(users))
for u in users:
disk_resource = Resource.query.filter_by(
name=DEFAULT_QUOTA_RESOURCES["disk"]
).one_or_none()

if disk_resource:
from .database import Session

user_resource_quota = UserResource.query.filter_by(
user_id=u.id_, resource_id=disk_resource.id_
).first()
if bytes_to_sum is not None:
updated_quota_usage = user_resource_quota.quota_used + bytes_to_sum
if updated_quota_usage < 0:
logging.warning(
f"Disk quota consumption of user {u.id_} would become negative: "
f"{user_resource_quota.quota_used} [original usage] + {bytes_to_sum} [delta] "
f"-> {updated_quota_usage} [new usage]. Setting the new usage to zero."
)
user_resource_quota.quota_used = 0
else:
user_resource_quota.quota_used = updated_quota_usage
user_resource_quota = UserResource.query.filter_by(
user_id=u.id_, resource_id=disk_resource.id_
).one()
if bytes_to_sum is not None:
updated_quota_usage = user_resource_quota.quota_used + bytes_to_sum
if updated_quota_usage < 0:
logging.warning(

Check warning on line 330 in reana_db/utils.py

View check run for this annotation

Codecov / codecov/patch

reana_db/utils.py#L328-L330

Added lines #L328 - L330 were not covered by tests
f"Disk quota consumption of user {u.id_} would become negative: "
f"{user_resource_quota.quota_used} [original usage] + {bytes_to_sum} [delta] "
f"-> {updated_quota_usage} [new usage]. Setting the new usage to zero."
)
user_resource_quota.quota_used = 0

Check warning on line 335 in reana_db/utils.py

View check run for this annotation

Codecov / codecov/patch

reana_db/utils.py#L335

Added line #L335 was not covered by tests
else:
workspace_path = u.get_user_workspace()
disk_usage_bytes = get_disk_usage_or_zero(workspace_path)
user_resource_quota.quota_used = disk_usage_bytes
Session.commit()
user_resource_quota.quota_used = updated_quota_usage

Check warning on line 337 in reana_db/utils.py

View check run for this annotation

Codecov / codecov/patch

reana_db/utils.py#L337

Added line #L337 was not covered by tests
else:
workspace_path = u.get_user_workspace()
disk_usage_bytes = get_disk_usage_or_zero(workspace_path)
user_resource_quota.quota_used = disk_usage_bytes
Session.commit()
timer.count_event()


def update_workflow_cpu_quota(workflow) -> int:
Expand All @@ -300,11 +356,15 @@
WorkflowResource,
)

if should_skip_quota_update(ResourceType.cpu):
return

cpu_resource = get_default_quota_resource(ResourceType.cpu.name)

terminated_at = workflow.run_finished_at or workflow.run_stopped_at
if workflow.run_started_at and terminated_at:
cpu_time = terminated_at - workflow.run_started_at
cpu_milliseconds = int(cpu_time.total_seconds() * 1000)
cpu_resource = get_default_quota_resource(ResourceType.cpu.name)
# WorkflowResource might exist already if the cluster
# follows a combined termination + periodic policy (eg. created
# by the status listener, revisited by the cronjob)
Expand All @@ -319,23 +379,40 @@
resource_id=cpu_resource.id_,
quota_used=cpu_milliseconds,
)
user_resource_quota = UserResource.query.filter_by(
user_id=workflow.owner_id, resource_id=cpu_resource.id_
).first()
user_resource_quota.quota_used += cpu_milliseconds
Session.add(workflow_resource)
Session.commit()
return cpu_milliseconds
return 0


def update_workflows_cpu_quota() -> None:
    """Recompute the CPU quota usage of every workflow.

    All workflows are fetched with one query and detached from the session,
    then ``update_workflow_cpu_quota`` is called on each of them while a
    ``Timer`` logs the progress.
    """
    from reana_db.database import Session
    from reana_db.models import Workflow

    # The ``logs`` and ``reana_specification`` columns are deferred, as
    # loading them for every workflow would consume huge amounts of memory.
    lightweight_query = Workflow.query.options(
        defer(Workflow.logs),
        defer(Workflow.reana_specification),
    )
    all_workflows = lightweight_query.all()

    # The workflows themselves are not modified when updating the quotas, so
    # they are all expunged from the session first: this makes
    # `Session.commit()` much faster.
    for detached in all_workflows:
        Session.expunge(detached)

    progress = Timer("Workflow CPU quota usage update", total=len(all_workflows))
    for current in all_workflows:
        update_workflow_cpu_quota(current)
        progress.count_event()


def update_users_cpu_quota(user=None) -> None:
"""Update users CPU quota usage.

:param user: User whose CPU quota will be updated. If None, applies to all users.
User CPU quotas will be calculated from workflow CPU quotas,
so the latter should be updated before the former.

:param user: User whose CPU quota will be updated. If None, applies to all users.
:type user: reana_db.models.User

"""
from reana_db.database import Session
from reana_db.models import (
Expand All @@ -344,33 +421,38 @@
UserResource,
UserToken,
UserTokenStatus,
Workflow,
WorkflowResource,
)

if should_skip_quota_update(ResourceType.cpu):
return

cpu_resource = get_default_quota_resource(ResourceType.cpu.name)

if user:
users = [user]
else:
users = User.query.join(UserToken).filter_by(
status=UserTokenStatus.active # skip users with no active token
users = (
User.query.join(UserToken)
.filter_by(status=UserTokenStatus.active) # skip users with no active token
.all()
)
timer_user = Timer("User CPU quota usage update", total=len(users))
for user in users:
cpu_milliseconds = 0
# logs and reana_specification are not loaded to avoid consuming
# huge amounts of memory
for workflow in user.workflows.options(
defer(Workflow.logs),
defer(Workflow.reana_specification),
):
cpu_milliseconds += update_workflow_cpu_quota(workflow=workflow)
cpu_resource = get_default_quota_resource(ResourceType.cpu.name)
cpu_milliseconds = (
Session.query(func.sum(WorkflowResource.quota_used))
.filter(WorkflowResource.resource_id == cpu_resource.id_)
.join(user.workflows.subquery())
.scalar()
)
if not cpu_milliseconds:
cpu_milliseconds = 0
user_resource_quota = UserResource.query.filter_by(
user_id=user.id_, resource_id=cpu_resource.id_
).first()
user_resource_quota.quota_used = cpu_milliseconds
Session.commit()
timer_user.count_event()


def update_workspace_retention_rules(rules, status) -> None:
Expand Down Expand Up @@ -467,7 +549,7 @@
workflow.workspace_path
)
Session.commit()
elif inspect(workflow).persistent:
else:
workflow_resource = WorkflowResource(
workflow_id=workflow.id_,
resource_id=disk_resource.id_,
Expand All @@ -477,3 +559,23 @@
Session.commit()

return workflow_resource


def update_workflows_disk_quota() -> None:
    """Recompute the disk quota usage of every workflow.

    All workflows are fetched with one query and detached from the session,
    then ``store_workflow_disk_quota`` is called on each of them while a
    ``Timer`` logs the progress.
    """
    from reana_db.database import Session
    from reana_db.models import Workflow

    # Deferring ``logs`` and ``reana_specification`` keeps memory usage low:
    # loading them for every workflow would consume huge amounts of memory.
    deferred_columns = (
        defer(Workflow.logs),
        defer(Workflow.reana_specification),
    )
    fetched = Workflow.query.options(*deferred_columns).all()

    # None of the workflows is modified when updating the quotas, hence they
    # are expunged beforehand, which makes `Session.commit()` much faster.
    for item in fetched:
        Session.expunge(item)

    tracker = Timer("Workflow disk quota usage update", total=len(fetched))
    for item in fetched:
        store_workflow_disk_quota(item)
        tracker.count_event()
Loading