Files
mostovik-backend/tests/apps/registers/test_services.py
Aleksandr Meshchriakov 15360a3c8e
All checks were successful
CI/CD Pipeline / Quality Gate (push) Successful in 20s
CI/CD Pipeline / Build and Push Images (push) Successful in 10s
CI/CD Pipeline / Internal Notify (push) Successful in 0s
CI/CD Pipeline / Deploy Dev in Dokploy (push) Successful in 1s
Add v2 registry uploads and source CSV exports
2026-05-07 14:41:01 +02:00

541 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from __future__ import annotations
import io
from datetime import date
from unittest.mock import patch
from apps.registers.services import (
ParsedOrganization,
RegisterImportError,
RegisterImportService,
)
from django.core.files.uploadedfile import SimpleUploadedFile
from django.test import TestCase
from openpyxl import Workbook
from tests.apps.registers.factories import (
OrganizationFactory,
RegisterFactory,
RegisterUploadFactory,
RegistryMembershipPeriodFactory,
)
def _build_workbook(rows: list[list[object]]) -> bytes:
    """Serialize *rows* into an in-memory XLSX workbook and return its bytes.

    Every inner list is appended, in order, as one row of the workbook's
    active worksheet.
    """
    book = Workbook()
    sheet = book.active
    for cells in rows:
        sheet.append(cells)

    # Save into memory rather than onto disk so tests stay filesystem-free.
    stream = io.BytesIO()
    book.save(stream)
    book.close()
    return stream.getvalue()
def _upload(name: str, rows: list[list[object]]) -> SimpleUploadedFile:
    """Return an uploaded XLSX file called *name* whose sheet holds *rows*."""
    xlsx_mime = "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
    payload = _build_workbook(rows)
    return SimpleUploadedFile(name, payload, content_type=xlsx_mime)
