Files
mostovik-backend/src/apps/parsers/clients/proverki/client.py
Aleksandr Meshchriakov 199d871923
Some checks failed
CI/CD Pipeline / Code Quality Checks (push) Failing after 1m28s
CI/CD Pipeline / Build Docker Images (push) Has been cancelled
CI/CD Pipeline / Push to Gitea Registry (push) Has been cancelled
CI/CD Pipeline / Run Tests (push) Has been cancelled
feat(parsers): add proverki.gov.ru parser with sync_inspections task
- Add InspectionRecord model with is_federal_law_248, data_year, data_month fields
- Add ProverkiClient with Playwright support for JS-rendered portal
- Add streaming XML parser for large files (>50MB)
- Add sync_inspections task with incremental loading logic
  - Starts from 01.01.2025 if DB is empty
  - Loads both FZ-294 and FZ-248 inspections
  - Stops after 2 consecutive empty months
- Add InspectionService methods: get_last_loaded_period, has_data_for_period
- Add Minpromtorg parsers (certificates, manufacturers)
- Add Django Admin for parser models
- Update README with parsers documentation and changelog
2026-01-21 20:16:25 +01:00

1103 lines
44 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Клиент для парсинга данных с proverki.gov.ru.
Источник: ФГИС "Единый реестр проверок" (Генпрокуратура РФ).
Поддерживает несколько стратегий получения данных:
1. Прямой доступ к Open Data файлам (XML)
2. API запросы (если доступны)
3. Playwright headless browser для динамического контента (fallback)
"""
import io
import logging
import tempfile
import zipfile
from collections.abc import Callable
from dataclasses import dataclass, field
from xml.etree import ( # noqa: S314 - XML parsing with proper error handling
ElementTree as ET,
)
from apps.parsers.clients.base import BaseHTTPClient, HTTPClientError
from apps.parsers.clients.proverki.schemas import Inspection, InspectionPlan
logger = logging.getLogger(__name__)
# Конфигурация по умолчанию
DEFAULT_HOST = "proverki.gov.ru"
DEFAULT_OPEN_DATA_PATH = "/portal/public-open-data"
# Паттерны URL для открытых данных
# Формат: https://proverki.gov.ru/opendata/{dataset_id}/data-{date}.zip
OPEN_DATA_BASE_URL = "https://proverki.gov.ru/opendata"
# URL портала открытых данных (для Playwright)
OPEN_DATA_PORTAL_URL = "https://proverki.gov.ru/portal/public-open-data"
class ProverkiClientError(HTTPClientError):
"""Ошибка клиента proverki.gov.ru."""
pass
@dataclass
class ProverkiClient:
"""
Клиент для получения данных о проверках с proverki.gov.ru.
Полностью изолирован от Django. Все настройки передаются через конструктор.
Стратегия работы:
1. Пытается получить данные через прямые HTTP запросы к Open Data
2. Если сайт требует JavaScript - использует Playwright
3. Скачивает ZIP/XML архивы с данными
4. Парсит XML и возвращает структурированные данные
Использование:
client = ProverkiClient()
inspections = client.fetch_inspections(year=2025)
for inspection in inspections:
print(inspection.registration_number, inspection.inn)
"""
proxies: list[str] | None = None
host: str = DEFAULT_HOST
timeout: int = 120
temp_dir: str | None = None
use_playwright: bool = True # Использовать Playwright как fallback
_http_client: BaseHTTPClient | None = field(default=None, repr=False)
_playwright: object | None = field(default=None, repr=False)
_browser: object | None = field(default=None, repr=False)
def __post_init__(self) -> None:
"""Инициализация клиента."""
self._http_client = None
self._playwright = None
self._browser = None
self._temp_dir = self.temp_dir or tempfile.gettempdir()
@property
def http_client(self) -> BaseHTTPClient:
"""Ленивая инициализация HTTP клиента."""
if self._http_client is None:
self._http_client = BaseHTTPClient(
base_url=f"https://{self.host}",
proxies=self.proxies,
timeout=self.timeout,
headers={
"User-Agent": (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/120.0.0.0 Safari/537.36"
),
"Accept": "application/json, application/xml, text/html, */*",
"Accept-Language": "ru-RU,ru;q=0.9,en-US;q=0.8,en;q=0.7",
},
)
return self._http_client
def fetch_inspections(
self,
*,
year: int | None = None,
month: int | None = None,
file_url: str | None = None,
is_federal_law_248: bool = False,
progress_callback: Callable[[int, str], None] | None = None,
) -> list[Inspection]:
"""
Получить список проверок.
Args:
year: Год плана проверок (если не указан file_url)
month: Месяц (опционально)
file_url: Прямая ссылка на файл данных
is_federal_law_248: Запрос проверок по ФЗ-248 (новые проверки с 2021)
progress_callback: Callback для отчёта о прогрессе (percent, message)
Returns:
Список проверок
Raises:
ProverkiClientError: При ошибке получения данных
"""
fz_type = "ФЗ-248" if is_federal_law_248 else "ФЗ-294"
logger.info(
"Fetching inspections (year=%s, month=%s, %s)", year, month, fz_type
)
if progress_callback:
progress_callback(0, "Инициализация...")
try:
# Если передан прямой URL - скачиваем его
if file_url:
return self._download_and_parse(file_url, progress_callback)
# Иначе пытаемся найти файлы для указанного периода
plans = self._discover_data_files(
year=year, month=month, is_federal_law_248=is_federal_law_248
)
if not plans:
logger.warning("No data files found for year=%s, month=%s", year, month)
return []
if progress_callback:
progress_callback(10, f"Найдено {len(plans)} файлов данных")
# Скачиваем и парсим каждый файл
all_inspections = []
for i, plan in enumerate(plans):
if progress_callback:
progress = 10 + (i * 80) // len(plans)
progress_callback(progress, f"Загрузка {plan.file_name}...")
inspections = self._download_and_parse(
plan.file_url, None, file_format=plan.file_format
)
all_inspections.extend(inspections)
logger.info(
"Parsed %d inspections from %s", len(inspections), plan.file_name
)
if progress_callback:
progress_callback(95, f"Загружено {len(all_inspections)} проверок")
logger.info("Total fetched %d inspections", len(all_inspections))
return all_inspections
except HTTPClientError:
raise
except Exception as e:
logger.error("Error fetching inspections: %s", e)
raise ProverkiClientError(f"Failed to fetch inspections: {e}") from e
def _discover_data_files(
self,
*,
year: int | None = None,
month: int | None = None,
is_federal_law_248: bool = False,
) -> list[InspectionPlan]:
"""
Найти доступные файлы данных для указанного периода.
URL структура портала proverki.gov.ru:
- Проверки по месяцам: /portal/public-open-data/check/{year}/{month}
- Планы проверок: /portal/public-open-data/check/{year}/plans
- Параметр isFederalLaw248=true для проверок по ФЗ-248
"""
plans = []
if not year:
return plans
base_url = "https://proverki.gov.ru/portal/public-open-data/check"
fz_param = "true" if is_federal_law_248 else "false"
fz_suffix = "fz248" if is_federal_law_248 else "fz294"
# Если указан конкретный месяц - ищем проверки за этот месяц
if month:
portal_url = f"{base_url}/{year}/{month}?isFederalLaw248={fz_param}"
plans.append(
InspectionPlan(
year=year,
month=month,
file_url=portal_url,
file_name=f"inspection-{month}-{year}-{fz_suffix}.zip",
file_format="portal",
)
)
else:
# Без месяца - ищем план проверок на год
portal_url = f"{base_url}/{year}/plans?isFederalLaw248={fz_param}"
plans.append(
InspectionPlan(
year=year,
month=None,
file_url=portal_url,
file_name=f"plan-{year}-{fz_suffix}.zip",
file_format="portal",
)
)
logger.info(
"Discovered %d data files for year=%s, month=%s, fz248=%s",
len(plans),
year,
month,
is_federal_law_248,
)
return plans
def _download_and_parse( # noqa: C901
self,
file_url: str,
progress_callback: Callable[[int, str], None] | None = None,
file_format: str = "auto",
) -> list[Inspection]:
"""Скачать файл и распарсить его содержимое."""
logger.info("Downloading: %s (format=%s)", file_url, file_format)
# Если это портал - сразу используем Playwright
if file_format == "portal" or "/portal/" in file_url:
if not self.use_playwright:
raise ProverkiClientError(
"Портал proverki.gov.ru требует JavaScript. "
"Включите use_playwright=True.",
url=file_url,
)
if progress_callback:
progress_callback(20, "Навигация по порталу...")
content = self._download_from_portal(file_url, progress_callback)
self._close_playwright()
else:
if progress_callback:
progress_callback(20, f"Скачивание {file_url}...")
content = self.http_client.download_file(file_url)
logger.info("Downloaded %d bytes", len(content))
# Проверка на HTML ответ (сайт требует JavaScript)
if content[:15].lower().startswith((b"<!doctype html", b"<html")):
if self.use_playwright:
logger.info("Server returned HTML, switching to Playwright...")
if progress_callback:
progress_callback(
22, "Сервер требует JavaScript, запускаем браузер..."
)
content = self._download_with_playwright(
file_url, progress_callback
)
self._close_playwright()
else:
raise ProverkiClientError(
"Сервер вернул HTML вместо данных. "
"API proverki.gov.ru требует JavaScript. "
"Включите use_playwright=True или получите данные вручную.",
url=file_url,
)
# Определяем тип файла по СОДЕРЖИМОМУ (приоритет), затем по расширению
is_zip = content[:4] == b"PK\x03\x04"
is_xml = content[:5] == b"<?xml" or (
content[:100].strip().startswith(b"<")
and b"<html" not in content[:100].lower()
)
if is_zip:
return self._parse_zip_archive(content, progress_callback)
elif is_xml or file_url.endswith(".xml"):
return self._parse_xml_content(content, progress_callback)
else:
# Пытаемся как XML
try:
return self._parse_xml_content(content, progress_callback)
except Exception as e:
raise ProverkiClientError(
f"Unknown file format for {file_url}",
url=file_url,
) from e
def _download_from_portal(
self,
portal_url: str,
progress_callback: Callable[[int, str], None] | None = None,
) -> bytes:
"""
Скачать данные с портала proverki.gov.ru через Playwright.
Навигация:
1. Открыть страницу датасета
2. Дождаться загрузки Angular/SPA контента
3. Кликнуть на вкладку "Скачать" (download tab)
4. Найти и кликнуть на "Набор данных" (ZIP)
5. Скачать файл
Args:
portal_url: URL страницы датасета на портале
progress_callback: Игнорируется (для совместимости)
Returns:
Содержимое ZIP файла в байтах
"""
logger.info("Downloading from portal: %s", portal_url)
browser = self._get_browser()
context = browser.new_context(
user_agent=(
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/120.0.0.0 Safari/537.36"
),
accept_downloads=True,
)
page = context.new_page()
try:
# Переходим на страницу датасета
logger.info("Navigating to dataset page: %s", portal_url)
page.goto(portal_url, wait_until="networkidle", timeout=60000)
# Ждём загрузки SPA контента (Angular) - ищем признаки загруженной страницы
# На proverki.gov.ru используется Angular, контент загружается динамически
try:
# Ждём появления контента страницы (заголовок датасета или вкладки)
page.wait_for_selector(
"text=Паспорт, text=Скачать, h1, h2, .card, .dataset",
timeout=15000,
)
except Exception:
logger.warning("Timeout waiting for SPA content, continuing anyway...")
page.wait_for_timeout(3000) # Дополнительное ожидание для Angular
# Debug: скриншот для анализа
page_content = page.content()
logger.info(
"Page loaded, content length: %d, title: %s",
len(page_content),
page.title(),
)
# ШАГ 1: Кликаем на вкладку "Скачать" для доступа к ссылкам на файлы
# Портал имеет две вкладки: "Паспорт" и "Скачать"
download_tab = page.query_selector(
"a:has-text('Скачать'):not([href*='.zip']), "
"button:has-text('Скачать'), "
"[role='tab']:has-text('Скачать'), "
".tab:has-text('Скачать'), "
".nav-link:has-text('Скачать'), "
"li:has-text('Скачать') a, "
"span:has-text('Скачать')"
)
if download_tab:
logger.info("Found 'Скачать' tab, clicking...")
download_tab.click()
page.wait_for_timeout(2000) # Ждём загрузки контента вкладки
else:
logger.warning("'Скачать' tab not found, trying to find links directly")
# ШАГ 2: Ищем ссылку "Набор данных" (основная кнопка загрузки ZIP)
# URL формат: https://proverki.gov.ru/blob/opendata/{year}/{month}/data-*.zip
zip_link = page.query_selector(
"a:has-text('Набор данных'), "
"a[href*='/blob/opendata/'][href$='.zip'], "
"a[href*='data-'][href$='.zip']"
)
if not zip_link:
# Попробуем найти в таблице истории изменений
# На скриншоте видны файлы data-YYYYMMDD-structure-*.zip
zip_link = page.query_selector(
"table a[href$='.zip'], " "a[href*='structure'][href$='.zip']"
)
if not zip_link:
# Последняя попытка - любая ссылка на .zip
zip_link = page.query_selector("a[href$='.zip']")
if zip_link:
href = zip_link.get_attribute("href")
logger.info("Found ZIP download link: %s", href)
with page.expect_download(timeout=120000) as download_info:
zip_link.click()
download = download_info.value
download_path = download.path()
if download_path:
with open(download_path, "rb") as f:
content = f.read()
logger.info("Downloaded %d bytes from portal", len(content))
return content
# Если не нашли ZIP - пробуем XML (паспорт набора данных)
xml_link = page.query_selector(
"a:has-text('Паспорт набора данных'), " "a[href$='.xml']"
)
if xml_link:
logger.info("Found XML link, clicking...")
with page.expect_download(timeout=60000) as download_info:
xml_link.click()
download = download_info.value
download_path = download.path()
if download_path:
with open(download_path, "rb") as f:
content = f.read()
logger.info("Downloaded %d bytes (XML) from portal", len(content))
return content
# Debug: выводим что есть на странице
all_links = page.query_selector_all("a[href]")
logger.warning(
"No download links found. Page has %d links. Sample hrefs: %s",
len(all_links),
[link.get_attribute("href") for link in all_links[:10]],
)
# Проверяем, есть ли на странице сообщение об ошибке или отсутствии данных
if (
"не найден" in page_content.lower()
or "not found" in page_content.lower()
):
raise ProverkiClientError(
"Данные за указанный период не найдены на портале",
url=portal_url,
)
raise ProverkiClientError(
"Не удалось найти ссылку на скачивание данных на портале. "
"Убедитесь, что выбран корректный год/месяц.",
url=portal_url,
)
finally:
context.close()
def _parse_zip_archive(
self,
content: bytes,
progress_callback: Callable[[int, str], None] | None = None,
) -> list[Inspection]:
"""Распаковать ZIP архив и распарсить XML файлы внутри."""
inspections = []
with zipfile.ZipFile(io.BytesIO(content)) as zf:
xml_files = [
name for name in zf.namelist() if name.lower().endswith(".xml")
]
if not xml_files:
logger.warning("No XML files found in ZIP archive")
return []
logger.info("Found %d XML files in archive", len(xml_files))
for i, xml_name in enumerate(xml_files):
if progress_callback:
progress = 30 + (i * 60) // len(xml_files)
progress_callback(progress, f"Парсинг {xml_name}...")
xml_content = zf.read(xml_name)
file_inspections = self._parse_xml_content(xml_content, None)
inspections.extend(file_inspections)
return inspections
def _parse_xml_content( # noqa: C901
self,
content: bytes,
progress_callback: Callable[[int, str], None] | None = None,
) -> list[Inspection]:
"""
Распарсить XML содержимое файла проверок.
Поддерживает различные XML форматы proverki.gov.ru, включая с namespaces.
Использует потоковый парсинг для больших файлов (>50 МБ).
"""
inspections = []
# Для больших файлов используем iterparse (потоковый парсинг)
if len(content) > 50 * 1024 * 1024: # > 50 MB
logger.info(
"Large file detected (%d MB), using streaming parser",
len(content) // (1024 * 1024),
)
return self._parse_xml_streaming(content, progress_callback)
try:
# Пробуем разные кодировки
for encoding in ["utf-8", "windows-1251", "cp1251"]:
try:
xml_str = content.decode(encoding)
break
except UnicodeDecodeError:
continue
else:
xml_str = content.decode("utf-8", errors="replace")
# Очистка невалидных XML символов (часто встречается в госданных)
xml_str = self._sanitize_xml(xml_str)
root = ET.fromstring(xml_str) # noqa: S314
# Определяем namespace из root tag
# Формат: {namespace}tagname
ns = {}
root_tag = root.tag
if root_tag.startswith("{"):
ns_uri = root_tag[1 : root_tag.index("}")]
ns["ns"] = ns_uri
logger.info("Detected XML namespace: %s", ns_uri)
# Ищем записи о проверках по разным возможным тегам
# Сначала с namespace, затем без
inspection_tags = [
".//ns:INSPECTION" if ns else None,
".//ns:inspection" if ns else None,
".//ns:check" if ns else None,
".//ns:КНМ" if ns else None,
".//inspection",
".//check",
".//proverka",
".//КНМ", # Контрольно-надзорное мероприятие
".//INSPECTION",
".//record",
".//item",
]
records = []
for tag in inspection_tags:
if tag is None:
continue
try:
if ns and tag.startswith(".//ns:"):
found = root.findall(tag, ns)
else:
found = root.findall(tag)
if found:
records = found
logger.info("Found %d records with tag %s", len(found), tag)
break
except Exception as e:
logger.debug("Tag %s search failed: %s", tag, e)
continue
# Если не нашли по тегам, берём все дочерние элементы корня
if not records:
records = list(root)
logger.debug("Using %d root children as records", len(records))
# Debug: показываем структуру XML
logger.info(
"XML structure: root=%s, children=%d, first_child=%s",
root.tag,
len(list(root)),
list(root)[0].tag if list(root) else "none",
)
# Если первый child - тоже контейнер, используем его детей
if records and len(records) == 1:
first_child = records[0]
child_tag = (
first_child.tag.split("}")[-1].upper()
if "}" in first_child.tag
else first_child.tag.upper()
)
if child_tag in ("INSPECTION", "CHECK", "КНМ", "RECORD"):
# Это и есть запись, используем её
pass
else:
# Это контейнер, берём его детей
records = list(first_child)
logger.info(
"Using %d children of first element as records", len(records)
)
for record in records:
inspection = self._parse_xml_record(record)
if inspection:
inspections.append(inspection)
except ET.ParseError as e:
logger.error("XML parse error: %s", e)
raise ProverkiClientError(f"Failed to parse XML: {e}") from e
return inspections
def _parse_xml_streaming(
self,
content: bytes,
progress_callback: Callable[[int, str], None] | None = None,
) -> list[Inspection]:
"""
Потоковый парсинг большого XML файла.
Использует iterparse для обработки файла по элементам,
не загружая весь файл в память.
"""
inspections = []
# Декодируем и создаём поток
for encoding in ["utf-8", "windows-1251", "cp1251"]:
try:
xml_str = content.decode(encoding)
break
except UnicodeDecodeError:
continue
else:
xml_str = content.decode("utf-8", errors="replace")
xml_str = self._sanitize_xml(xml_str)
# Используем iterparse для потоковой обработки
import io
xml_stream = io.StringIO(xml_str)
# Определяем теги, которые нас интересуют
target_tags = {"INSPECTION", "inspection", "check", "КНМ"}
count = 0
try:
for _event, elem in ET.iterparse(xml_stream, events=["end"]): # noqa: S314
# Извлекаем имя тега без namespace
tag_name = elem.tag.split("}")[-1] if "}" in elem.tag else elem.tag
if tag_name in target_tags:
inspection = self._parse_xml_record(elem)
if inspection:
inspections.append(inspection)
count += 1
if count % 10000 == 0:
logger.info("Streaming parsed %d inspections...", count)
# Очищаем элемент для освобождения памяти
elem.clear()
except ET.ParseError as e:
logger.error("XML streaming parse error at %d records: %s", count, e)
if inspections:
logger.info(
"Returning %d successfully parsed records", len(inspections)
)
else:
raise ProverkiClientError(f"Failed to parse XML: {e}") from e
logger.info("Streaming parsing complete: %d inspections", len(inspections))
return inspections
def _sanitize_xml(self, xml_str: str) -> str:
"""
Очистить XML строку от невалидных символов.
Госданные часто содержат управляющие символы, которые не допускаются в XML:
- Символы 0x00-0x08, 0x0B, 0x0C, 0x0E-0x1F (кроме tab, newline, cr)
- Некорректные символы в значениях атрибутов
Args:
xml_str: Исходная XML строка
Returns:
Очищенная XML строка
"""
import re
# Удаляем недопустимые XML символы (control characters кроме tab, newline, cr)
# XML 1.0 допускает: #x9 | #xA | #xD | [#x20-#xD7FF] | [#xE000-#xFFFD]
illegal_xml_chars_re = re.compile(
r"[\x00-\x08\x0b\x0c\x0e-\x1f\x7f-\x84\x86-\x9f]"
)
xml_str = illegal_xml_chars_re.sub("", xml_str)
# Заменяем неэкранированные амперсанды (частая ошибка в госданных)
# Но не трогаем уже экранированные сущности
xml_str = re.sub(
r"&(?!(?:amp|lt|gt|apos|quot|#\d+|#x[0-9a-fA-F]+);)", "&amp;", xml_str
)
return xml_str
def _parse_xml_record(self, element: ET.Element) -> Inspection | None: # noqa: C901
"""
Преобразовать XML элемент в объект Inspection.
Адаптируется к различным форматам XML proverki.gov.ru:
- Данные могут быть в атрибутах элементов
- Данные могут быть во вложенных элементах (I_SUBJECT, I_AUTHORITY)
"""
try:
# Определяем namespace элемента
ns_uri = None
if element.tag.startswith("{"):
ns_uri = element.tag[1 : element.tag.index("}")]
def find_child(tag_name: str) -> ET.Element | None:
"""Найти дочерний элемент с учётом namespace."""
if ns_uri:
child = element.find(f"{{{ns_uri}}}{tag_name}")
if child is not None:
return child
return element.find(tag_name)
# Получаем вложенные элементы, где могут быть данные
i_subject = find_child("I_SUBJECT") # Данные об организации
i_authority = find_child("I_AUTHORITY") # Контролирующий орган
i_approve = find_child("I_APPROVE") # Информация об утверждении
i_classification = find_child("I_CLASSIFICATION") # Классификация
def get_attr_value(attr_names: list[str]) -> str: # noqa: C901
"""Найти значение атрибута в элементе или вложенных элементах."""
# Сначала ищем в атрибутах самого элемента INSPECTION
for name in attr_names:
if name in element.attrib:
return element.attrib[name].strip()
# Ищем в I_SUBJECT (ORG_NAME, INN, OGRN)
if i_subject is not None:
for name in attr_names:
if name in i_subject.attrib:
return i_subject.attrib[name].strip()
# Ищем в I_AUTHORITY (FRGU_ORG_NAME)
if i_authority is not None:
for name in attr_names:
if name in i_authority.attrib:
return i_authority.attrib[name].strip()
# Ищем в I_CLASSIFICATION
if i_classification is not None:
for name in attr_names:
if name in i_classification.attrib:
return i_classification.attrib[name].strip()
# Ищем в I_APPROVE
if i_approve is not None:
for name in attr_names:
if name in i_approve.attrib:
return i_approve.attrib[name].strip()
# Fallback: ищем в дочерних элементах (текст)
for name in attr_names:
if ns_uri:
child = element.find(f"{{{ns_uri}}}{name}")
else:
child = element.find(name)
if child is not None and child.text:
return child.text.strip()
return ""
# Маппинг атрибутов на поля Inspection
# Используем названия из реального XML proverki.gov.ru
registration_number = get_attr_value(
[
"ERPID",
"I_NUMBER",
"FRGU_NUM",
"registration_number",
"regnum",
"id",
"number",
]
)
inn = get_attr_value(["INN", "inn", "ORG_INN", "I_INN"])
ogrn = get_attr_value(["OGRN", "ogrn", "ORG_OGRN", "I_OGRN"])
organisation_name = get_attr_value(
[
"ORG_NAME",
"FULL_NAME",
"SHORT_NAME",
"I_NAME",
"organisation_name",
"org_name",
"name",
]
)
control_authority = get_attr_value(
[
"FRGU_ORG_NAME",
"PROSEC_NAME",
"KNO_NAME",
"ORGAN_NAME",
"control_authority",
"authority",
]
)
inspection_type = get_attr_value(
[
"ITYPE_NAME",
"TYPE_NAME",
"I_TYPE",
"inspection_type",
"type",
]
)
inspection_form = get_attr_value(
[
"ICARRYOUT_TYPE_NAME",
"FORM_NAME",
"I_FORM",
"inspection_form",
"form",
]
)
start_date = get_attr_value(
[
"START_DATE",
"I_DATE_START",
"DATE_START",
"start_date",
"date_start",
"date",
]
)
end_date = get_attr_value(
[
"END_DATE",
"I_DATE_END",
"DATE_END",
"end_date",
"date_end",
]
)
status = get_attr_value(
[
"STATUS",
"I_STATUS",
"status",
"state",
]
)
legal_basis = get_attr_value(
[
"FZ_NAME",
"IREASON_NAME",
"I_REASON",
"REASON",
"legal_basis",
"basis",
"law",
]
)
result = get_attr_value(
[
"RESULT",
"I_RESULT",
"result",
"outcome",
]
)
inspection = Inspection(
registration_number=registration_number,
inn=inn,
ogrn=ogrn,
organisation_name=organisation_name,
control_authority=control_authority,
inspection_type=inspection_type,
inspection_form=inspection_form,
start_date=start_date,
end_date=end_date,
status=status,
legal_basis=legal_basis,
result=result,
)
# Проверяем что хотя бы базовые поля заполнены
if not inspection.inn and not inspection.registration_number:
# Debug: выводим структуру элемента
logger.debug(
"Empty inspection from element %s, attribs: %s",
element.tag,
list(element.attrib.keys())[:5],
)
return None
return inspection
except Exception as e:
logger.warning("Failed to parse XML record: %s", e)
return None
def fetch_inspection_plans(self, year: int) -> list[InspectionPlan]:
"""
Получить список доступных планов проверок за год.
Args:
year: Год
Returns:
Список InspectionPlan с метаданными о файлах
"""
return self._discover_data_files(year=year)
def _get_browser(self):
"""Ленивая инициализация Playwright browser."""
if self._browser is None:
try:
from playwright.sync_api import sync_playwright
self._playwright = sync_playwright().start()
self._browser = self._playwright.chromium.launch(
headless=True,
args=[
"--no-sandbox",
"--disable-setuid-sandbox",
"--disable-dev-shm-usage",
"--disable-gpu",
],
)
logger.info("Playwright browser initialized")
except ImportError as e:
raise ProverkiClientError(
"Playwright не установлен. Установите: uv add playwright && "
"playwright install chromium"
) from e
except Exception as e:
raise ProverkiClientError(
f"Не удалось запустить Playwright browser: {e}"
) from e
return self._browser
def _download_with_playwright(
self,
url: str,
progress_callback: Callable[[int, str], None] | None = None,
) -> bytes:
"""
Скачать данные через Playwright (для сайтов с JavaScript).
Примечание: progress_callback не используется внутри этого метода,
так как Playwright работает в асинхронном контексте, который конфликтует
с Django ORM.
Args:
url: URL для загрузки
progress_callback: Игнорируется (для совместимости интерфейса)
Returns:
Содержимое файла в байтах
"""
logger.info("Using Playwright to fetch: %s", url)
browser = self._get_browser()
context = browser.new_context(
user_agent=(
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/120.0.0.0 Safari/537.36"
),
accept_downloads=True,
)
page = context.new_page()
try:
# Сначала пробуем прямой переход на URL (с рендерингом JS)
logger.info("Trying direct URL with JS rendering: %s", url)
response = page.goto(url, wait_until="networkidle", timeout=60000)
# Проверяем, получили ли мы данные напрямую
content_type = response.headers.get("content-type", "") if response else ""
if (
"xml" in content_type
or "zip" in content_type
or "octet-stream" in content_type
):
# Получили файл напрямую
body = page.content()
if body and not body.strip().startswith("<!DOCTYPE"):
logger.info("Got direct file response")
return body.encode("utf-8")
# Если вернулась HTML страница, пробуем найти ссылку на скачивание
page_content = page.content()
# Проверяем, есть ли в странице сам XML
if "<?xml" in page_content:
# XML встроен в страницу
logger.info("Found XML embedded in page")
return page_content.encode("utf-8")
# Ищем ссылку на скачивание на текущей странице
download_links = page.query_selector_all(
"a[href*='.xml'], a[href*='.zip'], "
"a[href*='download'], button:has-text('Скачать')"
)
if download_links:
logger.info("Found %d download links on page", len(download_links))
with page.expect_download(timeout=60000) as download_info:
download_links[0].click()
download = download_info.value
download_path = download.path()
if download_path:
with open(download_path, "rb") as f:
content = f.read()
logger.info("Downloaded %d bytes via Playwright", len(content))
logger.debug("Content preview: %s", content[:200])
return content
# Последняя попытка - идём на портал открытых данных
logger.info("Navigating to portal as last resort: %s", OPEN_DATA_PORTAL_URL)
page.goto(OPEN_DATA_PORTAL_URL, wait_until="networkidle", timeout=60000)
page.wait_for_timeout(3000)
# Ищем ссылку на нужный dataset (план проверок)
download_links = page.query_selector_all(
"a[href*='plan'], a[href*='proverki'], a[href*='.xml'], a[href*='.zip']"
)
if not download_links:
download_links = page.query_selector_all("a[href*='download']")
logger.info("Found %d download links on portal", len(download_links))
if download_links:
with page.expect_download(timeout=60000) as download_info:
download_links[0].click()
download = download_info.value
download_path = download.path()
if download_path:
with open(download_path, "rb") as f:
content = f.read()
logger.info("Downloaded %d bytes via Playwright", len(content))
logger.debug("Content preview: %s", content[:200])
return content
raise ProverkiClientError(
"Не удалось найти ссылки на скачивание данных",
url=url,
)
finally:
context.close()
def _close_playwright(self) -> None:
"""Закрыть Playwright и освободить event loop."""
if self._browser is not None:
try:
self._browser.close()
except Exception as e:
logger.warning("Error closing browser: %s", e)
self._browser = None
if self._playwright is not None:
try:
self._playwright.stop()
except Exception as e:
logger.warning("Error stopping Playwright: %s", e)
self._playwright = None
def close(self) -> None:
"""Закрыть клиент и освободить ресурсы."""
if self._http_client is not None:
self._http_client.close()
self._http_client = None
self._close_playwright()
def __enter__(self) -> "ProverkiClient":
"""Поддержка context manager."""
return self
def __exit__(self, exc_type, exc_val, exc_tb) -> None:
"""Закрытие при выходе из context manager."""
self.close()