From c537ba8d176baf8b21aa7f779ef53eaf37fcca8e Mon Sep 17 00:00:00 2001
From: Aleksandr Meshchriakov
Date: Tue, 28 Apr 2026 16:06:34 +0200
Subject: [PATCH] fix remaining parser source failures

---
Notes (ignored by git-am):
- urllib3.disable_warnings() installs a process-wide warning filter, so
  InsecureRequestWarning is silenced for every client in the worker once
  a session with verify_ssl=False has been created.
- The Checko fallback treats a missing or None `bankruptcy` attribute as
  "no messages" instead of failing on iteration.

 src/apps/parsers/clients/base.py    |   4 +
 src/apps/parsers/source_registry.py |   2 +
 src/apps/parsers/tasks.py           | 165 +++++++++++++++++++++++++++-
 tests/apps/parsers/test_tasks.py    |  87 +++++++++++++++
 4 files changed, 255 insertions(+), 3 deletions(-)

diff --git a/src/apps/parsers/clients/base.py b/src/apps/parsers/clients/base.py
index 5f47f55..af399c5 100644
--- a/src/apps/parsers/clients/base.py
+++ b/src/apps/parsers/clients/base.py
@@ -11,7 +11,9 @@ from dataclasses import dataclass, field
 from typing import Any
 
 import requests
+import urllib3
 from requests.adapters import BaseAdapter
+from urllib3.exceptions import InsecureRequestWarning
 
 logger = logging.getLogger(__name__)
 
@@ -96,6 +98,8 @@ class BaseHTTPClient:
     def _create_session(self) -> requests.Session:
         """Создать и настроить сессию requests."""
         session = requests.Session()
+        if not self.verify_ssl:
+            urllib3.disable_warnings(InsecureRequestWarning)
 
         # Настройка прокси
         self._current_proxy = self._select_proxy()
