Files
mostovik-backend/tests/apps/registers/test_services.py
Aleksandr Meshchriakov 25176f31b4
Some checks failed
CI/CD Pipeline / Telegram Notify Success (push) Has been cancelled
CI/CD Pipeline / Run Tests (push) Has been cancelled
CI/CD Pipeline / Code Quality Checks (push) Has been cancelled
CI/CD Pipeline / Code Quality Checks (pull_request) Successful in 1m42s
CI/CD Pipeline / Run Tests (pull_request) Successful in 2m25s
CI/CD Pipeline / Telegram Notify Success (pull_request) Successful in 1m34s
fix pre-commit
2026-03-17 13:55:34 +01:00

375 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from __future__ import annotations
import io
from datetime import date
from apps.registers.services import (
ParsedOrganization,
RegisterImportError,
RegisterImportService,
)
from django.core.files.uploadedfile import SimpleUploadedFile
from django.test import TestCase
from openpyxl import Workbook
from tests.apps.registers.factories import (
OrganizationFactory,
RegisterFactory,
RegisterUploadFactory,
RegistryMembershipPeriodFactory,
)
def _build_workbook(rows: list[list[object]]) -> bytes:
workbook = Workbook()
worksheet = workbook.active
for row in rows:
worksheet.append(row)
buffer = io.BytesIO()
workbook.save(buffer)
workbook.close()
return buffer.getvalue()
def _upload(name: str, rows: list[list[object]]) -> SimpleUploadedFile:
return SimpleUploadedFile(
name,
_build_workbook(rows),
content_type=(
"application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
),
)
class RegisterImportServiceTest(TestCase):
def test_update_organization_fields_returns_false_when_nothing_changed(self):
organization = OrganizationFactory(
pn_name="Org",
in_kpp=123456789,
mn_okpo="12345678",
)
row = ParsedOrganization(
pn_name="Org",
mn_ogrn=organization.mn_ogrn,
mn_inn=organization.mn_inn,
in_kpp=123456789,
mn_okpo="12345678",
)
updated = RegisterImportService._update_organization_fields(
organization=organization,
row=row,
)
self.assertFalse(updated)
def test_update_organization_fields_updates_changed_values(self):
organization = OrganizationFactory(
pn_name="Old",
in_kpp=123456789,
mn_okpo="12345678",
)
row = ParsedOrganization(
pn_name="New",
mn_ogrn=organization.mn_ogrn,
mn_inn=organization.mn_inn,
in_kpp=987654321,
mn_okpo="87654321",
)
updated = RegisterImportService._update_organization_fields(
organization=organization,
row=row,
)
self.assertTrue(updated)
organization.refresh_from_db()
self.assertEqual(organization.pn_name, "New")
self.assertEqual(organization.in_kpp, 987654321)
self.assertEqual(organization.mn_okpo, "87654321")
def test_get_active_periods_by_org_returns_mapping(self):
registry = RegisterFactory()
active_period = RegistryMembershipPeriodFactory(
registry=registry, ended_at=None
)
RegistryMembershipPeriodFactory(
registry=registry,
started_at=date(2026, 1, 1),
ended_at=date(2026, 2, 1),
)
active_by_org = RegisterImportService._get_active_periods_by_org(registry)
self.assertEqual(list(active_by_org.keys()), [active_period.organization_id])
def test_close_missing_periods_updates_and_deletes(self):
registry = RegisterFactory()
upload = RegisterUploadFactory(registry=registry, actual_date=date(2026, 3, 1))
active_period = RegistryMembershipPeriodFactory(
registry=registry,
started_at=date(2026, 2, 1),
ended_at=None,
)
same_day_period = RegistryMembershipPeriodFactory(
registry=registry,
started_at=date(2026, 3, 1),
ended_at=None,
)
closed = RegisterImportService._close_missing_periods(
active_by_org={
active_period.organization_id: active_period,
same_day_period.organization_id: same_day_period,
},
snapshot_org_ids=set(),
snapshot_date=date(2026, 3, 1),
upload=upload,
)
self.assertEqual(closed, 2)
active_period.refresh_from_db()
self.assertEqual(active_period.ended_at, date(2026, 3, 1))
self.assertFalse(
active_period.__class__.objects.filter(id=same_day_period.id).exists()
)
def test_open_new_periods_returns_zero_when_all_active(self):
registry = RegisterFactory()
upload = RegisterUploadFactory(registry=registry)
organization = OrganizationFactory()
opened = RegisterImportService._open_new_periods(
registry=registry,
snapshot_org_ids={organization.id},
active_org_ids={organization.id},
snapshot_date=date(2026, 3, 1),
upload=upload,
)
self.assertEqual(opened, 0)
def test_open_new_periods_creates_missing_memberships(self):
registry = RegisterFactory()
upload = RegisterUploadFactory(registry=registry)
organization = OrganizationFactory()
opened = RegisterImportService._open_new_periods(
registry=registry,
snapshot_org_ids={organization.id},
active_org_ids=set(),
snapshot_date=date(2026, 3, 1),
upload=upload,
)
self.assertEqual(opened, 1)
def test_resolve_actual_date_returns_requested_latest_or_today(self):
registry = RegisterFactory()
requested = RegisterImportService.resolve_actual_date(
registry=registry,
requested_date=date(2026, 3, 10),
)
self.assertEqual(requested, date(2026, 3, 10))
RegisterUploadFactory(registry=registry, actual_date=date(2026, 2, 1))
latest_upload = RegisterUploadFactory(
registry=registry,
actual_date=date(2026, 3, 5),
)
latest = RegisterImportService.resolve_actual_date(
registry=registry,
requested_date=None,
)
self.assertEqual(latest, latest_upload.actual_date)
empty_registry = RegisterFactory()
today = RegisterImportService.resolve_actual_date(
registry=empty_registry,
requested_date=None,
)
self.assertEqual(today, date.today())
def test_get_organizations_queryset_applies_registry_filters_and_search(self):
registry = RegisterFactory()
upload = RegisterUploadFactory(registry=registry, actual_date=date(2026, 3, 1))
match = OrganizationFactory(pn_name="Target organization", mn_okpo="11111111")
other = OrganizationFactory(pn_name="Other company", mn_okpo="22222222")
RegistryMembershipPeriodFactory(
registry=registry,
organization=match,
started_at=date(2026, 3, 1),
started_by_upload=upload,
)
RegistryMembershipPeriodFactory(
registry=registry,
organization=other,
started_at=date(2026, 1, 1),
ended_at=date(2026, 2, 1),
started_by_upload=upload,
ended_by_upload=upload,
)
queryset, resolved_date = RegisterImportService.get_organizations_queryset(
registry=registry,
search="Target",
mn_okpo="11111111",
)
self.assertEqual(resolved_date, date(2026, 3, 1))
self.assertEqual(list(queryset.values_list("id", flat=True)), [match.id])
def test_get_registry_organizations_queryset_applies_exact_filters(self):
registry = RegisterFactory()
upload = RegisterUploadFactory(registry=registry, actual_date=date(2026, 3, 1))
organization = OrganizationFactory()
RegistryMembershipPeriodFactory(
registry=registry,
organization=organization,
started_at=date(2026, 3, 1),
started_by_upload=upload,
)
(
queryset,
resolved_date,
) = RegisterImportService.get_registry_organizations_queryset(
registry=registry,
mn_ogrn=organization.mn_ogrn,
mn_inn=organization.mn_inn,
in_kpp=organization.in_kpp,
mn_okpo=organization.mn_okpo,
)
self.assertEqual(resolved_date, date(2026, 3, 1))
self.assertEqual(list(queryset.values_list("id", flat=True)), [organization.id])
def test_parse_xlsx_parses_rows_and_supports_optional_kpp(self):
upload = _upload(
"register.xlsx",
[
["pn_name", "mn_ogrn", "mn_inn", "mn_okpo"],
["Org", "1027700118984", "7702000000", "12345678"],
],
)
rows = RegisterImportService.parse_xlsx(upload)
self.assertEqual(
rows,
[
ParsedOrganization(
pn_name="Org",
mn_ogrn=1027700118984,
mn_inn=7702000000,
in_kpp=None,
mn_okpo="12345678",
)
],
)
def test_parse_xlsx_raises_on_invalid_workbook_headers_and_empty_rows(self):
broken = SimpleUploadedFile("broken.xlsx", b"not-an-excel")
with self.assertRaisesMessage(
RegisterImportError, "Не удалось прочитать Excel файл"
):
RegisterImportService.parse_xlsx(broken)
no_headers = _upload("no-headers.xlsx", [])
with self.assertRaisesMessage(
RegisterImportError, "Файл не содержит заголовков"
):
RegisterImportService.parse_xlsx(no_headers)
missing_headers = _upload("missing.xlsx", [["pn_name", "mn_ogrn"]])
with self.assertRaisesMessage(
RegisterImportError, "Отсутствуют обязательные колонки"
):
RegisterImportService.parse_xlsx(missing_headers)
empty_rows = _upload(
"empty.xlsx",
[["pn_name", "mn_ogrn", "mn_inn", "mn_okpo"], ["", "", "", ""]],
)
with self.assertRaisesMessage(
RegisterImportError,
"Файл не содержит строк с организациями",
):
RegisterImportService.parse_xlsx(empty_rows)
def test_validate_snapshot_date_and_unique_identities(self):
registry = RegisterFactory()
RegisterUploadFactory(registry=registry, actual_date=date(2026, 3, 5))
with self.assertRaisesMessage(
RegisterImportError,
"Дата актуальности не может быть раньше последней загрузки",
):
RegisterImportService._validate_snapshot_date(
registry=registry,
snapshot_date=date(2026, 3, 1),
)
row = ParsedOrganization(
pn_name="Org",
mn_ogrn=1,
mn_inn=2,
in_kpp=None,
mn_okpo="12345678",
)
with self.assertRaisesMessage(
RegisterImportError,
"Файл содержит дубли по ключу",
):
RegisterImportService._ensure_unique_identities([row, row])
def test_scalar_parsers_validate_required_and_numeric_values(self):
self.assertEqual(RegisterImportService._normalize_header(" Mn_Inn "), "mn_inn")
self.assertTrue(RegisterImportService._is_empty_row((None, " ")))
self.assertFalse(RegisterImportService._is_empty_row((None, "x")))
self.assertEqual(
RegisterImportService._as_required_text(
" Org ", field_name="pn_name", row_number=2
),
"Org",
)
self.assertEqual(
RegisterImportService._as_required_int(
"123.0", field_name="mn_inn", row_number=2
),
123,
)
self.assertIsNone(
RegisterImportService._as_optional_int(
" ", field_name="in_kpp", row_number=2
)
)
self.assertEqual(
RegisterImportService._as_numeric_text(
"123 456.0", field_name="mn_okpo", row_number=2
),
"123456",
)
with self.assertRaisesMessage(RegisterImportError, "поле pn_name обязательно"):
RegisterImportService._as_required_text(
"", field_name="pn_name", row_number=2
)
with self.assertRaisesMessage(RegisterImportError, "поле mn_inn обязательно"):
RegisterImportService._as_required_int(
None, field_name="mn_inn", row_number=2
)
with self.assertRaisesMessage(
RegisterImportError, "поле in_kpp должно быть числом"
):
RegisterImportService._as_optional_int(
"abc", field_name="in_kpp", row_number=2
)
with self.assertRaisesMessage(
RegisterImportError,
"поле mn_okpo должно содержать только цифры",
):
RegisterImportService._as_numeric_text(
"ab12", field_name="mn_okpo", row_number=2
)