From e09002d3af4d13bf2229283944a2bfb842bcee1b Mon Sep 17 00:00:00 2001 From: Aleksandr Meshchriakov Date: Thu, 28 May 2026 12:11:53 +0200 Subject: [PATCH] feat: import exchange registry memberships --- src/apps/exchange/services.py | 278 +++++++++++++++++++++++++++++++- tests/apps/exchange/test_api.py | 25 +++ 2 files changed, 300 insertions(+), 3 deletions(-) diff --git a/src/apps/exchange/services.py b/src/apps/exchange/services.py index 537ce47..99aa4fb 100644 --- a/src/apps/exchange/services.py +++ b/src/apps/exchange/services.py @@ -8,7 +8,7 @@ import json import struct import zlib from dataclasses import dataclass -from datetime import date +from datetime import date, datetime from decimal import Decimal, InvalidOperation from io import BytesIO from typing import Any @@ -34,10 +34,12 @@ from apps.external_data.models import ( PublicProcurement, ) from apps.organization.models import IndustryCluster, Organization, OrganizationType +from apps.registers.models import Register, RegisterUpload, RegistryMembershipPeriod from cryptography.hazmat.primitives.ciphers.aead import AESGCM from django.conf import settings from django.db import transaction from django.db.models import Q +from django.utils import timezone class ExchangeImportError(ValueError): @@ -97,6 +99,7 @@ class ExchangePackageImportService: } SECTION_KEYS = ( "organizations", + "registry_memberships", "industrial_certificates", "manufacturers", "industrial_products", @@ -152,7 +155,11 @@ class ExchangePackageImportService: try: with transaction.atomic(): - summary = cls._import_payload(decoded.payload) + summary = cls._import_payload( + decoded.payload, + package_hash=decoded.package_hash, + package_name=decoded.archive_name, + ) record = ExchangePackageImport.objects.create( package_id=package_id, source_system=source_system, @@ -383,6 +390,27 @@ class ExchangePackageImportService: "schema_version должен быть целым числом" ) from exc + @classmethod + def _read_actual_date(cls, manifest: dict[str, Any]) -> date: + raw_value = manifest.get("actual_date") or manifest.get("produced_at") + if raw_value in (None, ""): + return timezone.localdate() + if isinstance(raw_value, date): + return raw_value + + raw_text = str(raw_value).strip() + try: + return date.fromisoformat(raw_text) + except ValueError: + pass + + try: + return datetime.fromisoformat(raw_text.replace("Z", "+00:00")).date() + except ValueError as exc: + raise ExchangeImportError( + "actual_date должен быть датой или датой-временем ISO" + ) from exc + @classmethod def _find_duplicate( cls, @@ -453,17 +481,32 @@ class ExchangePackageImportService: ) @classmethod - def _import_payload(cls, payload: dict[str, Any]) -> dict[str, Any]: + def _import_payload( + cls, + payload: dict[str, Any], + *, + package_hash: str, + package_name: str, + ) -> dict[str, Any]: data = payload.get("data") if not isinstance(data, dict): raise ExchangeImportError("Раздел data в пакете поврежден") + manifest = cls._extract_manifest(payload) + actual_date = cls._read_actual_date(manifest) organization_rows = cls._extract_rows(data, "organizations") allowed_organization_inns = cls._collect_package_organization_inns( organization_rows ) organization_summary = cls._upsert_organizations(organization_rows) + registry_membership_summary = cls._sync_registry_memberships( + cls._extract_rows(data, "registry_memberships"), + allowed_organization_inns=allowed_organization_inns, + actual_date=actual_date, + package_hash=package_hash, + package_name=package_name, + ) industrial_certificate_summary = cls._upsert_industrial_certificates( cls._extract_rows(data, "industrial_certificates"), allowed_organization_inns=allowed_organization_inns, @@ -511,6 +554,7 @@ class ExchangePackageImportService: return { "organizations": organization_summary, + "registry_memberships": registry_membership_summary, "industrial_certificates": industrial_certificate_summary, "manufacturers": manufacturer_summary, "industrial_products": industrial_summary, @@ -685,6 +729,234 @@ class ExchangePackageImportService: return update_fields + @classmethod + def _sync_registry_memberships( + cls, + rows: list[dict[str, Any]], + *, + allowed_organization_inns: set[str], + actual_date: date, + package_hash: str, + package_name: str, + ) -> dict[str, int]: + registers_created_count = 0 + uploads_created_count = 0 + opened_count = 0 + updated_count = 0 + closed_count = 0 + skipped_count = 0 + ( + rows_by_registry, + normalized_skipped_count, + ) = cls._normalize_registry_membership_rows( + rows, + allowed_organization_inns=allowed_organization_inns, + ) + skipped_count += normalized_skipped_count + + for registry_name, registry_rows in rows_by_registry.items(): + registry, registry_created = Register.objects.get_or_create( + name=registry_name + ) + if registry_created: + registers_created_count += 1 + + upload, upload_created = cls._get_registry_membership_upload( + registry=registry, + actual_date=actual_date, + file_hash=package_hash, + file_name=package_name, + rows_count=len(registry_rows), + ) + if upload_created: + uploads_created_count += 1 + + sync_result = cls._sync_registry_membership_registry( + registry=registry, + upload=upload, + actual_date=actual_date, + registry_rows=registry_rows, + ) + opened_count += sync_result["opened"] + updated_count += sync_result["updated"] + closed_count += sync_result["closed"] + + return { + "registers_created": registers_created_count, + "uploads_created": uploads_created_count, + "opened": opened_count, + "updated": updated_count, + "closed": closed_count, + "skipped": skipped_count, + } + + @classmethod + def _normalize_registry_membership_rows( + cls, + rows: list[dict[str, Any]], + *, + allowed_organization_inns: set[str], + ) -> tuple[dict[str, dict[str, tuple[Organization, date]]], int]: + rows_by_registry: dict[str, dict[str, tuple[Organization, date]]] = {} + skipped_count = 0 + + for row in rows: + registry_name = cls._clean_string(row.get("registry_name")) + if not registry_name: + skipped_count += 1 + continue + + organization = cls._resolve_organization( + row, + allowed_organization_inns=allowed_organization_inns, + ) + started_at = cls._parse_date_value( + row.get("started_at"), + field_name="started_at", + allow_null=False, + ) + ended_at = cls._parse_date_value( + row.get("ended_at"), + field_name="ended_at", + allow_null=True, + ) + if ended_at is not None: + skipped_count += 1 + continue + + registry_rows = rows_by_registry.setdefault(registry_name, {}) + if organization.inn in registry_rows: + skipped_count += 1 + continue + registry_rows[organization.inn] = (organization, started_at) + + return rows_by_registry, skipped_count + + @staticmethod + def _get_registry_membership_upload( + *, + registry: Register, + actual_date: date, + file_hash: str, + file_name: str, + rows_count: int, + ) -> tuple[RegisterUpload, bool]: + upload, created = RegisterUpload.objects.get_or_create( + registry=registry, + actual_date=actual_date, + file_hash=file_hash, + defaults={ + "file_name": file_name, + "rows_count": rows_count, + }, + ) + if created: + return upload, True + + update_fields: list[str] = [] + if upload.file_name != file_name: + upload.file_name = file_name + update_fields.append("file_name") + if upload.rows_count != rows_count: + upload.rows_count = rows_count + update_fields.append("rows_count") + if update_fields: + upload.save(update_fields=update_fields + ["updated_at"]) + return upload, False + + @classmethod + def _sync_registry_membership_registry( + cls, + *, + registry: Register, + upload: RegisterUpload, + actual_date: date, + registry_rows: dict[str, tuple[Organization, date]], + ) -> dict[str, int]: + desired_by_org_id = { + organization.id: started_at + for organization, started_at in registry_rows.values() + } + active_by_org_id = { + period.organization_id: period + for period in RegistryMembershipPeriod.objects.select_for_update().filter( + registry=registry, + ended_at__isnull=True, + ) + } + closed_count = cls._close_stale_registry_memberships( + active_by_org_id=active_by_org_id, + desired_by_org_id=desired_by_org_id, + actual_date=actual_date, + upload=upload, + ) + opened_count, updated_count = cls._upsert_active_registry_memberships( + registry=registry, + upload=upload, + registry_rows=registry_rows, + active_by_org_id=active_by_org_id, + ) + return { + "opened": opened_count, + "updated": updated_count, + "closed": closed_count, + } + + @staticmethod + def _close_stale_registry_memberships( + *, + active_by_org_id: dict[str, RegistryMembershipPeriod], + desired_by_org_id: dict[str, date], + actual_date: date, + upload: RegisterUpload, + ) -> int: + stale_period_ids = [ + period.id + for organization_id, period in active_by_org_id.items() + if organization_id not in desired_by_org_id + ] + if not stale_period_ids: + return 0 + return RegistryMembershipPeriod.objects.filter(id__in=stale_period_ids).update( + ended_at=actual_date, + ended_by_upload=upload, + ) + + @staticmethod + def _upsert_active_registry_memberships( + *, + registry: Register, + upload: RegisterUpload, + registry_rows: dict[str, tuple[Organization, date]], + active_by_org_id: dict[str, RegistryMembershipPeriod], + ) -> tuple[int, int]: + opened_count = 0 + updated_count = 0 + for organization, started_at in registry_rows.values(): + active_period = active_by_org_id.get(organization.id) + if active_period is None: + RegistryMembershipPeriod.objects.create( + registry=registry, + organization=organization, + started_at=started_at, + started_by_upload=upload, + ) + opened_count += 1 + continue + + update_fields = [] + if active_period.started_at != started_at: + active_period.started_at = started_at + update_fields.append("started_at") + if active_period.started_by_upload_id != upload.id: + active_period.started_by_upload = upload + update_fields.append("started_by_upload") + if update_fields: + active_period.save(update_fields=update_fields + ["updated_at"]) + updated_count += 1 + + return opened_count, updated_count + @classmethod def _upsert_industrial_certificates( cls, diff --git a/tests/apps/exchange/test_api.py b/tests/apps/exchange/test_api.py index 8199b5f..d5ee1bb 100644 --- a/tests/apps/exchange/test_api.py +++ b/tests/apps/exchange/test_api.py @@ -29,6 +29,7 @@ from apps.external_data.models import ( PublicProcurement, ) from apps.organization.models import Organization +from apps.registers.models import RegisterUpload, RegistryMembershipPeriod from apps.user.models import User from cryptography.hazmat.primitives.ciphers.aead import AESGCM from django.conf import settings @@ -168,6 +169,20 @@ def build_exchange_payload() -> dict[str, list[dict[str, object]]]: "registry_number": "prod-001", } ], + "registry_memberships": [ + { + "organization_inn": "7707083893", + "registry_name": "Реестр госкорпорации Росатом", + "started_at": "2026-01-01", + "ended_at": None, + }, + { + "organization_inn": "7707083894", + "registry_name": "Реестр госкорпорации Роскосмос", + "started_at": "2026-02-01", + "ended_at": None, + }, + ], "industrial_certificates": [ { "organization_inn": "7707083893", @@ -326,7 +341,13 @@ class ExchangePackageApiTest(APITestCase): self.assertFalse(response.data["result"]["duplicate"]) self.assertEqual(response.data["result"]["organizations"]["created"], 1) self.assertEqual(response.data["result"]["organizations"]["updated"], 1) + self.assertEqual( + response.data["result"]["registry_memberships"]["opened"], + 2, + ) self.assertEqual(Organization.objects.count(), 2) + self.assertEqual(RegisterUpload.objects.count(), 2) + self.assertEqual(RegistryMembershipPeriod.objects.count(), 2) self.assertEqual(IndustrialCertificate.objects.count(), 1) self.assertEqual(ManufacturerRegistryEntry.objects.count(), 1) self.assertEqual(IndustrialProduct.objects.count(), 1) @@ -366,6 +387,10 @@ class ExchangePackageApiTest(APITestCase): self.assertEqual(organization.executors_count, 175) self.assertTrue(organization.tax_reports_available) self.assertTrue(organization.in_275_fz_registry) + self.assertEqual( + organization.get_active_registry_names(), + ["Реестр госкорпорации Росатом"], + ) package_import = ExchangePackageImport.objects.get() self.assertEqual(package_import.delivery_channel, ExchangeDeliveryChannel.API)