Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chg ! deduplicationset config #139

Draft
wants to merge 11 commits into
base: develop
Choose a base branch
from
2 changes: 1 addition & 1 deletion docs/src/did/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ To override only the **FACE_DISTANCE_THRESHOLD** parameter, your custom configur

```json
{
"face_distance_threshols": 0.7
"face_distance_threshold": 0.7
}
```

Expand Down
4 changes: 2 additions & 2 deletions src/hope_dedup_engine/apps/api/admin/finding.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from adminfilters.filters import DjangoLookupFilter, NumberFilter
from adminfilters.mixin import AdminFiltersMixin

from hope_dedup_engine.apps.api.models import Finding, Image
from hope_dedup_engine.apps.api.models import Finding


@register(Finding)
Expand All @@ -19,7 +19,7 @@ class FindingAdmin(AdminFiltersMixin, ModelAdmin):
)

def formatted_status_code(self, obj):
return f"{obj.status_code} {Image.StatusCode(obj.status_code).name}"
return f"{obj.status_code} {Finding.StatusCode(obj.status_code).name}"

formatted_status_code.short_description = "Status Code"

Expand Down
48 changes: 0 additions & 48 deletions src/hope_dedup_engine/apps/api/deduplication/adapters.py

This file was deleted.

63 changes: 0 additions & 63 deletions src/hope_dedup_engine/apps/api/deduplication/config.py

This file was deleted.

151 changes: 40 additions & 111 deletions src/hope_dedup_engine/apps/api/deduplication/process.py
Original file line number Diff line number Diff line change
@@ -1,132 +1,61 @@
from dataclasses import asdict

from django.db.models import F
from celery import Task

from celery import chord, shared_task

from hope_dedup_engine.apps.api.deduplication.config import DeduplicationSetConfig

# from hope_dedup_engine.apps.api.deduplication.registry import ( # DuplicateFinder,; DuplicateKeyPair,
# get_finders,
# )
from hope_dedup_engine.apps.api.models import DedupJob, DeduplicationSet, Finding
from hope_dedup_engine.apps.api.models.config import DeduplicationSetConfig
from hope_dedup_engine.apps.api.utils.notification import send_notification

