Files
mostovik-backend/tests/apps/parsers/test_proverki_client.py
Aleksandr Meshchriakov 0f17ff6773
All checks were successful
CI/CD Pipeline / Quality Gate (push) Successful in 26s
CI/CD Pipeline / Build and Push Images (push) Successful in 6s
CI/CD Pipeline / Internal Notify (push) Successful in 0s
CI/CD Pipeline / Deploy Dev in Dokploy (push) Successful in 1s
Add organizations v2 API and registry enrichment
2026-05-06 19:04:46 +02:00

1174 lines
40 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Unit tests for ProverkiClient using local HTTP server (no mocks)."""
from __future__ import annotations
import asyncio
import os
import sys
import tempfile
import types
from asyncio import events as asyncio_events
from pathlib import Path
from xml.etree import ElementPath as element_path
from xml.etree import ElementTree as ET
from apps.parsers.clients.base import HTTPClientError
from apps.parsers.clients.proverki import ProverkiClient
from apps.parsers.clients.proverki.client import (
OPEN_DATA_PORTAL_URL,
ProverkiClientError,
)
from django.test import SimpleTestCase
from tests.utils import TestHTTPServer
from tests.utils.fixtures import build_zip, fake
# Cyrillic identifiers spelled with escape sequences.
# "\u041a\u041d\u041c" decodes to "КНМ"; used in this module as an XML tag name.
_CYRILLIC_KNM = "\u041a\u041d\u041c"
# "\u0418\u041d\u041d" decodes to "ИНН"; used in this module as an XML attribute name.
_CYRILLIC_INN = "\u0418\u041d\u041d"
# "\u041e\u0413\u0420\u041d" decodes to "ОГРН"; used in this module as an XML attribute name.
_CYRILLIC_OGRN = "\u041e\u0413\u0420\u041d"
def _digits(length: int) -> str:
    """Return a string of ``length`` random decimal digits drawn from ``fake``."""
    drawn = [fake.random_int(0, 9) for _ in range(length)]
    return "".join(map(str, drawn))
def _attrs_string(attrs: dict[str, str]) -> str:
return " ".join(f'{key}="{value}"' for key, value in attrs.items())
def _inspection_attrs() -> dict[str, str]:
    """Build a complete set of randomly generated inspection attributes.

    Keys are the attribute names used by the XML fixtures in this module;
    values come from ``_digits`` and the shared ``fake`` generator.
    """
    return dict(
        ERPID=_digits(12),
        INN=_digits(10),
        OGRN=_digits(13),
        ORG_NAME=fake.company(),
        FRGU_ORG_NAME=fake.company(),
        ITYPE_NAME=fake.word(),
        ICARRYOUT_TYPE_NAME=fake.word(),
        START_DATE=str(fake.date()),
        END_DATE=str(fake.date()),
        STATUS=fake.word(),
        FZ_NAME=fake.sentence(nb_words=3),
        RESULT=fake.sentence(nb_words=2),
    )
def _xml_with_tag(tag: str, attrs: dict[str, str]) -> bytes:
    """Return a UTF-8 XML document with one self-closing ``tag`` inside ``ROOT``."""
    rendered = _attrs_string(attrs)
    element = f"<{tag} {rendered} />"
    document = f"<?xml version='1.0' encoding='utf-8'?><ROOT>{element}</ROOT>"
    return document.encode("utf-8")
def _xml_with_namespace(tag: str, attrs: dict[str, str]) -> bytes:
    """Return a UTF-8 XML document whose root and ``tag`` use the ``ns`` prefix."""
    namespace_uri = "http://example.com/ns"
    rendered = _attrs_string(attrs)
    pieces = [
        "<?xml version='1.0' encoding='utf-8'?>",
        f"<ns:ROOT xmlns:ns='{namespace_uri}'>",
        f"<ns:{tag} {rendered} />",
        "</ns:ROOT>",
    ]
    return "".join(pieces).encode("utf-8")
def _xml_with_container(tag: str, attrs: dict[str, str]) -> bytes:
    """Return a UTF-8 XML document with ``tag`` nested in ``ROOT/CONTAINER``."""
    rendered = _attrs_string(attrs)
    pieces = [
        "<?xml version='1.0' encoding='utf-8'?>",
        "<ROOT><CONTAINER>",
        f"<{tag} {rendered} />",
        "</CONTAINER></ROOT>",
    ]
    return "".join(pieces).encode("utf-8")
def _xml_with_children() -> bytes:
    """Return a UTF-8 XML document with one INSPECTION that has child elements.

    The INSPECTION element carries ``ERPID``; the subject data (``INN``,
    ``OGRN``, ``ORG_NAME``) sits on an ``I_SUBJECT`` child and the authority
    name (``FRGU_ORG_NAME``) on an ``I_AUTHORITY`` child.
    """
    # Imported locally so this helper does not depend on the module's import block.
    from xml.sax.saxutils import quoteattr

    inn = _digits(10)
    ogrn = _digits(13)
    registration = _digits(12)
    # Company names are free text: quote and escape them so a generated name
    # containing '&', '<' or '"' cannot make the fixture malformed.
    subject_name = quoteattr(fake.company())
    authority_name = quoteattr(fake.company())
    xml = (
        "<?xml version='1.0' encoding='utf-8'?>"
        "<ROOT>"
        f'<INSPECTION ERPID="{registration}">'
        f'<I_SUBJECT INN="{inn}" OGRN="{ogrn}" ORG_NAME={subject_name} />'
        f"<I_AUTHORITY FRGU_ORG_NAME={authority_name} />"
        "</INSPECTION>"
        "</ROOT>"
    )
    return xml.encode("utf-8")
def _xml_with_cyrillic_tag() -> bytes:
    """Return a UTF-8 XML document whose record tag and two attributes are Cyrillic."""
    attributes: dict[str, str] = {}
    attributes[_CYRILLIC_INN] = _digits(10)
    attributes[_CYRILLIC_OGRN] = _digits(13)
    attributes["I_NUMBER"] = _digits(12)
    element = f"<{_CYRILLIC_KNM} {_attrs_string(attributes)} />"
    document = f"<?xml version='1.0' encoding='utf-8'?><ROOT>{element}</ROOT>"
    return document.encode("utf-8")
def _client_for(server: TestHTTPServer) -> ProverkiClient:
    """Return a ``ProverkiClient`` wired to ``server``'s adapter, Playwright off."""
    settings = {
        "host": "testserver",
        "scheme": "http",
        "http_adapter": server.adapter,
        "use_playwright": False,
    }
    return ProverkiClient(**settings)
