Files
mostovik-backend/tests/apps/parsers/test_services.py
Aleksandr Meshchriakov 44355deeb3
All checks were successful
CI/CD Pipeline / Code Quality Checks (pull_request) Successful in 1m6s
CI/CD Pipeline / Run Tests (pull_request) Successful in 1m18s
CI/CD Pipeline / Build Docker Images (pull_request) Has been skipped
CI/CD Pipeline / Push to Gitea Registry (pull_request) Has been skipped
feat: add parser source dashboard and scheduling
2026-04-28 00:20:08 +02:00

856 lines
31 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Tests for parsers services."""
from decimal import Decimal
from apps.parsers.clients.common.schemas import GenericParserItem
from apps.parsers.clients.minpromtorg.schemas import IndustrialCertificate, Manufacturer
from apps.parsers.clients.proverki.schemas import Inspection
from apps.parsers.models import (
GenericParserRecord,
IndustrialCertificateRecord,
InspectionRecord,
ManufacturerRecord,
ParserLoadLog,
Proxy,
)
from apps.parsers.services import (
GenericParserRecordService,
IndustrialCertificateService,
InspectionService,
ManufacturerService,
ParserLoadLogService,
ProxyService,
)
from django.test import TestCase
from faker import Faker
from .factories import (
GenericParserRecordFactory,
IndustrialCertificateRecordFactory,
InspectionRecordFactory,
ManufacturerRecordFactory,
ParserLoadLogFactory,
ProxyFactory,
)
fake = Faker("ru_RU")
class ProxyServiceTest(TestCase):
"""Tests for ProxyService."""
def test_get_active_proxies_empty(self):
"""Test getting active proxies when none exist."""
proxies = ProxyService.get_active_proxies()
self.assertEqual(proxies, [])
def test_get_active_proxies_with_data(self):
"""Test getting active proxies returns only active ones."""
proxy1 = ProxyFactory(is_active=True)
proxy2 = ProxyFactory(is_active=True)
ProxyFactory(is_active=False) # Inactive - should not be returned
proxies = ProxyService.get_active_proxies()
self.assertEqual(len(proxies), 2)
self.assertIn(proxy1.address, proxies)
self.assertIn(proxy2.address, proxies)
def test_get_active_proxies_or_none_empty(self):
"""Test get_active_proxies_or_none returns None when no proxies."""
result = ProxyService.get_active_proxies_or_none()
self.assertIsNone(result)
def test_get_active_proxies_or_none_with_data(self):
"""Test get_active_proxies_or_none returns list when proxies exist."""
ProxyFactory(is_active=True)
result = ProxyService.get_active_proxies_or_none()
self.assertIsNotNone(result)
self.assertEqual(len(result), 1)
def test_mark_used(self):
"""Test marking proxy as used updates timestamp."""
proxy = ProxyFactory()
self.assertIsNone(proxy.last_used_at)
ProxyService.mark_used(proxy.address)
proxy.refresh_from_db()
self.assertIsNotNone(proxy.last_used_at)
def test_mark_failed(self):
"""Test marking proxy as failed increases fail count."""
proxy = ProxyFactory(fail_count=0)
ProxyService.mark_failed(proxy.address)
proxy.refresh_from_db()
self.assertEqual(proxy.fail_count, 1)
def test_deactivate(self):
"""Test deactivating proxy."""
proxy = ProxyFactory(is_active=True)
ProxyService.deactivate(proxy.address)
proxy.refresh_from_db()
self.assertFalse(proxy.is_active)
def test_add_proxy(self):
"""Test adding new proxy."""
address = "http://new-proxy:8080"
description = "Test proxy"
proxy = ProxyService.add_proxy(address, description)
self.assertEqual(proxy.address, address)
self.assertEqual(proxy.description, description)
self.assertTrue(proxy.is_active)
def test_add_proxy_idempotent(self):
"""Test adding existing proxy returns existing record."""
address = "http://existing:8080"
existing = ProxyFactory(address=address, description="Original")
proxy = ProxyService.add_proxy(address, "New description")
self.assertEqual(proxy.id, existing.id)
self.assertEqual(proxy.description, "Original") # Not updated
def test_add_proxies(self):
"""Test bulk adding proxies."""
addresses = [
"http://proxy1:8080",
"http://proxy2:8080",
"http://proxy3:8080",
]
created = ProxyService.add_proxies(addresses)
self.assertEqual(created, 3)
self.assertEqual(Proxy.objects.count(), 3)
def test_add_proxies_skips_existing(self):
"""Test bulk add skips existing proxies."""
ProxyFactory(address="http://existing:8080")
addresses = [
"http://existing:8080", # Already exists
"http://new:8080",
]
created = ProxyService.add_proxies(addresses)
self.assertEqual(created, 1)
self.assertEqual(Proxy.objects.count(), 2)
class ParserLoadLogServiceTest(TestCase):
"""Tests for ParserLoadLogService."""
def test_get_next_batch_id_first(self):
"""Test getting first batch_id for new source."""
batch_id = ParserLoadLogService.get_next_batch_id(
ParserLoadLog.Source.INDUSTRIAL
)
self.assertEqual(batch_id, 1)
def test_get_next_batch_id_increment(self):
"""Test batch_id increments correctly."""
ParserLoadLogFactory(batch_id=5, source=ParserLoadLog.Source.INDUSTRIAL)
ParserLoadLogFactory(batch_id=3, source=ParserLoadLog.Source.INDUSTRIAL)
batch_id = ParserLoadLogService.get_next_batch_id(
ParserLoadLog.Source.INDUSTRIAL
)
self.assertEqual(batch_id, 6)
def test_get_next_batch_id_per_source(self):
"""Test batch_id is tracked per source."""
ParserLoadLogFactory(batch_id=10, source=ParserLoadLog.Source.INDUSTRIAL)
ParserLoadLogFactory(batch_id=5, source=ParserLoadLog.Source.MANUFACTURES)
industrial_batch = ParserLoadLogService.get_next_batch_id(
ParserLoadLog.Source.INDUSTRIAL
)
manufactures_batch = ParserLoadLogService.get_next_batch_id(
ParserLoadLog.Source.MANUFACTURES
)
self.assertEqual(industrial_batch, 11)
self.assertEqual(manufactures_batch, 6)
def test_create_next_load_log_reserves_batch_ids(self):
"""Test atomic batch allocator creates sequential load logs."""
first = ParserLoadLogService.create_next_load_log(
source=ParserLoadLog.Source.INDUSTRIAL,
status="in_progress",
)
second = ParserLoadLogService.create_next_load_log(
source=ParserLoadLog.Source.INDUSTRIAL,
status="in_progress",
)
self.assertEqual(first.batch_id, 1)
self.assertEqual(second.batch_id, 2)
self.assertEqual(ParserLoadLog.objects.count(), 2)
def test_create_next_load_log_honors_existing_logs(self):
"""Test batch allocator starts after existing manual logs."""
ParserLoadLogFactory(batch_id=7, source=ParserLoadLog.Source.INDUSTRIAL)
log = ParserLoadLogService.create_next_load_log(
source=ParserLoadLog.Source.INDUSTRIAL,
status="in_progress",
)
self.assertEqual(log.batch_id, 8)
def test_create_load_log(self):
"""Test creating load log."""
log = ParserLoadLogService.create_load_log(
source=ParserLoadLog.Source.INDUSTRIAL,
batch_id=1,
records_count=100,
status="success",
)
self.assertIsInstance(log, ParserLoadLog)
self.assertEqual(log.source, ParserLoadLog.Source.INDUSTRIAL)
self.assertEqual(log.batch_id, 1)
self.assertEqual(log.records_count, 100)
self.assertEqual(log.status, "success")
def test_mark_failed(self):
"""Test marking log as failed."""
log = ParserLoadLogFactory(status="success")
ParserLoadLogService.mark_failed(log, "Connection error")
log.refresh_from_db()
self.assertEqual(log.status, "failed")
self.assertEqual(log.error_message, "Connection error")
def test_update_records_count(self):
"""Test updating records count."""
log = ParserLoadLogFactory(records_count=0)
ParserLoadLogService.update_records_count(log, 250)
log.refresh_from_db()
self.assertEqual(log.records_count, 250)
class IndustrialCertificateServiceTest(TestCase):
"""Tests for IndustrialCertificateService."""
def test_save_certificates_empty(self):
"""Test saving empty list returns 0."""
count = IndustrialCertificateService.save_certificates([], batch_id=1)
self.assertEqual(count, 0)
def test_save_certificates(self):
"""Test saving certificates from dataclass."""
certificates = [
IndustrialCertificate(
issue_date="2024-01-01",
certificate_number=f"CERT-{i}",
expiry_date="2025-01-01",
certificate_file_url=f"https://example.com/cert{i}.pdf",
organisation_name=f"Company {i}",
inn=f"123456789{i}",
ogrn=f"123456789012{i}",
)
for i in range(5)
]
count = IndustrialCertificateService.save_certificates(certificates, batch_id=1)
self.assertEqual(count, 5)
self.assertEqual(IndustrialCertificateRecord.objects.count(), 5)
def test_save_certificates_with_chunk_size(self):
"""Test saving certificates in chunks."""
certificates = [
IndustrialCertificate(
issue_date="2024-01-01",
certificate_number=f"CERT-{i}",
expiry_date="2025-01-01",
certificate_file_url=f"https://example.com/cert{i}.pdf",
organisation_name=f"Company {i}",
inn=f"12345678{i:02d}",
ogrn=f"1234567890{i:03d}",
)
for i in range(10)
]
count = IndustrialCertificateService.save_certificates(
certificates, batch_id=1, chunk_size=3
)
self.assertEqual(count, 10)
def test_find_by_inn(self):
"""Test finding certificates by INN."""
IndustrialCertificateRecordFactory(
inn="1111111111", certificate_number="CERT-A1", load_batch=1
)
IndustrialCertificateRecordFactory(
inn="1111111111", certificate_number="CERT-A2", load_batch=2
)
IndustrialCertificateRecordFactory(
inn="2222222222", certificate_number="CERT-B1", load_batch=1
)
results = IndustrialCertificateService.find_by_inn("1111111111")
self.assertEqual(results.count(), 2)
results_batch1 = IndustrialCertificateService.find_by_inn(
"1111111111", batch_id=1
)
self.assertEqual(results_batch1.count(), 1)
def test_find_by_certificate_number(self):
"""Test finding certificate by number."""
IndustrialCertificateRecordFactory(certificate_number="CERT-UNIQUE")
IndustrialCertificateRecordFactory(certificate_number="CERT-OTHER")
results = IndustrialCertificateService.find_by_certificate_number("CERT-UNIQUE")
self.assertEqual(results.count(), 1)
def test_save_certificates_deduplication(self):
"""Test saving certificates skips duplicates by certificate_number."""
# Create initial certificate
initial = [
IndustrialCertificate(
issue_date="2024-01-01",
certificate_number="CERT-DEDUP-001",
expiry_date="2025-01-01",
certificate_file_url="https://example.com/old.pdf",
organisation_name="Old Company Name",
inn="1234567890",
ogrn="1234567890123",
)
]
count1 = IndustrialCertificateService.save_certificates(initial, batch_id=1)
self.assertEqual(count1, 1)
self.assertEqual(IndustrialCertificateRecord.objects.count(), 1)
# Try to save with same certificate_number - should be skipped
duplicate = [
IndustrialCertificate(
issue_date="2024-06-01",
certificate_number="CERT-DEDUP-001", # Same number - will be skipped
expiry_date="2026-01-01",
certificate_file_url="https://example.com/new.pdf",
organisation_name="New Company Name",
inn="9999999999",
ogrn="9999999999999",
)
]
count2 = IndustrialCertificateService.save_certificates(duplicate, batch_id=2)
self.assertEqual(count2, 0)
# Should still be 1 record (duplicate skipped)
self.assertEqual(IndustrialCertificateRecord.objects.count(), 1)
# Verify original data preserved
record = IndustrialCertificateRecord.objects.first()
self.assertEqual(record.organisation_name, "Old Company Name")
self.assertEqual(record.inn, "1234567890")
self.assertEqual(record.load_batch, 1) # Original batch
class ManufacturerServiceTest(TestCase):
"""Tests for ManufacturerService."""
def test_save_manufacturers_empty(self):
"""Test saving empty list returns 0."""
count = ManufacturerService.save_manufacturers([], batch_id=1)
self.assertEqual(count, 0)
def test_save_manufacturers(self):
"""Test saving manufacturers from dataclass."""
manufacturers = [
Manufacturer(
full_legal_name=f"Company {i} LLC",
inn=f"123456789{i}",
ogrn=f"123456789012{i}",
address=f"Address {i}",
)
for i in range(5)
]
count = ManufacturerService.save_manufacturers(manufacturers, batch_id=1)
self.assertEqual(count, 5)
self.assertEqual(ManufacturerRecord.objects.count(), 5)
def test_save_manufacturers_with_chunk_size(self):
"""Test saving manufacturers in chunks."""
manufacturers = [
Manufacturer(
full_legal_name=f"Company {i}",
inn=f"12345678{i:02d}",
ogrn=f"1234567890{i:03d}",
address=f"Address {i}",
)
for i in range(10)
]
count = ManufacturerService.save_manufacturers(
manufacturers, batch_id=1, chunk_size=3
)
self.assertEqual(count, 10)
def test_find_by_inn(self):
"""Test finding manufacturers by INN."""
ManufacturerRecordFactory(inn="1111111111", load_batch=1)
ManufacturerRecordFactory(inn="2222222222", load_batch=1)
ManufacturerRecordFactory(inn="3333333333", load_batch=2)
results = ManufacturerService.find_by_inn("1111111111")
self.assertEqual(results.count(), 1)
def test_find_by_inn_with_batch_filter(self):
"""Test finding manufacturers by INN with batch filter."""
ManufacturerRecordFactory(inn="4444444444", load_batch=1)
ManufacturerRecordFactory(inn="5555555555", load_batch=2)
results_batch1 = ManufacturerService.find_by_inn("4444444444", batch_id=1)
self.assertEqual(results_batch1.count(), 1)
results_batch2 = ManufacturerService.find_by_inn("4444444444", batch_id=2)
self.assertEqual(results_batch2.count(), 0)
def test_find_by_ogrn(self):
"""Test finding manufacturers by OGRN."""
ManufacturerRecordFactory(ogrn="1234567890123")
ManufacturerRecordFactory(ogrn="9999999999999")
results = ManufacturerService.find_by_ogrn("1234567890123")
self.assertEqual(results.count(), 1)
def test_save_manufacturers_deduplication(self):
"""Test saving manufacturers skips duplicates by INN."""
# Create initial manufacturer
initial = [
Manufacturer(
full_legal_name="Old Company Name LLC",
inn="7777777777",
ogrn="1234567890123",
address="Old Address",
)
]
count1 = ManufacturerService.save_manufacturers(initial, batch_id=1)
self.assertEqual(count1, 1)
self.assertEqual(ManufacturerRecord.objects.count(), 1)
# Try to save with same INN - should be skipped
duplicate = [
Manufacturer(
full_legal_name="New Company Name LLC",
inn="7777777777", # Same INN - will be skipped
ogrn="9999999999999",
address="New Address",
)
]
count2 = ManufacturerService.save_manufacturers(duplicate, batch_id=2)
self.assertEqual(count2, 0)
# Should still be 1 record (duplicate skipped)
self.assertEqual(ManufacturerRecord.objects.count(), 1)
# Verify original data preserved
record = ManufacturerRecord.objects.first()
self.assertEqual(record.full_legal_name, "Old Company Name LLC")
self.assertEqual(record.ogrn, "1234567890123")
self.assertEqual(record.address, "Old Address")
self.assertEqual(record.load_batch, 1) # Original batch
class GenericParserRecordServiceTest(TestCase):
"""Tests for GenericParserRecordService."""
def test_save_records_empty(self):
"""Test saving empty generic records returns 0."""
count = GenericParserRecordService.save_records(
[],
batch_id=1,
source=ParserLoadLog.Source.FNS_FINANCIAL,
)
self.assertEqual(count, 0)
def test_save_records(self):
"""Test saving generic records from dataclass."""
records = [
GenericParserItem(
source=ParserLoadLog.Source.FNS_FINANCIAL,
external_id=f"FIN-{i}",
inn=f"123456789{i}",
ogrn=f"123456789012{i}",
organisation_name=f"Company {i}",
title="Выручка",
record_date="2024",
amount=Decimal("1000.00"),
status="active",
url=f"https://example.com/{i}",
payload={"external_id": f"FIN-{i}"},
)
for i in range(5)
]
count = GenericParserRecordService.save_records(
records,
batch_id=1,
source=ParserLoadLog.Source.FNS_FINANCIAL,
)
self.assertEqual(count, 5)
self.assertEqual(GenericParserRecord.objects.count(), 5)
def test_save_records_deduplication(self):
"""Test saving generic records skips duplicates by source and external_id."""
record = GenericParserItem(
source=ParserLoadLog.Source.TRUDVSEM,
external_id="VAC-1",
inn="1234567890",
organisation_name="Old Company",
payload={"name": "old"},
)
count1 = GenericParserRecordService.save_records(
[record],
batch_id=1,
source=ParserLoadLog.Source.TRUDVSEM,
)
count2 = GenericParserRecordService.save_records(
[record],
batch_id=2,
source=ParserLoadLog.Source.TRUDVSEM,
)
self.assertEqual(count1, 1)
self.assertEqual(count2, 0)
self.assertEqual(GenericParserRecord.objects.count(), 1)
def test_save_records_deduplicates_incoming_batch(self):
"""Test duplicate external IDs inside one payload count as one save."""
records = [
GenericParserItem(
source=ParserLoadLog.Source.TRUDVSEM,
external_id="VAC-SAME",
inn="1234567890",
organisation_name="First Company",
payload={"name": "first"},
),
GenericParserItem(
source=ParserLoadLog.Source.TRUDVSEM,
external_id="VAC-SAME",
inn="9999999999",
organisation_name="Second Company",
payload={"name": "second"},
),
]
count = GenericParserRecordService.save_records(
records,
batch_id=1,
source=ParserLoadLog.Source.TRUDVSEM,
)
self.assertEqual(count, 1)
self.assertEqual(GenericParserRecord.objects.count(), 1)
self.assertEqual(
GenericParserRecord.objects.get().organisation_name,
"First Company",
)
def test_create_with_exact_count_returns_zero_for_conflict(self):
"""Test fallback count is exact when DB unique constraint rejects insert."""
GenericParserRecordFactory(
source=ParserLoadLog.Source.TRUDVSEM,
external_id="VAC-RACE",
)
instance = GenericParserRecord(
load_batch=2,
source=ParserLoadLog.Source.TRUDVSEM,
external_id="VAC-RACE",
inn="1234567890",
organisation_name="Concurrent Company",
payload={"external_id": "VAC-RACE"},
)
count = GenericParserRecordService._create_with_exact_count(
[instance],
unique_fields=["source", "external_id"],
chunk_size=500,
)
self.assertEqual(count, 0)
self.assertEqual(GenericParserRecord.objects.count(), 1)
def test_find_by_inn_with_source(self):
"""Test finding generic records by INN and source."""
GenericParserRecordFactory(
source=ParserLoadLog.Source.TRUDVSEM,
inn="1234567890",
)
GenericParserRecordFactory(
source=ParserLoadLog.Source.FNS_FINANCIAL,
inn="1234567890",
)
result = GenericParserRecordService.find_by_inn(
"1234567890",
source=ParserLoadLog.Source.TRUDVSEM,
)
self.assertEqual(result.count(), 1)
self.assertEqual(result.first().source, ParserLoadLog.Source.TRUDVSEM)
class InspectionServiceTest(TestCase):
"""Tests for InspectionService."""
def test_save_inspections_empty(self):
"""Test saving empty list returns 0."""
count = InspectionService.save_inspections([], batch_id=1)
self.assertEqual(count, 0)
def test_save_inspections(self):
"""Test saving inspections from dataclass."""
inspections = [
Inspection(
registration_number=f"77202400000{i}",
inn=f"770{i}234567",
ogrn=f"102770000000{i}",
organisation_name=f"Компания {i}",
control_authority="Роспотребнадзор",
inspection_type="плановая",
inspection_form="документарная",
start_date="2024-01-15",
end_date="2024-01-30",
status="завершена",
legal_basis="294-ФЗ",
result="нарушения не выявлены",
)
for i in range(5)
]
count = InspectionService.save_inspections(inspections, batch_id=1)
self.assertEqual(count, 5)
self.assertEqual(InspectionRecord.objects.count(), 5)
def test_save_inspections_with_chunk_size(self):
"""Test saving inspections in chunks."""
inspections = [
Inspection(
registration_number=f"7720240000{i:02d}",
inn=f"770{i:02d}34567",
ogrn=f"10277000000{i:02d}",
organisation_name=f"Компания {i}",
control_authority="Ростехнадзор",
inspection_type="внеплановая",
inspection_form="выездная",
start_date="2024-02-01",
end_date="2024-02-15",
status="завершена",
legal_basis="248-ФЗ",
)
for i in range(10)
]
count = InspectionService.save_inspections(
inspections, batch_id=1, chunk_size=3
)
self.assertEqual(count, 10)
def test_find_by_inn(self):
"""Test finding inspections by INN."""
InspectionRecordFactory(inn="1111111111", load_batch=1)
InspectionRecordFactory(inn="1111111111", load_batch=2)
InspectionRecordFactory(inn="2222222222", load_batch=1)
results = InspectionService.find_by_inn("1111111111")
self.assertEqual(results.count(), 2)
results_batch1 = InspectionService.find_by_inn("1111111111", batch_id=1)
self.assertEqual(results_batch1.count(), 1)
def test_find_by_registration_number(self):
"""Test finding inspection by registration number."""
InspectionRecordFactory(registration_number="772024000001")
InspectionRecordFactory(registration_number="772024000002")
results = InspectionService.find_by_registration_number("772024000001")
self.assertEqual(results.count(), 1)
def test_find_by_control_authority(self):
"""Test finding inspections by control authority."""
InspectionRecordFactory(control_authority="Роспотребнадзор", load_batch=1)
InspectionRecordFactory(
control_authority="Управление Роспотребнадзора по г. Москве", load_batch=1
)
InspectionRecordFactory(control_authority="Ростехнадзор", load_batch=1)
results = InspectionService.find_by_control_authority("Роспотребнадзор")
self.assertEqual(results.count(), 2)
results_batch1 = InspectionService.find_by_control_authority(
"Роспотребнадзор", batch_id=1
)
self.assertEqual(results_batch1.count(), 2)
def test_save_inspections_deduplication(self):
"""Test saving inspections skips duplicates by registration_number."""
# Create initial inspection
initial = [
Inspection(
registration_number="DEDUP-REG-001",
inn="1234567890",
ogrn="1234567890123",
organisation_name="Old Organisation",
control_authority="Роспотребнадзор",
inspection_type="плановая",
inspection_form="документарная",
start_date="2024-01-01",
end_date="2024-01-15",
status="завершена",
legal_basis="294-ФЗ",
result="нарушения не выявлены",
)
]
count1 = InspectionService.save_inspections(initial, batch_id=1)
self.assertEqual(count1, 1)
self.assertEqual(InspectionRecord.objects.count(), 1)
# Try to save with same registration_number - should be skipped
duplicate = [
Inspection(
registration_number="DEDUP-REG-001", # Same number - will be skipped
inn="9999999999",
ogrn="9999999999999",
organisation_name="New Organisation",
control_authority="Ростехнадзор",
inspection_type="внеплановая",
inspection_form="выездная",
start_date="2024-06-01",
end_date="2024-06-30",
status="в процессе",
legal_basis="248-ФЗ",
result="выявлены нарушения",
)
]
count2 = InspectionService.save_inspections(duplicate, batch_id=2)
self.assertEqual(count2, 0)
# Should still be 1 record (duplicate skipped)
self.assertEqual(InspectionRecord.objects.count(), 1)
# Verify original data preserved
record = InspectionRecord.objects.first()
self.assertEqual(record.organisation_name, "Old Organisation")
self.assertEqual(record.inn, "1234567890")
self.assertEqual(record.control_authority, "Роспотребнадзор")
self.assertEqual(record.status, "завершена")
self.assertEqual(record.load_batch, 1) # Original batch
from apps.parsers.clients.base import HTTPClientError
from apps.parsers.clients.minpromtorg.industrial import IndustrialProductionClient
from django.test import tag
@tag("integration", "slow", "network", "e2e")
class EndToEndIntegrationTest(TestCase):
"""
End-to-end интеграционные тесты полного flow.
Тестирует: Загрузка с API -> Парсинг -> Сохранение в БД -> Проверка.
Запуск: uv run python run_tests.py tests.apps.parsers.test_services.EndToEndIntegrationTest
"""
def test_full_flow_fetch_and_save_certificates(self):
"""
Полный E2E тест: загрузка сертификатов и сохранение в БД.
1. Загружаем данные с реального API
2. Создаём лог загрузки
3. Сохраняем первые N записей в БД
4. Проверяем что данные корректно сохранились
"""
try:
# 1. Загружаем данные с API
print("\n[E2E] Step 1: Fetching certificates from API...")
with IndustrialProductionClient(timeout=120) as client:
all_certificates = client.fetch_certificates()
if not all_certificates:
self.skipTest("No certificates returned from API")
print(f"[E2E] Loaded {len(all_certificates)} certificates from API")
# Берём только первые 100 для теста
certificates = all_certificates[:100]
# 2. Создаём batch_id и лог
print("[E2E] Step 2: Creating load log...")
batch_id = ParserLoadLogService.get_next_batch_id(
ParserLoadLog.Source.INDUSTRIAL
)
log = ParserLoadLogService.create_load_log(
source=ParserLoadLog.Source.INDUSTRIAL,
batch_id=batch_id,
records_count=0,
)
print(f"[E2E] Created batch_id={batch_id}")
# 3. Сохраняем в БД
print("[E2E] Step 3: Saving certificates to database...")
saved_count = IndustrialCertificateService.save_certificates(
certificates, batch_id=batch_id
)
ParserLoadLogService.update_records_count(log, saved_count)
print(f"[E2E] Saved {saved_count} certificates")
# 4. Проверяем результат
print("[E2E] Step 4: Verifying saved data...")
# Проверяем количество
db_count = IndustrialCertificateRecord.objects.filter(
load_batch=batch_id
).count()
self.assertEqual(db_count, saved_count)
self.assertEqual(db_count, len(certificates))
# Проверяем первую запись
first_cert = certificates[0]
db_record = IndustrialCertificateRecord.objects.filter(
load_batch=batch_id,
certificate_number=first_cert.certificate_number,
).first()
self.assertIsNotNone(db_record)
self.assertEqual(db_record.inn, first_cert.inn)
self.assertEqual(db_record.ogrn, first_cert.ogrn)
self.assertEqual(db_record.organisation_name, first_cert.organisation_name)
# Проверяем лог
log.refresh_from_db()
self.assertEqual(log.records_count, saved_count)
self.assertEqual(log.status, "success")
print("[E2E] ✅ All checks passed!")
print(f"[E2E] Sample record: {db_record.certificate_number}")
print(f"[E2E] Organisation: {db_record.organisation_name}")
print(f"[E2E] INN: {db_record.inn}, OGRN: {db_record.ogrn}")
except HTTPClientError as e:
self.skipTest(f"External API unavailable: {e}")