"""Tests for parsers services.""" from apps.parsers.clients.minpromtorg.industrial import IndustrialProductionClient from apps.parsers.clients.minpromtorg.schemas import IndustrialCertificate, Manufacturer from apps.parsers.clients.proverki.schemas import Inspection from apps.parsers.models import ( IndustrialCertificateRecord, InspectionRecord, ManufacturerRecord, ParserLoadLog, Proxy, ) from apps.parsers.services import ( IndustrialCertificateService, InspectionService, ManufacturerService, ParserLoadLogService, ProxyService, ) from django.test import TestCase, tag from tests.utils import TestHTTPServer from tests.utils.fixtures import build_minpromtorg_certificates_excel, fake from urllib.parse import urlparse from .factories import ( IndustrialCertificateRecordFactory, InspectionRecordFactory, ManufacturerRecordFactory, ParserLoadLogFactory, ProxyFactory, ) def _digits(length: int) -> str: return "".join(str(fake.random_int(0, 9)) for _ in range(length)) def _proxy_address() -> str: return f"http://{fake.ipv4()}:{fake.port_number()}" class ProxyServiceTest(TestCase): """Tests for ProxyService.""" def test_get_active_proxies_empty(self): """Test getting active proxies when none exist.""" proxies = ProxyService.get_active_proxies() self.assertEqual(proxies, []) def test_get_active_proxies_with_data(self): """Test getting active proxies returns only active ones.""" proxy1 = ProxyFactory(is_active=True) proxy2 = ProxyFactory(is_active=True) ProxyFactory(is_active=False) # Inactive - should not be returned proxies = ProxyService.get_active_proxies() self.assertEqual(len(proxies), 2) self.assertIn(proxy1.address, proxies) self.assertIn(proxy2.address, proxies) def test_get_active_proxies_or_none_empty(self): """Test get_active_proxies_or_none returns None when no proxies.""" result = ProxyService.get_active_proxies_or_none() self.assertIsNone(result) def test_get_active_proxies_or_none_with_data(self): """Test get_active_proxies_or_none returns list when proxies exist.""" ProxyFactory(is_active=True) result = ProxyService.get_active_proxies_or_none() self.assertIsNotNone(result) self.assertEqual(len(result), 1) def test_mark_used(self): """Test marking proxy as used updates timestamp.""" proxy = ProxyFactory(last_used_at=None) self.assertIsNone(proxy.last_used_at) ProxyService.mark_used(proxy.address) proxy.refresh_from_db() self.assertIsNotNone(proxy.last_used_at) def test_mark_failed(self): """Test marking proxy as failed increases fail count.""" proxy = ProxyFactory(fail_count=0) ProxyService.mark_failed(proxy.address) proxy.refresh_from_db() self.assertEqual(proxy.fail_count, 1) def test_deactivate(self): """Test deactivating proxy.""" proxy = ProxyFactory(is_active=True) ProxyService.deactivate(proxy.address) proxy.refresh_from_db() self.assertFalse(proxy.is_active) def test_add_proxy(self): """Test adding new proxy.""" address = _proxy_address() description = fake.sentence(nb_words=3) proxy = ProxyService.add_proxy(address, description) self.assertEqual(proxy.address, address) self.assertEqual(proxy.description, description) self.assertTrue(proxy.is_active) def test_add_proxy_idempotent(self): """Test adding existing proxy returns existing record.""" address = _proxy_address() existing_description = fake.sentence(nb_words=3) existing = ProxyFactory(address=address, description=existing_description) new_description = fake.sentence(nb_words=3) proxy = ProxyService.add_proxy(address, new_description) self.assertEqual(proxy.id, existing.id) self.assertEqual(proxy.description, existing_description) # Not updated def test_add_proxies(self): """Test bulk adding proxies.""" addresses = [_proxy_address() for _ in range(3)] created = ProxyService.add_proxies(addresses) self.assertEqual(created, 3) self.assertEqual(Proxy.objects.count(), 3) def test_add_proxies_skips_existing(self): """Test bulk add skips existing proxies.""" existing_address = _proxy_address() new_address = _proxy_address() while new_address == existing_address: new_address = _proxy_address() ProxyFactory(address=existing_address) addresses = [ existing_address, # Already exists new_address, ] created = ProxyService.add_proxies(addresses) self.assertEqual(created, 1) self.assertEqual(Proxy.objects.count(), 2) class ParserLoadLogServiceTest(TestCase): """Tests for ParserLoadLogService.""" def test_get_next_batch_id_first(self): """Test getting first batch_id for new source.""" batch_id = ParserLoadLogService.get_next_batch_id( ParserLoadLog.Source.INDUSTRIAL ) self.assertEqual(batch_id, 1) def test_get_next_batch_id_increment(self): """Test batch_id increments correctly.""" ParserLoadLogFactory(batch_id=5, source=ParserLoadLog.Source.INDUSTRIAL) ParserLoadLogFactory(batch_id=3, source=ParserLoadLog.Source.INDUSTRIAL) batch_id = ParserLoadLogService.get_next_batch_id( ParserLoadLog.Source.INDUSTRIAL ) self.assertEqual(batch_id, 6) def test_get_next_batch_id_per_source(self): """Test batch_id is tracked per source.""" ParserLoadLogFactory(batch_id=10, source=ParserLoadLog.Source.INDUSTRIAL) ParserLoadLogFactory(batch_id=5, source=ParserLoadLog.Source.MANUFACTURES) industrial_batch = ParserLoadLogService.get_next_batch_id( ParserLoadLog.Source.INDUSTRIAL ) manufactures_batch = ParserLoadLogService.get_next_batch_id( ParserLoadLog.Source.MANUFACTURES ) self.assertEqual(industrial_batch, 11) self.assertEqual(manufactures_batch, 6) def test_create_load_log(self): """Test creating load log.""" log = ParserLoadLogService.create_load_log( source=ParserLoadLog.Source.INDUSTRIAL, batch_id=1, records_count=100, status="success", ) self.assertIsInstance(log, ParserLoadLog) self.assertEqual(log.source, ParserLoadLog.Source.INDUSTRIAL) self.assertEqual(log.batch_id, 1) self.assertEqual(log.records_count, 100) self.assertEqual(log.status, "success") def test_mark_failed(self): """Test marking log as failed.""" log = ParserLoadLogFactory(status="success") error_message = fake.sentence(nb_words=4) ParserLoadLogService.mark_failed(log, error_message) log.refresh_from_db() self.assertEqual(log.status, "failed") self.assertEqual(log.error_message, error_message) def test_update_records_count(self): """Test updating records count.""" log = ParserLoadLogFactory(records_count=0) ParserLoadLogService.update_records_count(log, 250) log.refresh_from_db() self.assertEqual(log.records_count, 250) class IndustrialCertificateServiceTest(TestCase): """Tests for IndustrialCertificateService.""" def test_save_certificates_empty(self): """Test saving empty list returns 0.""" count = IndustrialCertificateService.save_certificates([], batch_id=1) self.assertEqual(count, 0) def test_save_certificates(self): """Test saving certificates from dataclass.""" certificates = [ IndustrialCertificate( issue_date=str(fake.date()), certificate_number=fake.bothify(text="??-####-#####"), expiry_date=str(fake.date()), certificate_file_url=fake.url(), organisation_name=fake.company(), inn=_digits(10), ogrn=_digits(13), ) for i in range(5) ] count = IndustrialCertificateService.save_certificates(certificates, batch_id=1) self.assertEqual(count, 5) self.assertEqual(IndustrialCertificateRecord.objects.count(), 5) def test_save_certificates_with_chunk_size(self): """Test saving certificates in chunks.""" certificates = [ IndustrialCertificate( issue_date=str(fake.date()), certificate_number=fake.bothify(text="??-####-#####"), expiry_date=str(fake.date()), certificate_file_url=fake.url(), organisation_name=fake.company(), inn=_digits(10), ogrn=_digits(13), ) for i in range(10) ] count = IndustrialCertificateService.save_certificates( certificates, batch_id=1, chunk_size=3 ) self.assertEqual(count, 10) def test_find_by_inn(self): """Test finding certificates by INN.""" inn_a = _digits(10) inn_b = _digits(10) IndustrialCertificateRecordFactory( inn=inn_a, certificate_number=fake.bothify(text="CERT-####"), load_batch=1 ) IndustrialCertificateRecordFactory( inn=inn_a, certificate_number=fake.bothify(text="CERT-####"), load_batch=2 ) IndustrialCertificateRecordFactory( inn=inn_b, certificate_number=fake.bothify(text="CERT-####"), load_batch=1 ) results = IndustrialCertificateService.find_by_inn(inn_a) self.assertEqual(results.count(), 2) results_batch1 = IndustrialCertificateService.find_by_inn( inn_a, batch_id=1 ) self.assertEqual(results_batch1.count(), 1) def test_find_by_certificate_number(self): """Test finding certificate by number.""" unique_number = fake.bothify(text="CERT-#####") IndustrialCertificateRecordFactory(certificate_number=unique_number) IndustrialCertificateRecordFactory(certificate_number=fake.bothify(text="CERT-#####")) results = IndustrialCertificateService.find_by_certificate_number(unique_number) self.assertEqual(results.count(), 1) def test_save_certificates_deduplication(self): """Test saving certificates skips duplicates by certificate_number.""" # Create initial certificate cert_number = fake.bothify(text="CERT-DEDUP-#####") initial = [ IndustrialCertificate( issue_date=str(fake.date()), certificate_number=cert_number, expiry_date=str(fake.date()), certificate_file_url=fake.url(), organisation_name=fake.company(), inn=_digits(10), ogrn=_digits(13), ) ] count1 = IndustrialCertificateService.save_certificates(initial, batch_id=1) self.assertEqual(count1, 1) self.assertEqual(IndustrialCertificateRecord.objects.count(), 1) # Try to save with same certificate_number - should be skipped duplicate = [ IndustrialCertificate( issue_date=str(fake.date()), certificate_number=cert_number, # Same number - will be skipped expiry_date=str(fake.date()), certificate_file_url=fake.url(), organisation_name=fake.company(), inn=_digits(10), ogrn=_digits(13), ) ] IndustrialCertificateService.save_certificates(duplicate, batch_id=2) # Should still be 1 record (duplicate skipped) self.assertEqual(IndustrialCertificateRecord.objects.count(), 1) # Verify original data preserved record = IndustrialCertificateRecord.objects.first() self.assertEqual(record.organisation_name, initial[0].organisation_name) self.assertEqual(record.inn, initial[0].inn) self.assertEqual(record.load_batch, 1) # Original batch class ManufacturerServiceTest(TestCase): """Tests for ManufacturerService.""" def test_save_manufacturers_empty(self): """Test saving empty list returns 0.""" count = ManufacturerService.save_manufacturers([], batch_id=1) self.assertEqual(count, 0) def test_save_manufacturers(self): """Test saving manufacturers from dataclass.""" manufacturers = [ Manufacturer( full_legal_name=fake.company(), inn=_digits(10), ogrn=_digits(13), address=fake.address().replace("\n", ", "), ) for i in range(5) ] count = ManufacturerService.save_manufacturers(manufacturers, batch_id=1) self.assertEqual(count, 5) self.assertEqual(ManufacturerRecord.objects.count(), 5) def test_save_manufacturers_with_chunk_size(self): """Test saving manufacturers in chunks.""" manufacturers = [ Manufacturer( full_legal_name=fake.company(), inn=_digits(10), ogrn=_digits(13), address=fake.address().replace("\n", ", "), ) for i in range(10) ] count = ManufacturerService.save_manufacturers( manufacturers, batch_id=1, chunk_size=3 ) self.assertEqual(count, 10) def test_find_by_inn(self): """Test finding manufacturers by INN.""" inn_target = _digits(10) inn_other = _digits(10) inn_third = _digits(10) ManufacturerRecordFactory(inn=inn_target, load_batch=1) ManufacturerRecordFactory(inn=inn_other, load_batch=1) ManufacturerRecordFactory(inn=inn_third, load_batch=2) results = ManufacturerService.find_by_inn(inn_target) self.assertEqual(results.count(), 1) def test_find_by_inn_with_batch_filter(self): """Test finding manufacturers by INN with batch filter.""" inn_value = _digits(10) ManufacturerRecordFactory(inn=inn_value, load_batch=1) ManufacturerRecordFactory(inn=_digits(10), load_batch=2) results_batch1 = ManufacturerService.find_by_inn(inn_value, batch_id=1) self.assertEqual(results_batch1.count(), 1) results_batch2 = ManufacturerService.find_by_inn(inn_value, batch_id=2) self.assertEqual(results_batch2.count(), 0) def test_find_by_ogrn(self): """Test finding manufacturers by OGRN.""" ogrn_target = _digits(13) ManufacturerRecordFactory(ogrn=ogrn_target) ManufacturerRecordFactory(ogrn=_digits(13)) results = ManufacturerService.find_by_ogrn(ogrn_target) self.assertEqual(results.count(), 1) def test_save_manufacturers_deduplication(self): """Test saving manufacturers skips duplicates by INN.""" # Create initial manufacturer inn_value = _digits(10) ogrn_value = _digits(13) address_value = fake.address().replace("\n", ", ") company_name = fake.company() initial = [ Manufacturer( full_legal_name=company_name, inn=inn_value, ogrn=ogrn_value, address=address_value, ) ] count1 = ManufacturerService.save_manufacturers(initial, batch_id=1) self.assertEqual(count1, 1) self.assertEqual(ManufacturerRecord.objects.count(), 1) # Try to save with same INN - should be skipped duplicate = [ Manufacturer( full_legal_name=fake.company(), inn=inn_value, # Same INN - will be skipped ogrn=_digits(13), address=fake.address().replace("\n", ", "), ) ] ManufacturerService.save_manufacturers(duplicate, batch_id=2) # Should still be 1 record (duplicate skipped) self.assertEqual(ManufacturerRecord.objects.count(), 1) # Verify original data preserved record = ManufacturerRecord.objects.first() self.assertEqual(record.full_legal_name, company_name) self.assertEqual(record.ogrn, ogrn_value) self.assertEqual(record.address, address_value) self.assertEqual(record.load_batch, 1) # Original batch class InspectionServiceTest(TestCase): """Tests for InspectionService.""" def test_save_inspections_empty(self): """Test saving empty list returns 0.""" count = InspectionService.save_inspections([], batch_id=1) self.assertEqual(count, 0) def test_save_inspections(self): """Test saving inspections from dataclass.""" inspections = [ Inspection( registration_number=_digits(12), inn=_digits(10), ogrn=_digits(13), organisation_name=fake.company(), control_authority=fake.company(), inspection_type=fake.word(), inspection_form=fake.word(), start_date=str(fake.date()), end_date=str(fake.date()), status=fake.word(), legal_basis=fake.sentence(nb_words=3), result=fake.sentence(nb_words=3), ) for i in range(5) ] count = InspectionService.save_inspections(inspections, batch_id=1) self.assertEqual(count, 5) self.assertEqual(InspectionRecord.objects.count(), 5) def test_save_inspections_with_chunk_size(self): """Test saving inspections in chunks.""" inspections = [ Inspection( registration_number=_digits(12), inn=_digits(10), ogrn=_digits(13), organisation_name=fake.company(), control_authority=fake.company(), inspection_type=fake.word(), inspection_form=fake.word(), start_date=str(fake.date()), end_date=str(fake.date()), status=fake.word(), legal_basis=fake.sentence(nb_words=3), ) for i in range(10) ] count = InspectionService.save_inspections( inspections, batch_id=1, chunk_size=3 ) self.assertEqual(count, 10) def test_find_by_inn(self): """Test finding inspections by INN.""" inn_value = _digits(10) InspectionRecordFactory(inn=inn_value, load_batch=1) InspectionRecordFactory(inn=inn_value, load_batch=2) InspectionRecordFactory(inn=_digits(10), load_batch=1) results = InspectionService.find_by_inn(inn_value) self.assertEqual(results.count(), 2) results_batch1 = InspectionService.find_by_inn(inn_value, batch_id=1) self.assertEqual(results_batch1.count(), 1) def test_find_by_registration_number(self): """Test finding inspection by registration number.""" target_number = _digits(12) other_number = _digits(12) InspectionRecordFactory(registration_number=target_number) InspectionRecordFactory(registration_number=other_number) results = InspectionService.find_by_registration_number(target_number) self.assertEqual(results.count(), 1) def test_find_by_control_authority(self): """Test finding inspections by control authority.""" authority_key = fake.word() authority_match_1 = f"{fake.company()} {authority_key}" authority_match_2 = f"{authority_key} {fake.company()}" authority_other = fake.company() InspectionRecordFactory(control_authority=authority_match_1, load_batch=1) InspectionRecordFactory(control_authority=authority_match_2, load_batch=1) InspectionRecordFactory(control_authority=authority_other, load_batch=1) results = InspectionService.find_by_control_authority(authority_key) self.assertEqual(results.count(), 2) results_batch1 = InspectionService.find_by_control_authority( authority_key, batch_id=1 ) self.assertEqual(results_batch1.count(), 2) def test_save_inspections_deduplication(self): """Test saving inspections skips duplicates by registration_number.""" # Create initial inspection reg_number = _digits(12) inn_value = _digits(10) ogrn_value = _digits(13) org_name = fake.company() control_authority = fake.company() inspection_type = fake.word() inspection_form = fake.word() start_date = str(fake.date()) end_date = str(fake.date()) status = fake.word() legal_basis = fake.sentence(nb_words=3) result_text = fake.sentence(nb_words=3) initial = [ Inspection( registration_number=reg_number, inn=inn_value, ogrn=ogrn_value, organisation_name=org_name, control_authority=control_authority, inspection_type=inspection_type, inspection_form=inspection_form, start_date=start_date, end_date=end_date, status=status, legal_basis=legal_basis, result=result_text, ) ] count1 = InspectionService.save_inspections(initial, batch_id=1) self.assertEqual(count1, 1) self.assertEqual(InspectionRecord.objects.count(), 1) # Try to save with same registration_number - should be skipped duplicate = [ Inspection( registration_number=reg_number, # Same number - will be skipped inn=_digits(10), ogrn=_digits(13), organisation_name=fake.company(), control_authority=fake.company(), inspection_type=fake.word(), inspection_form=fake.word(), start_date=str(fake.date()), end_date=str(fake.date()), status=fake.word(), legal_basis=fake.sentence(nb_words=3), result=fake.sentence(nb_words=3), ) ] InspectionService.save_inspections(duplicate, batch_id=2) # Should still be 1 record (duplicate skipped) self.assertEqual(InspectionRecord.objects.count(), 1) # Verify original data preserved record = InspectionRecord.objects.first() self.assertEqual(record.organisation_name, org_name) self.assertEqual(record.inn, inn_value) self.assertEqual(record.control_authority, control_authority) self.assertEqual(record.status, status) self.assertEqual(record.load_batch, 1) # Original batch @tag("integration", "slow", "e2e") class EndToEndIntegrationTest(TestCase): """ End-to-end интеграционные тесты полного flow. Тестирует: Загрузка с API -> Парсинг -> Сохранение в БД -> Проверка. Запуск: uv run python run_tests.py tests.apps.parsers.test_services.EndToEndIntegrationTest """ def test_full_flow_fetch_and_save_certificates(self): """ Полный E2E тест: загрузка сертификатов и сохранение в БД. 1. Загружаем данные с реального API 2. Создаём лог загрузки 3. Сохраняем первые N записей в БД 4. Проверяем что данные корректно сохранились """ # 1. Загружаем данные через локальный HTTP сервер (без внешнего API) print("\n[E2E] Step 1: Fetching certificates from local API...") excel_bytes, rows = build_minpromtorg_certificates_excel(count=5) date_str = fake.date_between(start_date="-30d", end_date="today").strftime( "%Y%m%d" ) file_name = f"data_resolutions_{date_str}.xlsx" with TestHTTPServer() as server: server.add_json( "/api/kss-document-preview", { "data": [ { "name": IndustrialProductionClient().query, "files": [ {"name": file_name, "url": f"/files/{file_name}"} ], } ] }, ) server.add_bytes(f"/files/{file_name}", excel_bytes) host = urlparse(server.base_url) client_host = ( f"{host.hostname}:{host.port}" if host.port else host.hostname ) with IndustrialProductionClient( host=client_host, scheme="http", timeout=30, http_adapter=server.adapter, ) as client: all_certificates = client.fetch_certificates() self.assertEqual(len(all_certificates), len(rows)) print(f"[E2E] Loaded {len(all_certificates)} certificates from local API") # Берём все для теста certificates = all_certificates # 2. Создаём batch_id и лог print("[E2E] Step 2: Creating load log...") batch_id = ParserLoadLogService.get_next_batch_id( ParserLoadLog.Source.INDUSTRIAL ) log = ParserLoadLogService.create_load_log( source=ParserLoadLog.Source.INDUSTRIAL, batch_id=batch_id, records_count=0, ) print(f"[E2E] Created batch_id={batch_id}") # 3. Сохраняем в БД print("[E2E] Step 3: Saving certificates to database...") saved_count = IndustrialCertificateService.save_certificates( certificates, batch_id=batch_id ) ParserLoadLogService.update_records_count(log, saved_count) print(f"[E2E] Saved {saved_count} certificates") # 4. Проверяем результат print("[E2E] Step 4: Verifying saved data...") # Проверяем количество db_count = IndustrialCertificateRecord.objects.filter( load_batch=batch_id ).count() self.assertEqual(db_count, saved_count) self.assertEqual(db_count, len(certificates)) # Проверяем первую запись first_cert = certificates[0] db_record = IndustrialCertificateRecord.objects.filter( load_batch=batch_id, certificate_number=first_cert.certificate_number, ).first() self.assertIsNotNone(db_record) self.assertEqual(db_record.inn, first_cert.inn) self.assertEqual(db_record.ogrn, first_cert.ogrn) self.assertEqual(db_record.organisation_name, first_cert.organisation_name) # Проверяем лог log.refresh_from_db() self.assertEqual(log.records_count, saved_count) self.assertEqual(log.status, "success") print("[E2E] ✅ All checks passed!") print(f"[E2E] Sample record: {db_record.certificate_number}") print(f"[E2E] Organisation: {db_record.organisation_name}") print(f"[E2E] INN: {db_record.inn}, OGRN: {db_record.ogrn}")