Some checks failed
CI/CD Pipeline / Manual Action Help (push) Has been skipped
CI/CD Pipeline / Start Dev Containers in Dokploy (push) Has been skipped
CI/CD Pipeline / Drop and Recreate Dev Database (push) Has been skipped
CI/CD Pipeline / Quality Gate (push) Successful in 54s
CI/CD Pipeline / Build and Push Images (push) Failing after 2m59s
CI/CD Pipeline / Deploy Dev in Dokploy (push) Has been skipped
CI/CD Pipeline / Internal Notify (push) Successful in 1s
1471 lines
56 KiB
Python
1471 lines
56 KiB
Python
"""Integration-style tests for parser tasks (no mocks)."""
|
||
|
||
from __future__ import annotations

import hashlib
import io
import os
import tempfile
import threading
from contextlib import ExitStack
from pathlib import Path
from types import SimpleNamespace
from unittest.mock import patch
from urllib.parse import urlparse

from apps.parsers import tasks as parser_tasks
from apps.parsers.clients.minpromtorg.industrial import (
    IndustrialProductionClient,
    IndustrialProductionClientError,
)
from apps.parsers.clients.minpromtorg.manufactures import (
    ManufacturesClient,
    ManufacturesClientError,
)
from apps.parsers.clients.minpromtorg.products import (
    IndustrialProductsClient,
    IndustrialProductsClientError,
)
from apps.parsers.clients.proverki.client import ProverkiClientError
from apps.parsers.clients.zakupki import ZakupkiClientError
from apps.parsers.models import (
    FinancialReport,
    IndustrialCertificateRecord,
    IndustrialProductRecord,
    InspectionRecord,
    ManufacturerRecord,
    ParserLoadLog,
    ProcurementRecord,
)
from apps.parsers.tasks import (
    _move_to_dir,
    _process_fns_file_sync,
    _remove_lock,
    _resolve_proxies,
    _try_create_lock,
    parse_all_minpromtorg,
    parse_all_sources,
    parse_industrial_production,
    parse_industrial_products,
    parse_inspections,
    parse_manufactures,
    parse_procurements,
    process_fns_files_batch,
    scan_fns_directory,
    sync_inspections,
    sync_procurements,
    sync_ru_proxies,
)
from django.test import TestCase, override_settings
from openpyxl import Workbook
from registers.models import Organization

from tests.apps.parsers.factories import (
    InspectionRecordFactory,
    ParserLoadLogFactory,
    ProcurementRecordFactory,
    ProxyFactory,
)
from tests.utils import TestHTTPServer
from tests.utils.fixtures import (
    build_minpromtorg_certificates_excel,
    build_minpromtorg_manufacturers_excel,
    build_minpromtorg_products_excel,
    build_proverki_xml,
    build_zakupki_xml,
    build_zip,
    fake,
)
|
||
|
||
|
||
def _host_from_base_url(base_url: str) -> str:
|
||
parsed = urlparse(base_url)
|
||
if parsed.port:
|
||
return f"{parsed.hostname}:{parsed.port}"
|
||
return parsed.hostname or ""
|
||
|
||
|
||
def _digits(length: int) -> str:
    """Return a string of *length* random decimal digits.

    Each digit is drawn independently with ``fake.random_int(0, 9)``, so the
    result may start with ``0``.
    """
    return "".join(str(fake.random_int(0, 9)) for _ in range(length))
|
||
|
||
|
||
def _build_fns_excel_bytes() -> bytes:
    """Build a small in-memory ``.xlsx`` workbook and return its bytes.

    The active sheet receives three rows: a "Форма №1" title row carrying a
    random year between 2020 and 2025, a column-header row, and one data row
    made of a random word, a 4-digit code and two random integers.
    """
    wb = Workbook()
    try:
        ws = wb.active
        year = fake.random_int(min=2020, max=2025)
        ws.append(["Форма №1", None, year, None])
        ws.append([None, "Код", "Начало", "Конец"])
        ws.append(
            [fake.word(), _digits(4), fake.random_int(10, 999), fake.random_int(10, 999)]
        )
        buf = io.BytesIO()
        wb.save(buf)
    finally:
        # Close the workbook even when building or saving fails.
        wb.close()
    return buf.getvalue()
|
||
|
||
|
||
def _portal_path(year: int, month: int) -> str:
|
||
return f"/portal/public-open-data/check/{year}/{month}"
|
||
|
||
|
||
class ProxyResolutionTestCase(TestCase):
    """Tests for proxy resolution in parser tasks."""

    @override_settings(PARSER_PROXIES=["http://env-proxy:8080"])
    def test_resolve_proxies_prefers_runtime_db_proxies(self):
        # Two active RU proxies exist; only the one whose source is
        # "proxy-tools" is expected in the result, and the configured
        # PARSER_PROXIES value is expected to be absent.
        imported_proxy = ProxyFactory(
            address="http://10.0.0.2:8000",
            source="proxy-tools",
            country_code="RU",
            is_active=True,
        )
        ProxyFactory(
            address="http://10.0.0.3:8000",
            source="manual",
            country_code="RU",
            is_active=True,
        )

        result = _resolve_proxies(None)

        self.assertEqual(result, [imported_proxy.address])

    @override_settings(PARSER_PROXIES=["http://env-proxy:8080"])
    def test_resolve_proxies_does_not_use_unclassified_env_fallback(self):
        # With no proxies in the database the settings value must not be
        # used as a fallback: the resolver is expected to return None.
        result = _resolve_proxies(None)

        self.assertIsNone(result)
|
||
|
||
|
||
class SyncRuProxiesTaskTestCase(TestCase):
    """Tests for periodic RU proxy sync task."""

    @patch("apps.parsers.tasks.ProxyToolsSyncService.sync_ru_proxies")
    def test_sync_ru_proxies_returns_service_payload(self, sync_mock):
        # The service call is replaced by a mock; the task is expected to
        # call it once without arguments and hand back its payload.
        sync_mock.return_value = {
            "status": "success",
            "fetched": 3,
            "created": 2,
            "updated": 1,
            "deactivated": 0,
        }

        result = sync_ru_proxies.run()

        self.assertEqual(result["status"], "success")
        self.assertEqual(result["fetched"], 3)
        sync_mock.assert_called_once_with()
