Some checks failed
CI/CD Pipeline / Manual Action Help (push) Has been skipped
CI/CD Pipeline / Build Golden Images (push) Has been skipped
CI/CD Pipeline / Quality Gate (push) Failing after 1s
CI/CD Pipeline / Start Dev Containers in Dokploy (push) Has been skipped
CI/CD Pipeline / Drop and Recreate Dev Database (push) Has been skipped
CI/CD Pipeline / Build and Push Images (push) Has been skipped
CI/CD Pipeline / Deploy Dev in Dokploy (push) Has been skipped
CI/CD Pipeline / Internal Notify (push) Successful in 1s
1213 lines
45 KiB
Python
1213 lines
45 KiB
Python
"""Tests for parsers services."""
|
||
|
||
from datetime import timedelta
|
||
from unittest.mock import patch
|
||
from urllib.parse import urlparse
|
||
|
||
from apps.core.models import BackgroundJob, JobStatus
|
||
from apps.parsers.clients.minpromtorg.industrial import IndustrialProductionClient
|
||
from apps.parsers.clients.minpromtorg.schemas import (
|
||
IndustrialCertificate,
|
||
IndustrialProduct,
|
||
Manufacturer,
|
||
)
|
||
from apps.parsers.clients.proverki.schemas import Inspection
|
||
from apps.parsers.clients.zakupki.schemas import Procurement
|
||
from apps.parsers.models import (
|
||
IndustrialCertificateRecord,
|
||
IndustrialProductRecord,
|
||
InspectionRecord,
|
||
ManufacturerRecord,
|
||
ParserLoadLog,
|
||
ProcurementRecord,
|
||
Proxy,
|
||
)
|
||
from apps.parsers.services import (
|
||
IndustrialCertificateService,
|
||
IndustrialProductService,
|
||
InspectionService,
|
||
ManufacturerService,
|
||
ParserLoadLogService,
|
||
ProcurementService,
|
||
ProxyService,
|
||
ProxyToolsSyncService,
|
||
)
|
||
from apps.registers.models import Organization
|
||
from django.test import TestCase, override_settings, tag
|
||
from django.utils import timezone
|
||
|
||
from tests.utils import TestHTTPServer
|
||
from tests.utils.fixtures import build_minpromtorg_certificates_excel, fake
|
||
|
||
from .factories import (
|
||
IndustrialCertificateRecordFactory,
|
||
IndustrialProductRecordFactory,
|
||
InspectionRecordFactory,
|
||
ManufacturerRecordFactory,
|
||
ParserLoadLogFactory,
|
||
ProxyFactory,
|
||
)
|
||
|
||
|
||
def _digits(length: int) -> str:
    """Return a string made of ``length`` random decimal digits.

    Each digit is drawn independently via ``fake.random_int(0, 9)``.
    """
    generated = [str(fake.random_int(0, 9)) for _position in range(length)]
    return "".join(generated)
|
||
|
||
|
||
def _proxy_address() -> str:
    """Return a random ``http://HOST:PORT`` proxy URL built from ``fake``."""
    host = fake.ipv4()
    port = fake.port_number()
    return f"http://{host}:{port}"
|
||
|
||
|
||
def _create_registry_organization(*, inn: str, ogrn: str) -> Organization:
    """Create and return an ``Organization`` row carrying the given identifiers.

    ``inn`` and ``ogrn`` are digit strings and are passed to the model as
    integers; the name, KPP and OKPO values are randomly generated.
    """
    organization_fields = {
        "pn_name": fake.company(),
        "mn_ogrn": int(ogrn),
        "mn_inn": int(inn),
        "in_kpp": int(_digits(9)),
        "mn_okpo": _digits(8),
    }
    return Organization.objects.create(**organization_fields)
