diff --git a/src/apps/core/services.py b/src/apps/core/services.py index 533c8c4..9e72f51 100644 --- a/src/apps/core/services.py +++ b/src/apps/core/services.py @@ -6,12 +6,14 @@ They are easily testable and can manage transactions. """ import logging +from datetime import timedelta from typing import Any, Generic, TypeVar import django from apps.core.exceptions import NotFoundError from django.db import models, transaction -from django.db.models import QuerySet +from django.db.models import Q, QuerySet +from django.utils import timezone logger = logging.getLogger(__name__) @@ -678,3 +680,37 @@ class BackgroundJobService(BaseReadOnlyService): .delete() ) return deleted + + @classmethod + def mark_stale_active_jobs_failed( + cls, + *, + max_age_minutes: int, + task_names: set[str] | None = None, + meta_sources: set[str] | None = None, + ) -> int: + """Mark old active jobs as failed after worker restarts or hard kills.""" + from apps.core.models import JobStatus + + cutoff = timezone.now() - timedelta(minutes=max_age_minutes) + queryset = cls.get_queryset().filter( + status__in=[JobStatus.PENDING, JobStatus.STARTED, JobStatus.RETRY], + updated_at__lt=cutoff, + ) + if task_names: + queryset = queryset.filter(task_name__in=task_names) + if meta_sources: + source_filter = Q() + for source in meta_sources: + source_filter |= Q(meta__source=source) + queryset = queryset.filter(source_filter) + + stale_message = ( + "Stale background job was marked failed after " + f"{max_age_minutes} minutes without progress." + ) + updated = 0 + for job in queryset.order_by("created_at"): + job.fail(error=stale_message) + updated += 1 + return updated diff --git a/src/apps/parsers/frontend_compat.py b/src/apps/parsers/frontend_compat.py index ad078b4..e2d431a 100644 --- a/src/apps/parsers/frontend_compat.py +++ b/src/apps/parsers/frontend_compat.py @@ -24,11 +24,13 @@ from apps.parsers.views import ( TASKS_BY_NAME, build_task_kwargs, ) +from django.conf import settings from django.core.cache import cache from django.core.paginator import Paginator from django.db.models import CharField, Max, Q from django.db.models.functions import Cast from django.http import Http404, HttpResponse +from django.utils import timezone from drf_yasg import openapi from drf_yasg.utils import swagger_auto_schema from rest_framework import status @@ -44,6 +46,7 @@ SYSTEM_LOGS_TAG = "System Logs" ACTIVE_JOB_STATUSES = {"pending", "started", "retry"} SUCCESS_LOAD_STATUSES = {"success", "skipped"} ERROR_LOAD_STATUSES = {"failed", "failure", "error"} +STALE_ACTIVE_MAX_AGE_MINUTES = 90 PARSING_SETTINGS_CACHE_KEY = "parsers:frontend_compat:parsing_settings" PARSING_SETTINGS_FIELDS = { @@ -295,6 +298,23 @@ def _serialize_active_job(job) -> dict[str, Any]: } +def _stale_cutoff(): + max_age_minutes = int( + getattr( + settings, + "PARSER_STALE_LOAD_MAX_AGE_MINUTES", + STALE_ACTIVE_MAX_AGE_MINUTES, + ) + ) + return timezone.now() - timedelta(minutes=max_age_minutes) + + +def _is_stale_load(load_log: ParserLoadLog | None) -> bool: + if load_log is None or load_log.status != "in_progress": + return False + return load_log.updated_at < _stale_cutoff() + + def _active_tasks_for_definition( definition: FrontendSourceCardDefinition, ) -> list[dict]: @@ -306,6 +326,7 @@ def _active_tasks_for_definition( queryset = BackgroundJobService.get_queryset().filter( task_name__in=task_names, status__in=ACTIVE_JOB_STATUSES, + updated_at__gte=_stale_cutoff(), ) return [_serialize_active_job(job) for job in queryset.order_by("-created_at")[:10]] @@ -330,8 +351,16 @@ def _status_for_card( ) -> str: if not definition.is_available: return "unavailable" - if active_tasks or (latest_load and latest_load.status == "in_progress"): + if active_tasks: return "in_progress" + if ( + latest_load + and latest_load.status == "in_progress" + and not _is_stale_load(latest_load) + ): + return "in_progress" + if latest_load and latest_load.status == "in_progress": + return "error" if latest_load and latest_load.status in ERROR_LOAD_STATUSES: return "error" if last_updated_at: @@ -416,7 +445,13 @@ def _build_source_card(definition: FrontendSourceCardDefinition) -> dict[str, An ), "last_updated_at": last_updated_at, "next_update_at": next_update_at, - "error_message": latest_load.error_message if latest_load else "", + "error_message": ( + "Загрузка зависла и будет закрыта cleanup-задачей." + if _is_stale_load(latest_load) + else latest_load.error_message + if latest_load + else "" + ), "task_names": [ PARSER_SOURCES[source_key].task_name for source_key in definition.source_keys diff --git a/src/apps/parsers/services.py b/src/apps/parsers/services.py index 1c918c0..1cb0fe1 100644 --- a/src/apps/parsers/services.py +++ b/src/apps/parsers/services.py @@ -416,7 +416,7 @@ class ParserLoadLogService(BaseService[ParserLoadLog]): updated = 0 active_statuses = [JobStatus.PENDING, JobStatus.STARTED, JobStatus.RETRY] for log in stale_logs: - job = ( + batch_job = ( BackgroundJob.objects.filter( status__in=active_statuses, meta__source=log.source, @@ -425,6 +425,18 @@ class ParserLoadLogService(BaseService[ParserLoadLog]): .order_by("-updated_at") .first() ) + source_job = None + if batch_job is None: + source_job = ( + BackgroundJob.objects.filter( + status__in=active_statuses, + meta__source=log.source, + meta__batch_id__isnull=True, + ) + .order_by("-updated_at") + .first() + ) + job = batch_job or source_job if job is not None and job.updated_at >= cutoff: continue diff --git a/src/apps/parsers/source_cards.py b/src/apps/parsers/source_cards.py index 44be505..835ec7d 100644 --- a/src/apps/parsers/source_cards.py +++ b/src/apps/parsers/source_cards.py @@ -20,6 +20,7 @@ from apps.parsers.models import ( ParserLoadLog, ProcurementRecord, ) +from django.conf import settings from django.db.models import Max from django.http import Http404 from django.utils import timezone @@ -27,6 +28,7 @@ from rest_framework.exceptions import ValidationError SUCCESSFUL_LOAD_STATUSES = {"success", "skipped"} ACTIVE_JOB_STATUSES = [JobStatus.PENDING, JobStatus.STARTED, JobStatus.RETRY] +STALE_ACTIVE_MAX_AGE_MINUTES = 90 @dataclass(frozen=True) @@ -293,6 +295,9 @@ class SourceCardService: latest_load=latest_load, last_updated_at=last_updated_at, ) + error_message = latest_load.error_message if latest_load else "" + if cls._is_stale_load(latest_load): + error_message = cls._stale_load_message() return { "slug": definition.slug, @@ -307,7 +312,7 @@ class SourceCardService: "organizations_count": organizations_count, "last_updated_at": last_updated_at, "next_update_at": cls._get_next_update_at(definition, last_updated_at), - "error_message": latest_load.error_message if latest_load else "", + "error_message": error_message, "task_names": list(definition.task_names), "refresh_requires_params": any( param.required for param in definition.refresh_params @@ -721,9 +726,11 @@ class SourceCardService: def _get_active_tasks( cls, definition: SourceCardDefinition ) -> list[dict[str, Any]]: + cutoff = cls._stale_cutoff() queryset = BackgroundJobService.get_queryset().filter( task_name__in=definition.task_names, status__in=ACTIVE_JOB_STATUSES, + updated_at__gte=cutoff, ) return [ cls._serialize_job(job) for job in queryset.order_by("-created_at")[:10] @@ -749,14 +756,44 @@ class SourceCardService: return "unavailable" if active_tasks: return "in_progress" - if latest_load and latest_load.status == "in_progress": + if ( + latest_load + and latest_load.status == "in_progress" + and not cls._is_stale_load(latest_load) + ): return "in_progress" + if latest_load and latest_load.status == "in_progress": + return "error" if latest_load and latest_load.status == "failed": return "error" if last_updated_at: return "success" return "idle" + @classmethod + def _stale_cutoff(cls): + max_age_minutes = int( + getattr( + settings, + "PARSER_STALE_LOAD_MAX_AGE_MINUTES", + STALE_ACTIVE_MAX_AGE_MINUTES, + ) + ) + return timezone.now() - timedelta(minutes=max_age_minutes) + + @classmethod + def _is_stale_load(cls, latest_load: ParserLoadLog | None) -> bool: + if latest_load is None or latest_load.status != "in_progress": + return False + updated_at = getattr(latest_load, "updated_at", None) + if updated_at is None: + return False + return updated_at < cls._stale_cutoff() + + @staticmethod + def _stale_load_message() -> str: + return "Загрузка зависла и будет закрыта cleanup-задачей." + @classmethod def _get_status_label(cls, status: str) -> str: labels = { diff --git a/src/apps/parsers/tasks.py b/src/apps/parsers/tasks.py index a8fe54a..f62652f 100644 --- a/src/apps/parsers/tasks.py +++ b/src/apps/parsers/tasks.py @@ -96,19 +96,27 @@ def _get_or_create_background_job( ): """Reuse a pre-created job or create a new one for the task.""" job = BackgroundJobService.get_by_task_id_or_none(task_id) + payload = {"source": source, **(meta or {})} + if batch_id is not None: + payload["batch_id"] = batch_id if not job: - payload = {"source": source, **(meta or {})} - if batch_id is not None: - payload["batch_id"] = batch_id job = BackgroundJobService.create_job( task_id=task_id, task_name=task_name, user_id=requested_by_id, meta=payload, ) - elif requested_by_id is not None and job.user_id is None: - job.user_id = requested_by_id - job.save(update_fields=["user_id", "updated_at"]) + else: + update_fields = [] + merged_meta = {**(job.meta or {}), **payload} + if merged_meta != job.meta: + job.meta = merged_meta + update_fields.append("meta") + if requested_by_id is not None and job.user_id is None: + job.user_id = requested_by_id + update_fields.append("user_id") + if update_fields: + job.save(update_fields=[*update_fields, "updated_at"]) return job @@ -1930,19 +1938,27 @@ def parse_fstec_registers( @shared_task def cleanup_stale_parser_loads(max_age_minutes: int | None = None) -> dict: - """Закрыть stale in_progress загрузки после рестартов worker/deploy.""" + """Закрыть stale in_progress загрузки и jobs после рестартов worker/deploy.""" if max_age_minutes is None: max_age_minutes = getattr( settings, "PARSER_STALE_LOAD_MAX_AGE_MINUTES", PARSER_STALE_LOAD_MAX_AGE_MINUTES, ) + source_values = {descriptor.source for descriptor in PARSER_SOURCES.values()} + task_names = {descriptor.task_name for descriptor in PARSER_SOURCES.values()} marked_failed = ParserLoadLogService.mark_stale_in_progress_failed( max_age_minutes=int(max_age_minutes) ) + marked_jobs_failed = BackgroundJobService.mark_stale_active_jobs_failed( + max_age_minutes=int(max_age_minutes), + task_names=task_names, + meta_sources=source_values, + ) return { "status": "success", "marked_failed": marked_failed, + "marked_jobs_failed": marked_jobs_failed, "max_age_minutes": int(max_age_minutes), } diff --git a/tests/apps/core/test_background_jobs.py b/tests/apps/core/test_background_jobs.py index f93655c..2f896d9 100644 --- a/tests/apps/core/test_background_jobs.py +++ b/tests/apps/core/test_background_jobs.py @@ -1,8 +1,11 @@ """Тесты для BackgroundJob.""" +from datetime import timedelta + from apps.core.models import BackgroundJob, JobStatus from apps.core.services import BackgroundJobService from django.test import TestCase +from django.utils import timezone from faker import Faker fake = Faker() @@ -252,10 +255,6 @@ class BackgroundJobServiceTest(TestCase): self.assertEqual([j.task_id for j in active_jobs], [job_user.task_id]) def test_cleanup_old_jobs(self): - from datetime import timedelta - - from django.utils import timezone - old_job = BackgroundJobService.create_job( task_id="job-old", task_name="test.task", @@ -272,3 +271,39 @@ class BackgroundJobServiceTest(TestCase): deleted = BackgroundJobService.cleanup_old_jobs(days=30) self.assertEqual(deleted, 1) + + def test_mark_stale_active_jobs_failed_scopes_by_task_and_source(self): + stale_job = BackgroundJobService.create_job( + task_id="job-stale", + task_name="apps.parsers.tasks.parse_industrial_products", + meta={"source": "industrial_products"}, + ) + fresh_job = BackgroundJobService.create_job( + task_id="job-fresh", + task_name="apps.parsers.tasks.parse_industrial_products", + meta={"source": "industrial_products"}, + ) + unrelated_job = BackgroundJobService.create_job( + task_id="job-unrelated", + task_name="apps.other.tasks.task", + meta={"source": "industrial_products"}, + ) + old_timestamp = timezone.now() - timedelta(hours=3) + BackgroundJob.objects.filter( + task_id__in=[stale_job.task_id, unrelated_job.task_id] + ).update(updated_at=old_timestamp) + + updated = BackgroundJobService.mark_stale_active_jobs_failed( + max_age_minutes=90, + task_names={"apps.parsers.tasks.parse_industrial_products"}, + meta_sources={"industrial_products"}, + ) + + stale_job.refresh_from_db() + fresh_job.refresh_from_db() + unrelated_job.refresh_from_db() + self.assertEqual(updated, 1) + self.assertEqual(stale_job.status, JobStatus.FAILURE) + self.assertIn("Stale background job", stale_job.error) + self.assertEqual(fresh_job.status, JobStatus.PENDING) + self.assertEqual(unrelated_job.status, JobStatus.PENDING) diff --git a/tests/apps/parsers/test_services.py b/tests/apps/parsers/test_services.py index d9570c6..c4528f2 100644 --- a/tests/apps/parsers/test_services.py +++ b/tests/apps/parsers/test_services.py @@ -411,6 +411,31 @@ class ParserLoadLogServiceTest(TestCase): self.assertEqual(updated, 0) self.assertEqual(log.status, ParserLoadLog.Status.IN_PROGRESS) + def test_mark_stale_in_progress_failed_closes_precreated_job_without_batch(self): + """Pre-created source-card jobs without batch_id are still linked by source.""" + log = ParserLoadLogFactory( + source=ParserLoadLog.Source.INDUSTRIAL_PRODUCTS, + batch_id=2, + status=ParserLoadLog.Status.IN_PROGRESS, + ) + job = BackgroundJob.objects.create( + task_id="precreated-source-card-task", + task_name="apps.parsers.tasks.parse_industrial_products", + status=JobStatus.STARTED, + meta={"source": log.source, "source_card": "manufacturers-and-products"}, + ) + old_timestamp = timezone.now() - timedelta(hours=3) + ParserLoadLog.objects.filter(pk=log.pk).update(updated_at=old_timestamp) + BackgroundJob.objects.filter(pk=job.pk).update(updated_at=old_timestamp) + + updated = ParserLoadLogService.mark_stale_in_progress_failed(max_age_minutes=90) + + log.refresh_from_db() + job.refresh_from_db() + self.assertEqual(updated, 1) + self.assertEqual(log.status, ParserLoadLog.Status.FAILED) + self.assertEqual(job.status, JobStatus.FAILURE) + class IndustrialCertificateServiceTest(TestCase): """Tests for IndustrialCertificateService.""" diff --git a/tests/apps/parsers/test_source_cards_service.py b/tests/apps/parsers/test_source_cards_service.py index e1fdd96..e793e2a 100644 --- a/tests/apps/parsers/test_source_cards_service.py +++ b/tests/apps/parsers/test_source_cards_service.py @@ -1,5 +1,6 @@ from __future__ import annotations +from datetime import timedelta from types import SimpleNamespace from unittest.mock import MagicMock, patch @@ -10,6 +11,7 @@ from apps.parsers.source_cards import ( ) from django.http import Http404 from django.test import SimpleTestCase +from django.utils import timezone from rest_framework.exceptions import ValidationError @@ -249,6 +251,19 @@ class SourceCardServiceUnitTest(SimpleTestCase): ), "in_progress", ) + stale_in_progress_load = SimpleNamespace( + status="in_progress", + updated_at=timezone.now() - timedelta(hours=3), + ) + self.assertEqual( + SourceCardService._get_status( + definition=SourceCardService.get_definition("financial-indicators"), + active_tasks=[], + latest_load=stale_in_progress_load, + last_updated_at=None, + ), + "error", + ) self.assertEqual( SourceCardService._get_status( definition=SourceCardService.get_definition("financial-indicators"), diff --git a/tests/apps/parsers/test_tasks.py b/tests/apps/parsers/test_tasks.py index aa9286c..143e124 100644 --- a/tests/apps/parsers/test_tasks.py +++ b/tests/apps/parsers/test_tasks.py @@ -12,6 +12,7 @@ from types import SimpleNamespace from unittest.mock import patch from urllib.parse import urlparse +from apps.core.services import BackgroundJobService from apps.parsers import tasks as parser_tasks from apps.parsers.clients.base import HTTPError from apps.parsers.clients.minpromtorg.industrial import ( @@ -285,14 +286,46 @@ class GenericSourceFetchTestCase(TestCase): ParserLoadLogService, "mark_stale_in_progress_failed", return_value=2, - ) as cleanup_mock: + ) as cleanup_mock, patch.object( + BackgroundJobService, + "mark_stale_active_jobs_failed", + return_value=3, + ) as jobs_cleanup_mock: result = parser_tasks.cleanup_stale_parser_loads(max_age_minutes=45) cleanup_mock.assert_called_once_with(max_age_minutes=45) + jobs_cleanup_mock.assert_called_once() + self.assertEqual(jobs_cleanup_mock.call_args.kwargs["max_age_minutes"], 45) self.assertEqual(result["status"], "success") self.assertEqual(result["marked_failed"], 2) + self.assertEqual(result["marked_jobs_failed"], 3) self.assertEqual(result["max_age_minutes"], 45) + def test_get_or_create_background_job_merges_meta_for_precreated_job(self): + BackgroundJobService.create_job( + task_id="precreated-task", + task_name="apps.parsers.tasks.parse_industrial_products", + user_id=None, + meta={ + "source": ParserLoadLog.Source.INDUSTRIAL_PRODUCTS, + "source_card": "manufacturers-and-products", + }, + ) + + job = parser_tasks._get_or_create_background_job( + task_id="precreated-task", + task_name="apps.parsers.tasks.parse_industrial_products", + source=ParserLoadLog.Source.INDUSTRIAL_PRODUCTS, + batch_id=7, + requested_by_id=42, + meta={"source_key": "mpt_products"}, + ) + + self.assertEqual(job.user_id, 42) + self.assertEqual(job.meta["source_card"], "manufacturers-and-products") + self.assertEqual(job.meta["source_key"], "mpt_products") + self.assertEqual(job.meta["batch_id"], 7) + @override_settings( CELERY_TASK_ALWAYS_EAGER=True,