Skip to content

Commit

Permalink
Make celery boost integration work
Browse files Browse the repository at this point in the history
  • Loading branch information
sergey-misuk-valor committed Oct 31, 2024
1 parent 1f44c5b commit 3aedc69
Show file tree
Hide file tree
Showing 8 changed files with 110 additions and 17 deletions.
1 change: 0 additions & 1 deletion src/hope_dedup_engine/apps/api/deduplication/lock.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
from hope_dedup_engine.apps.api.models import DeduplicationSet

DELIMITER: Final[str] = "|"
LOCK_IS_NOT_ENABLED = "LOCK_IS_NOT_ENABLED"


class DeduplicationSetLock:
Expand Down
13 changes: 9 additions & 4 deletions src/hope_dedup_engine/apps/api/deduplication/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
DuplicateKeyPair,
get_finders,
)
from hope_dedup_engine.apps.api.models import DeduplicationSet, Duplicate
from hope_dedup_engine.apps.api.models import DedupJob, DeduplicationSet, Duplicate


def _sort_keys(pair: DuplicateKeyPair) -> DuplicateKeyPair:
Expand Down Expand Up @@ -67,18 +67,22 @@ def _save_duplicates(


@shared_task(soft_time_limit=0.5 * HOUR, time_limit=1 * HOUR)
def find_duplicates(deduplication_set_id: str, serialized_lock: str) -> None:
deduplication_set = DeduplicationSet.objects.get(pk=deduplication_set_id)
def find_duplicates(dedup_job_id: int, version: int) -> None:
dedup_job: DedupJob = DedupJob.objects.get(pk=dedup_job_id, version=version)
try:
lock_enabled = config.DEDUPLICATION_SET_LOCK_ENABLED
lock = (
DeduplicationSetLock.from_string(serialized_lock) if lock_enabled else None
DeduplicationSetLock.from_string(dedup_job.serialized_lock)
if lock_enabled
else None
)

if lock_enabled:
# refresh lock in case we spent much time waiting in queue
lock.refresh()

deduplication_set = dedup_job.deduplication_set

# clean results
Duplicate.objects.filter(deduplication_set=deduplication_set).delete()

Expand All @@ -98,6 +102,7 @@ def find_duplicates(deduplication_set_id: str, serialized_lock: str) -> None:
lock.release()

except Exception:
deduplication_set = dedup_job.deduplication_set
deduplication_set.state = DeduplicationSet.State.ERROR
deduplication_set.save()
raise
15 changes: 14 additions & 1 deletion src/hope_dedup_engine/apps/api/migrations/0010_dedupjob.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Generated by Django 5.0.7 on 2024-10-30 12:45
# Generated by Django 5.0.7 on 2024-10-31 20:48

import concurrency.fields
import django.db.models.deletion
from django.db import migrations, models


Expand Down Expand Up @@ -63,6 +64,18 @@ class Migration(migrations.Migration):
null=True,
),
),
(
"serialized_lock",
models.CharField(editable=False, max_length=128, null=True),
),
(
"deduplication_set",
models.ForeignKey(
on_delete=django.db.models.deletion.CASCADE,
related_name="jobs",
to="api.deduplicationset",
),
),
],
options={
"abstract": False,
Expand Down
23 changes: 23 additions & 0 deletions src/hope_dedup_engine/apps/api/models/jobs.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,30 @@
from django.db import models

from constance import config
from django_celery_boost.models import CeleryTaskModel

from hope_dedup_engine.apps.api.deduplication.lock import DeduplicationSetLock
from hope_dedup_engine.apps.api.models import DeduplicationSet


class DedupJob(CeleryTaskModel):
deduplication_set = models.ForeignKey(
"DeduplicationSet", on_delete=models.CASCADE, related_name="jobs"
)
serialized_lock = models.CharField(max_length=128, null=True, editable=False)

celery_task_name = (
"hope_dedup_engine.apps.api.deduplication.process.find_duplicates"
)

def queue(self, use_version: bool = True) -> str | None:
if config.DEDUPLICATION_SET_LOCK_ENABLED:
self.serialized_lock = str(
DeduplicationSetLock.for_deduplication_set(self.deduplication_set)
)
self.save()

self.deduplication_set.state = DeduplicationSet.State.PROCESSING
self.deduplication_set.save()

return super().queue(use_version=use_version)
13 changes: 2 additions & 11 deletions src/hope_dedup_engine/apps/api/utils/process.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
from constance import config
from rest_framework import status
from rest_framework.exceptions import APIException

from hope_dedup_engine.apps.api.deduplication.lock import LOCK_IS_NOT_ENABLED
from hope_dedup_engine.apps.api.models import DeduplicationSet
from hope_dedup_engine.apps.api.models.jobs import DedupJob


class AlreadyProcessingError(APIException):
Expand All @@ -14,17 +13,9 @@ class AlreadyProcessingError(APIException):

def start_processing(deduplication_set: DeduplicationSet) -> None:
from hope_dedup_engine.apps.api.deduplication.lock import DeduplicationSetLock
from hope_dedup_engine.apps.api.deduplication.process import find_duplicates

try:
lock = (
DeduplicationSetLock.for_deduplication_set(deduplication_set)
if config.DEDUPLICATION_SET_LOCK_ENABLED
else LOCK_IS_NOT_ENABLED
)
deduplication_set.state = DeduplicationSet.State.PROCESSING
deduplication_set.save()
find_duplicates.delay(str(deduplication_set.pk), str(lock))
DedupJob.objects.create(deduplication_set=deduplication_set).queue()
except DeduplicationSetLock.LockNotOwnedException as e:
raise AlreadyProcessingError from e

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{% extends "admin_extra_buttons/change_form.html" %}
{% block object-tools %}
{{ block.super }}
<div>
{{ original.task_status }}
</div>
{% endblock %}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
{% extends "admin_extra_buttons/action_page.html" %}
{% block action-content %}
<h1>{{ original }}</h1>
{{ original.curr_async_result_id }}
{% if flower_addr %}
<a class="button" target="flower" href="{{ flower_addr }}/task/{{ original.curr_async_result_id }}">Flower</a>
{% endif %}
<h3>Task Info</h3>
<table>
{% for k,v in original.task_info.items %}
<tr>
<th>{{ k }}</th>
<td>{{ v }}</td>
</tr>
{% endfor %}
<tr>
<th>Position in queue</th>
<td>{{ original.queue_position }}</td>
</tr>
</table>



<h3>Queue Info</h3>
<table>
{% for k,v in original.queue_info.items %}
{% if k == "headers" %}
<tr>
<th>{{ k }}</th>
<td>
<table>
{% for k1,v1 in original.queue_info.headers.items %}
<tr>
<td>{{ k1 }}</td>
<td>{{ v1 }}</td>
</tr>
{% endfor %}
</table>
</td>
</tr>
{% else %}
<tr>
<th>{{ k }}</th>
<td>{{ v }}</td>
</tr>
{% endif %}
{% endfor %}
</table>

{% endblock %}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{% extends "admin_extra_buttons/confirm.html" %}
{% block action-content %}
1111
{{block.super }}
{% endblock %}

0 comments on commit 3aedc69

Please sign in to comment.