Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

106: integrate celery boost #109

Merged
merged 7 commits into from
Nov 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,22 @@ repos:
rev: 5.13.2
hooks:
- id: isort
stages: [commit]
stages: [pre-commit]
- repo: https://github.com/ambv/black
rev: 24.4.2
hooks:
- id: black
args: [--config=pyproject.toml]
exclude: "migrations|snapshots"
stages: [commit]
stages: [pre-commit]
- repo: https://github.com/PyCQA/flake8
rev: 7.1.0
hooks:
- id: flake8
args: [--config=.flake8]

additional_dependencies: [flake8-bugbear==22.9.23]
stages: [ commit ]
stages: [ pre-commit ]
- repo: https://github.com/PyCQA/bandit
rev: '1.7.9' # Update me!
hooks:
Expand Down
18 changes: 17 additions & 1 deletion pdm.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ dependencies = [
"setuptools>=74.1.2",
"django-smart-env>=0.1.0",
"jsonschema>=4.23.0",
"django-celery-boost>=0.2.0",
]

[build-system]
Expand Down
1 change: 1 addition & 0 deletions src/hope_dedup_engine/apps/api/admin/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from .duplicate import DuplicateAdmin # noqa
from .hdetoken import HDETokenAdmin # noqa
from .image import ImageAdmin # noqa
from .jobs import DedupJob # noqa