|
||
|
||
|
||
class GenericSourceFetchTestCase(TestCase):
    """Tests for source-specific generic fetch configuration."""

    def test_fstec_disables_ssl_verification_for_broken_certificate_chain(self):
        class _RecordingStructuredClient:
            # Stand-in for StructuredDataClient that records the keyword
            # arguments of every instantiation and returns no records.
            instances = []

            def __init__(self, **kwargs):
                self.kwargs = kwargs
                self.instances.append(self)

            def fetch_records(self, **_kwargs):
                return []

        with patch.object(
            parser_tasks,
            "StructuredDataClient",
            _RecordingStructuredClient,
        ):
            records = parser_tasks._fetch_structured_records(
                source_key="fstec",
                file_url="https://reestr.fstec.ru/reg3",
                file_path=None,
                proxies=[],
            )

        self.assertEqual(records, [])
        self.assertEqual(len(_RecordingStructuredClient.instances), 1)
        self.assertEqual(
            _RecordingStructuredClient.instances[0].kwargs["source"],
            "fstec",
        )
        self.assertFalse(_RecordingStructuredClient.instances[0].kwargs["verify_ssl"])

    @override_settings(CHECKO_API_KEY="test-key", FEDRESURS_CHECKO_FALLBACK_LIMIT=10)
    def test_fedresurs_falls_back_to_checko_for_registry_organizations(self):
        organization = Organization.objects.create(
            pn_name='ООО "Тест"',
            mn_ogrn=1027700000000,
            mn_inn=7701000001,
            in_kpp=770101001,
            mn_okpo="12345678",
        )

        class _CheckoClient:
            # Stand-in for CheckoClient: accepts any constructor kwargs and
            # answers every company lookup with one bankruptcy entry for the
            # organization created above.
            def __init__(self, **_kwargs):
                return

            def get_company(self, _request):
                return SimpleNamespace(
                    data=SimpleNamespace(
                        ogrn=str(organization.mn_ogrn),
                        inn=str(organization.mn_inn),
                        short_name=organization.pn_name,
                        bankruptcy=(
                            SimpleNamespace(
                                type="Сообщение о введении наблюдения",
                                date="2026-04-01",
                                case_number="А40-1/2026",
                            ),
                        ),
                    )
                )

        # The primary structured fetch is forced to fail so that the Checko
        # fallback path is the one that produces the records.
        with (
            patch.object(
                parser_tasks,
                "_fetch_structured_records",
                side_effect=RuntimeError("HTTP 401"),
            ),
            patch.object(parser_tasks, "CheckoClient", _CheckoClient),
        ):
            records = parser_tasks._fetch_fedresurs_bankruptcy_records(
                file_url=None,
                file_path=None,
                proxies=[],
            )

        self.assertEqual(len(records), 1)
        self.assertEqual(records[0].source, "fedresurs_bankruptcy")
        self.assertEqual(records[0].inn, str(organization.mn_inn))
        self.assertEqual(records[0].ogrn, str(organization.mn_ogrn))
        self.assertEqual(records[0].record_date, "2026-04-01")
        self.assertEqual(records[0].payload["provider"], "checko")
|
||
|
||
|
||
@override_settings(
    CELERY_TASK_ALWAYS_EAGER=True,
    CELERY_TASK_EAGER_PROPAGATES=True,
)
class ParseProcurementsTaskTestCase(TestCase):
    """Tests for parse_procurements task."""

    def test_parse_procurements_success(self):
        xml_content, rows = build_zakupki_xml(count=2)
        archive = build_zip([("data.xml", xml_content)])

        with TestHTTPServer() as server:
            server.add_bytes("/files/data.zip", archive, content_type="application/zip")
            result = parse_procurements(
                file_url=f"{server.base_url}/files/data.zip",
                law_type="44",
                client_adapter=server.adapter,
                client_host=_host_from_base_url(server.base_url),
                client_scheme="http",
            )

        self.assertEqual(result["status"], "success")
        self.assertEqual(result["saved"], len(rows))
        self.assertGreater(ProcurementRecord.objects.count(), 0)

    def test_parse_procurements_failure(self):
        # The served payload is neither a ZIP archive nor XML.
        with TestHTTPServer() as server:
            server.add_bytes("/files/bad.bin", b"not-zip-or-xml")
            with self.assertRaises(ZakupkiClientError):
                parse_procurements(
                    file_url=f"{server.base_url}/files/bad.bin",
                    law_type="44",
                    client_adapter=server.adapter,
                    client_host=_host_from_base_url(server.base_url),
                    client_scheme="http",
                )

    def test_parse_procurements_empty_result(self):
        # A well-formed export without any entries must succeed with 0 saved.
        xml_content = b"<?xml version='1.0' encoding='UTF-8'?><export></export>"
        archive = build_zip([("data.xml", xml_content)])

        with TestHTTPServer() as server:
            server.add_bytes(
                "/files/empty.zip", archive, content_type="application/zip"
            )
            result = parse_procurements(
                file_url=f"{server.base_url}/files/empty.zip",
                law_type="44",
                client_adapter=server.adapter,
                client_host=_host_from_base_url(server.base_url),
                client_scheme="http",
            )

        self.assertEqual(result["status"], "success")
        self.assertEqual(result["saved"], 0)

    def test_parse_procurements_with_file_url(self):
        xml_content, rows = build_zakupki_xml(count=1)
        archive = build_zip([("data.xml", xml_content)])

        with TestHTTPServer() as server:
            server.add_bytes("/files/data.zip", archive, content_type="application/zip")
            result = parse_procurements(
                file_url=f"{server.base_url}/files/data.zip",
                law_type="44",
                client_adapter=server.adapter,
                client_host=_host_from_base_url(server.base_url),
                client_scheme="http",
            )

        self.assertEqual(result["status"], "success")
        self.assertEqual(result["saved"], len(rows))

    def test_parse_procurements_default_host_with_proxies(self):
        # No file_url / client_host here: the task is given region, year and
        # month only, and the archive is registered under the matching
        # notifications path on the test server.
        xml_content, rows = build_zakupki_xml(count=1)
        archive = build_zip([("data.xml", xml_content)])
        region = f"{fake.random_int(min=1, max=99):02d}"
        year = fake.random_int(min=2020, max=2025)
        month = fake.random_int(min=1, max=12)

        with TestHTTPServer() as server:
            path = (
                f"/opendata/download/notifications/{region}/{year}/{month:02d}/fz44.zip"
            )
            server.add_bytes(path, archive, content_type="application/zip")
            result = parse_procurements(
                region_code=region,
                year=year,
                month=month,
                law_type="44",
                proxies=[],
                client_adapter=server.adapter,
            )

        self.assertEqual(result["status"], "success")
        self.assertEqual(result["saved"], len(rows))

    def test_parse_procurements_without_adapter(self):
        xml_content, rows = build_zakupki_xml(count=1)
        archive = build_zip([("data.xml", xml_content)])
        region = f"{fake.random_int(min=1, max=99):02d}"
        year = fake.random_int(min=2020, max=2025)
        month = fake.random_int(min=1, max=12)

        with TestHTTPServer() as server:
            path = (
                f"/opendata/download/notifications/{region}/{year}/{month:02d}/fz44.zip"
            )
            server.add_bytes(path, archive, content_type="application/zip")

            class _LocalZakupkiClient(parser_tasks.ZakupkiClient):
                # Routes requests to the test server when the task does not
                # pass an adapter itself.
                def __init__(self, *args, **kwargs):
                    kwargs.setdefault("http_adapter", server.adapter)
                    super().__init__(*args, **kwargs)

            # patch.object restores the original client on exit, including
            # when the task raises.
            with patch.object(parser_tasks, "ZakupkiClient", _LocalZakupkiClient):
                result = parse_procurements(
                    region_code=region,
                    year=year,
                    month=month,
                    law_type="44",
                    proxies=[],
                )

        self.assertEqual(result["status"], "success")
        self.assertEqual(result["saved"], len(rows))
