diff --git a/src/apps/parsers/migrations/0019_cleanup_stale_parser_loads_schedule.py b/src/apps/parsers/migrations/0019_cleanup_stale_parser_loads_schedule.py new file mode 100644 index 0000000..acb908f --- /dev/null +++ b/src/apps/parsers/migrations/0019_cleanup_stale_parser_loads_schedule.py @@ -0,0 +1,82 @@ +import json +from datetime import timedelta + +from django.db import migrations +from django.utils import timezone + + +STALE_LOAD_MAX_AGE_MINUTES = 90 +STALE_LOAD_ERROR_MESSAGE = ( + "Stale parser load was marked failed after " + f"{STALE_LOAD_MAX_AGE_MINUTES} minutes without completion." +) + +OLD_DUPLICATE_TASK_NAMES = [ + "parse-industrial-production-daily", + "parse-manufactures-daily", + "parse-industrial-products-daily", + "parse-inspections-weekly", +] + +CLEANUP_TASK_NAME = "parser:cleanup-stale-loads" +CLEANUP_TASK_PATH = "apps.parsers.tasks.cleanup_stale_parser_loads" +CLEANUP_INTERVAL = { + "every": 15, + "period": "minutes", +} + + +def cleanup_stale_parser_loads_and_schedules(apps, schema_editor): + ParserLoadLog = apps.get_model("parsers", "ParserLoadLog") + IntervalSchedule = apps.get_model("django_celery_beat", "IntervalSchedule") + PeriodicTask = apps.get_model("django_celery_beat", "PeriodicTask") + + PeriodicTask.objects.filter(name__in=OLD_DUPLICATE_TASK_NAMES).delete() + + interval, _ = IntervalSchedule.objects.get_or_create(**CLEANUP_INTERVAL) + field_names = {field.name for field in PeriodicTask._meta.fields} + schedule_fields = {"interval": interval} + for field_name in ("crontab", "solar", "clocked"): + if field_name in field_names: + schedule_fields[field_name] = None + + PeriodicTask.objects.update_or_create( + name=CLEANUP_TASK_NAME, + defaults={ + "task": CLEANUP_TASK_PATH, + "args": json.dumps([]), + "kwargs": json.dumps({"max_age_minutes": STALE_LOAD_MAX_AGE_MINUTES}), + "enabled": True, + "description": ( + "Marks stale parser load logs as failed after worker/deploy restarts." + ), + **schedule_fields, + }, + ) + + cutoff = timezone.now() - timedelta(minutes=STALE_LOAD_MAX_AGE_MINUTES) + ParserLoadLog.objects.filter( + status="in_progress", + updated_at__lt=cutoff, + ).update( + status="failed", + error_message=STALE_LOAD_ERROR_MESSAGE, + ) + + +def remove_cleanup_stale_parser_loads_schedule(apps, schema_editor): + PeriodicTask = apps.get_model("django_celery_beat", "PeriodicTask") + PeriodicTask.objects.filter(name=CLEANUP_TASK_NAME).delete() + + +class Migration(migrations.Migration): + dependencies = [ + ("parsers", "0018_seed_weekly_parser_schedules"), + ] + + operations = [ + migrations.RunPython( + cleanup_stale_parser_loads_and_schedules, + reverse_code=remove_cleanup_stale_parser_loads_schedule, + ), + ] diff --git a/src/apps/parsers/services.py b/src/apps/parsers/services.py index 43804ad..1c918c0 100644 --- a/src/apps/parsers/services.py +++ b/src/apps/parsers/services.py @@ -9,7 +9,7 @@ import re from collections import defaultdict from contextlib import suppress from dataclasses import dataclass -from datetime import date, datetime +from datetime import date, datetime, timedelta from decimal import Decimal, InvalidOperation from typing import Any from urllib.parse import urlparse @@ -393,6 +393,48 @@ class ParserLoadLogService(BaseService[ParserLoadLog]): """Отметить загрузку как неудачную.""" return cls.update(log, status="failed", error_message=error_message) + @classmethod + def mark_stale_in_progress_failed( + cls, + *, + max_age_minutes: int, + ) -> int: + """Закрыть зависшие in_progress логи без живой свежей BackgroundJob.""" + from apps.core.models import BackgroundJob, JobStatus + + cutoff = timezone.now() - timedelta(minutes=max_age_minutes) + stale_logs = list( + cls.model.objects.filter( + status=ParserLoadLog.Status.IN_PROGRESS, + updated_at__lt=cutoff, + ).order_by("created_at") + ) + stale_message = ( + "Stale parser load was marked failed after " + f"{max_age_minutes} minutes without completion." + ) + updated = 0 + active_statuses = [JobStatus.PENDING, JobStatus.STARTED, JobStatus.RETRY] + for log in stale_logs: + job = ( + BackgroundJob.objects.filter( + status__in=active_statuses, + meta__source=log.source, + meta__batch_id=log.batch_id, + ) + .order_by("-updated_at") + .first() + ) + if job is not None and job.updated_at >= cutoff: + continue + + cls.mark_failed(log, stale_message) + updated += 1 + if job is not None: + job.fail(error=stale_message) + + return updated + @classmethod def update_records_count(cls, log: ParserLoadLog, count: int) -> ParserLoadLog: """Обновить количество записей.""" diff --git a/src/apps/parsers/tasks.py b/src/apps/parsers/tasks.py index fdcedd6..a8fe54a 100644 --- a/src/apps/parsers/tasks.py +++ b/src/apps/parsers/tasks.py @@ -15,6 +15,7 @@ from pathlib import Path from apps.core.services import BackgroundJobService from apps.core.tasks import PeriodicTask as CorePeriodicTask +from apps.parsers.clients.base import HTTPClientError from apps.parsers.clients.checko import CheckoClient, CompanyRequest from apps.parsers.clients.checko.exceptions import CheckoError from apps.parsers.clients.common import GenericParserItem, StructuredDataClient @@ -55,6 +56,15 @@ STRUCTURED_SOURCE_OPTIONS = { "fedresurs_bankruptcy": {"timeout": 30}, } FEDRESURS_CHECKO_FALLBACK_LIMIT = 100 +PARSER_STALE_LOAD_MAX_AGE_MINUTES = 90 +PARSER_SOFT_TIME_LIMIT_SECONDS = 15 * 60 +PARSER_TIME_LIMIT_SECONDS = 20 * 60 + + +class ParserSourceSkipped(Exception): + """Источник временно недоступен, но задача должна завершиться как skipped.""" + + pass def _resolve_proxies(proxies: list[str] | None) -> list[str] | None: @@ -145,6 +155,22 @@ def _run_generic_parser( result = {"batch_id": batch_id, "saved": saved_count, "status": "success"} job.complete(result=result) return result + except ParserSourceSkipped as e: + message = str(e) + logger.warning("%s skipped: %s", task_name, message) + ParserLoadLogService.update( + load_log, + status=ParserLoadLog.Status.SKIPPED, + error_message=message, + ) + result = { + "batch_id": batch_id, + "saved": 0, + "status": "skipped", + "reason": message, + } + job.complete(result=result) + return result except Exception as e: logger.error("%s failed: %s", task_name, e, exc_info=True) ParserLoadLogService.mark_failed(load_log, str(e)) @@ -185,6 +211,7 @@ def _fetch_fedresurs_bankruptcy_records( proxies: list[str] | None, ) -> list[GenericParserItem]: """Загрузить банкротства: официальный портал, затем fallback через Checko.""" + official_error: Exception | None = None try: return _fetch_structured_records( source_key="fedresurs_bankruptcy", @@ -195,11 +222,42 @@ def _fetch_fedresurs_bankruptcy_records( except Exception as exc: if file_url or file_path: raise + official_error = exc logger.warning( "Fedresurs official source failed, falling back to Checko: %s", exc, ) - return _fetch_checko_bankruptcy_records(proxies=proxies) + records = _fetch_checko_bankruptcy_records(proxies=proxies) + if records: + return records + if isinstance(official_error, HTTPClientError): + raise ParserSourceSkipped( + "fedresurs upstream is unavailable or blocked; " + "Checko fallback returned no bankruptcy records" + ) from official_error + return records + + +def _fetch_fstec_records( + *, + file_url: str | None, + file_path: str | None, + proxies: list[str] | None, +) -> list[GenericParserItem]: + """Загрузить ФСТЭК, не превращая WAF/доступ upstream в вечную ошибку.""" + try: + return _fetch_structured_records( + source_key="fstec", + file_url=file_url, + file_path=file_path, + proxies=proxies, + ) + except HTTPClientError as exc: + if file_url or file_path: + raise + raise ParserSourceSkipped( + "fstec upstream is unavailable or blocked; configure runtime proxy" + ) from exc def _fetch_checko_bankruptcy_records( @@ -724,7 +782,11 @@ def parse_manufactures( raise -@shared_task(bind=True) +@shared_task( + bind=True, + soft_time_limit=PARSER_SOFT_TIME_LIMIT_SECONDS, + time_limit=PARSER_TIME_LIMIT_SECONDS, +) def parse_industrial_products( self, proxies: list[str] | None = None, @@ -1808,7 +1870,11 @@ def parse_arbitration_cases( ) -@shared_task(bind=True) +@shared_task( + bind=True, + soft_time_limit=PARSER_SOFT_TIME_LIMIT_SECONDS, + time_limit=PARSER_TIME_LIMIT_SECONDS, +) def parse_fedresurs_bankruptcy( self, *, @@ -1833,7 +1899,11 @@ def parse_fedresurs_bankruptcy( ) -@shared_task(bind=True) +@shared_task( + bind=True, + soft_time_limit=PARSER_SOFT_TIME_LIMIT_SECONDS, + time_limit=PARSER_TIME_LIMIT_SECONDS, +) def parse_fstec_registers( self, *, @@ -1850,8 +1920,7 @@ def parse_fstec_registers( source=ParserLoadLog.Source.FSTEC, task_name="apps.parsers.tasks.parse_fstec_registers", requested_by_id=requested_by_id, - fetch_records=lambda: _fetch_structured_records( - source_key="fstec", + fetch_records=lambda: _fetch_fstec_records( file_url=file_url, file_path=file_path, proxies=proxies, @@ -1859,6 +1928,25 @@ def parse_fstec_registers( ) +@shared_task +def cleanup_stale_parser_loads(max_age_minutes: int | None = None) -> dict: + """Закрыть stale in_progress загрузки после рестартов worker/deploy.""" + if max_age_minutes is None: + max_age_minutes = getattr( + settings, + "PARSER_STALE_LOAD_MAX_AGE_MINUTES", + PARSER_STALE_LOAD_MAX_AGE_MINUTES, + ) + marked_failed = ParserLoadLogService.mark_stale_in_progress_failed( + max_age_minutes=int(max_age_minutes) + ) + return { + "status": "success", + "marked_failed": marked_failed, + "max_age_minutes": int(max_age_minutes), + } + + @shared_task(bind=True) def parse_trudvsem_vacancies( self, diff --git a/tests/apps/parsers/test_services.py b/tests/apps/parsers/test_services.py index 69e773c..d9570c6 100644 --- a/tests/apps/parsers/test_services.py +++ b/tests/apps/parsers/test_services.py @@ -1,8 +1,10 @@ """Tests for parsers services.""" +from datetime import timedelta from unittest.mock import patch from urllib.parse import urlparse +from apps.core.models import BackgroundJob, JobStatus from apps.parsers.clients.minpromtorg.industrial import IndustrialProductionClient from apps.parsers.clients.minpromtorg.schemas import ( IndustrialCertificate, @@ -32,6 +34,7 @@ from apps.parsers.services import ( ) from apps.registers.models import Organization from django.test import TestCase, override_settings, tag +from django.utils import timezone from tests.utils import TestHTTPServer from tests.utils.fixtures import build_minpromtorg_certificates_excel, fake @@ -367,6 +370,47 @@ class ParserLoadLogServiceTest(TestCase): log.refresh_from_db() self.assertEqual(log.records_count, 250) + def test_mark_stale_in_progress_failed_marks_old_logs(self): + """Old in_progress logs without a fresh active job are closed.""" + log = ParserLoadLogFactory( + source=ParserLoadLog.Source.INDUSTRIAL_PRODUCTS, + batch_id=1, + status=ParserLoadLog.Status.IN_PROGRESS, + ) + ParserLoadLog.objects.filter(pk=log.pk).update( + updated_at=timezone.now() - timedelta(hours=3) + ) + + updated = ParserLoadLogService.mark_stale_in_progress_failed(max_age_minutes=90) + + log.refresh_from_db() + self.assertEqual(updated, 1) + self.assertEqual(log.status, ParserLoadLog.Status.FAILED) + self.assertIn("Stale parser load", log.error_message) + + def test_mark_stale_in_progress_failed_keeps_fresh_active_job(self): + """A fresh active BackgroundJob keeps the matching load in progress.""" + log = ParserLoadLogFactory( + source=ParserLoadLog.Source.INDUSTRIAL_PRODUCTS, + batch_id=1, + status=ParserLoadLog.Status.IN_PROGRESS, + ) + ParserLoadLog.objects.filter(pk=log.pk).update( + updated_at=timezone.now() - timedelta(hours=3) + ) + BackgroundJob.objects.create( + task_id="active-task", + task_name="apps.parsers.tasks.parse_industrial_products", + status=JobStatus.STARTED, + meta={"source": log.source, "batch_id": log.batch_id}, + ) + + updated = ParserLoadLogService.mark_stale_in_progress_failed(max_age_minutes=90) + + log.refresh_from_db() + self.assertEqual(updated, 0) + self.assertEqual(log.status, ParserLoadLog.Status.IN_PROGRESS) + class IndustrialCertificateServiceTest(TestCase): """Tests for IndustrialCertificateService.""" diff --git a/tests/apps/parsers/test_tasks.py b/tests/apps/parsers/test_tasks.py index 97e9033..aa9286c 100644 --- a/tests/apps/parsers/test_tasks.py +++ b/tests/apps/parsers/test_tasks.py @@ -13,6 +13,7 @@ from unittest.mock import patch from urllib.parse import urlparse from apps.parsers import tasks as parser_tasks +from apps.parsers.clients.base import HTTPError from apps.parsers.clients.minpromtorg.industrial import ( IndustrialProductionClient, IndustrialProductionClientError, @@ -36,6 +37,7 @@ from apps.parsers.models import ( ParserLoadLog, ProcurementRecord, ) +from apps.parsers.services import ParserLoadLogService from apps.parsers.tasks import ( _move_to_dir, _process_fns_file_sync, @@ -241,6 +243,56 @@ class GenericSourceFetchTestCase(TestCase): self.assertEqual(records[0].record_date, "2026-04-01") self.assertEqual(records[0].payload["provider"], "checko") + @override_settings(CHECKO_API_KEY="") + def test_fedresurs_skips_when_official_blocked_and_fallback_empty(self): + with patch.object( + parser_tasks, + "_fetch_structured_records", + side_effect=HTTPError( + "HTTP 401 for https://bankrot.fedresurs.ru/", + status_code=401, + url="https://bankrot.fedresurs.ru/", + ), + ): + result = parser_tasks.parse_fedresurs_bankruptcy(proxies=[]) + + log = ParserLoadLog.objects.get( + source=ParserLoadLog.Source.FEDRESURS_BANKRUPTCY + ) + self.assertEqual(result["status"], "skipped") + self.assertEqual(log.status, ParserLoadLog.Status.SKIPPED) + self.assertIn("fedresurs upstream", log.error_message) + + def test_fstec_skips_when_upstream_is_blocked(self): + with patch.object( + parser_tasks, + "_fetch_structured_records", + side_effect=HTTPError( + "HTTP 403 for https://reestr.fstec.ru/reg3", + status_code=403, + url="https://reestr.fstec.ru/reg3", + ), + ): + result = parser_tasks.parse_fstec_registers(proxies=[]) + + log = ParserLoadLog.objects.get(source=ParserLoadLog.Source.FSTEC) + self.assertEqual(result["status"], "skipped") + self.assertEqual(log.status, ParserLoadLog.Status.SKIPPED) + self.assertIn("fstec upstream", log.error_message) + + def test_cleanup_stale_parser_loads_returns_count(self): + with patch.object( + ParserLoadLogService, + "mark_stale_in_progress_failed", + return_value=2, + ) as cleanup_mock: + result = parser_tasks.cleanup_stale_parser_loads(max_age_minutes=45) + + cleanup_mock.assert_called_once_with(max_age_minutes=45) + self.assertEqual(result["status"], "success") + self.assertEqual(result["marked_failed"], 2) + self.assertEqual(result["max_age_minutes"], 45) + @override_settings( CELERY_TASK_ALWAYS_EAGER=True,