Add initial implementations for forms and organization apps with serializers, factories, and admin configurations
Some checks failed
CI/CD Pipeline / Run Tests (push) Failing after 45s
CI/CD Pipeline / Code Quality Checks (push) Failing after 48s
CI/CD Pipeline / Build Docker Images (push) Has been skipped
CI/CD Pipeline / Push to Gitea Registry (push) Has been skipped
CI/CD Pipeline / Deploy to Server (push) Has been skipped
Some checks failed
CI/CD Pipeline / Run Tests (push) Failing after 45s
CI/CD Pipeline / Code Quality Checks (push) Failing after 48s
CI/CD Pipeline / Build Docker Images (push) Has been skipped
CI/CD Pipeline / Push to Gitea Registry (push) Has been skipped
CI/CD Pipeline / Deploy to Server (push) Has been skipped
This commit is contained in:
172
tests/apps/registers/test_services.py
Normal file
172
tests/apps/registers/test_services.py
Normal file
@@ -0,0 +1,172 @@
|
||||
"""Tests for register snapshot imports."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import io
|
||||
from datetime import date
|
||||
|
||||
from apps.organization.models import Organization
|
||||
from apps.registers.models import Register, RegisterUpload, RegistryMembershipPeriod
|
||||
from apps.registers.services import RegisterImportService
|
||||
from django.core.files.uploadedfile import SimpleUploadedFile
|
||||
from django.test import TestCase
|
||||
from openpyxl import Workbook
|
||||
|
||||
|
||||
def build_registry_upload(name: str, rows: list[dict[str, str]]) -> SimpleUploadedFile:
|
||||
"""Build an in-memory xlsx with register headers for snapshot import."""
|
||||
workbook = Workbook()
|
||||
worksheet = workbook.active
|
||||
worksheet.append(["pn_name", "mn_ogrn", "mn_inn", "mn_okpo", "in_kpp"])
|
||||
|
||||
for row in rows:
|
||||
worksheet.append(
|
||||
[
|
||||
row["pn_name"],
|
||||
row["mn_ogrn"],
|
||||
row["mn_inn"],
|
||||
row["mn_okpo"],
|
||||
row.get("in_kpp", ""),
|
||||
]
|
||||
)
|
||||
|
||||
buffer = io.BytesIO()
|
||||
workbook.save(buffer)
|
||||
workbook.close()
|
||||
buffer.seek(0)
|
||||
|
||||
return SimpleUploadedFile(
|
||||
name,
|
||||
buffer.getvalue(),
|
||||
content_type=(
|
||||
"application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
class RegisterImportServiceTest(TestCase):
|
||||
"""Tests for register snapshot synchronization."""
|
||||
|
||||
def setUp(self):
|
||||
self.registry = Register.objects.create(name="Реестр предприятий ОПК")
|
||||
|
||||
def test_sync_registry_memberships_creates_snapshot(self):
|
||||
"""First upload creates organizations, upload fact and active periods."""
|
||||
uploaded_file = build_registry_upload(
|
||||
"registry.xlsx",
|
||||
[
|
||||
{
|
||||
"pn_name": "АО Альфа",
|
||||
"mn_ogrn": "1027700132195",
|
||||
"mn_inn": "7707083893",
|
||||
"mn_okpo": "12345678",
|
||||
"in_kpp": "770701001",
|
||||
},
|
||||
{
|
||||
"pn_name": "АО Бета",
|
||||
"mn_ogrn": "1027700132196",
|
||||
"mn_inn": "7707083894",
|
||||
"mn_okpo": "12345679",
|
||||
"in_kpp": "770701002",
|
||||
},
|
||||
],
|
||||
)
|
||||
|
||||
result = RegisterImportService.sync_registry_memberships(
|
||||
registry=self.registry,
|
||||
uploaded_file=uploaded_file,
|
||||
actual_date=date(2026, 3, 1),
|
||||
)
|
||||
|
||||
self.assertEqual(result["rows_in_file"], 2)
|
||||
self.assertEqual(result["organizations_created"], 2)
|
||||
self.assertEqual(result["opened_periods"], 2)
|
||||
self.assertEqual(Organization.objects.count(), 2)
|
||||
self.assertEqual(RegisterUpload.objects.count(), 1)
|
||||
self.assertEqual(
|
||||
RegistryMembershipPeriod.objects.filter(ended_at__isnull=True).count(),
|
||||
2,
|
||||
)
|
||||
|
||||
def test_sync_registry_memberships_closes_missing_and_opens_new(self):
|
||||
"""Next snapshot closes missing orgs and opens periods for new ones."""
|
||||
first_upload = build_registry_upload(
|
||||
"registry-1.xlsx",
|
||||
[
|
||||
{
|
||||
"pn_name": "АО Альфа",
|
||||
"mn_ogrn": "1027700132195",
|
||||
"mn_inn": "7707083893",
|
||||
"mn_okpo": "12345678",
|
||||
"in_kpp": "770701001",
|
||||
},
|
||||
{
|
||||
"pn_name": "АО Бета",
|
||||
"mn_ogrn": "1027700132196",
|
||||
"mn_inn": "7707083894",
|
||||
"mn_okpo": "12345679",
|
||||
"in_kpp": "770701002",
|
||||
},
|
||||
],
|
||||
)
|
||||
RegisterImportService.sync_registry_memberships(
|
||||
registry=self.registry,
|
||||
uploaded_file=first_upload,
|
||||
actual_date=date(2026, 3, 1),
|
||||
)
|
||||
|
||||
second_upload = build_registry_upload(
|
||||
"registry-2.xlsx",
|
||||
[
|
||||
{
|
||||
"pn_name": "АО Бета обновлённое",
|
||||
"mn_ogrn": "1027700132196",
|
||||
"mn_inn": "7707083894",
|
||||
"mn_okpo": "12345679",
|
||||
"in_kpp": "770701099",
|
||||
},
|
||||
{
|
||||
"pn_name": "АО Гамма",
|
||||
"mn_ogrn": "1027700132197",
|
||||
"mn_inn": "7707083895",
|
||||
"mn_okpo": "12345680",
|
||||
"in_kpp": "770701003",
|
||||
},
|
||||
],
|
||||
)
|
||||
result = RegisterImportService.sync_registry_memberships(
|
||||
registry=self.registry,
|
||||
uploaded_file=second_upload,
|
||||
actual_date=date(2026, 4, 1),
|
||||
)
|
||||
|
||||
self.assertEqual(result["organizations_created"], 1)
|
||||
self.assertEqual(result["organizations_updated"], 1)
|
||||
self.assertEqual(result["opened_periods"], 1)
|
||||
self.assertEqual(result["closed_periods"], 1)
|
||||
|
||||
alpha = Organization.objects.get(inn="7707083893")
|
||||
beta = Organization.objects.get(inn="7707083894")
|
||||
gamma = Organization.objects.get(inn="7707083895")
|
||||
|
||||
alpha_period = RegistryMembershipPeriod.objects.get(
|
||||
registry=self.registry,
|
||||
organization=alpha,
|
||||
)
|
||||
beta_period = RegistryMembershipPeriod.objects.get(
|
||||
registry=self.registry,
|
||||
organization=beta,
|
||||
ended_at__isnull=True,
|
||||
)
|
||||
gamma_period = RegistryMembershipPeriod.objects.get(
|
||||
registry=self.registry,
|
||||
organization=gamma,
|
||||
ended_at__isnull=True,
|
||||
)
|
||||
|
||||
self.assertEqual(alpha_period.ended_at, date(2026, 4, 1))
|
||||
self.assertEqual(beta_period.started_at, date(2026, 3, 1))
|
||||
self.assertEqual(gamma_period.started_at, date(2026, 4, 1))
|
||||
beta.refresh_from_db()
|
||||
self.assertEqual(beta.name, "АО Бета обновлённое")
|
||||
self.assertEqual(beta.kpp, "770701099")
|
||||
Reference in New Issue
Block a user