|
||
|
||
|
||
@override_settings(
    CELERY_TASK_ALWAYS_EAGER=True,
    CELERY_TASK_EAGER_PROPAGATES=True,
)
class SyncProcurementsTaskTestCase(TestCase):
    """Tests for sync_procurements task."""

    def test_sync_continues_from_last_loaded(self):
        region = f"{fake.random_int(min=1, max=99):02d}"
        law_type = "44"

        # Prepare last loaded period as previous month
        current_year = fake.random_int(min=2024, max=2025)
        current_month = fake.random_int(min=2, max=12)
        last_year = current_year
        last_month = current_month - 1
        ProcurementRecordFactory(
            region_code=region,
            data_year=last_year,
            data_month=last_month,
            law_type=f"{law_type}-FZ",
        )

        xml_content, rows = build_zakupki_xml(count=1)
        archive = build_zip([("data.xml", xml_content)])
        with TestHTTPServer() as server:
            path = (
                f"/opendata/download/notifications/{region}/{current_year}/"
                f"{current_month:02d}/fz44.zip"
            )
            server.add_bytes(path, archive, content_type="application/zip")
            result = sync_procurements(
                region_code=region,
                law_type=law_type,
                client_host=_host_from_base_url(server.base_url),
                client_scheme="http",
                client_adapter=server.adapter,
                current_year=current_year,
                current_month=current_month,
            )

        self.assertEqual(result["status"], "success")
        self.assertGreaterEqual(result["total_saved"], len(rows))

    def test_sync_stops_after_empty_months(self):
        region = f"{fake.random_int(min=1, max=99):02d}"
        law_type = "44"
        current_year = 2025
        current_month = 3

        # No data available on server => empty month
        with TestHTTPServer() as server:
            result = sync_procurements(
                region_code=region,
                law_type=law_type,
                client_host=_host_from_base_url(server.base_url),
                client_scheme="http",
                current_year=current_year,
                current_month=current_month,
            )

        self.assertEqual(result["status"], "success")
        self.assertEqual(result["total_saved"], 0)

    def test_sync_with_proxies_and_default_host(self):
        region = f"{fake.random_int(min=1, max=99):02d}"
        law_type = "44"
        current_year = 2025
        current_month = 1
        xml_content, rows = build_zakupki_xml(count=1)
        archive = build_zip([("data.xml", xml_content)])

        with TestHTTPServer() as server:
            path = (
                f"/opendata/download/notifications/{region}/{current_year}/"
                f"{current_month:02d}/fz44.zip"
            )
            server.add_bytes(path, archive, content_type="application/zip")
            result = sync_procurements(
                region_code=region,
                law_type=law_type,
                proxies=[],
                client_adapter=server.adapter,
                current_year=current_year,
                current_month=current_month,
            )

        self.assertEqual(result["status"], "success")
        self.assertGreaterEqual(result["total_saved"], len(rows))

    def test_sync_handles_fetch_error(self):
        # An unparseable payload for the month must not fail the sync: the
        # task is expected to report success with nothing saved.
        region = f"{fake.random_int(min=1, max=99):02d}"
        law_type = "44"
        current_year = 2025
        current_month = 1

        with TestHTTPServer() as server:
            path = (
                f"/opendata/download/notifications/{region}/{current_year}/"
                f"{current_month:02d}/fz44.zip"
            )
            server.add_bytes(
                path, b"not-xml", content_type="application/octet-stream"
            )
            result = sync_procurements(
                region_code=region,
                law_type=law_type,
                proxies=[],
                client_adapter=server.adapter,
                current_year=current_year,
                current_month=current_month,
            )

        self.assertEqual(result["status"], "success")
        self.assertEqual(result["total_saved"], 0)

    def test_sync_procurements_fails_when_client_init_raises(self):
        region = f"{fake.random_int(min=1, max=99):02d}"

        class _FailClient:
            # Client replacement whose construction always fails.
            def __init__(self, *args, **kwargs):
                raise RuntimeError("init failed")

        # patch.object restores the real client on exit, even on failure.
        with patch.object(parser_tasks, "ZakupkiClient", _FailClient):
            with self.assertRaises(RuntimeError):
                sync_procurements(region_code=region, law_type="44", proxies=[])
