From 731b68f13c4232529914a4c9ff1240bc804127c3 Mon Sep 17 00:00:00 2001 From: Kiran Dawadi Date: Wed, 29 Jan 2025 11:19:54 -0600 Subject: [PATCH 01/10] Create model to track title resolution statuses --- sde_collections/models/delta_patterns.py | 25 ++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/sde_collections/models/delta_patterns.py b/sde_collections/models/delta_patterns.py index 61c8e9ea..c48eccae 100644 --- a/sde_collections/models/delta_patterns.py +++ b/sde_collections/models/delta_patterns.py @@ -691,3 +691,28 @@ def save(self, *args, **kwargs): class DeltaResolvedTitleError(DeltaResolvedTitleBase): error_string = models.TextField(null=False, blank=False) http_status_code = models.IntegerField(null=True, blank=True) + + +class TitleResolutionStatus(models.Model): + """Tracks the status of title resolution tasks.""" + + class Status(models.TextChoices): + PENDING = "pending", "Pending" + PROCESSING = "processing", "Processing" + RESOLVED = "resolved", "Resolved" + FAILED = "failed", "Failed" + + title_pattern = models.ForeignKey(DeltaTitlePattern, on_delete=models.CASCADE) + delta_url = models.ForeignKey("DeltaUrl", on_delete=models.CASCADE) + status = models.CharField(max_length=20, choices=Status.choices, default=Status.PENDING) + error_message = models.TextField(blank=True) + created_at = models.DateTimeField(auto_now_add=True) + updated_at = models.DateTimeField(auto_now_add=True) + + class Meta: + verbose_name = "Title Resolution Status" + verbose_name_plural = "Title Resolution Statuses" + unique_together = ("title_pattern", "delta_url") + indexes = [ + models.Index(fields=["status", "created_at"]), + ] From cf228d39af55f1aa701555cf74ca5936cb1d643f Mon Sep 17 00:00:00 2001 From: Kiran Dawadi Date: Fri, 31 Jan 2025 15:15:26 -0600 Subject: [PATCH 02/10] Update already available models to track the resolution statuses --- sde_collections/models/delta_patterns.py | 73 +++++++++++++++--------- 1 file changed, 47 insertions(+), 26 deletions(-) diff --git a/sde_collections/models/delta_patterns.py b/sde_collections/models/delta_patterns.py index c48eccae..fc2d6065 100644 --- a/sde_collections/models/delta_patterns.py +++ b/sde_collections/models/delta_patterns.py @@ -476,10 +476,10 @@ def generate_title_for_url(self, url_obj) -> tuple[str, str | None]: def apply(self) -> None: """ - Apply the title pattern to matching URLs: + Queue title pattern resolution for matching URLs: 1. Find new Curated URLs that match but weren't previously affected 2. Create Delta URLs only where the generated title differs - 3. Update all matching Delta URLs with new titles + 3. Queue background tasks for title resolution 4. Track title resolution status and errors """ DeltaUrl = apps.get_model("sde_collections", "DeltaUrl") @@ -670,21 +670,33 @@ class DeltaResolvedTitleBase(models.Model): title_pattern = models.ForeignKey(DeltaTitlePattern, on_delete=models.CASCADE) delta_url = models.OneToOneField("sde_collections.DeltaUrl", on_delete=models.CASCADE) created_at = models.DateTimeField(auto_now_add=True) + updated_at = models.DateTimeField(auto_now=True) class Meta: abstract = True class DeltaResolvedTitle(DeltaResolvedTitleBase): + class Status(models.TextChoices): + PENDING = "pending", "Pending" + PROCESSING = "processing", "Processing" + RESOLVED = "resolved", "Resolved" + FAILED = "failed", "Failed" + resolved_title = models.CharField(blank=True, default="") + status = models.CharField(max_length=20, choices=Status.choices, default=Status.PENDING) class Meta: verbose_name = "Resolved Title" verbose_name_plural = "Resolved Titles" + indexes = [ + models.Index(fields=["status", "created_at"]), + ] def save(self, *args, **kwargs): - # Finds the linked delta URL and deletes DeltaResolvedTitleError objects linked to it - DeltaResolvedTitleError.objects.filter(delta_url=self.delta_url).delete() + if self.status == self.Status.RESOLVED: + # Finds the linked delta URL and deletes DeltaResolvedTitleError objects linked to it + DeltaResolvedTitleError.objects.filter(delta_url=self.delta_url).delete() super().save(*args, **kwargs) @@ -692,27 +704,36 @@ class DeltaResolvedTitleError(DeltaResolvedTitleBase): error_string = models.TextField(null=False, blank=False) http_status_code = models.IntegerField(null=True, blank=True) + def save(self, *args, **kwargs): + # When saving an error, update the related DeltaResolvedTitle status + DeltaResolvedTitle.objects.update_or_create( + delta_url=self.delta_url, + title_pattern=self.title_pattern, + defaults={"status": DeltaResolvedTitle.Status.FAILED, "resolved_title": ""}, + ) + super().save(*args, **kwargs) -class TitleResolutionStatus(models.Model): - """Tracks the status of title resolution tasks.""" - - class Status(models.TextChoices): - PENDING = "pending", "Pending" - PROCESSING = "processing", "Processing" - RESOLVED = "resolved", "Resolved" - FAILED = "failed", "Failed" - - title_pattern = models.ForeignKey(DeltaTitlePattern, on_delete=models.CASCADE) - delta_url = models.ForeignKey("DeltaUrl", on_delete=models.CASCADE) - status = models.CharField(max_length=20, choices=Status.choices, default=Status.PENDING) - error_message = models.TextField(blank=True) - created_at = models.DateTimeField(auto_now_add=True) - updated_at = models.DateTimeField(auto_now_add=True) - class Meta: - verbose_name = "Title Resolution Status" - verbose_name_plural = "Title Resolution Statuses" - unique_together = ("title_pattern", "delta_url") - indexes = [ - models.Index(fields=["status", "created_at"]), - ] +# class TitleResolutionStatus(models.Model): +# """Tracks the status of title resolution tasks.""" + +# class Status(models.TextChoices): +# PENDING = "pending", "Pending" +# PROCESSING = "processing", "Processing" +# RESOLVED = "resolved", "Resolved" +# FAILED = "failed", "Failed" + +# title_pattern = models.ForeignKey(DeltaTitlePattern, on_delete=models.CASCADE) +# delta_url = models.ForeignKey("DeltaUrl", on_delete=models.CASCADE) +# status = models.CharField(max_length=20, choices=Status.choices, default=Status.PENDING) +# error_message = models.TextField(blank=True) +# created_at = models.DateTimeField(auto_now_add=True) +# updated_at = models.DateTimeField(auto_now_add=True) + +# class Meta: +# verbose_name = "Title Resolution Status" +# verbose_name_plural = "Title Resolution Statuses" +# unique_together = ("title_pattern", "delta_url") +# indexes = [ +# models.Index(fields=["status", "created_at"]), +# ] From 242211c5a5a9da97cc59db5ac5e0119f3de83157 Mon Sep 17 00:00:00 2001 From: Kiran Dawadi Date: Sun, 2 Feb 2025 17:36:17 -0600 Subject: [PATCH 03/10] Resolution working with Celery task --- ...0076_deltaresolvedtitle_status_and_more.py | 41 +++++++++ sde_collections/models/delta_patterns.py | 61 ++++++------- sde_collections/tasks.py | 85 +++++++++++++++++-- 3 files changed, 146 insertions(+), 41 deletions(-) create mode 100644 sde_collections/migrations/0076_deltaresolvedtitle_status_and_more.py diff --git a/sde_collections/migrations/0076_deltaresolvedtitle_status_and_more.py b/sde_collections/migrations/0076_deltaresolvedtitle_status_and_more.py new file mode 100644 index 00000000..e8efbe40 --- /dev/null +++ b/sde_collections/migrations/0076_deltaresolvedtitle_status_and_more.py @@ -0,0 +1,41 @@ +# Generated by Django 4.2.9 on 2025-02-02 15:45 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ("sde_collections", "0075_alter_collection_reindexing_status_and_more"), + ] + + operations = [ + migrations.AddField( + model_name="deltaresolvedtitle", + name="status", + field=models.CharField( + choices=[ + ("pending", "Pending"), + ("processing", "Processing"), + ("resolved", "Resolved"), + ("failed", "Failed"), + ], + default="pending", + max_length=20, + ), + ), + migrations.AddField( + model_name="deltaresolvedtitle", + name="updated_at", + field=models.DateTimeField(auto_now=True), + ), + migrations.AddField( + model_name="deltaresolvedtitleerror", + name="updated_at", + field=models.DateTimeField(auto_now=True), + ), + migrations.AddIndex( + model_name="deltaresolvedtitle", + index=models.Index(fields=["status", "created_at"], name="sde_collect_status_42dc80_idx"), + ), + ] diff --git a/sde_collections/models/delta_patterns.py b/sde_collections/models/delta_patterns.py index fc2d6065..b5989bc3 100644 --- a/sde_collections/models/delta_patterns.py +++ b/sde_collections/models/delta_patterns.py @@ -482,9 +482,13 @@ def apply(self) -> None: 3. Queue background tasks for title resolution 4. Track title resolution status and errors """ + # Inserting here to avoid circular import issue + from ..tasks import process_title_resolutions + + pattern_url_pairs = [] + DeltaUrl = apps.get_model("sde_collections", "DeltaUrl") DeltaResolvedTitle = apps.get_model("sde_collections", "DeltaResolvedTitle") - DeltaResolvedTitleError = apps.get_model("sde_collections", "DeltaResolvedTitleError") # Get newly matching Curated URLs matching_curated_urls = self.get_matching_curated_urls() @@ -497,60 +501,45 @@ def apply(self) -> None: if not self.is_most_distinctive_pattern(curated_url): continue - new_title, error = self.generate_title_for_url(curated_url) - - if error: - DeltaResolvedTitleError.objects.update_or_create( - delta_url=curated_url, defaults={"title_pattern": self, "error_string": error} # lookup field - ) - continue - - # Skip if the generated title matches existing or if Delta already exists - if ( - curated_url.generated_title == new_title - or DeltaUrl.objects.filter(url=curated_url.url, collection=self.collection).exists() - ): - continue - - # Create new Delta URL with the new title + # Create a new Delta URL (without title initially) fields = { field.name: getattr(curated_url, field.name) for field in curated_url._meta.fields if field.name not in ["id", "collection"] } - fields["generated_title"] = new_title fields["to_delete"] = False fields["collection"] = self.collection - delta_url = DeltaUrl.objects.create(**fields) + # Create initial resolution record + resolution = DeltaResolvedTitle.objects.create( + title_pattern=self, delta_url=delta_url, status=DeltaResolvedTitle.Status.PENDING + ) + print(f"PENDING created for CuratedURL {delta_url.id}; Pattern ID is {self.id}") - # Record successful title resolution - DeltaResolvedTitle.objects.create(title_pattern=self, delta_url=delta_url, resolved_title=new_title) + # Queue the background task using delay() + # resolve_title_pattern.delay(self.id, delta_url.id) + pattern_url_pairs.append((self.id, delta_url.id)) - # Update titles for all matching Delta URLs for delta_url in self.get_matching_delta_urls(): if not self.is_most_distinctive_pattern(delta_url): continue - new_title, error = self.generate_title_for_url(delta_url) - - if error: - DeltaResolvedTitleError.objects.update_or_create( - delta_url=delta_url, defaults={"title_pattern": self, "error_string": error} # lookup field - ) - continue - - # Update title and record resolution - key change here - DeltaResolvedTitle.objects.update_or_create( - delta_url=delta_url, # Only use delta_url for lookup - defaults={"title_pattern": self, "resolved_title": new_title}, + # Create/update resolution record + resolution, _ = DeltaResolvedTitle.objects.update_or_create( + delta_url=delta_url, defaults={"title_pattern": self, "status": DeltaResolvedTitle.Status.PENDING} ) + # resolve_title_pattern.delay(self.id, delta_url.id) + pattern_url_pairs.append((self.id, delta_url.id)) - delta_url.generated_title = new_title - delta_url.save() + if pattern_url_pairs: + print(pattern_url_pairs) + print("URL pairs created; Sent for resolution") + process_title_resolutions.delay(pattern_url_pairs) # Update pattern relationships + print("Finished resolution; Starting pattern relationships") self.update_affected_delta_urls_list() + print("Finished this block") def unapply(self) -> None: """ diff --git a/sde_collections/tasks.py b/sde_collections/tasks.py index 86605124..b8164804 100644 --- a/sde_collections/tasks.py +++ b/sde_collections/tasks.py @@ -3,6 +3,7 @@ import shutil import boto3 +from celery import group from django.apps import apps from django.conf import settings from django.core import management @@ -148,11 +149,85 @@ def pull_latest_collection_metadata_from_github(): s3_client.upload_file(FILENAME, s3_bucket_name, s3_key) -@celery_app.task() -def resolve_title_pattern(title_pattern_id): - TitlePattern = apps.get_model("sde_collections", "TitlePattern") - title_pattern = TitlePattern.objects.get(id=title_pattern_id) - title_pattern.apply() +# @celery_app.task() +# def resolve_title_pattern(title_pattern_id): +# TitlePattern = apps.get_model("sde_collections", "TitlePattern") +# title_pattern = TitlePattern.objects.get(id=title_pattern_id) +# title_pattern.apply() + + +@celery_app.task(name="sde_collections.tasks.resolve_title_pattern") +def resolve_title_pattern(title_pattern_id: int, delta_url_id: int) -> None: + """Background task to resolve a title pattern for a specific URL""" + DeltaUrl = apps.get_model("sde_collections", "DeltaUrl") + DeltaTitlePattern = apps.get_model("sde_collections", "DeltaTitlePattern") + DeltaResolvedTitle = apps.get_model("sde_collections", "DeltaResolvedTitle") + DeltaResolvedTitleError = apps.get_model("sde_collections", "DeltaResolvedTitleError") + + try: + with transaction.atomic(): + # First attempt to get an existing record or create a new one + resolution, created = DeltaResolvedTitle.objects.get_or_create( + title_pattern_id=title_pattern_id, + delta_url_id=delta_url_id, + defaults={"status": DeltaResolvedTitle.Status.PROCESSING}, + ) + + # If we found an existing record (created=False) + if not created: + resolution.status = DeltaResolvedTitle.Status.PROCESSING + resolution.save() + print(f"Not created for DeltaURL {delta_url_id}; Pattern ID is {title_pattern_id}") + + if DeltaResolvedTitle.objects.filter( + delta_url_id=delta_url_id, + created_at__gt=resolution.created_at, + status__in=[DeltaResolvedTitle.Status.PENDING, DeltaResolvedTitle.Status.PROCESSING], + ).exists(): + print(f"Returning for DeltaURL {delta_url_id}; Pattern ID is {title_pattern_id}") + return + + with open("/tmp/celery_debug.log", "a") as f: + f.write(f"PROCESSING created for DeltaURL {delta_url_id}; Pattern ID is {title_pattern_id}") + + # Get pattern and URL + pattern = DeltaTitlePattern.objects.get(id=title_pattern_id) + delta_url = DeltaUrl.objects.get(id=delta_url_id) + + # Generate new title + new_title, error = pattern.generate_title_for_url(delta_url) + + if error: + DeltaResolvedTitleError.objects.create( + title_pattern=pattern, + delta_url=delta_url, + error_string=str(error), + http_status_code=getattr(error, "status_code", None), + ) + print(f"FAILED created for DeltaURL {delta_url_id}; Pattern ID is {title_pattern_id}") + return + + delta_url.generated_title = new_title + delta_url.save() + + resolution.resolved_title = new_title + resolution.status = DeltaResolvedTitle.Status.RESOLVED + resolution.save() + + print(f"RESOLVED created for DeltaURL {delta_url_id}; Pattern ID is {title_pattern_id}") + + except Exception as e: + DeltaResolvedTitleError.objects.create( + title_pattern_id=title_pattern_id, delta_url_id=delta_url_id, error_string=str(e) + ) + + +@celery_app.task(name="sde_collections.tasks.process_title_resolutions") +def process_title_resolutions(pattern_url_pairs: list[tuple[int, int]]) -> None: + """Creates a group of tasks and processes them with Celery's native batching""" + # Create a group of tasks + tasks = [resolve_title_pattern.s(pattern_id, url_id) for pattern_id, url_id in pattern_url_pairs] + group(tasks).apply_async() @celery_app.task(soft_time_limit=600) From 51de92fc09a8a38b48884baaafe9fb15e1a335f6 Mon Sep 17 00:00:00 2001 From: Kiran Dawadi Date: Sun, 2 Feb 2025 19:15:24 -0600 Subject: [PATCH 04/10] Add debug logging --- sde_collections/models/delta_patterns.py | 16 ++-- sde_collections/tasks.py | 105 ++++++++++++----------- 2 files changed, 65 insertions(+), 56 deletions(-) diff --git a/sde_collections/models/delta_patterns.py b/sde_collections/models/delta_patterns.py index b5989bc3..b187338d 100644 --- a/sde_collections/models/delta_patterns.py +++ b/sde_collections/models/delta_patterns.py @@ -510,15 +510,16 @@ def apply(self) -> None: fields["to_delete"] = False fields["collection"] = self.collection delta_url = DeltaUrl.objects.create(**fields) + # Create initial resolution record resolution = DeltaResolvedTitle.objects.create( title_pattern=self, delta_url=delta_url, status=DeltaResolvedTitle.Status.PENDING ) print(f"PENDING created for CuratedURL {delta_url.id}; Pattern ID is {self.id}") - # Queue the background task using delay() - # resolve_title_pattern.delay(self.id, delta_url.id) + # Add the pattern url pairs to the list for background queuing pattern_url_pairs.append((self.id, delta_url.id)) + # resolve_title_pattern.delay(self.id, delta_url.id) for delta_url in self.get_matching_delta_urls(): if not self.is_most_distinctive_pattern(delta_url): @@ -528,18 +529,17 @@ def apply(self) -> None: resolution, _ = DeltaResolvedTitle.objects.update_or_create( delta_url=delta_url, defaults={"title_pattern": self, "status": DeltaResolvedTitle.Status.PENDING} ) - # resolve_title_pattern.delay(self.id, delta_url.id) + print(f"PENDING created for DeltaURL {delta_url.id}; Pattern ID is {self.id}") + pattern_url_pairs.append((self.id, delta_url.id)) + # resolve_title_pattern.delay(self.id, delta_url.id) if pattern_url_pairs: - print(pattern_url_pairs) - print("URL pairs created; Sent for resolution") process_title_resolutions.delay(pattern_url_pairs) - # Update pattern relationships - print("Finished resolution; Starting pattern relationships") self.update_affected_delta_urls_list() - print("Finished this block") + + # print("Finished this block") def unapply(self) -> None: """ diff --git a/sde_collections/tasks.py b/sde_collections/tasks.py index b8164804..5634cec2 100644 --- a/sde_collections/tasks.py +++ b/sde_collections/tasks.py @@ -1,4 +1,5 @@ import json +import logging import os import shutil @@ -20,6 +21,8 @@ from .sinequa_api import Api from .utils.github_helper import GitHubHandler +logger = logging.getLogger(__name__) + def _get_data_to_import(collection, server_name): # ignore these because they are API collections and don't have URLs @@ -159,62 +162,59 @@ def pull_latest_collection_metadata_from_github(): @celery_app.task(name="sde_collections.tasks.resolve_title_pattern") def resolve_title_pattern(title_pattern_id: int, delta_url_id: int) -> None: """Background task to resolve a title pattern for a specific URL""" + logger.info(f"Single resolution task received for URL ID: {delta_url_id}") + DeltaUrl = apps.get_model("sde_collections", "DeltaUrl") DeltaTitlePattern = apps.get_model("sde_collections", "DeltaTitlePattern") DeltaResolvedTitle = apps.get_model("sde_collections", "DeltaResolvedTitle") DeltaResolvedTitleError = apps.get_model("sde_collections", "DeltaResolvedTitleError") try: - with transaction.atomic(): - # First attempt to get an existing record or create a new one - resolution, created = DeltaResolvedTitle.objects.get_or_create( - title_pattern_id=title_pattern_id, - delta_url_id=delta_url_id, - defaults={"status": DeltaResolvedTitle.Status.PROCESSING}, - ) - - # If we found an existing record (created=False) - if not created: - resolution.status = DeltaResolvedTitle.Status.PROCESSING - resolution.save() - print(f"Not created for DeltaURL {delta_url_id}; Pattern ID is {title_pattern_id}") - - if DeltaResolvedTitle.objects.filter( - delta_url_id=delta_url_id, - created_at__gt=resolution.created_at, - status__in=[DeltaResolvedTitle.Status.PENDING, DeltaResolvedTitle.Status.PROCESSING], - ).exists(): - print(f"Returning for DeltaURL {delta_url_id}; Pattern ID is {title_pattern_id}") - return - - with open("/tmp/celery_debug.log", "a") as f: - f.write(f"PROCESSING created for DeltaURL {delta_url_id}; Pattern ID is {title_pattern_id}") - - # Get pattern and URL - pattern = DeltaTitlePattern.objects.get(id=title_pattern_id) - delta_url = DeltaUrl.objects.get(id=delta_url_id) - - # Generate new title - new_title, error = pattern.generate_title_for_url(delta_url) - - if error: - DeltaResolvedTitleError.objects.create( - title_pattern=pattern, - delta_url=delta_url, - error_string=str(error), - http_status_code=getattr(error, "status_code", None), - ) - print(f"FAILED created for DeltaURL {delta_url_id}; Pattern ID is {title_pattern_id}") - return - - delta_url.generated_title = new_title - delta_url.save() + # with transaction.atomic(): + # First attempt to get an existing record or create a new one + resolution, created = DeltaResolvedTitle.objects.get_or_create( + title_pattern_id=title_pattern_id, + delta_url_id=delta_url_id, + defaults={"status": DeltaResolvedTitle.Status.PROCESSING}, + ) - resolution.resolved_title = new_title - resolution.status = DeltaResolvedTitle.Status.RESOLVED + # If we found an existing record (created=False) + if not created: + resolution.status = DeltaResolvedTitle.Status.PROCESSING resolution.save() - print(f"RESOLVED created for DeltaURL {delta_url_id}; Pattern ID is {title_pattern_id}") + logger.info(f"PROCESSING created for DeltaURL {delta_url_id}; Pattern ID is {title_pattern_id}") + + # if DeltaResolvedTitle.objects.filter( + # delta_url_id=delta_url_id, + # created_at__gt=resolution.created_at, + # status__in=[DeltaResolvedTitle.Status.PENDING, DeltaResolvedTitle.Status.PROCESSING], + # ).exists(): + # logger.info(f"Returning for DeltaURL {delta_url_id}; Pattern ID is {title_pattern_id}") + # return + + # Get pattern and URL + pattern = DeltaTitlePattern.objects.get(id=title_pattern_id) + delta_url = DeltaUrl.objects.get(id=delta_url_id) + # Generate new title + new_title, error = pattern.generate_title_for_url(delta_url) + + if error: + DeltaResolvedTitleError.objects.create( + title_pattern=pattern, + delta_url=delta_url, + error_string=str(error), + http_status_code=getattr(error, "status_code", None), + ) + logger.info(f"FAILED created for DeltaURL {delta_url_id}; Pattern ID is {title_pattern_id}") + return + + delta_url.generated_title = new_title + delta_url.save() + resolution.resolved_title = new_title + resolution.status = DeltaResolvedTitle.Status.RESOLVED + resolution.save() + logger.info(f"RESOLVED created for DeltaURL {delta_url_id}; Pattern ID is {title_pattern_id}") except Exception as e: DeltaResolvedTitleError.objects.create( @@ -225,9 +225,18 @@ def resolve_title_pattern(title_pattern_id: int, delta_url_id: int) -> None: @celery_app.task(name="sde_collections.tasks.process_title_resolutions") def process_title_resolutions(pattern_url_pairs: list[tuple[int, int]]) -> None: """Creates a group of tasks and processes them with Celery's native batching""" + + logger.info("Bulk resolution task received.") + # Create a group of tasks tasks = [resolve_title_pattern.s(pattern_id, url_id) for pattern_id, url_id in pattern_url_pairs] - group(tasks).apply_async() + # group(tasks).apply_async() + group(tasks)().delay() + + # group( + # resolve_title_pattern.delay(pattern_id, url_id) + # for pattern_id, url_id in pattern_url_pairs + # ) @celery_app.task(soft_time_limit=600) From 26461e99c1e09c7bdb824f49a0984fe34f6f36ce Mon Sep 17 00:00:00 2001 From: Kiran Dawadi Date: Sun, 2 Feb 2025 23:42:39 -0600 Subject: [PATCH 05/10] Make apply() to run fully in background --- sde_collections/models/delta_patterns.py | 61 ++-------------- sde_collections/tasks.py | 93 ++++++++++++++++-------- 2 files changed, 70 insertions(+), 84 deletions(-) diff --git a/sde_collections/models/delta_patterns.py b/sde_collections/models/delta_patterns.py index b187338d..ad4d82a9 100644 --- a/sde_collections/models/delta_patterns.py +++ b/sde_collections/models/delta_patterns.py @@ -3,7 +3,7 @@ from django.apps import apps from django.core.exceptions import ValidationError -from django.db import models +from django.db import models, transaction from ..utils.title_resolver import ( is_valid_xpath, @@ -482,64 +482,15 @@ def apply(self) -> None: 3. Queue background tasks for title resolution 4. Track title resolution status and errors """ + # Inserting here to avoid circular import issue from ..tasks import process_title_resolutions - pattern_url_pairs = [] - - DeltaUrl = apps.get_model("sde_collections", "DeltaUrl") - DeltaResolvedTitle = apps.get_model("sde_collections", "DeltaResolvedTitle") - - # Get newly matching Curated URLs - matching_curated_urls = self.get_matching_curated_urls() - previously_unaffected_curated = matching_curated_urls.exclude( - id__in=self.curated_urls.values_list("id", flat=True) - ) - - # Process each previously unaffected curated URL - for curated_url in previously_unaffected_curated: - if not self.is_most_distinctive_pattern(curated_url): - continue - - # Create a new Delta URL (without title initially) - fields = { - field.name: getattr(curated_url, field.name) - for field in curated_url._meta.fields - if field.name not in ["id", "collection"] - } - fields["to_delete"] = False - fields["collection"] = self.collection - delta_url = DeltaUrl.objects.create(**fields) - - # Create initial resolution record - resolution = DeltaResolvedTitle.objects.create( - title_pattern=self, delta_url=delta_url, status=DeltaResolvedTitle.Status.PENDING - ) - print(f"PENDING created for CuratedURL {delta_url.id}; Pattern ID is {self.id}") - - # Add the pattern url pairs to the list for background queuing - pattern_url_pairs.append((self.id, delta_url.id)) - # resolve_title_pattern.delay(self.id, delta_url.id) - - for delta_url in self.get_matching_delta_urls(): - if not self.is_most_distinctive_pattern(delta_url): - continue - - # Create/update resolution record - resolution, _ = DeltaResolvedTitle.objects.update_or_create( - delta_url=delta_url, defaults={"title_pattern": self, "status": DeltaResolvedTitle.Status.PENDING} - ) - print(f"PENDING created for DeltaURL {delta_url.id}; Pattern ID is {self.id}") - - pattern_url_pairs.append((self.id, delta_url.id)) - # resolve_title_pattern.delay(self.id, delta_url.id) - - if pattern_url_pairs: - process_title_resolutions.delay(pattern_url_pairs) - - self.update_affected_delta_urls_list() + def queue_task(): + process_title_resolutions.delay(self.id) - # print("Finished this block") + # Queue the background task only after the transaction commits (i.e, after apply() method) + transaction.on_commit(queue_task) def unapply(self) -> None: """ diff --git a/sde_collections/tasks.py b/sde_collections/tasks.py index 5634cec2..fb7a5026 100644 --- a/sde_collections/tasks.py +++ b/sde_collections/tasks.py @@ -159,6 +159,67 @@ def pull_latest_collection_metadata_from_github(): # title_pattern.apply() +@celery_app.task(name="sde_collections.tasks.process_title_resolutions") +def process_title_resolutions(pattern_id: int) -> None: + """Background task to prepare and queue title resolutions""" + try: + + DeltaTitlePattern = apps.get_model("sde_collections", "DeltaTitlePattern") + DeltaUrl = apps.get_model("sde_collections", "DeltaUrl") + DeltaResolvedTitle = apps.get_model("sde_collections", "DeltaResolvedTitle") + + pattern = DeltaTitlePattern.objects.get(id=pattern_id) + pattern_url_pairs = [] + + with transaction.atomic(): + # Process curated URLs + matching_curated_urls = pattern.get_matching_curated_urls() + previously_unaffected_curated = matching_curated_urls.exclude( + id__in=pattern.curated_urls.values_list("id", flat=True) + ) + + for curated_url in previously_unaffected_curated: + if not pattern.is_most_distinctive_pattern(curated_url): + continue + + # Create Delta URL + fields = { + field.name: getattr(curated_url, field.name) + for field in curated_url._meta.fields + if field.name not in ["id", "collection"] + } + fields["to_delete"] = False + fields["collection"] = pattern.collection + delta_url = DeltaUrl.objects.create(**fields) + + # Create resolution record + DeltaResolvedTitle.objects.create( + title_pattern=pattern, delta_url=delta_url, status=DeltaResolvedTitle.Status.PENDING + ) + pattern_url_pairs.append((pattern_id, delta_url.id)) + + # Process delta URLs + for delta_url in pattern.get_matching_delta_urls(): + if not pattern.is_most_distinctive_pattern(delta_url): + continue + + DeltaResolvedTitle.objects.update_or_create( + delta_url=delta_url, + defaults={"title_pattern": pattern, "status": DeltaResolvedTitle.Status.PENDING}, + ) + pattern_url_pairs.append((pattern_id, delta_url.id)) + + # Update relationships + pattern.update_affected_delta_urls_list() + + # Queue the resolution tasks + if pattern_url_pairs: + group(resolve_title_pattern.s(pattern_id, url_id) for pattern_id, url_id in pattern_url_pairs)() + + except Exception as e: + logger.error(f"Error in process_title_resolutions for pattern {pattern_id}: {str(e)}") + + @celery_app.task(name="sde_collections.tasks.resolve_title_pattern") def resolve_title_pattern(title_pattern_id: int, delta_url_id: int) -> None: """Background task to resolve a title pattern for a specific URL""" @@ -170,7 +231,6 @@ def resolve_title_pattern(title_pattern_id: int, delta_url_id: int) -> None: DeltaResolvedTitleError = apps.get_model("sde_collections", "DeltaResolvedTitleError") try: - # with transaction.atomic(): # First attempt to get an existing record or create a new one resolution, created = DeltaResolvedTitle.objects.get_or_create( title_pattern_id=title_pattern_id, @@ -183,15 +243,7 @@ def resolve_title_pattern(title_pattern_id: int, delta_url_id: int) -> None: resolution.status = DeltaResolvedTitle.Status.PROCESSING resolution.save() - logger.info(f"PROCESSING created for DeltaURL {delta_url_id}; Pattern ID is {title_pattern_id}") - - # if DeltaResolvedTitle.objects.filter( - # delta_url_id=delta_url_id, - # created_at__gt=resolution.created_at, - # status__in=[DeltaResolvedTitle.Status.PENDING, DeltaResolvedTitle.Status.PROCESSING], - # ).exists(): - # logger.info(f"Returning for DeltaURL {delta_url_id}; Pattern ID is {title_pattern_id}") - # return + logger.info(f"PROCESSING status created for DeltaURL {delta_url_id}; Pattern ID {title_pattern_id}") # Get pattern and URL pattern = DeltaTitlePattern.objects.get(id=title_pattern_id) @@ -206,7 +258,7 @@ def resolve_title_pattern(title_pattern_id: int, delta_url_id: int) -> None: error_string=str(error), http_status_code=getattr(error, "status_code", None), ) - logger.info(f"FAILED created for DeltaURL {delta_url_id}; Pattern ID is {title_pattern_id}") + logger.info(f"FAILED status created for DeltaURL {delta_url_id}; Pattern ID {title_pattern_id}") return delta_url.generated_title = new_title @@ -214,7 +266,7 @@ def resolve_title_pattern(title_pattern_id: int, delta_url_id: int) -> None: resolution.resolved_title = new_title resolution.status = DeltaResolvedTitle.Status.RESOLVED resolution.save() - logger.info(f"RESOLVED created for DeltaURL {delta_url_id}; Pattern ID is {title_pattern_id}") + logger.info(f"RESOLVED status created for DeltaURL {delta_url_id}; Pattern ID {title_pattern_id}") except Exception as e: DeltaResolvedTitleError.objects.create( @@ -222,23 +274,6 @@ def resolve_title_pattern(title_pattern_id: int, delta_url_id: int) -> None: ) -@celery_app.task(name="sde_collections.tasks.process_title_resolutions") -def process_title_resolutions(pattern_url_pairs: list[tuple[int, int]]) -> None: - """Creates a group of tasks and processes them with Celery's native batching""" - - logger.info("Bulk resolution task received.") - - # Create a group of tasks - tasks = [resolve_title_pattern.s(pattern_id, url_id) for pattern_id, url_id in pattern_url_pairs] - # group(tasks).apply_async() - group(tasks)().delay() - - # group( - # resolve_title_pattern.delay(pattern_id, url_id) - # for pattern_id, url_id in pattern_url_pairs - # ) - - @celery_app.task(soft_time_limit=600) def fetch_and_replace_full_text(collection_id, server_name): """ From 8e9cbef744fcceb3e2308a2b2ae58f03bb37a972 Mon Sep 17 00:00:00 2001 From: Kiran Dawadi Date: Mon, 3 Feb 2025 02:36:04 -0600 Subject: [PATCH 06/10] Add status tracker and merged in frontend --- sde_collections/models/delta_patterns.py | 25 --------- sde_collections/urls.py | 5 ++ sde_collections/views.py | 29 ++++++++++ .../static/js/delta_url_list.js | 54 +++++++++++++++++++ 4 files changed, 88 insertions(+), 25 deletions(-) diff --git a/sde_collections/models/delta_patterns.py b/sde_collections/models/delta_patterns.py index ad4d82a9..799eb24d 100644 --- a/sde_collections/models/delta_patterns.py +++ b/sde_collections/models/delta_patterns.py @@ -652,28 +652,3 @@ def save(self, *args, **kwargs): defaults={"status": DeltaResolvedTitle.Status.FAILED, "resolved_title": ""}, ) super().save(*args, **kwargs) - - -# class TitleResolutionStatus(models.Model): -# """Tracks the status of title resolution tasks.""" - -# class Status(models.TextChoices): -# PENDING = "pending", "Pending" -# PROCESSING = "processing", "Processing" -# RESOLVED = "resolved", "Resolved" -# FAILED = "failed", "Failed" - -# title_pattern = models.ForeignKey(DeltaTitlePattern, on_delete=models.CASCADE) -# delta_url = models.ForeignKey("DeltaUrl", on_delete=models.CASCADE) -# status = models.CharField(max_length=20, choices=Status.choices, default=Status.PENDING) -# error_message = models.TextField(blank=True) -# created_at = models.DateTimeField(auto_now_add=True) -# updated_at = models.DateTimeField(auto_now_add=True) - -# class Meta: -# verbose_name = "Title Resolution Status" -# verbose_name_plural = "Title Resolution Statuses" -# unique_together = ("title_pattern", "delta_url") -# indexes = [ -# models.Index(fields=["status", "created_at"]), -# ] diff --git a/sde_collections/urls.py b/sde_collections/urls.py index 9ee77759..33bfba87 100644 --- a/sde_collections/urls.py +++ b/sde_collections/urls.py @@ -67,4 +67,9 @@ name="candidate-url-api", ), path("titles-and-errors/", views.TitlesAndErrorsView.as_view(), name="titles-and-errors-list"), + path( + "api/title-patterns//status/", + views.TitlePatternStatusView.as_view(), + name="title-pattern-status", + ), ] diff --git a/sde_collections/views.py b/sde_collections/views.py index fb268170..abea9be4 100644 --- a/sde_collections/views.py +++ b/sde_collections/views.py @@ -4,6 +4,7 @@ from django.contrib.auth import get_user_model from django.contrib.auth.mixins import LoginRequiredMixin from django.db import models +from django.db.models import Count from django.shortcuts import get_object_or_404, redirect, render from django.urls import reverse from django.utils import timezone @@ -450,6 +451,34 @@ def get_queryset(self): return super().get_queryset().order_by("match_pattern") +class TitlePatternStatusView(APIView): + def get(self, request, pattern_id): + try: + pattern = DeltaTitlePattern.objects.get(id=pattern_id) + + # Get counts for each status + status_counts = ( + DeltaResolvedTitle.objects.filter(title_pattern=pattern) + .values("status") + .annotate(count=Count("status")) + ) + + # Initialize counts + result = {"pending": 0, "processing": 0, "resolved": 0, "failed": 0, "total": 0} + + # Update counts + for item in status_counts: + status_type = item["status"].lower() + count = item["count"] + result[status_type] = count + result["total"] += count + + return Response(result) + + except DeltaTitlePattern.DoesNotExist: + return Response({"error": f"Pattern {pattern_id} not found"}, status=status.HTTP_404_NOT_FOUND) + + class DocumentTypePatternViewSet(CollectionFilterMixin, viewsets.ModelViewSet): queryset = DeltaDocumentTypePattern.objects.all() serializer_class = DocumentTypePatternSerializer diff --git a/sde_indexing_helper/static/js/delta_url_list.js b/sde_indexing_helper/static/js/delta_url_list.js index 32c1667f..914bd5e2 100644 --- a/sde_indexing_helper/static/js/delta_url_list.js +++ b/sde_indexing_helper/static/js/delta_url_list.js @@ -1789,6 +1789,7 @@ function postTitlePatterns( csrfmiddlewaretoken: csrftoken }, success: function (data) { + toastr.success("Background title resolution started for pattern: " + data.match_pattern); $('#delta_urls_table').DataTable().ajax.reload(null, false); $('#title_patterns_table').DataTable().ajax.reload(null, false); if (currentTab === "") { //Only add a notification if we are on the first tab @@ -1799,6 +1800,7 @@ function postTitlePatterns( `` ); } + pollTitleResolutionStatus(data.id, data.match_pattern); }, error: function (xhr, status, error) { var errorMessage = xhr.responseText; @@ -1814,6 +1816,58 @@ function postTitlePatterns( }); } +function pollTitleResolutionStatus(patternId, match_pattern) { + const POLL_INTERVAL = 2000; // 2 seconds + const MAX_POLLS = 150; // 5 minutes = (5 * 60 * 1000) / 2000 = 150 polls + let pollCount = 0; + + let pollInterval = setInterval(async () => { + try { + // Check if we've exceeded max polls + if (pollCount >= MAX_POLLS) { + clearInterval(pollInterval); + toastr.warning(`Title Resolution timed out after 5 minutes for pattern: ${match_pattern}`); + return; + } + + pollCount++; + const response = await fetch(`/api/title-patterns/${patternId}/status/`); + const data = await response.json(); + + // Check resolution status and show appropriate message + if (data.total > 0) { + if(data.pending === 0 && data.processing === 0) { + clearInterval(pollInterval); + + // All successful + if (data.failed === 0) { + toastr.success( + `Title Resolution completed successfully for pattern: ${match_pattern}\n` + + `${data.resolved} URLs processed` + ); + } + + // Some failed + else { + toastr.warning( + `Title Resolution completed with some issues for pattern: ${match_pattern}\n` + + `${data.resolved} succeeded, ${data.failed} failed` + ); + } + + // Refresh tables + $('#delta_urls_table').DataTable().ajax.reload(null, false); + $('#title_patterns_table').DataTable().ajax.reload(null, false); + } } + + } catch (error) { + console.error('Error polling status:', error); + clearInterval(pollInterval); + } + }, POLL_INTERVAL); +} + + function postVisited(url) { $.ajax({ url: url, From 2a773cfcaae353c39e582f3829eefb817d23fa53 Mon Sep 17 00:00:00 2001 From: Kiran Dawadi Date: Mon, 3 Feb 2025 11:33:46 -0600 Subject: [PATCH 07/10] Add brackets on messages --- sde_indexing_helper/static/js/delta_url_list.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sde_indexing_helper/static/js/delta_url_list.js b/sde_indexing_helper/static/js/delta_url_list.js index 914bd5e2..ea382971 100644 --- a/sde_indexing_helper/static/js/delta_url_list.js +++ b/sde_indexing_helper/static/js/delta_url_list.js @@ -1843,7 +1843,7 @@ function pollTitleResolutionStatus(patternId, match_pattern) { if (data.failed === 0) { toastr.success( `Title Resolution completed successfully for pattern: ${match_pattern}\n` + - `${data.resolved} URLs processed` + `[ ${data.resolved} URLs processed ]` ); } @@ -1851,7 +1851,7 @@ function pollTitleResolutionStatus(patternId, match_pattern) { else { toastr.warning( `Title Resolution completed with some issues for pattern: ${match_pattern}\n` + - `${data.resolved} succeeded, ${data.failed} failed` + `[ ${data.resolved} succeeded, ${data.failed} failed ]` ); } From 98ddf1e3137f9a53ee4d40965ccbdfa773439c15 Mon Sep 17 00:00:00 2001 From: Kiran Dawadi Date: Fri, 21 Feb 2025 01:32:11 -0600 Subject: [PATCH 08/10] Fix status tracking and fix tests --- config/settings/test.py | 4 + .../0077_alter_deltaresolvedtitle_status.py | 28 +++ .../0078_alter_deltaresolvedtitle_status.py | 28 +++ .../0079_alter_deltaresolvedtitle_status.py | 27 +++ .../0080_alter_deltaresolvedtitle_status.py | 28 +++ .../0081_alter_deltaresolvedtitle_status.py | 27 +++ sde_collections/models/delta_patterns.py | 2 +- sde_collections/tasks.py | 197 ++++++++---------- sde_collections/tests/conftest.py | 7 + .../tests/test_title_pattern_unapply.py | 4 +- sde_collections/views.py | 10 +- .../static/js/delta_url_list.js | 7 +- 12 files changed, 253 insertions(+), 116 deletions(-) create mode 100644 sde_collections/migrations/0077_alter_deltaresolvedtitle_status.py create mode 100644 sde_collections/migrations/0078_alter_deltaresolvedtitle_status.py create mode 100644 sde_collections/migrations/0079_alter_deltaresolvedtitle_status.py create mode 100644 sde_collections/migrations/0080_alter_deltaresolvedtitle_status.py create mode 100644 sde_collections/migrations/0081_alter_deltaresolvedtitle_status.py create mode 100644 sde_collections/tests/conftest.py diff --git a/config/settings/test.py b/config/settings/test.py index d7eaa130..80374d2a 100644 --- a/config/settings/test.py +++ b/config/settings/test.py @@ -30,3 +30,7 @@ TEMPLATES[0]["OPTIONS"]["debug"] = True # type: ignore # noqa F405 # Your stuff... # ------------------------------------------------------------------------------ + + +CELERY_TASK_ALWAYS_EAGER = True # Executes tasks immediately instead of sending to the queue +CELERY_TASK_EAGER_PROPAGATES = True # Raises exceptions in the main thread for easier debugging diff --git a/sde_collections/migrations/0077_alter_deltaresolvedtitle_status.py b/sde_collections/migrations/0077_alter_deltaresolvedtitle_status.py new file mode 100644 index 00000000..afa70a09 --- /dev/null +++ b/sde_collections/migrations/0077_alter_deltaresolvedtitle_status.py @@ -0,0 +1,28 @@ +# Generated by Django 4.2.9 on 2025-02-21 06:41 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ("sde_collections", "0076_deltaresolvedtitle_status_and_more"), + ] + + operations = [ + migrations.AlterField( + model_name="deltaresolvedtitle", + name="status", + field=models.CharField( + blank=True, + choices=[ + ("pending", "Pending"), + ("processing", "Processing"), + ("resolved", "Resolved"), + ("failed", "Failed"), + ], + max_length=20, + null=True, + ), + ), + ] diff --git a/sde_collections/migrations/0078_alter_deltaresolvedtitle_status.py b/sde_collections/migrations/0078_alter_deltaresolvedtitle_status.py new file mode 100644 index 00000000..e856200c --- /dev/null +++ b/sde_collections/migrations/0078_alter_deltaresolvedtitle_status.py @@ -0,0 +1,28 @@ +# Generated by Django 4.2.9 on 2025-02-21 06:46 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ("sde_collections", "0077_alter_deltaresolvedtitle_status"), + ] + + operations = [ + migrations.AlterField( + model_name="deltaresolvedtitle", + name="status", + field=models.CharField( + choices=[ + ("pending", "Pending"), + ("processing", "Processing"), + ("resolved", "Resolved"), + ("failed", "Failed"), + ], + default="", + max_length=20, + null=True, + ), + ), + ] diff --git a/sde_collections/migrations/0079_alter_deltaresolvedtitle_status.py b/sde_collections/migrations/0079_alter_deltaresolvedtitle_status.py new file mode 100644 index 00000000..04521c74 --- /dev/null +++ b/sde_collections/migrations/0079_alter_deltaresolvedtitle_status.py @@ -0,0 +1,27 @@ +# Generated by Django 4.2.9 on 2025-02-21 06:47 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ("sde_collections", "0078_alter_deltaresolvedtitle_status"), + ] + + operations = [ + migrations.AlterField( + model_name="deltaresolvedtitle", + name="status", + field=models.CharField( + choices=[ + ("pending", "Pending"), + ("processing", "Processing"), + ("resolved", "Resolved"), + ("failed", "Failed"), + ], + default="", + max_length=20, + ), + ), + ] diff --git a/sde_collections/migrations/0080_alter_deltaresolvedtitle_status.py b/sde_collections/migrations/0080_alter_deltaresolvedtitle_status.py new file mode 100644 index 00000000..610d1490 --- /dev/null +++ b/sde_collections/migrations/0080_alter_deltaresolvedtitle_status.py @@ -0,0 +1,28 @@ +# Generated by Django 4.2.9 on 2025-02-21 06:47 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ("sde_collections", "0079_alter_deltaresolvedtitle_status"), + ] + + operations = [ + migrations.AlterField( + model_name="deltaresolvedtitle", + name="status", + field=models.CharField( + choices=[ + ("pending", "Pending"), + ("processing", "Processing"), + ("resolved", "Resolved"), + ("failed", "Failed"), + ], + default="", + max_length=20, + null=True, + ), + ), + ] diff --git a/sde_collections/migrations/0081_alter_deltaresolvedtitle_status.py b/sde_collections/migrations/0081_alter_deltaresolvedtitle_status.py new file mode 100644 index 00000000..b958afed --- /dev/null +++ b/sde_collections/migrations/0081_alter_deltaresolvedtitle_status.py @@ -0,0 +1,27 @@ +# Generated by Django 4.2.9 on 2025-02-21 06:52 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ("sde_collections", "0080_alter_deltaresolvedtitle_status"), + ] + + operations = [ + migrations.AlterField( + model_name="deltaresolvedtitle", + name="status", + field=models.CharField( + choices=[ + ("pending", "Pending"), + ("processing", "Processing"), + ("resolved", "Resolved"), + ("failed", "Failed"), + ], + default="", + max_length=20, + ), + ), + ] diff --git a/sde_collections/models/delta_patterns.py b/sde_collections/models/delta_patterns.py index 799eb24d..804c2337 100644 --- a/sde_collections/models/delta_patterns.py +++ b/sde_collections/models/delta_patterns.py @@ -624,7 +624,7 @@ class Status(models.TextChoices): FAILED = "failed", "Failed" resolved_title = models.CharField(blank=True, default="") - status = models.CharField(max_length=20, choices=Status.choices, default=Status.PENDING) + status = models.CharField(max_length=20, choices=Status.choices, default="") class Meta: verbose_name = "Resolved Title" diff --git a/sde_collections/tasks.py b/sde_collections/tasks.py index fb7a5026..fecf7f14 100644 --- a/sde_collections/tasks.py +++ b/sde_collections/tasks.py @@ -4,12 +4,11 @@ import shutil import boto3 -from celery import group from django.apps import apps from django.conf import settings from django.core import management from django.core.management.commands import loaddata -from django.db import transaction +from django.db import IntegrityError, transaction from config import celery_app from sde_collections.models.collection_choice_fields import ( @@ -152,126 +151,110 @@ def pull_latest_collection_metadata_from_github(): s3_client.upload_file(FILENAME, s3_bucket_name, s3_key) -# @celery_app.task() -# def resolve_title_pattern(title_pattern_id): -# TitlePattern = apps.get_model("sde_collections", "TitlePattern") -# title_pattern = TitlePattern.objects.get(id=title_pattern_id) -# title_pattern.apply() - - @celery_app.task(name="sde_collections.tasks.process_title_resolutions") def process_title_resolutions(pattern_id: int) -> None: - """Background task to prepare and queue title resolutions""" - try: - - DeltaTitlePattern = apps.get_model("sde_collections", "DeltaTitlePattern") - DeltaUrl = apps.get_model("sde_collections", "DeltaUrl") - DeltaResolvedTitle = apps.get_model("sde_collections", "DeltaResolvedTitle") - - pattern = DeltaTitlePattern.objects.get(id=pattern_id) - pattern_url_pairs = [] - - with transaction.atomic(): - # Process curated URLs - matching_curated_urls = pattern.get_matching_curated_urls() - previously_unaffected_curated = matching_curated_urls.exclude( - id__in=pattern.curated_urls.values_list("id", flat=True) - ) - - for curated_url in previously_unaffected_curated: - if not pattern.is_most_distinctive_pattern(curated_url): - continue - - # Create Delta URL - fields = { - field.name: getattr(curated_url, field.name) - for field in curated_url._meta.fields - if field.name not in ["id", "collection"] - } - fields["to_delete"] = False - fields["collection"] = pattern.collection - delta_url = DeltaUrl.objects.create(**fields) - - # Create resolution record - DeltaResolvedTitle.objects.create( - title_pattern=pattern, delta_url=delta_url, status=DeltaResolvedTitle.Status.PENDING - ) - pattern_url_pairs.append((pattern_id, delta_url.id)) - - # Process delta URLs - for delta_url in pattern.get_matching_delta_urls(): - if not pattern.is_most_distinctive_pattern(delta_url): - continue - - DeltaResolvedTitle.objects.update_or_create( - delta_url=delta_url, - defaults={"title_pattern": pattern, "status": DeltaResolvedTitle.Status.PENDING}, - ) - pattern_url_pairs.append((pattern_id, delta_url.id)) - - # Update relationships - pattern.update_affected_delta_urls_list() - - # Queue the resolution tasks - if pattern_url_pairs: - group(resolve_title_pattern.s(pattern_id, url_id) for pattern_id, url_id in pattern_url_pairs)() - - except Exception as e: - logger.error(f"Error in process_title_resolutions for pattern {pattern_id}: {str(e)}") - - -@celery_app.task(name="sde_collections.tasks.resolve_title_pattern") -def resolve_title_pattern(title_pattern_id: int, delta_url_id: int) -> None: - """Background task to resolve a title pattern for a specific URL""" - logger.info(f"Single resolution task received for URL ID: {delta_url_id}") + """Background task to process and resolve title patterns""" - DeltaUrl = apps.get_model("sde_collections", "DeltaUrl") DeltaTitlePattern = apps.get_model("sde_collections", "DeltaTitlePattern") + DeltaUrl = apps.get_model("sde_collections", "DeltaUrl") DeltaResolvedTitle = apps.get_model("sde_collections", "DeltaResolvedTitle") DeltaResolvedTitleError = apps.get_model("sde_collections", "DeltaResolvedTitleError") - try: - # First attempt to get an existing record or create a new one - resolution, created = DeltaResolvedTitle.objects.get_or_create( - title_pattern_id=title_pattern_id, - delta_url_id=delta_url_id, - defaults={"status": DeltaResolvedTitle.Status.PROCESSING}, - ) - - # If we found an existing record (created=False) - if not created: - resolution.status = DeltaResolvedTitle.Status.PROCESSING - resolution.save() + pattern = DeltaTitlePattern.objects.get(id=pattern_id) - logger.info(f"PROCESSING status created for DeltaURL {delta_url_id}; Pattern ID {title_pattern_id}") + # Process curated URLs + matching_curated_urls = pattern.get_matching_curated_urls() + previously_unaffected_curated = matching_curated_urls.exclude( + id__in=pattern.curated_urls.values_list("id", flat=True) + ) + + for curated_url in previously_unaffected_curated: + if not pattern.is_most_distinctive_pattern(curated_url): + continue - # Get pattern and URL - pattern = DeltaTitlePattern.objects.get(id=title_pattern_id) - delta_url = DeltaUrl.objects.get(id=delta_url_id) # Generate new title - new_title, error = pattern.generate_title_for_url(delta_url) + new_title, error = pattern.generate_title_for_url(curated_url) if error: - DeltaResolvedTitleError.objects.create( - title_pattern=pattern, - delta_url=delta_url, - error_string=str(error), - http_status_code=getattr(error, "status_code", None), + DeltaResolvedTitleError.objects.update_or_create( + delta_url=curated_url, defaults={"title_pattern": pattern, "error_string": error} # lookup field + ) + logger.error(f"Title resolution FAILED for CuratedURL {curated_url.id}: {error}") + continue + + # Skip if the generated title matches existing or if Delta already exists + if ( + curated_url.generated_title == new_title + or DeltaUrl.objects.filter(url=curated_url.url, collection=pattern.collection).exists() + ): + continue + + # Create Delta URL with the new title + fields = { + field.name: getattr(curated_url, field.name) + for field in curated_url._meta.fields + if field.name not in ["id", "collection"] + } + fields["generated_title"] = new_title + fields["to_delete"] = False + fields["collection"] = pattern.collection + + delta_url = DeltaUrl.objects.create(**fields) + + DeltaResolvedTitle.objects.create(title_pattern=pattern, delta_url=delta_url, resolved_title=new_title) + + # Process delta URLs + # Set PENDING status initially to all the matching URLs + for delta_url in pattern.get_matching_delta_urls(): + if not pattern.is_most_distinctive_pattern(delta_url): + continue + try: + resolution, created = DeltaResolvedTitle.objects.update_or_create( + delta_url=delta_url, # lookup field + defaults={"title_pattern": pattern, "status": DeltaResolvedTitle.Status.PENDING}, + ) + except IntegrityError as e: + logger.error(f"IntegrityError for delta_url {delta_url.id}: {str(e)}") + continue + + for delta_url in pattern.get_matching_delta_urls(): + if not pattern.is_most_distinctive_pattern(delta_url): + continue + + try: + resolution, created = DeltaResolvedTitle.objects.update_or_create( + delta_url=delta_url, # lookup field + defaults={"title_pattern": pattern, "status": DeltaResolvedTitle.Status.PROCESSING}, ) - logger.info(f"FAILED status created for DeltaURL {delta_url_id}; Pattern ID {title_pattern_id}") - return - delta_url.generated_title = new_title - delta_url.save() - resolution.resolved_title = new_title - resolution.status = DeltaResolvedTitle.Status.RESOLVED - resolution.save() - logger.info(f"RESOLVED status created for DeltaURL {delta_url_id}; Pattern ID {title_pattern_id}") + # Generate new title + new_title, error = pattern.generate_title_for_url(delta_url) - except Exception as e: - DeltaResolvedTitleError.objects.create( - title_pattern_id=title_pattern_id, delta_url_id=delta_url_id, error_string=str(e) - ) + if error: + DeltaResolvedTitleError.objects.update_or_create( + delta_url=delta_url, defaults={"title_pattern": pattern, "error_string": error} # lookup field + ) + resolution.status = DeltaResolvedTitle.Status.FAILED + resolution.save() + logger.error(f"Title resolution FAILED for DeltaURL {delta_url.id}: {error}") + continue + + delta_url.generated_title = new_title + delta_url.save() + resolution.resolved_title = new_title + resolution.status = DeltaResolvedTitle.Status.RESOLVED + resolution.save() + + except Exception as e: + logger.error(f"Error processing delta URL {delta_url.id}: {str(e)}") + DeltaResolvedTitleError.objects.update_or_create( + delta_url=delta_url, defaults={"title_pattern": pattern, "error_string": str(e)} # lookup field + ) + resolution.status = DeltaResolvedTitle.Status.FAILED + resolution.save() + + # Update relationships + pattern.update_affected_delta_urls_list() @celery_app.task(soft_time_limit=600) diff --git a/sde_collections/tests/conftest.py b/sde_collections/tests/conftest.py new file mode 100644 index 00000000..c6ce35ab --- /dev/null +++ b/sde_collections/tests/conftest.py @@ -0,0 +1,7 @@ +import pytest + + +@pytest.fixture(autouse=True) +def _use_transactional_db(transactional_db): + """Enable transaction rollback for all tests""" + pass diff --git a/sde_collections/tests/test_title_pattern_unapply.py b/sde_collections/tests/test_title_pattern_unapply.py index db8ed7e5..1a63cfce 100644 --- a/sde_collections/tests/test_title_pattern_unapply.py +++ b/sde_collections/tests/test_title_pattern_unapply.py @@ -1,6 +1,6 @@ # docker-compose -f local.yml run --rm django pytest sde_collections/tests/test_title_pattern_unapply.py -from django.test import TestCase +from django.test import TransactionTestCase from sde_collections.models.delta_patterns import ( DeltaResolvedTitle, @@ -12,7 +12,7 @@ from .factories import CollectionFactory, DumpUrlFactory -class TestTitlePatternUnapplyLogic(TestCase): +class TestTitlePatternUnapplyLogic(TransactionTestCase): """Test complete lifecycle of title pattern application and removal.""" def setUp(self): diff --git a/sde_collections/views.py b/sde_collections/views.py index abea9be4..7864b08f 100644 --- a/sde_collections/views.py +++ b/sde_collections/views.py @@ -453,10 +453,10 @@ def get_queryset(self): class TitlePatternStatusView(APIView): def get(self, request, pattern_id): - try: - pattern = DeltaTitlePattern.objects.get(id=pattern_id) + pattern = DeltaTitlePattern.objects.get(id=pattern_id) - # Get counts for each status + # Get counts for each status + if DeltaResolvedTitle.objects.filter(title_pattern=pattern).exists(): status_counts = ( DeltaResolvedTitle.objects.filter(title_pattern=pattern) .values("status") @@ -475,8 +475,8 @@ def get(self, request, pattern_id): return Response(result) - except DeltaTitlePattern.DoesNotExist: - return Response({"error": f"Pattern {pattern_id} not found"}, status=status.HTTP_404_NOT_FOUND) + else: + return Response({"error": "DeltaResolvedTitleNotFound"}) class DocumentTypePatternViewSet(CollectionFilterMixin, viewsets.ModelViewSet): diff --git a/sde_indexing_helper/static/js/delta_url_list.js b/sde_indexing_helper/static/js/delta_url_list.js index ea382971..3602991f 100644 --- a/sde_indexing_helper/static/js/delta_url_list.js +++ b/sde_indexing_helper/static/js/delta_url_list.js @@ -1834,8 +1834,13 @@ function pollTitleResolutionStatus(patternId, match_pattern) { const response = await fetch(`/api/title-patterns/${patternId}/status/`); const data = await response.json(); + if (data.error && data.error === "DeltaResolvedTitleNotFound") { + toastr.error("No URLs were affected by this pattern."); + clearInterval(pollInterval); + } + // Check resolution status and show appropriate message - if (data.total > 0) { + else if (data.total > 0) { if(data.pending === 0 && data.processing === 0) { clearInterval(pollInterval); From 0eea662ea5aa6d52d3f74697392910e5b481309a Mon Sep 17 00:00:00 2001 From: Kiran Dawadi Date: Fri, 21 Feb 2025 13:21:43 -0600 Subject: [PATCH 09/10] Increase celery task's soft time limit --- sde_collections/tasks.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/sde_collections/tasks.py b/sde_collections/tasks.py index fecf7f14..67c583c7 100644 --- a/sde_collections/tasks.py +++ b/sde_collections/tasks.py @@ -151,7 +151,7 @@ def pull_latest_collection_metadata_from_github(): s3_client.upload_file(FILENAME, s3_bucket_name, s3_key) -@celery_app.task(name="sde_collections.tasks.process_title_resolutions") +@celery_app.task(name="sde_collections.tasks.process_title_resolutions", soft_time_limit=10000) def process_title_resolutions(pattern_id: int) -> None: """Background task to process and resolve title patterns""" @@ -201,7 +201,12 @@ def process_title_resolutions(pattern_id: int) -> None: delta_url = DeltaUrl.objects.create(**fields) - DeltaResolvedTitle.objects.create(title_pattern=pattern, delta_url=delta_url, resolved_title=new_title) + DeltaResolvedTitle.objects.create( + title_pattern=pattern, + delta_url=delta_url, + resolved_title=new_title, + status=DeltaResolvedTitle.Status.RESOLVED, + ) # Process delta URLs # Set PENDING status initially to all the matching URLs From 69cd9b37ef5bc19de22d9b74e3af45e444454d85 Mon Sep 17 00:00:00 2001 From: Kiran Dawadi Date: Mon, 24 Feb 2025 17:15:44 -0600 Subject: [PATCH 10/10] Migrations squashed --- ...0076_deltaresolvedtitle_status_and_more.py | 41 ------ ...ed_0082_alter_deltaresolvedtitle_status.py | 128 ++++++++++++++++++ .../0077_alter_deltaresolvedtitle_status.py | 28 ---- .../0078_alter_deltaresolvedtitle_status.py | 28 ---- .../0079_alter_deltaresolvedtitle_status.py | 27 ---- .../0080_alter_deltaresolvedtitle_status.py | 28 ---- .../0081_alter_deltaresolvedtitle_status.py | 27 ---- sde_collections/models/delta_patterns.py | 2 +- 8 files changed, 129 insertions(+), 180 deletions(-) delete mode 100644 sde_collections/migrations/0076_deltaresolvedtitle_status_and_more.py create mode 100644 sde_collections/migrations/0076_deltaresolvedtitle_status_and_more_squashed_0082_alter_deltaresolvedtitle_status.py delete mode 100644 sde_collections/migrations/0077_alter_deltaresolvedtitle_status.py delete mode 100644 sde_collections/migrations/0078_alter_deltaresolvedtitle_status.py delete mode 100644 sde_collections/migrations/0079_alter_deltaresolvedtitle_status.py delete mode 100644 sde_collections/migrations/0080_alter_deltaresolvedtitle_status.py delete mode 100644 sde_collections/migrations/0081_alter_deltaresolvedtitle_status.py diff --git a/sde_collections/migrations/0076_deltaresolvedtitle_status_and_more.py b/sde_collections/migrations/0076_deltaresolvedtitle_status_and_more.py deleted file mode 100644 index e8efbe40..00000000 --- a/sde_collections/migrations/0076_deltaresolvedtitle_status_and_more.py +++ /dev/null @@ -1,41 +0,0 @@ -# Generated by Django 4.2.9 on 2025-02-02 15:45 - -from django.db import migrations, models - - -class Migration(migrations.Migration): - - dependencies = [ - ("sde_collections", "0075_alter_collection_reindexing_status_and_more"), - ] - - operations = [ - migrations.AddField( - model_name="deltaresolvedtitle", - name="status", - field=models.CharField( - choices=[ - ("pending", "Pending"), - ("processing", "Processing"), - ("resolved", "Resolved"), - ("failed", "Failed"), - ], - default="pending", - max_length=20, - ), - ), - migrations.AddField( - model_name="deltaresolvedtitle", - name="updated_at", - field=models.DateTimeField(auto_now=True), - ), - migrations.AddField( - model_name="deltaresolvedtitleerror", - name="updated_at", - field=models.DateTimeField(auto_now=True), - ), - migrations.AddIndex( - model_name="deltaresolvedtitle", - index=models.Index(fields=["status", "created_at"], name="sde_collect_status_42dc80_idx"), - ), - ] diff --git a/sde_collections/migrations/0076_deltaresolvedtitle_status_and_more_squashed_0082_alter_deltaresolvedtitle_status.py b/sde_collections/migrations/0076_deltaresolvedtitle_status_and_more_squashed_0082_alter_deltaresolvedtitle_status.py new file mode 100644 index 00000000..d3e2dc3d --- /dev/null +++ b/sde_collections/migrations/0076_deltaresolvedtitle_status_and_more_squashed_0082_alter_deltaresolvedtitle_status.py @@ -0,0 +1,128 @@ +# Generated by Django 4.2.9 on 2025-02-24 22:40 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ("sde_collections", "0075_alter_collection_reindexing_status_and_more"), + ] + + operations = [ + migrations.AddField( + model_name="deltaresolvedtitle", + name="status", + field=models.CharField( + choices=[ + ("pending", "Pending"), + ("processing", "Processing"), + ("resolved", "Resolved"), + ("failed", "Failed"), + ], + default="pending", + max_length=20, + ), + ), + migrations.AddField( + model_name="deltaresolvedtitle", + name="updated_at", + field=models.DateTimeField(auto_now=True), + ), + migrations.AddField( + model_name="deltaresolvedtitleerror", + name="updated_at", + field=models.DateTimeField(auto_now=True), + ), + migrations.AddIndex( + model_name="deltaresolvedtitle", + index=models.Index(fields=["status", "created_at"], name="sde_collect_status_42dc80_idx"), + ), + migrations.AlterField( + model_name="deltaresolvedtitle", + name="status", + field=models.CharField( + blank=True, + choices=[ + ("pending", "Pending"), + ("processing", "Processing"), + ("resolved", "Resolved"), + ("failed", "Failed"), + ], + max_length=20, + null=True, + ), + ), + migrations.AlterField( + model_name="deltaresolvedtitle", + name="status", + field=models.CharField( + choices=[ + ("pending", "Pending"), + ("processing", "Processing"), + ("resolved", "Resolved"), + ("failed", "Failed"), + ], + default="", + max_length=20, + null=True, + ), + ), + migrations.AlterField( + model_name="deltaresolvedtitle", + name="status", + field=models.CharField( + choices=[ + ("pending", "Pending"), + ("processing", "Processing"), + ("resolved", "Resolved"), + ("failed", "Failed"), + ], + default="", + max_length=20, + ), + ), + migrations.AlterField( + model_name="deltaresolvedtitle", + name="status", + field=models.CharField( + choices=[ + ("pending", "Pending"), + ("processing", "Processing"), + ("resolved", "Resolved"), + ("failed", "Failed"), + ], + default="", + max_length=20, + null=True, + ), + ), + migrations.AlterField( + model_name="deltaresolvedtitle", + name="status", + field=models.CharField( + choices=[ + ("pending", "Pending"), + ("processing", "Processing"), + ("resolved", "Resolved"), + ("failed", "Failed"), + ], + default="", + max_length=20, + ), + ), + migrations.AlterField( + model_name="deltaresolvedtitle", + name="status", + field=models.CharField( + choices=[ + ("pending", "Pending"), + ("processing", "Processing"), + ("resolved", "Resolved"), + ("failed", "Failed"), + ], + max_length=20, + null=True, + ), + ), + ] diff --git a/sde_collections/migrations/0077_alter_deltaresolvedtitle_status.py b/sde_collections/migrations/0077_alter_deltaresolvedtitle_status.py deleted file mode 100644 index afa70a09..00000000 --- a/sde_collections/migrations/0077_alter_deltaresolvedtitle_status.py +++ /dev/null @@ -1,28 +0,0 @@ -# Generated by Django 4.2.9 on 2025-02-21 06:41 - -from django.db import migrations, models - - -class Migration(migrations.Migration): - - dependencies = [ - ("sde_collections", "0076_deltaresolvedtitle_status_and_more"), - ] - - operations = [ - migrations.AlterField( - model_name="deltaresolvedtitle", - name="status", - field=models.CharField( - blank=True, - choices=[ - ("pending", "Pending"), - ("processing", "Processing"), - ("resolved", "Resolved"), - ("failed", "Failed"), - ], - max_length=20, - null=True, - ), - ), - ] diff --git a/sde_collections/migrations/0078_alter_deltaresolvedtitle_status.py b/sde_collections/migrations/0078_alter_deltaresolvedtitle_status.py deleted file mode 100644 index e856200c..00000000 --- a/sde_collections/migrations/0078_alter_deltaresolvedtitle_status.py +++ /dev/null @@ -1,28 +0,0 @@ -# Generated by Django 4.2.9 on 2025-02-21 06:46 - -from django.db import migrations, models - - -class Migration(migrations.Migration): - - dependencies = [ - ("sde_collections", "0077_alter_deltaresolvedtitle_status"), - ] - - operations = [ - migrations.AlterField( - model_name="deltaresolvedtitle", - name="status", - field=models.CharField( - choices=[ - ("pending", "Pending"), - ("processing", "Processing"), - ("resolved", "Resolved"), - ("failed", "Failed"), - ], - default="", - max_length=20, - null=True, - ), - ), - ] diff --git a/sde_collections/migrations/0079_alter_deltaresolvedtitle_status.py b/sde_collections/migrations/0079_alter_deltaresolvedtitle_status.py deleted file mode 100644 index 04521c74..00000000 --- a/sde_collections/migrations/0079_alter_deltaresolvedtitle_status.py +++ /dev/null @@ -1,27 +0,0 @@ -# Generated by Django 4.2.9 on 2025-02-21 06:47 - -from django.db import migrations, models - - -class Migration(migrations.Migration): - - dependencies = [ - ("sde_collections", "0078_alter_deltaresolvedtitle_status"), - ] - - operations = [ - migrations.AlterField( - model_name="deltaresolvedtitle", - name="status", - field=models.CharField( - choices=[ - ("pending", "Pending"), - ("processing", "Processing"), - ("resolved", "Resolved"), - ("failed", "Failed"), - ], - default="", - max_length=20, - ), - ), - ] diff --git a/sde_collections/migrations/0080_alter_deltaresolvedtitle_status.py b/sde_collections/migrations/0080_alter_deltaresolvedtitle_status.py deleted file mode 100644 index 610d1490..00000000 --- a/sde_collections/migrations/0080_alter_deltaresolvedtitle_status.py +++ /dev/null @@ -1,28 +0,0 @@ -# Generated by Django 4.2.9 on 2025-02-21 06:47 - -from django.db import migrations, models - - -class Migration(migrations.Migration): - - dependencies = [ - ("sde_collections", "0079_alter_deltaresolvedtitle_status"), - ] - - operations = [ - migrations.AlterField( - model_name="deltaresolvedtitle", - name="status", - field=models.CharField( - choices=[ - ("pending", "Pending"), - ("processing", "Processing"), - ("resolved", "Resolved"), - ("failed", "Failed"), - ], - default="", - max_length=20, - null=True, - ), - ), - ] diff --git a/sde_collections/migrations/0081_alter_deltaresolvedtitle_status.py b/sde_collections/migrations/0081_alter_deltaresolvedtitle_status.py deleted file mode 100644 index b958afed..00000000 --- a/sde_collections/migrations/0081_alter_deltaresolvedtitle_status.py +++ /dev/null @@ -1,27 +0,0 @@ -# Generated by Django 4.2.9 on 2025-02-21 06:52 - -from django.db import migrations, models - - -class Migration(migrations.Migration): - - dependencies = [ - ("sde_collections", "0080_alter_deltaresolvedtitle_status"), - ] - - operations = [ - migrations.AlterField( - model_name="deltaresolvedtitle", - name="status", - field=models.CharField( - choices=[ - ("pending", "Pending"), - ("processing", "Processing"), - ("resolved", "Resolved"), - ("failed", "Failed"), - ], - default="", - max_length=20, - ), - ), - ] diff --git a/sde_collections/models/delta_patterns.py b/sde_collections/models/delta_patterns.py index 804c2337..8b116a53 100644 --- a/sde_collections/models/delta_patterns.py +++ b/sde_collections/models/delta_patterns.py @@ -624,7 +624,7 @@ class Status(models.TextChoices): FAILED = "failed", "Failed" resolved_title = models.CharField(blank=True, default="") - status = models.CharField(max_length=20, choices=Status.choices, default="") + status = models.CharField(max_length=20, choices=Status.choices, null=True) class Meta: verbose_name = "Resolved Title"