Add v2 registry uploads and source CSV exports
Some checks failed
CI/CD Pipeline / Quality Gate (push) Failing after 9s
CI/CD Pipeline / Build and Push Images (push) Has been skipped
CI/CD Pipeline / Deploy Dev in Dokploy (push) Has been skipped
CI/CD Pipeline / Internal Notify (push) Successful in 0s

This commit is contained in:
2026-05-07 14:39:20 +02:00
parent 507ae2063a
commit 82ba7b78cd
13 changed files with 637 additions and 8 deletions

View File

@@ -4,6 +4,7 @@ from __future__ import annotations
import io
from datetime import date
from unittest.mock import patch
from apps.registers.models import Organization, RegistryMembershipPeriod
from django.core.files.uploadedfile import SimpleUploadedFile
@@ -94,6 +95,35 @@ class RegistersViewsTest(APITestCase):
self.client.force_authenticate(self.user)
return response
def _post_v2_slug_upload(
self,
*,
slug: str,
rows: list[dict],
actual_date_value: date,
with_kpp: bool = True,
file_name: str = "registry.xlsx",
):
self.client.force_authenticate(self.admin)
content = _build_register_excel_bytes(rows, with_kpp=with_kpp)
upload = SimpleUploadedFile(
file_name,
content,
content_type=(
"application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
),
)
response = self.client.post(
reverse(f"api_v2:registers:register-upload-{slug}"),
{
"actual_date": actual_date_value.isoformat(),
"file": upload,
},
format="multipart",
)
self.client.force_authenticate(self.user)
return response
def test_registries_list_and_retrieve(self):
registry = RegisterFactory(name="Росатом")
RegisterUploadFactory(registry=registry)
@@ -130,6 +160,66 @@ class RegistersViewsTest(APITestCase):
self.assertIn("Реестр госкорпорации Росатом ГОЗ", names)
self.assertIn("Реестр госкорпорации Росатом ОПК", names)
def test_v2_registry_slug_upload_uses_fixed_registry_and_refreshes_snapshots(self):
rows = [
{
"pn_name": 'АО "Росатом ГОЗ"',
"mn_ogrn": "1027600980990",
"mn_inn": "7601000086",
"in_kpp": "760401001",
"mn_okpo": "07506197",
}
]
with patch("registers.views._start_snapshot_refresh_task") as refresh_task:
response = self._post_v2_slug_upload(
slug="rosatom-goz",
rows=rows,
actual_date_value=date(2026, 5, 7),
file_name="rosatom_goz.xlsx",
)
self.assertEqual(response.status_code, status.HTTP_201_CREATED)
self.assertTrue(response.data["success"])
self.assertEqual(
response.data["registry_name"],
"Реестр госкорпорации Росатом ГОЗ",
)
refresh_task.assert_called_once_with()
membership = RegistryMembershipPeriod.objects.get(ended_at__isnull=True)
self.assertEqual(
membership.registry.name,
"Реестр госкорпорации Росатом ГОЗ",
)
def test_v2_registry_slug_upload_url_does_not_duplicate_registers_segment(self):
self.assertEqual(
reverse("api_v2:registers:register-upload-rosatom-goz"),
"/api/v2/registers/rosatom-goz/upload/",
)
def test_v2_registry_slug_upload_does_not_refresh_snapshots_after_import_error(self):
rows = [
{
"pn_name": 'АО "Невалидный ОКПО"',
"mn_ogrn": "1027600980990",
"mn_inn": "7601000086",
"in_kpp": "760401001",
"mn_okpo": "07A06197",
}
]
with patch("registers.views._start_snapshot_refresh_task") as refresh_task:
response = self._post_v2_slug_upload(
slug="rosatom-goz",
rows=rows,
actual_date_value=date(2026, 5, 7),
file_name="invalid_rosatom_goz.xlsx",
)
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
refresh_task.assert_not_called()
def test_organizations_list_and_retrieve(self):
organization = OrganizationFactory()