|
||
|
||
|
||
@override_settings(
    CELERY_TASK_ALWAYS_EAGER=True,
    CELERY_TASK_EAGER_PROPAGATES=True,
)
class MinpromtorgTasksTestCase(TestCase):
    """Tests for Minpromtorg tasks."""

    _XLSX_CONTENT_TYPE = (
        "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
    )
    _PREVIEW_PATH = "/api/kss-document-preview"

    @staticmethod
    def _random_date_str() -> str:
        """Return a random date within the last 30 days as ``YYYYMMDD``."""
        return fake.date_between(start_date="-30d", end_date="today").strftime(
            "%Y%m%d"
        )

    @staticmethod
    def _local_client(base_cls, adapter):
        """Return a subclass of *base_cls* defaulting ``http_adapter`` to *adapter*."""

        class _LocalClient(base_cls):
            def __init__(self, *args, **kwargs):
                kwargs.setdefault("http_adapter", adapter)
                super().__init__(*args, **kwargs)

        return _LocalClient

    @staticmethod
    def _eager_delay(task):
        """Return a ``delay`` replacement that runs *task* via ``task.apply``."""

        def _delay(*args, **kwargs):
            return task.apply(args=args, kwargs=kwargs)

        return _delay

    def _patched_minpromtorg(self, adapter) -> ExitStack:
        """Patch the Minpromtorg clients and task ``delay`` methods in ``parser_tasks``.

        The three client classes are swapped for subclasses that default to
        *adapter*, and ``delay`` of the three parse tasks is swapped for an
        ``apply``-based version. Use the returned stack as a context manager;
        every patch is undone when the ``with`` block exits.
        """
        clients = (
            ("IndustrialProductionClient", IndustrialProductionClient),
            ("IndustrialProductsClient", IndustrialProductsClient),
            ("ManufacturesClient", ManufacturesClient),
        )
        tasks = (
            parser_tasks.parse_industrial_production,
            parser_tasks.parse_industrial_products,
            parser_tasks.parse_manufactures,
        )
        with ExitStack() as stack:
            for attr, base_cls in clients:
                stack.enter_context(
                    patch.object(
                        parser_tasks, attr, self._local_client(base_cls, adapter)
                    )
                )
            for task in tasks:
                stack.enter_context(
                    patch.object(task, "delay", self._eager_delay(task))
                )
            # Hand the active patches over to the caller; if anything above
            # raised, the stack would have undone the patches already applied.
            return stack.pop_all()

    def _add_single_file_preview(self, server, query, file_name):
        """Register a preview response listing one file for *query*."""
        server.add_json(
            self._PREVIEW_PATH,
            {
                "data": [
                    {
                        "name": query,
                        "files": [{"name": file_name, "url": f"/files/{file_name}"}],
                    }
                ]
            },
        )

    def _add_minpromtorg_routes(self, server: TestHTTPServer):
        """Register preview JSON and the three Excel files on *server*.

        Returns:
            ``(cert_rows, product_rows, manuf_rows)`` as produced by the
            fixture builders.
        """
        certificates_bytes, cert_rows = build_minpromtorg_certificates_excel(count=2)
        manufacturers_bytes, manuf_rows = build_minpromtorg_manufacturers_excel(count=2)
        products_bytes, product_rows = build_minpromtorg_products_excel(count=2)

        date_str = self._random_date_str()
        cert_file = f"data_resolutions_{date_str}.xlsx"
        manuf_file = f"data_orgs_{date_str}.xlsx"
        products_file = f"industrial_products_{date_str}.xlsx"

        server.add_json(
            self._PREVIEW_PATH,
            {
                "data": [
                    {
                        "name": IndustrialProductionClient().query,
                        "files": [{"name": cert_file, "url": f"/files/{cert_file}"}],
                    },
                    {
                        "name": ManufacturesClient().query,
                        "files": [{"name": manuf_file, "url": f"/files/{manuf_file}"}],
                    },
                    {
                        "name": IndustrialProductsClient().query,
                        "files": [
                            {
                                "name": products_file,
                                "url": f"/files/{products_file}",
                            }
                        ],
                    },
                ]
            },
        )
        for file_name, payload in (
            (cert_file, certificates_bytes),
            (manuf_file, manufacturers_bytes),
            (products_file, products_bytes),
        ):
            server.add_bytes(
                f"/files/{file_name}",
                payload,
                content_type=self._XLSX_CONTENT_TYPE,
            )
        return cert_rows, product_rows, manuf_rows

    def _assert_minpromtorg_saved(self, cert_rows, product_rows, manuf_rows):
        """Assert that the stored record counts match the fixture rows."""
        self.assertEqual(IndustrialCertificateRecord.objects.count(), len(cert_rows))
        self.assertEqual(IndustrialProductRecord.objects.count(), len(product_rows))
        self.assertEqual(ManufacturerRecord.objects.count(), len(manuf_rows))

    def test_parse_all_minpromtorg_success(self):
        with TestHTTPServer() as server:
            cert_rows, product_rows, manuf_rows = self._add_minpromtorg_routes(server)
            result = parse_all_minpromtorg(
                proxies=[],
                client_adapter=server.adapter,
            )

        self.assertIn("industrial", result)
        self.assertIn("industrial_products", result)
        self.assertIn("manufactures", result)
        self._assert_minpromtorg_saved(cert_rows, product_rows, manuf_rows)

    def test_parse_all_sources_success(self):
        with TestHTTPServer() as server:
            cert_rows, product_rows, manuf_rows = self._add_minpromtorg_routes(server)
            result = parse_all_sources(
                proxies=[],
                client_adapter=server.adapter,
                inspections_use_playwright=False,
            )

        self.assertIn("industrial", result)
        self.assertIn("industrial_products", result)
        self.assertIn("manufactures", result)
        self.assertIn("inspections", result)
        self._assert_minpromtorg_saved(cert_rows, product_rows, manuf_rows)
        self.assertEqual(InspectionRecord.objects.count(), 0)

    def test_parse_all_minpromtorg_without_adapter(self):
        # No client_adapter is passed to the task; the patched client classes
        # supply the test server adapter instead.
        with TestHTTPServer() as server:
            cert_rows, product_rows, manuf_rows = self._add_minpromtorg_routes(server)
            with self._patched_minpromtorg(server.adapter):
                result = parse_all_minpromtorg(proxies=[])

        self.assertIn("industrial", result)
        self.assertIn("industrial_products", result)
        self.assertIn("manufactures", result)
        self._assert_minpromtorg_saved(cert_rows, product_rows, manuf_rows)

    def test_parse_all_sources_without_adapter(self):
        def _inspections_stub_delay(*_args, **_kwargs):
            # Inspections are not executed here; only a task handle is needed.
            return SimpleNamespace(id="inspections-test-task")

        with TestHTTPServer() as server:
            cert_rows, product_rows, manuf_rows = self._add_minpromtorg_routes(server)
            with (
                self._patched_minpromtorg(server.adapter),
                patch.object(
                    parser_tasks.parse_inspections,
                    "delay",
                    _inspections_stub_delay,
                ),
            ):
                result = parse_all_sources(proxies=[], inspections_use_playwright=None)

        self.assertIn("industrial", result)
        self.assertIn("industrial_products", result)
        self.assertIn("manufactures", result)
        self.assertIn("inspections", result)
        self._assert_minpromtorg_saved(cert_rows, product_rows, manuf_rows)

    def test_parse_industrial_production_failure(self):
        cert_file = f"data_resolutions_{self._random_date_str()}.xlsx"

        with TestHTTPServer() as server:
            self._add_single_file_preview(
                server, IndustrialProductionClient().query, cert_file
            )
            # The listed file exists but is not a valid workbook.
            server.add_bytes("/files/" + cert_file, b"not-an-excel")
            with self.assertRaises(IndustrialProductionClientError):
                parse_industrial_production(
                    proxies=[],
                    client_adapter=server.adapter,
                )

    def test_parse_industrial_production_with_default_proxies(self):
        with TestHTTPServer() as server:
            cert_rows, _product_rows, _manuf_rows = self._add_minpromtorg_routes(server)
            result = parse_industrial_production(client_adapter=server.adapter)

        self.assertEqual(result["status"], "success")
        self.assertEqual(IndustrialCertificateRecord.objects.count(), len(cert_rows))

    def test_parse_industrial_products_failure(self):
        products_file = f"industrial_products_{self._random_date_str()}.xlsx"

        with TestHTTPServer() as server:
            self._add_single_file_preview(
                server, IndustrialProductsClient().query, products_file
            )
            server.add_bytes("/files/" + products_file, b"not-an-excel")
            with self.assertRaises(IndustrialProductsClientError):
                parse_industrial_products(client_adapter=server.adapter)

    def test_parse_industrial_products_with_default_proxies(self):
        with TestHTTPServer() as server:
            _cert_rows, product_rows, _manuf_rows = self._add_minpromtorg_routes(server)
            result = parse_industrial_products(client_adapter=server.adapter)

        self.assertEqual(result["status"], "success")
        self.assertEqual(IndustrialProductRecord.objects.count(), len(product_rows))

    def test_parse_manufactures_failure(self):
        manuf_file = f"data_orgs_{self._random_date_str()}.xlsx"

        with TestHTTPServer() as server:
            self._add_single_file_preview(
                server, ManufacturesClient().query, manuf_file
            )
            server.add_bytes("/files/" + manuf_file, b"not-an-excel")
            with self.assertRaises(ManufacturesClientError):
                parse_manufactures(client_adapter=server.adapter)
