diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 9b2fcc7..a2de49d 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -3,14 +3,14 @@ repos: rev: 5.13.2 hooks: - id: isort - stages: [commit] + stages: [pre-commit] - repo: https://github.com/ambv/black rev: 24.4.2 hooks: - id: black args: [--config=pyproject.toml] exclude: "migrations|snapshots" - stages: [commit] + stages: [pre-commit] - repo: https://github.com/PyCQA/flake8 rev: 7.1.0 hooks: @@ -18,7 +18,7 @@ repos: args: [--config=.flake8] additional_dependencies: [flake8-bugbear==22.9.23] - stages: [ commit ] + stages: [ pre-commit ] - repo: https://github.com/PyCQA/bandit rev: '1.7.9' # Update me! hooks: diff --git a/pdm.lock b/pdm.lock index f9da967..a3f5338 100644 --- a/pdm.lock +++ b/pdm.lock @@ -5,7 +5,7 @@ groups = ["default", "dev"] strategy = ["inherit_metadata"] lock_version = "4.5.0" -content_hash = "sha256:9ca78853e306e245544b961ba5f90ebef6c59c8021963a029c4713b55197ad80" +content_hash = "sha256:af360b12fc908c108a5ddb105b8b43c689b284a2ef1ac1167c0473b093029aeb" [[metadata.targets]] requires_python = ">=3.12" @@ -612,6 +612,22 @@ files = [ {file = "django_celery_beat-2.7.0.tar.gz", hash = "sha256:8482034925e09b698c05ad61c36ed2a8dbc436724a3fe119215193a4ca6dc967"}, ] +[[package]] +name = "django-celery-boost" +version = "0.2.0" +requires_python = ">=3.10" +summary = "Django Abstract Model to work with Celery" +groups = ["default"] +dependencies = [ + "celery>=5.4.0", + "django-admin-extra-buttons>=1.5.8", + "django-concurrency>=2.6", +] +files = [ + {file = "django_celery_boost-0.2.0-py3-none-any.whl", hash = "sha256:a229a71b2a5bd1be1ba338005a64af81a3e1c159f5e553b8a211a47aa11ead1d"}, + {file = "django_celery_boost-0.2.0.tar.gz", hash = "sha256:7399a46d4c361abbe96acb1f00f53bc96eaec4f1928f3f83fd0de85c0984ebda"}, +] + [[package]] name = "django-celery-results" version = "2.5.1" diff --git a/pyproject.toml b/pyproject.toml index 9695f87..4eab890 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -44,6 +44,7 @@ dependencies = [ "setuptools>=74.1.2", "django-smart-env>=0.1.0", "jsonschema>=4.23.0", + "django-celery-boost>=0.2.0", ] [build-system] diff --git a/src/hope_dedup_engine/apps/api/admin/__init__.py b/src/hope_dedup_engine/apps/api/admin/__init__.py index 2cd9c3d..ff9f398 100644 --- a/src/hope_dedup_engine/apps/api/admin/__init__.py +++ b/src/hope_dedup_engine/apps/api/admin/__init__.py @@ -5,6 +5,7 @@ from .duplicate import DuplicateAdmin # noqa from .hdetoken import HDETokenAdmin # noqa from .image import ImageAdmin # noqa +from .jobs import DedupJob # noqa admin.site.site_header = "HOPE Dedup Engine" admin.site.site_title = "HOPE Deduplication Admin" diff --git a/src/hope_dedup_engine/apps/api/admin/deduplicationset.py b/src/hope_dedup_engine/apps/api/admin/deduplicationset.py index 654b8ee..f125631 100644 --- a/src/hope_dedup_engine/apps/api/admin/deduplicationset.py +++ b/src/hope_dedup_engine/apps/api/admin/deduplicationset.py @@ -21,7 +21,7 @@ class DeduplicationSetAdmin(AdminFiltersMixin, ExtraButtonsMixin, ModelAdmin): "id", "name", "reference_pk", - "state_value", + "state", "config", "created_at", "updated_at", @@ -29,7 +29,7 @@ class DeduplicationSetAdmin(AdminFiltersMixin, ExtraButtonsMixin, ModelAdmin): ) readonly_fields = ( "id", - "state_value", + "state", "external_system", "created_at", "created_by", @@ -39,7 +39,7 @@ class DeduplicationSetAdmin(AdminFiltersMixin, ExtraButtonsMixin, ModelAdmin): ) search_fields = ("name",) list_filter = ( - ("state_value", ChoicesFieldComboFilter), + ("state", ChoicesFieldComboFilter), ("created_at", DateRangeFilter), ("updated_at", DateRangeFilter), DjangoLookupFilter, diff --git a/src/hope_dedup_engine/apps/api/admin/jobs.py b/src/hope_dedup_engine/apps/api/admin/jobs.py new file mode 100644 index 0000000..360331d --- /dev/null +++ b/src/hope_dedup_engine/apps/api/admin/jobs.py @@ -0,0 +1,10 @@ +from django.contrib import admin + +from django_celery_boost.admin import CeleryTaskModelAdmin + +from hope_dedup_engine.apps.api.models.jobs import DedupJob + + +@admin.register(DedupJob) +class DedupJobAdmin(CeleryTaskModelAdmin): + list_display = ["deduplication_set_id", "progress"] diff --git a/src/hope_dedup_engine/apps/api/deduplication/adapters.py b/src/hope_dedup_engine/apps/api/deduplication/adapters.py index 7c72c64..a6345bc 100644 --- a/src/hope_dedup_engine/apps/api/deduplication/adapters.py +++ b/src/hope_dedup_engine/apps/api/deduplication/adapters.py @@ -1,4 +1,4 @@ -from collections.abc import Generator +from collections.abc import Callable, Generator from typing import Any from hope_dedup_engine.apps.api.deduplication.registry import DuplicateKeyPair @@ -12,9 +12,12 @@ class DuplicateFaceFinder: weight = 1 def __init__(self, deduplication_set: DeduplicationSet): + self.tracker = None self.deduplication_set = deduplication_set - def run(self) -> Generator[DuplicateKeyPair, None, None]: + def run( + self, tracker: Callable[[int], None] | None = None + ) -> Generator[DuplicateKeyPair, None, None]: filename_to_reference_pk = { filename: reference_pk for reference_pk, filename in self.deduplication_set.image_set.values_list( @@ -28,7 +31,9 @@ def run(self) -> Generator[DuplicateKeyPair, None, None]: detector = DuplicationDetector( tuple[str](filename_to_reference_pk.keys()), ds_config ) - for first_filename, second_filename, distance in detector.find_duplicates(): + for first_filename, second_filename, distance in detector.find_duplicates( + tracker + ): yield filename_to_reference_pk[first_filename], filename_to_reference_pk[ second_filename ], 1 - distance diff --git a/src/hope_dedup_engine/apps/api/deduplication/lock.py b/src/hope_dedup_engine/apps/api/deduplication/lock.py deleted file mode 100644 index 88ddf3f..0000000 --- a/src/hope_dedup_engine/apps/api/deduplication/lock.py +++ /dev/null @@ -1,77 +0,0 @@ -from __future__ import annotations - -from base64 import b64decode, b64encode -from typing import Final, Self - -from django.core.cache import cache - -from constance import config -from redis.exceptions import LockNotOwnedError -from redis.lock import Lock - -from hope_dedup_engine.apps.api.models import DeduplicationSet - -DELIMITER: Final[str] = "|" -LOCK_IS_NOT_ENABLED = "LOCK_IS_NOT_ENABLED" - - -class DeduplicationSetLock: - """ - A lock used to limit access to a specific deduplication set. - This lock can be serialized, passed to Celery worker, and then deserialized. - """ - - class LockNotOwnedException(Exception): - pass - - def __init__(self, name: str, token: bytes | None = None) -> None: - # we heavily rely on Redis being used as a cache framework backend. - redis = cache._cache.get_client() - lock = Lock( - redis, - name, - blocking=False, - thread_local=False, - timeout=config.DEDUPLICATION_SET_LAST_ACTION_TIMEOUT, - ) - - if token is None: - # new lock - if not lock.acquire(): - raise self.LockNotOwnedException - else: - # deserialized lock - lock.local.token = token - if not lock.owned(): - raise DeduplicationSetLock.LockNotOwnedException - - self.lock = lock - - def __str__(self) -> str: - name_bytes, token_bytes = self.lock.name.encode(), self.lock.local.token - encoded = map(b64encode, (name_bytes, token_bytes)) - string_values = map(bytes.decode, encoded) - return DELIMITER.join(string_values) - - def refresh(self) -> None: - try: - self.lock.extend(config.DEDUPLICATION_SET_LAST_ACTION_TIMEOUT, True) - except LockNotOwnedError as e: - raise self.LockNotOwnedException from e - - def release(self) -> None: - try: - self.lock.release() - except LockNotOwnedError as e: - raise self.LockNotOwnedException from e - - @classmethod - def for_deduplication_set( - cls: type[Self], deduplication_set: DeduplicationSet - ) -> Self: - return cls(f"lock:{deduplication_set.pk}") - - @classmethod - def from_string(cls: type[Self], serialized: str) -> Self: - name_bytes, token_bytes = map(b64decode, serialized.split(DELIMITER)) - return cls(name_bytes.decode(), token_bytes) diff --git a/src/hope_dedup_engine/apps/api/deduplication/process.py b/src/hope_dedup_engine/apps/api/deduplication/process.py index 33bfb67..7142b18 100644 --- a/src/hope_dedup_engine/apps/api/deduplication/process.py +++ b/src/hope_dedup_engine/apps/api/deduplication/process.py @@ -1,13 +1,16 @@ +from collections.abc import Callable +from functools import partial + from celery import shared_task -from constance import config -from hope_dedup_engine.apps.api.deduplication.lock import DeduplicationSetLock from hope_dedup_engine.apps.api.deduplication.registry import ( DuplicateFinder, DuplicateKeyPair, get_finders, ) -from hope_dedup_engine.apps.api.models import DeduplicationSet, Duplicate +from hope_dedup_engine.apps.api.models import DedupJob, DeduplicationSet, Duplicate +from hope_dedup_engine.apps.api.utils.notification import send_notification +from hope_dedup_engine.apps.api.utils.progress import track_progress_multi def _sort_keys(pair: DuplicateKeyPair) -> DuplicateKeyPair: @@ -18,8 +21,7 @@ def _sort_keys(pair: DuplicateKeyPair) -> DuplicateKeyPair: def _save_duplicates( finder: DuplicateFinder, deduplication_set: DeduplicationSet, - lock_enabled: bool, - lock: DeduplicationSetLock, + tracker: Callable[[int], None], ) -> None: reference_pk_to_filename_mapping = dict( deduplication_set.image_set.values_list("reference_pk", "filename") @@ -40,7 +42,7 @@ def _save_duplicates( deduplication_set.ignoredreferencepkpair_set.values_list("first", "second") ) - for first, second, score in map(_sort_keys, finder.run()): + for first, second, score in map(_sort_keys, finder.run(tracker)): first_filename, second_filename = sorted( ( reference_pk_to_filename_mapping[first], @@ -59,32 +61,35 @@ def _save_duplicates( ) duplicate.score += score * finder.weight duplicate.save() - if lock_enabled: - lock.refresh() HOUR = 60 * 60 +def update_job_progress(job: DedupJob, progress: int) -> None: + job.progress = progress + job.save() + + @shared_task(soft_time_limit=0.5 * HOUR, time_limit=1 * HOUR) -def find_duplicates(deduplication_set_id: str, serialized_lock: str) -> None: - deduplication_set = DeduplicationSet.objects.get(pk=deduplication_set_id) +def find_duplicates(dedup_job_id: int, version: int) -> None: + dedup_job: DedupJob = DedupJob.objects.get(pk=dedup_job_id, version=version) try: - lock_enabled = config.DEDUPLICATION_SET_LOCK_ENABLED - lock = ( - DeduplicationSetLock.from_string(serialized_lock) if lock_enabled else None - ) + deduplication_set = dedup_job.deduplication_set - if lock_enabled: - # refresh lock in case we spent much time waiting in queue - lock.refresh() + deduplication_set.state = DeduplicationSet.State.DIRTY + deduplication_set.save() + send_notification(deduplication_set.notification_url) # clean results Duplicate.objects.filter(deduplication_set=deduplication_set).delete() weight_total = 0 - for finder in get_finders(deduplication_set): - _save_duplicates(finder, deduplication_set, lock_enabled, lock) + for finder, tracker in zip( + get_finders(deduplication_set), + track_progress_multi(partial(update_job_progress, dedup_job)), + ): + _save_duplicates(finder, deduplication_set, tracker) weight_total += finder.weight for duplicate in deduplication_set.duplicate_set.all(): @@ -94,11 +99,5 @@ def find_duplicates(deduplication_set_id: str, serialized_lock: str) -> None: deduplication_set.state = deduplication_set.State.CLEAN deduplication_set.save() - except Exception: - deduplication_set.state = DeduplicationSet.State.ERROR - deduplication_set.save() - raise - finally: - if lock_enabled: - lock.release() + send_notification(dedup_job.deduplication_set.notification_url) diff --git a/src/hope_dedup_engine/apps/api/deduplication/registry.py b/src/hope_dedup_engine/apps/api/deduplication/registry.py index 9886ab4..603261c 100644 --- a/src/hope_dedup_engine/apps/api/deduplication/registry.py +++ b/src/hope_dedup_engine/apps/api/deduplication/registry.py @@ -1,4 +1,4 @@ -from collections.abc import Generator, Iterable +from collections.abc import Callable, Generator, Iterable from typing import Protocol from hope_dedup_engine.apps.api.models import DeduplicationSet @@ -9,7 +9,9 @@ class DuplicateFinder(Protocol): weight: int - def run(self) -> Generator[DuplicateKeyPair, None, None]: ... + def run( + self, tracker: Callable[[int], None] + ) -> Generator[DuplicateKeyPair, None, None]: ... def get_finders(deduplication_set: DeduplicationSet) -> Iterable[DuplicateFinder]: diff --git a/src/hope_dedup_engine/apps/api/migrations/0010_dedupjob.py b/src/hope_dedup_engine/apps/api/migrations/0010_dedupjob.py new file mode 100644 index 0000000..85f5c2e --- /dev/null +++ b/src/hope_dedup_engine/apps/api/migrations/0010_dedupjob.py @@ -0,0 +1,94 @@ +# Generated by Django 5.0.7 on 2024-10-31 20:48 + +import concurrency.fields +import django.db.models.deletion +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ("api", "0009_remove_config_face_distance_threshold_config_name_and_more"), + ] + + operations = [ + migrations.CreateModel( + name="DedupJob", + fields=[ + ( + "id", + models.BigAutoField( + auto_created=True, + primary_key=True, + serialize=False, + verbose_name="ID", + ), + ), + ( + "version", + concurrency.fields.AutoIncVersionField( + default=0, help_text="record revision number" + ), + ), + ( + "curr_async_result_id", + models.CharField( + blank=True, + editable=False, + help_text="Current (active) AsyncResult is", + max_length=36, + null=True, + ), + ), + ( + "last_async_result_id", + models.CharField( + blank=True, + editable=False, + help_text="Latest executed AsyncResult is", + max_length=36, + null=True, + ), + ), + ( + "celery_history", + models.JSONField(blank=True, default=dict, editable=False), + ), + ( + "celery_result", + models.CharField( + blank=True, + default="", + editable=False, + max_length=100, + null=True, + ), + ), + ( + "serialized_lock", + models.CharField(editable=False, max_length=128, null=True), + ), + ( + "deduplication_set", + models.ForeignKey( + on_delete=django.db.models.deletion.CASCADE, + related_name="jobs", + to="api.deduplicationset", + ), + ), + ], + options={ + "abstract": False, + "default_permissions": ( + "add", + "change", + "delete", + "view", + "queue", + "terminate", + "inspect", + "revoke", + ), + }, + ), + ] diff --git a/src/hope_dedup_engine/apps/api/migrations/0011_dedupjob_progress.py b/src/hope_dedup_engine/apps/api/migrations/0011_dedupjob_progress.py new file mode 100644 index 0000000..577e926 --- /dev/null +++ b/src/hope_dedup_engine/apps/api/migrations/0011_dedupjob_progress.py @@ -0,0 +1,18 @@ +# Generated by Django 5.0.7 on 2024-11-04 22:42 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ("api", "0010_dedupjob"), + ] + + operations = [ + migrations.AddField( + model_name="dedupjob", + name="progress", + field=models.IntegerField(default=0), + ), + ] diff --git a/src/hope_dedup_engine/apps/api/migrations/0012_alter_dedupjob_deduplication_set.py b/src/hope_dedup_engine/apps/api/migrations/0012_alter_dedupjob_deduplication_set.py new file mode 100644 index 0000000..e031f88 --- /dev/null +++ b/src/hope_dedup_engine/apps/api/migrations/0012_alter_dedupjob_deduplication_set.py @@ -0,0 +1,21 @@ +# Generated by Django 5.1.2 on 2024-11-05 04:27 + +import django.db.models.deletion +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ("api", "0011_dedupjob_progress"), + ] + + operations = [ + migrations.AlterField( + model_name="dedupjob", + name="deduplication_set", + field=models.OneToOneField( + on_delete=django.db.models.deletion.CASCADE, to="api.deduplicationset" + ), + ), + ] diff --git a/src/hope_dedup_engine/apps/api/migrations/0013_remove_deduplicationset_state_value_and_more.py b/src/hope_dedup_engine/apps/api/migrations/0013_remove_deduplicationset_state_value_and_more.py new file mode 100644 index 0000000..b9d3401 --- /dev/null +++ b/src/hope_dedup_engine/apps/api/migrations/0013_remove_deduplicationset_state_value_and_more.py @@ -0,0 +1,24 @@ +# Generated by Django 5.1.2 on 2024-11-06 11:25 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ("api", "0012_alter_dedupjob_deduplication_set"), + ] + + operations = [ + migrations.RemoveField( + model_name="deduplicationset", + name="state_value", + ), + migrations.AddField( + model_name="deduplicationset", + name="state", + field=models.IntegerField( + choices=[(0, "Clean"), (1, "Dirty")], db_column="state", default=0 + ), + ), + ] diff --git a/src/hope_dedup_engine/apps/api/migrations/0014_update_deduplication_set_status.py b/src/hope_dedup_engine/apps/api/migrations/0014_update_deduplication_set_status.py new file mode 100644 index 0000000..e897d0a --- /dev/null +++ b/src/hope_dedup_engine/apps/api/migrations/0014_update_deduplication_set_status.py @@ -0,0 +1,25 @@ +# Generated by Django 5.1.2 on 2024-11-07 14:19 + +from django.db import migrations + + +DIRTY = 1 +PROCESSING = 2 +ERROR = 3 + + +def forwards_func(apps, _): + DeduplicationSet = apps.get_model("api", "DeduplicationSet") + DeduplicationSet.objects.filter(state__in=[PROCESSING, ERROR]).update(state=DIRTY) + + +class Migration(migrations.Migration): + + dependencies = [ + ("api", "0013_remove_deduplicationset_state_value_and_more"), + ] + + operations = [ + # reverse operation is impossible + migrations.RunPython(forwards_func, migrations.RunPython.noop), + ] diff --git a/src/hope_dedup_engine/apps/api/models/__init__.py b/src/hope_dedup_engine/apps/api/models/__init__.py index bfb6a45..8e4c472 100644 --- a/src/hope_dedup_engine/apps/api/models/__init__.py +++ b/src/hope_dedup_engine/apps/api/models/__init__.py @@ -5,3 +5,4 @@ Duplicate, Image, ) +from hope_dedup_engine.apps.api.models.jobs import DedupJob # noqa: F401 diff --git a/src/hope_dedup_engine/apps/api/models/deduplication.py b/src/hope_dedup_engine/apps/api/models/deduplication.py index 9411367..8f092db 100644 --- a/src/hope_dedup_engine/apps/api/models/deduplication.py +++ b/src/hope_dedup_engine/apps/api/models/deduplication.py @@ -4,7 +4,6 @@ from django.conf import settings from django.db import models -from hope_dedup_engine.apps.api.utils.notification import send_notification from hope_dedup_engine.apps.security.models import ExternalSystem REFERENCE_PK_LENGTH: Final[int] = 100 @@ -21,8 +20,6 @@ class State(models.IntegerChoices): 1, "Dirty", ) # Images are added to deduplication set, but not yet processed - PROCESSING = 2, "Processing" # Images are being processed - ERROR = 3, "Error" # Error occurred id = models.UUIDField(primary_key=True, default=uuid4) name = models.CharField( @@ -30,7 +27,7 @@ class State(models.IntegerChoices): ) description = models.TextField(null=True, blank=True) reference_pk = models.CharField(max_length=REFERENCE_PK_LENGTH) # source_id - state_value = models.IntegerField( + state = models.IntegerField( choices=State.choices, default=State.CLEAN, db_column="state", @@ -56,16 +53,6 @@ class State(models.IntegerChoices): notification_url = models.CharField(max_length=255, null=True, blank=True) config = models.ForeignKey("Config", null=True, on_delete=models.SET_NULL) - @property - def state(self) -> State: - return self.State(self.state_value) - - @state.setter - def state(self, value: State) -> None: - if value != self.state_value or value == self.State.CLEAN: - self.state_value = value - send_notification(self.notification_url) - def __str__(self) -> str: return f"ID: {self.pk}" if not self.name else f"{self.name}" diff --git a/src/hope_dedup_engine/apps/api/models/jobs.py b/src/hope_dedup_engine/apps/api/models/jobs.py new file mode 100644 index 0000000..b4ed620 --- /dev/null +++ b/src/hope_dedup_engine/apps/api/models/jobs.py @@ -0,0 +1,15 @@ +from django.db import models + +from django_celery_boost.models import CeleryTaskModel + + +class DedupJob(CeleryTaskModel): + deduplication_set = models.OneToOneField( + "DeduplicationSet", on_delete=models.CASCADE + ) + serialized_lock = models.CharField(max_length=128, null=True, editable=False) + progress = models.IntegerField(default=0) + + celery_task_name = ( + "hope_dedup_engine.apps.api.deduplication.process.find_duplicates" + ) diff --git a/src/hope_dedup_engine/apps/api/serializers.py b/src/hope_dedup_engine/apps/api/serializers.py index 495145a..3aa1a03 100644 --- a/src/hope_dedup_engine/apps/api/serializers.py +++ b/src/hope_dedup_engine/apps/api/serializers.py @@ -29,12 +29,12 @@ def validate_settings(self, value): class DeduplicationSetSerializer(serializers.ModelSerializer): - state = serializers.CharField(source="get_state_value_display", read_only=True) + state = serializers.CharField(source="get_state_display", read_only=True) config = ConfigSerializer(required=False) class Meta: model = DeduplicationSet - exclude = ("deleted", "state_value") + exclude = ("deleted",) read_only_fields = ( "external_system", "created_at", diff --git a/src/hope_dedup_engine/apps/api/utils/process.py b/src/hope_dedup_engine/apps/api/utils/process.py index fe72193..cfb4c09 100644 --- a/src/hope_dedup_engine/apps/api/utils/process.py +++ b/src/hope_dedup_engine/apps/api/utils/process.py @@ -1,9 +1,8 @@ -from constance import config from rest_framework import status from rest_framework.exceptions import APIException -from hope_dedup_engine.apps.api.deduplication.lock import LOCK_IS_NOT_ENABLED from hope_dedup_engine.apps.api.models import DeduplicationSet +from hope_dedup_engine.apps.api.models.jobs import DedupJob class AlreadyProcessingError(APIException): @@ -13,20 +12,8 @@ class AlreadyProcessingError(APIException): def start_processing(deduplication_set: DeduplicationSet) -> None: - from hope_dedup_engine.apps.api.deduplication.lock import DeduplicationSetLock - from hope_dedup_engine.apps.api.deduplication.process import find_duplicates - - try: - lock = ( - DeduplicationSetLock.for_deduplication_set(deduplication_set) - if config.DEDUPLICATION_SET_LOCK_ENABLED - else LOCK_IS_NOT_ENABLED - ) - deduplication_set.state = DeduplicationSet.State.PROCESSING - deduplication_set.save() - find_duplicates.delay(str(deduplication_set.pk), str(lock)) - except DeduplicationSetLock.LockNotOwnedException as e: - raise AlreadyProcessingError from e + job, _ = DedupJob.objects.get_or_create(deduplication_set=deduplication_set) + job.queue() def delete_model_data(_: DeduplicationSet) -> None: diff --git a/src/hope_dedup_engine/apps/api/utils/progress.py b/src/hope_dedup_engine/apps/api/utils/progress.py new file mode 100644 index 0000000..74179f9 --- /dev/null +++ b/src/hope_dedup_engine/apps/api/utils/progress.py @@ -0,0 +1,50 @@ +from collections.abc import Callable, Generator +from functools import partial + +STEP = 10 + + +def callback_filter( + callback: Callable[[int], None], step: int +) -> Callable[[int], None]: + previous_callback_value = -1 + + def update(progress: int) -> None: + nonlocal previous_callback_value + if (callback_value := progress // step * step) != previous_callback_value: + callback(callback_value) + previous_callback_value = callback_value + + return update + + +def track_progress( + callback: Callable[[int], None], send_zero: bool = True +) -> Callable[[int], None]: + update = callback_filter(callback, STEP) + + if send_zero: + update(0) + + return update + + +def track_progress_multi( + callback: Callable[[int], None] +) -> Generator[Callable[[int], None], None, None]: + progress = [] + + update = callback_filter(callback, STEP) + + def individual_callback(value: int, index: int) -> None: + progress[index] = value + update(sum(progress) // len(progress)) + + send_zero = True + while True: + progress.append(0) + yield track_progress( + callback=partial(individual_callback, index=len(progress) - 1), + send_zero=send_zero, + ) + send_zero = False diff --git a/src/hope_dedup_engine/apps/faces/services/duplication_detector.py b/src/hope_dedup_engine/apps/faces/services/duplication_detector.py index e9cef4e..38191cb 100644 --- a/src/hope_dedup_engine/apps/faces/services/duplication_detector.py +++ b/src/hope_dedup_engine/apps/faces/services/duplication_detector.py @@ -1,5 +1,6 @@ import logging import os +from collections.abc import Callable from itertools import combinations from typing import Any, Generator @@ -107,7 +108,9 @@ def _existed_images_name(self) -> list[str]: ) return filenames - def find_duplicates(self) -> Generator[tuple[str, str, float], None, None]: + def find_duplicates( + self, tracker: Callable[[int], None] | None = None + ) -> Generator[tuple[str, str, float], None, None]: """ Finds duplicate images based on facial encodings and yields pairs of image paths with their minimum distance. @@ -125,7 +128,8 @@ def find_duplicates(self) -> Generator[tuple[str, str, float], None, None]: existed_images_name = self._existed_images_name() encodings_all = self._load_encodings_all() - for path1, path2 in combinations(existed_images_name, 2): + total_pairs = (n := len(existed_images_name)) * (n - 1) // 2 + for i, (path1, path2) in enumerate(combinations(existed_images_name, 2), 1): encodings1 = encodings_all.get(path1) encodings2 = encodings_all.get(path2) if encodings1 is None or encodings2 is None: @@ -144,6 +148,9 @@ def find_duplicates(self) -> Generator[tuple[str, str, float], None, None]: ): yield (path1, path2, round(min_distance, 5)) + if tracker: + tracker(100 * i // total_pairs) + except Exception as e: self.logger.exception( "Error finding duplicates for images %s", self.filenames diff --git a/src/hope_dedup_engine/config/fragments/constance.py b/src/hope_dedup_engine/config/fragments/constance.py index bbb8281..039f57c 100644 --- a/src/hope_dedup_engine/config/fragments/constance.py +++ b/src/hope_dedup_engine/config/fragments/constance.py @@ -85,16 +85,6 @@ """, float, ), - "DEDUPLICATION_SET_LOCK_ENABLED": ( - True, - "Enable or disable the lock mechanism for deduplication sets", - bool, - ), - "DEDUPLICATION_SET_LAST_ACTION_TIMEOUT": ( - 120, - "Timeout in seconds for the last action on a deduplication set", - int, - ), "NEW_USER_IS_STAFF": (False, "Set any new user as staff", bool), "NEW_USER_DEFAULT_GROUP": ( DEFAULT_GROUP_NAME, @@ -120,13 +110,6 @@ ), "collapse": False, }, - "Task lock settings": { - "fields": ( - "DEDUPLICATION_SET_LOCK_ENABLED", - "DEDUPLICATION_SET_LAST_ACTION_TIMEOUT", - ), - "collapse": False, - }, "User settings": { "fields": ("NEW_USER_IS_STAFF", "NEW_USER_DEFAULT_GROUP"), "collapse": False, diff --git a/src/hope_dedup_engine/config/settings.py b/src/hope_dedup_engine/config/settings.py index ce1adf7..30d7504 100644 --- a/src/hope_dedup_engine/config/settings.py +++ b/src/hope_dedup_engine/config/settings.py @@ -44,6 +44,7 @@ "hope_dedup_engine.apps.faces", "storages", "smart_env", + "django_celery_boost", ) MIDDLEWARE = ( diff --git a/src/hope_dedup_engine/web/templates/admin/celery_boost/change_form.html b/src/hope_dedup_engine/web/templates/admin/celery_boost/change_form.html new file mode 100644 index 0000000..0460323 --- /dev/null +++ b/src/hope_dedup_engine/web/templates/admin/celery_boost/change_form.html @@ -0,0 +1,7 @@ +{% extends "admin_extra_buttons/change_form.html" %} +{% block object-tools %} + {{ block.super }} +
+ {{ original.task_status }} +
+{% endblock %} diff --git a/src/hope_dedup_engine/web/templates/admin/celery_boost/inspect.html b/src/hope_dedup_engine/web/templates/admin/celery_boost/inspect.html new file mode 100644 index 0000000..759745d --- /dev/null +++ b/src/hope_dedup_engine/web/templates/admin/celery_boost/inspect.html @@ -0,0 +1,50 @@ +{% extends "admin_extra_buttons/action_page.html" %} +{% block action-content %} +