class ProverkiDiscoverFilesTest(SimpleTestCase):
    """Tests for ``ProverkiClient._discover_data_files``."""

    def test_discover_data_files_month(self):
        """Year + month with the 248 flag gives one plan for that month."""
        discovered = ProverkiClient()._discover_data_files(
            year=2025, month=2, is_federal_law_248=True
        )
        self.assertEqual(len(discovered), 1)
        first = discovered[0]
        self.assertEqual(first.month, 2)
        self.assertIn("fz248", first.file_name)

    def test_discover_data_files_year_only(self):
        """Year without month and without the 248 flag gives one month-less plan."""
        discovered = ProverkiClient()._discover_data_files(
            year=2024, month=None, is_federal_law_248=False
        )
        self.assertEqual(len(discovered), 1)
        first = discovered[0]
        self.assertIsNone(first.month)
        self.assertIn("fz294", first.file_name)

    def test_discover_data_files_without_year(self):
        """No year means nothing is discovered."""
        discovered = ProverkiClient()._discover_data_files(year=None)
        self.assertEqual(discovered, [])
class ProverkiDownloadParseTest(SimpleTestCase):
    """Download-and-parse tests for ``ProverkiClient`` using a local HTTP server."""

    @staticmethod
    def _sample_xml() -> bytes:
        """Return an XML document with a single randomly generated INSPECTION."""
        return _xml_with_tag("INSPECTION", _inspection_attrs())

    @classmethod
    def _sample_archive(cls) -> bytes:
        """Return a ZIP archive whose only member, ``data.xml``, is a sample document."""
        return build_zip([("data.xml", cls._sample_xml())])

    def test_download_and_parse_zip(self):
        """A ZIP served as ``application/zip`` parses to one inspection."""
        archive = self._sample_archive()
        with TestHTTPServer() as server:
            server.add_bytes(
                "/opendata/data.zip", archive, content_type="application/zip"
            )
            client = _client_for(server)
            inspections = client.fetch_inspections(
                file_url=f"{server.base_url}/opendata/data.zip"
            )
        self.assertEqual(len(inspections), 1)

    def test_download_and_parse_xml(self):
        """A plain XML file served as ``application/xml`` parses to one inspection."""
        document = self._sample_xml()
        with TestHTTPServer() as server:
            server.add_bytes(
                "/opendata/data.xml", document, content_type="application/xml"
            )
            client = _client_for(server)
            inspections = client.fetch_inspections(
                file_url=f"{server.base_url}/opendata/data.xml"
            )
        self.assertEqual(len(inspections), 1)

    def test_download_and_parse_portal_without_playwright(self):
        """``file_format="portal"`` with Playwright off still parses a served ZIP."""
        archive = self._sample_archive()
        with TestHTTPServer() as server:
            server.add_bytes(
                "/portal/public-open-data/check/2025/1",
                archive,
                content_type="application/zip",
            )
            client = _client_for(server)
            inspections = client._download_and_parse(
                f"{server.base_url}/portal/public-open-data/check/2025/1",
                file_format="portal",
            )
        self.assertEqual(len(inspections), 1)

    def test_download_and_parse_portal_without_playwright_with_progress(self):
        """Same as the portal case above, and the progress callback is invoked."""
        archive = self._sample_archive()
        progress: list[tuple[int, str]] = []

        def on_progress(value: int, message: str) -> None:
            progress.append((value, message))

        with TestHTTPServer() as server:
            server.add_bytes(
                "/portal/public-open-data/check/2025/2",
                archive,
                content_type="application/zip",
            )
            client = _client_for(server)
            inspections = client._download_and_parse(
                f"{server.base_url}/portal/public-open-data/check/2025/2",
                progress_callback=on_progress,
                file_format="portal",
            )
        self.assertEqual(len(inspections), 1)
        self.assertTrue(progress)

    def test_download_and_parse_html_without_playwright_fails(self):
        """An HTML body for a portal URL raises when Playwright is off."""
        blocked_page = b"<html><body>blocked</body></html>"
        with TestHTTPServer() as server:
            server.add_bytes(
                "/portal/public-open-data/check/2025/1",
                blocked_page,
                content_type="text/html",
            )
            client = _client_for(server)
            with self.assertRaises(ProverkiClientError):
                client._download_and_parse(
                    f"{server.base_url}/portal/public-open-data/check/2025/1",
                    file_format="portal",
                )

    def test_download_and_parse_html_without_playwright_non_portal(self):
        """An HTML body for a non-portal URL raises when Playwright is off."""
        blocked_page = b"<html><body>blocked</body></html>"
        with TestHTTPServer() as server:
            server.add_bytes(
                "/opendata/data.html", blocked_page, content_type="text/html"
            )
            client = _client_for(server)
            with self.assertRaises(ProverkiClientError):
                client._download_and_parse(f"{server.base_url}/opendata/data.html")

    def test_download_and_parse_unknown_format(self):
        """Content that is neither XML nor ZIP raises ``ProverkiClientError``."""
        with TestHTTPServer() as server:
            server.add_bytes("/opendata/data.bin", b"not-xml-or-zip")
            client = _client_for(server)
            with self.assertRaises(ProverkiClientError):
                client.fetch_inspections(
                    file_url=f"{server.base_url}/opendata/data.bin"
                )

    def test_parse_zip_archive_without_xml_files(self):
        """An archive with no XML member parses to an empty list."""
        archive = build_zip([("readme.txt", b"no xml here")])
        inspections = ProverkiClient()._parse_zip_archive(archive)
        self.assertEqual(inspections, [])

    def test_fetch_inspections_with_progress_callback(self):
        """``fetch_inspections`` reports progress while fetching by ``file_url``."""
        archive = self._sample_archive()
        progress: list[tuple[int, str]] = []

        def on_progress(value: int, message: str) -> None:
            progress.append((value, message))

        with TestHTTPServer() as server:
            server.add_bytes(
                "/opendata/data.zip", archive, content_type="application/zip"
            )
            client = _client_for(server)
            inspections = client.fetch_inspections(
                file_url=f"{server.base_url}/opendata/data.zip",
                progress_callback=on_progress,
            )
        self.assertEqual(len(inspections), 1)
        self.assertTrue(progress)

    def test_fetch_inspections_http_error_bubbles(self):
        """A 500 response surfaces as ``HTTPClientError``."""
        with TestHTTPServer() as server:
            server.add_bytes("/opendata/data.zip", b"", status=500)
            client = _client_for(server)
            with self.assertRaises(HTTPClientError):
                client.fetch_inspections(
                    file_url=f"{server.base_url}/opendata/data.zip"
                )

    def test_fetch_inspection_plans(self):
        """``fetch_inspection_plans(2025)`` returns one plan named after the year."""
        plans = ProverkiClient().fetch_inspection_plans(2025)
        self.assertEqual(len(plans), 1)
        self.assertIn("plan-2025", plans[0].file_name)

    def test_fetch_inspections_wraps_generic_error(self):
        """A ``ValueError`` from ``_download_and_parse`` becomes ``ProverkiClientError``."""

        class _FailClient(ProverkiClient):
            def _download_and_parse(self, *args, **kwargs):  # type: ignore[override]
                raise ValueError("boom")

        failing_client = _FailClient()
        with self.assertRaises(ProverkiClientError):
            failing_client.fetch_inspections(file_url="http://example.com/data.zip")

    def test_download_and_parse_portal_with_playwright_branch(self):
        """With Playwright on, the stubbed portal download is parsed and progress reported."""
        archive = self._sample_archive()
        progress: list[int] = []

        class _PortalClient(ProverkiClient):
            def _download_from_portal(self, *args, **kwargs):  # type: ignore[override]
                return archive

            def _close_playwright(self):  # type: ignore[override]
                return None

        def on_progress(value: int, _message: str) -> None:
            progress.append(value)

        client = _PortalClient(use_playwright=True)
        inspections = client._download_and_parse(
            "http://portal.example.com",
            progress_callback=on_progress,
            file_format="portal",
        )
        self.assertEqual(len(inspections), 1)
        self.assertTrue(progress)

    def test_download_and_parse_portal_with_playwright_no_progress(self):
        """The Playwright portal branch also works without a progress callback."""
        archive = self._sample_archive()

        class _PortalClient(ProverkiClient):
            def _download_from_portal(self, *args, **kwargs):  # type: ignore[override]
                return archive

            def _close_playwright(self):  # type: ignore[override]
                return None

        client = _PortalClient(use_playwright=True)
        inspections = client._download_and_parse(
            "http://portal.example.com", file_format="portal"
        )
        self.assertEqual(len(inspections), 1)

    def test_download_from_portal_does_not_wait_for_networkidle(self):
        """``_download_from_portal`` navigates with ``wait_until="domcontentloaded"``."""
        archive = self._sample_archive()
        handle = tempfile.NamedTemporaryFile(delete=False)
        # Register removal before writing so the file cannot leak if the write
        # or any later step fails.
        self.addCleanup(os.unlink, handle.name)
        with handle:
            handle.write(archive)
        goto_calls = []

        class _Element:
            def __init__(self, href: str | None = None):
                self.href = href

            def click(self):
                return None

            def get_attribute(self, name: str):
                return self.href if name == "href" else None

        class _Download:
            def path(self):
                return handle.name

        class _DownloadContext:
            value = _Download()

            def __enter__(self):
                return self

            def __exit__(self, exc_type, exc_val, exc_tb):
                return None

        class _Page:
            def __init__(self):
                self.query_count = 0

            def goto(self, url, wait_until=None, timeout=None):
                # Record navigation arguments so the test can inspect them.
                goto_calls.append(
                    {"url": url, "wait_until": wait_until, "timeout": timeout}
                )

            def wait_for_selector(self, *args, **kwargs):
                return None

            def wait_for_timeout(self, *args, **kwargs):
                return None

            def content(self):
                return "<html><a href='/data.zip'>Набор данных</a></html>"

            def title(self):
                return "dataset"

            def query_selector(self, selector):
                # First lookup yields an element without href; later ones a zip link.
                self.query_count += 1
                if self.query_count == 1:
                    return _Element()
                return _Element("/data.zip")

            def expect_download(self, timeout=None):
                return _DownloadContext()

        class _Context:
            def new_page(self):
                return _Page()

            def close(self):
                return None

        class _Browser:
            def new_context(self, **kwargs):
                return _Context()

        class _PortalClient(ProverkiClient):
            def _get_browser(self):  # type: ignore[override]
                return _Browser()

        content = _PortalClient(use_playwright=True)._download_from_portal(
            "https://proverki.gov.ru/portal/public-open-data/check/2026/5?isFederalLaw248=true"
        )
        self.assertEqual(content, archive)
        self.assertEqual(goto_calls[0]["wait_until"], "domcontentloaded")

    def test_download_and_parse_html_switches_to_playwright(self):
        """With Playwright on, an HTML response is replaced by the stubbed Playwright XML."""
        document = self._sample_xml()

        class _HtmlClient(ProverkiClient):
            def _download_with_playwright(self, *args, **kwargs):  # type: ignore[override]
                return document

            def _close_playwright(self):  # type: ignore[override]
                return None

        with TestHTTPServer() as server:
            server.add_bytes(
                "/data.html", b"<html>blocked</html>", content_type="text/html"
            )
            client = _HtmlClient(
                host="testserver",
                scheme="http",
                http_adapter=server.adapter,
                use_playwright=True,
            )
            inspections = client._download_and_parse(f"{server.base_url}/data.html")
        self.assertEqual(len(inspections), 1)

    def test_download_and_parse_html_switches_to_playwright_with_progress(self):
        """Same as above, and the progress callback is invoked."""
        document = self._sample_xml()
        progress: list[tuple[int, str]] = []

        class _HtmlClient(ProverkiClient):
            def _download_with_playwright(self, *args, **kwargs):  # type: ignore[override]
                return document

            def _close_playwright(self):  # type: ignore[override]
                return None

        def on_progress(value: int, message: str) -> None:
            progress.append((value, message))

        with TestHTTPServer() as server:
            server.add_bytes(
                "/data.html", b"<html>blocked</html>", content_type="text/html"
            )
            client = _HtmlClient(
                host="testserver",
                scheme="http",
                http_adapter=server.adapter,
                use_playwright=True,
            )
            inspections = client._download_and_parse(
                f"{server.base_url}/data.html", progress_callback=on_progress
            )
        self.assertEqual(len(inspections), 1)
        self.assertTrue(progress)

    def test_fetch_inspections_with_plans_and_progress(self):
        """Fetching by year/month downloads the discovered plan and reports progress."""
        archive = self._sample_archive()
        progress: list[int] = []

        def on_progress(value: int, _message: str) -> None:
            progress.append(value)

        with TestHTTPServer() as server:
            server.add_bytes(
                "/opendata/data.zip", archive, content_type="application/zip"
            )

            # Defined inside the ``with`` block so ``server`` is already bound
            # when the stub reads it.
            class _TestClient(ProverkiClient):
                def _discover_data_files(self, **_kwargs):  # type: ignore[override]
                    from apps.parsers.clients.proverki.schemas import InspectionPlan

                    return [
                        InspectionPlan(
                            year=2025,
                            month=1,
                            file_url=f"{server.base_url}/opendata/data.zip",
                            file_name="data.zip",
                            file_format="auto",
                        )
                    ]

            client = _TestClient(
                host="testserver",
                scheme="http",
                http_adapter=server.adapter,
                use_playwright=False,
            )
            inspections = client.fetch_inspections(
                year=2025, month=1, progress_callback=on_progress
            )
        self.assertEqual(len(inspections), 1)
        self.assertTrue(progress)
