fix(parsers): handle stale and blocked sources
Some checks failed
CI/CD Pipeline / Manual Action Help (push) Has been skipped
CI/CD Pipeline / Build Golden Images (push) Has been skipped
CI/CD Pipeline / Quality Gate (push) Failing after 1s
CI/CD Pipeline / Start Dev Containers in Dokploy (push) Has been skipped
CI/CD Pipeline / Drop and Recreate Dev Database (push) Has been skipped
CI/CD Pipeline / Build and Push Images (push) Has been skipped
CI/CD Pipeline / Deploy Dev in Dokploy (push) Has been skipped
CI/CD Pipeline / Internal Notify (push) Successful in 1s

This commit is contained in:
2026-04-28 20:55:43 +02:00
parent 77d84b9778
commit 4bcac334cd
5 changed files with 315 additions and 7 deletions

View File

@@ -0,0 +1,82 @@
import json
from datetime import timedelta
from django.db import migrations
from django.utils import timezone
STALE_LOAD_MAX_AGE_MINUTES = 90
STALE_LOAD_ERROR_MESSAGE = (
"Stale parser load was marked failed after "
f"{STALE_LOAD_MAX_AGE_MINUTES} minutes without completion."
)
OLD_DUPLICATE_TASK_NAMES = [
"parse-industrial-production-daily",
"parse-manufactures-daily",
"parse-industrial-products-daily",
"parse-inspections-weekly",
]
CLEANUP_TASK_NAME = "parser:cleanup-stale-loads"
CLEANUP_TASK_PATH = "apps.parsers.tasks.cleanup_stale_parser_loads"
CLEANUP_INTERVAL = {
"every": 15,
"period": "minutes",
}
def cleanup_stale_parser_loads_and_schedules(apps, schema_editor):
ParserLoadLog = apps.get_model("parsers", "ParserLoadLog")
IntervalSchedule = apps.get_model("django_celery_beat", "IntervalSchedule")
PeriodicTask = apps.get_model("django_celery_beat", "PeriodicTask")
PeriodicTask.objects.filter(name__in=OLD_DUPLICATE_TASK_NAMES).delete()
interval, _ = IntervalSchedule.objects.get_or_create(**CLEANUP_INTERVAL)
field_names = {field.name for field in PeriodicTask._meta.fields}
schedule_fields = {"interval": interval}
for field_name in ("crontab", "solar", "clocked"):
if field_name in field_names:
schedule_fields[field_name] = None
PeriodicTask.objects.update_or_create(
name=CLEANUP_TASK_NAME,
defaults={
"task": CLEANUP_TASK_PATH,
"args": json.dumps([]),
"kwargs": json.dumps({"max_age_minutes": STALE_LOAD_MAX_AGE_MINUTES}),
"enabled": True,
"description": (
"Marks stale parser load logs as failed after worker/deploy restarts."
),
**schedule_fields,
},
)
cutoff = timezone.now() - timedelta(minutes=STALE_LOAD_MAX_AGE_MINUTES)
ParserLoadLog.objects.filter(
status="in_progress",
updated_at__lt=cutoff,
).update(
status="failed",
error_message=STALE_LOAD_ERROR_MESSAGE,
)
def remove_cleanup_stale_parser_loads_schedule(apps, schema_editor):
PeriodicTask = apps.get_model("django_celery_beat", "PeriodicTask")
PeriodicTask.objects.filter(name=CLEANUP_TASK_NAME).delete()
class Migration(migrations.Migration):
dependencies = [
("parsers", "0018_seed_weekly_parser_schedules"),
]
operations = [
migrations.RunPython(
cleanup_stale_parser_loads_and_schedules,
reverse_code=remove_cleanup_stale_parser_loads_schedule,
),
]

View File

