Add initial implementations for forms and organization apps with serializers, factories, and admin configurations
Some checks failed
CI/CD Pipeline / Code Quality Checks (push) Failing after 5m5s
CI/CD Pipeline / Run Tests (push) Failing after 5m5s
CI/CD Pipeline / Build Docker Images (push) Has been skipped
CI/CD Pipeline / Push to Gitea Registry (push) Has been skipped
CI/CD Pipeline / Deploy to Server (push) Has been skipped
Some checks failed
CI/CD Pipeline / Code Quality Checks (push) Failing after 5m5s
CI/CD Pipeline / Run Tests (push) Failing after 5m5s
CI/CD Pipeline / Build Docker Images (push) Has been skipped
CI/CD Pipeline / Push to Gitea Registry (push) Has been skipped
CI/CD Pipeline / Deploy to Server (push) Has been skipped
This commit is contained in:
5
tests/utils/__init__.py
Normal file
5
tests/utils/__init__.py
Normal file
@@ -0,0 +1,5 @@
|
||||
"""Test utilities."""
|
||||
|
||||
from .http_server import TestHTTPServer, Response
|
||||
|
||||
__all__ = ["TestHTTPServer", "Response"]
|
||||
241
tests/utils/fixtures.py
Normal file
241
tests/utils/fixtures.py
Normal file
@@ -0,0 +1,241 @@
|
||||
"""Fixture builders for integration tests (Faker-based)."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import io
|
||||
import zipfile
|
||||
from dataclasses import dataclass
|
||||
from typing import Iterable
|
||||
|
||||
from faker import Faker
|
||||
from openpyxl import Workbook
|
||||
|
||||
fake = Faker("ru_RU")
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class CertificateRow:
|
||||
issue_date: str
|
||||
certificate_number: str
|
||||
expiry_date: str
|
||||
certificate_file_url: str
|
||||
organisation_name: str
|
||||
inn: str
|
||||
ogrn: str
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class ManufacturerRow:
|
||||
full_legal_name: str
|
||||
inn: str
|
||||
ogrn: str
|
||||
address: str
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class InspectionRow:
|
||||
registration_number: str
|
||||
inn: str
|
||||
ogrn: str
|
||||
organisation_name: str
|
||||
control_authority: str
|
||||
inspection_type: str
|
||||
inspection_form: str
|
||||
start_date: str
|
||||
end_date: str
|
||||
status: str
|
||||
legal_basis: str
|
||||
result: str
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class ProcurementRow:
|
||||
purchase_number: str
|
||||
purchase_name: str
|
||||
customer_inn: str
|
||||
customer_kpp: str
|
||||
customer_ogrn: str
|
||||
customer_name: str
|
||||
max_price: str
|
||||
publish_date: str
|
||||
end_date: str
|
||||
status: str
|
||||
href: str
|
||||
|
||||
|
||||
def _digits(length: int) -> str:
|
||||
return "".join(str(fake.random_int(0, 9)) for _ in range(length))
|
||||
|
||||
|
||||
def build_minpromtorg_certificates_excel(count: int = 5) -> tuple[bytes, list[CertificateRow]]:
|
||||
wb = Workbook()
|
||||
ws = wb.active
|
||||
ws.append(
|
||||
[
|
||||
"issue_date",
|
||||
"certificate_number",
|
||||
"expiry_date",
|
||||
"certificate_file_url",
|
||||
"organisation_name",
|
||||
"inn",
|
||||
"ogrn",
|
||||
]
|
||||
)
|
||||
|
||||
rows: list[CertificateRow] = []
|
||||
for _ in range(count):
|
||||
row = CertificateRow(
|
||||
issue_date=str(fake.date()),
|
||||
certificate_number=f"{fake.bothify(text='??-####-#####')}",
|
||||
expiry_date=str(fake.date()),
|
||||
certificate_file_url=fake.url(),
|
||||
organisation_name=fake.company(),
|
||||
inn=_digits(10),
|
||||
ogrn=_digits(13),
|
||||
)
|
||||
rows.append(row)
|
||||
ws.append(
|
||||
[
|
||||
row.issue_date,
|
||||
row.certificate_number,
|
||||
row.expiry_date,
|
||||
row.certificate_file_url,
|
||||
row.organisation_name,
|
||||
row.inn,
|
||||
row.ogrn,
|
||||
]
|
||||
)
|
||||
|
||||
buf = io.BytesIO()
|
||||
wb.save(buf)
|
||||
wb.close()
|
||||
return buf.getvalue(), rows
|
||||
|
||||
|
||||
def build_minpromtorg_manufacturers_excel(
|
||||
count: int = 5,
|
||||
) -> tuple[bytes, list[ManufacturerRow]]:
|
||||
wb = Workbook()
|
||||
ws = wb.active
|
||||
ws.append(["full_legal_name", "inn", "ogrn", "address"])
|
||||
|
||||
rows: list[ManufacturerRow] = []
|
||||
for _ in range(count):
|
||||
row = ManufacturerRow(
|
||||
full_legal_name=fake.company(),
|
||||
inn=_digits(10),
|
||||
ogrn=_digits(13),
|
||||
address=fake.address().replace("\n", ", "),
|
||||
)
|
||||
rows.append(row)
|
||||
ws.append([row.full_legal_name, row.inn, row.ogrn, row.address])
|
||||
|
||||
buf = io.BytesIO()
|
||||
wb.save(buf)
|
||||
wb.close()
|
||||
return buf.getvalue(), rows
|
||||
|
||||
|
||||
def build_proverki_xml(count: int = 3) -> tuple[bytes, list[InspectionRow]]:
|
||||
rows: list[InspectionRow] = []
|
||||
parts = ["<?xml version='1.0' encoding='utf-8'?>", "<INSPECTIONS>"]
|
||||
|
||||
for _ in range(count):
|
||||
row = InspectionRow(
|
||||
registration_number=_digits(12),
|
||||
inn=_digits(10),
|
||||
ogrn=_digits(13),
|
||||
organisation_name=fake.company(),
|
||||
control_authority=fake.company(),
|
||||
inspection_type=fake.word(),
|
||||
inspection_form=fake.word(),
|
||||
start_date=str(fake.date()),
|
||||
end_date=str(fake.date()),
|
||||
status=fake.word(),
|
||||
legal_basis=fake.sentence(nb_words=4),
|
||||
result=fake.sentence(nb_words=3),
|
||||
)
|
||||
rows.append(row)
|
||||
parts.append(
|
||||
"<INSPECTION "
|
||||
f"ERPID=\"{row.registration_number}\" "
|
||||
f"INN=\"{row.inn}\" "
|
||||
f"OGRN=\"{row.ogrn}\" "
|
||||
f"ORG_NAME=\"{row.organisation_name}\" "
|
||||
f"FRGU_ORG_NAME=\"{row.control_authority}\" "
|
||||
f"ITYPE_NAME=\"{row.inspection_type}\" "
|
||||
f"ICARRYOUT_TYPE_NAME=\"{row.inspection_form}\" "
|
||||
f"START_DATE=\"{row.start_date}\" "
|
||||
f"END_DATE=\"{row.end_date}\" "
|
||||
f"STATUS=\"{row.status}\" "
|
||||
f"FZ_NAME=\"{row.legal_basis}\" "
|
||||
f"RESULT=\"{row.result}\" />"
|
||||
)
|
||||
|
||||
parts.append("</INSPECTIONS>")
|
||||
xml = "".join(parts).encode("utf-8")
|
||||
return xml, rows
|
||||
|
||||
|
||||
def build_zakupki_xml(count: int = 3) -> tuple[bytes, list[ProcurementRow]]:
|
||||
rows: list[ProcurementRow] = []
|
||||
parts = ["<?xml version='1.0' encoding='utf-8'?>", "<root>"]
|
||||
|
||||
for _ in range(count):
|
||||
row = ProcurementRow(
|
||||
purchase_number=_digits(19),
|
||||
purchase_name=fake.sentence(nb_words=6),
|
||||
customer_inn=_digits(10),
|
||||
customer_kpp=_digits(9),
|
||||
customer_ogrn=_digits(13),
|
||||
customer_name=fake.company(),
|
||||
max_price=str(fake.pydecimal(left_digits=7, right_digits=2, positive=True)),
|
||||
publish_date=str(fake.date()),
|
||||
end_date=str(fake.date()),
|
||||
status=fake.word(),
|
||||
href=fake.url(),
|
||||
)
|
||||
rows.append(row)
|
||||
parts.append(
|
||||
"<notification>"
|
||||
f"<purchaseNumber>{row.purchase_number}</purchaseNumber>"
|
||||
f"<purchaseObjectInfo>{row.purchase_name}</purchaseObjectInfo>"
|
||||
"<customer>"
|
||||
f"<INN>{row.customer_inn}</INN>"
|
||||
f"<KPP>{row.customer_kpp}</KPP>"
|
||||
f"<OGRN>{row.customer_ogrn}</OGRN>"
|
||||
f"<fullName>{row.customer_name}</fullName>"
|
||||
"</customer>"
|
||||
f"<maxPrice>{row.max_price}</maxPrice>"
|
||||
f"<publishDate>{row.publish_date}</publishDate>"
|
||||
f"<endDate>{row.end_date}</endDate>"
|
||||
f"<state>{row.status}</state>"
|
||||
f"<href>{row.href}</href>"
|
||||
"</notification>"
|
||||
)
|
||||
|
||||
parts.append("</root>")
|
||||
xml = "".join(parts).encode("utf-8")
|
||||
return xml, rows
|
||||
|
||||
|
||||
def build_zip(files: Iterable[tuple[str, bytes]]) -> bytes:
|
||||
buf = io.BytesIO()
|
||||
with zipfile.ZipFile(buf, "w", zipfile.ZIP_DEFLATED) as zf:
|
||||
for name, content in files:
|
||||
zf.writestr(name, content)
|
||||
return buf.getvalue()
|
||||
|
||||
|
||||
__all__ = [
|
||||
"fake",
|
||||
"CertificateRow",
|
||||
"ManufacturerRow",
|
||||
"InspectionRow",
|
||||
"ProcurementRow",
|
||||
"build_minpromtorg_certificates_excel",
|
||||
"build_minpromtorg_manufacturers_excel",
|
||||
"build_proverki_xml",
|
||||
"build_zakupki_xml",
|
||||
"build_zip",
|
||||
]
|
||||
134
tests/utils/http_server.py
Normal file
134
tests/utils/http_server.py
Normal file
@@ -0,0 +1,134 @@
|
||||
"""Lightweight in-memory HTTP router for integration tests (no sockets)."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
from dataclasses import dataclass, field
|
||||
from types import SimpleNamespace
|
||||
from typing import Callable
|
||||
from urllib.parse import urlparse
|
||||
|
||||
import requests
|
||||
from requests.adapters import BaseAdapter
|
||||
from requests.models import Response as RequestsResponse
|
||||
|
||||
|
||||
@dataclass
|
||||
class Response:
|
||||
status: int = 200
|
||||
body: bytes = b""
|
||||
headers: dict[str, str] = field(default_factory=dict)
|
||||
|
||||
|
||||
RouteHandler = Callable[[SimpleNamespace, bytes], Response]
|
||||
|
||||
|
||||
def _json_response(data: object, status: int = 200) -> Response:
|
||||
body = json.dumps(data, ensure_ascii=False).encode("utf-8")
|
||||
return Response(
|
||||
status=status,
|
||||
body=body,
|
||||
headers={"Content-Type": "application/json; charset=utf-8"},
|
||||
)
|
||||
|
||||
|
||||
class _InMemoryAdapter(BaseAdapter):
|
||||
def __init__(self, routes: dict[tuple[str, str], Response | RouteHandler]) -> None:
|
||||
super().__init__()
|
||||
self._routes = routes
|
||||
|
||||
def send(self, request, **_kwargs): # noqa: D401
|
||||
parsed = urlparse(request.url)
|
||||
key = (request.method.upper(), parsed.path)
|
||||
route = self._routes.get(key)
|
||||
|
||||
if route is None:
|
||||
response = Response(status=404, body=b"", headers={})
|
||||
else:
|
||||
body = request.body or b""
|
||||
if isinstance(body, str):
|
||||
body = body.encode("utf-8")
|
||||
if callable(route):
|
||||
response = route(
|
||||
SimpleNamespace(
|
||||
path=parsed.path,
|
||||
query=parsed.query,
|
||||
method=request.method.upper(),
|
||||
),
|
||||
body,
|
||||
)
|
||||
else:
|
||||
response = route
|
||||
|
||||
return self._build_response(request, response)
|
||||
|
||||
def _build_response(self, request, response: Response) -> RequestsResponse:
|
||||
resp = RequestsResponse()
|
||||
resp.status_code = response.status
|
||||
resp._content = response.body
|
||||
resp.headers.update(response.headers)
|
||||
resp.url = request.url
|
||||
resp.request = request
|
||||
return resp
|
||||
|
||||
def close(self) -> None: # noqa: D401
|
||||
return
|
||||
|
||||
|
||||
class TestHTTPServer:
|
||||
"""Context-managed in-memory HTTP router with requests adapter."""
|
||||
|
||||
def __init__(self) -> None:
|
||||
self._routes: dict[tuple[str, str], Response | RouteHandler] = {}
|
||||
self._adapter = _InMemoryAdapter(self._routes)
|
||||
self._base_url = "http://testserver"
|
||||
self._started = False
|
||||
|
||||
@property
|
||||
def base_url(self) -> str:
|
||||
return self._base_url
|
||||
|
||||
@property
|
||||
def adapter(self) -> BaseAdapter:
|
||||
return self._adapter
|
||||
|
||||
def mount(self, client_or_session) -> None:
|
||||
session = getattr(client_or_session, "session", client_or_session)
|
||||
session.mount(self._base_url, self._adapter)
|
||||
session.mount(self._base_url.replace("http://", "https://", 1), self._adapter)
|
||||
|
||||
def add_json(self, path: str, data: object, *, status: int = 200) -> None:
|
||||
self._routes[("GET", path)] = _json_response(data, status=status)
|
||||
|
||||
def add_bytes(
|
||||
self,
|
||||
path: str,
|
||||
data: bytes,
|
||||
*,
|
||||
content_type: str = "application/octet-stream",
|
||||
status: int = 200,
|
||||
) -> None:
|
||||
self._routes[("GET", path)] = Response(
|
||||
status=status,
|
||||
body=data,
|
||||
headers={"Content-Type": content_type},
|
||||
)
|
||||
|
||||
def add_route(self, method: str, path: str, handler: RouteHandler) -> None:
|
||||
self._routes[(method.upper(), path)] = handler
|
||||
|
||||
def start(self) -> None:
|
||||
self._started = True
|
||||
|
||||
def stop(self) -> None:
|
||||
self._started = False
|
||||
|
||||
def __enter__(self) -> "TestHTTPServer":
|
||||
self.start()
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc, tb) -> None:
|
||||
self.stop()
|
||||
|
||||
|
||||
__all__ = ["TestHTTPServer", "Response"]
|
||||
Reference in New Issue
Block a user