- перенесены тесты parsers из src/apps/parsers/tests в tests/apps/parsers - обновлены тесты задач под текущее поведение Celery (ошибки пробрасываются исключениями) - убрана зависимость тестов от внешнего брокера через локальные eager-вызовы - добавлены/уточнены фабрики и импорты для единой структуры тестов - обновлены README и CHANGELOG с новым правилом размещения тестов и запуском
1003 lines
34 KiB
Python
1003 lines
34 KiB
Python
"""Unit tests for ProverkiClient using local HTTP server (no mocks)."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import sys
|
|
import tempfile
|
|
import types
|
|
from asyncio import events as asyncio_events
|
|
from pathlib import Path
|
|
from xml.etree import ElementPath as element_path
|
|
from xml.etree import ElementTree as ET
|
|
|
|
from apps.parsers.clients.base import HTTPClientError
|
|
from apps.parsers.clients.proverki import ProverkiClient
|
|
from apps.parsers.clients.proverki.client import (
|
|
OPEN_DATA_PORTAL_URL,
|
|
ProverkiClientError,
|
|
)
|
|
from django.test import SimpleTestCase
|
|
|
|
from tests.utils import TestHTTPServer
|
|
from tests.utils.fixtures import build_zip, fake
|
|
|
|
# Cyrillic tag/attribute names written as escapes so the file stays ASCII-only.
_CYRILLIC_KNM = "\u041a\u041d\u041c"  # "КНМ" (used below as an element tag)
_CYRILLIC_INN = "\u0418\u041d\u041d"  # "ИНН" (used below as an attribute name)
_CYRILLIC_OGRN = "\u041e\u0413\u0420\u041d"  # "ОГРН" (used below as an attribute name)
|
|
|
|
|
|
def _digits(length: int) -> str:
    """Return a string of ``length`` random decimal digits drawn from ``fake``."""
    drawn = (fake.random_int(0, 9) for _ in range(length))
    return "".join(map(str, drawn))
|
|
|
|
|
|
def _attrs_string(attrs: dict[str, str]) -> str:
|
|
return " ".join(f'{key}="{value}"' for key, value in attrs.items())
|
|
|
|
|
|
def _inspection_attrs() -> dict[str, str]:
    """Build a complete set of randomly generated inspection attributes.

    Keys are inserted (and random values drawn) in a fixed order.
    """
    attrs: dict[str, str] = {}
    attrs["ERPID"] = _digits(12)
    attrs["INN"] = _digits(10)
    attrs["OGRN"] = _digits(13)
    attrs["ORG_NAME"] = fake.company()
    attrs["FRGU_ORG_NAME"] = fake.company()
    attrs["ITYPE_NAME"] = fake.word()
    attrs["ICARRYOUT_TYPE_NAME"] = fake.word()
    attrs["START_DATE"] = str(fake.date())
    attrs["END_DATE"] = str(fake.date())
    attrs["STATUS"] = fake.word()
    attrs["FZ_NAME"] = fake.sentence(nb_words=3)
    attrs["RESULT"] = fake.sentence(nb_words=2)
    return attrs
|
|
|
|
|
|
def _xml_with_tag(tag: str, attrs: dict[str, str]) -> bytes:
    """Return a UTF-8 XML document with one self-closing ``tag`` under ``ROOT``."""
    prolog = "<?xml version='1.0' encoding='utf-8'?>"
    element = f"<{tag} {_attrs_string(attrs)} />"
    document = prolog + f"<ROOT>{element}</ROOT>"
    return document.encode("utf-8")
|
|
|
|
|
|
def _xml_with_namespace(tag: str, attrs: dict[str, str]) -> bytes:
    """Return a UTF-8 XML document whose root and ``tag`` use the ``ns`` prefix."""
    namespace_uri = "http://example.com/ns"
    element = f"<ns:{tag} {_attrs_string(attrs)} />"
    prolog = "<?xml version='1.0' encoding='utf-8'?>"
    document = prolog + f"<ns:ROOT xmlns:ns='{namespace_uri}'>{element}</ns:ROOT>"
    return document.encode("utf-8")
|
|
|
|
|
|
def _xml_with_container(tag: str, attrs: dict[str, str]) -> bytes:
    """Return a UTF-8 XML document with ``tag`` nested in ``ROOT/CONTAINER``."""
    element = f"<{tag} {_attrs_string(attrs)} />"
    prolog = "<?xml version='1.0' encoding='utf-8'?>"
    document = prolog + f"<ROOT><CONTAINER>{element}</CONTAINER></ROOT>"
    return document.encode("utf-8")
|
|
|
|
|
|
def _xml_with_children() -> bytes:
    """Return a UTF-8 XML document with one ``INSPECTION`` holding child elements.

    The inspection carries an ``ERPID`` attribute and two children:
    ``I_SUBJECT`` (``INN``, ``OGRN``, ``ORG_NAME``) and ``I_AUTHORITY``
    (``FRGU_ORG_NAME``). Generated company names are XML-escaped so that a
    name containing ``&``, ``<`` or ``"`` cannot break the document.
    """
    # Local import keeps this helper self-contained within the test module.
    from xml.sax.saxutils import escape

    def _attr(value: str) -> str:
        # Escape text for use inside a double-quoted XML attribute.
        return escape(value, {'"': "&quot;"})

    inn = _digits(10)
    ogrn = _digits(13)
    registration = _digits(12)
    subject_name = _attr(fake.company())
    authority_name = _attr(fake.company())
    xml = (
        "<?xml version='1.0' encoding='utf-8'?>"
        "<ROOT>"
        f'<INSPECTION ERPID="{registration}">'
        f'<I_SUBJECT INN="{inn}" OGRN="{ogrn}" ORG_NAME="{subject_name}" />'
        f'<I_AUTHORITY FRGU_ORG_NAME="{authority_name}" />'
        "</INSPECTION>"
        "</ROOT>"
    )
    return xml.encode("utf-8")
|
|
|
|
|
|
def _xml_with_cyrillic_tag() -> bytes:
    """Return a UTF-8 XML document whose record tag and two attributes are Cyrillic."""
    attributes: dict[str, str] = {}
    attributes[_CYRILLIC_INN] = _digits(10)
    attributes[_CYRILLIC_OGRN] = _digits(13)
    attributes["I_NUMBER"] = _digits(12)

    element = f"<{_CYRILLIC_KNM} {_attrs_string(attributes)} />"
    prolog = "<?xml version='1.0' encoding='utf-8'?>"
    document = prolog + f"<ROOT>{element}</ROOT>"
    return document.encode("utf-8")
|
|
|
|
|
|
def _client_for(server: TestHTTPServer) -> ProverkiClient:
    """Create a ``ProverkiClient`` that talks to ``server`` with Playwright disabled."""
    options = {
        "host": "testserver",
        "scheme": "http",
        "http_adapter": server.adapter,
        "use_playwright": False,
    }
    return ProverkiClient(**options)
|
|
|
|
|
|
class ProverkiDiscoverFilesTest(SimpleTestCase):
    """Checks for ``ProverkiClient._discover_data_files``."""

    def test_discover_data_files_month(self):
        """Year 2025, month 2 with the 248 flag gives one ``fz248`` plan for month 2."""
        found = ProverkiClient()._discover_data_files(
            year=2025, month=2, is_federal_law_248=True
        )
        self.assertEqual(len(found), 1)
        first = found[0]
        self.assertEqual(first.month, 2)
        self.assertIn("fz248", first.file_name)

    def test_discover_data_files_year_only(self):
        """Year 2024 without a month or the 248 flag gives one month-less ``fz294`` plan."""
        found = ProverkiClient()._discover_data_files(
            year=2024, month=None, is_federal_law_248=False
        )
        self.assertEqual(len(found), 1)
        first = found[0]
        self.assertIsNone(first.month)
        self.assertIn("fz294", first.file_name)

    def test_discover_data_files_without_year(self):
        """Without a year the discovery result is an empty list."""
        found = ProverkiClient()._discover_data_files(year=None)
        self.assertEqual(found, [])
|
|
|
|
|
|
class ProverkiDownloadParseTest(SimpleTestCase):
    """Download/parse scenarios for ``ProverkiClient`` served by ``TestHTTPServer``."""

    @staticmethod
    def _one_inspection_xml():
        """Return an XML payload holding a single random ``INSPECTION`` element."""
        return _xml_with_tag("INSPECTION", _inspection_attrs())

    @classmethod
    def _one_inspection_zip(cls):
        """Return a zip archive whose only member, ``data.xml``, is that payload."""
        return build_zip([("data.xml", cls._one_inspection_xml())])

    @staticmethod
    def _portal_stub(archive):
        """Build a client subclass whose portal download returns ``archive``."""

        class _PortalClient(ProverkiClient):
            def _download_from_portal(self, *args, **kwargs):  # type: ignore[override]
                return archive

            def _close_playwright(self):  # type: ignore[override]
                return None

        return _PortalClient

    @staticmethod
    def _html_stub(xml):
        """Build a client subclass whose Playwright download returns ``xml``."""

        class _HtmlClient(ProverkiClient):
            def _download_with_playwright(self, *args, **kwargs):  # type: ignore[override]
                return xml

            def _close_playwright(self):  # type: ignore[override]
                return None

        return _HtmlClient

    def test_download_and_parse_zip(self):
        """A served zip archive yields the one inspection it contains."""
        payload = self._one_inspection_zip()

        with TestHTTPServer() as server:
            server.add_bytes(
                "/opendata/data.zip", payload, content_type="application/zip"
            )
            client = _client_for(server)
            found = client.fetch_inspections(
                file_url=f"{server.base_url}/opendata/data.zip"
            )

        self.assertEqual(len(found), 1)

    def test_download_and_parse_xml(self):
        """A served plain XML file yields the one inspection it contains."""
        payload = self._one_inspection_xml()

        with TestHTTPServer() as server:
            server.add_bytes(
                "/opendata/data.xml", payload, content_type="application/xml"
            )
            client = _client_for(server)
            found = client.fetch_inspections(
                file_url=f"{server.base_url}/opendata/data.xml"
            )

        self.assertEqual(len(found), 1)

    def test_download_and_parse_portal_without_playwright(self):
        """``file_format="portal"`` with Playwright off parses a zip served at the portal path."""
        payload = self._one_inspection_zip()

        with TestHTTPServer() as server:
            server.add_bytes(
                "/portal/public-open-data/check/2025/1",
                payload,
                content_type="application/zip",
            )
            client = _client_for(server)
            found = client._download_and_parse(
                f"{server.base_url}/portal/public-open-data/check/2025/1",
                file_format="portal",
            )

        self.assertEqual(len(found), 1)

    def test_download_and_parse_portal_without_playwright_with_progress(self):
        """Same portal download, additionally expecting progress callbacks."""
        payload = self._one_inspection_zip()
        reported: list[tuple[int, str]] = []

        def on_progress(value: int, message: str) -> None:
            reported.append((value, message))

        with TestHTTPServer() as server:
            server.add_bytes(
                "/portal/public-open-data/check/2025/2",
                payload,
                content_type="application/zip",
            )
            client = _client_for(server)
            found = client._download_and_parse(
                f"{server.base_url}/portal/public-open-data/check/2025/2",
                progress_callback=on_progress,
                file_format="portal",
            )

        self.assertEqual(len(found), 1)
        self.assertTrue(reported)

    def test_download_and_parse_html_without_playwright_fails(self):
        """An HTML body at the portal path raises ``ProverkiClientError`` when Playwright is off."""
        page = b"<html><body>blocked</body></html>"

        with TestHTTPServer() as server:
            server.add_bytes(
                "/portal/public-open-data/check/2025/1",
                page,
                content_type="text/html",
            )
            client = _client_for(server)
            with self.assertRaises(ProverkiClientError):
                client._download_and_parse(
                    f"{server.base_url}/portal/public-open-data/check/2025/1",
                    file_format="portal",
                )

    def test_download_and_parse_html_without_playwright_non_portal(self):
        """An HTML body at a non-portal URL also raises ``ProverkiClientError``."""
        page = b"<html><body>blocked</body></html>"

        with TestHTTPServer() as server:
            server.add_bytes("/opendata/data.html", page, content_type="text/html")
            client = _client_for(server)
            with self.assertRaises(ProverkiClientError):
                client._download_and_parse(f"{server.base_url}/opendata/data.html")

    def test_download_and_parse_unknown_format(self):
        """Bytes that are neither XML nor zip raise ``ProverkiClientError``."""
        with TestHTTPServer() as server:
            server.add_bytes("/opendata/data.bin", b"not-xml-or-zip")
            client = _client_for(server)
            with self.assertRaises(ProverkiClientError):
                client.fetch_inspections(
                    file_url=f"{server.base_url}/opendata/data.bin"
                )

    def test_parse_zip_archive_without_xml_files(self):
        """A zip holding only ``readme.txt`` parses to an empty list."""
        payload = build_zip([("readme.txt", b"no xml here")])
        found = ProverkiClient()._parse_zip_archive(payload)
        self.assertEqual(found, [])

    def test_fetch_inspections_with_progress_callback(self):
        """``fetch_inspections`` with a callback returns one record and reports progress."""
        payload = self._one_inspection_zip()
        reported: list[tuple[int, str]] = []

        def on_progress(value: int, message: str) -> None:
            reported.append((value, message))

        with TestHTTPServer() as server:
            server.add_bytes(
                "/opendata/data.zip", payload, content_type="application/zip"
            )
            client = _client_for(server)
            found = client.fetch_inspections(
                file_url=f"{server.base_url}/opendata/data.zip",
                progress_callback=on_progress,
            )

        self.assertEqual(len(found), 1)
        self.assertTrue(reported)

    def test_fetch_inspections_http_error_bubbles(self):
        """A 500 response surfaces as ``HTTPClientError``."""
        with TestHTTPServer() as server:
            server.add_bytes("/opendata/data.zip", b"", status=500)
            client = _client_for(server)
            with self.assertRaises(HTTPClientError):
                client.fetch_inspections(
                    file_url=f"{server.base_url}/opendata/data.zip"
                )

    def test_fetch_inspection_plans(self):
        """``fetch_inspection_plans(2025)`` returns one plan named with ``plan-2025``."""
        found = ProverkiClient().fetch_inspection_plans(2025)
        self.assertEqual(len(found), 1)
        self.assertIn("plan-2025", found[0].file_name)

    def test_fetch_inspections_wraps_generic_error(self):
        """A ``ValueError`` from ``_download_and_parse`` is re-raised as ``ProverkiClientError``."""

        class _FailClient(ProverkiClient):
            def _download_and_parse(self, *args, **kwargs):  # type: ignore[override]
                raise ValueError("boom")

        failing = _FailClient()
        with self.assertRaises(ProverkiClientError):
            failing.fetch_inspections(file_url="http://example.com/data.zip")

    def test_download_and_parse_portal_with_playwright_branch(self):
        """With Playwright on, a stubbed portal download is parsed and progress is reported."""
        payload = self._one_inspection_zip()
        reported = []

        def on_progress(value: int, _message: str) -> None:
            reported.append(value)

        client = self._portal_stub(payload)(use_playwright=True)
        found = client._download_and_parse(
            "http://portal.example.com",
            progress_callback=on_progress,
            file_format="portal",
        )
        self.assertEqual(len(found), 1)
        self.assertTrue(reported)

    def test_download_and_parse_portal_with_playwright_no_progress(self):
        """The same Playwright portal branch works without a progress callback."""
        payload = self._one_inspection_zip()

        client = self._portal_stub(payload)(use_playwright=True)
        found = client._download_and_parse(
            "http://portal.example.com", file_format="portal"
        )
        self.assertEqual(len(found), 1)

    def test_download_and_parse_html_switches_to_playwright(self):
        """An HTML response with Playwright on is replaced by the stubbed Playwright download."""
        payload = self._one_inspection_xml()
        client_cls = self._html_stub(payload)

        with TestHTTPServer() as server:
            server.add_bytes(
                "/data.html", b"<html>blocked</html>", content_type="text/html"
            )
            client = client_cls(
                host="testserver",
                scheme="http",
                http_adapter=server.adapter,
                use_playwright=True,
            )
            found = client._download_and_parse(f"{server.base_url}/data.html")

        self.assertEqual(len(found), 1)

    def test_download_and_parse_html_switches_to_playwright_with_progress(self):
        """The HTML-to-Playwright switch also reports progress when a callback is given."""
        payload = self._one_inspection_xml()
        reported: list[tuple[int, str]] = []
        client_cls = self._html_stub(payload)

        def on_progress(value: int, message: str) -> None:
            reported.append((value, message))

        with TestHTTPServer() as server:
            server.add_bytes(
                "/data.html", b"<html>blocked</html>", content_type="text/html"
            )
            client = client_cls(
                host="testserver",
                scheme="http",
                http_adapter=server.adapter,
                use_playwright=True,
            )
            found = client._download_and_parse(
                f"{server.base_url}/data.html", progress_callback=on_progress
            )

        self.assertEqual(len(found), 1)
        self.assertTrue(reported)

    def test_fetch_inspections_with_plans_and_progress(self):
        """With discovery stubbed to one local plan, a year/month fetch returns one record and reports progress."""
        payload = self._one_inspection_zip()
        reported: list[int] = []

        class _TestClient(ProverkiClient):
            def _discover_data_files(self, **_kwargs):  # type: ignore[override]
                from apps.parsers.clients.proverki.schemas import InspectionPlan

                # ``server`` is resolved when this method runs, i.e. inside the
                # ``with`` block below where the name is bound.
                plan = InspectionPlan(
                    year=2025,
                    month=1,
                    file_url=f"{server.base_url}/opendata/data.zip",
                    file_name="data.zip",
                    file_format="auto",
                )
                return [plan]

        def on_progress(value: int, _message: str) -> None:
            reported.append(value)

        with TestHTTPServer() as server:
            server.add_bytes(
                "/opendata/data.zip", payload, content_type="application/zip"
            )
            client = _TestClient(
                host="testserver",
                scheme="http",
                http_adapter=server.adapter,
                use_playwright=False,
            )
            found = client.fetch_inspections(
                year=2025, month=1, progress_callback=on_progress
            )

        self.assertEqual(len(found), 1)
        self.assertTrue(reported)
|
|
|
|
|
|
class ProverkiParseXMLTest(SimpleTestCase):
    """XML parsing checks for ``ProverkiClient`` (whole-document, streaming, per-record)."""

    def test_parse_xml_with_namespace(self):
        """A namespaced ``INSPECTION`` element is parsed into one record."""
        xml = _xml_with_namespace("INSPECTION", _inspection_attrs())
        client = ProverkiClient()
        inspections = client._parse_xml_content(xml)
        self.assertEqual(len(inspections), 1)

    def test_parse_xml_with_container(self):
        """A lower-case ``inspection`` element nested in ``CONTAINER`` is parsed."""
        xml = _xml_with_container("inspection", _inspection_attrs())
        client = ProverkiClient()
        inspections = client._parse_xml_content(xml)
        self.assertEqual(len(inspections), 1)

    def test_parse_xml_with_children(self):
        """An ``INSPECTION`` whose data lives on child elements yields one record."""
        client = ProverkiClient()
        inspections = client._parse_xml_content(_xml_with_children())
        self.assertEqual(len(inspections), 1)

    def test_parse_xml_with_cyrillic_tag(self):
        """A record using the Cyrillic tag and attribute names yields one record."""
        client = ProverkiClient()
        inspections = client._parse_xml_content(_xml_with_cyrillic_tag())
        self.assertEqual(len(inspections), 1)

    def test_parse_xml_streaming_threshold(self):
        """Parsing still yields one record with ``STREAMING_THRESHOLD_BYTES`` set to 1.

        NOTE(review): presumably this forces the streaming code path; the test
        itself only checks the record count.
        """
        xml = _xml_with_tag("INSPECTION", _inspection_attrs())
        client = ProverkiClient()
        # Instance-level override; the class attribute is left untouched.
        client.STREAMING_THRESHOLD_BYTES = 1
        inspections = client._parse_xml_content(xml)
        self.assertEqual(len(inspections), 1)

    def test_parse_xml_record_missing_fields_returns_none(self):
        """An attribute-less ``INSPECTION`` element parses to ``None``."""
        element = ET.fromstring("<INSPECTION />")  # noqa: S314
        client = ProverkiClient()
        self.assertIsNone(client._parse_xml_record(element))

    def test_parse_xml_record_partial_fields(self):
        """An element carrying only ``INN`` still produces a record with that INN."""
        element = ET.fromstring(f"<INSPECTION INN='{_digits(10)}' />")  # noqa: S314
        client = ProverkiClient()
        inspection = client._parse_xml_record(element)
        self.assertIsNotNone(inspection)
        self.assertEqual(inspection.inn, element.attrib["INN"])

    def test_parse_xml_container_records(self):
        """A container holding only an empty ``ITEM`` yields no records."""
        xml = (
            b"<?xml version='1.0' encoding='utf-8'?>"
            b"<ROOT><CONTAINER><ITEM /></CONTAINER></ROOT>"
        )
        client = ProverkiClient()
        inspections = client._parse_xml_content(xml)
        self.assertEqual(inspections, [])

    def test_parse_xml_content_decode_fallback(self):
        """Content whose strict decode fails is still parsed (here to an empty list)."""
        xml_str = "<?xml version='1.0' encoding='utf-8'?><ROOT></ROOT>"
        # ``_BadBytes`` only decodes with errors="replace", returning ``xml_str``.
        content = _BadBytes(b"\xff\xfe", xml_str=xml_str)
        client = ProverkiClient()
        inspections = client._parse_xml_content(content)
        self.assertEqual(inspections, [])

    def test_parse_xml_streaming_decode_fallback(self):
        """The streaming parser also copes with content whose strict decode fails."""
        xml_str = (
            "<?xml version='1.0' encoding='utf-8'?>"
            f'<ROOT><INSPECTION INN="{_digits(10)}" /></ROOT>'
        )
        content = _BadBytes(b"\xff\xfe", xml_str=xml_str)
        client = ProverkiClient()
        inspections = client._parse_xml_streaming(content)
        self.assertEqual(len(inspections), 1)

    def test_parse_xml_streaming_parse_error_returns_partial(self):
        """A document truncated after one complete record returns that record."""
        inn = _digits(10)
        xml = (
            "<?xml version='1.0' encoding='utf-8'?>"
            f'<ROOT><INSPECTION INN="{inn}" /><INSPECTION'
        ).encode()
        client = ProverkiClient()
        inspections = client._parse_xml_streaming(xml)
        self.assertEqual(len(inspections), 1)

    def test_parse_xml_streaming_parse_error_raises_when_empty(self):
        """A document truncated before any complete record raises ``ProverkiClientError``."""
        xml = b"<?xml version='1.0' encoding='utf-8'?><ROOT><INSPECTION"
        client = ProverkiClient()
        with self.assertRaises(ProverkiClientError):
            client._parse_xml_streaming(xml)

    def test_parse_xml_streaming_logs_every_10000(self):
        """Streaming 10000 identical records returns all 10000.

        NOTE(review): the name suggests the client logs every 10000 records;
        only the count is asserted here.
        """
        inn = _digits(10)
        record = f'<INSPECTION INN="{inn}" />'
        xml = (
            "<?xml version='1.0' encoding='utf-8'?>" f"<ROOT>{record * 10000}</ROOT>"
        ).encode()
        client = ProverkiClient()
        inspections = client._parse_xml_streaming(xml)
        self.assertEqual(len(inspections), 10000)

    def test_parse_xml_streaming_skips_invalid_record(self):
        """An attribute-less record is skipped by the streaming parser."""
        xml = b"<?xml version='1.0' encoding='utf-8'?>" b"<ROOT><INSPECTION /></ROOT>"
        client = ProverkiClient()
        inspections = client._parse_xml_streaming(xml)
        self.assertEqual(inspections, [])

    def test_parse_xml_tag_search_handles_error(self):
        """A ``SyntaxError`` from the ``.//inspection`` search does not lose the record."""
        xml = _xml_with_tag("inspection", _inspection_attrs())
        client = ProverkiClient()
        original_findall = element_path.findall

        def _raising_findall(elem, path, namespaces=None):
            # Fail only for the one path under test; delegate everything else.
            if path == ".//inspection":
                raise SyntaxError("boom")
            return original_findall(elem, path, namespaces)

        element_path.findall = _raising_findall
        try:
            inspections = client._parse_xml_content(xml)
        finally:
            # Always undo the module-level patch so other tests are unaffected.
            element_path.findall = original_findall

        self.assertEqual(len(inspections), 1)

    def test_parse_xml_record_namespace_nested_fields(self):
        """Namespaced child elements supply ``inspection_type`` and ``status``."""
        # Local import: escape the generated company name so a value containing
        # ``&``, ``<`` or ``"`` cannot produce malformed XML.
        from xml.sax.saxutils import escape

        ns = "http://example.com/ns"
        inn = _digits(10)
        ogrn = _digits(13)
        inspection_type = fake.word()
        status = fake.word()
        erpid = _digits(12)
        org_name = escape(fake.company(), {'"': "&quot;"})
        xml = (
            "<?xml version='1.0' encoding='utf-8'?>"
            f"<ns:ROOT xmlns:ns='{ns}'>"
            f'<ns:INSPECTION ERPID="{erpid}">'
            f'<ns:I_SUBJECT INN="{inn}" OGRN="{ogrn}" ORG_NAME="{org_name}" />'
            f'<ns:I_CLASSIFICATION ITYPE_NAME="{inspection_type}" />'
            f'<ns:I_APPROVE STATUS="{status}" />'
            "</ns:INSPECTION>"
            "</ns:ROOT>"
        ).encode()
        client = ProverkiClient()
        inspections = client._parse_xml_content(xml)
        self.assertEqual(len(inspections), 1)
        self.assertEqual(inspections[0].inspection_type, inspection_type)
        self.assertEqual(inspections[0].status, status)

    def test_parse_xml_record_namespace_text_child_fallback(self):
        """The INN is read from the text of a namespaced ``INN`` child element."""
        ns = "http://example.com/ns"
        inn = _digits(10)
        xml = (
            "<?xml version='1.0' encoding='utf-8'?>"
            f"<ns:INSPECTION xmlns:ns='{ns}'><ns:INN>{inn}</ns:INN></ns:INSPECTION>"
        )
        element = ET.fromstring(xml)  # noqa: S314
        client = ProverkiClient()
        inspection = client._parse_xml_record(element)
        self.assertIsNotNone(inspection)
        self.assertEqual(inspection.inn, inn)

    def test_parse_xml_record_bad_element_returns_none(self):
        """An element whose tag is the lone character ``{`` parses to ``None``."""
        element = ET.Element("{")
        client = ProverkiClient()
        self.assertIsNone(client._parse_xml_record(element))
