feat: import exchange registry memberships
All checks were successful
CI/CD Pipeline / Code Quality Checks (push) Successful in 2m47s
CI/CD Pipeline / Run Tests (push) Successful in 2m49s
CI/CD Pipeline / Build Docker Images (push) Successful in 41s
CI/CD Pipeline / Push to Gitea Registry (push) Successful in 1s
CI/CD Pipeline / Deploy to Server (push) Successful in 1s

This commit is contained in:
2026-05-28 12:11:53 +02:00
parent d1b0cd7945
commit e09002d3af
2 changed files with 300 additions and 3 deletions

View File

@@ -8,7 +8,7 @@ import json
import struct
import zlib
from dataclasses import dataclass
from datetime import date
from datetime import date, datetime
from decimal import Decimal, InvalidOperation
from io import BytesIO
from typing import Any
@@ -34,10 +34,12 @@ from apps.external_data.models import (
PublicProcurement,
)
from apps.organization.models import IndustryCluster, Organization, OrganizationType
from apps.registers.models import Register, RegisterUpload, RegistryMembershipPeriod
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from django.conf import settings
from django.db import transaction
from django.db.models import Q
from django.utils import timezone
class ExchangeImportError(ValueError):
@@ -97,6 +99,7 @@ class ExchangePackageImportService:
}
SECTION_KEYS = (
"organizations",
"registry_memberships",
"industrial_certificates",
"manufacturers",
"industrial_products",
@@ -152,7 +155,11 @@ class ExchangePackageImportService:
try:
with transaction.atomic():
summary = cls._import_payload(decoded.payload)
summary = cls._import_payload(
decoded.payload,
package_hash=decoded.package_hash,
package_name=decoded.archive_name,
)
record = ExchangePackageImport.objects.create(
package_id=package_id,
source_system=source_system,
@@ -383,6 +390,27 @@ class ExchangePackageImportService:
"schema_version должен быть целым числом"
) from exc
@classmethod
def _read_actual_date(cls, manifest: dict[str, Any]) -> date:
raw_value = manifest.get("actual_date") or manifest.get("produced_at")
if raw_value in (None, ""):
return timezone.localdate()
if isinstance(raw_value, date):
return raw_value
raw_text = str(raw_value).strip()
try:
return date.fromisoformat(raw_text)
except ValueError:
pass
try:
return datetime.fromisoformat(raw_text.replace("Z", "+00:00")).date()
except ValueError as exc:
raise ExchangeImportError(
"actual_date должен быть датой или датой-временем ISO"
) from exc
@classmethod
def _find_duplicate(
cls,
@@ -453,17 +481,32 @@ class ExchangePackageImportService:
)
@classmethod
def _import_payload(cls, payload: dict[str, Any]) -> dict[str, Any]:
def _import_payload(
cls,
payload: dict[str, Any],
*,
package_hash: str,
package_name: str,
) -> dict[str, Any]:
data = payload.get("data")
if not isinstance(data, dict):
raise ExchangeImportError("Раздел data в пакете поврежден")
manifest = cls._extract_manifest(payload)
actual_date = cls._read_actual_date(manifest)
organization_rows = cls._extract_rows(data, "organizations")
allowed_organization_inns = cls._collect_package_organization_inns(
organization_rows
)
organization_summary = cls._upsert_organizations(organization_rows)
registry_membership_summary = cls._sync_registry_memberships(
cls._extract_rows(data, "registry_memberships"),
allowed_organization_inns=allowed_organization_inns,
actual_date=actual_date,
package_hash=package_hash,
package_name=package_name,
)
industrial_certificate_summary = cls._upsert_industrial_certificates(
cls._extract_rows(data, "industrial_certificates"),
allowed_organization_inns=allowed_organization_inns,
@@ -511,6 +554,7 @@ class ExchangePackageImportService:
return {
"organizations": organization_summary,
"registry_memberships": registry_membership_summary,
"industrial_certificates": industrial_certificate_summary,
"manufacturers": manufacturer_summary,
"industrial_products": industrial_summary,
@@ -685,6 +729,234 @@ class ExchangePackageImportService:
return update_fields
@classmethod
def _sync_registry_memberships(
cls,
rows: list[dict[str, Any]],
*,
allowed_organization_inns: set[str],
actual_date: date,
package_hash: str,
package_name: str,
) -> dict[str, int]:
registers_created_count = 0
uploads_created_count = 0
opened_count = 0
updated_count = 0
closed_count = 0
skipped_count = 0
(
rows_by_registry,
normalized_skipped_count,
) = cls._normalize_registry_membership_rows(
rows,
allowed_organization_inns=allowed_organization_inns,
)
skipped_count += normalized_skipped_count
for registry_name, registry_rows in rows_by_registry.items():
registry, registry_created = Register.objects.get_or_create(
name=registry_name
)
if registry_created:
registers_created_count += 1
upload, upload_created = cls._get_registry_membership_upload(
registry=registry,
actual_date=actual_date,
file_hash=package_hash,
file_name=package_name,
rows_count=len(registry_rows),
)
if upload_created:
uploads_created_count += 1
sync_result = cls._sync_registry_membership_registry(
registry=registry,
upload=upload,
actual_date=actual_date,
registry_rows=registry_rows,
)
opened_count += sync_result["opened"]
updated_count += sync_result["updated"]
closed_count += sync_result["closed"]
return {
"registers_created": registers_created_count,
"uploads_created": uploads_created_count,
"opened": opened_count,
"updated": updated_count,
"closed": closed_count,
"skipped": skipped_count,
}
@classmethod
def _normalize_registry_membership_rows(
cls,
rows: list[dict[str, Any]],
*,
allowed_organization_inns: set[str],
) -> tuple[dict[str, dict[str, tuple[Organization, date]]], int]:
rows_by_registry: dict[str, dict[str, tuple[Organization, date]]] = {}
skipped_count = 0
for row in rows:
registry_name = cls._clean_string(row.get("registry_name"))
if not registry_name:
skipped_count += 1
continue
organization = cls._resolve_organization(
row,
allowed_organization_inns=allowed_organization_inns,
)
started_at = cls._parse_date_value(
row.get("started_at"),
field_name="started_at",
allow_null=False,
)
ended_at = cls._parse_date_value(
row.get("ended_at"),
field_name="ended_at",
allow_null=True,
)
if ended_at is not None:
skipped_count += 1
continue
registry_rows = rows_by_registry.setdefault(registry_name, {})
if organization.inn in registry_rows:
skipped_count += 1
continue
registry_rows[organization.inn] = (organization, started_at)
return rows_by_registry, skipped_count
@staticmethod
def _get_registry_membership_upload(
*,
registry: Register,
actual_date: date,
file_hash: str,
file_name: str,
rows_count: int,
) -> tuple[RegisterUpload, bool]:
upload, created = RegisterUpload.objects.get_or_create(
registry=registry,
actual_date=actual_date,
file_hash=file_hash,
defaults={
"file_name": file_name,
"rows_count": rows_count,
},
)
if created:
return upload, True
update_fields: list[str] = []
if upload.file_name != file_name:
upload.file_name = file_name
update_fields.append("file_name")
if upload.rows_count != rows_count:
upload.rows_count = rows_count
update_fields.append("rows_count")
if update_fields:
upload.save(update_fields=update_fields + ["updated_at"])
return upload, False
@classmethod
def _sync_registry_membership_registry(
cls,
*,
registry: Register,
upload: RegisterUpload,
actual_date: date,
registry_rows: dict[str, tuple[Organization, date]],
) -> dict[str, int]:
desired_by_org_id = {
organization.id: started_at
for organization, started_at in registry_rows.values()
}
active_by_org_id = {
period.organization_id: period
for period in RegistryMembershipPeriod.objects.select_for_update().filter(
registry=registry,
ended_at__isnull=True,
)
}
closed_count = cls._close_stale_registry_memberships(
active_by_org_id=active_by_org_id,
desired_by_org_id=desired_by_org_id,
actual_date=actual_date,
upload=upload,
)
opened_count, updated_count = cls._upsert_active_registry_memberships(
registry=registry,
upload=upload,
registry_rows=registry_rows,
active_by_org_id=active_by_org_id,
)
return {
"opened": opened_count,
"updated": updated_count,
"closed": closed_count,
}
@staticmethod
def _close_stale_registry_memberships(
*,
active_by_org_id: dict[str, RegistryMembershipPeriod],
desired_by_org_id: dict[str, date],
actual_date: date,
upload: RegisterUpload,
) -> int:
stale_period_ids = [
period.id
for organization_id, period in active_by_org_id.items()
if organization_id not in desired_by_org_id
]
if not stale_period_ids:
return 0
return RegistryMembershipPeriod.objects.filter(id__in=stale_period_ids).update(
ended_at=actual_date,
ended_by_upload=upload,
)
@staticmethod
def _upsert_active_registry_memberships(
*,
registry: Register,
upload: RegisterUpload,
registry_rows: dict[str, tuple[Organization, date]],
active_by_org_id: dict[str, RegistryMembershipPeriod],
) -> tuple[int, int]:
opened_count = 0
updated_count = 0
for organization, started_at in registry_rows.values():
active_period = active_by_org_id.get(organization.id)
if active_period is None:
RegistryMembershipPeriod.objects.create(
registry=registry,
organization=organization,
started_at=started_at,
started_by_upload=upload,
)
opened_count += 1
continue
update_fields = []
if active_period.started_at != started_at:
active_period.started_at = started_at
update_fields.append("started_at")
if active_period.started_by_upload_id != upload.id:
active_period.started_by_upload = upload
update_fields.append("started_by_upload")
if update_fields:
active_period.save(update_fields=update_fields + ["updated_at"])
updated_count += 1
return opened_count, updated_count
@classmethod
def _upsert_industrial_certificates(
cls,

View File

@@ -29,6 +29,7 @@ from apps.external_data.models import (
PublicProcurement,
)
from apps.organization.models import Organization
from apps.registers.models import RegisterUpload, RegistryMembershipPeriod
from apps.user.models import User
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from django.conf import settings
@@ -168,6 +169,20 @@ def build_exchange_payload() -> dict[str, list[dict[str, object]]]:
"registry_number": "prod-001",
}
],
"registry_memberships": [
{
"organization_inn": "7707083893",
"registry_name": "Реестр госкорпорации Росатом",
"started_at": "2026-01-01",
"ended_at": None,
},
{
"organization_inn": "7707083894",
"registry_name": "Реестр госкорпорации Роскосмос",
"started_at": "2026-02-01",
"ended_at": None,
},
],
"industrial_certificates": [
{
"organization_inn": "7707083893",
@@ -326,7 +341,13 @@ class ExchangePackageApiTest(APITestCase):
self.assertFalse(response.data["result"]["duplicate"])
self.assertEqual(response.data["result"]["organizations"]["created"], 1)
self.assertEqual(response.data["result"]["organizations"]["updated"], 1)
self.assertEqual(
response.data["result"]["registry_memberships"]["opened"],
2,
)
self.assertEqual(Organization.objects.count(), 2)
self.assertEqual(RegisterUpload.objects.count(), 2)
self.assertEqual(RegistryMembershipPeriod.objects.count(), 2)
self.assertEqual(IndustrialCertificate.objects.count(), 1)
self.assertEqual(ManufacturerRegistryEntry.objects.count(), 1)
self.assertEqual(IndustrialProduct.objects.count(), 1)
@@ -366,6 +387,10 @@ class ExchangePackageApiTest(APITestCase):
self.assertEqual(organization.executors_count, 175)
self.assertTrue(organization.tax_reports_available)
self.assertTrue(organization.in_275_fz_registry)
self.assertEqual(
organization.get_active_registry_names(),
["Реестр госкорпорации Росатом"],
)
package_import = ExchangePackageImport.objects.get()
self.assertEqual(package_import.delivery_channel, ExchangeDeliveryChannel.API)