From 5857f1a4d20879dd8067dd9662a23766298e48d0 Mon Sep 17 00:00:00 2001 From: Aleksandr Meshchriakov Date: Tue, 28 Apr 2026 21:56:01 +0200 Subject: [PATCH] fix(parsers): detect stale jobs by run age --- src/apps/core/services.py | 12 ++++-- src/apps/parsers/frontend_compat.py | 14 ++++-- src/apps/parsers/services.py | 35 ++++++--------- src/apps/parsers/source_cards.py | 16 ++++--- tests/apps/core/test_background_jobs.py | 2 +- tests/apps/parsers/test_services.py | 5 ++- .../apps/parsers/test_source_cards_service.py | 43 ++++++++++++++++++- 7 files changed, 90 insertions(+), 37 deletions(-) diff --git a/src/apps/core/services.py b/src/apps/core/services.py index 9e72f51..bc6cebe 100644 --- a/src/apps/core/services.py +++ b/src/apps/core/services.py @@ -693,9 +693,15 @@ class BackgroundJobService(BaseReadOnlyService): from apps.core.models import JobStatus cutoff = timezone.now() - timedelta(minutes=max_age_minutes) - queryset = cls.get_queryset().filter( - status__in=[JobStatus.PENDING, JobStatus.STARTED, JobStatus.RETRY], - updated_at__lt=cutoff, + queryset = ( + cls.get_queryset() + .filter( + status__in=[JobStatus.PENDING, JobStatus.STARTED, JobStatus.RETRY], + ) + .filter( + Q(started_at__isnull=False, started_at__lt=cutoff) + | Q(started_at__isnull=True, created_at__lt=cutoff) + ) ) if task_names: queryset = queryset.filter(task_name__in=task_names) diff --git a/src/apps/parsers/frontend_compat.py b/src/apps/parsers/frontend_compat.py index e2d431a..1dacf54 100644 --- a/src/apps/parsers/frontend_compat.py +++ b/src/apps/parsers/frontend_compat.py @@ -323,10 +323,16 @@ def _active_tasks_for_definition( for source_key in definition.source_keys if source_key in PARSER_SOURCES ] - queryset = BackgroundJobService.get_queryset().filter( - task_name__in=task_names, - status__in=ACTIVE_JOB_STATUSES, - updated_at__gte=_stale_cutoff(), + queryset = ( + BackgroundJobService.get_queryset() + .filter( + task_name__in=task_names, + status__in=ACTIVE_JOB_STATUSES, + ) + .filter( + Q(started_at__isnull=False, started_at__gte=_stale_cutoff()) + | Q(started_at__isnull=True, created_at__gte=_stale_cutoff()) + ) ) return [_serialize_active_job(job) for job in queryset.order_by("-created_at")[:10]] diff --git a/src/apps/parsers/services.py b/src/apps/parsers/services.py index 1cb0fe1..2ff2be2 100644 --- a/src/apps/parsers/services.py +++ b/src/apps/parsers/services.py @@ -416,33 +416,24 @@ class ParserLoadLogService(BaseService[ParserLoadLog]): updated = 0 active_statuses = [JobStatus.PENDING, JobStatus.STARTED, JobStatus.RETRY] for log in stale_logs: - batch_job = ( - BackgroundJob.objects.filter( - status__in=active_statuses, - meta__source=log.source, - meta__batch_id=log.batch_id, - ) - .order_by("-updated_at") - .first() + active_jobs = BackgroundJob.objects.filter( + status__in=active_statuses, + meta__source=log.source, + ).filter(Q(meta__batch_id=log.batch_id) | Q(meta__batch_id__isnull=True)) + fresh_jobs = active_jobs.filter( + Q(started_at__isnull=False, started_at__gte=cutoff) + | Q(started_at__isnull=True, created_at__gte=cutoff) ) - source_job = None - if batch_job is None: - source_job = ( - BackgroundJob.objects.filter( - status__in=active_statuses, - meta__source=log.source, - meta__batch_id__isnull=True, - ) - .order_by("-updated_at") - .first() - ) - job = batch_job or source_job - if job is not None and job.updated_at >= cutoff: + if fresh_jobs.exists(): continue cls.mark_failed(log, stale_message) updated += 1 - if job is not None: + stale_jobs = active_jobs.filter( + Q(started_at__isnull=False, started_at__lt=cutoff) + | Q(started_at__isnull=True, created_at__lt=cutoff) + ) + for job in stale_jobs.order_by("created_at"): job.fail(error=stale_message) return updated diff --git a/src/apps/parsers/source_cards.py b/src/apps/parsers/source_cards.py index 835ec7d..0b721fb 100644 --- a/src/apps/parsers/source_cards.py +++ b/src/apps/parsers/source_cards.py @@ -21,7 +21,7 @@ from apps.parsers.models import ( ProcurementRecord, ) from django.conf import settings -from django.db.models import Max +from django.db.models import Max, Q from django.http import Http404 from django.utils import timezone from rest_framework.exceptions import ValidationError @@ -727,10 +727,16 @@ class SourceCardService: cls, definition: SourceCardDefinition ) -> list[dict[str, Any]]: cutoff = cls._stale_cutoff() - queryset = BackgroundJobService.get_queryset().filter( - task_name__in=definition.task_names, - status__in=ACTIVE_JOB_STATUSES, - updated_at__gte=cutoff, + queryset = ( + BackgroundJobService.get_queryset() + .filter( + task_name__in=definition.task_names, + status__in=ACTIVE_JOB_STATUSES, + ) + .filter( + Q(started_at__isnull=False, started_at__gte=cutoff) + | Q(started_at__isnull=True, created_at__gte=cutoff) + ) ) return [ cls._serialize_job(job) for job in queryset.order_by("-created_at")[:10] diff --git a/tests/apps/core/test_background_jobs.py b/tests/apps/core/test_background_jobs.py index 2f896d9..a56d022 100644 --- a/tests/apps/core/test_background_jobs.py +++ b/tests/apps/core/test_background_jobs.py @@ -291,7 +291,7 @@ class BackgroundJobServiceTest(TestCase): old_timestamp = timezone.now() - timedelta(hours=3) BackgroundJob.objects.filter( task_id__in=[stale_job.task_id, unrelated_job.task_id] - ).update(updated_at=old_timestamp) + ).update(created_at=old_timestamp, updated_at=timezone.now()) updated = BackgroundJobService.mark_stale_active_jobs_failed( max_age_minutes=90, diff --git a/tests/apps/parsers/test_services.py b/tests/apps/parsers/test_services.py index c4528f2..fe299c0 100644 --- a/tests/apps/parsers/test_services.py +++ b/tests/apps/parsers/test_services.py @@ -426,7 +426,10 @@ class ParserLoadLogServiceTest(TestCase): ) old_timestamp = timezone.now() - timedelta(hours=3) ParserLoadLog.objects.filter(pk=log.pk).update(updated_at=old_timestamp) - BackgroundJob.objects.filter(pk=job.pk).update(updated_at=old_timestamp) + BackgroundJob.objects.filter(pk=job.pk).update( + created_at=old_timestamp, + updated_at=timezone.now(), + ) updated = ParserLoadLogService.mark_stale_in_progress_failed(max_age_minutes=90) diff --git a/tests/apps/parsers/test_source_cards_service.py b/tests/apps/parsers/test_source_cards_service.py index e793e2a..3ba3bce 100644 --- a/tests/apps/parsers/test_source_cards_service.py +++ b/tests/apps/parsers/test_source_cards_service.py @@ -4,13 +4,14 @@ from datetime import timedelta from types import SimpleNamespace from unittest.mock import MagicMock, patch +from apps.core.models import BackgroundJob, JobStatus from apps.parsers.source_cards import ( SourceCardDefinition, SourceCardService, SourceItemDefinition, ) from django.http import Http404 -from django.test import SimpleTestCase +from django.test import SimpleTestCase, TestCase, override_settings from django.utils import timezone from rest_framework.exceptions import ValidationError @@ -291,3 +292,43 @@ class SourceCardServiceUnitTest(SimpleTestCase): ), "idle", ) + + +@override_settings(PARSER_STALE_LOAD_MAX_AGE_MINUTES=90) +class SourceCardServiceDatabaseTest(TestCase): + def test_get_active_tasks_ignores_old_jobs_even_when_updated_recently(self): + job = BackgroundJob.objects.create( + task_id="old-source-task", + task_name="apps.parsers.tasks.parse_industrial_products", + status=JobStatus.STARTED, + progress=10, + meta={"source": "industrial_products"}, + ) + old_timestamp = timezone.now() - timedelta(hours=3) + BackgroundJob.objects.filter(pk=job.pk).update( + created_at=old_timestamp, + started_at=old_timestamp, + updated_at=timezone.now(), + ) + + tasks = SourceCardService._get_active_tasks( + SourceCardService.get_definition("manufacturers-and-products") + ) + + self.assertEqual(tasks, []) + + def test_get_active_tasks_keeps_recent_pending_jobs(self): + BackgroundJob.objects.create( + task_id="fresh-source-task", + task_name="apps.parsers.tasks.parse_industrial_products", + status=JobStatus.PENDING, + progress=0, + meta={"source": "industrial_products"}, + ) + + tasks = SourceCardService._get_active_tasks( + SourceCardService.get_definition("manufacturers-and-products") + ) + + self.assertEqual(len(tasks), 1) + self.assertEqual(tasks[0]["task_id"], "fresh-source-task")