"""Tests for register snapshot imports.""" from __future__ import annotations import io from datetime import date from apps.organization.models import Organization from apps.registers.models import Register, RegisterUpload, RegistryMembershipPeriod from apps.registers.services import RegisterImportService from django.core.files.uploadedfile import SimpleUploadedFile from django.test import TestCase from openpyxl import Workbook def build_registry_upload(name: str, rows: list[dict[str, str]]) -> SimpleUploadedFile: """Build an in-memory xlsx with register headers for snapshot import.""" workbook = Workbook() worksheet = workbook.active worksheet.append(["pn_name", "mn_ogrn", "mn_inn", "mn_okpo", "in_kpp"]) for row in rows: worksheet.append( [ row["pn_name"], row["mn_ogrn"], row["mn_inn"], row["mn_okpo"], row.get("in_kpp", ""), ] ) buffer = io.BytesIO() workbook.save(buffer) workbook.close() buffer.seek(0) return SimpleUploadedFile( name, buffer.getvalue(), content_type=( "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet" ), ) class RegisterImportServiceTest(TestCase): """Tests for register snapshot synchronization.""" def setUp(self): self.registry = Register.objects.create(name="Реестр предприятий ОПК") def test_sync_registry_memberships_creates_snapshot(self): """First upload links only organizations already loaded from Mostovik.""" Organization.objects.create( name="АО Альфа", inn="7707083893", ogrn="1027700132195", okpo="12345678", kpp="770701001", ) Organization.objects.create( name="АО Бета", inn="7707083894", ogrn="1027700132196", okpo="12345679", kpp="770701002", ) uploaded_file = build_registry_upload( "registry.xlsx", [ { "pn_name": "АО Альфа", "mn_ogrn": "1027700132195", "mn_inn": "7707083893", "mn_okpo": "12345678", "in_kpp": "770701001", }, { "pn_name": "АО Бета", "mn_ogrn": "1027700132196", "mn_inn": "7707083894", "mn_okpo": "12345679", "in_kpp": "770701002", }, ], ) result = RegisterImportService.sync_registry_memberships( registry=self.registry, uploaded_file=uploaded_file, actual_date=date(2026, 3, 1), ) self.assertEqual(result["rows_in_file"], 2) self.assertEqual(result["organizations_created"], 0) self.assertEqual(result["organizations_updated"], 0) self.assertEqual(result["organizations_skipped"], 0) self.assertEqual(result["opened_periods"], 2) self.assertEqual(Organization.objects.count(), 2) self.assertEqual(RegisterUpload.objects.count(), 1) self.assertEqual( RegistryMembershipPeriod.objects.filter(ended_at__isnull=True).count(), 2, ) def test_sync_registry_memberships_closes_missing_and_skips_unknown_orgs(self): """Next snapshot closes missing orgs but does not create unknown ones.""" Organization.objects.create( name="АО Альфа", inn="7707083893", ogrn="1027700132195", okpo="12345678", kpp="770701001", ) Organization.objects.create( name="АО Бета", inn="7707083894", ogrn="1027700132196", okpo="12345679", kpp="770701002", ) first_upload = build_registry_upload( "registry-1.xlsx", [ { "pn_name": "АО Альфа", "mn_ogrn": "1027700132195", "mn_inn": "7707083893", "mn_okpo": "12345678", "in_kpp": "770701001", }, { "pn_name": "АО Бета", "mn_ogrn": "1027700132196", "mn_inn": "7707083894", "mn_okpo": "12345679", "in_kpp": "770701002", }, ], ) RegisterImportService.sync_registry_memberships( registry=self.registry, uploaded_file=first_upload, actual_date=date(2026, 3, 1), ) second_upload = build_registry_upload( "registry-2.xlsx", [ { "pn_name": "АО Бета обновлённое", "mn_ogrn": "1027700132196", "mn_inn": "7707083894", "mn_okpo": "12345679", "in_kpp": "770701099", }, { "pn_name": "АО Гамма", "mn_ogrn": "1027700132197", "mn_inn": "7707083895", "mn_okpo": "12345680", "in_kpp": "770701003", }, ], ) result = RegisterImportService.sync_registry_memberships( registry=self.registry, uploaded_file=second_upload, actual_date=date(2026, 4, 1), ) self.assertEqual(result["organizations_created"], 0) self.assertEqual(result["organizations_updated"], 0) self.assertEqual(result["organizations_skipped"], 1) self.assertEqual(result["opened_periods"], 0) self.assertEqual(result["closed_periods"], 1) alpha = Organization.objects.get(inn="7707083893") beta = Organization.objects.get(inn="7707083894") alpha_period = RegistryMembershipPeriod.objects.get( registry=self.registry, organization=alpha, ) beta_period = RegistryMembershipPeriod.objects.get( registry=self.registry, organization=beta, ended_at__isnull=True, ) self.assertEqual(alpha_period.ended_at, date(2026, 4, 1)) self.assertEqual(beta_period.started_at, date(2026, 3, 1)) self.assertFalse(Organization.objects.filter(inn="7707083895").exists()) beta.refresh_from_db() self.assertEqual(beta.name, "АО Бета") self.assertEqual(beta.kpp, "770701002")