test(parsers): cover proverki client and lock races
This commit is contained in:
@@ -6,7 +6,11 @@ Celery задачи для приложения парсеров.
|
||||
"""
|
||||
|
||||
import logging
|
||||
import shutil
|
||||
import time
|
||||
import uuid
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
|
||||
from apps.core.services import BackgroundJobService
|
||||
from apps.parsers.clients.minpromtorg import (
|
||||
@@ -26,6 +30,7 @@ from apps.parsers.services import (
|
||||
ProxyService,
|
||||
)
|
||||
from celery import shared_task
|
||||
from requests.adapters import BaseAdapter
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -34,21 +39,221 @@ DEFAULT_START_YEAR = 2025
|
||||
DEFAULT_START_MONTH = 1
|
||||
|
||||
|
||||
def _lock_path_for(file_path: Path) -> Path:
|
||||
return Path(f"{file_path}.lock")
|
||||
|
||||
|
||||
def _try_create_lock(file_path: Path) -> bool:
|
||||
lock_path = _lock_path_for(file_path)
|
||||
if lock_path.exists():
|
||||
try:
|
||||
from django.conf import settings
|
||||
|
||||
age_seconds = time.time() - lock_path.stat().st_mtime
|
||||
ttl_seconds = getattr(settings, "FNS_LOCK_TTL_SECONDS", 3600)
|
||||
if age_seconds > ttl_seconds:
|
||||
lock_path.unlink()
|
||||
else:
|
||||
return False
|
||||
except FileNotFoundError: # pragma: no cover
|
||||
pass
|
||||
try:
|
||||
lock_path.touch(exist_ok=False)
|
||||
except FileExistsError: # pragma: no cover
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
def _remove_lock(file_path: Path) -> None:
|
||||
lock_path = _lock_path_for(file_path)
|
||||
try:
|
||||
lock_path.unlink()
|
||||
except FileNotFoundError:
|
||||
return
|
||||
|
||||
|
||||
def _move_to_dir(
|
||||
file_path: Path,
|
||||
target_dir: Path,
|
||||
*,
|
||||
suffix: str | None = None,
|
||||
) -> Path | None:
|
||||
if not file_path.exists():
|
||||
return None
|
||||
target_dir.mkdir(parents=True, exist_ok=True)
|
||||
target = target_dir / file_path.name
|
||||
if target.exists():
|
||||
tag = suffix or "dup"
|
||||
unique = uuid.uuid4().hex[:8]
|
||||
target = target_dir / f"{file_path.stem}__{tag}__{unique}{file_path.suffix}"
|
||||
shutil.move(str(file_path), str(target))
|
||||
return target
|
||||
|
||||
|
||||
def _process_fns_file_sync(file_path: str | Path, *, task_id: str) -> dict:
|
||||
import hashlib
|
||||
from dataclasses import asdict
|
||||
|
||||
from apps.core.services import BackgroundJobService
|
||||
from apps.parsers.clients.fns.parser import FNSExcelParser, FNSParserError
|
||||
from apps.parsers.models import FinancialReport
|
||||
from django.conf import settings
|
||||
|
||||
source = ParserLoadLog.Source.FNS_REPORTS
|
||||
load_log, batch_id = ParserLoadLogService.create_load_log_with_next_batch_id(
|
||||
source=source,
|
||||
status="in_progress",
|
||||
)
|
||||
file_path = Path(file_path)
|
||||
lock_path = _lock_path_for(file_path)
|
||||
|
||||
logger.info(
|
||||
"Processing FNS file (task_id=%s, batch_id=%d, file=%s)",
|
||||
task_id,
|
||||
batch_id,
|
||||
file_path.name,
|
||||
)
|
||||
|
||||
# Создаём BackgroundJob
|
||||
job = BackgroundJobService.create_job(
|
||||
task_id=task_id,
|
||||
task_name="apps.parsers.tasks.process_fns_file",
|
||||
meta={"source": source, "batch_id": batch_id, "file": file_path.name},
|
||||
)
|
||||
job.mark_started()
|
||||
job.update_progress(0, f"Обработка файла {file_path.name}...")
|
||||
|
||||
# load_log уже создан
|
||||
|
||||
try:
|
||||
# Проверяем существование файла
|
||||
if not file_path.exists():
|
||||
raise FNSParserError(f"Файл не найден: {file_path}")
|
||||
|
||||
# Вычисляем хеш
|
||||
file_hash = hashlib.sha256(file_path.read_bytes()).hexdigest()
|
||||
|
||||
# Проверяем дубликат
|
||||
if FNSReportService.exists_by_hash(file_hash):
|
||||
logger.info(
|
||||
"File already processed (hash=%s): %s",
|
||||
file_hash,
|
||||
file_path.name,
|
||||
)
|
||||
_move_to_dir(
|
||||
file_path,
|
||||
Path(settings.FNS_PROCESSED_DIRECTORY),
|
||||
suffix="dup",
|
||||
)
|
||||
job.complete(result={"status": "skipped", "reason": "duplicate"})
|
||||
ParserLoadLogService.update(load_log, status="skipped")
|
||||
return {"status": "skipped", "reason": "duplicate"}
|
||||
|
||||
# Парсим файл
|
||||
job.update_progress(20, "Парсинг Excel файла...")
|
||||
parsed = FNSExcelParser.parse_file(file_path)
|
||||
|
||||
# Сохраняем в БД
|
||||
job.update_progress(60, f"Сохранение {len(parsed.lines)} строк...")
|
||||
lines_data = [asdict(line) for line in parsed.lines]
|
||||
|
||||
report = FNSReportService.save_report(
|
||||
external_id=parsed.external_id,
|
||||
ogrn=parsed.ogrn,
|
||||
file_name=file_path.name,
|
||||
file_hash=file_hash,
|
||||
source=FinancialReport.SourceType.FILE_WATCH,
|
||||
batch_id=batch_id,
|
||||
lines_data=lines_data,
|
||||
)
|
||||
|
||||
# Перемещаем файл в processed
|
||||
job.update_progress(90, "Перемещение файла...")
|
||||
_move_to_dir(file_path, Path(settings.FNS_PROCESSED_DIRECTORY))
|
||||
|
||||
# Обновляем лог
|
||||
ParserLoadLogService.update(
|
||||
load_log,
|
||||
status="success",
|
||||
records_count=len(parsed.lines),
|
||||
)
|
||||
|
||||
# Завершаем
|
||||
job.complete(
|
||||
result={
|
||||
"report_id": report.id,
|
||||
"external_id": parsed.external_id,
|
||||
"ogrn": parsed.ogrn,
|
||||
"lines_count": len(parsed.lines),
|
||||
}
|
||||
)
|
||||
|
||||
logger.info(
|
||||
"FNS file processed: %s (report_id=%d, lines=%d)",
|
||||
file_path.name,
|
||||
report.id,
|
||||
len(parsed.lines),
|
||||
)
|
||||
|
||||
return {
|
||||
"status": "success",
|
||||
"report_id": report.id,
|
||||
"external_id": parsed.external_id,
|
||||
"ogrn": parsed.ogrn,
|
||||
"lines_count": len(parsed.lines),
|
||||
}
|
||||
|
||||
except FNSParserError as e:
|
||||
logger.error("FNS file parsing failed: %s - %s", file_path.name, e)
|
||||
|
||||
# Перемещаем в failed
|
||||
_move_to_dir(file_path, Path(settings.FNS_FAILED_DIRECTORY), suffix="failed")
|
||||
|
||||
ParserLoadLogService.mark_failed(load_log, str(e))
|
||||
job.fail(error=str(e))
|
||||
|
||||
return {"status": "failed", "error": str(e)}
|
||||
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
"FNS file processing error: %s - %s",
|
||||
file_path.name,
|
||||
e,
|
||||
exc_info=True,
|
||||
)
|
||||
_move_to_dir(file_path, Path(settings.FNS_FAILED_DIRECTORY), suffix="failed")
|
||||
ParserLoadLogService.mark_failed(load_log, str(e))
|
||||
job.fail(error=str(e))
|
||||
|
||||
return {"status": "failed", "error": str(e)}
|
||||
finally:
|
||||
if lock_path.exists():
|
||||
_remove_lock(file_path)
|
||||
|
||||
|
||||
@shared_task(bind=True)
|
||||
def parse_industrial_production(self, proxies: list[str] | None = None) -> dict:
|
||||
def parse_industrial_production(
|
||||
self,
|
||||
proxies: list[str] | None = None,
|
||||
client_adapter: BaseAdapter | None = None,
|
||||
) -> dict:
|
||||
"""
|
||||
Задача парсинга сертификатов промышленного производства.
|
||||
|
||||
Args:
|
||||
proxies: Список прокси-серверов (опционально).
|
||||
Если не передан, берётся из БД.
|
||||
client_adapter: HTTP-адаптер (опционально).
|
||||
|
||||
Returns:
|
||||
Результат: batch_id, saved, status
|
||||
"""
|
||||
source = ParserLoadLog.Source.INDUSTRIAL
|
||||
batch_id = ParserLoadLogService.get_next_batch_id(source)
|
||||
task_id = self.request.id
|
||||
load_log, batch_id = ParserLoadLogService.create_load_log_with_next_batch_id(
|
||||
source=source,
|
||||
status="in_progress",
|
||||
)
|
||||
task_id = self.request.id or str(uuid.uuid4())
|
||||
|
||||
# Если прокси не переданы, берём из БД
|
||||
if proxies is None:
|
||||
@@ -70,17 +275,15 @@ def parse_industrial_production(self, proxies: list[str] | None = None) -> dict:
|
||||
job.mark_started()
|
||||
job.update_progress(0, "Инициализация парсера...")
|
||||
|
||||
# Создаём запись лога
|
||||
load_log = ParserLoadLogService.create_load_log(
|
||||
source=source,
|
||||
batch_id=batch_id,
|
||||
status="in_progress",
|
||||
)
|
||||
# load_log уже создан
|
||||
|
||||
try:
|
||||
# Парсинг данных
|
||||
job.update_progress(10, "Загрузка данных с API Минпромторга...")
|
||||
with IndustrialProductionClient(proxies=proxies) as client:
|
||||
client_kwargs = {"proxies": proxies}
|
||||
if client_adapter:
|
||||
client_kwargs["http_adapter"] = client_adapter
|
||||
with IndustrialProductionClient(**client_kwargs) as client:
|
||||
certificates = client.fetch_certificates()
|
||||
|
||||
# Сохранение в БД
|
||||
@@ -126,20 +329,28 @@ def parse_industrial_production(self, proxies: list[str] | None = None) -> dict:
|
||||
|
||||
|
||||
@shared_task(bind=True)
|
||||
def parse_manufactures(self, proxies: list[str] | None = None) -> dict:
|
||||
def parse_manufactures(
|
||||
self,
|
||||
proxies: list[str] | None = None,
|
||||
client_adapter: BaseAdapter | None = None,
|
||||
) -> dict:
|
||||
"""
|
||||
Задача парсинга реестра производителей.
|
||||
|
||||
Args:
|
||||
proxies: Список прокси-серверов (опционально).
|
||||
Если не передан, берётся из БД.
|
||||
client_adapter: HTTP-адаптер (опционально).
|
||||
|
||||
Returns:
|
||||
Результат: batch_id, saved, status
|
||||
"""
|
||||
source = ParserLoadLog.Source.MANUFACTURES
|
||||
batch_id = ParserLoadLogService.get_next_batch_id(source)
|
||||
task_id = self.request.id
|
||||
load_log, batch_id = ParserLoadLogService.create_load_log_with_next_batch_id(
|
||||
source=source,
|
||||
status="in_progress",
|
||||
)
|
||||
task_id = self.request.id or str(uuid.uuid4())
|
||||
|
||||
# Если прокси не переданы, берём из БД
|
||||
if proxies is None:
|
||||
@@ -161,17 +372,15 @@ def parse_manufactures(self, proxies: list[str] | None = None) -> dict:
|
||||
job.mark_started()
|
||||
job.update_progress(0, "Инициализация парсера...")
|
||||
|
||||
# Создаём запись лога
|
||||
load_log = ParserLoadLogService.create_load_log(
|
||||
source=source,
|
||||
batch_id=batch_id,
|
||||
status="in_progress",
|
||||
)
|
||||
# load_log уже создан
|
||||
|
||||
try:
|
||||
# Парсинг данных
|
||||
job.update_progress(10, "Загрузка данных с API Минпромторга...")
|
||||
with ManufacturesClient(proxies=proxies) as client:
|
||||
client_kwargs = {"proxies": proxies}
|
||||
if client_adapter:
|
||||
client_kwargs["http_adapter"] = client_adapter
|
||||
with ManufacturesClient(**client_kwargs) as client:
|
||||
manufacturers = client.fetch_manufacturers()
|
||||
|
||||
# Сохранение в БД
|
||||
@@ -217,22 +426,43 @@ def parse_manufactures(self, proxies: list[str] | None = None) -> dict:
|
||||
|
||||
|
||||
@shared_task
|
||||
def parse_all_minpromtorg(proxies: list[str] | None = None) -> dict:
|
||||
def parse_all_minpromtorg(
|
||||
proxies: list[str] | None = None,
|
||||
client_adapter: BaseAdapter | None = None,
|
||||
) -> dict:
|
||||
"""
|
||||
Запустить все парсеры Минпромторга.
|
||||
|
||||
Args:
|
||||
proxies: Список прокси-серверов (опционально).
|
||||
Если не передан, каждая задача возьмёт прокси из БД.
|
||||
client_adapter: HTTP-адаптер (опционально).
|
||||
|
||||
Returns:
|
||||
Результаты всех парсеров
|
||||
"""
|
||||
logger.info("Starting all Minpromtorg parsers")
|
||||
|
||||
if client_adapter is not None:
|
||||
industrial_result = parse_industrial_production.apply(
|
||||
kwargs={"proxies": proxies, "client_adapter": client_adapter}
|
||||
)
|
||||
manufactures_result = parse_manufactures.apply(
|
||||
kwargs={"proxies": proxies, "client_adapter": client_adapter}
|
||||
)
|
||||
else:
|
||||
industrial_result = parse_industrial_production.delay(
|
||||
proxies=proxies,
|
||||
client_adapter=client_adapter,
|
||||
)
|
||||
manufactures_result = parse_manufactures.delay(
|
||||
proxies=proxies,
|
||||
client_adapter=client_adapter,
|
||||
)
|
||||
|
||||
results = {
|
||||
"industrial": parse_industrial_production.delay(proxies=proxies).id,
|
||||
"manufactures": parse_manufactures.delay(proxies=proxies).id,
|
||||
"industrial": industrial_result.id,
|
||||
"manufactures": manufactures_result.id,
|
||||
}
|
||||
|
||||
return results
|
||||
@@ -246,6 +476,8 @@ def parse_inspections(
|
||||
month: int | None = None,
|
||||
file_url: str | None = None,
|
||||
proxies: list[str] | None = None,
|
||||
client_adapter: BaseAdapter | None = None,
|
||||
use_playwright: bool | None = None,
|
||||
) -> dict:
|
||||
"""
|
||||
Задача парсинга данных о проверках с proverki.gov.ru.
|
||||
@@ -256,13 +488,18 @@ def parse_inspections(
|
||||
file_url: Прямая ссылка на файл данных (опционально)
|
||||
proxies: Список прокси-серверов (опционально).
|
||||
Если не передан, берётся из БД.
|
||||
client_adapter: HTTP-адаптер (опционально).
|
||||
use_playwright: Использовать Playwright (опционально).
|
||||
|
||||
Returns:
|
||||
Результат: batch_id, saved, status
|
||||
"""
|
||||
source = ParserLoadLog.Source.INSPECTIONS
|
||||
batch_id = ParserLoadLogService.get_next_batch_id(source)
|
||||
task_id = self.request.id
|
||||
load_log, batch_id = ParserLoadLogService.create_load_log_with_next_batch_id(
|
||||
source=source,
|
||||
status="in_progress",
|
||||
)
|
||||
task_id = self.request.id or str(uuid.uuid4())
|
||||
|
||||
# Если прокси не переданы, берём из БД
|
||||
if proxies is None:
|
||||
@@ -286,12 +523,7 @@ def parse_inspections(
|
||||
job.mark_started()
|
||||
job.update_progress(0, "Инициализация парсера...")
|
||||
|
||||
# Создаём запись лога
|
||||
load_log = ParserLoadLogService.create_load_log(
|
||||
source=source,
|
||||
batch_id=batch_id,
|
||||
status="in_progress",
|
||||
)
|
||||
# load_log уже создан
|
||||
|
||||
def progress_callback(percent: int, message: str) -> None:
|
||||
"""Callback для обновления прогресса."""
|
||||
@@ -300,7 +532,12 @@ def parse_inspections(
|
||||
try:
|
||||
# Парсинг данных
|
||||
job.update_progress(10, "Загрузка данных с proverki.gov.ru...")
|
||||
with ProverkiClient(proxies=proxies) as client:
|
||||
client_kwargs = {"proxies": proxies}
|
||||
if client_adapter:
|
||||
client_kwargs["http_adapter"] = client_adapter
|
||||
if use_playwright is not None:
|
||||
client_kwargs["use_playwright"] = use_playwright
|
||||
with ProverkiClient(**client_kwargs) as client:
|
||||
inspections = client.fetch_inspections(
|
||||
year=year,
|
||||
month=month,
|
||||
@@ -351,23 +588,58 @@ def parse_inspections(
|
||||
|
||||
|
||||
@shared_task
|
||||
def parse_all_sources(proxies: list[str] | None = None) -> dict:
|
||||
def parse_all_sources(
|
||||
proxies: list[str] | None = None,
|
||||
client_adapter: BaseAdapter | None = None,
|
||||
inspections_use_playwright: bool | None = None,
|
||||
) -> dict:
|
||||
"""
|
||||
Запустить все парсеры из всех источников.
|
||||
|
||||
Args:
|
||||
proxies: Список прокси-серверов (опционально).
|
||||
Если не передан, каждая задача возьмёт прокси из БД.
|
||||
client_adapter: HTTP-адаптер (опционально).
|
||||
inspections_use_playwright: Использовать Playwright (опционально).
|
||||
|
||||
Returns:
|
||||
Task IDs всех запущенных парсеров
|
||||
"""
|
||||
logger.info("Starting all parsers from all sources")
|
||||
|
||||
if client_adapter is not None:
|
||||
industrial_result = parse_industrial_production.apply(
|
||||
kwargs={"proxies": proxies, "client_adapter": client_adapter}
|
||||
)
|
||||
manufactures_result = parse_manufactures.apply(
|
||||
kwargs={"proxies": proxies, "client_adapter": client_adapter}
|
||||
)
|
||||
inspections_result = parse_inspections.apply(
|
||||
kwargs={
|
||||
"proxies": proxies,
|
||||
"client_adapter": client_adapter,
|
||||
"use_playwright": inspections_use_playwright,
|
||||
}
|
||||
)
|
||||
else:
|
||||
industrial_result = parse_industrial_production.delay(
|
||||
proxies=proxies,
|
||||
client_adapter=client_adapter,
|
||||
)
|
||||
manufactures_result = parse_manufactures.delay(
|
||||
proxies=proxies,
|
||||
client_adapter=client_adapter,
|
||||
)
|
||||
inspections_result = parse_inspections.delay(
|
||||
proxies=proxies,
|
||||
client_adapter=client_adapter,
|
||||
use_playwright=inspections_use_playwright,
|
||||
)
|
||||
|
||||
results = {
|
||||
"industrial": parse_industrial_production.delay(proxies=proxies).id,
|
||||
"manufactures": parse_manufactures.delay(proxies=proxies).id,
|
||||
"inspections": parse_inspections.delay(proxies=proxies).id,
|
||||
"industrial": industrial_result.id,
|
||||
"manufactures": manufactures_result.id,
|
||||
"inspections": inspections_result.id,
|
||||
}
|
||||
|
||||
return results
|
||||
@@ -385,6 +657,10 @@ def sync_inspections( # noqa: C901
|
||||
self,
|
||||
*,
|
||||
proxies: list[str] | None = None,
|
||||
client_adapter: BaseAdapter | None = None,
|
||||
use_playwright: bool | None = None,
|
||||
current_year: int | None = None,
|
||||
current_month: int | None = None,
|
||||
) -> dict:
|
||||
"""
|
||||
Синхронизация данных о проверках с proverki.gov.ru.
|
||||
@@ -398,13 +674,20 @@ def sync_inspections( # noqa: C901
|
||||
|
||||
Args:
|
||||
proxies: Список прокси-серверов (опционально)
|
||||
client_adapter: HTTP-адаптер (опционально).
|
||||
use_playwright: Использовать Playwright (опционально).
|
||||
current_year: Год (опционально) для ограничения периода.
|
||||
current_month: Месяц (опционально) для ограничения периода.
|
||||
|
||||
Returns:
|
||||
Результат синхронизации
|
||||
"""
|
||||
source = ParserLoadLog.Source.INSPECTIONS
|
||||
batch_id = ParserLoadLogService.get_next_batch_id(source)
|
||||
task_id = self.request.id
|
||||
load_log, batch_id = ParserLoadLogService.create_load_log_with_next_batch_id(
|
||||
source=source,
|
||||
status="in_progress",
|
||||
)
|
||||
task_id = self.request.id or str(uuid.uuid4())
|
||||
|
||||
# Если прокси не переданы, берём из БД
|
||||
if proxies is None:
|
||||
@@ -423,20 +706,21 @@ def sync_inspections( # noqa: C901
|
||||
job.mark_started()
|
||||
job.update_progress(0, "Инициализация синхронизации...")
|
||||
|
||||
# Создаём запись лога
|
||||
load_log = ParserLoadLogService.create_load_log(
|
||||
source=source,
|
||||
batch_id=batch_id,
|
||||
status="in_progress",
|
||||
)
|
||||
# load_log уже создан
|
||||
|
||||
current_year = datetime.now().year
|
||||
current_month = datetime.now().month
|
||||
now = datetime.now()
|
||||
current_year = current_year or now.year
|
||||
current_month = current_month or now.month
|
||||
total_saved = 0
|
||||
results = {"fz294": [], "fz248": []}
|
||||
|
||||
try:
|
||||
with ProverkiClient(proxies=proxies) as client:
|
||||
client_kwargs = {"proxies": proxies}
|
||||
if client_adapter:
|
||||
client_kwargs["http_adapter"] = client_adapter
|
||||
if use_playwright is not None:
|
||||
client_kwargs["use_playwright"] = use_playwright
|
||||
with ProverkiClient(**client_kwargs) as client:
|
||||
# Обрабатываем оба типа проверок
|
||||
for is_fz248 in [False, True]:
|
||||
fz_key = "fz248" if is_fz248 else "fz294"
|
||||
@@ -593,6 +877,9 @@ def parse_procurements(
|
||||
file_url: str | None = None,
|
||||
law_type: str = "44",
|
||||
proxies: list[str] | None = None,
|
||||
client_host: str | None = None,
|
||||
client_scheme: str | None = None,
|
||||
client_adapter: BaseAdapter | None = None,
|
||||
) -> dict:
|
||||
"""
|
||||
Задача парсинга данных о государственных закупках с zakupki.gov.ru.
|
||||
@@ -610,8 +897,11 @@ def parse_procurements(
|
||||
Результат: batch_id, saved, status
|
||||
"""
|
||||
source = ParserLoadLog.Source.PROCUREMENTS
|
||||
batch_id = ParserLoadLogService.get_next_batch_id(source)
|
||||
task_id = self.request.id
|
||||
load_log, batch_id = ParserLoadLogService.create_load_log_with_next_batch_id(
|
||||
source=source,
|
||||
status="in_progress",
|
||||
)
|
||||
task_id = self.request.id or str(uuid.uuid4())
|
||||
|
||||
# Если прокси не переданы, берём из БД
|
||||
if proxies is None:
|
||||
@@ -645,12 +935,7 @@ def parse_procurements(
|
||||
job.mark_started()
|
||||
job.update_progress(0, "Инициализация парсера...")
|
||||
|
||||
# Создаём запись лога
|
||||
load_log = ParserLoadLogService.create_load_log(
|
||||
source=source,
|
||||
batch_id=batch_id,
|
||||
status="in_progress",
|
||||
)
|
||||
# load_log уже создан
|
||||
|
||||
def progress_callback(percent: int, message: str) -> None:
|
||||
"""Callback для обновления прогресса."""
|
||||
@@ -659,7 +944,14 @@ def parse_procurements(
|
||||
try:
|
||||
# Парсинг данных
|
||||
job.update_progress(10, "Загрузка данных с zakupki.gov.ru...")
|
||||
with ZakupkiClient(proxies=proxies) as client:
|
||||
client_kwargs = {"proxies": proxies}
|
||||
if client_host:
|
||||
client_kwargs["host"] = client_host
|
||||
if client_scheme:
|
||||
client_kwargs["scheme"] = client_scheme
|
||||
if client_adapter:
|
||||
client_kwargs["http_adapter"] = client_adapter
|
||||
with ZakupkiClient(**client_kwargs) as client:
|
||||
procurements = client.fetch_procurements(
|
||||
region_code=region_code,
|
||||
year=year,
|
||||
@@ -715,12 +1007,17 @@ def parse_procurements(
|
||||
|
||||
|
||||
@shared_task(bind=True)
|
||||
def sync_procurements(
|
||||
def sync_procurements( # noqa: C901
|
||||
self,
|
||||
*,
|
||||
region_code: str,
|
||||
law_type: str = "44",
|
||||
proxies: list[str] | None = None,
|
||||
client_host: str | None = None,
|
||||
client_scheme: str | None = None,
|
||||
client_adapter: BaseAdapter | None = None,
|
||||
current_year: int | None = None,
|
||||
current_month: int | None = None,
|
||||
) -> dict:
|
||||
"""
|
||||
Синхронизация данных о закупках с zakupki.gov.ru.
|
||||
@@ -739,8 +1036,11 @@ def sync_procurements(
|
||||
Результат синхронизации
|
||||
"""
|
||||
source = ParserLoadLog.Source.PROCUREMENTS
|
||||
batch_id = ParserLoadLogService.get_next_batch_id(source)
|
||||
task_id = self.request.id
|
||||
load_log, batch_id = ParserLoadLogService.create_load_log_with_next_batch_id(
|
||||
source=source,
|
||||
status="in_progress",
|
||||
)
|
||||
task_id = self.request.id or str(uuid.uuid4())
|
||||
|
||||
# Если прокси не переданы, берём из БД
|
||||
if proxies is None:
|
||||
@@ -768,20 +1068,23 @@ def sync_procurements(
|
||||
job.mark_started()
|
||||
job.update_progress(0, "Инициализация синхронизации...")
|
||||
|
||||
# Создаём запись лога
|
||||
load_log = ParserLoadLogService.create_load_log(
|
||||
source=source,
|
||||
batch_id=batch_id,
|
||||
status="in_progress",
|
||||
)
|
||||
# load_log уже создан
|
||||
|
||||
current_year = datetime.now().year
|
||||
current_month = datetime.now().month
|
||||
now = datetime.now()
|
||||
current_year = current_year or now.year
|
||||
current_month = current_month or now.month
|
||||
total_saved = 0
|
||||
results = []
|
||||
|
||||
try:
|
||||
with ZakupkiClient(proxies=proxies) as client:
|
||||
client_kwargs = {"proxies": proxies}
|
||||
if client_host:
|
||||
client_kwargs["host"] = client_host
|
||||
if client_scheme:
|
||||
client_kwargs["scheme"] = client_scheme
|
||||
if client_adapter:
|
||||
client_kwargs["http_adapter"] = client_adapter
|
||||
with ZakupkiClient(**client_kwargs) as client:
|
||||
# Определяем начальную точку
|
||||
last_year, last_month = ProcurementService.get_last_loaded_period(
|
||||
region_code=region_code,
|
||||
@@ -928,7 +1231,6 @@ def scan_fns_directory(self) -> dict:
|
||||
Результат сканирования: количество найденных и поставленных в очередь файлов
|
||||
"""
|
||||
import hashlib
|
||||
from pathlib import Path
|
||||
|
||||
from django.conf import settings
|
||||
|
||||
@@ -946,16 +1248,39 @@ def scan_fns_directory(self) -> dict:
|
||||
files_found = list(watch_dir.glob("fin_*.xlsx"))
|
||||
|
||||
for file_path in files_found:
|
||||
if not _try_create_lock(file_path):
|
||||
skipped += 1
|
||||
continue
|
||||
|
||||
# Вычисляем хеш файла
|
||||
file_hash = hashlib.sha256(file_path.read_bytes()).hexdigest()
|
||||
try:
|
||||
file_hash = hashlib.sha256(file_path.read_bytes()).hexdigest()
|
||||
except Exception as e:
|
||||
logger.warning("Failed to read FNS file: %s - %s", file_path, e)
|
||||
_remove_lock(file_path)
|
||||
skipped += 1
|
||||
continue
|
||||
|
||||
# Проверяем, обрабатывался ли файл
|
||||
if FNSReportService.exists_by_hash(file_hash):
|
||||
_move_to_dir(
|
||||
file_path,
|
||||
Path(settings.FNS_PROCESSED_DIRECTORY),
|
||||
suffix="dup",
|
||||
)
|
||||
_remove_lock(file_path)
|
||||
skipped += 1
|
||||
continue
|
||||
|
||||
# Ставим в очередь на обработку
|
||||
process_fns_file.delay(str(file_path))
|
||||
try:
|
||||
process_fns_file.delay(str(file_path))
|
||||
except Exception as e:
|
||||
logger.error("Failed to enqueue FNS file: %s - %s", file_path, e)
|
||||
_remove_lock(file_path)
|
||||
skipped += 1
|
||||
continue
|
||||
|
||||
queued += 1
|
||||
logger.info("Queued FNS file for processing: %s", file_path.name)
|
||||
|
||||
@@ -975,153 +1300,8 @@ def scan_fns_directory(self) -> dict:
|
||||
|
||||
@shared_task(bind=True)
|
||||
def process_fns_file(self, file_path: str) -> dict:
|
||||
"""
|
||||
Обработка одного файла FNS.
|
||||
|
||||
Args:
|
||||
file_path: Путь к файлу
|
||||
|
||||
Returns:
|
||||
Результат обработки
|
||||
"""
|
||||
import hashlib
|
||||
import shutil
|
||||
from dataclasses import asdict
|
||||
from pathlib import Path
|
||||
|
||||
from apps.core.services import BackgroundJobService
|
||||
from apps.parsers.clients.fns.parser import FNSExcelParser, FNSParserError
|
||||
from apps.parsers.models import FinancialReport
|
||||
from django.conf import settings
|
||||
|
||||
source = ParserLoadLog.Source.FNS_REPORTS
|
||||
batch_id = ParserLoadLogService.get_next_batch_id(source)
|
||||
task_id = self.request.id
|
||||
file_path = Path(file_path)
|
||||
|
||||
logger.info(
|
||||
"Processing FNS file (task_id=%s, batch_id=%d, file=%s)",
|
||||
task_id,
|
||||
batch_id,
|
||||
file_path.name,
|
||||
)
|
||||
|
||||
# Создаём BackgroundJob
|
||||
job = BackgroundJobService.create_job(
|
||||
task_id=task_id,
|
||||
task_name="apps.parsers.tasks.process_fns_file",
|
||||
meta={"source": source, "batch_id": batch_id, "file": file_path.name},
|
||||
)
|
||||
job.mark_started()
|
||||
job.update_progress(0, f"Обработка файла {file_path.name}...")
|
||||
|
||||
# Создаём запись лога
|
||||
load_log = ParserLoadLogService.create_load_log(
|
||||
source=source,
|
||||
batch_id=batch_id,
|
||||
status="in_progress",
|
||||
)
|
||||
|
||||
try:
|
||||
# Проверяем существование файла
|
||||
if not file_path.exists():
|
||||
raise FNSParserError(f"Файл не найден: {file_path}")
|
||||
|
||||
# Вычисляем хеш
|
||||
file_hash = hashlib.sha256(file_path.read_bytes()).hexdigest()
|
||||
|
||||
# Проверяем дубликат
|
||||
if FNSReportService.exists_by_hash(file_hash):
|
||||
logger.info(
|
||||
"File already processed (hash=%s): %s",
|
||||
file_hash,
|
||||
file_path.name,
|
||||
)
|
||||
job.complete(result={"status": "skipped", "reason": "duplicate"})
|
||||
ParserLoadLogService.update(load_log, status="skipped")
|
||||
return {"status": "skipped", "reason": "duplicate"}
|
||||
|
||||
# Парсим файл
|
||||
job.update_progress(20, "Парсинг Excel файла...")
|
||||
parsed = FNSExcelParser.parse_file(file_path)
|
||||
|
||||
# Сохраняем в БД
|
||||
job.update_progress(60, f"Сохранение {len(parsed.lines)} строк...")
|
||||
lines_data = [asdict(line) for line in parsed.lines]
|
||||
|
||||
report = FNSReportService.save_report(
|
||||
external_id=parsed.external_id,
|
||||
ogrn=parsed.ogrn,
|
||||
file_name=file_path.name,
|
||||
file_hash=file_hash,
|
||||
source=FinancialReport.SourceType.FILE_WATCH,
|
||||
batch_id=batch_id,
|
||||
lines_data=lines_data,
|
||||
)
|
||||
|
||||
# Перемещаем файл в processed
|
||||
job.update_progress(90, "Перемещение файла...")
|
||||
processed_dir = Path(settings.FNS_PROCESSED_DIRECTORY)
|
||||
processed_dir.mkdir(parents=True, exist_ok=True)
|
||||
shutil.move(str(file_path), str(processed_dir / file_path.name))
|
||||
|
||||
# Обновляем лог
|
||||
ParserLoadLogService.update(
|
||||
load_log,
|
||||
status="success",
|
||||
records_count=len(parsed.lines),
|
||||
)
|
||||
|
||||
# Завершаем
|
||||
job.complete(
|
||||
result={
|
||||
"report_id": report.id,
|
||||
"external_id": parsed.external_id,
|
||||
"ogrn": parsed.ogrn,
|
||||
"lines_count": len(parsed.lines),
|
||||
}
|
||||
)
|
||||
|
||||
logger.info(
|
||||
"FNS file processed: %s (report_id=%d, lines=%d)",
|
||||
file_path.name,
|
||||
report.id,
|
||||
len(parsed.lines),
|
||||
)
|
||||
|
||||
return {
|
||||
"status": "success",
|
||||
"report_id": report.id,
|
||||
"external_id": parsed.external_id,
|
||||
"ogrn": parsed.ogrn,
|
||||
"lines_count": len(parsed.lines),
|
||||
}
|
||||
|
||||
except FNSParserError as e:
|
||||
logger.error("FNS file parsing failed: %s - %s", file_path.name, e)
|
||||
|
||||
# Перемещаем в failed
|
||||
failed_dir = Path(settings.FNS_FAILED_DIRECTORY)
|
||||
failed_dir.mkdir(parents=True, exist_ok=True)
|
||||
if file_path.exists():
|
||||
shutil.move(str(file_path), str(failed_dir / file_path.name))
|
||||
|
||||
ParserLoadLogService.mark_failed(load_log, str(e))
|
||||
job.fail(error=str(e))
|
||||
|
||||
return {"status": "failed", "error": str(e)}
|
||||
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
"FNS file processing error: %s - %s",
|
||||
file_path.name,
|
||||
e,
|
||||
exc_info=True,
|
||||
)
|
||||
ParserLoadLogService.mark_failed(load_log, str(e))
|
||||
job.fail(error=str(e))
|
||||
|
||||
return {"status": "failed", "error": str(e)}
|
||||
"""Обработка одного файла FNS."""
|
||||
return _process_fns_file_sync(file_path, task_id=self.request.id)
|
||||
|
||||
|
||||
@shared_task(bind=True)
|
||||
@@ -1147,7 +1327,10 @@ def process_fns_files_batch(self, file_paths: list[str]) -> dict:
|
||||
failed_count = 0
|
||||
|
||||
for file_path in file_paths:
|
||||
result = process_fns_file(file_path)
|
||||
result = _process_fns_file_sync(
|
||||
file_path,
|
||||
task_id=f"{self.request.id}:{success_count + failed_count}",
|
||||
)
|
||||
results.append({"file": file_path, **result})
|
||||
|
||||
if result.get("status") == "success":
|
||||
|
||||
1002
src/apps/parsers/tests/test_proverki_client.py
Normal file
1002
src/apps/parsers/tests/test_proverki_client.py
Normal file
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
Reference in New Issue
Block a user