|
||
|
||
|
||
class ProxyServiceTest(TestCase):
    """Cover the proxy bookkeeping helpers exposed by ``ProxyService``."""

    def test_get_active_proxies_empty(self):
        """An empty table yields an empty list of active proxies."""
        self.assertEqual(ProxyService.get_active_proxies(), [])

    def test_get_active_proxies_with_data(self):
        """Only the addresses of active proxies are returned."""
        first_active = ProxyFactory(is_active=True)
        second_active = ProxyFactory(is_active=True)
        # Inactive row: must be filtered out of the result.
        ProxyFactory(is_active=False)

        active_addresses = ProxyService.get_active_proxies()

        self.assertEqual(len(active_addresses), 2)
        for expected in (first_active, second_active):
            self.assertIn(expected.address, active_addresses)

    def test_get_active_proxies_or_none_empty(self):
        """``get_active_proxies_or_none`` gives ``None`` when nothing is stored."""
        self.assertIsNone(ProxyService.get_active_proxies_or_none())

    def test_get_active_proxies_or_none_with_data(self):
        """``get_active_proxies_or_none`` gives a one-item list for one active proxy."""
        ProxyFactory(is_active=True)

        available = ProxyService.get_active_proxies_or_none()

        self.assertIsNotNone(available)
        self.assertEqual(len(available), 1)

    def test_mark_used(self):
        """``mark_used`` fills in ``last_used_at`` for the addressed proxy."""
        unused = ProxyFactory(last_used_at=None)
        self.assertIsNone(unused.last_used_at)

        ProxyService.mark_used(unused.address)
        unused.refresh_from_db()

        self.assertIsNotNone(unused.last_used_at)

    def test_mark_failed(self):
        """``mark_failed`` bumps ``fail_count`` from 0 to 1."""
        healthy = ProxyFactory(fail_count=0)

        ProxyService.mark_failed(healthy.address)
        healthy.refresh_from_db()

        self.assertEqual(healthy.fail_count, 1)

    def test_deactivate(self):
        """``deactivate`` clears the ``is_active`` flag."""
        enabled = ProxyFactory(is_active=True)

        ProxyService.deactivate(enabled.address)
        enabled.refresh_from_db()

        self.assertFalse(enabled.is_active)

    def test_add_proxy(self):
        """``add_proxy`` stores the address and description as an active proxy."""
        proxy_url = _proxy_address()
        note = fake.sentence(nb_words=3)

        created = ProxyService.add_proxy(proxy_url, note)

        self.assertEqual(created.address, proxy_url)
        self.assertEqual(created.description, note)
        self.assertTrue(created.is_active)

    def test_add_proxy_idempotent(self):
        """Re-adding a known address returns the stored row unchanged."""
        proxy_url = _proxy_address()
        original_note = fake.sentence(nb_words=3)
        stored = ProxyFactory(address=proxy_url, description=original_note)

        replacement_note = fake.sentence(nb_words=3)
        returned = ProxyService.add_proxy(proxy_url, replacement_note)

        self.assertEqual(returned.id, stored.id)
        # The description of the existing row is not overwritten.
        self.assertEqual(returned.description, original_note)

    def test_add_proxies(self):
        """Bulk add reports and persists every new address."""
        batch = []
        for _ in range(3):
            batch.append(_proxy_address())

        created_count = ProxyService.add_proxies(batch)

        self.assertEqual(created_count, 3)
        self.assertEqual(Proxy.objects.count(), 3)

    def test_add_proxies_skips_existing(self):
        """Bulk add counts only the addresses that were not stored before."""
        known = _proxy_address()
        fresh = _proxy_address()
        # Re-draw until the two random addresses differ.
        while fresh == known:
            fresh = _proxy_address()
        ProxyFactory(address=known)

        # ``known`` already exists; only ``fresh`` should be created.
        created_count = ProxyService.add_proxies([known, fresh])

        self.assertEqual(created_count, 1)
        self.assertEqual(Proxy.objects.count(), 2)

    def test_get_runtime_proxies_prefers_proxy_tools_ru(self):
        """RU proxies imported from Proxy-Tools win over manual RU proxies."""
        manual_ru = ProxyFactory(
            source=ProxyService.MANUAL_SOURCE,
            country_code="RU",
        )
        imported_ru = ProxyFactory(
            source=ProxyService.PROXY_TOOLS_SOURCE,
            country_code="RU",
        )
        # Imported but non-RU: must not be selected.
        ProxyFactory(
            source=ProxyService.PROXY_TOOLS_SOURCE,
            country_code="US",
        )

        runtime = ProxyService.get_runtime_proxies()

        self.assertEqual(runtime, [imported_ru.address])
        self.assertNotIn(manual_ru.address, runtime)

    def test_get_runtime_proxies_falls_back_to_any_ru_proxy(self):
        """Without imported proxies, any RU proxy is used as a fallback."""
        manual_ru = ProxyFactory(
            source=ProxyService.MANUAL_SOURCE,
            country_code="RU",
        )
        ProxyFactory(
            source=ProxyService.MANUAL_SOURCE,
            country_code="US",
        )

        runtime = ProxyService.get_runtime_proxies()

        self.assertEqual(runtime, [manual_ru.address])
|
||
|
||
|
||
class ProxyToolsSyncServiceTest(TestCase):
    """Cover the Proxy-Tools import flow of ``ProxyToolsSyncService``."""

    def test_sync_ru_proxies_skips_without_api_key(self):
        """Without an API key the sync reports a skipped run."""
        # NOTE(review): this relies on the ambient test settings not defining
        # an API key; it does not override the setting itself - confirm.
        outcome = ProxyToolsSyncService.sync_ru_proxies()

        self.assertEqual(outcome["status"], "skipped")
        self.assertEqual(outcome["reason"], "missing_api_key")

    @override_settings(
        PROXY_TOOLS_API_KEY="test-token",
        PROXY_TOOLS_LIMIT=2,
        PROXY_TOOLS_MAX_PAGES=2,
    )
    @patch("apps.parsers.services.ProxyToolsClient.fetch_proxies")
    def test_sync_ru_proxies_upserts_and_deactivates(self, fetch_proxies_mock):
        """A sync creates new proxies, reactivates known ones and retires stale ones."""
        # Imported and active, but absent from the mocked responses.
        stale_active = ProxyFactory(
            address="http://10.0.0.10:8000",
            source=ProxyService.PROXY_TOOLS_SOURCE,
            country_code="RU",
            is_active=True,
        )
        # Imported and inactive, present again in the mocked responses.
        dormant_known = ProxyFactory(
            address="http://10.0.0.20:8000",
            source=ProxyService.PROXY_TOOLS_SOURCE,
            country_code="RU",
            is_active=False,
        )
        # Manually added proxy: expected to be left alone by the sync.
        manual_ru = ProxyFactory(
            address="http://10.0.0.30:8000",
            source=ProxyService.MANUAL_SOURCE,
            country_code="RU",
            is_active=True,
        )

        # Two pages that mix three item shapes: host/port dict, "proxy" dict
        # and a bare URL string.
        first_page = {
            "data": [
                {"host": "10.0.0.20", "port": 8000, "type": "4"},
                {"proxy": "socks5://10.0.0.40:1080"},
            ],
            "meta": {"total_pages": 2},
        }
        second_page = {
            "data": [
                "https://10.0.0.50:8443",
            ],
            "meta": {"total_pages": 2},
        }
        fetch_proxies_mock.side_effect = [first_page, second_page]

        outcome = ProxyToolsSyncService.sync_ru_proxies()

        expected_summary = {
            "status": "success",
            "fetched": 3,
            "created": 2,
            "updated": 1,
            "deactivated": 1,
        }
        for key, expected_value in expected_summary.items():
            self.assertEqual(outcome[key], expected_value)

        for row in (stale_active, dormant_known, manual_ru):
            row.refresh_from_db()

        self.assertFalse(stale_active.is_active)
        self.assertTrue(dormant_known.is_active)
        self.assertTrue(manual_ru.is_active)

        active_imported = Proxy.objects.filter(
            source=ProxyService.PROXY_TOOLS_SOURCE,
            country_code="RU",
            is_active=True,
        )
        imported_addresses = set(active_imported.values_list("address", flat=True))
        self.assertSetEqual(
            imported_addresses,
            {
                "http://10.0.0.20:8000",
                "socks5://10.0.0.40:1080",
                "https://10.0.0.50:8443",
            },
        )
