Files
mostovik-backend/tests/apps/parsers/test_models.py
Aleksandr Meshchriakov 199d871923
Some checks failed
CI/CD Pipeline / Build Docker Images (push) Blocked by required conditions
CI/CD Pipeline / Push to Gitea Registry (push) Blocked by required conditions
CI/CD Pipeline / Code Quality Checks (push) Failing after 3m55s
CI/CD Pipeline / Run Tests (push) Failing after 3h11m38s
feat(parsers): add proverki.gov.ru parser with sync_inspections task
- Add InspectionRecord model with is_federal_law_248, data_year, data_month fields
- Add ProverkiClient with Playwright support for JS-rendered portal
- Add streaming XML parser for large files (>50MB)
- Add sync_inspections task with incremental loading logic
  - Starts from 01.01.2025 if DB is empty
  - Loads both FZ-294 and FZ-248 inspections
  - Stops after 2 consecutive empty months
- Add InspectionService methods: get_last_loaded_period, has_data_for_period
- Add Minpromtorg parsers (certificates, manufacturers)
- Add Django Admin for parser models
- Update README with parsers documentation and changelog
2026-01-21 20:16:25 +01:00

142 lines
4.6 KiB
Python

"""Tests for parsers models."""
from django.test import TestCase
from apps.parsers.models import (
IndustrialCertificateRecord,
ManufacturerRecord,
ParserLoadLog,
Proxy,
)
from .factories import (
IndustrialCertificateRecordFactory,
ManufacturerRecordFactory,
ParserLoadLogFactory,
ProxyFactory,
)
class ProxyModelTest(TestCase):
    """Tests for the Proxy model: creation, ``str()``, ordering, uniqueness."""

    def test_create_proxy(self):
        """A factory-built proxy is an active Proxy with no recorded failures."""
        proxy = ProxyFactory()

        self.assertIsInstance(proxy, Proxy)
        # Only the scheme separator is checked, so http://, https:// and
        # socks5:// addresses are all accepted.
        self.assertIn("://", proxy.address)
        self.assertTrue(proxy.is_active)
        self.assertEqual(proxy.fail_count, 0)

    def test_proxy_str(self):
        """``str(proxy)`` is the address followed by ``(active)``/``(inactive)``."""
        proxy = ProxyFactory(address="http://proxy:8080", is_active=True)
        self.assertEqual(str(proxy), "http://proxy:8080 (active)")

        # The flag is flipped on the in-memory instance only; it is not saved.
        proxy.is_active = False
        self.assertEqual(str(proxy), "http://proxy:8080 (inactive)")

    def test_proxy_ordering(self):
        """``Proxy.objects.all()`` yields proxies in ascending ``fail_count``."""
        # Created deliberately out of order so insertion order cannot
        # accidentally satisfy the assertions below.
        proxy1 = ProxyFactory(fail_count=5)
        proxy2 = ProxyFactory(fail_count=0)
        proxy3 = ProxyFactory(fail_count=10)

        # NOTE(review): no explicit order_by() here, so this presumably relies
        # on the model's default ordering -- confirm in Proxy.Meta.
        proxies = list(Proxy.objects.all())

        self.assertEqual(proxies[0], proxy2)  # fail_count=0
        self.assertEqual(proxies[1], proxy1)  # fail_count=5
        self.assertEqual(proxies[2], proxy3)  # fail_count=10

    def test_proxy_unique_address(self):
        """Creating a second proxy with an existing address raises IntegrityError."""
        # Imported locally so this test carries its own database dependencies.
        from django.db import IntegrityError, transaction

        ProxyFactory(address="http://unique:8080")

        # TestCase wraps every test in a transaction. A database error marks
        # that transaction as broken, so the failing insert is isolated in a
        # savepoint; the outer transaction stays usable for any later queries.
        with self.assertRaises(IntegrityError):
            with transaction.atomic():
                ProxyFactory(address="http://unique:8080")
class ParserLoadLogModelTest(TestCase):
    """Checks creation, ``str()`` output and timestamps of ParserLoadLog."""

    def test_create_load_log(self):
        """A factory-built log is a ParserLoadLog with a batch id and a known source."""
        entry = ParserLoadLogFactory()

        self.assertIsInstance(entry, ParserLoadLog)
        self.assertIsNotNone(entry.batch_id)

        # Each choice is a (value, label) pair; only the stored value matters.
        known_sources = [value for value, _label in ParserLoadLog.Source.choices]
        self.assertIn(entry.source, known_sources)

    def test_load_log_str(self):
        """``str(log)`` mentions both the batch id and the record count."""
        entry = ParserLoadLogFactory(
            batch_id=42,
            source=ParserLoadLog.Source.INDUSTRIAL,
            records_count=100,
        )

        rendered = str(entry)
        self.assertIn("42", rendered)
        self.assertIn("100", rendered)

    def test_load_log_timestamps(self):
        """``created_at`` and ``updated_at`` are set on a factory-built log."""
        entry = ParserLoadLogFactory()

        # NOTE(review): presumably supplied by a shared timestamp mixin --
        # confirm in the model definition.
        for field_name in ("created_at", "updated_at"):
            self.assertIsNotNone(getattr(entry, field_name))
class IndustrialCertificateRecordModelTest(TestCase):
    """Checks creation, ``str()`` output and timestamps of IndustrialCertificateRecord."""

    def test_create_certificate(self):
        """A factory-built record has a certificate number, INN and OGRN."""
        record = IndustrialCertificateRecordFactory()

        self.assertIsInstance(record, IndustrialCertificateRecord)
        for field_name in ("certificate_number", "inn", "ogrn"):
            self.assertIsNotNone(getattr(record, field_name))

    def test_certificate_str(self):
        """``str(record)`` mentions the certificate number and the organisation."""
        record = IndustrialCertificateRecordFactory(
            certificate_number="CERT-123",
            organisation_name="Test Company LLC",
        )

        rendered = str(record)
        self.assertIn("CERT-123", rendered)
        # Only the leading part of the name is required to appear.
        self.assertIn("Test Company", rendered)

    def test_certificate_timestamps(self):
        """``created_at`` and ``updated_at`` are set on a factory-built record."""
        record = IndustrialCertificateRecordFactory()

        # NOTE(review): presumably supplied by a shared timestamp mixin --
        # confirm in the model definition.
        for field_name in ("created_at", "updated_at"):
            self.assertIsNotNone(getattr(record, field_name))
class ManufacturerRecordModelTest(TestCase):
    """Checks creation, ``str()`` output and timestamps of ManufacturerRecord."""

    def test_create_manufacturer(self):
        """A factory-built record has a full legal name, INN and OGRN."""
        record = ManufacturerRecordFactory()

        self.assertIsInstance(record, ManufacturerRecord)
        for field_name in ("full_legal_name", "inn", "ogrn"):
            self.assertIsNotNone(getattr(record, field_name))

    def test_manufacturer_str(self):
        """``str(record)`` mentions the INN and the legal name."""
        record = ManufacturerRecordFactory(
            inn="1234567890",
            full_legal_name="Test Manufacturing Company",
        )

        rendered = str(record)
        self.assertIn("1234567890", rendered)
        # Only the leading part of the name is required to appear.
        self.assertIn("Test Manufacturing", rendered)

    def test_manufacturer_timestamps(self):
        """``created_at`` and ``updated_at`` are set on a factory-built record."""
        record = ManufacturerRecordFactory()

        # NOTE(review): presumably supplied by a shared timestamp mixin --
        # confirm in the model definition.
        for field_name in ("created_at", "updated_at"):
            self.assertIsNotNone(getattr(record, field_name))