|
||
|
||
|
||
@override_settings(
|
||
CELERY_TASK_ALWAYS_EAGER=True,
|
||
CELERY_TASK_EAGER_PROPAGATES=True,
|
||
)
|
||
class ParseInspectionsTaskTestCase(TestCase):
|
||
"""Tests for inspections tasks."""
|
||
|
||
def test_parse_inspections_success(self):
    # Method of ParseInspectionsTaskTestCase: a ZIP with two generated
    # inspection rows is served and must be fully saved.
    xml_content, rows = build_proverki_xml(count=2)
    archive = build_zip([("inspections.xml", xml_content)])

    with TestHTTPServer() as server:
        server.add_bytes(
            "/opendata/inspections.zip",
            archive,
            content_type="application/zip",
        )
        result = parse_inspections(
            file_url="https://proverki.gov.ru/opendata/inspections.zip",
            proxies=[],
            client_adapter=server.adapter,
            use_playwright=False,
        )

    self.assertEqual(result["status"], "success")
    self.assertEqual(result["saved"], len(rows))
    self.assertEqual(InspectionRecord.objects.count(), len(rows))
|
||
|
||
def test_parse_inspections_with_default_proxies(self):
    # Method of ParseInspectionsTaskTestCase: same flow as the success case
    # but without passing ``proxies`` to the task.
    xml_content, rows = build_proverki_xml(count=1)
    archive = build_zip([("inspections.xml", xml_content)])

    with TestHTTPServer() as server:
        server.add_bytes(
            "/opendata/inspections.zip",
            archive,
            content_type="application/zip",
        )
        result = parse_inspections(
            file_url="https://proverki.gov.ru/opendata/inspections.zip",
            client_adapter=server.adapter,
            use_playwright=False,
        )

    self.assertEqual(result["status"], "success")
    self.assertEqual(result["saved"], len(rows))
|
||
|
||
def test_parse_inspections_without_adapter(self):
    # Method of ParseInspectionsTaskTestCase: the task receives no adapter;
    # a patched client class supplies the test server adapter instead.
    xml_content, rows = build_proverki_xml(count=1)
    archive = build_zip([("inspections.xml", xml_content)])

    with TestHTTPServer() as server:
        server.add_bytes(
            "/opendata/inspections.zip",
            archive,
            content_type="application/zip",
        )

        class _LocalProverkiClient(parser_tasks.ProverkiClient):
            def __init__(self, *args, **kwargs):
                kwargs.setdefault("http_adapter", server.adapter)
                kwargs.setdefault("use_playwright", False)
                super().__init__(*args, **kwargs)

        # patch.object restores the original client on exit, including when
        # the task raises.
        with patch.object(parser_tasks, "ProverkiClient", _LocalProverkiClient):
            result = parse_inspections(
                file_url="https://proverki.gov.ru/opendata/inspections.zip",
                proxies=[],
            )

    self.assertEqual(result["status"], "success")
    self.assertEqual(result["saved"], len(rows))
|
||
|
||
def test_parse_inspections_failure(self):
    # Method of ParseInspectionsTaskTestCase: a non-XML payload must surface
    # as ProverkiClientError.
    with TestHTTPServer() as server:
        server.add_bytes("/opendata/bad.xml", b"not-xml")
        with self.assertRaises(ProverkiClientError):
            parse_inspections(
                file_url="https://proverki.gov.ru/opendata/bad.xml",
                proxies=[],
                client_adapter=server.adapter,
                use_playwright=False,
            )
|
||
|
||
def test_sync_inspections_success(self):
    """Sync one month served by the local test server and expect saved rows."""
    feed_xml, expected_rows = build_proverki_xml(count=2)
    zipped_feed = build_zip([("inspections.xml", feed_xml)])
    year, month = 2025, 1

    with TestHTTPServer() as server:
        server.add_bytes(
            _portal_path(year, month),
            zipped_feed,
            content_type="application/zip",
        )
        summary = sync_inspections(
            proxies=[],
            client_adapter=server.adapter,
            use_playwright=False,
            current_year=year,
            current_month=month,
        )

    self.assertEqual(summary["status"], "success")
    self.assertGreaterEqual(summary["total_saved"], len(expected_rows))
|
||
|
||
def test_sync_inspections_stops_after_empty_months(self):
    """Months whose archives contain no inspections leave ``total_saved`` at 0."""
    no_records = b"<?xml version='1.0' encoding='utf-8'?><INSPECTIONS></INSPECTIONS>"
    empty_archive = build_zip([("inspections.xml", no_records)])

    with TestHTTPServer() as server:
        # The same empty archive is published for January and February 2025.
        for month in (1, 2):
            server.add_bytes(
                _portal_path(2025, month),
                empty_archive,
                content_type="application/zip",
            )
        summary = sync_inspections(
            proxies=[],
            client_adapter=server.adapter,
            use_playwright=False,
            current_year=2025,
            current_month=3,
        )

    self.assertEqual(summary["status"], "success")
    self.assertEqual(summary["total_saved"], 0)