@@ -9,7 +9,7 @@ import re
from collections import defaultdict
from contextlib import suppress
from dataclasses import dataclass
from datetime import date, datetime
from datetime import date, datetime, timedelta
from decimal import Decimal, InvalidOperation
from typing import Any
from urllib.parse import urlparse
@@ -393,6 +393,48 @@ class ParserLoadLogService(BaseService[ParserLoadLog]):
"""Отметить загрузку как неудачную."""
return cls.update(log, status="failed", error_message=error_message)
@classmethod
def mark_stale_in_progress_failed(
cls,
*,
max_age_minutes: int,
) -> int:
"""Закрыть зависшие in_progress логи без живой свежей BackgroundJob."""
from apps.core.models import BackgroundJob, JobStatus
cutoff = timezone.now() - timedelta(minutes=max_age_minutes)
stale_logs = list(
cls.model.objects.filter(
status=ParserLoadLog.Status.IN_PROGRESS,
updated_at__lt=cutoff,
).order_by("created_at")
)
stale_message = (
"Stale parser load was marked failed after "
f"{max_age_minutes} minutes without completion."
)
updated = 0
active_statuses = [JobStatus.PENDING, JobStatus.STARTED, JobStatus.RETRY]
for log in stale_logs:
job = (
BackgroundJob.objects.filter(
status__in=active_statuses,
meta__source=log.source,
meta__batch_id=log.batch_id,
)
.order_by("-updated_at")
.first()
)
if job is not None and job.updated_at >= cutoff:
continue
cls.mark_failed(log, stale_message)
updated += 1
if job is not None:
job.fail(error=stale_message)
return updated
@classmethod
def update_records_count(cls, log: ParserLoadLog, count: int) -> ParserLoadLog:
"""Обновить количество записей."""

View File