|
||
|
||
|
||
class ParserLoadLogServiceTest(TestCase):
    """Cover batch numbering and status bookkeeping of ``ParserLoadLogService``."""

    def test_get_next_batch_id_first(self):
        """A source with no logs starts at batch 1."""
        source = ParserLoadLog.Source.INDUSTRIAL
        self.assertEqual(ParserLoadLogService.get_next_batch_id(source), 1)

    def test_get_next_batch_id_increment(self):
        """The next batch id is one past the highest stored id."""
        source = ParserLoadLog.Source.INDUSTRIAL
        for existing_batch in (5, 3):
            ParserLoadLogFactory(batch_id=existing_batch, source=source)

        next_batch = ParserLoadLogService.get_next_batch_id(source)

        self.assertEqual(next_batch, 6)

    def test_get_next_batch_id_per_source(self):
        """Each source keeps its own batch counter."""
        ParserLoadLogFactory(batch_id=10, source=ParserLoadLog.Source.INDUSTRIAL)
        ParserLoadLogFactory(batch_id=5, source=ParserLoadLog.Source.MANUFACTURES)

        next_industrial = ParserLoadLogService.get_next_batch_id(
            ParserLoadLog.Source.INDUSTRIAL
        )
        next_manufactures = ParserLoadLogService.get_next_batch_id(
            ParserLoadLog.Source.MANUFACTURES
        )

        self.assertEqual(next_industrial, 11)
        self.assertEqual(next_manufactures, 6)

    def test_create_load_log(self):
        """``create_load_log`` returns a log carrying the supplied values."""
        created = ParserLoadLogService.create_load_log(
            source=ParserLoadLog.Source.INDUSTRIAL,
            batch_id=1,
            records_count=100,
            status="success",
        )

        self.assertIsInstance(created, ParserLoadLog)
        self.assertEqual(created.source, ParserLoadLog.Source.INDUSTRIAL)
        self.assertEqual(created.batch_id, 1)
        self.assertEqual(created.records_count, 100)
        self.assertEqual(created.status, "success")

    def test_mark_failed(self):
        """``mark_failed`` stores the failed status and the error text."""
        entry = ParserLoadLogFactory(status="success")
        reason = fake.sentence(nb_words=4)

        ParserLoadLogService.mark_failed(entry, reason)
        entry.refresh_from_db()

        self.assertEqual(entry.status, "failed")
        self.assertEqual(entry.error_message, reason)

    def test_update_records_count(self):
        """``update_records_count`` persists the new count."""
        entry = ParserLoadLogFactory(records_count=0)

        ParserLoadLogService.update_records_count(entry, 250)
        entry.refresh_from_db()

        self.assertEqual(entry.records_count, 250)

    def test_mark_stale_in_progress_failed_marks_old_logs(self):
        """An old in-progress log with no active job is closed as failed."""
        entry = ParserLoadLogFactory(
            source=ParserLoadLog.Source.INDUSTRIAL_PRODUCTS,
            batch_id=1,
            status=ParserLoadLog.Status.IN_PROGRESS,
        )
        # Back-date the row with a queryset update so it is 3 hours old.
        three_hours_ago = timezone.now() - timedelta(hours=3)
        ParserLoadLog.objects.filter(pk=entry.pk).update(updated_at=three_hours_ago)

        closed = ParserLoadLogService.mark_stale_in_progress_failed(max_age_minutes=90)
        entry.refresh_from_db()

        self.assertEqual(closed, 1)
        self.assertEqual(entry.status, ParserLoadLog.Status.FAILED)
        self.assertIn("Stale parser load", entry.error_message)

    def test_mark_stale_in_progress_failed_keeps_fresh_active_job(self):
        """An old log stays in progress while a matching started job exists."""
        entry = ParserLoadLogFactory(
            source=ParserLoadLog.Source.INDUSTRIAL_PRODUCTS,
            batch_id=1,
            status=ParserLoadLog.Status.IN_PROGRESS,
        )
        three_hours_ago = timezone.now() - timedelta(hours=3)
        ParserLoadLog.objects.filter(pk=entry.pk).update(updated_at=three_hours_ago)
        # The job's meta points at the same source and batch as the log.
        BackgroundJob.objects.create(
            task_id="active-task",
            task_name="apps.parsers.tasks.parse_industrial_products",
            status=JobStatus.STARTED,
            meta={"source": entry.source, "batch_id": entry.batch_id},
        )

        closed = ParserLoadLogService.mark_stale_in_progress_failed(max_age_minutes=90)
        entry.refresh_from_db()

        self.assertEqual(closed, 0)
        self.assertEqual(entry.status, ParserLoadLog.Status.IN_PROGRESS)