|
||
|
||
def test_sync_inspections_honors_limited_params(self):
    """Limit the sync to one FZ-294 month and check FZ-248 stays empty."""
    feed_xml, expected_rows = build_proverki_xml(count=1)
    zipped_feed = build_zip([("inspections.xml", feed_xml)])
    first_year, first_month = 2026, 4

    with TestHTTPServer() as server:
        server.add_bytes(
            _portal_path(first_year, first_month),
            zipped_feed,
            content_type="application/zip",
        )
        summary = sync_inspections(
            proxies=[],
            client_adapter=server.adapter,
            use_playwright=False,
            max_months_per_law=1,
            start_year=first_year,
            start_month=first_month,
            include_fz294=True,
            include_fz248=False,
            current_year=2026,
            current_month=5,
        )

    per_law = summary["results"]
    self.assertEqual(summary["status"], "success")
    self.assertEqual(len(per_law["fz294"]), 1)
    self.assertEqual(per_law["fz248"], [])
    self.assertGreaterEqual(summary["total_saved"], len(expected_rows))
|
||
|
||
def test_sync_inspections_resumes_from_last_loaded(self):
    """Sync with existing December 2024 records for both laws already stored.

    The task is called with only ``current_year``/``current_month``; the
    client class is swapped for one bound to the local test server.
    """
    # One pre-existing record per law flag, both dated 2024-12.
    for under_fz248 in (False, True):
        InspectionRecordFactory(
            data_year=2024,
            data_month=12,
            is_federal_law_248=under_fz248,
        )
    feed_xml, expected_rows = build_proverki_xml(count=1)
    zipped_feed = build_zip([("inspections.xml", feed_xml)])

    with TestHTTPServer() as server:
        server.add_bytes(
            _portal_path(2025, 1),
            zipped_feed,
            content_type="application/zip",
        )

        class _LocalProverkiClient(parser_tasks.ProverkiClient):
            def __init__(self, *args, **kwargs):
                # Explicit keywords from the caller win; these only fill gaps.
                fallbacks = {
                    "http_adapter": server.adapter,
                    "use_playwright": False,
                }
                super().__init__(*args, **{**fallbacks, **kwargs})

        # patch.object restores the real client class even if the task raises.
        with patch.object(parser_tasks, "ProverkiClient", _LocalProverkiClient):
            summary = sync_inspections(current_year=2025, current_month=1)

    self.assertEqual(summary["status"], "success")
    self.assertGreaterEqual(summary["total_saved"], len(expected_rows))
|
||
|
||
def test_sync_inspections_handles_fetch_error(self):
    """A month whose payload is not a zip still yields ``success`` with 0 saved."""
    with TestHTTPServer() as server:
        server.add_bytes(
            _portal_path(2025, 1),
            b"not-zip",
            content_type="application/octet-stream",
        )

        class _LocalProverkiClient(parser_tasks.ProverkiClient):
            def __init__(self, *args, **kwargs):
                # Explicit keywords from the caller win; these only fill gaps.
                fallbacks = {
                    "http_adapter": server.adapter,
                    "use_playwright": False,
                }
                super().__init__(*args, **{**fallbacks, **kwargs})

        # patch.object restores the real client class even if the task raises.
        with patch.object(parser_tasks, "ProverkiClient", _LocalProverkiClient):
            summary = sync_inspections(
                proxies=[],
                current_year=2025,
                current_month=1,
            )

    self.assertEqual(summary["status"], "success")
    self.assertEqual(summary["total_saved"], 0)
|
||
|
||
def test_sync_inspections_fails_when_client_init_raises(self):
    """A client whose constructor raises lets the ``RuntimeError`` propagate."""

    class _FailClient:
        def __init__(self, *args, **kwargs):
            raise RuntimeError("init failed")

    # patch.object puts the real client class back once the block exits.
    with patch.object(parser_tasks, "ProverkiClient", _FailClient):
        with self.assertRaises(RuntimeError):
            sync_inspections(proxies=[])