@@ -15,6 +15,7 @@ from pathlib import Path
from apps.core.services import BackgroundJobService
from apps.core.tasks import PeriodicTask as CorePeriodicTask
from apps.parsers.clients.base import HTTPClientError
from apps.parsers.clients.checko import CheckoClient, CompanyRequest
from apps.parsers.clients.checko.exceptions import CheckoError
from apps.parsers.clients.common import GenericParserItem, StructuredDataClient
@@ -55,6 +56,15 @@ STRUCTURED_SOURCE_OPTIONS = {
"fedresurs_bankruptcy": {"timeout": 30},
}
FEDRESURS_CHECKO_FALLBACK_LIMIT = 100
PARSER_STALE_LOAD_MAX_AGE_MINUTES = 90
PARSER_SOFT_TIME_LIMIT_SECONDS = 15 * 60
PARSER_TIME_LIMIT_SECONDS = 20 * 60
class ParserSourceSkipped(Exception):
"""Источник временно недоступен, но задача должна завершиться как skipped."""
pass
def _resolve_proxies(proxies: list[str] | None) -> list[str] | None:
@@ -145,6 +155,22 @@ def _run_generic_parser(
result = {"batch_id": batch_id, "saved": saved_count, "status": "success"}
job.complete(result=result)
return result
except ParserSourceSkipped as e:
message = str(e)
logger.warning("%s skipped: %s", task_name, message)
ParserLoadLogService.update(
load_log,
status=ParserLoadLog.Status.SKIPPED,
error_message=message,
)
result = {
"batch_id": batch_id,
"saved": 0,
"status": "skipped",
"reason": message,
}
job.complete(result=result)
return result
except Exception as e:
logger.error("%s failed: %s", task_name, e, exc_info=True)
ParserLoadLogService.mark_failed(load_log, str(e))
@@ -185,6 +211,7 @@ def _fetch_fedresurs_bankruptcy_records(
proxies: list[str] | None,
) -> list[GenericParserItem]:
"""Загрузить банкротства: официальный портал, затем fallback через Checko."""
official_error: Exception | None = None
try:
return _fetch_structured_records(
source_key="fedresurs_bankruptcy",
@@ -195,11 +222,42 @@ def _fetch_fedresurs_bankruptcy_records(
except Exception as exc:
if file_url or file_path:
raise
official_error = exc
logger.warning(
"Fedresurs official source failed, falling back to Checko: %s",
exc,
)
return _fetch_checko_bankruptcy_records(proxies=proxies)
records = _fetch_checko_bankruptcy_records(proxies=proxies)
if records:
return records
if isinstance(official_error, HTTPClientError):
raise ParserSourceSkipped(
"fedresurs upstream is unavailable or blocked; "
"Checko fallback returned no bankruptcy records"
) from official_error
return records
def _fetch_fstec_records(
*,
file_url: str | None,
file_path: str | None,
proxies: list[str] | None,
) -> list[GenericParserItem]:
"""Загрузить ФСТЭК, не превращая WAF/доступ upstream в вечную ошибку."""
try:
return _fetch_structured_records(
source_key="fstec",
file_url=file_url,
file_path=file_path,
proxies=proxies,
)
except HTTPClientError as exc:
if file_url or file_path:
raise
raise ParserSourceSkipped(
"fstec upstream is unavailable or blocked; configure runtime proxy"
) from exc
def _fetch_checko_bankruptcy_records(
@@ -724,7 +782,11 @@ def parse_manufactures(
raise
@shared_task(bind=True)
@shared_task(
bind=True,
soft_time_limit=PARSER_SOFT_TIME_LIMIT_SECONDS,
time_limit=PARSER_TIME_LIMIT_SECONDS,
)
def parse_industrial_products(
self,
proxies: list[str] | None = None,
@@ -1808,7 +1870,11 @@ def parse_arbitration_cases(
)
@shared_task(bind=True)
@shared_task(
bind=True,
soft_time_limit=PARSER_SOFT_TIME_LIMIT_SECONDS,
time_limit=PARSER_TIME_LIMIT_SECONDS,
)
def parse_fedresurs_bankruptcy(
self,
*,
@@ -1833,7 +1899,11 @@ def parse_fedresurs_bankruptcy(
)
@shared_task(bind=True)
@shared_task(
bind=True,
soft_time_limit=PARSER_SOFT_TIME_LIMIT_SECONDS,
time_limit=PARSER_TIME_LIMIT_SECONDS,
)
def parse_fstec_registers(
self,
*,
@@ -1850,8 +1920,7 @@ def parse_fstec_registers(
source=ParserLoadLog.Source.FSTEC,
task_name="apps.parsers.tasks.parse_fstec_registers",
requested_by_id=requested_by_id,
fetch_records=lambda: _fetch_structured_records(
source_key="fstec",
fetch_records=lambda: _fetch_fstec_records(
file_url=file_url,
file_path=file_path,
proxies=proxies,
@@ -1859,6 +1928,25 @@ def parse_fstec_registers(
)
@shared_task
def cleanup_stale_parser_loads(max_age_minutes: int | None = None) -> dict:
"""Закрыть stale in_progress загрузки после рестартов worker/deploy."""
if max_age_minutes is None:
max_age_minutes = getattr(
settings,
"PARSER_STALE_LOAD_MAX_AGE_MINUTES",
PARSER_STALE_LOAD_MAX_AGE_MINUTES,
)
marked_failed = ParserLoadLogService.mark_stale_in_progress_failed(
max_age_minutes=int(max_age_minutes)
)
return {
"status": "success",
"marked_failed": marked_failed,
"max_age_minutes": int(max_age_minutes),
}
@shared_task(bind=True)
def parse_trudvsem_vacancies(
self,

View File

@@ -1,8 +1,10 @@
"""Tests for parsers services."""
from datetime import timedelta
from unittest.mock import patch
from urllib.parse import urlparse
from apps.core.models import BackgroundJob, JobStatus
from apps.parsers.clients.minpromtorg.industrial import IndustrialProductionClient
from apps.parsers.clients.minpromtorg.schemas import (
IndustrialCertificate,
@@ -32,6 +34,7 @@ from apps.parsers.services import (
)
from apps.registers.models import Organization
from django.test import TestCase, override_settings, tag
from django.utils import timezone
from tests.utils import TestHTTPServer
from tests.utils.fixtures import build_minpromtorg_certificates_excel, fake
@@ -367,6 +370,47 @@ class ParserLoadLogServiceTest(TestCase):
log.refresh_from_db()
self.assertEqual(log.records_count, 250)
def test_mark_stale_in_progress_failed_marks_old_logs(self):
"""Old in_progress logs without a fresh active job are closed."""
log = ParserLoadLogFactory(
source=ParserLoadLog.Source.INDUSTRIAL_PRODUCTS,
batch_id=1,
status=ParserLoadLog.Status.IN_PROGRESS,
)
ParserLoadLog.objects.filter(pk=log.pk).update(
updated_at=timezone.now() - timedelta(hours=3)
)
updated = ParserLoadLogService.mark_stale_in_progress_failed(max_age_minutes=90)
log.refresh_from_db()
self.assertEqual(updated, 1)
self.assertEqual(log.status, ParserLoadLog.Status.FAILED)
self.assertIn("Stale parser load", log.error_message)
def test_mark_stale_in_progress_failed_keeps_fresh_active_job(self):
"""A fresh active BackgroundJob keeps the matching load in progress."""
log = ParserLoadLogFactory(
source=ParserLoadLog.Source.INDUSTRIAL_PRODUCTS,
batch_id=1,
status=ParserLoadLog.Status.IN_PROGRESS,
)
ParserLoadLog.objects.filter(pk=log.pk).update(
updated_at=timezone.now() - timedelta(hours=3)
)
BackgroundJob.objects.create(
task_id="active-task",
task_name="apps.parsers.tasks.parse_industrial_products",
status=JobStatus.STARTED,
meta={"source": log.source, "batch_id": log.batch_id},
)
updated = ParserLoadLogService.mark_stale_in_progress_failed(max_age_minutes=90)
log.refresh_from_db()
self.assertEqual(updated, 0)
self.assertEqual(log.status, ParserLoadLog.Status.IN_PROGRESS)
class IndustrialCertificateServiceTest(TestCase):
"""Tests for IndustrialCertificateService."""

View File

@@ -13,6 +13,7 @@ from unittest.mock import patch
from urllib.parse import urlparse
from apps.parsers import tasks as parser_tasks
from apps.parsers.clients.base import HTTPError
from apps.parsers.clients.minpromtorg.industrial import (
IndustrialProductionClient,
IndustrialProductionClientError,
@@ -36,6 +37,7 @@ from apps.parsers.models import (
ParserLoadLog,
ProcurementRecord,
)
from apps.parsers.services import ParserLoadLogService
from apps.parsers.tasks import (
_move_to_dir,
_process_fns_file_sync,
@@ -241,6 +243,56 @@ class GenericSourceFetchTestCase(TestCase):
self.assertEqual(records[0].record_date, "2026-04-01")
self.assertEqual(records[0].payload["provider"], "checko")
@override_settings(CHECKO_API_KEY="")
def test_fedresurs_skips_when_official_blocked_and_fallback_empty(self):
with patch.object(
parser_tasks,
"_fetch_structured_records",
side_effect=HTTPError(
"HTTP 401 for https://bankrot.fedresurs.ru/",
status_code=401,
url="https://bankrot.fedresurs.ru/",
),
):
result = parser_tasks.parse_fedresurs_bankruptcy(proxies=[])
log = ParserLoadLog.objects.get(
source=ParserLoadLog.Source.FEDRESURS_BANKRUPTCY
)
self.assertEqual(result["status"], "skipped")
self.assertEqual(log.status, ParserLoadLog.Status.SKIPPED)
self.assertIn("fedresurs upstream", log.error_message)
def test_fstec_skips_when_upstream_is_blocked(self):
with patch.object(
parser_tasks,
"_fetch_structured_records",
side_effect=HTTPError(
"HTTP 403 for https://reestr.fstec.ru/reg3",
status_code=403,
url="https://reestr.fstec.ru/reg3",
),
):
result = parser_tasks.parse_fstec_registers(proxies=[])
log = ParserLoadLog.objects.get(source=ParserLoadLog.Source.FSTEC)
self.assertEqual(result["status"], "skipped")
self.assertEqual(log.status, ParserLoadLog.Status.SKIPPED)
self.assertIn("fstec upstream", log.error_message)
def test_cleanup_stale_parser_loads_returns_count(self):
with patch.object(
ParserLoadLogService,
"mark_stale_in_progress_failed",
return_value=2,
) as cleanup_mock:
result = parser_tasks.cleanup_stale_parser_loads(max_age_minutes=45)
cleanup_mock.assert_called_once_with(max_age_minutes=45)
self.assertEqual(result["status"], "success")
self.assertEqual(result["marked_failed"], 2)
self.assertEqual(result["max_age_minutes"], 45)
@override_settings(
CELERY_TASK_ALWAYS_EAGER=True,