|
||
|
||
|
||
class IndustrialCertificateServiceTest(TestCase):
    """Tests for IndustrialCertificateService."""

    @staticmethod
    def _make_certificate(*, certificate_number=None, inn=None, ogrn=None):
        """Build an ``IndustrialCertificate`` filled with random data.

        Args:
            certificate_number: Fixed certificate number; random when ``None``.
            inn: Fixed INN digit string; 10 random digits when ``None``.
            ogrn: Fixed OGRN digit string; 13 random digits when ``None``.

        Returns:
            A new, unsaved ``IndustrialCertificate`` dataclass instance.
        """
        return IndustrialCertificate(
            issue_date=str(fake.date()),
            certificate_number=(
                fake.bothify(text="??-####-#####")
                if certificate_number is None
                else certificate_number
            ),
            expiry_date=str(fake.date()),
            certificate_file_url=fake.url(),
            organisation_name=fake.company(),
            inn=_digits(10) if inn is None else inn,
            ogrn=_digits(13) if ogrn is None else ogrn,
        )

    def test_save_certificates_empty(self):
        """Test saving empty list returns 0."""
        count = IndustrialCertificateService.save_certificates([], batch_id=1)
        self.assertEqual(count, 0)

    def test_save_certificates(self):
        """Test saving certificates from dataclass."""
        certificates = [self._make_certificate() for _ in range(5)]

        count = IndustrialCertificateService.save_certificates(certificates, batch_id=1)

        self.assertEqual(count, 5)
        self.assertEqual(IndustrialCertificateRecord.objects.count(), 5)
        record = IndustrialCertificateRecord.objects.first()
        self.assertIsNotNone(record.issue_date_normalized)
        self.assertIsNotNone(record.expiry_date_normalized)

    def test_save_certificates_links_registry_organization_when_exists(self):
        """Test the record is linked to a registers organization when identifiers match."""
        inn = _digits(10)
        ogrn = _digits(13)
        organization = _create_registry_organization(inn=inn, ogrn=ogrn)
        certificate_number = fake.bothify(text="??-####-#####")

        certificates = [
            self._make_certificate(
                certificate_number=certificate_number, inn=inn, ogrn=ogrn
            )
        ]

        saved = IndustrialCertificateService.save_certificates(certificates, batch_id=1)

        self.assertEqual(saved, 1)
        record = IndustrialCertificateRecord.objects.get(
            certificate_number=certificate_number
        )
        self.assertEqual(record.registry_organization_id, organization.id)

    def test_save_certificates_keeps_null_registry_organization_when_not_found(self):
        """Test parser save does not fail and keeps null when organization is absent."""
        certificate_number = fake.bothify(text="??-####-#####")
        certificates = [self._make_certificate(certificate_number=certificate_number)]

        saved = IndustrialCertificateService.save_certificates(certificates, batch_id=1)

        self.assertEqual(saved, 1)
        record = IndustrialCertificateRecord.objects.get(
            certificate_number=certificate_number
        )
        self.assertIsNone(record.registry_organization_id)

    def test_save_certificates_with_chunk_size(self):
        """Test saving certificates in chunks."""
        certificates = [self._make_certificate() for _ in range(10)]

        count = IndustrialCertificateService.save_certificates(
            certificates, batch_id=1, chunk_size=3
        )

        self.assertEqual(count, 10)

    def test_find_by_inn(self):
        """Test finding certificates by INN."""
        inn_a = _digits(10)
        inn_b = _digits(10)
        # Distinct literal prefixes guarantee the three certificate numbers
        # differ; a bare "CERT-####" pattern has only 10,000 possible values.
        IndustrialCertificateRecordFactory(
            inn=inn_a, certificate_number=fake.bothify(text="CERT-A-####"), load_batch=1
        )
        IndustrialCertificateRecordFactory(
            inn=inn_a, certificate_number=fake.bothify(text="CERT-B-####"), load_batch=2
        )
        IndustrialCertificateRecordFactory(
            inn=inn_b, certificate_number=fake.bothify(text="CERT-C-####"), load_batch=1
        )

        results = IndustrialCertificateService.find_by_inn(inn_a)
        self.assertEqual(results.count(), 2)

        results_batch1 = IndustrialCertificateService.find_by_inn(inn_a, batch_id=1)
        self.assertEqual(results_batch1.count(), 1)

    def test_find_by_certificate_number(self):
        """Test finding certificate by number."""
        unique_number = fake.bothify(text="CERT-#####")
        IndustrialCertificateRecordFactory(certificate_number=unique_number)
        # A different prefix means the second number can never equal the
        # number being searched for, so the expected count is deterministic.
        IndustrialCertificateRecordFactory(
            certificate_number=fake.bothify(text="OTHER-#####")
        )

        results = IndustrialCertificateService.find_by_certificate_number(unique_number)
        self.assertEqual(results.count(), 1)

    def test_save_certificates_deduplication(self):
        """Test saving certificates skips duplicates by certificate_number."""
        # Create initial certificate
        cert_number = fake.bothify(text="CERT-DEDUP-#####")
        initial = [self._make_certificate(certificate_number=cert_number)]
        count1 = IndustrialCertificateService.save_certificates(initial, batch_id=1)
        self.assertEqual(count1, 1)
        self.assertEqual(IndustrialCertificateRecord.objects.count(), 1)

        # Try to save with same certificate_number - should be skipped
        duplicate = [self._make_certificate(certificate_number=cert_number)]
        count2 = IndustrialCertificateService.save_certificates(duplicate, batch_id=2)

        # Should still be 1 record (duplicate skipped)
        self.assertEqual(count2, 0)
        self.assertEqual(IndustrialCertificateRecord.objects.count(), 1)

        # Verify original data preserved
        record = IndustrialCertificateRecord.objects.first()
        self.assertEqual(record.organisation_name, initial[0].organisation_name)
        self.assertEqual(record.inn, initial[0].inn)
        self.assertEqual(record.load_batch, 1)  # Original batch