admin.site.site_header = "HOPE Dedup Engine"
admin.site.site_title = "HOPE Deduplication Admin"
Expand Down
6 changes: 3 additions & 3 deletions src/hope_dedup_engine/apps/api/admin/deduplicationset.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,15 @@ class DeduplicationSetAdmin(AdminFiltersMixin, ExtraButtonsMixin, ModelAdmin):
"id",
"name",
"reference_pk",
"state_value",
"state",
"config",
"created_at",
"updated_at",
"deleted",
)
readonly_fields = (
"id",
"state_value",
"state",
"external_system",
"created_at",
"created_by",
Expand All @@ -39,7 +39,7 @@ class DeduplicationSetAdmin(AdminFiltersMixin, ExtraButtonsMixin, ModelAdmin):
)
search_fields = ("name",)
list_filter = (
("state_value", ChoicesFieldComboFilter),
("state", ChoicesFieldComboFilter),
("created_at", DateRangeFilter),
("updated_at", DateRangeFilter),
DjangoLookupFilter,
Expand Down
10 changes: 10 additions & 0 deletions src/hope_dedup_engine/apps/api/admin/jobs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from django.contrib import admin

from django_celery_boost.admin import CeleryTaskModelAdmin

from hope_dedup_engine.apps.api.models.jobs import DedupJob


@admin.register(DedupJob)
class DedupJobAdmin(CeleryTaskModelAdmin):
list_display = ["deduplication_set_id", "progress"]
11 changes: 8 additions & 3 deletions src/hope_dedup_engine/apps/api/deduplication/adapters.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from collections.abc import Generator
from collections.abc import Callable, Generator
from typing import Any

from hope_dedup_engine.apps.api.deduplication.registry import DuplicateKeyPair
Expand All @@ -12,9 +12,12 @@ class DuplicateFaceFinder:
weight = 1

def __init__(self, deduplication_set: DeduplicationSet):
self.tracker = None
self.deduplication_set = deduplication_set

def run(self) -> Generator[DuplicateKeyPair, None, None]:
def run(
self, tracker: Callable[[int], None] | None = None
) -> Generator[DuplicateKeyPair, None, None]:
filename_to_reference_pk = {
filename: reference_pk
for reference_pk, filename in self.deduplication_set.image_set.values_list(
Expand All @@ -28,7 +31,9 @@ def run(self) -> Generator[DuplicateKeyPair, None, None]:
detector = DuplicationDetector(
tuple[str](filename_to_reference_pk.keys()), ds_config
)
for first_filename, second_filename, distance in detector.find_duplicates():
for first_filename, second_filename, distance in detector.find_duplicates(
tracker
):
yield filename_to_reference_pk[first_filename], filename_to_reference_pk[
second_filename
], 1 - distance
77 changes: 0 additions & 77 deletions src/hope_dedup_engine/apps/api/deduplication/lock.py

This file was deleted.

51 changes: 25 additions & 26 deletions src/hope_dedup_engine/apps/api/deduplication/process.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
from collections.abc import Callable
from functools import partial

from celery import shared_task
from constance import config

from hope_dedup_engine.apps.api.deduplication.lock import DeduplicationSetLock
from hope_dedup_engine.apps.api.deduplication.registry import (
DuplicateFinder,
DuplicateKeyPair,
get_finders,
)
from hope_dedup_engine.apps.api.models import DeduplicationSet, Duplicate
from hope_dedup_engine.apps.api.models import DedupJob, DeduplicationSet, Duplicate
from hope_dedup_engine.apps.api.utils.notification import send_notification
from hope_dedup_engine.apps.api.utils.progress import track_progress_multi


def _sort_keys(pair: DuplicateKeyPair) -> DuplicateKeyPair:
Expand All @@ -18,8 +21,7 @@ def _sort_keys(pair: DuplicateKeyPair) -> DuplicateKeyPair:
def _save_duplicates(
finder: DuplicateFinder,
deduplication_set: DeduplicationSet,
lock_enabled: bool,
lock: DeduplicationSetLock,
tracker: Callable[[int], None],
) -> None:
reference_pk_to_filename_mapping = dict(
deduplication_set.image_set.values_list("reference_pk", "filename")
Expand All @@ -40,7 +42,7 @@ def _save_duplicates(
deduplication_set.ignoredreferencepkpair_set.values_list("first", "second")
)

for first, second, score in map(_sort_keys, finder.run()):
for first, second, score in map(_sort_keys, finder.run(tracker)):
first_filename, second_filename = sorted(
(
reference_pk_to_filename_mapping[first],
Expand All @@ -59,32 +61,35 @@ def _save_duplicates(
)
duplicate.score += score * finder.weight
duplicate.save()
if lock_enabled:
lock.refresh()


HOUR = 60 * 60


def update_job_progress(job: DedupJob, progress: int) -> None:
job.progress = progress
job.save()


@shared_task(soft_time_limit=0.5 * HOUR, time_limit=1 * HOUR)
def find_duplicates(deduplication_set_id: str, serialized_lock: str) -> None:
deduplication_set = DeduplicationSet.objects.get(pk=deduplication_set_id)
def find_duplicates(dedup_job_id: int, version: int) -> None:
dedup_job: DedupJob = DedupJob.objects.get(pk=dedup_job_id, version=version)
try:
lock_enabled = config.DEDUPLICATION_SET_LOCK_ENABLED
lock = (
DeduplicationSetLock.from_string(serialized_lock) if lock_enabled else None
)
deduplication_set = dedup_job.deduplication_set

if lock_enabled:
# refresh lock in case we spent much time waiting in queue
lock.refresh()
deduplication_set.state = DeduplicationSet.State.DIRTY
deduplication_set.save()
send_notification(deduplication_set.notification_url)

# clean results
Duplicate.objects.filter(deduplication_set=deduplication_set).delete()

weight_total = 0
for finder in get_finders(deduplication_set):
_save_duplicates(finder, deduplication_set, lock_enabled, lock)
for finder, tracker in zip(
get_finders(deduplication_set),
track_progress_multi(partial(update_job_progress, dedup_job)),
):
_save_duplicates(finder, deduplication_set, tracker)
weight_total += finder.weight

for duplicate in deduplication_set.duplicate_set.all():
Expand All @@ -94,11 +99,5 @@ def find_duplicates(deduplication_set_id: str, serialized_lock: str) -> None:
deduplication_set.state = deduplication_set.State.CLEAN
deduplication_set.save()

except Exception:
deduplication_set.state = DeduplicationSet.State.ERROR
deduplication_set.save()
raise

finally:
if lock_enabled:
lock.release()
send_notification(dedup_job.deduplication_set.notification_url)
6 changes: 4 additions & 2 deletions src/hope_dedup_engine/apps/api/deduplication/registry.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from collections.abc import Generator, Iterable
from collections.abc import Callable, Generator, Iterable
from typing import Protocol

from hope_dedup_engine.apps.api.models import DeduplicationSet
Expand All @@ -9,7 +9,9 @@
class DuplicateFinder(Protocol):
weight: int

def run(self) -> Generator[DuplicateKeyPair, None, None]: ...
def run(
self, tracker: Callable[[int], None]
) -> Generator[DuplicateKeyPair, None, None]: ...


def get_finders(deduplication_set: DeduplicationSet) -> Iterable[DuplicateFinder]:
Expand Down
Loading
Loading