class ProverkiParseXMLTest(SimpleTestCase):
    """Tests for the XML parsing helpers of ``ProverkiClient``."""

    def test_parse_xml_with_namespace(self):
        """A namespaced INSPECTION element is found."""
        document = _xml_with_namespace("INSPECTION", _inspection_attrs())
        client = ProverkiClient()
        self.assertEqual(len(client._parse_xml_content(document)), 1)

    def test_parse_xml_with_container(self):
        """A lower-case ``inspection`` tag nested in a container is found."""
        document = _xml_with_container("inspection", _inspection_attrs())
        client = ProverkiClient()
        self.assertEqual(len(client._parse_xml_content(document)), 1)

    def test_parse_xml_with_children(self):
        """An INSPECTION whose data sits on child elements is parsed."""
        client = ProverkiClient()
        parsed = client._parse_xml_content(_xml_with_children())
        self.assertEqual(len(parsed), 1)

    def test_parse_xml_with_cyrillic_tag(self):
        """A record with a Cyrillic tag and attribute names is parsed."""
        client = ProverkiClient()
        parsed = client._parse_xml_content(_xml_with_cyrillic_tag())
        self.assertEqual(len(parsed), 1)

    def test_parse_xml_streaming_threshold(self):
        """Parsing still yields the record with ``STREAMING_THRESHOLD_BYTES`` set to 1."""
        document = _xml_with_tag("INSPECTION", _inspection_attrs())
        client = ProverkiClient()
        client.STREAMING_THRESHOLD_BYTES = 1
        self.assertEqual(len(client._parse_xml_content(document)), 1)

    def test_parse_xml_record_missing_fields_returns_none(self):
        """An attribute-less INSPECTION element gives ``None``."""
        bare = ET.fromstring("<INSPECTION />")  # noqa: S314
        client = ProverkiClient()
        self.assertIsNone(client._parse_xml_record(bare))

    def test_parse_xml_record_partial_fields(self):
        """An element with only ``INN`` still produces an inspection."""
        element = ET.fromstring(f"<INSPECTION INN='{_digits(10)}' />")  # noqa: S314
        client = ProverkiClient()
        record = client._parse_xml_record(element)
        self.assertIsNotNone(record)
        self.assertEqual(record.inn, element.attrib["INN"])

    def test_parse_xml_container_records(self):
        """A container holding only an empty ITEM yields no inspections."""
        document = (
            b"<?xml version='1.0' encoding='utf-8'?>"
            b"<ROOT><CONTAINER><ITEM /></CONTAINER></ROOT>"
        )
        client = ProverkiClient()
        self.assertEqual(client._parse_xml_content(document), [])

    def test_parse_xml_content_decode_fallback(self):
        """Bytes that only decode with ``errors="replace"`` are still parsed."""
        text = "<?xml version='1.0' encoding='utf-8'?><ROOT></ROOT>"
        payload = _BadBytes(b"\xff\xfe", xml_str=text)
        client = ProverkiClient()
        self.assertEqual(client._parse_xml_content(payload), [])

    def test_parse_xml_streaming_decode_fallback(self):
        """The streaming parser also handles bytes needing ``errors="replace"``."""
        text = (
            "<?xml version='1.0' encoding='utf-8'?>"
            f'<ROOT><INSPECTION INN="{_digits(10)}" /></ROOT>'
        )
        payload = _BadBytes(b"\xff\xfe", xml_str=text)
        client = ProverkiClient()
        self.assertEqual(len(client._parse_xml_streaming(payload)), 1)

    def test_parse_xml_streaming_parse_error_returns_partial(self):
        """A truncated document returns the records read before the error."""
        inn = _digits(10)
        truncated = (
            "<?xml version='1.0' encoding='utf-8'?>"
            f'<ROOT><INSPECTION INN="{inn}" /><INSPECTION'
        ).encode()
        client = ProverkiClient()
        self.assertEqual(len(client._parse_xml_streaming(truncated)), 1)

    def test_parse_xml_streaming_parse_error_raises_when_empty(self):
        """A truncated document with no complete record raises."""
        truncated = b"<?xml version='1.0' encoding='utf-8'?><ROOT><INSPECTION"
        client = ProverkiClient()
        with self.assertRaises(ProverkiClientError):
            client._parse_xml_streaming(truncated)

    def test_parse_xml_streaming_logs_every_10000(self):
        """Ten thousand identical records are all returned by the streaming parser."""
        inn = _digits(10)
        record = f'<INSPECTION INN="{inn}" />'
        document = (
            "<?xml version='1.0' encoding='utf-8'?>" f"<ROOT>{record * 10000}</ROOT>"
        ).encode()
        client = ProverkiClient()
        self.assertEqual(len(client._parse_xml_streaming(document)), 10000)

    def test_parse_xml_streaming_skips_invalid_record(self):
        """An attribute-less record is skipped by the streaming parser."""
        document = (
            b"<?xml version='1.0' encoding='utf-8'?>" b"<ROOT><INSPECTION /></ROOT>"
        )
        client = ProverkiClient()
        self.assertEqual(client._parse_xml_streaming(document), [])

    def test_parse_xml_tag_search_handles_error(self):
        """A ``SyntaxError`` from one ``findall`` path does not stop parsing."""
        document = _xml_with_tag("inspection", _inspection_attrs())
        client = ProverkiClient()
        original_findall = element_path.findall

        def _raising_findall(elem, path, namespaces=None):
            # Fail only for the lower-case tag search; delegate everything else.
            if path == ".//inspection":
                raise SyntaxError("boom")
            return original_findall(elem, path, namespaces)

        element_path.findall = _raising_findall
        try:
            parsed = client._parse_xml_content(document)
        finally:
            # Always undo the module-level patch so other tests are unaffected.
            element_path.findall = original_findall
        self.assertEqual(len(parsed), 1)

    def test_parse_xml_record_namespace_nested_fields(self):
        """Namespaced child elements supply ``inspection_type`` and ``status``."""
        # Imported locally so this test does not depend on the module's import block.
        from xml.sax.saxutils import quoteattr

        ns = "http://example.com/ns"
        inn = _digits(10)
        ogrn = _digits(13)
        inspection_type = fake.word()
        status = fake.word()
        registration = _digits(12)
        # The company name is free text: quote and escape it so a generated
        # '&', '<' or '"' cannot make the fixture malformed.
        org_name = quoteattr(fake.company())
        document = (
            "<?xml version='1.0' encoding='utf-8'?>"
            f"<ns:ROOT xmlns:ns='{ns}'>"
            f'<ns:INSPECTION ERPID="{registration}">'
            f'<ns:I_SUBJECT INN="{inn}" OGRN="{ogrn}" ORG_NAME={org_name} />'
            f'<ns:I_CLASSIFICATION ITYPE_NAME="{inspection_type}" />'
            f'<ns:I_APPROVE STATUS="{status}" />'
            "</ns:INSPECTION>"
            "</ns:ROOT>"
        ).encode()
        client = ProverkiClient()
        parsed = client._parse_xml_content(document)
        self.assertEqual(len(parsed), 1)
        self.assertEqual(parsed[0].inspection_type, inspection_type)
        self.assertEqual(parsed[0].status, status)

    def test_parse_xml_record_namespace_text_child_fallback(self):
        """``INN`` given as a namespaced child element's text is picked up."""
        ns = "http://example.com/ns"
        inn = _digits(10)
        text = (
            "<?xml version='1.0' encoding='utf-8'?>"
            f"<ns:INSPECTION xmlns:ns='{ns}'><ns:INN>{inn}</ns:INN></ns:INSPECTION>"
        )
        element = ET.fromstring(text)  # noqa: S314
        client = ProverkiClient()
        record = client._parse_xml_record(element)
        self.assertIsNotNone(record)
        self.assertEqual(record.inn, inn)

    def test_parse_xml_record_bad_element_returns_none(self):
        """An element whose tag is just ``{`` gives ``None``."""
        malformed = ET.Element("{")
        client = ProverkiClient()
        self.assertIsNone(client._parse_xml_record(malformed))