|
||
|
||
|
||
class ManufacturerServiceTest(TestCase):
    """Cover persistence and lookup helpers of ``ManufacturerService``."""

    def test_save_manufacturers_empty(self):
        """Saving an empty list reports zero saved records."""
        self.assertEqual(ManufacturerService.save_manufacturers([], batch_id=1), 0)

    def test_save_manufacturers(self):
        """Five random manufacturers produce five stored records."""
        batch = []
        for _ in range(5):
            batch.append(
                Manufacturer(
                    full_legal_name=fake.company(),
                    inn=_digits(10),
                    ogrn=_digits(13),
                    address=fake.address().replace("\n", ", "),
                )
            )

        saved = ManufacturerService.save_manufacturers(batch, batch_id=1)

        self.assertEqual(saved, 5)
        self.assertEqual(ManufacturerRecord.objects.count(), 5)

    def test_save_manufacturers_links_registry_organization_when_exists(self):
        """A manufacturer whose INN/OGRN match an organization is linked to it."""
        inn = _digits(10)
        ogrn = _digits(13)
        organization = _create_registry_organization(inn=inn, ogrn=ogrn)

        matching = Manufacturer(
            full_legal_name=fake.company(),
            inn=inn,
            ogrn=ogrn,
            address=fake.address().replace("\n", ", "),
        )

        saved = ManufacturerService.save_manufacturers([matching], batch_id=1)

        self.assertEqual(saved, 1)
        stored = ManufacturerRecord.objects.get(inn=inn)
        self.assertEqual(stored.registry_organization_id, organization.id)

    def test_save_manufacturers_with_chunk_size(self):
        """Ten manufacturers saved in chunks of three are all counted."""
        batch = []
        for _ in range(10):
            batch.append(
                Manufacturer(
                    full_legal_name=fake.company(),
                    inn=_digits(10),
                    ogrn=_digits(13),
                    address=fake.address().replace("\n", ", "),
                )
            )

        saved = ManufacturerService.save_manufacturers(
            batch, batch_id=1, chunk_size=3
        )

        self.assertEqual(saved, 10)

    def test_find_by_inn(self):
        """Lookup by INN returns only the record with that INN."""
        inn_target = _digits(10)
        inn_other = _digits(10)
        inn_third = _digits(10)
        for inn, batch_number in ((inn_target, 1), (inn_other, 1), (inn_third, 2)):
            ManufacturerRecordFactory(inn=inn, load_batch=batch_number)

        found = ManufacturerService.find_by_inn(inn_target)

        self.assertEqual(found.count(), 1)

    def test_find_by_inn_with_batch_filter(self):
        """Lookup by INN honours the optional batch filter."""
        inn_value = _digits(10)
        ManufacturerRecordFactory(inn=inn_value, load_batch=1)
        ManufacturerRecordFactory(inn=_digits(10), load_batch=2)

        in_first_batch = ManufacturerService.find_by_inn(inn_value, batch_id=1)
        self.assertEqual(in_first_batch.count(), 1)

        in_second_batch = ManufacturerService.find_by_inn(inn_value, batch_id=2)
        self.assertEqual(in_second_batch.count(), 0)

    def test_find_by_ogrn(self):
        """Lookup by OGRN returns only the record with that OGRN."""
        ogrn_target = _digits(13)
        ManufacturerRecordFactory(ogrn=ogrn_target)
        ManufacturerRecordFactory(ogrn=_digits(13))

        found = ManufacturerService.find_by_ogrn(ogrn_target)

        self.assertEqual(found.count(), 1)

    def test_save_manufacturers_updates_existing_record(self):
        """Saving a manufacturer with a known INN refreshes the stored record."""
        # First load: a single manufacturer in batch 1.
        inn_value = _digits(10)
        ogrn_value = _digits(13)
        address_value = fake.address().replace("\n", ", ")
        company_name = fake.company()
        first_load = [
            Manufacturer(
                full_legal_name=company_name,
                inn=inn_value,
                ogrn=ogrn_value,
                address=address_value,
            )
        ]
        first_saved = ManufacturerService.save_manufacturers(first_load, batch_id=1)
        self.assertEqual(first_saved, 1)
        self.assertEqual(ManufacturerRecord.objects.count(), 1)

        # Second load: same INN, every other field changed, batch 2.
        updated_name = fake.company()
        updated_address = fake.address().replace("\n", ", ")
        updated_ogrn = _digits(13)
        second_load = [
            Manufacturer(
                full_legal_name=updated_name,
                inn=inn_value,
                ogrn=updated_ogrn,
                address=updated_address,
            )
        ]
        second_saved = ManufacturerService.save_manufacturers(second_load, batch_id=2)

        # The existing row is updated in place rather than duplicated.
        self.assertEqual(second_saved, 1)
        self.assertEqual(ManufacturerRecord.objects.count(), 1)

        # The stored row now carries the data of the second load.
        stored = ManufacturerRecord.objects.first()
        self.assertEqual(stored.full_legal_name, updated_name)
        self.assertEqual(stored.ogrn, updated_ogrn)
        self.assertEqual(stored.address, updated_address)
        self.assertEqual(stored.load_batch, 2)
