Files
mostovik-backend/tests/apps/parsers/test_tasks.py

204 lines
7.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Tests for parser Celery tasks."""
from contextlib import ExitStack
from decimal import Decimal
from unittest.mock import patch
from apps.core.models import BackgroundJob, JobStatus
from apps.parsers.clients.common.schemas import GenericParserItem
from apps.parsers.models import GenericParserRecord, ParserLoadLog
from apps.parsers.tasks import (
import_parser_upload,
parse_all_sources,
parse_fns_financial_indicators,
parse_industrial_production,
parse_trudvsem_vacancies,
sync_inspections,
)
from django.core.files.base import ContentFile
from django.core.files.storage import default_storage
from django.test import TestCase
class GenericParserTasksTest(TestCase):
"""Tests for new generic parser tasks."""
@patch("apps.parsers.tasks.IndustrialProductionClient.fetch_certificates")
def test_parse_industrial_production_sets_user_id(self, mock_fetch_certificates):
"""Test existing parser task keeps BackgroundJob ownership."""
mock_fetch_certificates.return_value = []
result = parse_industrial_production.delay(user_id=99).get()
self.assertEqual(result["status"], "success")
self.assertEqual(result["saved"], 0)
self.assertEqual(BackgroundJob.objects.get().user_id, 99)
@patch("apps.parsers.tasks.StructuredDataClient.fetch_records")
def test_parse_fns_financial_indicators_saves_records(self, mock_fetch_records):
"""Test FNS financial task stores generic records."""
mock_fetch_records.return_value = [
GenericParserItem(
source=ParserLoadLog.Source.FNS_FINANCIAL,
external_id="FIN-1",
inn="1234567890",
organisation_name="ООО Тест",
title="Выручка",
amount=Decimal("1000.00"),
payload={"id": "FIN-1"},
)
]
result = parse_fns_financial_indicators.delay(
file_url="https://example.com/fns.json",
user_id=42,
).get()
self.assertEqual(result["status"], "success")
self.assertEqual(result["saved"], 1)
self.assertEqual(GenericParserRecord.objects.count(), 1)
self.assertEqual(ParserLoadLog.objects.count(), 1)
job = BackgroundJob.objects.get()
self.assertEqual(job.status, JobStatus.SUCCESS)
self.assertEqual(job.user_id, 42)
@patch("apps.parsers.tasks.StructuredDataClient.fetch_records")
def test_generic_parser_uses_registry_upstream_url(self, mock_fetch_records):
"""Test generic parser resolves official source URL without manual file."""
mock_fetch_records.return_value = []
result = parse_fns_financial_indicators.delay(user_id=42).get()
self.assertEqual(result["status"], "success")
self.assertEqual(
mock_fetch_records.call_args.kwargs["file_url"],
"https://bo.nalog.gov.ru/advanced-search/organizations/search",
)
def test_import_parser_upload_saves_records_and_removes_file(self):
"""Test uploaded registry files are imported through Celery."""
storage_path = default_storage.save(
"parser_uploads/test-fns.csv",
ContentFile(b"inn,ogrn,name\n1234567890,1234567890123,Test Org\n"),
)
result = import_parser_upload.delay(
source_key="fns_financial",
storage_path=storage_path,
file_name="test-fns.csv",
user_id=77,
).get()
self.assertEqual(result["status"], "success")
self.assertEqual(result["saved"], 1)
self.assertFalse(default_storage.exists(storage_path))
record = GenericParserRecord.objects.get()
self.assertEqual(record.source, ParserLoadLog.Source.FNS_FINANCIAL)
self.assertEqual(record.inn, "1234567890")
self.assertEqual(BackgroundJob.objects.get().user_id, 77)
@patch("apps.parsers.tasks.TrudvsemClient.fetch_vacancies")
def test_parse_trudvsem_vacancies_saves_records(self, mock_fetch_vacancies):
"""Test Trudvsem task stores vacancy records."""
mock_fetch_vacancies.return_value = [
GenericParserItem(
source=ParserLoadLog.Source.TRUDVSEM,
external_id="VAC-1",
inn="1234567890",
organisation_name="ООО Тест",
title="Инженер",
payload={"id": "VAC-1"},
)
]
result = parse_trudvsem_vacancies.delay(limit=1, user_id=7).get()
self.assertEqual(result["status"], "success")
self.assertEqual(result["saved"], 1)
self.assertTrue(
GenericParserRecord.objects.filter(
source=ParserLoadLog.Source.TRUDVSEM,
external_id="VAC-1",
).exists()
)
self.assertEqual(BackgroundJob.objects.get().user_id, 7)
def test_parse_all_sources_reports_all_sources(self):
"""Test parse_all_sources covers native and upstream parser entries."""
task_names = [
"parse_industrial_production",
"parse_manufactures",
"parse_inspections",
"parse_trudvsem_vacancies",
"parse_mpt_products",
"parse_procurements_44fz",
"parse_procurements_223fz",
"parse_contracts",
"parse_unfair_suppliers",
"parse_fas_goz_evasion",
"parse_fns_financial_indicators",
"parse_arbitration_cases",
"parse_fedresurs_bankruptcy",
"parse_fstec_registers",
]
with ExitStack() as stack:
mocks = {
task_name: stack.enter_context(
patch(f"apps.parsers.tasks.{task_name}.delay")
)
for task_name in task_names
}
for task_name, mock_delay in mocks.items():
mock_delay.return_value.id = f"{task_name}-task"
result = parse_all_sources.run(user_id=5, file_urls={})
self.assertEqual(result["industrial"], "parse_industrial_production-task")
self.assertEqual(result["trudvsem"], "parse_trudvsem_vacancies-task")
self.assertEqual(
result["fns_financial"],
"parse_fns_financial_indicators-task",
)
self.assertEqual(
result["fedresurs_bankruptcy"],
"parse_fedresurs_bankruptcy-task",
)
@patch("apps.parsers.tasks.ProverkiClient.fetch_inspections")
def test_sync_inspections_marks_period_errors_as_failed(self, mock_fetch):
"""Test inspection sync does not persist upstream errors as success."""
mock_fetch.side_effect = RuntimeError("upstream unavailable")
result = sync_inspections.delay(user_id=11).get()
self.assertEqual(result["status"], "failed")
self.assertGreaterEqual(len(result["errors"]), 1)
load_log = ParserLoadLog.objects.get()
self.assertEqual(load_log.status, "failed")
job = BackgroundJob.objects.get()
self.assertEqual(job.status, JobStatus.FAILURE)
self.assertEqual(job.user_id, 11)
@patch("apps.parsers.tasks.ProverkiClient.fetch_inspections")
def test_sync_inspections_can_limit_real_sync_slice(self, mock_fetch):
"""Test inspection sync can be bounded for manual/API runs."""
mock_fetch.return_value = []
result = sync_inspections.delay(
user_id=11,
max_months_per_law=1,
start_year=2026,
start_month=1,
include_fz248=False,
).get()
self.assertEqual(result["status"], "success")
self.assertEqual(mock_fetch.call_count, 1)
self.assertEqual(mock_fetch.call_args.kwargs["year"], 2026)
self.assertEqual(mock_fetch.call_args.kwargs["month"], 1)
self.assertFalse(mock_fetch.call_args.kwargs["is_federal_law_248"])