diff --git a/src/apps/parsers/source_registry.py b/src/apps/parsers/source_registry.py
index 85410a7..9e20141 100644
--- a/src/apps/parsers/source_registry.py
+++ b/src/apps/parsers/source_registry.py
@@ -252,6 +252,8 @@ PARSER_SOURCES: dict[str, ParserSourceDescriptor] = {
         parser_strategy="fedresurs_bankruptcy_search",
         source_notes=(
             "Официальный ЕФРСБ; может отдавать anti-bot challenge worker'ам. "
+            "Если официальный портал недоступен, используется Checko API по "
+            "организациям из реестров. "
             "Ручная загрузка разрешена только для выгрузок, переданных Сергеем."
         ),
         supports_file_upload=True,
diff --git a/src/apps/parsers/tasks.py b/src/apps/parsers/tasks.py
index 04e7a3e..fdcedd6 100644
--- a/src/apps/parsers/tasks.py
+++ b/src/apps/parsers/tasks.py
@@ -5,6 +5,7 @@ Celery задачи для приложения парсеров.
 Интегрируются с BackgroundJob для отслеживания прогресса.
 """
 
+import hashlib
 import logging
 import shutil
 import time
@@ -14,6 +15,8 @@ from pathlib import Path
 
 from apps.core.services import BackgroundJobService
 from apps.core.tasks import PeriodicTask as CorePeriodicTask
+from apps.parsers.clients.checko import CheckoClient, CompanyRequest
+from apps.parsers.clients.checko.exceptions import CheckoError
 from apps.parsers.clients.common import GenericParserItem, StructuredDataClient
 from apps.parsers.clients.minpromtorg import (
     IndustrialProductionClient,
@@ -38,6 +41,8 @@ from apps.parsers.services import (
 )
 from apps.parsers.source_registry import PARSER_SOURCES
 from celery import shared_task
+from django.conf import settings
+from registers.models import Organization
 from requests.adapters import BaseAdapter
 
 logger = logging.getLogger(__name__)
@@ -45,6 +50,11 @@ logger = logging.getLogger(__name__)
 # Константы для синхронизации проверок
 DEFAULT_START_YEAR = 2025
 DEFAULT_START_MONTH = 1
+STRUCTURED_SOURCE_OPTIONS = {
+    "fstec": {"verify_ssl": False},
+    "fedresurs_bankruptcy": {"timeout": 30},
+}
+FEDRESURS_CHECKO_FALLBACK_LIMIT = 100
 
 
 def _resolve_proxies(proxies: list[str] | None) -> list[str] | None:
@@ -151,7 +161,11 @@ def _fetch_structured_records(
 ) -> list[GenericParserItem]:
     """Загрузить records через structured client из URL или локального storage."""
     descriptor = PARSER_SOURCES[source_key]
-    client = StructuredDataClient(source=source_key, proxies=proxies)
+    client = StructuredDataClient(
+        source=source_key,
+        proxies=proxies,
+        **STRUCTURED_SOURCE_OPTIONS.get(source_key, {}),
+    )
 
     if file_path:
         from django.core.files.storage import default_storage
@@ -164,6 +178,152 @@ def _fetch_structured_records(
     return client.fetch_records(file_url=file_url or descriptor.upstream_url)
 
 
+def _fetch_fedresurs_bankruptcy_records(
+    *,
+    file_url: str | None,
+    file_path: str | None,
+    proxies: list[str] | None,
+) -> list[GenericParserItem]:
+    """Load bankruptcies: official portal first, then fall back to Checko."""
+    try:
+        return _fetch_structured_records(
+            source_key="fedresurs_bankruptcy",
+            file_url=file_url,
+            file_path=file_path,
+            proxies=proxies,
+        )
+    except Exception as exc:
+        if file_url or file_path:
+            raise
+        logger.warning(
+            "Fedresurs official source failed, falling back to Checko: %s",
+            exc,
+        )
+        return _fetch_checko_bankruptcy_records(proxies=proxies)
+
+
+def _fetch_checko_bankruptcy_records(
+    *,
+    proxies: list[str] | None,
+) -> list[GenericParserItem]:
+    """Fetch EFRSB messages for our registry organizations through Checko."""
+    api_key = getattr(settings, "CHECKO_API_KEY", "")
+    if not api_key:
+        logger.warning("CHECKO_API_KEY is empty; Fedresurs fallback skipped")
+        return []
+
+    limit = getattr(
+        settings,
+        "FEDRESURS_CHECKO_FALLBACK_LIMIT",
+        FEDRESURS_CHECKO_FALLBACK_LIMIT,
+    )
+    try:
+        limit = int(limit)
+    except (TypeError, ValueError):
+        limit = FEDRESURS_CHECKO_FALLBACK_LIMIT
+    if limit <= 0:
+        logger.info("Fedresurs Checko fallback is disabled by limit=%s", limit)
+        return []
+    organizations = list(
+        Organization.objects.order_by("mn_inn").values(
+            "mn_inn",
+            "mn_ogrn",
+            "pn_name",
+        )[:limit]
+    )
+    if not organizations:
+        logger.info("No registry organizations found for Fedresurs fallback")
+        return []
+
+    checko_proxies = (
+        proxies if getattr(settings, "CHECKO_USE_RUNTIME_PROXIES", False) else None
+    )
+    client = CheckoClient(api_key=api_key, proxies=checko_proxies, timeout=30)
+    records: list[GenericParserItem] = []
+    for organization in organizations:
+        inn = str(organization["mn_inn"])
+        ogrn = str(organization["mn_ogrn"])
+        name = organization["pn_name"]
+        try:
+            response = client.get_company(CompanyRequest(inn=inn))
+        except CheckoError as exc:
+            logger.info("Checko bankruptcy lookup skipped for inn=%s: %s", inn, exc)
+            continue
+        company = response.data
+        if company is None:
+            continue
+        records.extend(
+            _checko_bankruptcy_items(
+                company=company,
+                fallback_inn=inn,
+                fallback_ogrn=ogrn,
+                fallback_name=name,
+            )
+        )
+    logger.info("Fetched %d bankruptcy records through Checko fallback", len(records))
+    return records
+
+
+def _checko_bankruptcy_items(
+    *,
+    company,
+    fallback_inn: str,
+    fallback_ogrn: str,
+    fallback_name: str,
+) -> list[GenericParserItem]:
+    """Convert Checko bankruptcy messages into generic records."""
+    records: list[GenericParserItem] = []
+    inn = str(getattr(company, "inn", "") or fallback_inn)
+    ogrn = str(getattr(company, "ogrn", "") or fallback_ogrn)
+    name = getattr(company, "short_name", None) or fallback_name
+    for message in getattr(company, "bankruptcy", None) or ():
+        message_type = getattr(message, "type", "") or "Сообщение ЕФРСБ"
+        message_date = getattr(message, "date", "") or ""
+        case_number = getattr(message, "case_number", None) or ""
+        external_id = _fedresurs_external_id(
+            inn=inn,
+            message_type=message_type,
+            message_date=message_date,
+            case_number=case_number,
+        )
+        records.append(
+            GenericParserItem(
+                source="fedresurs_bankruptcy",
+                external_id=external_id,
+                inn=inn,
+                ogrn=ogrn,
+                organisation_name=name,
+                title=message_type,
+                record_date=message_date,
+                status=message_type,
+                payload={
+                    "provider": "checko",
+                    "declared_source": "ЕФРСБ",
+                    "inn": inn,
+                    "ogrn": ogrn,
+                    "organisation_name": name,
+                    "type": message_type,
+                    "date": message_date,
+                    "case_number": case_number,
+                },
+            )
+        )
+    return records
+
+
+def _fedresurs_external_id(
+    *,
+    inn: str,
+    message_type: str,
+    message_date: str,
+    case_number: str,
+) -> str:
+    """Build a stable ID for an EFRSB message from the fallback source."""
+    raw = f"{inn}:{message_type}:{message_date}:{case_number}"
+    digest = hashlib.sha256(raw.encode("utf-8")).hexdigest()[:24]
+    return f"checko-fedresurs:{digest}"
+
+
 @shared_task(bind=True, base=CorePeriodicTask)
 def sync_ru_proxies(self) -> dict[str, int | str]:  # noqa: ARG001
     """Периодически загружать RU-прокси из Proxy-Tools."""
@@ -1665,8 +1825,7 @@ def parse_fedresurs_bankruptcy(
         source=ParserLoadLog.Source.FEDRESURS_BANKRUPTCY,
         task_name="apps.parsers.tasks.parse_fedresurs_bankruptcy",
         requested_by_id=requested_by_id,
-        fetch_records=lambda: _fetch_structured_records(
-            source_key="fedresurs_bankruptcy",
+        fetch_records=lambda: _fetch_fedresurs_bankruptcy_records(
             file_url=file_url,
             file_path=file_path,
             proxies=proxies,
diff --git a/tests/apps/parsers/test_tasks.py b/tests/apps/parsers/test_tasks.py
index 538236c..97e9033 100644
--- a/tests/apps/parsers/test_tasks.py
+++ b/tests/apps/parsers/test_tasks.py
@@ -57,6 +57,7 @@ from apps.parsers.tasks import (
 )
 from django.test import TestCase, override_settings
 from openpyxl import Workbook
+from registers.models import Organization
 
 from tests.apps.parsers.factories import (
     InspectionRecordFactory,
@@ -155,6 +156,92 @@ class SyncRuProxiesTaskTestCase(TestCase):
         sync_mock.assert_called_once_with()
 
 
+class GenericSourceFetchTestCase(TestCase):
+    """Tests for source-specific generic fetch configuration."""
+
+    def test_fstec_disables_ssl_verification_for_broken_certificate_chain(self):
+        class _RecordingStructuredClient:
+            instances = []
+
+            def __init__(self, **kwargs):
+                self.kwargs = kwargs
+                self.instances.append(self)
+
+            def fetch_records(self, **_kwargs):
+                return []
+
+        with patch.object(
+            parser_tasks,
+            "StructuredDataClient",
+            _RecordingStructuredClient,
+        ):
+            records = parser_tasks._fetch_structured_records(
+                source_key="fstec",
+                file_url="https://reestr.fstec.ru/reg3",
+                file_path=None,
+                proxies=[],
+            )
+
+        self.assertEqual(records, [])
+        self.assertEqual(len(_RecordingStructuredClient.instances), 1)
+        self.assertEqual(
+            _RecordingStructuredClient.instances[0].kwargs["source"],
+            "fstec",
+        )
+        self.assertFalse(_RecordingStructuredClient.instances[0].kwargs["verify_ssl"])
+
+    @override_settings(CHECKO_API_KEY="test-key", FEDRESURS_CHECKO_FALLBACK_LIMIT=10)
+    def test_fedresurs_falls_back_to_checko_for_registry_organizations(self):
+        organization = Organization.objects.create(
+            pn_name='ООО "Тест"',
+            mn_ogrn=1027700000000,
+            mn_inn=7701000001,
+            in_kpp=770101001,
+            mn_okpo="12345678",
+        )
+
+        class _CheckoClient:
+            def __init__(self, **_kwargs):
+                return
+
+            def get_company(self, _request):
+                return SimpleNamespace(
+                    data=SimpleNamespace(
+                        ogrn=str(organization.mn_ogrn),
+                        inn=str(organization.mn_inn),
+                        short_name=organization.pn_name,
+                        bankruptcy=(
+                            SimpleNamespace(
+                                type="Сообщение о введении наблюдения",
+                                date="2026-04-01",
+                                case_number="А40-1/2026",
+                            ),
+                        ),
+                    )
+                )
+
+        with (
+            patch.object(
+                parser_tasks,
+                "_fetch_structured_records",
+                side_effect=RuntimeError("HTTP 401"),
+            ),
+            patch.object(parser_tasks, "CheckoClient", _CheckoClient),
+        ):
+            records = parser_tasks._fetch_fedresurs_bankruptcy_records(
+                file_url=None,
+                file_path=None,
+                proxies=[],
+            )
+
+        self.assertEqual(len(records), 1)
+        self.assertEqual(records[0].source, "fedresurs_bankruptcy")
+        self.assertEqual(records[0].inn, str(organization.mn_inn))
+        self.assertEqual(records[0].ogrn, str(organization.mn_ogrn))
+        self.assertEqual(records[0].record_date, "2026-04-01")
+        self.assertEqual(records[0].payload["provider"], "checko")
+
+
 @override_settings(
     CELERY_TASK_ALWAYS_EAGER=True,
     CELERY_TASK_EAGER_PROPAGATES=True,