|
||
|
||
|
||
class IndustrialProductServiceTest(TestCase):
    """Cover persistence and lookup helpers of ``IndustrialProductService``."""

    def test_save_products_empty(self):
        """Saving an empty list reports zero saved records."""
        self.assertEqual(IndustrialProductService.save_products([], batch_id=1), 0)

    def test_save_products(self):
        """Five random products produce five stored records."""
        batch = []
        for _ in range(5):
            batch.append(
                IndustrialProduct(
                    full_organisation_name=fake.company(),
                    ogrn=_digits(13),
                    inn=_digits(10),
                    registry_number=f"MPP-{_digits(8)}",
                    product_name=fake.sentence(nb_words=4),
                    product_model=fake.bothify(text="MODEL-###"),
                    okpd2_code=f"{fake.random_int(min=10, max=99)}.{fake.random_int(min=10, max=99)}",
                    tnved_code=_digits(10),
                    regulatory_document=fake.sentence(nb_words=5),
                )
            )

        saved = IndustrialProductService.save_products(batch, batch_id=1)

        self.assertEqual(saved, 5)
        self.assertEqual(IndustrialProductRecord.objects.count(), 5)

    def test_save_products_links_registry_organization_when_exists(self):
        """A product whose INN/OGRN match an organization is linked to it."""
        inn = _digits(10)
        ogrn = _digits(13)
        organization = _create_registry_organization(inn=inn, ogrn=ogrn)
        registry_number = f"MPP-{_digits(8)}"

        matching = IndustrialProduct(
            full_organisation_name=fake.company(),
            ogrn=ogrn,
            inn=inn,
            registry_number=registry_number,
            product_name=fake.sentence(nb_words=4),
            product_model=fake.bothify(text="MODEL-###"),
            okpd2_code=f"{fake.random_int(min=10, max=99)}.{fake.random_int(min=10, max=99)}",
            tnved_code=_digits(10),
            regulatory_document=fake.sentence(nb_words=5),
        )

        saved = IndustrialProductService.save_products([matching], batch_id=1)

        self.assertEqual(saved, 1)
        stored = IndustrialProductRecord.objects.get(registry_number=registry_number)
        self.assertEqual(stored.registry_organization_id, organization.id)

    def test_find_by_registry_number(self):
        """Lookup by registry number returns only the matching record."""
        registry_number = f"MPP-{_digits(8)}"
        IndustrialProductRecordFactory(registry_number=registry_number)
        IndustrialProductRecordFactory(registry_number=f"MPP-{_digits(8)}")

        found = IndustrialProductService.find_by_registry_number(registry_number)

        self.assertEqual(found.count(), 1)

    def test_save_products_updates_existing_record(self):
        """Saving a product with a known registry number refreshes the stored record."""
        registry_number = f"MPP-{_digits(8)}"

        # First load: the product as originally published, batch 1.
        first_load = [
            IndustrialProduct(
                full_organisation_name=fake.company(),
                ogrn=_digits(13),
                inn=_digits(10),
                registry_number=registry_number,
                product_name="Начальное имя",
                product_model="MODEL-001",
                okpd2_code="25.11",
                tnved_code=_digits(10),
                regulatory_document="ГОСТ 1",
            )
        ]
        first_saved = IndustrialProductService.save_products(first_load, batch_id=1)
        self.assertEqual(first_saved, 1)
        self.assertEqual(IndustrialProductRecord.objects.count(), 1)

        # Second load: same registry number with changed details, batch 2.
        second_load = [
            IndustrialProduct(
                full_organisation_name=fake.company(),
                ogrn=_digits(13),
                inn=_digits(10),
                registry_number=registry_number,
                product_name="Обновленное имя",
                product_model="MODEL-777",
                okpd2_code="28.99",
                tnved_code=_digits(10),
                regulatory_document="ГОСТ 2",
            )
        ]
        second_saved = IndustrialProductService.save_products(second_load, batch_id=2)

        self.assertEqual(second_saved, 1)
        self.assertEqual(IndustrialProductRecord.objects.count(), 1)

        stored = IndustrialProductRecord.objects.first()
        self.assertEqual(stored.product_name, "Обновленное имя")
        self.assertEqual(stored.product_model, "MODEL-777")
        self.assertEqual(stored.load_batch, 2)
|
||
|
||
|
||
class InspectionServiceTest(TestCase):
|
||
"""Tests for InspectionService."""
|
||
|
||
def test_save_inspections_empty(self):
    """Saving an empty list reports zero saved records."""
    self.assertEqual(InspectionService.save_inspections([], batch_id=1), 0)
|
||
|
||
def test_save_inspections(self):
    """Five random inspections are stored and their dates normalized."""
    batch = []
    for _ in range(5):
        batch.append(
            Inspection(
                registration_number=_digits(12),
                inn=_digits(10),
                ogrn=_digits(13),
                organisation_name=fake.company(),
                control_authority=fake.company(),
                inspection_type=fake.word(),
                inspection_form=fake.word(),
                start_date=str(fake.date()),
                end_date=str(fake.date()),
                status=fake.word(),
                legal_basis=fake.sentence(nb_words=3),
                result=fake.sentence(nb_words=3),
            )
        )

    saved = InspectionService.save_inspections(batch, batch_id=1)

    self.assertEqual(saved, 5)
    self.assertEqual(InspectionRecord.objects.count(), 5)
    # Spot-check one stored row for populated normalized dates.
    stored = InspectionRecord.objects.first()
    self.assertIsNotNone(stored.start_date_normalized)
    self.assertIsNotNone(stored.end_date_normalized)
|
||
|
||
def test_save_inspections_links_registry_organization_when_exists(self):
    """An inspection whose INN/OGRN match an organization is linked to it."""
    inn = _digits(10)
    ogrn = _digits(13)
    organization = _create_registry_organization(inn=inn, ogrn=ogrn)
    registration_number = _digits(12)

    matching = Inspection(
        registration_number=registration_number,
        inn=inn,
        ogrn=ogrn,
        organisation_name=fake.company(),
        control_authority=fake.company(),
        inspection_type=fake.word(),
        inspection_form=fake.word(),
        start_date=str(fake.date()),
        end_date=str(fake.date()),
        status=fake.word(),
        legal_basis=fake.sentence(nb_words=3),
        result=fake.sentence(nb_words=3),
    )

    saved = InspectionService.save_inspections([matching], batch_id=1)

    self.assertEqual(saved, 1)
    stored = InspectionRecord.objects.get(registration_number=registration_number)
    self.assertEqual(stored.registry_organization_id, organization.id)