class _BadBytes(bytes):
def __new__(cls, data: bytes, *, xml_str: str):
obj = super().__new__(cls, data)
obj._xml_str = xml_str
return obj
def decode(self, encoding="utf-8", errors="strict"):
if errors == "replace":
return self._xml_str
raise UnicodeDecodeError(encoding, b"", 0, 1, "bad bytes")
class _FakeResponse:
def __init__(self, headers: dict[str, str] | None = None):
self.headers = headers or {}
class _FakeDownload:
def __init__(self, path: Path | None):
self._path = path
def path(self):
if self._path is None:
return None
return str(self._path)
class _FakeDownloadContext:
    """Context manager stand-in that exposes a ``_FakeDownload`` as ``value``."""

    def __init__(self, path: Path | None):
        download = _FakeDownload(path)
        self.value = download

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        # Returning False lets any exception from the ``with`` body propagate.
        return False
class _FakeLink:
def __init__(self, href: str | None = None):
self._href = href
def get_attribute(self, name: str):
if name == "href":
return self._href
return None
def click(self):
return None
class _FakePage:
    """Configurable page stand-in used by the Playwright stub tests.

    All behaviour is fixed at construction: the content type reported by
    ``goto``, the HTML returned by ``content``, the links returned by the
    selector queries, the file offered by ``expect_download`` and whether
    ``wait_for_selector`` raises.
    """

    def __init__(
        self,
        *,
        content_type: str,
        content: str,
        download_path: Path | None,
        download_links: list[_FakeLink] | None = None,
        portal_links: list[_FakeLink] | None = None,
        zip_link: _FakeLink | None = None,
        xml_link: _FakeLink | None = None,
        download_tab: _FakeLink | None = None,
        raise_on_wait: bool = False,
    ):
        # Values reported by goto() / content().
        self._content_type = content_type
        self._content = content
        # File handed out by expect_download().
        self._download_path = download_path
        # Results for query_selector_all(); missing lists become empty ones.
        self._download_links = download_links or []
        self._portal_links = portal_links or []
        # Results for query_selector().
        self._zip_link = zip_link
        self._xml_link = xml_link
        self._download_tab = download_tab
        self._raise_on_wait = raise_on_wait
        # URL of the most recent goto() call.
        self._last_url = ""

    def goto(self, url, wait_until=None, timeout=None):
        """Remember ``url`` and return a response carrying the configured content type."""
        self._last_url = url
        response_headers = {"content-type": self._content_type}
        return _FakeResponse(response_headers)

    def content(self):
        return self._content

    def title(self):
        return "Page"

    def wait_for_selector(self, *args, **kwargs):
        """Raise ``RuntimeError("timeout")`` if configured to, else return ``None``."""
        if not self._raise_on_wait:
            return None
        raise RuntimeError("timeout")

    def wait_for_timeout(self, *args, **kwargs):
        return None

    def query_selector(self, selector: str):
        """Return the first configured link whose marker occurs in ``selector``."""
        candidates = (
            ("Скачать", self._download_tab),
            (".zip", self._zip_link),
            (".xml", self._xml_link),
        )
        for marker, link in candidates:
            if marker in selector and link:
                return link
        return None

    def query_selector_all(self, selector: str):
        """Return portal links after visiting the open-data portal, else download links."""
        on_portal = self._last_url == OPEN_DATA_PORTAL_URL
        return self._portal_links if on_portal else self._download_links

    def expect_download(self, timeout=None):
        return _FakeDownloadContext(self._download_path)
