"""Tests for parsers services.""" from unittest.mock import patch from urllib.parse import urlparse from apps.parsers.clients.minpromtorg.industrial import IndustrialProductionClient from apps.parsers.clients.minpromtorg.schemas import ( IndustrialCertificate, IndustrialProduct, Manufacturer, ) from apps.parsers.clients.proverki.schemas import Inspection from apps.parsers.clients.zakupki.schemas import Procurement from apps.parsers.models import ( IndustrialCertificateRecord, IndustrialProductRecord, InspectionRecord, ManufacturerRecord, ParserLoadLog, ProcurementRecord, Proxy, ) from apps.parsers.services import ( IndustrialCertificateService, IndustrialProductService, InspectionService, ManufacturerService, ParserLoadLogService, ProcurementService, ProxyService, ProxyToolsSyncService, ) from apps.registers.models import Organization from django.test import TestCase, override_settings, tag from tests.utils import TestHTTPServer from tests.utils.fixtures import build_minpromtorg_certificates_excel, fake from .factories import ( IndustrialCertificateRecordFactory, IndustrialProductRecordFactory, InspectionRecordFactory, ManufacturerRecordFactory, ParserLoadLogFactory, ProxyFactory, ) def _digits(length: int) -> str: return "".join(str(fake.random_int(0, 9)) for _ in range(length)) def _proxy_address() -> str: return f"http://{fake.ipv4()}:{fake.port_number()}" def _create_registry_organization(*, inn: str, ogrn: str) -> Organization: return Organization.objects.create( pn_name=fake.company(), mn_ogrn=int(ogrn), mn_inn=int(inn), in_kpp=int(_digits(9)), mn_okpo=_digits(8), ) class ProxyServiceTest(TestCase): """Tests for ProxyService.""" def test_get_active_proxies_empty(self): """Test getting active proxies when none exist.""" proxies = ProxyService.get_active_proxies() self.assertEqual(proxies, []) def test_get_active_proxies_with_data(self): """Test getting active proxies returns only active ones.""" proxy1 = ProxyFactory(is_active=True) proxy2 = ProxyFactory(is_active=True) ProxyFactory(is_active=False) # Inactive - should not be returned proxies = ProxyService.get_active_proxies() self.assertEqual(len(proxies), 2) self.assertIn(proxy1.address, proxies) self.assertIn(proxy2.address, proxies) def test_get_active_proxies_or_none_empty(self): """Test get_active_proxies_or_none returns None when no proxies.""" result = ProxyService.get_active_proxies_or_none() self.assertIsNone(result) def test_get_active_proxies_or_none_with_data(self): """Test get_active_proxies_or_none returns list when proxies exist.""" ProxyFactory(is_active=True) result = ProxyService.get_active_proxies_or_none() self.assertIsNotNone(result) self.assertEqual(len(result), 1) def test_mark_used(self): """Test marking proxy as used updates timestamp.""" proxy = ProxyFactory(last_used_at=None) self.assertIsNone(proxy.last_used_at) ProxyService.mark_used(proxy.address) proxy.refresh_from_db() self.assertIsNotNone(proxy.last_used_at) def test_mark_failed(self): """Test marking proxy as failed increases fail count.""" proxy = ProxyFactory(fail_count=0) ProxyService.mark_failed(proxy.address) proxy.refresh_from_db() self.assertEqual(proxy.fail_count, 1) def test_deactivate(self): """Test deactivating proxy.""" proxy = ProxyFactory(is_active=True) ProxyService.deactivate(proxy.address) proxy.refresh_from_db() self.assertFalse(proxy.is_active) def test_add_proxy(self): """Test adding new proxy.""" address = _proxy_address() description = fake.sentence(nb_words=3) proxy = ProxyService.add_proxy(address, description) self.assertEqual(proxy.address, address) self.assertEqual(proxy.description, description) self.assertTrue(proxy.is_active) def test_add_proxy_idempotent(self): """Test adding existing proxy returns existing record.""" address = _proxy_address() existing_description = fake.sentence(nb_words=3) existing = ProxyFactory(address=address, description=existing_description) new_description = fake.sentence(nb_words=3) proxy = ProxyService.add_proxy(address, new_description) self.assertEqual(proxy.id, existing.id) self.assertEqual(proxy.description, existing_description) # Not updated def test_add_proxies(self): """Test bulk adding proxies.""" addresses = [_proxy_address() for _ in range(3)] created = ProxyService.add_proxies(addresses) self.assertEqual(created, 3) self.assertEqual(Proxy.objects.count(), 3) def test_add_proxies_skips_existing(self): """Test bulk add skips existing proxies.""" existing_address = _proxy_address() new_address = _proxy_address() while new_address == existing_address: new_address = _proxy_address() ProxyFactory(address=existing_address) addresses = [ existing_address, # Already exists new_address, ] created = ProxyService.add_proxies(addresses) self.assertEqual(created, 1) self.assertEqual(Proxy.objects.count(), 2) def test_get_runtime_proxies_prefers_proxy_tools_ru(self): """Runtime should prefer RU proxies imported from Proxy-Tools.""" manual_ru = ProxyFactory( source=ProxyService.MANUAL_SOURCE, country_code="RU", ) imported_ru = ProxyFactory( source=ProxyService.PROXY_TOOLS_SOURCE, country_code="RU", ) ProxyFactory( source=ProxyService.PROXY_TOOLS_SOURCE, country_code="US", ) result = ProxyService.get_runtime_proxies() self.assertEqual(result, [imported_ru.address]) self.assertNotIn(manual_ru.address, result) def test_get_runtime_proxies_falls_back_to_any_ru_proxy(self): """Runtime should fall back to any RU proxy when imported list is empty.""" manual_ru = ProxyFactory( source=ProxyService.MANUAL_SOURCE, country_code="RU", ) ProxyFactory( source=ProxyService.MANUAL_SOURCE, country_code="US", ) result = ProxyService.get_runtime_proxies() self.assertEqual(result, [manual_ru.address]) class ProxyToolsSyncServiceTest(TestCase): """Tests for ProxyToolsSyncService.""" def test_sync_ru_proxies_skips_without_api_key(self): """Sync should be skipped when API key is missing.""" result = ProxyToolsSyncService.sync_ru_proxies() self.assertEqual(result["status"], "skipped") self.assertEqual(result["reason"], "missing_api_key") @override_settings( PROXY_TOOLS_API_KEY="test-token", PROXY_TOOLS_LIMIT=2, PROXY_TOOLS_MAX_PAGES=2, ) @patch("apps.parsers.services.ProxyToolsClient.fetch_proxies") def test_sync_ru_proxies_upserts_and_deactivates(self, fetch_proxies_mock): """Sync should create, reactivate and deactivate imported proxies.""" active_stale = ProxyFactory( address="http://10.0.0.10:8000", source=ProxyService.PROXY_TOOLS_SOURCE, country_code="RU", is_active=True, ) inactive_existing = ProxyFactory( address="http://10.0.0.20:8000", source=ProxyService.PROXY_TOOLS_SOURCE, country_code="RU", is_active=False, ) manual_ru = ProxyFactory( address="http://10.0.0.30:8000", source=ProxyService.MANUAL_SOURCE, country_code="RU", is_active=True, ) fetch_proxies_mock.side_effect = [ { "data": [ {"host": "10.0.0.20", "port": 8000, "type": "4"}, {"proxy": "socks5://10.0.0.40:1080"}, ], "meta": {"total_pages": 2}, }, { "data": [ "https://10.0.0.50:8443", ], "meta": {"total_pages": 2}, }, ] result = ProxyToolsSyncService.sync_ru_proxies() self.assertEqual(result["status"], "success") self.assertEqual(result["fetched"], 3) self.assertEqual(result["created"], 2) self.assertEqual(result["updated"], 1) self.assertEqual(result["deactivated"], 1) active_stale.refresh_from_db() inactive_existing.refresh_from_db() manual_ru.refresh_from_db() self.assertFalse(active_stale.is_active) self.assertTrue(inactive_existing.is_active) self.assertTrue(manual_ru.is_active) imported_addresses = set( Proxy.objects.filter( source=ProxyService.PROXY_TOOLS_SOURCE, country_code="RU", is_active=True, ).values_list("address", flat=True) ) self.assertSetEqual( imported_addresses, { "http://10.0.0.20:8000", "socks5://10.0.0.40:1080", "https://10.0.0.50:8443", }, ) class ParserLoadLogServiceTest(TestCase): """Tests for ParserLoadLogService.""" def test_get_next_batch_id_first(self): """Test getting first batch_id for new source.""" batch_id = ParserLoadLogService.get_next_batch_id( ParserLoadLog.Source.INDUSTRIAL ) self.assertEqual(batch_id, 1) def test_get_next_batch_id_increment(self): """Test batch_id increments correctly.""" ParserLoadLogFactory(batch_id=5, source=ParserLoadLog.Source.INDUSTRIAL) ParserLoadLogFactory(batch_id=3, source=ParserLoadLog.Source.INDUSTRIAL) batch_id = ParserLoadLogService.get_next_batch_id( ParserLoadLog.Source.INDUSTRIAL ) self.assertEqual(batch_id, 6) def test_get_next_batch_id_per_source(self): """Test batch_id is tracked per source.""" ParserLoadLogFactory(batch_id=10, source=ParserLoadLog.Source.INDUSTRIAL) ParserLoadLogFactory(batch_id=5, source=ParserLoadLog.Source.MANUFACTURES) industrial_batch = ParserLoadLogService.get_next_batch_id( ParserLoadLog.Source.INDUSTRIAL ) manufactures_batch = ParserLoadLogService.get_next_batch_id( ParserLoadLog.Source.MANUFACTURES ) self.assertEqual(industrial_batch, 11) self.assertEqual(manufactures_batch, 6) def test_create_load_log(self): """Test creating load log.""" log = ParserLoadLogService.create_load_log( source=ParserLoadLog.Source.INDUSTRIAL, batch_id=1, records_count=100, status="success", ) self.assertIsInstance(log, ParserLoadLog) self.assertEqual(log.source, ParserLoadLog.Source.INDUSTRIAL) self.assertEqual(log.batch_id, 1) self.assertEqual(log.records_count, 100) self.assertEqual(log.status, "success") def test_mark_failed(self): """Test marking log as failed.""" log = ParserLoadLogFactory(status="success") error_message = fake.sentence(nb_words=4) ParserLoadLogService.mark_failed(log, error_message) log.refresh_from_db() self.assertEqual(log.status, "failed") self.assertEqual(log.error_message, error_message) def test_update_records_count(self): """Test updating records count.""" log = ParserLoadLogFactory(records_count=0) ParserLoadLogService.update_records_count(log, 250) log.refresh_from_db() self.assertEqual(log.records_count, 250) class IndustrialCertificateServiceTest(TestCase): """Tests for IndustrialCertificateService.""" def test_save_certificates_empty(self): """Test saving empty list returns 0.""" count = IndustrialCertificateService.save_certificates([], batch_id=1) self.assertEqual(count, 0) def test_save_certificates(self): """Test saving certificates from dataclass.""" certificates = [ IndustrialCertificate( issue_date=str(fake.date()), certificate_number=fake.bothify(text="??-####-#####"), expiry_date=str(fake.date()), certificate_file_url=fake.url(), organisation_name=fake.company(), inn=_digits(10), ogrn=_digits(13), ) for i in range(5) ] count = IndustrialCertificateService.save_certificates(certificates, batch_id=1) self.assertEqual(count, 5) self.assertEqual(IndustrialCertificateRecord.objects.count(), 5) record = IndustrialCertificateRecord.objects.first() self.assertIsNotNone(record.issue_date_normalized) self.assertIsNotNone(record.expiry_date_normalized) def test_save_certificates_links_registry_organization_when_exists(self): """Test linking to registers organization is created when identifiers match.""" inn = _digits(10) ogrn = _digits(13) organization = _create_registry_organization(inn=inn, ogrn=ogrn) certificate_number = fake.bothify(text="??-####-#####") certificates = [ IndustrialCertificate( issue_date=str(fake.date()), certificate_number=certificate_number, expiry_date=str(fake.date()), certificate_file_url=fake.url(), organisation_name=fake.company(), inn=inn, ogrn=ogrn, ) ] saved = IndustrialCertificateService.save_certificates(certificates, batch_id=1) self.assertEqual(saved, 1) record = IndustrialCertificateRecord.objects.get( certificate_number=certificate_number ) self.assertEqual(record.registry_organization_id, organization.id) def test_save_certificates_keeps_null_registry_organization_when_not_found(self): """Test parser save does not fail and keeps null when organization is absent.""" certificate_number = fake.bothify(text="??-####-#####") certificates = [ IndustrialCertificate( issue_date=str(fake.date()), certificate_number=certificate_number, expiry_date=str(fake.date()), certificate_file_url=fake.url(), organisation_name=fake.company(), inn=_digits(10), ogrn=_digits(13), ) ] saved = IndustrialCertificateService.save_certificates(certificates, batch_id=1) self.assertEqual(saved, 1) record = IndustrialCertificateRecord.objects.get( certificate_number=certificate_number ) self.assertIsNone(record.registry_organization_id) def test_save_certificates_with_chunk_size(self): """Test saving certificates in chunks.""" certificates = [ IndustrialCertificate( issue_date=str(fake.date()), certificate_number=fake.bothify(text="??-####-#####"), expiry_date=str(fake.date()), certificate_file_url=fake.url(), organisation_name=fake.company(), inn=_digits(10), ogrn=_digits(13), ) for i in range(10) ] count = IndustrialCertificateService.save_certificates( certificates, batch_id=1, chunk_size=3 ) self.assertEqual(count, 10) def test_find_by_inn(self): """Test finding certificates by INN.""" inn_a = _digits(10) inn_b = _digits(10) IndustrialCertificateRecordFactory( inn=inn_a, certificate_number=fake.bothify(text="CERT-####"), load_batch=1 ) IndustrialCertificateRecordFactory( inn=inn_a, certificate_number=fake.bothify(text="CERT-####"), load_batch=2 ) IndustrialCertificateRecordFactory( inn=inn_b, certificate_number=fake.bothify(text="CERT-####"), load_batch=1 ) results = IndustrialCertificateService.find_by_inn(inn_a) self.assertEqual(results.count(), 2) results_batch1 = IndustrialCertificateService.find_by_inn(inn_a, batch_id=1) self.assertEqual(results_batch1.count(), 1) def test_find_by_certificate_number(self): """Test finding certificate by number.""" unique_number = fake.bothify(text="CERT-#####") IndustrialCertificateRecordFactory(certificate_number=unique_number) IndustrialCertificateRecordFactory( certificate_number=fake.bothify(text="CERT-#####") ) results = IndustrialCertificateService.find_by_certificate_number(unique_number) self.assertEqual(results.count(), 1) def test_save_certificates_deduplication(self): """Test saving certificates skips duplicates by certificate_number.""" # Create initial certificate cert_number = fake.bothify(text="CERT-DEDUP-#####") initial = [ IndustrialCertificate( issue_date=str(fake.date()), certificate_number=cert_number, expiry_date=str(fake.date()), certificate_file_url=fake.url(), organisation_name=fake.company(), inn=_digits(10), ogrn=_digits(13), ) ] count1 = IndustrialCertificateService.save_certificates(initial, batch_id=1) self.assertEqual(count1, 1) self.assertEqual(IndustrialCertificateRecord.objects.count(), 1) # Try to save with same certificate_number - should be skipped duplicate = [ IndustrialCertificate( issue_date=str(fake.date()), certificate_number=cert_number, # Same number - will be skipped expiry_date=str(fake.date()), certificate_file_url=fake.url(), organisation_name=fake.company(), inn=_digits(10), ogrn=_digits(13), ) ] count2 = IndustrialCertificateService.save_certificates(duplicate, batch_id=2) # Should still be 1 record (duplicate skipped) self.assertEqual(count2, 0) self.assertEqual(IndustrialCertificateRecord.objects.count(), 1) # Verify original data preserved record = IndustrialCertificateRecord.objects.first() self.assertEqual(record.organisation_name, initial[0].organisation_name) self.assertEqual(record.inn, initial[0].inn) self.assertEqual(record.load_batch, 1) # Original batch class ManufacturerServiceTest(TestCase): """Tests for ManufacturerService.""" def test_save_manufacturers_empty(self): """Test saving empty list returns 0.""" count = ManufacturerService.save_manufacturers([], batch_id=1) self.assertEqual(count, 0) def test_save_manufacturers(self): """Test saving manufacturers from dataclass.""" manufacturers = [ Manufacturer( full_legal_name=fake.company(), inn=_digits(10), ogrn=_digits(13), address=fake.address().replace("\n", ", "), ) for i in range(5) ] count = ManufacturerService.save_manufacturers(manufacturers, batch_id=1) self.assertEqual(count, 5) self.assertEqual(ManufacturerRecord.objects.count(), 5) def test_save_manufacturers_links_registry_organization_when_exists(self): """Test linking manufacturer to registers organization by INN/ОГРН.""" inn = _digits(10) ogrn = _digits(13) organization = _create_registry_organization(inn=inn, ogrn=ogrn) manufacturers = [ Manufacturer( full_legal_name=fake.company(), inn=inn, ogrn=ogrn, address=fake.address().replace("\n", ", "), ) ] saved = ManufacturerService.save_manufacturers(manufacturers, batch_id=1) self.assertEqual(saved, 1) record = ManufacturerRecord.objects.get(inn=inn) self.assertEqual(record.registry_organization_id, organization.id) def test_save_manufacturers_with_chunk_size(self): """Test saving manufacturers in chunks.""" manufacturers = [ Manufacturer( full_legal_name=fake.company(), inn=_digits(10), ogrn=_digits(13), address=fake.address().replace("\n", ", "), ) for i in range(10) ] count = ManufacturerService.save_manufacturers( manufacturers, batch_id=1, chunk_size=3 ) self.assertEqual(count, 10) def test_find_by_inn(self): """Test finding manufacturers by INN.""" inn_target = _digits(10) inn_other = _digits(10) inn_third = _digits(10) ManufacturerRecordFactory(inn=inn_target, load_batch=1) ManufacturerRecordFactory(inn=inn_other, load_batch=1) ManufacturerRecordFactory(inn=inn_third, load_batch=2) results = ManufacturerService.find_by_inn(inn_target) self.assertEqual(results.count(), 1) def test_find_by_inn_with_batch_filter(self): """Test finding manufacturers by INN with batch filter.""" inn_value = _digits(10) ManufacturerRecordFactory(inn=inn_value, load_batch=1) ManufacturerRecordFactory(inn=_digits(10), load_batch=2) results_batch1 = ManufacturerService.find_by_inn(inn_value, batch_id=1) self.assertEqual(results_batch1.count(), 1) results_batch2 = ManufacturerService.find_by_inn(inn_value, batch_id=2) self.assertEqual(results_batch2.count(), 0) def test_find_by_ogrn(self): """Test finding manufacturers by OGRN.""" ogrn_target = _digits(13) ManufacturerRecordFactory(ogrn=ogrn_target) ManufacturerRecordFactory(ogrn=_digits(13)) results = ManufacturerService.find_by_ogrn(ogrn_target) self.assertEqual(results.count(), 1) def test_save_manufacturers_updates_existing_record(self): """Test saving manufacturers refreshes existing record by INN.""" # Create initial manufacturer inn_value = _digits(10) ogrn_value = _digits(13) address_value = fake.address().replace("\n", ", ") company_name = fake.company() initial = [ Manufacturer( full_legal_name=company_name, inn=inn_value, ogrn=ogrn_value, address=address_value, ) ] count1 = ManufacturerService.save_manufacturers(initial, batch_id=1) self.assertEqual(count1, 1) self.assertEqual(ManufacturerRecord.objects.count(), 1) updated_name = fake.company() updated_address = fake.address().replace("\n", ", ") updated_ogrn = _digits(13) duplicate = [ Manufacturer( full_legal_name=updated_name, inn=inn_value, ogrn=updated_ogrn, address=updated_address, ) ] count2 = ManufacturerService.save_manufacturers(duplicate, batch_id=2) # Existing record should be updated in place. self.assertEqual(count2, 1) self.assertEqual(ManufacturerRecord.objects.count(), 1) # Verify latest data preserved record = ManufacturerRecord.objects.first() self.assertEqual(record.full_legal_name, updated_name) self.assertEqual(record.ogrn, updated_ogrn) self.assertEqual(record.address, updated_address) self.assertEqual(record.load_batch, 2) class IndustrialProductServiceTest(TestCase): """Tests for IndustrialProductService.""" def test_save_products_empty(self): """Test saving empty list returns 0.""" count = IndustrialProductService.save_products([], batch_id=1) self.assertEqual(count, 0) def test_save_products(self): """Test saving industrial products from dataclass.""" products = [ IndustrialProduct( full_organisation_name=fake.company(), ogrn=_digits(13), inn=_digits(10), registry_number=f"MPP-{_digits(8)}", product_name=fake.sentence(nb_words=4), product_model=fake.bothify(text="MODEL-###"), okpd2_code=f"{fake.random_int(min=10, max=99)}.{fake.random_int(min=10, max=99)}", tnved_code=_digits(10), regulatory_document=fake.sentence(nb_words=5), ) for _ in range(5) ] count = IndustrialProductService.save_products(products, batch_id=1) self.assertEqual(count, 5) self.assertEqual(IndustrialProductRecord.objects.count(), 5) def test_save_products_links_registry_organization_when_exists(self): """Test linking industrial product to registers organization.""" inn = _digits(10) ogrn = _digits(13) organization = _create_registry_organization(inn=inn, ogrn=ogrn) registry_number = f"MPP-{_digits(8)}" products = [ IndustrialProduct( full_organisation_name=fake.company(), ogrn=ogrn, inn=inn, registry_number=registry_number, product_name=fake.sentence(nb_words=4), product_model=fake.bothify(text="MODEL-###"), okpd2_code=f"{fake.random_int(min=10, max=99)}.{fake.random_int(min=10, max=99)}", tnved_code=_digits(10), regulatory_document=fake.sentence(nb_words=5), ) ] saved = IndustrialProductService.save_products(products, batch_id=1) self.assertEqual(saved, 1) record = IndustrialProductRecord.objects.get(registry_number=registry_number) self.assertEqual(record.registry_organization_id, organization.id) def test_find_by_registry_number(self): """Test finding industrial product by registry number.""" registry_number = f"MPP-{_digits(8)}" IndustrialProductRecordFactory(registry_number=registry_number) IndustrialProductRecordFactory(registry_number=f"MPP-{_digits(8)}") results = IndustrialProductService.find_by_registry_number(registry_number) self.assertEqual(results.count(), 1) def test_save_products_updates_existing_record(self): """Test saving products refreshes existing record by registry number.""" registry_number = f"MPP-{_digits(8)}" initial = [ IndustrialProduct( full_organisation_name=fake.company(), ogrn=_digits(13), inn=_digits(10), registry_number=registry_number, product_name="Начальное имя", product_model="MODEL-001", okpd2_code="25.11", tnved_code=_digits(10), regulatory_document="ГОСТ 1", ) ] count1 = IndustrialProductService.save_products(initial, batch_id=1) self.assertEqual(count1, 1) self.assertEqual(IndustrialProductRecord.objects.count(), 1) updated = [ IndustrialProduct( full_organisation_name=fake.company(), ogrn=_digits(13), inn=_digits(10), registry_number=registry_number, product_name="Обновленное имя", product_model="MODEL-777", okpd2_code="28.99", tnved_code=_digits(10), regulatory_document="ГОСТ 2", ) ] count2 = IndustrialProductService.save_products(updated, batch_id=2) self.assertEqual(count2, 1) self.assertEqual(IndustrialProductRecord.objects.count(), 1) record = IndustrialProductRecord.objects.first() self.assertEqual(record.product_name, "Обновленное имя") self.assertEqual(record.product_model, "MODEL-777") self.assertEqual(record.load_batch, 2) class InspectionServiceTest(TestCase): """Tests for InspectionService.""" def test_save_inspections_empty(self): """Test saving empty list returns 0.""" count = InspectionService.save_inspections([], batch_id=1) self.assertEqual(count, 0) def test_save_inspections(self): """Test saving inspections from dataclass.""" inspections = [ Inspection( registration_number=_digits(12), inn=_digits(10), ogrn=_digits(13), organisation_name=fake.company(), control_authority=fake.company(), inspection_type=fake.word(), inspection_form=fake.word(), start_date=str(fake.date()), end_date=str(fake.date()), status=fake.word(), legal_basis=fake.sentence(nb_words=3), result=fake.sentence(nb_words=3), ) for i in range(5) ] count = InspectionService.save_inspections(inspections, batch_id=1) self.assertEqual(count, 5) self.assertEqual(InspectionRecord.objects.count(), 5) record = InspectionRecord.objects.first() self.assertIsNotNone(record.start_date_normalized) self.assertIsNotNone(record.end_date_normalized) def test_save_inspections_links_registry_organization_when_exists(self): """Test linking inspection to registers organization by INN/ОГРН.""" inn = _digits(10) ogrn = _digits(13) organization = _create_registry_organization(inn=inn, ogrn=ogrn) registration_number = _digits(12) inspections = [ Inspection( registration_number=registration_number, inn=inn, ogrn=ogrn, organisation_name=fake.company(), control_authority=fake.company(), inspection_type=fake.word(), inspection_form=fake.word(), start_date=str(fake.date()), end_date=str(fake.date()), status=fake.word(), legal_basis=fake.sentence(nb_words=3), result=fake.sentence(nb_words=3), ) ] saved = InspectionService.save_inspections(inspections, batch_id=1) self.assertEqual(saved, 1) record = InspectionRecord.objects.get(registration_number=registration_number) self.assertEqual(record.registry_organization_id, organization.id) def test_save_inspections_with_chunk_size(self): """Test saving inspections in chunks.""" inspections = [ Inspection( registration_number=_digits(12), inn=_digits(10), ogrn=_digits(13), organisation_name=fake.company(), control_authority=fake.company(), inspection_type=fake.word(), inspection_form=fake.word(), start_date=str(fake.date()), end_date=str(fake.date()), status=fake.word(), legal_basis=fake.sentence(nb_words=3), ) for i in range(10) ] count = InspectionService.save_inspections( inspections, batch_id=1, chunk_size=3 ) self.assertEqual(count, 10) def test_find_by_inn(self): """Test finding inspections by INN.""" inn_value = _digits(10) InspectionRecordFactory(inn=inn_value, load_batch=1) InspectionRecordFactory(inn=inn_value, load_batch=2) InspectionRecordFactory(inn=_digits(10), load_batch=1) results = InspectionService.find_by_inn(inn_value) self.assertEqual(results.count(), 2) results_batch1 = InspectionService.find_by_inn(inn_value, batch_id=1) self.assertEqual(results_batch1.count(), 1) def test_find_by_registration_number(self): """Test finding inspection by registration number.""" target_number = _digits(12) other_number = _digits(12) InspectionRecordFactory(registration_number=target_number) InspectionRecordFactory(registration_number=other_number) results = InspectionService.find_by_registration_number(target_number) self.assertEqual(results.count(), 1) def test_find_by_control_authority(self): """Test finding inspections by control authority.""" authority_key = fake.word() authority_match_1 = f"{fake.company()} {authority_key}" authority_match_2 = f"{authority_key} {fake.company()}" authority_other = fake.company() InspectionRecordFactory(control_authority=authority_match_1, load_batch=1) InspectionRecordFactory(control_authority=authority_match_2, load_batch=1) InspectionRecordFactory(control_authority=authority_other, load_batch=1) results = InspectionService.find_by_control_authority(authority_key) self.assertEqual(results.count(), 2) results_batch1 = InspectionService.find_by_control_authority( authority_key, batch_id=1 ) self.assertEqual(results_batch1.count(), 2) def test_save_inspections_updates_existing_record(self): """Test saving inspections refreshes existing record by registration_number.""" # Create initial inspection reg_number = _digits(12) inn_value = _digits(10) ogrn_value = _digits(13) org_name = fake.company() control_authority = fake.company() inspection_type = fake.word() inspection_form = fake.word() start_date = str(fake.date()) end_date = str(fake.date()) status = fake.word() legal_basis = fake.sentence(nb_words=3) result_text = fake.sentence(nb_words=3) initial = [ Inspection( registration_number=reg_number, inn=inn_value, ogrn=ogrn_value, organisation_name=org_name, control_authority=control_authority, inspection_type=inspection_type, inspection_form=inspection_form, start_date=start_date, end_date=end_date, status=status, legal_basis=legal_basis, result=result_text, ) ] count1 = InspectionService.save_inspections(initial, batch_id=1) self.assertEqual(count1, 1) self.assertEqual(InspectionRecord.objects.count(), 1) updated_name = fake.company() updated_authority = fake.company() updated_status = fake.word() duplicate = [ Inspection( registration_number=reg_number, inn=_digits(10), ogrn=_digits(13), organisation_name=updated_name, control_authority=updated_authority, inspection_type=fake.word(), inspection_form=fake.word(), start_date=str(fake.date()), end_date=str(fake.date()), status=updated_status, legal_basis=fake.sentence(nb_words=3), result=fake.sentence(nb_words=3), ) ] count2 = InspectionService.save_inspections(duplicate, batch_id=2) # Existing record should be updated in place. self.assertEqual(count2, 1) self.assertEqual(InspectionRecord.objects.count(), 1) # Verify latest data preserved record = InspectionRecord.objects.first() self.assertEqual(record.organisation_name, updated_name) self.assertNotEqual(record.inn, inn_value) self.assertEqual(record.control_authority, updated_authority) self.assertEqual(record.status, updated_status) self.assertEqual(record.load_batch, 2) class ProcurementServiceTest(TestCase): """Tests for ProcurementService.""" def _build_procurement(self, **overrides) -> Procurement: data = { "purchase_number": _digits(19), "purchase_name": fake.sentence(nb_words=4), "customer_inn": _digits(10), "customer_kpp": _digits(9), "customer_ogrn": _digits(13), "customer_name": fake.company(), "max_price": "1 234 567,89", "currency_code": "RUB", "placement_method": fake.word(), "publish_date": "01.03.2026", "end_date": "2026-03-15", "status": fake.word(), "law_type": "44-FZ", "purchase_object_info": fake.sentence(nb_words=4), "href": fake.url(), } data.update(overrides) return Procurement(**data) def test_save_procurements_sets_normalized_fields(self): procurement = self._build_procurement() saved = ProcurementService.save_procurements([procurement], batch_id=1) self.assertEqual(saved, 1) record = ProcurementRecord.objects.get( purchase_number=procurement.purchase_number ) self.assertEqual(str(record.max_price_amount), "1234567.89") self.assertEqual(str(record.publish_date_normalized), "2026-03-01") self.assertEqual(str(record.end_date_normalized), "2026-03-15") def test_save_procurements_duplicate_updates_existing_record(self): purchase_number = _digits(19) first = self._build_procurement(purchase_number=purchase_number) updated_customer_name = fake.company() updated_status = fake.word() duplicate = self._build_procurement( purchase_number=purchase_number, customer_name=updated_customer_name, status=updated_status, ) saved_first = ProcurementService.save_procurements([first], batch_id=1) saved_second = ProcurementService.save_procurements([duplicate], batch_id=2) self.assertEqual(saved_first, 1) self.assertEqual(saved_second, 1) self.assertEqual(ProcurementRecord.objects.count(), 1) record = ProcurementRecord.objects.get(purchase_number=purchase_number) self.assertEqual(record.customer_name, updated_customer_name) self.assertEqual(record.status, updated_status) self.assertEqual(record.load_batch, 2) @tag("integration", "slow", "e2e") class EndToEndIntegrationTest(TestCase): """ End-to-end интеграционные тесты полного flow. Тестирует: Загрузка с API -> Парсинг -> Сохранение в БД -> Проверка. Запуск: uv run python run_tests.py tests.apps.parsers.test_services.EndToEndIntegrationTest """ def test_full_flow_fetch_and_save_certificates(self): """ Полный E2E тест: загрузка сертификатов и сохранение в БД. 1. Загружаем данные с реального API 2. Создаём лог загрузки 3. Сохраняем первые N записей в БД 4. Проверяем что данные корректно сохранились """ # 1. Загружаем данные через локальный HTTP сервер (без внешнего API) print("\n[E2E] Step 1: Fetching certificates from local API...") excel_bytes, rows = build_minpromtorg_certificates_excel(count=5) date_str = fake.date_between(start_date="-30d", end_date="today").strftime( "%Y%m%d" ) file_name = f"data_resolutions_{date_str}.xlsx" with TestHTTPServer() as server: server.add_json( "/api/kss-document-preview", { "data": [ { "name": IndustrialProductionClient().query, "files": [ {"name": file_name, "url": f"/files/{file_name}"} ], } ] }, ) server.add_bytes(f"/files/{file_name}", excel_bytes) host = urlparse(server.base_url) client_host = f"{host.hostname}:{host.port}" if host.port else host.hostname with IndustrialProductionClient( host=client_host, scheme="http", timeout=30, http_adapter=server.adapter, ) as client: all_certificates = client.fetch_certificates() self.assertEqual(len(all_certificates), len(rows)) print(f"[E2E] Loaded {len(all_certificates)} certificates from local API") # Берём все для теста certificates = all_certificates # 2. Создаём batch_id и лог print("[E2E] Step 2: Creating load log...") batch_id = ParserLoadLogService.get_next_batch_id( ParserLoadLog.Source.INDUSTRIAL ) log = ParserLoadLogService.create_load_log( source=ParserLoadLog.Source.INDUSTRIAL, batch_id=batch_id, records_count=0, ) print(f"[E2E] Created batch_id={batch_id}") # 3. Сохраняем в БД print("[E2E] Step 3: Saving certificates to database...") saved_count = IndustrialCertificateService.save_certificates( certificates, batch_id=batch_id ) ParserLoadLogService.update_records_count(log, saved_count) print(f"[E2E] Saved {saved_count} certificates") # 4. Проверяем результат print("[E2E] Step 4: Verifying saved data...") # Проверяем количество db_count = IndustrialCertificateRecord.objects.filter( load_batch=batch_id ).count() self.assertEqual(db_count, saved_count) self.assertEqual(db_count, len(certificates)) # Проверяем первую запись first_cert = certificates[0] db_record = IndustrialCertificateRecord.objects.filter( load_batch=batch_id, certificate_number=first_cert.certificate_number, ).first() self.assertIsNotNone(db_record) self.assertEqual(db_record.inn, first_cert.inn) self.assertEqual(db_record.ogrn, first_cert.ogrn) self.assertEqual(db_record.organisation_name, first_cert.organisation_name) # Проверяем лог log.refresh_from_db() self.assertEqual(log.records_count, saved_count) self.assertEqual(log.status, "success") print("[E2E] ✅ All checks passed!") print(f"[E2E] Sample record: {db_record.certificate_number}") print(f"[E2E] Organisation: {db_record.organisation_name}") print(f"[E2E] INN: {db_record.inn}, OGRN: {db_record.ogrn}")