|
||
|
||
def test_save_inspections_with_chunk_size(self):
    """Ten inspections saved in chunks of three are all counted."""
    # NOTE(review): unlike the sibling tests, no ``result`` is passed here;
    # presumably the dataclass field has a default - confirm.
    batch = []
    for _ in range(10):
        batch.append(
            Inspection(
                registration_number=_digits(12),
                inn=_digits(10),
                ogrn=_digits(13),
                organisation_name=fake.company(),
                control_authority=fake.company(),
                inspection_type=fake.word(),
                inspection_form=fake.word(),
                start_date=str(fake.date()),
                end_date=str(fake.date()),
                status=fake.word(),
                legal_basis=fake.sentence(nb_words=3),
            )
        )

    saved = InspectionService.save_inspections(
        batch, batch_id=1, chunk_size=3
    )

    self.assertEqual(saved, 10)
|
||
|
||
def test_find_by_inn(self):
    """Look up inspections by INN, with and without a batch filter."""
    target_inn = _digits(10)
    # Two records share the target INN in different batches; a third record
    # with another INN sits in batch 1.
    for batch in (1, 2):
        InspectionRecordFactory(inn=target_inn, load_batch=batch)
    InspectionRecordFactory(inn=_digits(10), load_batch=1)

    all_matches = InspectionService.find_by_inn(target_inn)
    self.assertEqual(all_matches.count(), 2)

    first_batch_matches = InspectionService.find_by_inn(target_inn, batch_id=1)
    self.assertEqual(first_batch_matches.count(), 1)
|
||
|
||
def test_find_by_registration_number(self):
    """Look up an inspection by its registration number."""
    wanted, unrelated = _digits(12), _digits(12)
    for number in (wanted, unrelated):
        InspectionRecordFactory(registration_number=number)

    matches = InspectionService.find_by_registration_number(wanted)
    self.assertEqual(matches.count(), 1)
|
||
|
||
def test_find_by_control_authority(self):
    """Find inspections whose control authority contains the search key.

    Two records embed the key (once at the end, once at the start of the
    authority name) and one record does not; all three belong to batch 1.
    """
    # A random dictionary word could also occur inside the unrelated company
    # name and make the counts below nondeterministic, so the key is built
    # around a long digit run that generated company names are not expected
    # to contain.
    authority_key = f"authority{_digits(12)}"
    authority_match_1 = f"{fake.company()} {authority_key}"
    authority_match_2 = f"{authority_key} {fake.company()}"
    authority_other = fake.company()
    InspectionRecordFactory(control_authority=authority_match_1, load_batch=1)
    InspectionRecordFactory(control_authority=authority_match_2, load_batch=1)
    InspectionRecordFactory(control_authority=authority_other, load_batch=1)

    results = InspectionService.find_by_control_authority(authority_key)
    self.assertEqual(results.count(), 2)

    results_batch1 = InspectionService.find_by_control_authority(
        authority_key, batch_id=1
    )
    self.assertEqual(results_batch1.count(), 2)
|
||
|
||
def test_save_inspections_updates_existing_record(self):
    """Check that re-saving a known registration number updates the record.

    The same registration number is saved twice with different field values
    and batch ids; exactly one row must remain and it must carry the values
    from the second save.
    """
    reg_number = _digits(12)
    initial = [
        Inspection(
            registration_number=reg_number,
            inn=_digits(10),
            ogrn=_digits(13),
            organisation_name=fake.company(),
            control_authority=fake.company(),
            inspection_type=fake.word(),
            inspection_form=fake.word(),
            start_date=str(fake.date()),
            end_date=str(fake.date()),
            status=fake.word(),
            legal_basis=fake.sentence(nb_words=3),
            result=fake.sentence(nb_words=3),
        )
    ]
    count1 = InspectionService.save_inspections(initial, batch_id=1)
    self.assertEqual(count1, 1)
    self.assertEqual(InspectionRecord.objects.count(), 1)

    # Keep every updated value that is asserted on below, so the checks
    # compare against what was sent instead of only "differs from before".
    updated_inn = _digits(10)
    updated_name = fake.company()
    updated_authority = fake.company()
    updated_status = fake.word()
    duplicate = [
        Inspection(
            registration_number=reg_number,
            inn=updated_inn,
            ogrn=_digits(13),
            organisation_name=updated_name,
            control_authority=updated_authority,
            inspection_type=fake.word(),
            inspection_form=fake.word(),
            start_date=str(fake.date()),
            end_date=str(fake.date()),
            status=updated_status,
            legal_basis=fake.sentence(nb_words=3),
            result=fake.sentence(nb_words=3),
        )
    ]
    count2 = InspectionService.save_inspections(duplicate, batch_id=2)

    # The existing record should be updated in place, not duplicated.
    self.assertEqual(count2, 1)
    self.assertEqual(InspectionRecord.objects.count(), 1)

    # The stored record must carry the data from the second save.
    record = InspectionRecord.objects.get(registration_number=reg_number)
    self.assertEqual(record.organisation_name, updated_name)
    self.assertEqual(record.inn, updated_inn)
    self.assertEqual(record.control_authority, updated_authority)
    self.assertEqual(record.status, updated_status)
    self.assertEqual(record.load_batch, 2)