|
||
|
||
|
||
@override_settings(
    CELERY_TASK_ALWAYS_EAGER=True,
    CELERY_TASK_EAGER_PROPAGATES=True,
)
class FNSFileTasksTestCase(TestCase):
    """Tests for the FNS file tasks: directory scan, batch and sync processing."""

    # ------------------------------------------------------------------
    # Helpers
    # ------------------------------------------------------------------

    def _dirs(self, base_dir: str) -> tuple[str, str, str]:
        """Return the ``watch``, ``processed`` and ``failed`` paths under *base_dir*."""
        watch, processed, failed = (
            os.path.join(base_dir, leaf) for leaf in ("watch", "processed", "failed")
        )
        return watch, processed, failed

    def _fns_settings(self, watch_dir, processed_dir, failed_dir, **extra):
        """Build an ``override_settings`` context for the three FNS directories.

        Any additional keyword arguments are passed through as further
        setting overrides.
        """
        return override_settings(
            FNS_WATCH_DIRECTORY=watch_dir,
            FNS_PROCESSED_DIRECTORY=processed_dir,
            FNS_FAILED_DIRECTORY=failed_dir,
            **extra,
        )

    def _create_report(self, *, external_id, ogrn, file_name, file_hash):
        """Store a successful file-watch ``FinancialReport`` with a random batch."""
        return FinancialReport.objects.create(
            external_id=external_id,
            ogrn=ogrn,
            file_name=file_name,
            file_hash=file_hash,
            load_batch=fake.random_int(min=1, max=9999),
            status=FinancialReport.Status.SUCCESS,
            source=FinancialReport.SourceType.FILE_WATCH,
        )

    def _write_fns_file(self, watch_dir: str) -> str:
        """Write a generated workbook as ``fin_<5 digits>_<13 digits>.xlsx``.

        Returns the full path of the file created inside *watch_dir*.
        """
        workbook = _build_fns_excel_bytes()
        target = os.path.join(watch_dir, f"fin_{_digits(5)}_{_digits(13)}.xlsx")
        Path(target).write_bytes(workbook)
        return target

    def _scan_with_eager_process(self) -> dict:
        """Run ``scan_fns_directory`` with ``process_fns_file.delay`` made synchronous.

        While the scan runs, ``delay`` calls ``_process_fns_file_sync`` directly
        with ``raise_on_error=True``; the original ``delay`` is put back after.
        """

        def _run_inline(file_path: str):
            return _process_fns_file_sync(
                file_path,
                task_id=str(fake.uuid4()),
                raise_on_error=True,
            )

        task = parser_tasks.process_fns_file
        saved_delay = task.delay
        task.delay = _run_inline
        try:
            return scan_fns_directory()
        finally:
            task.delay = saved_delay

    # ------------------------------------------------------------------
    # scan_fns_directory
    # ------------------------------------------------------------------

    def test_scan_fns_directory_processes_file(self):
        """A valid file is queued, stored as a report and moved to ``processed``."""
        with tempfile.TemporaryDirectory() as tmpdir:
            watch_dir, processed_dir, failed_dir = self._dirs(tmpdir)
            os.makedirs(watch_dir, exist_ok=True)
            file_path = self._write_fns_file(watch_dir)

            with self._fns_settings(watch_dir, processed_dir, failed_dir):
                summary = self._scan_with_eager_process()

            moved_to = os.path.join(processed_dir, os.path.basename(file_path))
            self.assertEqual(summary["queued"], 1)
            self.assertEqual(summary["skipped"], 0)
            self.assertEqual(FinancialReport.objects.count(), 1)
            self.assertFalse(os.path.exists(file_path))
            self.assertTrue(os.path.exists(moved_to))

    def test_scan_fns_directory_creates_missing_watch_dir(self):
        """Scanning a non-existent watch directory creates it and reports zeros."""
        with tempfile.TemporaryDirectory() as tmpdir:
            watch_dir, processed_dir, failed_dir = self._dirs(tmpdir)

            with self._fns_settings(watch_dir, processed_dir, failed_dir):
                summary = scan_fns_directory()

            self.assertEqual(summary, {"scanned": 0, "queued": 0, "skipped": 0})
            self.assertTrue(os.path.exists(watch_dir))

    def test_scan_fns_directory_skips_locked_file(self):
        """A file with a fresh ``.lock`` next to it is skipped and left in place."""
        with tempfile.TemporaryDirectory() as tmpdir:
            watch_dir, processed_dir, failed_dir = self._dirs(tmpdir)
            os.makedirs(watch_dir, exist_ok=True)
            file_path = self._write_fns_file(watch_dir)
            Path(f"{file_path}.lock").write_text("lock")

            with self._fns_settings(
                watch_dir,
                processed_dir,
                failed_dir,
                FNS_LOCK_TTL_SECONDS=3600,
            ):
                summary = scan_fns_directory()

            self.assertEqual(summary["queued"], 0)
            self.assertEqual(summary["skipped"], 1)
            self.assertTrue(os.path.exists(file_path))

    def test_scan_fns_directory_handles_stale_lock(self):
        """A lock older than ``FNS_LOCK_TTL_SECONDS`` does not block queuing."""
        with tempfile.TemporaryDirectory() as tmpdir:
            watch_dir, processed_dir, failed_dir = self._dirs(tmpdir)
            os.makedirs(watch_dir, exist_ok=True)
            file_path = self._write_fns_file(watch_dir)
            lock_path = f"{file_path}.lock"
            Path(lock_path).write_text("lock")
            # Back-date the lock by two to three days, far beyond the 1 s TTL.
            stale_at = fake.date_time_between(
                start_date="-3d", end_date="-2d"
            ).timestamp()
            os.utime(lock_path, (stale_at, stale_at))

            with self._fns_settings(
                watch_dir,
                processed_dir,
                failed_dir,
                FNS_LOCK_TTL_SECONDS=1,
            ):
                summary = self._scan_with_eager_process()

            self.assertEqual(summary["queued"], 0 + 1)
            self.assertEqual(summary["skipped"], 0)

    def test_scan_fns_directory_handles_unreadable_file(self):
        """A directory named like an FNS workbook is counted as skipped."""
        with tempfile.TemporaryDirectory() as tmpdir:
            watch_dir, processed_dir, failed_dir = self._dirs(tmpdir)
            os.makedirs(watch_dir, exist_ok=True)
            # A directory cannot be read as a file, which is the point here.
            impostor = os.path.join(
                watch_dir, f"fin_{fake.pystr(min_chars=4, max_chars=6)}.xlsx"
            )
            os.makedirs(impostor, exist_ok=True)

            with self._fns_settings(watch_dir, processed_dir, failed_dir):
                summary = scan_fns_directory()

            self.assertEqual(summary["queued"], 0)
            self.assertEqual(summary["skipped"], 1)

    def test_scan_fns_directory_skips_duplicate_hash(self):
        """A file whose hash matches a stored report is skipped and moved to ``processed``."""
        with tempfile.TemporaryDirectory() as tmpdir:
            watch_dir, processed_dir, failed_dir = self._dirs(tmpdir)
            os.makedirs(watch_dir, exist_ok=True)
            file_path = self._write_fns_file(watch_dir)
            base_name = os.path.basename(file_path)
            digest = hashlib.sha256(Path(file_path).read_bytes()).hexdigest()

            self._create_report(
                external_id=fake.pystr(min_chars=6, max_chars=10),
                ogrn=_digits(13),
                file_name=base_name,
                file_hash=digest,
            )

            with self._fns_settings(watch_dir, processed_dir, failed_dir):
                summary = scan_fns_directory()

            self.assertEqual(summary["queued"], 0)
            self.assertEqual(summary["skipped"], 1)
            self.assertTrue(os.path.exists(os.path.join(processed_dir, base_name)))

    def test_scan_fns_directory_handles_enqueue_error(self):
        """If ``delay`` raises while enqueueing, the file is counted as skipped."""
        with tempfile.TemporaryDirectory() as tmpdir:
            watch_dir, processed_dir, failed_dir = self._dirs(tmpdir)
            os.makedirs(watch_dir, exist_ok=True)
            self._write_fns_file(watch_dir)

            def _raise_delay(*_args, **_kwargs):
                raise RuntimeError("enqueue failed")

            task = parser_tasks.process_fns_file
            saved_delay = task.delay
            task.delay = _raise_delay
            try:
                with self._fns_settings(watch_dir, processed_dir, failed_dir):
                    summary = scan_fns_directory()
            finally:
                task.delay = saved_delay

            self.assertEqual(summary["queued"], 0)
            self.assertEqual(summary["skipped"], 1)

    # ------------------------------------------------------------------
    # process_fns_files_batch / _process_fns_file_sync
    # ------------------------------------------------------------------

    def test_process_fns_files_batch(self):
        """A batch of one real and one missing file reports one success, one failure."""
        with tempfile.TemporaryDirectory() as tmpdir:
            watch_dir, processed_dir, failed_dir = self._dirs(tmpdir)
            os.makedirs(watch_dir, exist_ok=True)
            valid_path = self._write_fns_file(watch_dir)
            # This path is never created on disk.
            missing_path = os.path.join(watch_dir, f"{fake.word()}.xlsx")

            with self._fns_settings(watch_dir, processed_dir, failed_dir):
                async_result = process_fns_files_batch.apply(
                    args=[[valid_path, missing_path]]
                )
                summary = async_result.get()

            self.assertEqual(summary["total"], 2)
            self.assertEqual(summary["success"], 1)
            self.assertEqual(summary["failed"], 1)

    def test_process_fns_file_sync_duplicate(self):
        """Processing a file with an already-stored hash returns ``skipped``."""
        with tempfile.TemporaryDirectory() as tmpdir:
            watch_dir, processed_dir, failed_dir = self._dirs(tmpdir)
            os.makedirs(watch_dir, exist_ok=True)
            file_path = self._write_fns_file(watch_dir)
            base_name = os.path.basename(file_path)
            digest = hashlib.sha256(Path(file_path).read_bytes()).hexdigest()

            self._create_report(
                external_id=fake.pystr(min_chars=6, max_chars=10),
                ogrn=_digits(13),
                file_name=base_name,
                file_hash=digest,
            )

            with self._fns_settings(watch_dir, processed_dir, failed_dir):
                outcome = _process_fns_file_sync(
                    file_path,
                    task_id=str(fake.uuid4()),
                )

            self.assertEqual(outcome["status"], "skipped")
            self.assertTrue(os.path.exists(os.path.join(processed_dir, base_name)))

    def test_process_fns_file_sync_handles_integrity_error(self):
        """A stored report with the same ids but another hash yields ``failed``.

        The file is then expected in the ``failed`` directory.
        """
        with tempfile.TemporaryDirectory() as tmpdir:
            watch_dir, processed_dir, failed_dir = self._dirs(tmpdir)
            os.makedirs(watch_dir, exist_ok=True)
            file_path = self._write_fns_file(watch_dir)
            base_name = os.path.basename(file_path)
            # File names look like ``fin_<external_id>_<ogrn>.xlsx``.
            external_id, ogrn = (
                base_name.replace("fin_", "").replace(".xlsx", "").split("_", 1)
            )
            unrelated_digest = hashlib.sha256(
                fake.pystr(min_chars=8, max_chars=12).encode("utf-8")
            ).hexdigest()

            self._create_report(
                external_id=external_id,
                ogrn=ogrn,
                file_name=base_name,
                file_hash=unrelated_digest,
            )

            with self._fns_settings(watch_dir, processed_dir, failed_dir):
                outcome = _process_fns_file_sync(
                    file_path,
                    task_id=str(fake.uuid4()),
                )

            self.assertEqual(outcome["status"], "failed")
            self.assertTrue(os.path.exists(os.path.join(failed_dir, base_name)))