# from hope_dedup_engine.apps.api.utils.progress import track_progress_multi
from hope_dedup_engine.apps.faces.celery_tasks import (
callback_encodings,
encode_chunk,
get_chunks,
handle_error,
from hope_dedup_engine.apps.faces.celery.pipeline import image_pipeline
from hope_dedup_engine.config.celery import app
from hope_dedup_engine.utils.celery.task_result import (
Result,
UnexpectedResultError,
is_error,
is_value,
)

# def _sort_keys(pair: DuplicateKeyPair) -> DuplicateKeyPair:
# first, second, score = pair
# return *sorted((first, second)), score

@app.task
def clear_findings(deduplication_set_id: str) -> None:
deduplication_set: DeduplicationSet = DeduplicationSet.objects.get(
id=deduplication_set_id
)

# def _save_duplicates(
# finder: DuplicateFinder,
# deduplication_set: DeduplicationSet,
# tracker: Callable[[int], None],
# ) -> None:
# reference_pk_to_filename_mapping = dict(
# deduplication_set.image_set.values_list("reference_pk", "filename")
# )
# ignored_filename_pairs = frozenset(
# map(
# tuple,
# map(
# sorted,
# deduplication_set.ignoredfilenamepair_set.values_list(
# "first", "second"
# ),
# ),
# )
# )
Finding.objects.filter(deduplication_set=deduplication_set).delete()

# ignored_reference_pk_pairs = frozenset(
# deduplication_set.ignoredreferencepkpair_set.values_list("first", "second")
# )
deduplication_set.state = DeduplicationSet.State.DIRTY
deduplication_set.save(update_fields=["state"])
send_notification(deduplication_set.notification_url)

# for first, second, score in map(_sort_keys, finder.run(tracker)):
# first_filename, second_filename = sorted(
# (
# reference_pk_to_filename_mapping[first],
# reference_pk_to_filename_mapping[second],
# )
# )
# ignored = (first, second) in ignored_reference_pk_pairs or (
# first_filename,
# second_filename,
# ) in ignored_filename_pairs
# if not ignored:
# duplicate, _ = Duplicate.objects.get_or_create(
# deduplication_set=deduplication_set,
# first_reference_pk=first,
# second_reference_pk=second,
# )
# duplicate.score += score * finder.weight
# duplicate.save()

@app.task
def finish(result: Result, deduplication_set_id: str) -> None:
deduplication_set: DeduplicationSet = DeduplicationSet.objects.get(
id=deduplication_set_id
)

HOUR = 60 * 60

if is_error(result):
deduplication_set.state = DeduplicationSet.State.DIRTY
elif is_value(result):
deduplication_set.state = DeduplicationSet.State.CLEAN
else:
raise UnexpectedResultError(result)
deduplication_set.save(update_fields=["state"])

def update_job_progress(job: DedupJob, progress: int) -> None:
job.progress = progress
job.save(update_fields=["progress"])
send_notification(deduplication_set.notification_url)


@shared_task(soft_time_limit=0.5 * HOUR, time_limit=1 * HOUR)
def find_duplicates(dedup_job_id: int, version: int) -> None:
@app.task(bind=True)
def find_duplicates(self: Task, dedup_job_id: int, version: int) -> None:
dedup_job: DedupJob = DedupJob.objects.get(pk=dedup_job_id, version=version)
deduplication_set = dedup_job.deduplication_set
try:

deduplication_set.state = DeduplicationSet.State.DIRTY
deduplication_set.save(update_fields=["state"])
send_notification(deduplication_set.notification_url)

config = asdict(
DeduplicationSetConfig.from_deduplication_set(deduplication_set)
)

# clean results
Finding.objects.filter(deduplication_set=deduplication_set).delete()
dedup_job.progress = 0
dedup_job.save(update_fields=["progress"])

# weight_total = 0
# for finder, tracker in zip(
# for finder, _ in zip(
# get_finders(deduplication_set),
# track_progress_multi(partial(update_job_progress, dedup_job)),
# ):
# # _save_duplicates(finder, deduplication_set, tracker)
# weight_total += finder.weight

weight_total = 1
deduplication_set.finding_set.update(score=F("score") / weight_total)

files = deduplication_set.image_set.values_list("filename", flat=True)
chunks = get_chunks(files)
tasks = [encode_chunk.s(chunk, config) for chunk in chunks]
chord_id = chord(tasks)(callback_encodings.s(config=config))
config = asdict(DeduplicationSetConfig.from_deduplication_set(deduplication_set))

# for finder, tracker in zip(
# get_finders(deduplication_set),
# track_progress_multi(partial(update_job_progress, dedup_job)),
# ):
# for first, second, score in finder.run(tracker):
# finding = (first, second, score * finder.weight)
# deduplication_set.update_findings(finding)
pipeline = (
clear_findings.s(deduplication_set.id)
| image_pipeline(deduplication_set, config)
| finish.s(deduplication_set.id)
)

return {
"deduplication_set": str(deduplication_set),
"chord_id": str(chord_id),
"chunks": len(chunks),
}
except Exception:
handle_error(deduplication_set)
raise
return self.replace(pipeline)
20 changes: 0 additions & 20 deletions src/hope_dedup_engine/apps/api/deduplication/registry.py

This file was deleted.

7 changes: 0 additions & 7 deletions src/hope_dedup_engine/apps/api/forms.py

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# Generated by Django 5.1.5 on 2025-01-28 08:54

import django.db.models.deletion
import hope_dedup_engine.apps.api.validators
from django.db import migrations, models


class Migration(migrations.Migration):
    """Schema changes for deduplication-set configuration.

    Applied on top of migration ``0016`` of the ``api`` app, this migration:

    * adds an ``encoding_errors`` JSON field to ``deduplicationset``;
    * redefines ``config.settings`` as an optional JSON field validated by
      ``validate_constance_config``;
    * redefines ``deduplicationset.config`` as an optional foreign key to
      ``api.config`` that is set to NULL when the referenced row is deleted.
    """

    dependencies = [("api", "0016_remove_finding_error_finding_first_filename_and_more")]

    operations = [
        # New column on the deduplication set; defaults to an empty dict.
        migrations.AddField(
            model_name="deduplicationset",
            name="encoding_errors",
            field=models.JSONField(blank=True, default=dict, null=True),
        ),
        # Settings stay optional, but every value is run through the
        # project's constance-config validator.
        migrations.AlterField(
            model_name="config",
            name="settings",
            field=models.JSONField(
                blank=True,
                default=dict,
                null=True,
                validators=[
                    hope_dedup_engine.apps.api.validators.validate_constance_config
                ],
            ),
        ),
        # Deleting a config row must not delete the sets that reference it:
        # the reference is simply cleared.
        migrations.AlterField(
            model_name="deduplicationset",
            name="config",
            field=models.ForeignKey(
                blank=True,
                null=True,
                on_delete=django.db.models.deletion.SET_NULL,
                to="api.config",
            ),
        ),
    ]
Loading
Loading