Add organizations v2 API and registry enrichment
All checks were successful
CI/CD Pipeline / Quality Gate (push) Successful in 26s
CI/CD Pipeline / Build and Push Images (push) Successful in 6s
CI/CD Pipeline / Internal Notify (push) Successful in 0s
CI/CD Pipeline / Deploy Dev in Dokploy (push) Successful in 1s

This commit is contained in:
2026-05-06 19:04:46 +02:00
parent f54aa4cb0b
commit 0f17ff6773
62 changed files with 10311 additions and 430 deletions

View File

@@ -0,0 +1,128 @@
"""Tests for organizations services."""
from apps.parsers.models import FinancialReport, ParserLoadLog
from django.test import SimpleTestCase, TestCase
from organizations.models import Organization, OrganizationDataSnapshot
from organizations.services import (
OrganizationDataSnapshotRefreshService,
OrganizationPopulationService,
normalize_organization_name,
)
from tests.apps.parsers.factories import IndustrialCertificateRecordFactory
class NormalizeOrganizationNameTest(SimpleTestCase):
"""Checks canonical name matching for common Russian legal forms."""
def test_normalizes_ooo_spelling_variants(self):
expected = "ромашка"
self.assertEqual(normalize_organization_name('OOO "Ромашка"'), expected)
self.assertEqual(normalize_organization_name("ООО «Ромашка»"), expected)
self.assertEqual(
normalize_organization_name(
'Общество с ограниченной ответственностью "Ромашка"'
),
expected,
)
self.assertEqual(
normalize_organization_name(
"ОБЩЕСТВО С ОГРАНИЧЕННОЙ ОТВЕТСТВЕННОСТЬЮ РОМАШКА"
),
expected,
)
def test_normalizes_name_spacing_and_punctuation(self):
self.assertEqual(
normalize_organization_name(' АО "Научно-производственный центр" '),
"научно производственный центр",
)
def test_candidate_ignores_ogrip_like_value_for_legal_entity_inn(self):
candidate = OrganizationPopulationService._candidate(
name="ООО Ромашка",
inn="7720525156",
ogrn="105774446645395",
)
self.assertEqual(candidate.inn, "7720525156")
self.assertEqual(candidate.ogrn, "")
self.assertEqual(candidate.ogrip, "")
class OrganizationDataSnapshotRefreshServiceTest(TestCase):
"""Checks incremental snapshot refresh by parser batch."""
def test_refresh_for_parser_batch_updates_only_affected_organizations(self):
affected_organization = Organization.objects.create(
name='ООО "Затронутая"',
inn="7800000301",
ogrn="1027700133301",
)
unaffected_organization = Organization.objects.create(
name='ООО "Не затронутая"',
inn="7800000302",
ogrn="1027700133302",
)
IndustrialCertificateRecordFactory(
inn=affected_organization.inn,
ogrn=affected_organization.ogrn,
load_batch=77,
certificate_number="SNAPSHOT-BATCH-CERT",
)
IndustrialCertificateRecordFactory(
inn=unaffected_organization.inn,
ogrn=unaffected_organization.ogrn,
load_batch=78,
)
result = OrganizationDataSnapshotRefreshService.refresh_for_parser_batch(
source=ParserLoadLog.Source.INDUSTRIAL,
batch_id=77,
)
self.assertEqual(result.processed, 1)
self.assertTrue(
OrganizationDataSnapshot.objects.filter(
organization=affected_organization
).exists()
)
self.assertFalse(
OrganizationDataSnapshot.objects.filter(
organization=unaffected_organization
).exists()
)
snapshot = affected_organization.data_snapshot
self.assertEqual(
snapshot.data["industrial"][0]["certificate_number"],
"SNAPSHOT-BATCH-CERT",
)
def test_refresh_for_fns_batch_matches_by_ogrn(self):
organization = Organization.objects.create(
name='ООО "ФНС"',
inn="7800000303",
ogrn="1027700133303",
)
FinancialReport.objects.create(
external_id="snapshot-fns",
ogrn=organization.ogrn,
file_name="snapshot_fns.xlsx",
file_hash="d" * 64,
load_batch=88,
status=FinancialReport.Status.SUCCESS,
source=FinancialReport.SourceType.API,
)
result = OrganizationDataSnapshotRefreshService.refresh_for_parser_batch(
source=ParserLoadLog.Source.FNS_REPORTS,
batch_id=88,
)
self.assertEqual(result.processed, 1)
self.assertEqual(
organization.data_snapshot.data["fns_reports"][0]["external_id"],
"snapshot-fns",
)