"""Tests for register snapshot imports.""" from __future__ import annotations import io from datetime import date from apps.organization.models import Organization from apps.registers.models import Register, RegisterUpload, RegistryMembershipPeriod from apps.registers.services import RegisterImportService from django.core.files.uploadedfile import SimpleUploadedFile from django.test import TestCase from openpyxl import Workbook def build_registry_upload(name: str, rows: list[dict[str, str]]) -> SimpleUploadedFile: """Build an in-memory xlsx with register headers for snapshot import.""" workbook = Workbook() worksheet = workbook.active worksheet.append(["pn_name", "mn_ogrn", "mn_inn", "mn_okpo", "in_kpp"]) for row in rows: worksheet.append( [ row["pn_name"], row["mn_ogrn"], row["mn_inn"], row["mn_okpo"], row.get("in_kpp", ""), ] ) buffer = io.BytesIO() workbook.save(buffer) workbook.close() buffer.seek(0) return SimpleUploadedFile( name, buffer.getvalue(), content_type=( "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet" ), ) class RegisterImportServiceTest(TestCase): """Tests for register snapshot synchronization.""" def setUp(self): self.registry = Register.objects.create(name="Реестр предприятий ОПК") def test_sync_registry_memberships_creates_snapshot(self): """First upload creates organizations, upload fact and active periods.""" uploaded_file = build_registry_upload( "registry.xlsx", [ { "pn_name": "АО Альфа", "mn_ogrn": "1027700132195", "mn_inn": "7707083893", "mn_okpo": "12345678", "in_kpp": "770701001", }, { "pn_name": "АО Бета", "mn_ogrn": "1027700132196", "mn_inn": "7707083894", "mn_okpo": "12345679", "in_kpp": "770701002", }, ], ) result = RegisterImportService.sync_registry_memberships( registry=self.registry, uploaded_file=uploaded_file, actual_date=date(2026, 3, 1), ) self.assertEqual(result["rows_in_file"], 2) self.assertEqual(result["organizations_created"], 2) self.assertEqual(result["opened_periods"], 2) self.assertEqual(Organization.objects.count(), 2) self.assertEqual(RegisterUpload.objects.count(), 1) self.assertEqual( RegistryMembershipPeriod.objects.filter(ended_at__isnull=True).count(), 2, ) def test_sync_registry_memberships_closes_missing_and_opens_new(self): """Next snapshot closes missing orgs and opens periods for new ones.""" first_upload = build_registry_upload( "registry-1.xlsx", [ { "pn_name": "АО Альфа", "mn_ogrn": "1027700132195", "mn_inn": "7707083893", "mn_okpo": "12345678", "in_kpp": "770701001", }, { "pn_name": "АО Бета", "mn_ogrn": "1027700132196", "mn_inn": "7707083894", "mn_okpo": "12345679", "in_kpp": "770701002", }, ], ) RegisterImportService.sync_registry_memberships( registry=self.registry, uploaded_file=first_upload, actual_date=date(2026, 3, 1), ) second_upload = build_registry_upload( "registry-2.xlsx", [ { "pn_name": "АО Бета обновлённое", "mn_ogrn": "1027700132196", "mn_inn": "7707083894", "mn_okpo": "12345679", "in_kpp": "770701099", }, { "pn_name": "АО Гамма", "mn_ogrn": "1027700132197", "mn_inn": "7707083895", "mn_okpo": "12345680", "in_kpp": "770701003", }, ], ) result = RegisterImportService.sync_registry_memberships( registry=self.registry, uploaded_file=second_upload, actual_date=date(2026, 4, 1), ) self.assertEqual(result["organizations_created"], 1) self.assertEqual(result["organizations_updated"], 1) self.assertEqual(result["opened_periods"], 1) self.assertEqual(result["closed_periods"], 1) alpha = Organization.objects.get(inn="7707083893") beta = Organization.objects.get(inn="7707083894") gamma = Organization.objects.get(inn="7707083895") alpha_period = RegistryMembershipPeriod.objects.get( registry=self.registry, organization=alpha, ) beta_period = RegistryMembershipPeriod.objects.get( registry=self.registry, organization=beta, ended_at__isnull=True, ) gamma_period = RegistryMembershipPeriod.objects.get( registry=self.registry, organization=gamma, ended_at__isnull=True, ) self.assertEqual(alpha_period.ended_at, date(2026, 4, 1)) self.assertEqual(beta_period.started_at, date(2026, 3, 1)) self.assertEqual(gamma_period.started_at, date(2026, 4, 1)) beta.refresh_from_db() self.assertEqual(beta.name, "АО Бета обновлённое") self.assertEqual(beta.kpp, "770701099")