class RegisterImportServiceTest(TestCase):
    """Tests for ``RegisterImportService``: parsing, upserts and period handling.

    Tests that cover several independent scenarios wrap each one in
    ``subTest`` so that a failure names the scenario that broke.
    """

    # Header row used by the OPK-style workbooks in the parsing tests below.
    OPK_SOURCE_HEADERS = (
        "rn",
        "okpo",
        "ogrn",
        "inn",
        "filial",
        "ropk_num",
        "ropk_razdel_num",
        "ropk_razdel_name",
        "short_name",
        "full_name",
    )

    def test_update_organization_fields_returns_false_when_nothing_changed(self):
        """A parsed row equal to the stored organization reports no update."""
        organization = OrganizationFactory(
            pn_name="Org",
            in_kpp=123456789,
            mn_okpo="12345678",
        )
        row = ParsedOrganization(
            pn_name="Org",
            mn_ogrn=organization.mn_ogrn,
            mn_inn=organization.mn_inn,
            in_kpp=123456789,
            mn_okpo="12345678",
        )

        updated = RegisterImportService._update_organization_fields(
            organization=organization,
            row=row,
        )

        self.assertFalse(updated)

    def test_update_organization_fields_updates_changed_values(self):
        """Changed name, KPP and OKPO are persisted and reported as an update."""
        organization = OrganizationFactory(
            pn_name="Old",
            in_kpp=123456789,
            mn_okpo="12345678",
        )
        row = ParsedOrganization(
            pn_name="New",
            mn_ogrn=organization.mn_ogrn,
            mn_inn=organization.mn_inn,
            in_kpp=987654321,
            mn_okpo="87654321",
        )

        updated = RegisterImportService._update_organization_fields(
            organization=organization,
            row=row,
        )

        self.assertTrue(updated)
        # Reload from the database to prove the change was saved, not just
        # applied to the in-memory instance.
        organization.refresh_from_db()
        self.assertEqual(organization.pn_name, "New")
        self.assertEqual(organization.in_kpp, 987654321)
        self.assertEqual(organization.mn_okpo, "87654321")

    def test_update_organization_fields_does_not_clear_existing_kpp_when_missing(self):
        """A row without a KPP must leave the stored KPP untouched."""
        organization = OrganizationFactory(
            pn_name="Old",
            in_kpp=123456789,
            mn_okpo="12345678",
        )
        row = ParsedOrganization(
            pn_name="Old",
            mn_ogrn=organization.mn_ogrn,
            mn_inn=organization.mn_inn,
            in_kpp=None,
            mn_okpo="12345678",
        )

        updated = RegisterImportService._update_organization_fields(
            organization=organization,
            row=row,
        )

        self.assertFalse(updated)
        organization.refresh_from_db()
        self.assertEqual(organization.in_kpp, 123456789)

    def test_upsert_organizations_uses_bulk_operations(self):
        """Upsert creates and updates rows without per-row ORM calls."""
        existing = OrganizationFactory(
            pn_name="Old",
            mn_ogrn=111,
            mn_inn=222,
            in_kpp=333,
            mn_okpo="12345678",
        )
        rows = [
            # Same OGRN/INN as ``existing`` with different field values.
            ParsedOrganization(
                pn_name="New",
                mn_ogrn=111,
                mn_inn=222,
                in_kpp=444,
                mn_okpo="87654321",
            ),
            # OGRN/INN pair that no stored organization has.
            ParsedOrganization(
                pn_name="Created",
                mn_ogrn=333,
                mn_inn=444,
                in_kpp=None,
                mn_okpo="22222222",
            ),
        ]

        # Make the per-row APIs raise so any use of them fails the test.
        with (
            patch(
                "django.db.models.query.QuerySet.get_or_create",
                side_effect=AssertionError("per-row get_or_create must not be used"),
            ),
            patch.object(
                OrganizationFactory._meta.model,
                "save",
                side_effect=AssertionError("per-row save must not be used"),
            ),
        ):
            snapshot_org_ids, created, updated = (
                RegisterImportService._upsert_organizations(rows)
            )

        self.assertEqual(created, 1)
        self.assertEqual(updated, 1)
        self.assertEqual(len(snapshot_org_ids), 2)
        existing.refresh_from_db()
        self.assertEqual(existing.pn_name, "New")
        self.assertEqual(existing.in_kpp, 444)
        self.assertEqual(existing.mn_okpo, "87654321")

    def test_parse_xlsx_accepts_opk_source_header_aliases(self):
        """OPK-style column names are accepted as aliases of the register ones."""
        upload = _upload(
            "opk.xlsx",
            [
                list(self.OPK_SOURCE_HEADERS),
                [
                    1,
                    "07506197",
                    "1027600980990",
                    "7601000086",
                    "",
                    "1",
                    "1",
                    "Раздел",
                    'АО "ЯРЗ"',
                    'АКЦИОНЕРНОЕ ОБЩЕСТВО "ЯРОСЛАВСКИЙ РАДИОЗАВОД"',
                ],
            ],
        )

        rows = RegisterImportService.parse_xlsx(upload)

        self.assertEqual(len(rows), 1)
        self.assertEqual(
            rows[0].pn_name,
            'АКЦИОНЕРНОЕ ОБЩЕСТВО "ЯРОСЛАВСКИЙ РАДИОЗАВОД"',
        )
        self.assertEqual(rows[0].mn_ogrn, 1027600980990)
        self.assertEqual(rows[0].mn_inn, 7601000086)
        self.assertIsNone(rows[0].in_kpp)
        # The leading zero of the OKPO must survive parsing.
        self.assertEqual(rows[0].mn_okpo, "07506197")

    def test_parse_xlsx_skips_opk_branch_rows_without_identity(self):
        """A branch row without OGRN and INN is dropped instead of failing."""
        upload = _upload(
            "opk.xlsx",
            [
                list(self.OPK_SOURCE_HEADERS),
                # Branch row: ``filial`` is set and OGRN/INN are missing.
                [
                    100,
                    "52511425",
                    None,
                    None,
                    True,
                    48,
                    1,
                    "Минпромторг России",
                    'Филиал ПАО "Ил" - ВАСО',
                    (
                        'Филиал публичного акционерного общества "Авиационный '
                        'комплекс им. С.В. Ильюшина" - ВАСО'
                    ),
                ],
                # Regular row with a full identity.
                [
                    1,
                    "07506197",
                    "1027600980990",
                    "7601000086",
                    False,
                    1,
                    1,
                    "Минпромторг России",
                    'АО "ЯРЗ"',
                    'АКЦИОНЕРНОЕ ОБЩЕСТВО "ЯРОСЛАВСКИЙ РАДИОЗАВОД"',
                ],
            ],
        )

        rows = RegisterImportService.parse_xlsx(upload)

        self.assertEqual(len(rows), 1)
        self.assertEqual(rows[0].mn_ogrn, 1027600980990)

    def test_get_active_periods_by_org_returns_mapping(self):
        """Only periods without an end date appear, keyed by organization id."""
        registry = RegisterFactory()
        active_period = RegistryMembershipPeriodFactory(
            registry=registry, ended_at=None
        )
        # A closed period in the same registry must be left out.
        RegistryMembershipPeriodFactory(
            registry=registry,
            started_at=date(2026, 1, 1),
            ended_at=date(2026, 2, 1),
        )

        active_by_org = RegisterImportService._get_active_periods_by_org(registry)

        self.assertEqual(list(active_by_org.keys()), [active_period.organization_id])

    def test_close_missing_periods_updates_and_deletes(self):
        """Missing orgs get their period closed, or deleted if opened that day."""
        registry = RegisterFactory()
        upload = RegisterUploadFactory(registry=registry, actual_date=date(2026, 3, 1))
        active_period = RegistryMembershipPeriodFactory(
            registry=registry,
            started_at=date(2026, 2, 1),
            ended_at=None,
        )
        # This period starts on the snapshot date itself.
        same_day_period = RegistryMembershipPeriodFactory(
            registry=registry,
            started_at=date(2026, 3, 1),
            ended_at=None,
        )

        closed = RegisterImportService._close_missing_periods(
            active_by_org={
                active_period.organization_id: active_period,
                same_day_period.organization_id: same_day_period,
            },
            snapshot_org_ids=set(),
            snapshot_date=date(2026, 3, 1),
            upload=upload,
        )

        self.assertEqual(closed, 2)
        active_period.refresh_from_db()
        self.assertEqual(active_period.ended_at, date(2026, 3, 1))
        period_model = type(same_day_period)
        self.assertFalse(period_model.objects.filter(id=same_day_period.id).exists())

    def test_open_new_periods_returns_zero_when_all_active(self):
        """Nothing is opened when every snapshot org is already active."""
        registry = RegisterFactory()
        upload = RegisterUploadFactory(registry=registry)
        organization = OrganizationFactory()

        opened = RegisterImportService._open_new_periods(
            registry=registry,
            snapshot_org_ids={organization.id},
            active_org_ids={organization.id},
            snapshot_date=date(2026, 3, 1),
            upload=upload,
        )

        self.assertEqual(opened, 0)

    def test_open_new_periods_creates_missing_memberships(self):
        """A snapshot org with no active period is counted as opened."""
        registry = RegisterFactory()
        upload = RegisterUploadFactory(registry=registry)
        organization = OrganizationFactory()

        opened = RegisterImportService._open_new_periods(
            registry=registry,
            snapshot_org_ids={organization.id},
            active_org_ids=set(),
            snapshot_date=date(2026, 3, 1),
            upload=upload,
        )

        self.assertEqual(opened, 1)

    def test_resolve_actual_date_returns_requested_latest_or_today(self):
        """Resolution order: requested date, then latest upload, then today."""
        registry = RegisterFactory()

        with self.subTest("explicitly requested date is returned as is"):
            requested = RegisterImportService.resolve_actual_date(
                registry=registry,
                requested_date=date(2026, 3, 10),
            )
            self.assertEqual(requested, date(2026, 3, 10))

        RegisterUploadFactory(registry=registry, actual_date=date(2026, 2, 1))
        latest_upload = RegisterUploadFactory(
            registry=registry,
            actual_date=date(2026, 3, 5),
        )

        with self.subTest("latest upload date is used when nothing is requested"):
            latest = RegisterImportService.resolve_actual_date(
                registry=registry,
                requested_date=None,
            )
            self.assertEqual(latest, latest_upload.actual_date)

        with self.subTest("today is used for a registry without uploads"):
            empty_registry = RegisterFactory()
            # Bracket the call with two clock readings so the assertion stays
            # correct even if midnight passes while the test is running.
            not_before = date.today()
            today = RegisterImportService.resolve_actual_date(
                registry=empty_registry,
                requested_date=None,
            )
            not_after = date.today()
            self.assertTrue(
                not_before <= today <= not_after,
                f"{today} is not within [{not_before}, {not_after}]",
            )

    def test_get_organizations_queryset_applies_registry_filters_and_search(self):
        """Search and OKPO filters return only the matching active member."""
        registry = RegisterFactory()
        upload = RegisterUploadFactory(registry=registry, actual_date=date(2026, 3, 1))
        match = OrganizationFactory(pn_name="Target organization", mn_okpo="11111111")
        other = OrganizationFactory(pn_name="Other company", mn_okpo="22222222")
        RegistryMembershipPeriodFactory(
            registry=registry,
            organization=match,
            started_at=date(2026, 3, 1),
            started_by_upload=upload,
        )
        # ``other`` has only a period that ended before the resolved date.
        RegistryMembershipPeriodFactory(
            registry=registry,
            organization=other,
            started_at=date(2026, 1, 1),
            ended_at=date(2026, 2, 1),
            started_by_upload=upload,
            ended_by_upload=upload,
        )

        queryset, resolved_date = RegisterImportService.get_organizations_queryset(
            registry=registry,
            search="Target",
            mn_okpo="11111111",
        )

        self.assertEqual(resolved_date, date(2026, 3, 1))
        self.assertEqual(list(queryset.values_list("id", flat=True)), [match.id])

    def test_get_registry_organizations_queryset_applies_exact_filters(self):
        """Filtering by all four identifiers returns the matching organization."""
        registry = RegisterFactory()
        upload = RegisterUploadFactory(registry=registry, actual_date=date(2026, 3, 1))
        organization = OrganizationFactory()
        RegistryMembershipPeriodFactory(
            registry=registry,
            organization=organization,
            started_at=date(2026, 3, 1),
            started_by_upload=upload,
        )

        result = RegisterImportService.get_registry_organizations_queryset(
            registry=registry,
            mn_ogrn=organization.mn_ogrn,
            mn_inn=organization.mn_inn,
            in_kpp=organization.in_kpp,
            mn_okpo=organization.mn_okpo,
        )
        queryset, resolved_date = result

        self.assertEqual(resolved_date, date(2026, 3, 1))
        self.assertEqual(list(queryset.values_list("id", flat=True)), [organization.id])

    def test_parse_xlsx_parses_rows_and_supports_optional_kpp(self):
        """A workbook without a KPP column parses with ``in_kpp`` set to None."""
        upload = _upload(
            "register.xlsx",
            [
                ["pn_name", "mn_ogrn", "mn_inn", "mn_okpo"],
                ["Org", "1027700118984", "7702000000", "12345678"],
            ],
        )

        rows = RegisterImportService.parse_xlsx(upload)

        self.assertEqual(
            rows,
            [
                ParsedOrganization(
                    pn_name="Org",
                    mn_ogrn=1027700118984,
                    mn_inn=7702000000,
                    in_kpp=None,
                    mn_okpo="12345678",
                )
            ],
        )

    def test_parse_xlsx_raises_on_invalid_workbook_headers_and_empty_rows(self):
        """Each kind of unusable workbook raises its own import error message."""
        failure_cases = [
            (
                "file is not a workbook",
                SimpleUploadedFile("broken.xlsx", b"not-an-excel"),
                "Не удалось прочитать Excel файл",
            ),
            (
                "workbook without a header row",
                _upload("no-headers.xlsx", []),
                "Файл не содержит заголовков",
            ),
            (
                "required columns are missing",
                _upload("missing.xlsx", [["pn_name", "mn_ogrn"]]),
                "Отсутствуют обязательные колонки",
            ),
            (
                "only blank data rows",
                _upload(
                    "empty.xlsx",
                    [["pn_name", "mn_ogrn", "mn_inn", "mn_okpo"], ["", "", "", ""]],
                ),
                "Файл не содержит строк с организациями",
            ),
        ]

        for label, upload, expected_message in failure_cases:
            with (
                self.subTest(label),
                self.assertRaisesMessage(RegisterImportError, expected_message),
            ):
                RegisterImportService.parse_xlsx(upload)

    def test_validate_snapshot_date_and_unique_identities(self):
        """A stale snapshot date and duplicate identities are both rejected."""
        with self.subTest("snapshot date earlier than the latest upload"):
            registry = RegisterFactory()
            RegisterUploadFactory(registry=registry, actual_date=date(2026, 3, 5))
            with self.assertRaisesMessage(
                RegisterImportError,
                "Дата актуальности не может быть раньше последней загрузки",
            ):
                RegisterImportService._validate_snapshot_date(
                    registry=registry,
                    snapshot_date=date(2026, 3, 1),
                )

        with self.subTest("same identity listed twice"):
            row = ParsedOrganization(
                pn_name="Org",
                mn_ogrn=1,
                mn_inn=2,
                in_kpp=None,
                mn_okpo="12345678",
            )
            with self.assertRaisesMessage(
                RegisterImportError,
                "Файл содержит дубли по ключу",
            ):
                RegisterImportService._ensure_unique_identities([row, row])

    def test_scalar_parsers_validate_required_and_numeric_values(self):
        """Scalar helpers normalize valid values and reject invalid ones."""
        # Normalization of well-formed values.
        self.assertEqual(RegisterImportService._normalize_header(" Mn_Inn "), "mn_inn")
        self.assertTrue(RegisterImportService._is_empty_row((None, " ")))
        self.assertFalse(RegisterImportService._is_empty_row((None, "x")))
        self.assertEqual(
            RegisterImportService._as_required_text(
                " Org ", field_name="pn_name", row_number=2
            ),
            "Org",
        )
        self.assertEqual(
            RegisterImportService._as_required_int(
                "123.0", field_name="mn_inn", row_number=2
            ),
            123,
        )
        self.assertIsNone(
            RegisterImportService._as_optional_int(
                " ", field_name="in_kpp", row_number=2
            )
        )
        self.assertEqual(
            RegisterImportService._as_numeric_text(
                "123 456.0", field_name="mn_okpo", row_number=2
            ),
            "123456",
        )

        # Rejection of malformed values: (label, parser, value, field, message).
        rejection_cases = [
            (
                "empty required text",
                RegisterImportService._as_required_text,
                "",
                "pn_name",
                "поле pn_name обязательно",
            ),
            (
                "missing required int",
                RegisterImportService._as_required_int,
                None,
                "mn_inn",
                "поле mn_inn обязательно",
            ),
            (
                "non-numeric optional int",
                RegisterImportService._as_optional_int,
                "abc",
                "in_kpp",
                "поле in_kpp должно быть числом",
            ),
            (
                "letters in numeric text",
                RegisterImportService._as_numeric_text,
                "ab12",
                "mn_okpo",
                "поле mn_okpo должно содержать только цифры",
            ),
        ]

        for label, parser, value, field_name, expected_message in rejection_cases:
            with (
                self.subTest(label),
                self.assertRaisesMessage(RegisterImportError, expected_message),
            ):
                parser(value, field_name=field_name, row_number=2)