|
|
|
|
|
|
class _BadBytes(bytes):
|
|
def __new__(cls, data: bytes, *, xml_str: str):
|
|
obj = super().__new__(cls, data)
|
|
obj._xml_str = xml_str
|
|
return obj
|
|
|
|
def decode(self, encoding="utf-8", errors="strict"):
|
|
if errors == "replace":
|
|
return self._xml_str
|
|
raise UnicodeDecodeError(encoding, b"", 0, 1, "bad bytes")
|
|
|
|
|
|
class _FakeResponse:
|
|
def __init__(self, headers: dict[str, str] | None = None):
|
|
self.headers = headers or {}
|
|
|
|
|
|
class _FakeDownload:
|
|
def __init__(self, path: Path | None):
|
|
self._path = path
|
|
|
|
def path(self):
|
|
if self._path is None:
|
|
return None
|
|
return str(self._path)
|
|
|
|
|
|
class _FakeDownloadContext:
    """Context manager exposing a ``_FakeDownload`` as its ``value`` attribute."""

    def __init__(self, path: Path | None):
        download = _FakeDownload(path)
        self.value = download

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        # Returning False lets any exception from the ``with`` body propagate.
        return False
|
|
|
|
|
|
class _FakeLink:
|
|
def __init__(self, href: str | None = None):
|
|
self._href = href
|
|
|
|
def get_attribute(self, name: str):
|
|
if name == "href":
|
|
return self._href
|
|
return None
|
|
|
|
def click(self):
|
|
return None
|
|
|
|
|
|
class _FakePage:
|
|
def __init__(
|
|
self,
|
|
*,
|
|
content_type: str,
|
|
content: str,
|
|
download_path: Path | None,
|
|
download_links: list[_FakeLink] | None = None,
|
|
portal_links: list[_FakeLink] | None = None,
|
|
zip_link: _FakeLink | None = None,
|
|
xml_link: _FakeLink | None = None,
|
|
download_tab: _FakeLink | None = None,
|
|
raise_on_wait: bool = False,
|
|
):
|
|
self._content_type = content_type
|
|
self._content = content
|
|
self._download_path = download_path
|
|
self._download_links = download_links or []
|
|
self._portal_links = portal_links or []
|
|
self._zip_link = zip_link
|
|
self._xml_link = xml_link
|
|
self._download_tab = download_tab
|
|
self._last_url = ""
|
|
self._raise_on_wait = raise_on_wait
|
|
|
|
def goto(self, url, wait_until=None, timeout=None):
|
|
self._last_url = url
|
|
return _FakeResponse({"content-type": self._content_type})
|
|
|
|
def content(self):
|
|
return self._content
|
|
|
|
def title(self):
|
|
return "Page"
|
|
|
|
def wait_for_selector(self, *args, **kwargs):
|
|
if self._raise_on_wait:
|
|
raise RuntimeError("timeout")
|
|
return None
|
|
|
|
def wait_for_timeout(self, *args, **kwargs):
|
|
return None
|
|
|
|
def query_selector(self, selector: str):
|
|
if "Скачать" in selector and self._download_tab:
|
|
return self._download_tab
|
|
if ".zip" in selector and self._zip_link:
|
|
return self._zip_link
|
|
if ".xml" in selector and self._xml_link:
|
|
return self._xml_link
|
|
return None
|
|
|
|
def query_selector_all(self, selector: str):
|
|
if self._last_url == OPEN_DATA_PORTAL_URL:
|
|
return self._portal_links
|
|
return self._download_links
|
|
|
|
def expect_download(self, timeout=None):
|
|
return _FakeDownloadContext(self._download_path)
|
|
|
|
|
|
class _FakeContext:
|
|
def __init__(self, page: _FakePage):
|
|
self._page = page
|
|
self.closed = False
|
|
|
|
def new_page(self):
|
|
return self._page
|
|
|
|
def close(self):
|
|
self.closed = True
|
|
|
|
|
|
class _FakeBrowser:
|
|
def __init__(self, page: _FakePage):
|
|
self._page = page
|
|
self.closed = False
|
|
|
|
def new_context(self, **_kwargs):
|
|
return _FakeContext(self._page)
|
|
|
|
def close(self):
|
|
self.closed = True
|
|
|
|
|
|
def _temp_file(content: bytes) -> Path:
|
|
tmp = tempfile.NamedTemporaryFile(delete=False)
|
|
tmp.write(content)
|
|
tmp.flush()
|
|
tmp.close()
|
|
return Path(tmp.name)
|
|
|
|
|
|
class ProverkiPlaywrightStubTest(SimpleTestCase):
    """Playwright-related ``ProverkiClient`` paths driven by in-process fakes.

    Temporary download files and ``sys.modules`` overrides are registered with
    ``addCleanup`` so they are undone even when a test fails.
    """

    databases = "__all__"

    def tearDown(self):
        """Clear the running-loop marker if one is still set after a test."""
        super().tearDown()
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            # No running loop registered: nothing to reset.
            return
        # NOTE(review): relies on a private asyncio hook; presumably needed
        # because a test can leave the marker set - confirm.
        asyncio_events._set_running_loop(None)

    def _download_file(self, content: bytes) -> Path:
        """Create a temp file with ``content`` and schedule its removal."""
        path = _temp_file(content)
        # missing_ok: the file may already have been removed by the code under test.
        self.addCleanup(path.unlink, missing_ok=True)
        return path

    def _client_with_page(self, page):
        """Return a client whose browser is a ``_FakeBrowser`` serving ``page``."""
        client = ProverkiClient()
        client._browser = _FakeBrowser(page)
        return client

    def _stub_module(self, name: str, module) -> None:
        """Install ``module`` as ``sys.modules[name]`` and restore the old entry on cleanup."""
        previous = sys.modules.get(name)
        sys.modules[name] = module

        def _restore() -> None:
            if previous is None:
                sys.modules.pop(name, None)
            else:
                sys.modules[name] = previous

        self.addCleanup(_restore)

    def test_download_with_playwright_direct_response(self):
        """An ``application/xml`` response is returned as XML bytes."""
        download_path = self._download_file(b"<xml></xml>")
        page = _FakePage(
            content_type="application/xml",
            content="<xml></xml>",
            download_path=download_path,
        )
        client = self._client_with_page(page)

        result = client._download_with_playwright("http://example.com")
        self.assertIn(b"<xml", result)

    def test_download_with_playwright_download_link(self):
        """An HTML page with a download link yields the downloaded file's bytes."""
        download_path = self._download_file(b"zip-data")
        page = _FakePage(
            content_type="text/html",
            content="<html></html>",
            download_path=download_path,
            download_links=[_FakeLink(href="file.zip")],
        )
        client = self._client_with_page(page)

        result = client._download_with_playwright("http://example.com")
        self.assertEqual(result, b"zip-data")

    def test_download_with_playwright_portal_fallback(self):
        """With no links on the page, links found on the portal URL are used."""
        download_path = self._download_file(b"portal-data")
        page = _FakePage(
            content_type="text/html",
            content="<html></html>",
            download_path=download_path,
            download_links=[],
            portal_links=[_FakeLink(href="portal.zip")],
        )
        client = self._client_with_page(page)

        result = client._download_with_playwright("http://example.com")
        self.assertEqual(result, b"portal-data")

    def test_download_with_playwright_embedded_xml_after_doctype(self):
        """XML that follows an HTML doctype in the page content is still returned."""
        page = _FakePage(
            content_type="application/xml",
            content="<!DOCTYPE html><?xml version='1.0'?><ROOT/>",
            download_path=None,
            download_links=[],
            portal_links=[],
        )
        client = self._client_with_page(page)

        result = client._download_with_playwright("http://example.com")
        self.assertIn(b"<?xml", result)

    def test_download_with_playwright_download_link_without_path_raises(self):
        """A download link whose download has no path raises ``ProverkiClientError``."""
        page = _FakePage(
            content_type="text/html",
            content="<html></html>",
            download_path=None,
            download_links=[_FakeLink(href="file.zip")],
            portal_links=[],
        )
        client = self._client_with_page(page)

        with self.assertRaises(ProverkiClientError):
            client._download_with_playwright("http://example.com")

    def test_download_with_playwright_portal_link_without_path_raises(self):
        """A portal link whose download has no path raises ``ProverkiClientError``."""
        page = _FakePage(
            content_type="text/html",
            content="<html></html>",
            download_path=None,
            download_links=[],
            portal_links=[_FakeLink(href="portal.zip")],
        )
        client = self._client_with_page(page)

        with self.assertRaises(ProverkiClientError):
            client._download_with_playwright("http://example.com")

    def test_download_from_portal_zip_link(self):
        """A zip link on the portal page yields the downloaded bytes."""
        download_path = self._download_file(b"zip-content")
        page = _FakePage(
            content_type="text/html",
            content="content",
            download_path=download_path,
            zip_link=_FakeLink(href="file.zip"),
            download_tab=_FakeLink(),
        )
        client = self._client_with_page(page)

        result = client._download_from_portal("http://portal.example.com")
        self.assertEqual(result, b"zip-content")

    def test_download_from_portal_zip_link_without_download_path(self):
        """A zip link whose download has no path raises ``ProverkiClientError``."""
        page = _FakePage(
            content_type="text/html",
            content="no data available",
            download_path=None,
            zip_link=_FakeLink(href="file.zip"),
            xml_link=None,
            download_tab=_FakeLink(),
        )
        client = self._client_with_page(page)

        with self.assertRaises(ProverkiClientError):
            client._download_from_portal("http://portal.example.com")

    def test_download_from_portal_xml_link(self):
        """An XML link (no zip link, no download tab) yields the downloaded bytes."""
        download_path = self._download_file(b"<xml/>")
        page = _FakePage(
            content_type="text/html",
            content="content",
            download_path=download_path,
            zip_link=None,
            xml_link=_FakeLink(href="file.xml"),
            download_tab=None,
        )
        client = self._client_with_page(page)

        result = client._download_from_portal("http://portal.example.com")
        self.assertEqual(result, b"<xml/>")

    def test_download_from_portal_xml_link_without_download_path(self):
        """An XML link whose download has no path raises ``ProverkiClientError``."""
        page = _FakePage(
            content_type="text/html",
            content="no data",
            download_path=None,
            zip_link=None,
            xml_link=_FakeLink(href="file.xml"),
            download_tab=None,
        )
        client = self._client_with_page(page)

        with self.assertRaises(ProverkiClientError):
            client._download_from_portal("http://portal.example.com")

    def test_download_from_portal_no_links_not_found(self):
        """A "Not found" page with no links and a failing wait raises ``ProverkiClientError``."""
        download_path = self._download_file(b"")
        page = _FakePage(
            content_type="text/html",
            content="Not found",
            download_path=download_path,
            zip_link=None,
            xml_link=None,
            download_tab=None,
            raise_on_wait=True,
        )
        client = self._client_with_page(page)

        with self.assertRaises(ProverkiClientError):
            client._download_from_portal("http://portal.example.com")

    def test_download_from_portal_no_links_generic_error(self):
        """A page with no links at all raises ``ProverkiClientError``."""
        page = _FakePage(
            content_type="text/html",
            content="no links here",
            download_path=None,
            zip_link=None,
            xml_link=None,
            download_tab=None,
        )
        client = self._client_with_page(page)

        with self.assertRaises(ProverkiClientError):
            client._download_from_portal("http://portal.example.com")

    def test_close_playwright_handles_errors(self):
        """Errors from ``close``/``stop`` are swallowed and both handles are cleared."""

        class _BrokenBrowser:
            def close(self):
                raise RuntimeError("boom")

        class _BrokenPlaywright:
            def stop(self):
                raise RuntimeError("boom")

        client = ProverkiClient()
        client._browser = _BrokenBrowser()
        client._playwright = _BrokenPlaywright()

        client._close_playwright()
        self.assertIsNone(client._browser)
        self.assertIsNone(client._playwright)

    def test_get_browser_import_error(self):
        """Empty stand-in ``playwright`` modules make ``_get_browser`` raise ``ProverkiClientError``."""
        client = ProverkiClient()
        fake_playwright = types.ModuleType("playwright")
        # An empty ``__path__`` marks the stand-in as a package.
        fake_playwright.__path__ = []
        fake_sync_api = types.ModuleType("playwright.sync_api")
        self._stub_module("playwright", fake_playwright)
        self._stub_module("playwright.sync_api", fake_sync_api)

        with self.assertRaises(ProverkiClientError):
            client._get_browser()

    def test_get_browser_success(self):
        """A stubbed ``sync_playwright`` whose chromium launches yields a browser."""

        class _FakeChromium:
            def launch(self, **_kwargs):
                return object()

        class _FakePlaywright:
            chromium = _FakeChromium()

        class _FakeSyncPlaywright:
            def start(self):
                return _FakePlaywright()

        fake_module = types.SimpleNamespace(
            sync_playwright=lambda: _FakeSyncPlaywright()
        )
        client = ProverkiClient()
        self._stub_module("playwright.sync_api", fake_module)

        browser = client._get_browser()
        self.assertIsNotNone(browser)

    def test_get_browser_start_error(self):
        """A ``start()`` failure is reported as ``ProverkiClientError``."""

        class _BrokenPlaywright:
            def start(self):
                raise RuntimeError("startup failed")

        fake_module = types.SimpleNamespace(sync_playwright=lambda: _BrokenPlaywright())
        client = ProverkiClient()
        self._stub_module("playwright.sync_api", fake_module)

        with self.assertRaises(ProverkiClientError):
            client._get_browser()
|