|
||
|
||
|
||
class ProcurementServiceTest(TestCase):
    """Tests for ProcurementService."""

    def _build_procurement(self, **overrides) -> Procurement:
        """Build a Procurement from generated defaults plus ``overrides``."""
        defaults = dict(
            purchase_number=_digits(19),
            purchase_name=fake.sentence(nb_words=4),
            customer_inn=_digits(10),
            customer_kpp=_digits(9),
            customer_ogrn=_digits(13),
            customer_name=fake.company(),
            max_price="1 234 567,89",
            currency_code="RUB",
            placement_method=fake.word(),
            publish_date="01.03.2026",
            end_date="2026-03-15",
            status=fake.word(),
            law_type="44-FZ",
            purchase_object_info=fake.sentence(nb_words=4),
            href=fake.url(),
        )
        # Explicit overrides win over the generated defaults.
        return Procurement(**{**defaults, **overrides})

    def test_save_procurements_sets_normalized_fields(self):
        """Saving fills the normalized price and date fields from raw strings."""
        item = self._build_procurement()

        saved_total = ProcurementService.save_procurements([item], batch_id=1)
        self.assertEqual(saved_total, 1)

        stored = ProcurementRecord.objects.get(purchase_number=item.purchase_number)
        # Expected string form of each normalized field for the fixed raw
        # values used by _build_procurement.
        expected_text = {
            "max_price_amount": "1234567.89",
            "publish_date_normalized": "2026-03-01",
            "end_date_normalized": "2026-03-15",
        }
        for field_name, text in expected_text.items():
            self.assertEqual(str(getattr(stored, field_name)), text)

    def test_save_procurements_duplicate_updates_existing_record(self):
        """Saving a known purchase number again updates the stored record."""
        number = _digits(19)
        original = self._build_procurement(purchase_number=number)
        new_customer = fake.company()
        new_status = fake.word()
        resaved = self._build_procurement(
            purchase_number=number,
            customer_name=new_customer,
            status=new_status,
        )

        first_total = ProcurementService.save_procurements([original], batch_id=1)
        second_total = ProcurementService.save_procurements([resaved], batch_id=2)

        self.assertEqual(first_total, 1)
        self.assertEqual(second_total, 1)
        self.assertEqual(ProcurementRecord.objects.count(), 1)

        stored = ProcurementRecord.objects.get(purchase_number=number)
        self.assertEqual(stored.customer_name, new_customer)
        self.assertEqual(stored.status, new_status)
        self.assertEqual(stored.load_batch, 2)
|
||
|
||
|
||
@tag("integration", "slow", "e2e")
|
||
class EndToEndIntegrationTest(TestCase):
|
||
"""
|
||
End-to-end интеграционные тесты полного flow.
|
||
|
||
Тестирует: Загрузка с API -> Парсинг -> Сохранение в БД -> Проверка.
|
||
Запуск: uv run python run_tests.py tests.apps.parsers.test_services.EndToEndIntegrationTest
|
||
"""
|
||
|
||
def test_full_flow_fetch_and_save_certificates(self):
    """
    Run the full certificate flow: fetch, log, save and verify.

    1. Serve a generated Excel file from a local test HTTP server and fetch
       the certificates through the client.
    2. Create a load log for a new batch.
    3. Save every fetched certificate to the database.
    4. Check that the stored rows and the load log match what was fetched.
    """
    # 1. Fetch the data through the local HTTP server (no external API).
    print("\n[E2E] Step 1: Fetching certificates from local API...")
    workbook_bytes, expected_rows = build_minpromtorg_certificates_excel(count=5)
    file_date = fake.date_between(start_date="-30d", end_date="today")
    workbook_name = f"data_resolutions_{file_date.strftime('%Y%m%d')}.xlsx"
    workbook_path = f"/files/{workbook_name}"

    with TestHTTPServer() as server:
        # Listing entry that points the client at the served workbook.
        preview_entry = {
            "name": IndustrialProductionClient().query,
            "files": [{"name": workbook_name, "url": workbook_path}],
        }
        server.add_json("/api/kss-document-preview", {"data": [preview_entry]})
        server.add_bytes(workbook_path, workbook_bytes)

        base = urlparse(server.base_url)
        if base.port:
            client_host = f"{base.hostname}:{base.port}"
        else:
            client_host = base.hostname

        production_client = IndustrialProductionClient(
            host=client_host,
            scheme="http",
            timeout=30,
            http_adapter=server.adapter,
        )
        with production_client as client:
            certificates = client.fetch_certificates()

    self.assertEqual(len(certificates), len(expected_rows))
    print(f"[E2E] Loaded {len(certificates)} certificates from local API")

    # 2. Create the batch id and the load log.
    print("[E2E] Step 2: Creating load log...")
    source = ParserLoadLog.Source.INDUSTRIAL
    batch_id = ParserLoadLogService.get_next_batch_id(source)
    log = ParserLoadLogService.create_load_log(
        source=ParserLoadLog.Source.INDUSTRIAL,
        batch_id=batch_id,
        records_count=0,
    )
    print(f"[E2E] Created batch_id={batch_id}")

    # 3. Save to the database.
    print("[E2E] Step 3: Saving certificates to database...")
    saved_count = IndustrialCertificateService.save_certificates(
        certificates, batch_id=batch_id
    )
    ParserLoadLogService.update_records_count(log, saved_count)

    print(f"[E2E] Saved {saved_count} certificates")

    # 4. Verify the result.
    print("[E2E] Step 4: Verifying saved data...")

    # Row count for this batch.
    batch_records = IndustrialCertificateRecord.objects.filter(load_batch=batch_id)
    db_count = batch_records.count()
    self.assertEqual(db_count, saved_count)
    self.assertEqual(db_count, len(certificates))

    # Field values of the first fetched certificate.
    expected = certificates[0]
    db_record = IndustrialCertificateRecord.objects.filter(
        load_batch=batch_id,
        certificate_number=expected.certificate_number,
    ).first()

    self.assertIsNotNone(db_record)
    self.assertEqual(db_record.inn, expected.inn)
    self.assertEqual(db_record.ogrn, expected.ogrn)
    self.assertEqual(db_record.organisation_name, expected.organisation_name)

    # Load log state.
    log.refresh_from_db()
    self.assertEqual(log.records_count, saved_count)
    self.assertEqual(log.status, "success")

    print("[E2E] ✅ All checks passed!")
    print(f"[E2E] Sample record: {db_record.certificate_number}")
    print(f"[E2E] Organisation: {db_record.organisation_name}")
    print(f"[E2E] INN: {db_record.inn}, OGRN: {db_record.ogrn}")
|