{{ original }}

+ {{ original.curr_async_result_id }} + {% if flower_addr %} + Flower + {% endif %} +

Task Info

+ + {% for k,v in original.task_info.items %} + + + + + {% endfor %} + + + + +
{{ k }}{{ v }}
Position in queue{{ original.queue_position }}
+ + + +

Queue Info

+ + {% for k,v in original.queue_info.items %} + {% if k == "headers" %} + + + + + {% else %} + + + + + {% endif %} + {% endfor %} +
{{ k }} + + {% for k1,v1 in original.queue_info.headers.items %} + + + + + {% endfor %} +
{{ k1 }}{{ v1 }}
+
{{ k }}{{ v }}
+ +{% endblock %} diff --git a/src/hope_dedup_engine/web/templates/admin/celery_boost/queue.html b/src/hope_dedup_engine/web/templates/admin/celery_boost/queue.html new file mode 100644 index 0000000..0e12870 --- /dev/null +++ b/src/hope_dedup_engine/web/templates/admin/celery_boost/queue.html @@ -0,0 +1,5 @@ +{% extends "admin_extra_buttons/confirm.html" %} +{% block action-content %} + 1111 +{{block.super }} +{% endblock %} diff --git a/tests/api/conftest.py b/tests/api/conftest.py index 802c50c..5dfb9df 100644 --- a/tests/api/conftest.py +++ b/tests/api/conftest.py @@ -12,6 +12,7 @@ ) from testutils.factories.api import ( ConfigFactory, + DedupJobFactory, DeduplicationSetFactory, DuplicateFactory, IgnoredFilenamePairFactory, @@ -44,6 +45,7 @@ IgnoredReferencePkPairFactory, deduplication_set=LazyFixture("deduplication_set") ) register(ConfigFactory) +register(DedupJobFactory, deduplication_set=LazyFixture("deduplication_set")) @fixture @@ -86,7 +88,7 @@ def start_processing(mocker: MockerFixture) -> MagicMock: @fixture(autouse=True) def send_notification(mocker: MockerFixture) -> MagicMock: return mocker.patch( - "hope_dedup_engine.apps.api.models.deduplication.send_notification" + "hope_dedup_engine.apps.api.deduplication.process.send_notification" ) diff --git a/tests/api/test_deduplication_set_lock.py b/tests/api/test_deduplication_set_lock.py deleted file mode 100644 index 1885860..0000000 --- a/tests/api/test_deduplication_set_lock.py +++ /dev/null @@ -1,53 +0,0 @@ -from time import sleep - -from constance.test.pytest import override_config -from pytest import fail, raises -from pytest_django.fixtures import SettingsWrapper - -from hope_dedup_engine.apps.api.deduplication.lock import DeduplicationSetLock -from hope_dedup_engine.apps.api.models import DeduplicationSet - - -def test_basic_usage(deduplication_set: DeduplicationSet) -> None: - try: - lock = DeduplicationSetLock.for_deduplication_set(deduplication_set) - lock.refresh() - lock.release() - except Exception as e: - fail(f"Unexpected exception raised: {e}") - - -def test_can_serialize_and_deserialize(deduplication_set: DeduplicationSet) -> None: - try: - DeduplicationSetLock.from_string( - str(DeduplicationSetLock.for_deduplication_set(deduplication_set)) - ) - except Exception as e: - fail(f"Unexpected exception raised: {e}") - - -def test_cannot_acquire_second_lock_for_same_deduplication_set( - deduplication_set: DeduplicationSet, -) -> None: - DeduplicationSetLock.for_deduplication_set(deduplication_set) - with raises(DeduplicationSetLock.LockNotOwnedException): - DeduplicationSetLock.for_deduplication_set(deduplication_set) - - -def test_cannot_deserialize_released_lock(deduplication_set: DeduplicationSet) -> None: - lock = DeduplicationSetLock.for_deduplication_set(deduplication_set) - serialized_lock = str(lock) - lock.release() - with raises(DeduplicationSetLock.LockNotOwnedException): - DeduplicationSetLock.from_string(serialized_lock) - - -def test_lock_is_released_after_timeout( - deduplication_set: DeduplicationSet, settings: SettingsWrapper -) -> None: - timeout = 0.1 - with override_config(DEDUPLICATION_SET_LAST_ACTION_TIMEOUT=timeout): - lock = DeduplicationSetLock.for_deduplication_set(deduplication_set) - sleep(2 * timeout) - with raises(DeduplicationSetLock.LockNotOwnedException): - lock.refresh() diff --git a/tests/api/test_deduplication_set_process.py b/tests/api/test_deduplication_set_process.py index 9227c69..266b420 100644 --- a/tests/api/test_deduplication_set_process.py +++ b/tests/api/test_deduplication_set_process.py @@ -15,8 +15,6 @@ ( DeduplicationSet.State.CLEAN, DeduplicationSet.State.DIRTY, - DeduplicationSet.State.PROCESSING, - DeduplicationSet.State.ERROR, ), ) def test_can_trigger_deduplication_set_processing_in_any_state( diff --git a/tests/api/test_find_duplicates.py b/tests/api/test_find_duplicates.py index 3a9a659..2015ac8 100644 --- a/tests/api/test_find_duplicates.py +++ b/tests/api/test_find_duplicates.py @@ -1,42 +1,38 @@ -from unittest.mock import MagicMock +from unittest.mock import MagicMock, call from pytest import raises -from hope_dedup_engine.apps.api.deduplication.lock import DeduplicationSetLock from hope_dedup_engine.apps.api.deduplication.process import find_duplicates from hope_dedup_engine.apps.api.deduplication.registry import DuplicateFinder -from hope_dedup_engine.apps.api.models import DeduplicationSet +from hope_dedup_engine.apps.api.models import DedupJob, DeduplicationSet from hope_dedup_engine.apps.api.models.deduplication import Duplicate, Image def test_previous_results_are_removed_before_processing( + dedup_job: DedupJob, deduplication_set: DeduplicationSet, duplicate: Duplicate, duplicate_finders: list[DuplicateFinder], ) -> None: assert deduplication_set.duplicate_set.count() - find_duplicates( - str(deduplication_set.pk), - str(DeduplicationSetLock.for_deduplication_set(deduplication_set)), - ) + find_duplicates(dedup_job.pk, dedup_job.version) assert not deduplication_set.duplicate_set.count() def test_duplicates_are_stored( + dedup_job: DedupJob, deduplication_set: DeduplicationSet, image: Image, second_image: Image, all_duplicates_finder: DuplicateFinder, ) -> None: assert not deduplication_set.duplicate_set.count() - find_duplicates( - str(deduplication_set.pk), - str(DeduplicationSetLock.for_deduplication_set(deduplication_set)), - ) + find_duplicates(dedup_job.pk, dedup_job.version) assert deduplication_set.duplicate_set.count() def test_ignored_reference_pk_pairs( + dedup_job: DedupJob, deduplication_set: DeduplicationSet, image: Image, second_image: Image, @@ -47,15 +43,13 @@ def test_ignored_reference_pk_pairs( first=image.reference_pk, second=second_image.reference_pk, ) - find_duplicates( - str(deduplication_set.pk), - str(DeduplicationSetLock.for_deduplication_set(deduplication_set)), - ) + find_duplicates(dedup_job.pk, dedup_job.version) ignored_reference_pk_pair.delete() assert not deduplication_set.duplicate_set.count() def test_ignored_filename_pairs( + dedup_job: DedupJob, deduplication_set: DeduplicationSet, image: Image, second_image: Image, @@ -66,52 +60,43 @@ def test_ignored_filename_pairs( first=image.filename, second=second_image.filename, ) - find_duplicates( - str(deduplication_set.pk), - str(DeduplicationSetLock.for_deduplication_set(deduplication_set)), - ) + find_duplicates(dedup_job.pk, dedup_job.version) ignored_filename_pair.delete() assert not deduplication_set.duplicate_set.count() def test_weight_is_taken_into_account( + dedup_job: DedupJob, deduplication_set: DeduplicationSet, image: Image, second_image: Image, all_duplicates_finder: DuplicateFinder, no_duplicate_finder: DuplicateFinder, ) -> None: - find_duplicates( - str(deduplication_set.pk), - str(DeduplicationSetLock.for_deduplication_set(deduplication_set)), - ) + find_duplicates(dedup_job.pk, dedup_job.version) assert deduplication_set.duplicate_set.first().score == 0.5 def test_notification_sent_on_successful_run( + dedup_job: DedupJob, deduplication_set: DeduplicationSet, duplicate_finders: list[DuplicateFinder], send_notification: MagicMock, ) -> None: send_notification.reset_mock() # remove notification for CREATE state - find_duplicates( - str(deduplication_set.pk), - str(DeduplicationSetLock.for_deduplication_set(deduplication_set)), - ) - send_notification.assert_called_once_with(deduplication_set.notification_url) + find_duplicates(dedup_job.pk, dedup_job.version) + send_notification.assert_has_calls(2 * [call(deduplication_set.notification_url)]) def test_notification_sent_on_failure( + dedup_job: DedupJob, deduplication_set: DeduplicationSet, failing_duplicate_finder: DuplicateFinder, send_notification: MagicMock, ) -> None: send_notification.reset_mock() # remove notification for CREATE state with raises(Exception): - find_duplicates( - str(deduplication_set.pk), - str(DeduplicationSetLock.for_deduplication_set(deduplication_set)), - ) + find_duplicates(dedup_job.pk, dedup_job.version) deduplication_set.refresh_from_db() - assert deduplication_set.state == deduplication_set.State.ERROR - send_notification.assert_called_once_with(deduplication_set.notification_url) + assert deduplication_set.state == deduplication_set.State.DIRTY + send_notification.assert_has_calls(2 * [call(deduplication_set.notification_url)]) diff --git a/tests/api/test_utils.py b/tests/api/test_utils.py index 2b809da..234929d 100644 --- a/tests/api/test_utils.py +++ b/tests/api/test_utils.py @@ -8,6 +8,7 @@ REQUEST_TIMEOUT, send_notification, ) +from hope_dedup_engine.apps.api.utils.progress import callback_filter @fixture @@ -44,3 +45,12 @@ def test_exception_is_sent_to_sentry( requests_get.side_effect = exception send_notification("https://example.com") sentry_sdk_capture_exception.assert_called_once_with(exception) + + +def test_callback_filter() -> None: + step = 10 + values = [] + update = callback_filter(lambda x: values.append(x), step) + for i in range(1, 101): + update(i) + assert values == list(range(0, 101, step)) diff --git a/tests/extras/demoapp/scripts/base_case b/tests/extras/demoapp/scripts/base_case index 0d23a58..3c4c4d0 100755 --- a/tests/extras/demoapp/scripts/base_case +++ b/tests/extras/demoapp/scripts/base_case @@ -11,9 +11,10 @@ fi ./create_deduplication_set "$1" | jq -r ".id" | xargs ./use_deduplication_set -for file in ../demo_images/*.{jpg,png}; do - if [[ -f "$file" ]]; then - ./create_image $(basename "$file") +for path in ../demo_images/*.{jpg,png}; do + file=$(basename $path) + if [[ -f "$path" ]] && [ "$file" != "too_small.jpg" ] && [ "$file" != "without_face.jpg" ]; then + ./create_image "$(basename "$file")" fi done diff --git a/tests/extras/demoapp/scripts/process_deduplication_set b/tests/extras/demoapp/scripts/process_deduplication_set index a7f66ef..d04255b 100755 --- a/tests/extras/demoapp/scripts/process_deduplication_set +++ b/tests/extras/demoapp/scripts/process_deduplication_set @@ -8,7 +8,7 @@ call_api POST "deduplication_sets/$DEDUPLICATION_SET_ID/process" ./show_deduplication_set -until [ "$(./show_deduplication_set | jq -r ".state")" != "Processing" ] +until [ "$(./show_deduplication_set | jq -r ".state")" != "Clean" ] do sleep 0.5 done diff --git a/tests/extras/testutils/duplicate_finders.py b/tests/extras/testutils/duplicate_finders.py index 605ab7a..6c2c80d 100644 --- a/tests/extras/testutils/duplicate_finders.py +++ b/tests/extras/testutils/duplicate_finders.py @@ -1,4 +1,4 @@ -from collections.abc import Generator +from collections.abc import Callable, Generator from itertools import combinations from hope_dedup_engine.apps.api.deduplication.registry import DuplicateKeyPair @@ -11,7 +11,9 @@ class AllDuplicateFinder: def __init__(self, deduplication_set: DeduplicationSet) -> None: self.deduplication_set = deduplication_set - def run(self) -> Generator[DuplicateKeyPair, None, None]: + def run( + self, _: Callable[[int], None] | None = None + ) -> Generator[DuplicateKeyPair, None, None]: reference_pks = self.deduplication_set.image_set.values_list( "reference_pk", flat=True ).order_by("reference_pk") @@ -22,7 +24,9 @@ def run(self) -> Generator[DuplicateKeyPair, None, None]: class NoDuplicateFinder: weight = 1 - def run(self) -> Generator[DuplicateKeyPair, None, None]: + def run( + self, _: Callable[[int], None] | None = None + ) -> Generator[DuplicateKeyPair, None, None]: # empty generator return yield @@ -31,5 +35,7 @@ def run(self) -> Generator[DuplicateKeyPair, None, None]: class FailingDuplicateFinder: weight = 1 - def run(self) -> Generator[DuplicateKeyPair, None, None]: + def run( + self, _: Callable[[int], None] | None = None + ) -> Generator[DuplicateKeyPair, None, None]: raise Exception diff --git a/tests/extras/testutils/factories/api.py b/tests/extras/testutils/factories/api.py index 5d86a52..16c1dc5 100644 --- a/tests/extras/testutils/factories/api.py +++ b/tests/extras/testutils/factories/api.py @@ -2,7 +2,7 @@ from factory.django import DjangoModelFactory from testutils.factories import ExternalSystemFactory, UserFactory -from hope_dedup_engine.apps.api.models import DeduplicationSet, HDEToken +from hope_dedup_engine.apps.api.models import DedupJob, DeduplicationSet, HDEToken from hope_dedup_engine.apps.api.models.config import Config from hope_dedup_engine.apps.api.models.deduplication import ( Duplicate, @@ -85,3 +85,10 @@ class IgnoredReferencePkPairFactory(DjangoModelFactory): class Meta: model = IgnoredReferencePkPair + + +class DedupJobFactory(DjangoModelFactory): + deduplication_set = SubFactory(DeduplicationSetFactory) + + class Meta: + model = DedupJob