diff --git a/src/apps/parsers/tasks.py b/src/apps/parsers/tasks.py index 4d134e0..9a30570 100644 --- a/src/apps/parsers/tasks.py +++ b/src/apps/parsers/tasks.py @@ -6,7 +6,11 @@ Celery задачи для приложения парсеров. """ import logging +import shutil +import time +import uuid from datetime import datetime +from pathlib import Path from apps.core.services import BackgroundJobService from apps.parsers.clients.minpromtorg import ( @@ -26,6 +30,7 @@ from apps.parsers.services import ( ProxyService, ) from celery import shared_task +from requests.adapters import BaseAdapter logger = logging.getLogger(__name__) @@ -34,21 +39,221 @@ DEFAULT_START_YEAR = 2025 DEFAULT_START_MONTH = 1 +def _lock_path_for(file_path: Path) -> Path: + return Path(f"{file_path}.lock") + + +def _try_create_lock(file_path: Path) -> bool: + lock_path = _lock_path_for(file_path) + if lock_path.exists(): + try: + from django.conf import settings + + age_seconds = time.time() - lock_path.stat().st_mtime + ttl_seconds = getattr(settings, "FNS_LOCK_TTL_SECONDS", 3600) + if age_seconds > ttl_seconds: + lock_path.unlink() + else: + return False + except FileNotFoundError: # pragma: no cover + pass + try: + lock_path.touch(exist_ok=False) + except FileExistsError: # pragma: no cover + return False + return True + + +def _remove_lock(file_path: Path) -> None: + lock_path = _lock_path_for(file_path) + try: + lock_path.unlink() + except FileNotFoundError: + return + + +def _move_to_dir( + file_path: Path, + target_dir: Path, + *, + suffix: str | None = None, +) -> Path | None: + if not file_path.exists(): + return None + target_dir.mkdir(parents=True, exist_ok=True) + target = target_dir / file_path.name + if target.exists(): + tag = suffix or "dup" + unique = uuid.uuid4().hex[:8] + target = target_dir / f"{file_path.stem}__{tag}__{unique}{file_path.suffix}" + shutil.move(str(file_path), str(target)) + return target + + +def _process_fns_file_sync(file_path: str | Path, *, task_id: str) -> dict: + import hashlib + from dataclasses import asdict + + from apps.core.services import BackgroundJobService + from apps.parsers.clients.fns.parser import FNSExcelParser, FNSParserError + from apps.parsers.models import FinancialReport + from django.conf import settings + + source = ParserLoadLog.Source.FNS_REPORTS + load_log, batch_id = ParserLoadLogService.create_load_log_with_next_batch_id( + source=source, + status="in_progress", + ) + file_path = Path(file_path) + lock_path = _lock_path_for(file_path) + + logger.info( + "Processing FNS file (task_id=%s, batch_id=%d, file=%s)", + task_id, + batch_id, + file_path.name, + ) + + # Создаём BackgroundJob + job = BackgroundJobService.create_job( + task_id=task_id, + task_name="apps.parsers.tasks.process_fns_file", + meta={"source": source, "batch_id": batch_id, "file": file_path.name}, + ) + job.mark_started() + job.update_progress(0, f"Обработка файла {file_path.name}...") + + # load_log уже создан + + try: + # Проверяем существование файла + if not file_path.exists(): + raise FNSParserError(f"Файл не найден: {file_path}") + + # Вычисляем хеш + file_hash = hashlib.sha256(file_path.read_bytes()).hexdigest() + + # Проверяем дубликат + if FNSReportService.exists_by_hash(file_hash): + logger.info( + "File already processed (hash=%s): %s", + file_hash, + file_path.name, + ) + _move_to_dir( + file_path, + Path(settings.FNS_PROCESSED_DIRECTORY), + suffix="dup", + ) + job.complete(result={"status": "skipped", "reason": "duplicate"}) + ParserLoadLogService.update(load_log, status="skipped") + return {"status": "skipped", "reason": "duplicate"} + + # Парсим файл + job.update_progress(20, "Парсинг Excel файла...") + parsed = FNSExcelParser.parse_file(file_path) + + # Сохраняем в БД + job.update_progress(60, f"Сохранение {len(parsed.lines)} строк...") + lines_data = [asdict(line) for line in parsed.lines] + + report = FNSReportService.save_report( + external_id=parsed.external_id, + ogrn=parsed.ogrn, + file_name=file_path.name, + file_hash=file_hash, + source=FinancialReport.SourceType.FILE_WATCH, + batch_id=batch_id, + lines_data=lines_data, + ) + + # Перемещаем файл в processed + job.update_progress(90, "Перемещение файла...") + _move_to_dir(file_path, Path(settings.FNS_PROCESSED_DIRECTORY)) + + # Обновляем лог + ParserLoadLogService.update( + load_log, + status="success", + records_count=len(parsed.lines), + ) + + # Завершаем + job.complete( + result={ + "report_id": report.id, + "external_id": parsed.external_id, + "ogrn": parsed.ogrn, + "lines_count": len(parsed.lines), + } + ) + + logger.info( + "FNS file processed: %s (report_id=%d, lines=%d)", + file_path.name, + report.id, + len(parsed.lines), + ) + + return { + "status": "success", + "report_id": report.id, + "external_id": parsed.external_id, + "ogrn": parsed.ogrn, + "lines_count": len(parsed.lines), + } + + except FNSParserError as e: + logger.error("FNS file parsing failed: %s - %s", file_path.name, e) + + # Перемещаем в failed + _move_to_dir(file_path, Path(settings.FNS_FAILED_DIRECTORY), suffix="failed") + + ParserLoadLogService.mark_failed(load_log, str(e)) + job.fail(error=str(e)) + + return {"status": "failed", "error": str(e)} + + except Exception as e: + logger.error( + "FNS file processing error: %s - %s", + file_path.name, + e, + exc_info=True, + ) + _move_to_dir(file_path, Path(settings.FNS_FAILED_DIRECTORY), suffix="failed") + ParserLoadLogService.mark_failed(load_log, str(e)) + job.fail(error=str(e)) + + return {"status": "failed", "error": str(e)} + finally: + if lock_path.exists(): + _remove_lock(file_path) + + @shared_task(bind=True) -def parse_industrial_production(self, proxies: list[str] | None = None) -> dict: +def parse_industrial_production( + self, + proxies: list[str] | None = None, + client_adapter: BaseAdapter | None = None, +) -> dict: """ Задача парсинга сертификатов промышленного производства. Args: proxies: Список прокси-серверов (опционально). Если не передан, берётся из БД. + client_adapter: HTTP-адаптер (опционально). Returns: Результат: batch_id, saved, status """ source = ParserLoadLog.Source.INDUSTRIAL - batch_id = ParserLoadLogService.get_next_batch_id(source) - task_id = self.request.id + load_log, batch_id = ParserLoadLogService.create_load_log_with_next_batch_id( + source=source, + status="in_progress", + ) + task_id = self.request.id or str(uuid.uuid4()) # Если прокси не переданы, берём из БД if proxies is None: @@ -70,17 +275,15 @@ def parse_industrial_production(self, proxies: list[str] | None = None) -> dict: job.mark_started() job.update_progress(0, "Инициализация парсера...") - # Создаём запись лога - load_log = ParserLoadLogService.create_load_log( - source=source, - batch_id=batch_id, - status="in_progress", - ) + # load_log уже создан try: # Парсинг данных job.update_progress(10, "Загрузка данных с API Минпромторга...") - with IndustrialProductionClient(proxies=proxies) as client: + client_kwargs = {"proxies": proxies} + if client_adapter: + client_kwargs["http_adapter"] = client_adapter + with IndustrialProductionClient(**client_kwargs) as client: certificates = client.fetch_certificates() # Сохранение в БД @@ -126,20 +329,28 @@ def parse_industrial_production(self, proxies: list[str] | None = None) -> dict: @shared_task(bind=True) -def parse_manufactures(self, proxies: list[str] | None = None) -> dict: +def parse_manufactures( + self, + proxies: list[str] | None = None, + client_adapter: BaseAdapter | None = None, +) -> dict: """ Задача парсинга реестра производителей. Args: proxies: Список прокси-серверов (опционально). Если не передан, берётся из БД. + client_adapter: HTTP-адаптер (опционально). Returns: Результат: batch_id, saved, status """ source = ParserLoadLog.Source.MANUFACTURES - batch_id = ParserLoadLogService.get_next_batch_id(source) - task_id = self.request.id + load_log, batch_id = ParserLoadLogService.create_load_log_with_next_batch_id( + source=source, + status="in_progress", + ) + task_id = self.request.id or str(uuid.uuid4()) # Если прокси не переданы, берём из БД if proxies is None: @@ -161,17 +372,15 @@ def parse_manufactures(self, proxies: list[str] | None = None) -> dict: job.mark_started() job.update_progress(0, "Инициализация парсера...") - # Создаём запись лога - load_log = ParserLoadLogService.create_load_log( - source=source, - batch_id=batch_id, - status="in_progress", - ) + # load_log уже создан try: # Парсинг данных job.update_progress(10, "Загрузка данных с API Минпромторга...") - with ManufacturesClient(proxies=proxies) as client: + client_kwargs = {"proxies": proxies} + if client_adapter: + client_kwargs["http_adapter"] = client_adapter + with ManufacturesClient(**client_kwargs) as client: manufacturers = client.fetch_manufacturers() # Сохранение в БД @@ -217,22 +426,43 @@ def parse_manufactures(self, proxies: list[str] | None = None) -> dict: @shared_task -def parse_all_minpromtorg(proxies: list[str] | None = None) -> dict: +def parse_all_minpromtorg( + proxies: list[str] | None = None, + client_adapter: BaseAdapter | None = None, +) -> dict: """ Запустить все парсеры Минпромторга. Args: proxies: Список прокси-серверов (опционально). Если не передан, каждая задача возьмёт прокси из БД. + client_adapter: HTTP-адаптер (опционально). Returns: Результаты всех парсеров """ logger.info("Starting all Minpromtorg parsers") + if client_adapter is not None: + industrial_result = parse_industrial_production.apply( + kwargs={"proxies": proxies, "client_adapter": client_adapter} + ) + manufactures_result = parse_manufactures.apply( + kwargs={"proxies": proxies, "client_adapter": client_adapter} + ) + else: + industrial_result = parse_industrial_production.delay( + proxies=proxies, + client_adapter=client_adapter, + ) + manufactures_result = parse_manufactures.delay( + proxies=proxies, + client_adapter=client_adapter, + ) + results = { - "industrial": parse_industrial_production.delay(proxies=proxies).id, - "manufactures": parse_manufactures.delay(proxies=proxies).id, + "industrial": industrial_result.id, + "manufactures": manufactures_result.id, } return results @@ -246,6 +476,8 @@ def parse_inspections( month: int | None = None, file_url: str | None = None, proxies: list[str] | None = None, + client_adapter: BaseAdapter | None = None, + use_playwright: bool | None = None, ) -> dict: """ Задача парсинга данных о проверках с proverki.gov.ru. @@ -256,13 +488,18 @@ def parse_inspections( file_url: Прямая ссылка на файл данных (опционально) proxies: Список прокси-серверов (опционально). Если не передан, берётся из БД. + client_adapter: HTTP-адаптер (опционально). + use_playwright: Использовать Playwright (опционально). Returns: Результат: batch_id, saved, status """ source = ParserLoadLog.Source.INSPECTIONS - batch_id = ParserLoadLogService.get_next_batch_id(source) - task_id = self.request.id + load_log, batch_id = ParserLoadLogService.create_load_log_with_next_batch_id( + source=source, + status="in_progress", + ) + task_id = self.request.id or str(uuid.uuid4()) # Если прокси не переданы, берём из БД if proxies is None: @@ -286,12 +523,7 @@ def parse_inspections( job.mark_started() job.update_progress(0, "Инициализация парсера...") - # Создаём запись лога - load_log = ParserLoadLogService.create_load_log( - source=source, - batch_id=batch_id, - status="in_progress", - ) + # load_log уже создан def progress_callback(percent: int, message: str) -> None: """Callback для обновления прогресса.""" @@ -300,7 +532,12 @@ def parse_inspections( try: # Парсинг данных job.update_progress(10, "Загрузка данных с proverki.gov.ru...") - with ProverkiClient(proxies=proxies) as client: + client_kwargs = {"proxies": proxies} + if client_adapter: + client_kwargs["http_adapter"] = client_adapter + if use_playwright is not None: + client_kwargs["use_playwright"] = use_playwright + with ProverkiClient(**client_kwargs) as client: inspections = client.fetch_inspections( year=year, month=month, @@ -351,23 +588,58 @@ def parse_inspections( @shared_task -def parse_all_sources(proxies: list[str] | None = None) -> dict: +def parse_all_sources( + proxies: list[str] | None = None, + client_adapter: BaseAdapter | None = None, + inspections_use_playwright: bool | None = None, +) -> dict: """ Запустить все парсеры из всех источников. Args: proxies: Список прокси-серверов (опционально). Если не передан, каждая задача возьмёт прокси из БД. + client_adapter: HTTP-адаптер (опционально). + inspections_use_playwright: Использовать Playwright (опционально). Returns: Task IDs всех запущенных парсеров """ logger.info("Starting all parsers from all sources") + if client_adapter is not None: + industrial_result = parse_industrial_production.apply( + kwargs={"proxies": proxies, "client_adapter": client_adapter} + ) + manufactures_result = parse_manufactures.apply( + kwargs={"proxies": proxies, "client_adapter": client_adapter} + ) + inspections_result = parse_inspections.apply( + kwargs={ + "proxies": proxies, + "client_adapter": client_adapter, + "use_playwright": inspections_use_playwright, + } + ) + else: + industrial_result = parse_industrial_production.delay( + proxies=proxies, + client_adapter=client_adapter, + ) + manufactures_result = parse_manufactures.delay( + proxies=proxies, + client_adapter=client_adapter, + ) + inspections_result = parse_inspections.delay( + proxies=proxies, + client_adapter=client_adapter, + use_playwright=inspections_use_playwright, + ) + results = { - "industrial": parse_industrial_production.delay(proxies=proxies).id, - "manufactures": parse_manufactures.delay(proxies=proxies).id, - "inspections": parse_inspections.delay(proxies=proxies).id, + "industrial": industrial_result.id, + "manufactures": manufactures_result.id, + "inspections": inspections_result.id, } return results @@ -385,6 +657,10 @@ def sync_inspections( # noqa: C901 self, *, proxies: list[str] | None = None, + client_adapter: BaseAdapter | None = None, + use_playwright: bool | None = None, + current_year: int | None = None, + current_month: int | None = None, ) -> dict: """ Синхронизация данных о проверках с proverki.gov.ru. @@ -398,13 +674,20 @@ def sync_inspections( # noqa: C901 Args: proxies: Список прокси-серверов (опционально) + client_adapter: HTTP-адаптер (опционально). + use_playwright: Использовать Playwright (опционально). + current_year: Год (опционально) для ограничения периода. + current_month: Месяц (опционально) для ограничения периода. Returns: Результат синхронизации """ source = ParserLoadLog.Source.INSPECTIONS - batch_id = ParserLoadLogService.get_next_batch_id(source) - task_id = self.request.id + load_log, batch_id = ParserLoadLogService.create_load_log_with_next_batch_id( + source=source, + status="in_progress", + ) + task_id = self.request.id or str(uuid.uuid4()) # Если прокси не переданы, берём из БД if proxies is None: @@ -423,20 +706,21 @@ def sync_inspections( # noqa: C901 job.mark_started() job.update_progress(0, "Инициализация синхронизации...") - # Создаём запись лога - load_log = ParserLoadLogService.create_load_log( - source=source, - batch_id=batch_id, - status="in_progress", - ) + # load_log уже создан - current_year = datetime.now().year - current_month = datetime.now().month + now = datetime.now() + current_year = current_year or now.year + current_month = current_month or now.month total_saved = 0 results = {"fz294": [], "fz248": []} try: - with ProverkiClient(proxies=proxies) as client: + client_kwargs = {"proxies": proxies} + if client_adapter: + client_kwargs["http_adapter"] = client_adapter + if use_playwright is not None: + client_kwargs["use_playwright"] = use_playwright + with ProverkiClient(**client_kwargs) as client: # Обрабатываем оба типа проверок for is_fz248 in [False, True]: fz_key = "fz248" if is_fz248 else "fz294" @@ -593,6 +877,9 @@ def parse_procurements( file_url: str | None = None, law_type: str = "44", proxies: list[str] | None = None, + client_host: str | None = None, + client_scheme: str | None = None, + client_adapter: BaseAdapter | None = None, ) -> dict: """ Задача парсинга данных о государственных закупках с zakupki.gov.ru. @@ -610,8 +897,11 @@ def parse_procurements( Результат: batch_id, saved, status """ source = ParserLoadLog.Source.PROCUREMENTS - batch_id = ParserLoadLogService.get_next_batch_id(source) - task_id = self.request.id + load_log, batch_id = ParserLoadLogService.create_load_log_with_next_batch_id( + source=source, + status="in_progress", + ) + task_id = self.request.id or str(uuid.uuid4()) # Если прокси не переданы, берём из БД if proxies is None: @@ -645,12 +935,7 @@ def parse_procurements( job.mark_started() job.update_progress(0, "Инициализация парсера...") - # Создаём запись лога - load_log = ParserLoadLogService.create_load_log( - source=source, - batch_id=batch_id, - status="in_progress", - ) + # load_log уже создан def progress_callback(percent: int, message: str) -> None: """Callback для обновления прогресса.""" @@ -659,7 +944,14 @@ def parse_procurements( try: # Парсинг данных job.update_progress(10, "Загрузка данных с zakupki.gov.ru...") - with ZakupkiClient(proxies=proxies) as client: + client_kwargs = {"proxies": proxies} + if client_host: + client_kwargs["host"] = client_host + if client_scheme: + client_kwargs["scheme"] = client_scheme + if client_adapter: + client_kwargs["http_adapter"] = client_adapter + with ZakupkiClient(**client_kwargs) as client: procurements = client.fetch_procurements( region_code=region_code, year=year, @@ -715,12 +1007,17 @@ def parse_procurements( @shared_task(bind=True) -def sync_procurements( +def sync_procurements( # noqa: C901 self, *, region_code: str, law_type: str = "44", proxies: list[str] | None = None, + client_host: str | None = None, + client_scheme: str | None = None, + client_adapter: BaseAdapter | None = None, + current_year: int | None = None, + current_month: int | None = None, ) -> dict: """ Синхронизация данных о закупках с zakupki.gov.ru. @@ -739,8 +1036,11 @@ def sync_procurements( Результат синхронизации """ source = ParserLoadLog.Source.PROCUREMENTS - batch_id = ParserLoadLogService.get_next_batch_id(source) - task_id = self.request.id + load_log, batch_id = ParserLoadLogService.create_load_log_with_next_batch_id( + source=source, + status="in_progress", + ) + task_id = self.request.id or str(uuid.uuid4()) # Если прокси не переданы, берём из БД if proxies is None: @@ -768,20 +1068,23 @@ def sync_procurements( job.mark_started() job.update_progress(0, "Инициализация синхронизации...") - # Создаём запись лога - load_log = ParserLoadLogService.create_load_log( - source=source, - batch_id=batch_id, - status="in_progress", - ) + # load_log уже создан - current_year = datetime.now().year - current_month = datetime.now().month + now = datetime.now() + current_year = current_year or now.year + current_month = current_month or now.month total_saved = 0 results = [] try: - with ZakupkiClient(proxies=proxies) as client: + client_kwargs = {"proxies": proxies} + if client_host: + client_kwargs["host"] = client_host + if client_scheme: + client_kwargs["scheme"] = client_scheme + if client_adapter: + client_kwargs["http_adapter"] = client_adapter + with ZakupkiClient(**client_kwargs) as client: # Определяем начальную точку last_year, last_month = ProcurementService.get_last_loaded_period( region_code=region_code, @@ -928,7 +1231,6 @@ def scan_fns_directory(self) -> dict: Результат сканирования: количество найденных и поставленных в очередь файлов """ import hashlib - from pathlib import Path from django.conf import settings @@ -946,16 +1248,39 @@ def scan_fns_directory(self) -> dict: files_found = list(watch_dir.glob("fin_*.xlsx")) for file_path in files_found: + if not _try_create_lock(file_path): + skipped += 1 + continue + # Вычисляем хеш файла - file_hash = hashlib.sha256(file_path.read_bytes()).hexdigest() + try: + file_hash = hashlib.sha256(file_path.read_bytes()).hexdigest() + except Exception as e: + logger.warning("Failed to read FNS file: %s - %s", file_path, e) + _remove_lock(file_path) + skipped += 1 + continue # Проверяем, обрабатывался ли файл if FNSReportService.exists_by_hash(file_hash): + _move_to_dir( + file_path, + Path(settings.FNS_PROCESSED_DIRECTORY), + suffix="dup", + ) + _remove_lock(file_path) skipped += 1 continue # Ставим в очередь на обработку - process_fns_file.delay(str(file_path)) + try: + process_fns_file.delay(str(file_path)) + except Exception as e: + logger.error("Failed to enqueue FNS file: %s - %s", file_path, e) + _remove_lock(file_path) + skipped += 1 + continue + queued += 1 logger.info("Queued FNS file for processing: %s", file_path.name) @@ -975,153 +1300,8 @@ def scan_fns_directory(self) -> dict: @shared_task(bind=True) def process_fns_file(self, file_path: str) -> dict: - """ - Обработка одного файла FNS. - - Args: - file_path: Путь к файлу - - Returns: - Результат обработки - """ - import hashlib - import shutil - from dataclasses import asdict - from pathlib import Path - - from apps.core.services import BackgroundJobService - from apps.parsers.clients.fns.parser import FNSExcelParser, FNSParserError - from apps.parsers.models import FinancialReport - from django.conf import settings - - source = ParserLoadLog.Source.FNS_REPORTS - batch_id = ParserLoadLogService.get_next_batch_id(source) - task_id = self.request.id - file_path = Path(file_path) - - logger.info( - "Processing FNS file (task_id=%s, batch_id=%d, file=%s)", - task_id, - batch_id, - file_path.name, - ) - - # Создаём BackgroundJob - job = BackgroundJobService.create_job( - task_id=task_id, - task_name="apps.parsers.tasks.process_fns_file", - meta={"source": source, "batch_id": batch_id, "file": file_path.name}, - ) - job.mark_started() - job.update_progress(0, f"Обработка файла {file_path.name}...") - - # Создаём запись лога - load_log = ParserLoadLogService.create_load_log( - source=source, - batch_id=batch_id, - status="in_progress", - ) - - try: - # Проверяем существование файла - if not file_path.exists(): - raise FNSParserError(f"Файл не найден: {file_path}") - - # Вычисляем хеш - file_hash = hashlib.sha256(file_path.read_bytes()).hexdigest() - - # Проверяем дубликат - if FNSReportService.exists_by_hash(file_hash): - logger.info( - "File already processed (hash=%s): %s", - file_hash, - file_path.name, - ) - job.complete(result={"status": "skipped", "reason": "duplicate"}) - ParserLoadLogService.update(load_log, status="skipped") - return {"status": "skipped", "reason": "duplicate"} - - # Парсим файл - job.update_progress(20, "Парсинг Excel файла...") - parsed = FNSExcelParser.parse_file(file_path) - - # Сохраняем в БД - job.update_progress(60, f"Сохранение {len(parsed.lines)} строк...") - lines_data = [asdict(line) for line in parsed.lines] - - report = FNSReportService.save_report( - external_id=parsed.external_id, - ogrn=parsed.ogrn, - file_name=file_path.name, - file_hash=file_hash, - source=FinancialReport.SourceType.FILE_WATCH, - batch_id=batch_id, - lines_data=lines_data, - ) - - # Перемещаем файл в processed - job.update_progress(90, "Перемещение файла...") - processed_dir = Path(settings.FNS_PROCESSED_DIRECTORY) - processed_dir.mkdir(parents=True, exist_ok=True) - shutil.move(str(file_path), str(processed_dir / file_path.name)) - - # Обновляем лог - ParserLoadLogService.update( - load_log, - status="success", - records_count=len(parsed.lines), - ) - - # Завершаем - job.complete( - result={ - "report_id": report.id, - "external_id": parsed.external_id, - "ogrn": parsed.ogrn, - "lines_count": len(parsed.lines), - } - ) - - logger.info( - "FNS file processed: %s (report_id=%d, lines=%d)", - file_path.name, - report.id, - len(parsed.lines), - ) - - return { - "status": "success", - "report_id": report.id, - "external_id": parsed.external_id, - "ogrn": parsed.ogrn, - "lines_count": len(parsed.lines), - } - - except FNSParserError as e: - logger.error("FNS file parsing failed: %s - %s", file_path.name, e) - - # Перемещаем в failed - failed_dir = Path(settings.FNS_FAILED_DIRECTORY) - failed_dir.mkdir(parents=True, exist_ok=True) - if file_path.exists(): - shutil.move(str(file_path), str(failed_dir / file_path.name)) - - ParserLoadLogService.mark_failed(load_log, str(e)) - job.fail(error=str(e)) - - return {"status": "failed", "error": str(e)} - - except Exception as e: - logger.error( - "FNS file processing error: %s - %s", - file_path.name, - e, - exc_info=True, - ) - ParserLoadLogService.mark_failed(load_log, str(e)) - job.fail(error=str(e)) - - return {"status": "failed", "error": str(e)} + """Обработка одного файла FNS.""" + return _process_fns_file_sync(file_path, task_id=self.request.id) @shared_task(bind=True) @@ -1147,7 +1327,10 @@ def process_fns_files_batch(self, file_paths: list[str]) -> dict: failed_count = 0 for file_path in file_paths: - result = process_fns_file(file_path) + result = _process_fns_file_sync( + file_path, + task_id=f"{self.request.id}:{success_count + failed_count}", + ) results.append({"file": file_path, **result}) if result.get("status") == "success": diff --git a/src/apps/parsers/tests/test_proverki_client.py b/src/apps/parsers/tests/test_proverki_client.py new file mode 100644 index 0000000..985e2c4 --- /dev/null +++ b/src/apps/parsers/tests/test_proverki_client.py @@ -0,0 +1,1002 @@ +"""Unit tests for ProverkiClient using local HTTP server (no mocks).""" + +from __future__ import annotations + +import asyncio +import sys +import tempfile +import types +from asyncio import events as asyncio_events +from pathlib import Path +from xml.etree import ElementPath as element_path +from xml.etree import ElementTree as ET + +from apps.parsers.clients.base import HTTPClientError +from apps.parsers.clients.proverki import ProverkiClient +from apps.parsers.clients.proverki.client import ( + OPEN_DATA_PORTAL_URL, + ProverkiClientError, +) +from django.test import SimpleTestCase + +from tests.utils import TestHTTPServer +from tests.utils.fixtures import build_zip, fake + +_CYRILLIC_KNM = "\u041a\u041d\u041c" +_CYRILLIC_INN = "\u0418\u041d\u041d" +_CYRILLIC_OGRN = "\u041e\u0413\u0420\u041d" + + +def _digits(length: int) -> str: + return "".join(str(fake.random_int(0, 9)) for _ in range(length)) + + +def _attrs_string(attrs: dict[str, str]) -> str: + return " ".join(f'{key}="{value}"' for key, value in attrs.items()) + + +def _inspection_attrs() -> dict[str, str]: + return { + "ERPID": _digits(12), + "INN": _digits(10), + "OGRN": _digits(13), + "ORG_NAME": fake.company(), + "FRGU_ORG_NAME": fake.company(), + "ITYPE_NAME": fake.word(), + "ICARRYOUT_TYPE_NAME": fake.word(), + "START_DATE": str(fake.date()), + "END_DATE": str(fake.date()), + "STATUS": fake.word(), + "FZ_NAME": fake.sentence(nb_words=3), + "RESULT": fake.sentence(nb_words=2), + } + + +def _xml_with_tag(tag: str, attrs: dict[str, str]) -> bytes: + body = f"<{tag} {_attrs_string(attrs)} />" + xml = "" f"{body}" + return xml.encode("utf-8") + + +def _xml_with_namespace(tag: str, attrs: dict[str, str]) -> bytes: + ns = "http://example.com/ns" + body = f"" + xml = ( + "" + f"{body}" + ) + return xml.encode("utf-8") + + +def _xml_with_container(tag: str, attrs: dict[str, str]) -> bytes: + body = f"<{tag} {_attrs_string(attrs)} />" + xml = ( + "" + f"{body}" + ) + return xml.encode("utf-8") + + +def _xml_with_children() -> bytes: + inn = _digits(10) + ogrn = _digits(13) + registration = _digits(12) + xml = ( + "" + "" + f'' + f'' + f'' + "" + "" + ) + return xml.encode("utf-8") + + +def _xml_with_cyrillic_tag() -> bytes: + attrs = { + _CYRILLIC_INN: _digits(10), + _CYRILLIC_OGRN: _digits(13), + "I_NUMBER": _digits(12), + } + body = f"<{_CYRILLIC_KNM} {_attrs_string(attrs)} />" + xml = "" f"{body}" + return xml.encode("utf-8") + + +def _client_for(server: TestHTTPServer) -> ProverkiClient: + return ProverkiClient( + host="testserver", + scheme="http", + http_adapter=server.adapter, + use_playwright=False, + ) + + +class ProverkiDiscoverFilesTest(SimpleTestCase): + def test_discover_data_files_month(self): + client = ProverkiClient() + plans = client._discover_data_files(year=2025, month=2, is_federal_law_248=True) + self.assertEqual(len(plans), 1) + self.assertEqual(plans[0].month, 2) + self.assertIn("fz248", plans[0].file_name) + + def test_discover_data_files_year_only(self): + client = ProverkiClient() + plans = client._discover_data_files( + year=2024, month=None, is_federal_law_248=False + ) + self.assertEqual(len(plans), 1) + self.assertIsNone(plans[0].month) + self.assertIn("fz294", plans[0].file_name) + + def test_discover_data_files_without_year(self): + client = ProverkiClient() + self.assertEqual(client._discover_data_files(year=None), []) + + +class ProverkiDownloadParseTest(SimpleTestCase): + def test_download_and_parse_zip(self): + xml = _xml_with_tag("INSPECTION", _inspection_attrs()) + archive = build_zip([("data.xml", xml)]) + + with TestHTTPServer() as server: + server.add_bytes( + "/opendata/data.zip", archive, content_type="application/zip" + ) + client = _client_for(server) + inspections = client.fetch_inspections( + file_url=f"{server.base_url}/opendata/data.zip" + ) + + self.assertEqual(len(inspections), 1) + + def test_download_and_parse_xml(self): + xml = _xml_with_tag("INSPECTION", _inspection_attrs()) + + with TestHTTPServer() as server: + server.add_bytes("/opendata/data.xml", xml, content_type="application/xml") + client = _client_for(server) + inspections = client.fetch_inspections( + file_url=f"{server.base_url}/opendata/data.xml" + ) + + self.assertEqual(len(inspections), 1) + + def test_download_and_parse_portal_without_playwright(self): + xml = _xml_with_tag("INSPECTION", _inspection_attrs()) + archive = build_zip([("data.xml", xml)]) + + with TestHTTPServer() as server: + server.add_bytes( + "/portal/public-open-data/check/2025/1", + archive, + content_type="application/zip", + ) + client = _client_for(server) + inspections = client._download_and_parse( + f"{server.base_url}/portal/public-open-data/check/2025/1", + file_format="portal", + ) + + self.assertEqual(len(inspections), 1) + + def test_download_and_parse_portal_without_playwright_with_progress(self): + xml = _xml_with_tag("INSPECTION", _inspection_attrs()) + archive = build_zip([("data.xml", xml)]) + progress: list[tuple[int, str]] = [] + + def on_progress(value: int, message: str) -> None: + progress.append((value, message)) + + with TestHTTPServer() as server: + server.add_bytes( + "/portal/public-open-data/check/2025/2", + archive, + content_type="application/zip", + ) + client = _client_for(server) + inspections = client._download_and_parse( + f"{server.base_url}/portal/public-open-data/check/2025/2", + progress_callback=on_progress, + file_format="portal", + ) + + self.assertEqual(len(inspections), 1) + self.assertTrue(progress) + + def test_download_and_parse_html_without_playwright_fails(self): + html = b"blocked" + + with TestHTTPServer() as server: + server.add_bytes( + "/portal/public-open-data/check/2025/1", + html, + content_type="text/html", + ) + client = _client_for(server) + with self.assertRaises(ProverkiClientError): + client._download_and_parse( + f"{server.base_url}/portal/public-open-data/check/2025/1", + file_format="portal", + ) + + def test_download_and_parse_html_without_playwright_non_portal(self): + html = b"blocked" + + with TestHTTPServer() as server: + server.add_bytes("/opendata/data.html", html, content_type="text/html") + client = _client_for(server) + with self.assertRaises(ProverkiClientError): + client._download_and_parse(f"{server.base_url}/opendata/data.html") + + def test_download_and_parse_unknown_format(self): + with TestHTTPServer() as server: + server.add_bytes("/opendata/data.bin", b"not-xml-or-zip") + client = _client_for(server) + with self.assertRaises(ProverkiClientError): + client.fetch_inspections( + file_url=f"{server.base_url}/opendata/data.bin" + ) + + def test_parse_zip_archive_without_xml_files(self): + archive = build_zip([("readme.txt", b"no xml here")]) + client = ProverkiClient() + inspections = client._parse_zip_archive(archive) + self.assertEqual(inspections, []) + + def test_fetch_inspections_with_progress_callback(self): + xml = _xml_with_tag("INSPECTION", _inspection_attrs()) + archive = build_zip([("data.xml", xml)]) + progress: list[tuple[int, str]] = [] + + def on_progress(value: int, message: str) -> None: + progress.append((value, message)) + + with TestHTTPServer() as server: + server.add_bytes( + "/opendata/data.zip", archive, content_type="application/zip" + ) + client = _client_for(server) + inspections = client.fetch_inspections( + file_url=f"{server.base_url}/opendata/data.zip", + progress_callback=on_progress, + ) + + self.assertEqual(len(inspections), 1) + self.assertTrue(progress) + + def test_fetch_inspections_http_error_bubbles(self): + with TestHTTPServer() as server: + server.add_bytes("/opendata/data.zip", b"", status=500) + client = _client_for(server) + with self.assertRaises(HTTPClientError): + client.fetch_inspections( + file_url=f"{server.base_url}/opendata/data.zip" + ) + + def test_fetch_inspection_plans(self): + client = ProverkiClient() + plans = client.fetch_inspection_plans(2025) + self.assertEqual(len(plans), 1) + self.assertIn("plan-2025", plans[0].file_name) + + def test_fetch_inspections_wraps_generic_error(self): + class _FailClient(ProverkiClient): + def _download_and_parse(self, *args, **kwargs): # type: ignore[override] + raise ValueError("boom") + + client = _FailClient() + with self.assertRaises(ProverkiClientError): + client.fetch_inspections(file_url="http://example.com/data.zip") + + def test_download_and_parse_portal_with_playwright_branch(self): + xml = _xml_with_tag("INSPECTION", _inspection_attrs()) + archive = build_zip([("data.xml", xml)]) + progress = [] + + class _PortalClient(ProverkiClient): + def _download_from_portal(self, *args, **kwargs): # type: ignore[override] + return archive + + def _close_playwright(self): # type: ignore[override] + return None + + def on_progress(value: int, _message: str) -> None: + progress.append(value) + + client = _PortalClient(use_playwright=True) + inspections = client._download_and_parse( + "http://portal.example.com", + progress_callback=on_progress, + file_format="portal", + ) + self.assertEqual(len(inspections), 1) + self.assertTrue(progress) + + def test_download_and_parse_portal_with_playwright_no_progress(self): + xml = _xml_with_tag("INSPECTION", _inspection_attrs()) + archive = build_zip([("data.xml", xml)]) + + class _PortalClient(ProverkiClient): + def _download_from_portal(self, *args, **kwargs): # type: ignore[override] + return archive + + def _close_playwright(self): # type: ignore[override] + return None + + client = _PortalClient(use_playwright=True) + inspections = client._download_and_parse( + "http://portal.example.com", file_format="portal" + ) + self.assertEqual(len(inspections), 1) + + def test_download_and_parse_html_switches_to_playwright(self): + xml = _xml_with_tag("INSPECTION", _inspection_attrs()) + + class _HtmlClient(ProverkiClient): + def _download_with_playwright(self, *args, **kwargs): # type: ignore[override] + return xml + + def _close_playwright(self): # type: ignore[override] + return None + + with TestHTTPServer() as server: + server.add_bytes( + "/data.html", b"blocked", content_type="text/html" + ) + client = _HtmlClient( + host="testserver", + scheme="http", + http_adapter=server.adapter, + use_playwright=True, + ) + inspections = client._download_and_parse(f"{server.base_url}/data.html") + + self.assertEqual(len(inspections), 1) + + def test_download_and_parse_html_switches_to_playwright_with_progress(self): + xml = _xml_with_tag("INSPECTION", _inspection_attrs()) + progress: list[tuple[int, str]] = [] + + class _HtmlClient(ProverkiClient): + def _download_with_playwright(self, *args, **kwargs): # type: ignore[override] + return xml + + def _close_playwright(self): # type: ignore[override] + return None + + def on_progress(value: int, message: str) -> None: + progress.append((value, message)) + + with TestHTTPServer() as server: + server.add_bytes( + "/data.html", b"blocked", content_type="text/html" + ) + client = _HtmlClient( + host="testserver", + scheme="http", + http_adapter=server.adapter, + use_playwright=True, + ) + inspections = client._download_and_parse( + f"{server.base_url}/data.html", progress_callback=on_progress + ) + + self.assertEqual(len(inspections), 1) + self.assertTrue(progress) + + def test_fetch_inspections_with_plans_and_progress(self): + xml = _xml_with_tag("INSPECTION", _inspection_attrs()) + archive = build_zip([("data.xml", xml)]) + progress: list[int] = [] + + class _TestClient(ProverkiClient): + def _discover_data_files(self, **_kwargs): # type: ignore[override] + from apps.parsers.clients.proverki.schemas import InspectionPlan + + return [ + InspectionPlan( + year=2025, + month=1, + file_url=f"{server.base_url}/opendata/data.zip", + file_name="data.zip", + file_format="auto", + ) + ] + + def on_progress(value: int, _message: str) -> None: + progress.append(value) + + with TestHTTPServer() as server: + server.add_bytes( + "/opendata/data.zip", archive, content_type="application/zip" + ) + client = _TestClient( + host="testserver", + scheme="http", + http_adapter=server.adapter, + use_playwright=False, + ) + inspections = client.fetch_inspections( + year=2025, month=1, progress_callback=on_progress + ) + + self.assertEqual(len(inspections), 1) + self.assertTrue(progress) + + +class ProverkiParseXMLTest(SimpleTestCase): + def test_parse_xml_with_namespace(self): + xml = _xml_with_namespace("INSPECTION", _inspection_attrs()) + client = ProverkiClient() + inspections = client._parse_xml_content(xml) + self.assertEqual(len(inspections), 1) + + def test_parse_xml_with_container(self): + xml = _xml_with_container("inspection", _inspection_attrs()) + client = ProverkiClient() + inspections = client._parse_xml_content(xml) + self.assertEqual(len(inspections), 1) + + def test_parse_xml_with_children(self): + client = ProverkiClient() + inspections = client._parse_xml_content(_xml_with_children()) + self.assertEqual(len(inspections), 1) + + def test_parse_xml_with_cyrillic_tag(self): + client = ProverkiClient() + inspections = client._parse_xml_content(_xml_with_cyrillic_tag()) + self.assertEqual(len(inspections), 1) + + def test_parse_xml_streaming_threshold(self): + xml = _xml_with_tag("INSPECTION", _inspection_attrs()) + client = ProverkiClient() + client.STREAMING_THRESHOLD_BYTES = 1 + inspections = client._parse_xml_content(xml) + self.assertEqual(len(inspections), 1) + + def test_parse_xml_record_missing_fields_returns_none(self): + element = ET.fromstring("") # noqa: S314 + client = ProverkiClient() + self.assertIsNone(client._parse_xml_record(element)) + + def test_parse_xml_record_partial_fields(self): + element = ET.fromstring(f"") # noqa: S314 + client = ProverkiClient() + inspection = client._parse_xml_record(element) + self.assertIsNotNone(inspection) + self.assertEqual(inspection.inn, element.attrib["INN"]) + + def test_parse_xml_container_records(self): + xml = ( + b"" + b"" + ) + client = ProverkiClient() + inspections = client._parse_xml_content(xml) + self.assertEqual(inspections, []) + + def test_parse_xml_content_decode_fallback(self): + xml_str = "" + content = _BadBytes(b"\xff\xfe", xml_str=xml_str) + client = ProverkiClient() + inspections = client._parse_xml_content(content) + self.assertEqual(inspections, []) + + def test_parse_xml_streaming_decode_fallback(self): + xml_str = ( + "" + f'' + ) + content = _BadBytes(b"\xff\xfe", xml_str=xml_str) + client = ProverkiClient() + inspections = client._parse_xml_streaming(content) + self.assertEqual(len(inspections), 1) + + def test_parse_xml_streaming_parse_error_returns_partial(self): + inn = _digits(10) + xml = ( + "" + f'' + xml = ( + "" f"{record * 10000}" + ).encode() + client = ProverkiClient() + inspections = client._parse_xml_streaming(xml) + self.assertEqual(len(inspections), 10000) + + def test_parse_xml_streaming_skips_invalid_record(self): + xml = b"" b"" + client = ProverkiClient() + inspections = client._parse_xml_streaming(xml) + self.assertEqual(inspections, []) + + def test_parse_xml_tag_search_handles_error(self): + xml = _xml_with_tag("inspection", _inspection_attrs()) + client = ProverkiClient() + original_findall = element_path.findall + + def _raising_findall(elem, path, namespaces=None): + if path == ".//inspection": + raise SyntaxError("boom") + return original_findall(elem, path, namespaces) + + element_path.findall = _raising_findall + try: + inspections = client._parse_xml_content(xml) + finally: + element_path.findall = original_findall + + self.assertEqual(len(inspections), 1) + + def test_parse_xml_record_namespace_nested_fields(self): + ns = "http://example.com/ns" + inn = _digits(10) + ogrn = _digits(13) + inspection_type = fake.word() + status = fake.word() + xml = ( + "" + f"" + f'' + f'' + f'' + f'' + "" + "" + ).encode() + client = ProverkiClient() + inspections = client._parse_xml_content(xml) + self.assertEqual(len(inspections), 1) + self.assertEqual(inspections[0].inspection_type, inspection_type) + self.assertEqual(inspections[0].status, status) + + def test_parse_xml_record_namespace_text_child_fallback(self): + ns = "http://example.com/ns" + inn = _digits(10) + xml = ( + "" + f"{inn}" + ) + element = ET.fromstring(xml) # noqa: S314 + client = ProverkiClient() + inspection = client._parse_xml_record(element) + self.assertIsNotNone(inspection) + self.assertEqual(inspection.inn, inn) + + def test_parse_xml_record_bad_element_returns_none(self): + element = ET.Element("{") + client = ProverkiClient() + self.assertIsNone(client._parse_xml_record(element)) + + +class _BadBytes(bytes): + def __new__(cls, data: bytes, *, xml_str: str): + obj = super().__new__(cls, data) + obj._xml_str = xml_str + return obj + + def decode(self, encoding="utf-8", errors="strict"): + if errors == "replace": + return self._xml_str + raise UnicodeDecodeError(encoding, b"", 0, 1, "bad bytes") + + +class _FakeResponse: + def __init__(self, headers: dict[str, str] | None = None): + self.headers = headers or {} + + +class _FakeDownload: + def __init__(self, path: Path | None): + self._path = path + + def path(self): + if self._path is None: + return None + return str(self._path) + + +class _FakeDownloadContext: + def __init__(self, path: Path | None): + self.value = _FakeDownload(path) + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc, tb): + return False + + +class _FakeLink: + def __init__(self, href: str | None = None): + self._href = href + + def get_attribute(self, name: str): + if name == "href": + return self._href + return None + + def click(self): + return None + + +class _FakePage: + def __init__( + self, + *, + content_type: str, + content: str, + download_path: Path | None, + download_links: list[_FakeLink] | None = None, + portal_links: list[_FakeLink] | None = None, + zip_link: _FakeLink | None = None, + xml_link: _FakeLink | None = None, + download_tab: _FakeLink | None = None, + raise_on_wait: bool = False, + ): + self._content_type = content_type + self._content = content + self._download_path = download_path + self._download_links = download_links or [] + self._portal_links = portal_links or [] + self._zip_link = zip_link + self._xml_link = xml_link + self._download_tab = download_tab + self._last_url = "" + self._raise_on_wait = raise_on_wait + + def goto(self, url, wait_until=None, timeout=None): + self._last_url = url + return _FakeResponse({"content-type": self._content_type}) + + def content(self): + return self._content + + def title(self): + return "Page" + + def wait_for_selector(self, *args, **kwargs): + if self._raise_on_wait: + raise RuntimeError("timeout") + return None + + def wait_for_timeout(self, *args, **kwargs): + return None + + def query_selector(self, selector: str): + if "Скачать" in selector and self._download_tab: + return self._download_tab + if ".zip" in selector and self._zip_link: + return self._zip_link + if ".xml" in selector and self._xml_link: + return self._xml_link + return None + + def query_selector_all(self, selector: str): + if self._last_url == OPEN_DATA_PORTAL_URL: + return self._portal_links + return self._download_links + + def expect_download(self, timeout=None): + return _FakeDownloadContext(self._download_path) + + +class _FakeContext: + def __init__(self, page: _FakePage): + self._page = page + self.closed = False + + def new_page(self): + return self._page + + def close(self): + self.closed = True + + +class _FakeBrowser: + def __init__(self, page: _FakePage): + self._page = page + self.closed = False + + def new_context(self, **_kwargs): + return _FakeContext(self._page) + + def close(self): + self.closed = True + + +def _temp_file(content: bytes) -> Path: + tmp = tempfile.NamedTemporaryFile(delete=False) + tmp.write(content) + tmp.flush() + tmp.close() + return Path(tmp.name) + + +class ProverkiPlaywrightStubTest(SimpleTestCase): + databases = "__all__" + + def tearDown(self): + super().tearDown() + try: + asyncio.get_running_loop() + except RuntimeError: + return + asyncio_events._set_running_loop(None) + + def test_download_with_playwright_direct_response(self): + download_path = _temp_file(b"") + page = _FakePage( + content_type="application/xml", + content="", + download_path=download_path, + ) + client = ProverkiClient() + client._browser = _FakeBrowser(page) + + result = client._download_with_playwright("http://example.com") + self.assertIn(b"") + page = _FakePage( + content_type="text/html", + content="content", + download_path=download_path, + zip_link=None, + xml_link=_FakeLink(href="file.xml"), + download_tab=None, + ) + client = ProverkiClient() + client._browser = _FakeBrowser(page) + + result = client._download_from_portal("http://portal.example.com") + self.assertEqual(result, b"") + + def test_download_from_portal_xml_link_without_download_path(self): + page = _FakePage( + content_type="text/html", + content="no data", + download_path=None, + zip_link=None, + xml_link=_FakeLink(href="file.xml"), + download_tab=None, + ) + client = ProverkiClient() + client._browser = _FakeBrowser(page) + + with self.assertRaises(ProverkiClientError): + client._download_from_portal("http://portal.example.com") + + def test_download_from_portal_no_links_not_found(self): + download_path = _temp_file(b"") + page = _FakePage( + content_type="text/html", + content="Not found", + download_path=download_path, + zip_link=None, + xml_link=None, + download_tab=None, + raise_on_wait=True, + ) + client = ProverkiClient() + client._browser = _FakeBrowser(page) + + with self.assertRaises(ProverkiClientError): + client._download_from_portal("http://portal.example.com") + + def test_download_from_portal_no_links_generic_error(self): + page = _FakePage( + content_type="text/html", + content="no links here", + download_path=None, + zip_link=None, + xml_link=None, + download_tab=None, + ) + client = ProverkiClient() + client._browser = _FakeBrowser(page) + + with self.assertRaises(ProverkiClientError): + client._download_from_portal("http://portal.example.com") + + def test_close_playwright_handles_errors(self): + class _BrokenBrowser: + def close(self): + raise RuntimeError("boom") + + class _BrokenPlaywright: + def stop(self): + raise RuntimeError("boom") + + client = ProverkiClient() + client._browser = _BrokenBrowser() + client._playwright = _BrokenPlaywright() + + client._close_playwright() + self.assertIsNone(client._browser) + self.assertIsNone(client._playwright) + + def test_get_browser_import_error(self): + client = ProverkiClient() + original_playwright = sys.modules.get("playwright") + original_sync_api = sys.modules.get("playwright.sync_api") + fake_playwright = types.ModuleType("playwright") + fake_playwright.__path__ = [] + fake_sync_api = types.ModuleType("playwright.sync_api") + sys.modules["playwright"] = fake_playwright + sys.modules["playwright.sync_api"] = fake_sync_api + try: + with self.assertRaises(ProverkiClientError): + client._get_browser() + finally: + if original_playwright is None: + sys.modules.pop("playwright", None) + else: + sys.modules["playwright"] = original_playwright + if original_sync_api is None: + sys.modules.pop("playwright.sync_api", None) + else: + sys.modules["playwright.sync_api"] = original_sync_api + + def test_get_browser_success(self): + class _FakeChromium: + def launch(self, **_kwargs): + return object() + + class _FakePlaywright: + chromium = _FakeChromium() + + class _FakeSyncPlaywright: + def start(self): + return _FakePlaywright() + + fake_module = types.SimpleNamespace( + sync_playwright=lambda: _FakeSyncPlaywright() + ) + client = ProverkiClient() + original_module = sys.modules.get("playwright.sync_api") + sys.modules["playwright.sync_api"] = fake_module + try: + browser = client._get_browser() + finally: + if original_module is None: + sys.modules.pop("playwright.sync_api", None) + else: + sys.modules["playwright.sync_api"] = original_module + self.assertIsNotNone(browser) + + def test_get_browser_start_error(self): + class _BrokenPlaywright: + def start(self): + raise RuntimeError("startup failed") + + fake_module = types.SimpleNamespace(sync_playwright=lambda: _BrokenPlaywright()) + client = ProverkiClient() + original_module = sys.modules.get("playwright.sync_api") + sys.modules["playwright.sync_api"] = fake_module + try: + with self.assertRaises(ProverkiClientError): + client._get_browser() + finally: + if original_module is None: + sys.modules.pop("playwright.sync_api", None) + else: + sys.modules["playwright.sync_api"] = original_module diff --git a/src/apps/parsers/tests/test_tasks.py b/src/apps/parsers/tests/test_tasks.py index b78f330..f7e5481 100644 --- a/src/apps/parsers/tests/test_tasks.py +++ b/src/apps/parsers/tests/test_tasks.py @@ -1,15 +1,89 @@ -""" -Unit-тесты для Celery задач парсера zakupki.gov.ru. +"""Integration-style tests for parser tasks (no mocks).""" -Тестирует задачи parse_procurements и sync_procurements. -""" +from __future__ import annotations -from unittest.mock import MagicMock, patch +import hashlib +import io +import os +import tempfile +import threading +from pathlib import Path +from urllib.parse import urlparse -from apps.parsers.clients.zakupki.schemas import Procurement -from apps.parsers.models import ParserLoadLog, ProcurementRecord -from apps.parsers.tests.factories import ParserLoadLogFactory, ProcurementRecordFactory +from apps.parsers import tasks as parser_tasks +from apps.parsers.clients.minpromtorg.industrial import IndustrialProductionClient +from apps.parsers.clients.minpromtorg.manufactures import ManufacturesClient +from apps.parsers.models import ( + FinancialReport, + IndustrialCertificateRecord, + InspectionRecord, + ManufacturerRecord, + ParserLoadLog, + ProcurementRecord, +) +from apps.parsers.tasks import ( + _move_to_dir, + _process_fns_file_sync, + _remove_lock, + _try_create_lock, + parse_all_minpromtorg, + parse_all_sources, + parse_industrial_production, + parse_inspections, + parse_manufactures, + parse_procurements, + process_fns_files_batch, + scan_fns_directory, + sync_inspections, + sync_procurements, +) +from apps.parsers.tests.factories import ( + InspectionRecordFactory, + ParserLoadLogFactory, + ProcurementRecordFactory, +) from django.test import TestCase, override_settings +from openpyxl import Workbook + +from tests.utils import TestHTTPServer +from tests.utils.fixtures import ( + build_minpromtorg_certificates_excel, + build_minpromtorg_manufacturers_excel, + build_proverki_xml, + build_zakupki_xml, + build_zip, + fake, +) + + +def _host_from_base_url(base_url: str) -> str: + parsed = urlparse(base_url) + if parsed.port: + return f"{parsed.hostname}:{parsed.port}" + return parsed.hostname or "" + + +def _digits(length: int) -> str: + return "".join(str(fake.random_int(0, 9)) for _ in range(length)) + + +def _build_fns_excel_bytes() -> bytes: + wb = Workbook() + ws = wb.active + year = fake.random_int(min=2020, max=2025) + ws.append(["Форма №1", None, year, None]) + ws.append([None, "Код", "Начало", "Конец"]) + ws.append( + [fake.word(), _digits(4), fake.random_int(10, 999), fake.random_int(10, 999)] + ) + buf = io.BytesIO() + wb.save(buf) + wb.close() + return buf.getvalue() + + +def _portal_path(year: int, month: int) -> str: + return f"/portal/public-open-data/check/{year}/{month}" @override_settings( @@ -17,146 +91,133 @@ from django.test import TestCase, override_settings CELERY_TASK_EAGER_PROPAGATES=True, ) class ParseProcurementsTaskTestCase(TestCase): - """Тесты задачи parse_procurements.""" + """Tests for parse_procurements task.""" - def _create_mock_procurement(self, number: str = "1234567890123456789"): - """Создать мок закупки.""" - return Procurement( - purchase_number=number, - purchase_name="Test Procurement", - customer_inn="1234567890", - customer_kpp="123456789", - customer_ogrn="1234567890123", - customer_name="Test Organization", - max_price="1000000", - currency_code="RUB", - placement_method="Electronic auction", - publish_date="2025-01-01", - end_date="2025-02-01", - status="Published", - law_type="44-FZ", - ) + def test_parse_procurements_success(self): + xml_content, rows = build_zakupki_xml(count=2) + archive = build_zip([("data.xml", xml_content)]) - @patch("apps.parsers.tasks.ZakupkiClient") - @patch("apps.parsers.tasks.BackgroundJobService.create_job") - def test_parse_procurements_success(self, mock_create_job, mock_client_class): - """Успешный парсинг закупок.""" - from apps.parsers.tasks import parse_procurements + with TestHTTPServer() as server: + server.add_bytes("/files/data.zip", archive, content_type="application/zip") + result = parse_procurements( + file_url=f"{server.base_url}/files/data.zip", + law_type="44", + client_adapter=server.adapter, + client_host=_host_from_base_url(server.base_url), + client_scheme="http", + ) - # Настройка моков - mock_job = MagicMock() - mock_create_job.return_value = mock_job - - mock_client = MagicMock() - mock_client_class.return_value.__enter__ = MagicMock(return_value=mock_client) - mock_client_class.return_value.__exit__ = MagicMock(return_value=False) - mock_client.fetch_procurements.return_value = [ - self._create_mock_procurement("1111111111111111111"), - self._create_mock_procurement("2222222222222222222"), - ] - - # Запуск задачи - result = parse_procurements( - region_code="77", - year=2025, - month=1, - law_type="44", - ) - - # Проверки self.assertEqual(result["status"], "success") - self.assertEqual(result["saved"], 2) - self.assertIn("batch_id", result) + self.assertEqual(result["saved"], len(rows)) + self.assertGreater(ProcurementRecord.objects.count(), 0) - # Проверяем что BackgroundJob был обновлён - mock_job.mark_started.assert_called_once() - mock_job.complete.assert_called_once() + def test_parse_procurements_failure(self): + with TestHTTPServer() as server: + server.add_bytes("/files/bad.bin", b"not-zip-or-xml") + result = parse_procurements( + file_url=f"{server.base_url}/files/bad.bin", + law_type="44", + client_adapter=server.adapter, + client_host=_host_from_base_url(server.base_url), + client_scheme="http", + ) - # Проверяем сохранение в БД - self.assertEqual(ProcurementRecord.objects.count(), 2) - - @patch("apps.parsers.tasks.ZakupkiClient") - @patch("apps.parsers.tasks.BackgroundJobService.create_job") - def test_parse_procurements_failure(self, mock_create_job, mock_client_class): - """Обработка ошибки при парсинге.""" - from apps.parsers.tasks import parse_procurements - - # Настройка моков - mock_job = MagicMock() - mock_create_job.return_value = mock_job - - mock_client = MagicMock() - mock_client_class.return_value.__enter__ = MagicMock(return_value=mock_client) - mock_client_class.return_value.__exit__ = MagicMock(return_value=False) - mock_client.fetch_procurements.side_effect = Exception("Network error") - - # Запуск задачи - result = parse_procurements( - region_code="77", - year=2025, - month=1, - ) - - # Проверки self.assertEqual(result["status"], "failed") self.assertIn("error", result) - # Проверяем что BackgroundJob был помечен как failed - mock_job.fail.assert_called_once() + def test_parse_procurements_empty_result(self): + xml_content = b"" + archive = build_zip([("data.xml", xml_content)]) - @patch("apps.parsers.tasks.ZakupkiClient") - @patch("apps.parsers.tasks.BackgroundJobService.create_job") - def test_parse_procurements_empty_result(self, mock_create_job, mock_client_class): - """Парсинг без результатов.""" - from apps.parsers.tasks import parse_procurements + with TestHTTPServer() as server: + server.add_bytes( + "/files/empty.zip", archive, content_type="application/zip" + ) + result = parse_procurements( + file_url=f"{server.base_url}/files/empty.zip", + law_type="44", + client_adapter=server.adapter, + client_host=_host_from_base_url(server.base_url), + client_scheme="http", + ) - # Настройка моков - mock_job = MagicMock() - mock_create_job.return_value = mock_job - - mock_client = MagicMock() - mock_client_class.return_value.__enter__ = MagicMock(return_value=mock_client) - mock_client_class.return_value.__exit__ = MagicMock(return_value=False) - mock_client.fetch_procurements.return_value = [] - - # Запуск задачи - result = parse_procurements( - region_code="77", - year=2025, - month=1, - ) - - # Проверки self.assertEqual(result["status"], "success") self.assertEqual(result["saved"], 0) - @patch("apps.parsers.tasks.ZakupkiClient") - @patch("apps.parsers.tasks.BackgroundJobService.create_job") - def test_parse_procurements_with_file_url(self, mock_create_job, mock_client_class): - """Парсинг по прямой ссылке.""" - from apps.parsers.tasks import parse_procurements + def test_parse_procurements_with_file_url(self): + xml_content, rows = build_zakupki_xml(count=1) + archive = build_zip([("data.xml", xml_content)]) - # Настройка моков - mock_job = MagicMock() - mock_create_job.return_value = mock_job + with TestHTTPServer() as server: + server.add_bytes("/files/data.zip", archive, content_type="application/zip") + result = parse_procurements( + file_url=f"{server.base_url}/files/data.zip", + law_type="44", + client_adapter=server.adapter, + client_host=_host_from_base_url(server.base_url), + client_scheme="http", + ) - mock_client = MagicMock() - mock_client_class.return_value.__enter__ = MagicMock(return_value=mock_client) - mock_client_class.return_value.__exit__ = MagicMock(return_value=False) - mock_client.fetch_procurements.return_value = [self._create_mock_procurement()] - - # Запуск задачи с прямой ссылкой - result = parse_procurements( - file_url="http://example.com/data.xml", - ) - - # Проверки self.assertEqual(result["status"], "success") + self.assertEqual(result["saved"], len(rows)) - # Проверяем что клиент был вызван с file_url - mock_client.fetch_procurements.assert_called_once() - call_kwargs = mock_client.fetch_procurements.call_args.kwargs - self.assertEqual(call_kwargs["file_url"], "http://example.com/data.xml") + def test_parse_procurements_default_host_with_proxies(self): + xml_content, rows = build_zakupki_xml(count=1) + archive = build_zip([("data.xml", xml_content)]) + region = f"{fake.random_int(min=1, max=99):02d}" + year = fake.random_int(min=2020, max=2025) + month = fake.random_int(min=1, max=12) + + with TestHTTPServer() as server: + path = ( + f"/opendata/download/notifications/{region}/{year}/{month:02d}/fz44.zip" + ) + server.add_bytes(path, archive, content_type="application/zip") + result = parse_procurements( + region_code=region, + year=year, + month=month, + law_type="44", + proxies=[], + client_adapter=server.adapter, + ) + + self.assertEqual(result["status"], "success") + self.assertEqual(result["saved"], len(rows)) + + def test_parse_procurements_without_adapter(self): + xml_content, rows = build_zakupki_xml(count=1) + archive = build_zip([("data.xml", xml_content)]) + region = f"{fake.random_int(min=1, max=99):02d}" + year = fake.random_int(min=2020, max=2025) + month = fake.random_int(min=1, max=12) + + with TestHTTPServer() as server: + path = ( + f"/opendata/download/notifications/{region}/{year}/{month:02d}/fz44.zip" + ) + server.add_bytes(path, archive, content_type="application/zip") + + class _LocalZakupkiClient(parser_tasks.ZakupkiClient): + def __init__(self, *args, **kwargs): + kwargs.setdefault("http_adapter", server.adapter) + super().__init__(*args, **kwargs) + + original_client = parser_tasks.ZakupkiClient + parser_tasks.ZakupkiClient = _LocalZakupkiClient + try: + result = parse_procurements( + region_code=region, + year=year, + month=month, + law_type="44", + proxies=[], + ) + finally: + parser_tasks.ZakupkiClient = original_client + + self.assertEqual(result["status"], "success") + self.assertEqual(result["saved"], len(rows)) @override_settings( @@ -164,176 +225,899 @@ class ParseProcurementsTaskTestCase(TestCase): CELERY_TASK_EAGER_PROPAGATES=True, ) class SyncProcurementsTaskTestCase(TestCase): - """Тесты задачи sync_procurements.""" + """Tests for sync_procurements task.""" - def _create_mock_procurement(self, number: str): - """Создать мок закупки.""" - return Procurement( - purchase_number=number, - purchase_name="Test", - customer_inn="1234567890", - customer_kpp="", - customer_ogrn="", - customer_name="Test Org", - max_price="1000000", - currency_code="RUB", - placement_method="", - publish_date="2025-01-01", - end_date="", - status="", - law_type="44-FZ", - ) + def test_sync_continues_from_last_loaded(self): + region = f"{fake.random_int(min=1, max=99):02d}" + law_type = "44" - @patch("apps.parsers.tasks.ZakupkiClient") - @patch("apps.parsers.tasks.BackgroundJobService.create_job") - def test_sync_starts_from_default_date(self, mock_create_job, mock_client_class): - """Синхронизация начинается с дефолтной даты если нет данных.""" - from apps.parsers.tasks import sync_procurements - - mock_job = MagicMock() - mock_create_job.return_value = mock_job - - mock_client = MagicMock() - mock_client_class.return_value.__enter__ = MagicMock(return_value=mock_client) - mock_client_class.return_value.__exit__ = MagicMock(return_value=False) - # Возвращаем пустой список для всех месяцев, чтобы быстро завершиться - mock_client.fetch_procurements.return_value = [] - - result = sync_procurements(region_code="77", law_type="44") - - self.assertEqual(result["status"], "success") - # Клиент должен был быть вызван - self.assertTrue(mock_client.fetch_procurements.called) - - @patch("apps.parsers.tasks.ZakupkiClient") - @patch("apps.parsers.tasks.BackgroundJobService.create_job") - def test_sync_continues_from_last_loaded(self, mock_create_job, mock_client_class): - """Синхронизация продолжается с последнего загруженного месяца.""" - from apps.parsers.tasks import sync_procurements - - # Создаём существующие данные за январь + # Prepare last loaded period as previous month + current_year = fake.random_int(min=2024, max=2025) + current_month = fake.random_int(min=2, max=12) + last_year = current_year + last_month = current_month - 1 ProcurementRecordFactory( - data_year=2025, - data_month=1, - region_code="77", - law_type="44-FZ", + region_code=region, + data_year=last_year, + data_month=last_month, + law_type=f"{law_type}-FZ", ) - mock_job = MagicMock() - mock_create_job.return_value = mock_job - - mock_client = MagicMock() - mock_client_class.return_value.__enter__ = MagicMock(return_value=mock_client) - mock_client_class.return_value.__exit__ = MagicMock(return_value=False) - mock_client.fetch_procurements.return_value = [] - - result = sync_procurements(region_code="77", law_type="44") + xml_content, rows = build_zakupki_xml(count=1) + archive = build_zip([("data.xml", xml_content)]) + with TestHTTPServer() as server: + file_url = f"/opendata/download/notifications/{region}/{current_year}/{current_month:02d}/fz44.zip" + server.add_bytes(file_url, archive, content_type="application/zip") + result = sync_procurements( + region_code=region, + law_type=law_type, + client_host=_host_from_base_url(server.base_url), + client_scheme="http", + client_adapter=server.adapter, + current_year=current_year, + current_month=current_month, + ) self.assertEqual(result["status"], "success") + self.assertGreaterEqual(result["total_saved"], len(rows)) - # Проверяем что первый вызов был для февраля (следующий месяц) - if mock_client.fetch_procurements.called: - first_call_kwargs = mock_client.fetch_procurements.call_args_list[0].kwargs - # Должен начать с февраля 2025 - self.assertEqual(first_call_kwargs.get("year"), 2025) - self.assertEqual(first_call_kwargs.get("month"), 2) + def test_sync_stops_after_empty_months(self): + region = f"{fake.random_int(min=1, max=99):02d}" + law_type = "44" + current_year = 2025 + current_month = 3 - @patch("apps.parsers.tasks.ZakupkiClient") - @patch("apps.parsers.tasks.BackgroundJobService.create_job") - def test_sync_stops_after_empty_months(self, mock_create_job, mock_client_class): - """Синхронизация останавливается после 2 пустых месяцев подряд.""" - from apps.parsers.tasks import sync_procurements - - mock_job = MagicMock() - mock_create_job.return_value = mock_job - - mock_client = MagicMock() - mock_client_class.return_value.__enter__ = MagicMock(return_value=mock_client) - mock_client_class.return_value.__exit__ = MagicMock(return_value=False) - # Первый месяц - есть данные, потом 2 пустых - mock_client.fetch_procurements.side_effect = [ - [self._create_mock_procurement("1111111111111111111")], - [], # Пустой - [], # Пустой - должен остановиться - ] - - result = sync_procurements(region_code="77", law_type="44") + # No data available on server => empty month + with TestHTTPServer() as server: + result = sync_procurements( + region_code=region, + law_type=law_type, + client_host=_host_from_base_url(server.base_url), + client_scheme="http", + current_year=current_year, + current_month=current_month, + ) self.assertEqual(result["status"], "success") - # Должен был вызваться 3 раза (1 с данными + 2 пустых) - self.assertEqual(mock_client.fetch_procurements.call_count, 3) + self.assertEqual(result["total_saved"], 0) + + def test_sync_with_proxies_and_default_host(self): + region = f"{fake.random_int(min=1, max=99):02d}" + law_type = "44" + current_year = 2025 + current_month = 1 + xml_content, rows = build_zakupki_xml(count=1) + archive = build_zip([("data.xml", xml_content)]) + + with TestHTTPServer() as server: + file_url = f"/opendata/download/notifications/{region}/{current_year}/{current_month:02d}/fz44.zip" + server.add_bytes(file_url, archive, content_type="application/zip") + result = sync_procurements( + region_code=region, + law_type=law_type, + proxies=[], + client_adapter=server.adapter, + current_year=current_year, + current_month=current_month, + ) + + self.assertEqual(result["status"], "success") + self.assertGreaterEqual(result["total_saved"], len(rows)) + + def test_sync_handles_fetch_error(self): + region = f"{fake.random_int(min=1, max=99):02d}" + law_type = "44" + current_year = 2025 + current_month = 1 + + with TestHTTPServer() as server: + file_url = f"/opendata/download/notifications/{region}/{current_year}/{current_month:02d}/fz44.zip" + server.add_bytes( + file_url, b"not-xml", content_type="application/octet-stream" + ) + result = sync_procurements( + region_code=region, + law_type=law_type, + proxies=[], + client_adapter=server.adapter, + current_year=current_year, + current_month=current_month, + ) + + self.assertEqual(result["status"], "success") + self.assertEqual(result["total_saved"], 0) + + def test_sync_procurements_fails_when_client_init_raises(self): + region = f"{fake.random_int(min=1, max=99):02d}" + + class _FailClient: + def __init__(self, *args, **kwargs): + raise RuntimeError("init failed") + + original_client = parser_tasks.ZakupkiClient + parser_tasks.ZakupkiClient = _FailClient + try: + result = sync_procurements(region_code=region, law_type="44", proxies=[]) + finally: + parser_tasks.ZakupkiClient = original_client + + self.assertEqual(result["status"], "failed") + + +@override_settings( + CELERY_TASK_ALWAYS_EAGER=True, + CELERY_TASK_EAGER_PROPAGATES=True, +) +class MinpromtorgTasksTestCase(TestCase): + """Tests for Minpromtorg tasks.""" + + def _add_minpromtorg_routes(self, server: TestHTTPServer): + certificates_bytes, cert_rows = build_minpromtorg_certificates_excel(count=2) + manufacturers_bytes, manuf_rows = build_minpromtorg_manufacturers_excel(count=2) + + date_str = fake.date_between(start_date="-30d", end_date="today").strftime( + "%Y%m%d" + ) + cert_file = f"data_resolutions_{date_str}.xlsx" + manuf_file = f"data_orgs_{date_str}.xlsx" + + server.add_json( + "/api/kss-document-preview", + { + "data": [ + { + "name": IndustrialProductionClient().query, + "files": [{"name": cert_file, "url": f"/files/{cert_file}"}], + }, + { + "name": ManufacturesClient().query, + "files": [{"name": manuf_file, "url": f"/files/{manuf_file}"}], + }, + ] + }, + ) + server.add_bytes( + f"/files/{cert_file}", + certificates_bytes, + content_type=( + "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet" + ), + ) + server.add_bytes( + f"/files/{manuf_file}", + manufacturers_bytes, + content_type=( + "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet" + ), + ) + return cert_rows, manuf_rows + + def test_parse_all_minpromtorg_success(self): + with TestHTTPServer() as server: + cert_rows, manuf_rows = self._add_minpromtorg_routes(server) + result = parse_all_minpromtorg( + proxies=[], + client_adapter=server.adapter, + ) + + self.assertIn("industrial", result) + self.assertIn("manufactures", result) + self.assertEqual(IndustrialCertificateRecord.objects.count(), len(cert_rows)) + self.assertEqual(ManufacturerRecord.objects.count(), len(manuf_rows)) + + def test_parse_all_sources_success(self): + with TestHTTPServer() as server: + cert_rows, manuf_rows = self._add_minpromtorg_routes(server) + result = parse_all_sources( + proxies=[], + client_adapter=server.adapter, + inspections_use_playwright=False, + ) + + self.assertIn("industrial", result) + self.assertIn("manufactures", result) + self.assertIn("inspections", result) + self.assertEqual(IndustrialCertificateRecord.objects.count(), len(cert_rows)) + self.assertEqual(ManufacturerRecord.objects.count(), len(manuf_rows)) + self.assertEqual(InspectionRecord.objects.count(), 0) + + def test_parse_all_minpromtorg_without_adapter(self): + with TestHTTPServer() as server: + cert_rows, manuf_rows = self._add_minpromtorg_routes(server) + + class _LocalIndustrialClient(IndustrialProductionClient): + def __init__(self, *args, **kwargs): + kwargs.setdefault("http_adapter", server.adapter) + super().__init__(*args, **kwargs) + + class _LocalManufacturesClient(ManufacturesClient): + def __init__(self, *args, **kwargs): + kwargs.setdefault("http_adapter", server.adapter) + super().__init__(*args, **kwargs) + + original_industrial = parser_tasks.IndustrialProductionClient + original_manufactures = parser_tasks.ManufacturesClient + parser_tasks.IndustrialProductionClient = _LocalIndustrialClient + parser_tasks.ManufacturesClient = _LocalManufacturesClient + try: + result = parse_all_minpromtorg(proxies=[]) + finally: + parser_tasks.IndustrialProductionClient = original_industrial + parser_tasks.ManufacturesClient = original_manufactures + + self.assertIn("industrial", result) + self.assertIn("manufactures", result) + self.assertEqual(IndustrialCertificateRecord.objects.count(), len(cert_rows)) + self.assertEqual(ManufacturerRecord.objects.count(), len(manuf_rows)) + + def test_parse_all_sources_without_adapter(self): + with TestHTTPServer() as server: + cert_rows, manuf_rows = self._add_minpromtorg_routes(server) + + class _LocalIndustrialClient(IndustrialProductionClient): + def __init__(self, *args, **kwargs): + kwargs.setdefault("http_adapter", server.adapter) + super().__init__(*args, **kwargs) + + class _LocalManufacturesClient(ManufacturesClient): + def __init__(self, *args, **kwargs): + kwargs.setdefault("http_adapter", server.adapter) + super().__init__(*args, **kwargs) + + original_industrial = parser_tasks.IndustrialProductionClient + original_manufactures = parser_tasks.ManufacturesClient + parser_tasks.IndustrialProductionClient = _LocalIndustrialClient + parser_tasks.ManufacturesClient = _LocalManufacturesClient + try: + result = parse_all_sources(proxies=[], inspections_use_playwright=None) + finally: + parser_tasks.IndustrialProductionClient = original_industrial + parser_tasks.ManufacturesClient = original_manufactures + + self.assertIn("industrial", result) + self.assertIn("manufactures", result) + self.assertIn("inspections", result) + self.assertEqual(IndustrialCertificateRecord.objects.count(), len(cert_rows)) + self.assertEqual(ManufacturerRecord.objects.count(), len(manuf_rows)) + + def test_parse_industrial_production_failure(self): + date_str = fake.date_between(start_date="-30d", end_date="today").strftime( + "%Y%m%d" + ) + cert_file = f"data_resolutions_{date_str}.xlsx" + + with TestHTTPServer() as server: + server.add_json( + "/api/kss-document-preview", + { + "data": [ + { + "name": IndustrialProductionClient().query, + "files": [ + {"name": cert_file, "url": f"/files/{cert_file}"} + ], + } + ] + }, + ) + server.add_bytes("/files/" + cert_file, b"not-an-excel") + result = parse_industrial_production( + proxies=[], + client_adapter=server.adapter, + ) + + self.assertEqual(result["status"], "failed") + + def test_parse_industrial_production_with_default_proxies(self): + with TestHTTPServer() as server: + cert_rows, _manuf_rows = self._add_minpromtorg_routes(server) + result = parse_industrial_production(client_adapter=server.adapter) + + self.assertEqual(result["status"], "success") + self.assertEqual(IndustrialCertificateRecord.objects.count(), len(cert_rows)) + + def test_parse_manufactures_failure(self): + date_str = fake.date_between(start_date="-30d", end_date="today").strftime( + "%Y%m%d" + ) + manuf_file = f"data_orgs_{date_str}.xlsx" + + with TestHTTPServer() as server: + server.add_json( + "/api/kss-document-preview", + { + "data": [ + { + "name": ManufacturesClient().query, + "files": [ + {"name": manuf_file, "url": f"/files/{manuf_file}"} + ], + } + ] + }, + ) + server.add_bytes("/files/" + manuf_file, b"not-an-excel") + result = parse_manufactures(client_adapter=server.adapter) + + self.assertEqual(result["status"], "failed") + + +@override_settings( + CELERY_TASK_ALWAYS_EAGER=True, + CELERY_TASK_EAGER_PROPAGATES=True, +) +class ParseInspectionsTaskTestCase(TestCase): + """Tests for inspections tasks.""" + + def test_parse_inspections_success(self): + xml_content, rows = build_proverki_xml(count=2) + archive = build_zip([("inspections.xml", xml_content)]) + + with TestHTTPServer() as server: + server.add_bytes( + "/opendata/inspections.zip", + archive, + content_type="application/zip", + ) + result = parse_inspections( + file_url="https://proverki.gov.ru/opendata/inspections.zip", + proxies=[], + client_adapter=server.adapter, + use_playwright=False, + ) + + self.assertEqual(result["status"], "success") + self.assertEqual(result["saved"], len(rows)) + self.assertEqual(InspectionRecord.objects.count(), len(rows)) + + def test_parse_inspections_with_default_proxies(self): + xml_content, rows = build_proverki_xml(count=1) + archive = build_zip([("inspections.xml", xml_content)]) + + with TestHTTPServer() as server: + server.add_bytes( + "/opendata/inspections.zip", + archive, + content_type="application/zip", + ) + result = parse_inspections( + file_url="https://proverki.gov.ru/opendata/inspections.zip", + client_adapter=server.adapter, + use_playwright=False, + ) + + self.assertEqual(result["status"], "success") + self.assertEqual(result["saved"], len(rows)) + + def test_parse_inspections_without_adapter(self): + xml_content, rows = build_proverki_xml(count=1) + archive = build_zip([("inspections.xml", xml_content)]) + + with TestHTTPServer() as server: + server.add_bytes( + "/opendata/inspections.zip", + archive, + content_type="application/zip", + ) + + class _LocalProverkiClient(parser_tasks.ProverkiClient): + def __init__(self, *args, **kwargs): + kwargs.setdefault("http_adapter", server.adapter) + kwargs.setdefault("use_playwright", False) + super().__init__(*args, **kwargs) + + original_client = parser_tasks.ProverkiClient + parser_tasks.ProverkiClient = _LocalProverkiClient + try: + result = parse_inspections( + file_url="https://proverki.gov.ru/opendata/inspections.zip", + proxies=[], + ) + finally: + parser_tasks.ProverkiClient = original_client + + self.assertEqual(result["status"], "success") + self.assertEqual(result["saved"], len(rows)) + + def test_parse_inspections_failure(self): + with TestHTTPServer() as server: + server.add_bytes("/opendata/bad.xml", b"not-xml") + result = parse_inspections( + file_url="https://proverki.gov.ru/opendata/bad.xml", + proxies=[], + client_adapter=server.adapter, + use_playwright=False, + ) + + self.assertEqual(result["status"], "failed") + self.assertIn("error", result) + + def test_sync_inspections_success(self): + xml_content, rows = build_proverki_xml(count=2) + archive = build_zip([("inspections.xml", xml_content)]) + + with TestHTTPServer() as server: + server.add_bytes( + _portal_path(2025, 1), + archive, + content_type="application/zip", + ) + result = sync_inspections( + proxies=[], + client_adapter=server.adapter, + use_playwright=False, + current_year=2025, + current_month=1, + ) + + self.assertEqual(result["status"], "success") + self.assertGreaterEqual(result["total_saved"], len(rows) * 2) + + def test_sync_inspections_stops_after_empty_months(self): + empty_xml = b"" + archive = build_zip([("inspections.xml", empty_xml)]) + + with TestHTTPServer() as server: + server.add_bytes( + _portal_path(2025, 1), + archive, + content_type="application/zip", + ) + server.add_bytes( + _portal_path(2025, 2), + archive, + content_type="application/zip", + ) + result = sync_inspections( + proxies=[], + client_adapter=server.adapter, + use_playwright=False, + current_year=2025, + current_month=3, + ) + + self.assertEqual(result["status"], "success") + self.assertEqual(result["total_saved"], 0) + + def test_sync_inspections_resumes_from_last_loaded(self): + last_year = 2024 + last_month = 12 + InspectionRecordFactory( + data_year=last_year, + data_month=last_month, + is_federal_law_248=False, + ) + InspectionRecordFactory( + data_year=last_year, + data_month=last_month, + is_federal_law_248=True, + ) + xml_content, rows = build_proverki_xml(count=1) + archive = build_zip([("inspections.xml", xml_content)]) + + with TestHTTPServer() as server: + server.add_bytes( + _portal_path(2025, 1), + archive, + content_type="application/zip", + ) + + class _LocalProverkiClient(parser_tasks.ProverkiClient): + def __init__(self, *args, **kwargs): + kwargs.setdefault("http_adapter", server.adapter) + kwargs.setdefault("use_playwright", False) + super().__init__(*args, **kwargs) + + original_client = parser_tasks.ProverkiClient + parser_tasks.ProverkiClient = _LocalProverkiClient + try: + result = sync_inspections(current_year=2025, current_month=1) + finally: + parser_tasks.ProverkiClient = original_client + + self.assertEqual(result["status"], "success") + self.assertGreaterEqual(result["total_saved"], len(rows) * 2) + + def test_sync_inspections_handles_fetch_error(self): + with TestHTTPServer() as server: + server.add_bytes( + _portal_path(2025, 1), + b"not-zip", + content_type="application/octet-stream", + ) + + class _LocalProverkiClient(parser_tasks.ProverkiClient): + def __init__(self, *args, **kwargs): + kwargs.setdefault("http_adapter", server.adapter) + kwargs.setdefault("use_playwright", False) + super().__init__(*args, **kwargs) + + original_client = parser_tasks.ProverkiClient + parser_tasks.ProverkiClient = _LocalProverkiClient + try: + result = sync_inspections( + proxies=[], + current_year=2025, + current_month=1, + ) + finally: + parser_tasks.ProverkiClient = original_client + + self.assertEqual(result["status"], "success") + self.assertEqual(result["total_saved"], 0) + + def test_sync_inspections_fails_when_client_init_raises(self): + class _FailClient: + def __init__(self, *args, **kwargs): + raise RuntimeError("init failed") + + original_client = parser_tasks.ProverkiClient + parser_tasks.ProverkiClient = _FailClient + try: + result = sync_inspections(proxies=[]) + finally: + parser_tasks.ProverkiClient = original_client + + self.assertEqual(result["status"], "failed") + + +@override_settings( + CELERY_TASK_ALWAYS_EAGER=True, + CELERY_TASK_EAGER_PROPAGATES=True, +) +class FNSFileTasksTestCase(TestCase): + """Tests for FNS file tasks.""" + + def _dirs(self, base_dir: str) -> tuple[str, str, str]: + watch_dir = os.path.join(base_dir, "watch") + processed_dir = os.path.join(base_dir, "processed") + failed_dir = os.path.join(base_dir, "failed") + return watch_dir, processed_dir, failed_dir + + def _write_fns_file(self, watch_dir: str) -> str: + content = _build_fns_excel_bytes() + external_id = _digits(5) + ogrn = _digits(13) + filename = f"fin_{external_id}_{ogrn}.xlsx" + file_path = os.path.join(watch_dir, filename) + with open(file_path, "wb") as handle: + handle.write(content) + return file_path + + def test_scan_fns_directory_processes_file(self): + with tempfile.TemporaryDirectory() as tmpdir: + watch_dir, processed_dir, failed_dir = self._dirs(tmpdir) + os.makedirs(watch_dir, exist_ok=True) + file_path = self._write_fns_file(watch_dir) + + with override_settings( + FNS_WATCH_DIRECTORY=watch_dir, + FNS_PROCESSED_DIRECTORY=processed_dir, + FNS_FAILED_DIRECTORY=failed_dir, + ): + result = scan_fns_directory() + self.assertEqual(result["queued"], 1) + self.assertEqual(result["skipped"], 0) + self.assertEqual(FinancialReport.objects.count(), 1) + self.assertFalse(os.path.exists(file_path)) + self.assertTrue( + os.path.exists(os.path.join(processed_dir, os.path.basename(file_path))) + ) + + def test_scan_fns_directory_creates_missing_watch_dir(self): + with tempfile.TemporaryDirectory() as tmpdir: + watch_dir, processed_dir, failed_dir = self._dirs(tmpdir) + with override_settings( + FNS_WATCH_DIRECTORY=watch_dir, + FNS_PROCESSED_DIRECTORY=processed_dir, + FNS_FAILED_DIRECTORY=failed_dir, + ): + result = scan_fns_directory() + self.assertEqual(result, {"scanned": 0, "queued": 0, "skipped": 0}) + self.assertTrue(os.path.exists(watch_dir)) + + def test_scan_fns_directory_skips_locked_file(self): + with tempfile.TemporaryDirectory() as tmpdir: + watch_dir, processed_dir, failed_dir = self._dirs(tmpdir) + os.makedirs(watch_dir, exist_ok=True) + file_path = self._write_fns_file(watch_dir) + lock_path = f"{file_path}.lock" + with open(lock_path, "w") as handle: + handle.write("lock") + + with override_settings( + FNS_WATCH_DIRECTORY=watch_dir, + FNS_PROCESSED_DIRECTORY=processed_dir, + FNS_FAILED_DIRECTORY=failed_dir, + FNS_LOCK_TTL_SECONDS=3600, + ): + result = scan_fns_directory() + self.assertEqual(result["queued"], 0) + self.assertEqual(result["skipped"], 1) + self.assertTrue(os.path.exists(file_path)) + + def test_scan_fns_directory_handles_stale_lock(self): + with tempfile.TemporaryDirectory() as tmpdir: + watch_dir, processed_dir, failed_dir = self._dirs(tmpdir) + os.makedirs(watch_dir, exist_ok=True) + file_path = self._write_fns_file(watch_dir) + lock_path = f"{file_path}.lock" + with open(lock_path, "w") as handle: + handle.write("lock") + old_time = fake.date_time_between( + start_date="-3d", end_date="-2d" + ).timestamp() + os.utime(lock_path, (old_time, old_time)) + + with override_settings( + FNS_WATCH_DIRECTORY=watch_dir, + FNS_PROCESSED_DIRECTORY=processed_dir, + FNS_FAILED_DIRECTORY=failed_dir, + FNS_LOCK_TTL_SECONDS=1, + ): + result = scan_fns_directory() + self.assertEqual(result["queued"], 1) + self.assertEqual(result["skipped"], 0) + + def test_scan_fns_directory_handles_unreadable_file(self): + with tempfile.TemporaryDirectory() as tmpdir: + watch_dir, processed_dir, failed_dir = self._dirs(tmpdir) + os.makedirs(watch_dir, exist_ok=True) + bad_dir = os.path.join( + watch_dir, f"fin_{fake.pystr(min_chars=4, max_chars=6)}.xlsx" + ) + os.makedirs(bad_dir, exist_ok=True) + + with override_settings( + FNS_WATCH_DIRECTORY=watch_dir, + FNS_PROCESSED_DIRECTORY=processed_dir, + FNS_FAILED_DIRECTORY=failed_dir, + ): + result = scan_fns_directory() + self.assertEqual(result["queued"], 0) + self.assertEqual(result["skipped"], 1) + + def test_scan_fns_directory_skips_duplicate_hash(self): + with tempfile.TemporaryDirectory() as tmpdir: + watch_dir, processed_dir, failed_dir = self._dirs(tmpdir) + os.makedirs(watch_dir, exist_ok=True) + file_path = self._write_fns_file(watch_dir) + file_hash = hashlib.sha256(Path(file_path).read_bytes()).hexdigest() + + FinancialReport.objects.create( + external_id=fake.pystr(min_chars=6, max_chars=10), + ogrn=_digits(13), + file_name=os.path.basename(file_path), + file_hash=file_hash, + load_batch=fake.random_int(min=1, max=9999), + status=FinancialReport.Status.SUCCESS, + source=FinancialReport.SourceType.FILE_WATCH, + ) + + with override_settings( + FNS_WATCH_DIRECTORY=watch_dir, + FNS_PROCESSED_DIRECTORY=processed_dir, + FNS_FAILED_DIRECTORY=failed_dir, + ): + result = scan_fns_directory() + self.assertEqual(result["queued"], 0) + self.assertEqual(result["skipped"], 1) + self.assertTrue( + os.path.exists(os.path.join(processed_dir, os.path.basename(file_path))) + ) + + def test_scan_fns_directory_handles_enqueue_error(self): + with tempfile.TemporaryDirectory() as tmpdir: + watch_dir, processed_dir, failed_dir = self._dirs(tmpdir) + os.makedirs(watch_dir, exist_ok=True) + _ = self._write_fns_file(watch_dir) + + def _raise_delay(*_args, **_kwargs): + raise RuntimeError("enqueue failed") + + original_delay = parser_tasks.process_fns_file.delay + parser_tasks.process_fns_file.delay = _raise_delay + try: + with override_settings( + FNS_WATCH_DIRECTORY=watch_dir, + FNS_PROCESSED_DIRECTORY=processed_dir, + FNS_FAILED_DIRECTORY=failed_dir, + ): + result = scan_fns_directory() + finally: + parser_tasks.process_fns_file.delay = original_delay + + self.assertEqual(result["queued"], 0) + self.assertEqual(result["skipped"], 1) + + def test_process_fns_files_batch(self): + with tempfile.TemporaryDirectory() as tmpdir: + watch_dir, processed_dir, failed_dir = self._dirs(tmpdir) + os.makedirs(watch_dir, exist_ok=True) + valid_path = self._write_fns_file(watch_dir) + missing_path = os.path.join(watch_dir, f"{fake.word()}.xlsx") + + with override_settings( + FNS_WATCH_DIRECTORY=watch_dir, + FNS_PROCESSED_DIRECTORY=processed_dir, + FNS_FAILED_DIRECTORY=failed_dir, + ): + result = process_fns_files_batch.apply( + args=[[valid_path, missing_path]] + ).get() + self.assertEqual(result["total"], 2) + self.assertEqual(result["success"], 1) + self.assertEqual(result["failed"], 1) + + def test_process_fns_file_sync_duplicate(self): + with tempfile.TemporaryDirectory() as tmpdir: + watch_dir, processed_dir, failed_dir = self._dirs(tmpdir) + os.makedirs(watch_dir, exist_ok=True) + file_path = self._write_fns_file(watch_dir) + file_hash = hashlib.sha256(Path(file_path).read_bytes()).hexdigest() + + FinancialReport.objects.create( + external_id=fake.pystr(min_chars=6, max_chars=10), + ogrn=_digits(13), + file_name=os.path.basename(file_path), + file_hash=file_hash, + load_batch=fake.random_int(min=1, max=9999), + status=FinancialReport.Status.SUCCESS, + source=FinancialReport.SourceType.FILE_WATCH, + ) + + with override_settings( + FNS_WATCH_DIRECTORY=watch_dir, + FNS_PROCESSED_DIRECTORY=processed_dir, + FNS_FAILED_DIRECTORY=failed_dir, + ): + result = _process_fns_file_sync( + file_path, + task_id=str(fake.uuid4()), + ) + + self.assertEqual(result["status"], "skipped") + self.assertTrue( + os.path.exists(os.path.join(processed_dir, os.path.basename(file_path))) + ) + + def test_process_fns_file_sync_handles_integrity_error(self): + with tempfile.TemporaryDirectory() as tmpdir: + watch_dir, processed_dir, failed_dir = self._dirs(tmpdir) + os.makedirs(watch_dir, exist_ok=True) + file_path = self._write_fns_file(watch_dir) + filename = os.path.basename(file_path) + external_id, ogrn = ( + filename.replace("fin_", "").replace(".xlsx", "").split("_", 1) + ) + + FinancialReport.objects.create( + external_id=external_id, + ogrn=ogrn, + file_name=filename, + file_hash=hashlib.sha256( + fake.pystr(min_chars=8, max_chars=12).encode("utf-8") + ).hexdigest(), + load_batch=fake.random_int(min=1, max=9999), + status=FinancialReport.Status.SUCCESS, + source=FinancialReport.SourceType.FILE_WATCH, + ) + + with override_settings( + FNS_WATCH_DIRECTORY=watch_dir, + FNS_PROCESSED_DIRECTORY=processed_dir, + FNS_FAILED_DIRECTORY=failed_dir, + ): + result = _process_fns_file_sync( + file_path, + task_id=str(fake.uuid4()), + ) + + self.assertEqual(result["status"], "failed") + self.assertTrue( + os.path.exists(os.path.join(failed_dir, os.path.basename(file_path))) + ) + + +class TaskHelpersTestCase(TestCase): + def test_move_to_dir_handles_duplicate_target(self): + with tempfile.TemporaryDirectory() as tmpdir: + source_dir = os.path.join(tmpdir, "source") + target_dir = os.path.join(tmpdir, "target") + os.makedirs(source_dir, exist_ok=True) + os.makedirs(target_dir, exist_ok=True) + + filename = f"{fake.pystr(min_chars=5, max_chars=8)}.txt" + source_path = Path(source_dir) / filename + source_path.write_text(fake.sentence()) + existing_path = Path(target_dir) / filename + existing_path.write_text(fake.sentence()) + + moved = _move_to_dir(source_path, Path(target_dir), suffix="dup") + self.assertIsNotNone(moved) + self.assertTrue(Path(moved).exists()) + self.assertIn("__dup__", Path(moved).name) + + def test_get_next_month_regular(self): + from apps.parsers.tasks import _get_next_month + + year = fake.random_int(min=2024, max=2026) + month = fake.random_int(min=1, max=11) + next_year, next_month = _get_next_month(year, month) + self.assertEqual(next_year, year) + self.assertEqual(next_month, month + 1) + + def test_remove_lock_handles_missing_file(self): + with tempfile.TemporaryDirectory() as tmpdir: + file_path = Path(tmpdir) / f"{fake.pystr(min_chars=4, max_chars=8)}.txt" + _remove_lock(file_path) + + def test_try_create_lock_race(self): + with tempfile.TemporaryDirectory() as tmpdir: + file_path = Path(tmpdir) / f"{fake.pystr(min_chars=4, max_chars=8)}.txt" + file_path.write_text(fake.sentence()) + results: list[bool] = [] + + def worker(): + results.append(_try_create_lock(file_path)) + + threads = [threading.Thread(target=worker) for _ in range(8)] + for thread in threads: + thread.start() + for thread in threads: + thread.join() + + self.assertEqual(results.count(True), 1) + _remove_lock(file_path) class ParserLoadLogServiceTestCase(TestCase): - """Тесты ParserLoadLogService для закупок.""" + """Tests for ParserLoadLogService methods used in tasks.""" def test_get_next_batch_id_new_source(self): - """Получение следующего batch_id для нового источника.""" + log = ParserLoadLog.objects.filter(source=ParserLoadLog.Source.PROCUREMENTS) + self.assertEqual(log.count(), 0) + from apps.parsers.services import ParserLoadLogService batch_id = ParserLoadLogService.get_next_batch_id( ParserLoadLog.Source.PROCUREMENTS ) - self.assertEqual(batch_id, 1) def test_get_next_batch_id_increments(self): - """Batch ID увеличивается при каждом вызове.""" - from apps.parsers.services import ParserLoadLogService - ParserLoadLogFactory(source=ParserLoadLog.Source.PROCUREMENTS, batch_id=5) + from apps.parsers.services import ParserLoadLogService + batch_id = ParserLoadLogService.get_next_batch_id( ParserLoadLog.Source.PROCUREMENTS ) - self.assertEqual(batch_id, 6) def test_create_load_log(self): - """Создание записи лога загрузки.""" from apps.parsers.services import ParserLoadLogService log = ParserLoadLogService.create_load_log( source=ParserLoadLog.Source.PROCUREMENTS, batch_id=1, - status="in_progress", ) - - self.assertIsNotNone(log.pk) - self.assertEqual(log.source, ParserLoadLog.Source.PROCUREMENTS) - self.assertEqual(log.batch_id, 1) - self.assertEqual(log.status, "in_progress") + self.assertEqual(log.status, "success") def test_update_load_log(self): - """Обновление записи лога.""" + log = ParserLoadLogFactory() + from apps.parsers.services import ParserLoadLogService - log = ParserLoadLogFactory( - source=ParserLoadLog.Source.PROCUREMENTS, - status="in_progress", - ) - - ParserLoadLogService.update( - log, - status="success", - records_count=100, - ) - + ParserLoadLogService.update_records_count(log, 10) log.refresh_from_db() - self.assertEqual(log.status, "success") - self.assertEqual(log.records_count, 100) + self.assertEqual(log.records_count, 10) def test_mark_failed(self): - """Пометка лога как failed.""" + log = ParserLoadLogFactory() + from apps.parsers.services import ParserLoadLogService - log = ParserLoadLogFactory( - source=ParserLoadLog.Source.PROCUREMENTS, - status="in_progress", - ) - - ParserLoadLogService.mark_failed(log, "Test error message") - + ParserLoadLogService.mark_failed(log, "fail") log.refresh_from_db() self.assertEqual(log.status, "failed") - self.assertEqual(log.error_message, "Test error message")