|
||
|
||
|
||
class TaskHelpersTestCase(TestCase):
    """Tests for the private file and date helpers in ``apps.parsers.tasks``."""

    def test_move_to_dir_handles_duplicate_target(self):
        """Moving onto an existing name yields a path containing ``__dup__``."""
        with tempfile.TemporaryDirectory() as tmpdir:
            root = Path(tmpdir)
            source_dir = root / "source"
            target_dir = root / "target"
            source_dir.mkdir(exist_ok=True)
            target_dir.mkdir(exist_ok=True)

            filename = f"{fake.pystr(min_chars=5, max_chars=8)}.txt"
            source_path = source_dir / filename
            source_path.write_text(fake.sentence())
            # Same file name already sits in the target directory.
            (target_dir / filename).write_text(fake.sentence())

            moved = _move_to_dir(source_path, target_dir, suffix="dup")

            self.assertIsNotNone(moved)
            moved_path = Path(moved)
            self.assertTrue(moved_path.exists())
            self.assertIn("__dup__", moved_path.name)

    def test_get_next_month_regular(self):
        """For months 1-11 the year is unchanged and the month advances by one."""
        from apps.parsers.tasks import _get_next_month

        year = fake.random_int(min=2024, max=2026)
        month = fake.random_int(min=1, max=11)

        following_year, following_month = _get_next_month(year, month)

        self.assertEqual(following_year, year)
        self.assertEqual(following_month, month + 1)

    def test_remove_lock_handles_missing_file(self):
        """``_remove_lock`` does not raise for a file that was never created."""
        with tempfile.TemporaryDirectory() as tmpdir:
            never_created = Path(tmpdir) / f"{fake.pystr(min_chars=4, max_chars=8)}.txt"
            _remove_lock(never_created)

    def test_try_create_lock_race(self):
        """Of eight threads racing for the same lock, exactly one gets ``True``."""
        with tempfile.TemporaryDirectory() as tmpdir:
            file_path = Path(tmpdir) / f"{fake.pystr(min_chars=4, max_chars=8)}.txt"
            file_path.write_text(fake.sentence())
            outcomes: list[bool] = []

            def contend():
                outcomes.append(_try_create_lock(file_path))

            contenders = [threading.Thread(target=contend) for _ in range(8)]
            for contender in contenders:
                contender.start()
            for contender in contenders:
                contender.join()

            self.assertEqual(outcomes.count(True), 1)
            _remove_lock(file_path)
|
||
|
||
|
||
class ParserLoadLogServiceTestCase(TestCase):
|
||
"""Tests for ParserLoadLogService methods used in tasks."""
|
||
|
||
def test_get_next_batch_id_new_source(self):
    """With no logs stored for a source, the next batch id is 1."""
    from apps.parsers.services import ParserLoadLogService

    source = ParserLoadLog.Source.PROCUREMENTS
    existing_logs = ParserLoadLog.objects.filter(source=source)
    self.assertEqual(existing_logs.count(), 0)

    next_id = ParserLoadLogService.get_next_batch_id(source)
    self.assertEqual(next_id, 1)
|
||
|
||
def test_get_next_batch_id_increments(self):
    """With a stored log at batch 5, the next batch id is 6."""
    from apps.parsers.services import ParserLoadLogService

    source = ParserLoadLog.Source.PROCUREMENTS
    ParserLoadLogFactory(source=source, batch_id=5)

    next_id = ParserLoadLogService.get_next_batch_id(source)
    self.assertEqual(next_id, 6)
|
||
|
||
def test_create_load_log(self):
    """A log made by ``create_load_log`` reports status ``"success"``."""
    from apps.parsers.services import ParserLoadLogService

    created = ParserLoadLogService.create_load_log(
        source=ParserLoadLog.Source.PROCUREMENTS,
        batch_id=1,
    )

    self.assertEqual(created.status, "success")
|
||
|
||
def test_update_load_log(self):
    """``update_records_count`` persists the new count on the log row."""
    from apps.parsers.services import ParserLoadLogService

    stored_log = ParserLoadLogFactory()

    ParserLoadLogService.update_records_count(stored_log, 10)

    # Re-read from the database so the assertion checks the persisted value.
    stored_log.refresh_from_db()
    self.assertEqual(stored_log.records_count, 10)
|
||
|
||
def test_mark_failed(self):
|
||
log = ParserLoadLogFactory()
|
||
|
||
from apps.parsers.services import ParserLoadLogService
|
||
|
||
ParserLoadLogService.mark_failed(log, "fail")
|
||
log.refresh_from_db()
|
||
self.assertEqual(log.status, "failed")
|