class _FakeContext:
def __init__(self, page: _FakePage):
self._page = page
self.closed = False
def new_page(self):
return self._page
def close(self):
self.closed = True
class _FakeBrowser:
def __init__(self, page: _FakePage):
self._page = page
self.closed = False
self.context_kwargs = []
def new_context(self, **kwargs):
self.context_kwargs.append(kwargs)
return _FakeContext(self._page)
def close(self):
self.closed = True
def _temp_file(content: bytes) -> Path:
tmp = tempfile.NamedTemporaryFile(delete=False)
tmp.write(content)
tmp.flush()
tmp.close()
return Path(tmp.name)
class ProverkiPlaywrightStubTest(SimpleTestCase):
    """Tests for the Playwright code paths of ``ProverkiClient`` using fake browsers."""

    # Django's SimpleTestCase blocks database queries unless aliases are listed here.
    # NOTE(review): nothing visible in these tests queries a database — confirm this is needed.
    databases = "__all__"

    def tearDown(self):
        """Clear any event loop still registered as running on this thread."""
        super().tearDown()
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            # No running loop is registered; nothing to reset.
            return
        asyncio_events._set_running_loop(None)

    def _download_file(self, content: bytes) -> Path:
        """Create a temp file holding ``content`` and delete it when the test ends."""
        path = _temp_file(content)
        # missing_ok: tolerate the file having been removed in the meantime.
        self.addCleanup(path.unlink, missing_ok=True)
        return path

    def _swap_module(self, name: str, replacement) -> None:
        """Install ``replacement`` in ``sys.modules`` and restore the old entry on cleanup."""
        previous = sys.modules.get(name)
        sys.modules[name] = replacement
        if previous is None:
            self.addCleanup(sys.modules.pop, name, None)
        else:
            self.addCleanup(sys.modules.__setitem__, name, previous)

    @staticmethod
    def _client_with_browser(browser, **client_kwargs) -> ProverkiClient:
        """Return a ``ProverkiClient`` whose ``_browser`` is already set to ``browser``."""
        client = ProverkiClient(**client_kwargs)
        client._browser = browser
        return client

    def test_download_with_playwright_direct_response(self):
        """An ``application/xml`` response is returned as XML bytes."""
        page = _FakePage(
            content_type="application/xml",
            content="<xml></xml>",
            download_path=self._download_file(b"<xml></xml>"),
        )
        client = self._client_with_browser(_FakeBrowser(page))
        result = client._download_with_playwright("http://example.com")
        self.assertIn(b"<xml", result)

    def test_download_with_playwright_uses_configured_proxy(self):
        """The configured proxy is passed to ``new_context`` as ``{"server": ...}``."""
        page = _FakePage(
            content_type="application/xml",
            content="<xml></xml>",
            download_path=self._download_file(b"<xml></xml>"),
        )
        browser = _FakeBrowser(page)
        client = self._client_with_browser(
            browser, proxies=["http://192.168.1.108:3130"]
        )
        result = client._download_with_playwright("http://example.com")
        self.assertIn(b"<xml", result)
        self.assertEqual(
            browser.context_kwargs[0]["proxy"],
            {"server": "http://192.168.1.108:3130"},
        )

    def test_download_with_playwright_download_link(self):
        """An HTML page with a download link yields the downloaded file's bytes."""
        page = _FakePage(
            content_type="text/html",
            content="<html></html>",
            download_path=self._download_file(b"zip-data"),
            download_links=[_FakeLink(href="file.zip")],
        )
        client = self._client_with_browser(_FakeBrowser(page))
        result = client._download_with_playwright("http://example.com")
        self.assertEqual(result, b"zip-data")

    def test_download_with_playwright_portal_fallback(self):
        """With no download links on the page, a portal link is used instead."""
        page = _FakePage(
            content_type="text/html",
            content="<html></html>",
            download_path=self._download_file(b"portal-data"),
            download_links=[],
            portal_links=[_FakeLink(href="portal.zip")],
        )
        client = self._client_with_browser(_FakeBrowser(page))
        result = client._download_with_playwright("http://example.com")
        self.assertEqual(result, b"portal-data")

    def test_download_with_playwright_embedded_xml_after_doctype(self):
        """Page content with an XML declaration after a DOCTYPE is returned with it."""
        page = _FakePage(
            content_type="application/xml",
            content="<!DOCTYPE html><?xml version='1.0'?><ROOT/>",
            download_path=None,
            download_links=[],
            portal_links=[],
        )
        client = self._client_with_browser(_FakeBrowser(page))
        result = client._download_with_playwright("http://example.com")
        self.assertIn(b"<?xml", result)

    def test_download_with_playwright_download_link_without_path_raises(self):
        """A download link whose download has no path raises."""
        page = _FakePage(
            content_type="text/html",
            content="<html></html>",
            download_path=None,
            download_links=[_FakeLink(href="file.zip")],
            portal_links=[],
        )
        client = self._client_with_browser(_FakeBrowser(page))
        with self.assertRaises(ProverkiClientError):
            client._download_with_playwright("http://example.com")

    def test_download_with_playwright_portal_link_without_path_raises(self):
        """A portal link whose download has no path raises."""
        page = _FakePage(
            content_type="text/html",
            content="<html></html>",
            download_path=None,
            download_links=[],
            portal_links=[_FakeLink(href="portal.zip")],
        )
        client = self._client_with_browser(_FakeBrowser(page))
        with self.assertRaises(ProverkiClientError):
            client._download_with_playwright("http://example.com")

    def test_download_from_portal_zip_link(self):
        """A portal page with a zip link yields the downloaded bytes."""
        page = _FakePage(
            content_type="text/html",
            content="content",
            download_path=self._download_file(b"zip-content"),
            zip_link=_FakeLink(href="file.zip"),
            download_tab=_FakeLink(),
        )
        client = self._client_with_browser(_FakeBrowser(page))
        result = client._download_from_portal("http://portal.example.com")
        self.assertEqual(result, b"zip-content")

    def test_download_from_portal_downloads_zip_href_without_browser_download(self):
        """A zip href is fetched over HTTP; ``expect_download`` must not be called."""
        archive = build_zip(
            [("data.xml", _xml_with_tag("INSPECTION", _inspection_attrs()))]
        )

        class _NoBrowserDownloadPage(_FakePage):
            def expect_download(self, timeout=None):
                raise AssertionError("browser download should not be used for zip href")

        page = _NoBrowserDownloadPage(
            content_type="text/html",
            content="content",
            download_path=None,
            zip_link=_FakeLink(href="/blob/opendata/data.zip"),
            download_tab=_FakeLink(),
        )
        with TestHTTPServer() as server:
            server.add_bytes(
                "/blob/opendata/data.zip",
                archive,
                content_type="application/zip",
            )
            client = self._client_with_browser(
                _FakeBrowser(page),
                host="testserver",
                scheme="http",
                http_adapter=server.adapter,
                use_playwright=True,
            )
            result = client._download_from_portal(
                f"{server.base_url}/portal/public-open-data/check/2026/5"
            )
        self.assertEqual(result, archive)

    def test_download_portal_href_sends_download_headers(self):
        """``_download_portal_href`` resolves the href and sends Referer/Accept headers."""

        class _RecordingHTTPClient:
            def __init__(self):
                self.endpoint = None
                self.headers = None

            def download_file(self, endpoint, headers=None):
                # Keep the call arguments for the assertions below.
                self.endpoint = endpoint
                self.headers = headers
                return b"zip-content"

        recorder = _RecordingHTTPClient()
        client = ProverkiClient()
        client._http_client = recorder
        portal_url = "https://proverki.gov.ru/portal/public-open-data/check/2026/5"
        result = client._download_portal_href(portal_url, "/blob/opendata/data.zip")
        self.assertEqual(result, b"zip-content")
        self.assertEqual(
            recorder.endpoint,
            "https://proverki.gov.ru/blob/opendata/data.zip",
        )
        self.assertEqual(recorder.headers["Referer"], portal_url)
        self.assertIn("application/zip", recorder.headers["Accept"])

    def test_download_from_portal_zip_link_without_download_path(self):
        """A zip link with no download path available raises."""
        page = _FakePage(
            content_type="text/html",
            content="no data available",
            download_path=None,
            zip_link=_FakeLink(href="file.zip"),
            xml_link=None,
            download_tab=_FakeLink(),
        )
        client = self._client_with_browser(_FakeBrowser(page))
        with self.assertRaises(ProverkiClientError):
            client._download_from_portal("http://portal.example.com")

    def test_download_from_portal_xml_link(self):
        """A portal page with only an xml link yields the downloaded bytes."""
        page = _FakePage(
            content_type="text/html",
            content="content",
            download_path=self._download_file(b"<xml/>"),
            zip_link=None,
            xml_link=_FakeLink(href="file.xml"),
            download_tab=None,
        )
        client = self._client_with_browser(_FakeBrowser(page))
        result = client._download_from_portal("http://portal.example.com")
        self.assertEqual(result, b"<xml/>")

    def test_download_from_portal_xml_link_without_download_path(self):
        """An xml link with no download path available raises."""
        page = _FakePage(
            content_type="text/html",
            content="no data",
            download_path=None,
            zip_link=None,
            xml_link=_FakeLink(href="file.xml"),
            download_tab=None,
        )
        client = self._client_with_browser(_FakeBrowser(page))
        with self.assertRaises(ProverkiClientError):
            client._download_from_portal("http://portal.example.com")

    def test_download_from_portal_no_links_not_found(self):
        """No links, a failing selector wait and "Not found" content raise."""
        page = _FakePage(
            content_type="text/html",
            content="Not found",
            download_path=self._download_file(b""),
            zip_link=None,
            xml_link=None,
            download_tab=None,
            raise_on_wait=True,
        )
        client = self._client_with_browser(_FakeBrowser(page))
        with self.assertRaises(ProverkiClientError):
            client._download_from_portal("http://portal.example.com")

    def test_download_from_portal_no_links_generic_error(self):
        """A page with no usable links at all raises."""
        page = _FakePage(
            content_type="text/html",
            content="no links here",
            download_path=None,
            zip_link=None,
            xml_link=None,
            download_tab=None,
        )
        client = self._client_with_browser(_FakeBrowser(page))
        with self.assertRaises(ProverkiClientError):
            client._download_from_portal("http://portal.example.com")

    def test_close_playwright_handles_errors(self):
        """Errors from ``close``/``stop`` are absorbed and both handles are cleared."""

        class _BrokenBrowser:
            def close(self):
                raise RuntimeError("boom")

        class _BrokenPlaywright:
            def stop(self):
                raise RuntimeError("boom")

        client = ProverkiClient()
        client._browser = _BrokenBrowser()
        client._playwright = _BrokenPlaywright()
        client._close_playwright()
        self.assertIsNone(client._browser)
        self.assertIsNone(client._playwright)

    def test_get_browser_import_error(self):
        """``_get_browser`` raises when ``playwright.sync_api`` is an empty module."""
        client = ProverkiClient()
        fake_playwright = types.ModuleType("playwright")
        # An empty __path__ marks the stub as a package.
        fake_playwright.__path__ = []
        fake_sync_api = types.ModuleType("playwright.sync_api")
        self._swap_module("playwright", fake_playwright)
        self._swap_module("playwright.sync_api", fake_sync_api)
        with self.assertRaises(ProverkiClientError):
            client._get_browser()

    def test_get_browser_success(self):
        """``_get_browser`` returns the object launched by the stubbed chromium."""

        class _FakeChromium:
            def launch(self, **_kwargs):
                return object()

        class _FakePlaywright:
            chromium = _FakeChromium()

        class _FakeSyncPlaywright:
            def start(self):
                return _FakePlaywright()

        fake_module = types.SimpleNamespace(
            sync_playwright=lambda: _FakeSyncPlaywright()
        )
        client = ProverkiClient()
        self._swap_module("playwright.sync_api", fake_module)
        browser = client._get_browser()
        self.assertIsNotNone(browser)

    def test_get_browser_start_error(self):
        """A failure in ``start()`` is reported as ``ProverkiClientError``."""

        class _BrokenPlaywright:
            def start(self):
                raise RuntimeError("startup failed")

        fake_module = types.SimpleNamespace(sync_playwright=lambda: _BrokenPlaywright())
        client = ProverkiClient()
        self._swap_module("playwright.sync_api", fake_module)
        with self.assertRaises(ProverkiClientError):
            client._get_browser()