Files
mostovik-backend/src/apps/parsers/tasks.py
Aleksandr Meshchriakov cd0e21350b feat(fns): парсер ФНС бухгалтерской отчетности
- Модели FinancialReport и FinancialReportLine
- FNSExcelParser для файлов fin_{id}_{ogrn}.xlsx
- FNSReportService с дедупликацией по хешу файла
- Celery задачи для мониторинга папки (каждые 5 мин)
- API: POST /fns/upload/, GET /fns/reports/
- Django admin интеграция
- 25 unit-тестов
2026-02-01 14:44:19 +01:00

1171 lines
38 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Celery задачи для приложения парсеров.
Задачи являются тонкими обёртками над сервисами и клиентами.
Интегрируются с BackgroundJob для отслеживания прогресса.
"""
import logging
from datetime import datetime
from apps.core.services import BackgroundJobService
from apps.parsers.clients.minpromtorg import (
IndustrialProductionClient,
ManufacturesClient,
)
from apps.parsers.clients.proverki import ProverkiClient
from apps.parsers.clients.zakupki import ZakupkiClient
from apps.parsers.models import ParserLoadLog
from apps.parsers.services import (
FNSReportService,
IndustrialCertificateService,
InspectionService,
ManufacturerService,
ParserLoadLogService,
ProcurementService,
ProxyService,
)
from celery import shared_task
logger = logging.getLogger(__name__)
# Константы для синхронизации проверок
DEFAULT_START_YEAR = 2025
DEFAULT_START_MONTH = 1
@shared_task(bind=True)
def parse_industrial_production(self, proxies: list[str] | None = None) -> dict:
"""
Задача парсинга сертификатов промышленного производства.
Args:
proxies: Список прокси-серверов (опционально).
Если не передан, берётся из БД.
Returns:
Результат: batch_id, saved, status
"""
source = ParserLoadLog.Source.INDUSTRIAL
batch_id = ParserLoadLogService.get_next_batch_id(source)
task_id = self.request.id
# Если прокси не переданы, берём из БД
if proxies is None:
proxies = ProxyService.get_active_proxies_or_none()
logger.info(
"Starting industrial production parsing (task_id=%s, batch_id=%d, proxies=%d)",
task_id,
batch_id,
len(proxies) if proxies else 0,
)
# Создаём запись BackgroundJob для отслеживания прогресса
job = BackgroundJobService.create_job(
task_id=task_id,
task_name="apps.parsers.tasks.parse_industrial_production",
meta={"source": source, "batch_id": batch_id},
)
job.mark_started()
job.update_progress(0, "Инициализация парсера...")
# Создаём запись лога
load_log = ParserLoadLogService.create_load_log(
source=source,
batch_id=batch_id,
status="in_progress",
)
try:
# Парсинг данных
job.update_progress(10, "Загрузка данных с API Минпромторга...")
with IndustrialProductionClient(proxies=proxies) as client:
certificates = client.fetch_certificates()
# Сохранение в БД
job.update_progress(50, f"Сохранение {len(certificates)} сертификатов...")
saved_count = IndustrialCertificateService.save_certificates(
certificates,
batch_id=batch_id,
)
# Обновляем лог
ParserLoadLogService.update(
load_log,
status="success",
records_count=saved_count,
)
# Завершаем BackgroundJob
job.complete(result={"batch_id": batch_id, "saved": saved_count})
logger.info(
"Industrial production parsing completed (batch_id=%d, saved=%d)",
batch_id,
saved_count,
)
return {
"batch_id": batch_id,
"saved": saved_count,
"status": "success",
}
except Exception as e:
logger.error("Industrial production parsing failed: %s", e, exc_info=True)
ParserLoadLogService.mark_failed(load_log, str(e))
job.fail(error=str(e))
return {
"batch_id": batch_id,
"saved": 0,
"status": "failed",
"error": str(e),
}
@shared_task(bind=True)
def parse_manufactures(self, proxies: list[str] | None = None) -> dict:
"""
Задача парсинга реестра производителей.
Args:
proxies: Список прокси-серверов (опционально).
Если не передан, берётся из БД.
Returns:
Результат: batch_id, saved, status
"""
source = ParserLoadLog.Source.MANUFACTURES
batch_id = ParserLoadLogService.get_next_batch_id(source)
task_id = self.request.id
# Если прокси не переданы, берём из БД
if proxies is None:
proxies = ProxyService.get_active_proxies_or_none()
logger.info(
"Starting manufactures parsing (task_id=%s, batch_id=%d, proxies=%d)",
task_id,
batch_id,
len(proxies) if proxies else 0,
)
# Создаём запись BackgroundJob для отслеживания прогресса
job = BackgroundJobService.create_job(
task_id=task_id,
task_name="apps.parsers.tasks.parse_manufactures",
meta={"source": source, "batch_id": batch_id},
)
job.mark_started()
job.update_progress(0, "Инициализация парсера...")
# Создаём запись лога
load_log = ParserLoadLogService.create_load_log(
source=source,
batch_id=batch_id,
status="in_progress",
)
try:
# Парсинг данных
job.update_progress(10, "Загрузка данных с API Минпромторга...")
with ManufacturesClient(proxies=proxies) as client:
manufacturers = client.fetch_manufacturers()
# Сохранение в БД
job.update_progress(50, f"Сохранение {len(manufacturers)} производителей...")
saved_count = ManufacturerService.save_manufacturers(
manufacturers,
batch_id=batch_id,
)
# Обновляем лог
ParserLoadLogService.update(
load_log,
status="success",
records_count=saved_count,
)
# Завершаем BackgroundJob
job.complete(result={"batch_id": batch_id, "saved": saved_count})
logger.info(
"Manufactures parsing completed (batch_id=%d, saved=%d)",
batch_id,
saved_count,
)
return {
"batch_id": batch_id,
"saved": saved_count,
"status": "success",
}
except Exception as e:
logger.error("Manufactures parsing failed: %s", e, exc_info=True)
ParserLoadLogService.mark_failed(load_log, str(e))
job.fail(error=str(e))
return {
"batch_id": batch_id,
"saved": 0,
"status": "failed",
"error": str(e),
}
@shared_task
def parse_all_minpromtorg(proxies: list[str] | None = None) -> dict:
"""
Запустить все парсеры Минпромторга.
Args:
proxies: Список прокси-серверов (опционально).
Если не передан, каждая задача возьмёт прокси из БД.
Returns:
Результаты всех парсеров
"""
logger.info("Starting all Minpromtorg parsers")
results = {
"industrial": parse_industrial_production.delay(proxies=proxies).id,
"manufactures": parse_manufactures.delay(proxies=proxies).id,
}
return results
@shared_task(bind=True)
def parse_inspections(
self,
*,
year: int | None = None,
month: int | None = None,
file_url: str | None = None,
proxies: list[str] | None = None,
) -> dict:
"""
Задача парсинга данных о проверках с proverki.gov.ru.
Args:
year: Год плана проверок (опционально)
month: Месяц (опционально)
file_url: Прямая ссылка на файл данных (опционально)
proxies: Список прокси-серверов (опционально).
Если не передан, берётся из БД.
Returns:
Результат: batch_id, saved, status
"""
source = ParserLoadLog.Source.INSPECTIONS
batch_id = ParserLoadLogService.get_next_batch_id(source)
task_id = self.request.id
# Если прокси не переданы, берём из БД
if proxies is None:
proxies = ProxyService.get_active_proxies_or_none()
logger.info(
"Starting inspections parsing (task_id=%s, batch_id=%d, year=%s, month=%s, proxies=%d)",
task_id,
batch_id,
year,
month,
len(proxies) if proxies else 0,
)
# Создаём запись BackgroundJob для отслеживания прогресса
job = BackgroundJobService.create_job(
task_id=task_id,
task_name="apps.parsers.tasks.parse_inspections",
meta={"source": source, "batch_id": batch_id, "year": year, "month": month},
)
job.mark_started()
job.update_progress(0, "Инициализация парсера...")
# Создаём запись лога
load_log = ParserLoadLogService.create_load_log(
source=source,
batch_id=batch_id,
status="in_progress",
)
def progress_callback(percent: int, message: str) -> None:
"""Callback для обновления прогресса."""
job.update_progress(percent, message)
try:
# Парсинг данных
job.update_progress(10, "Загрузка данных с proverki.gov.ru...")
with ProverkiClient(proxies=proxies) as client:
inspections = client.fetch_inspections(
year=year,
month=month,
file_url=file_url,
progress_callback=progress_callback,
)
# Сохранение в БД
job.update_progress(80, f"Сохранение {len(inspections)} проверок...")
saved_count = InspectionService.save_inspections(
inspections,
batch_id=batch_id,
)
# Обновляем лог
ParserLoadLogService.update(
load_log,
status="success",
records_count=saved_count,
)
# Завершаем BackgroundJob
job.complete(result={"batch_id": batch_id, "saved": saved_count})
logger.info(
"Inspections parsing completed (batch_id=%d, saved=%d)",
batch_id,
saved_count,
)
return {
"batch_id": batch_id,
"saved": saved_count,
"status": "success",
}
except Exception as e:
logger.error("Inspections parsing failed: %s", e, exc_info=True)
ParserLoadLogService.mark_failed(load_log, str(e))
job.fail(error=str(e))
return {
"batch_id": batch_id,
"saved": 0,
"status": "failed",
"error": str(e),
}
@shared_task
def parse_all_sources(proxies: list[str] | None = None) -> dict:
"""
Запустить все парсеры из всех источников.
Args:
proxies: Список прокси-серверов (опционально).
Если не передан, каждая задача возьмёт прокси из БД.
Returns:
Task IDs всех запущенных парсеров
"""
logger.info("Starting all parsers from all sources")
results = {
"industrial": parse_industrial_production.delay(proxies=proxies).id,
"manufactures": parse_manufactures.delay(proxies=proxies).id,
"inspections": parse_inspections.delay(proxies=proxies).id,
}
return results
def _get_next_month(year: int, month: int) -> tuple[int, int]:
"""Получить следующий месяц."""
if month == 12:
return year + 1, 1
return year, month + 1
@shared_task(bind=True)
def sync_inspections( # noqa: C901
self,
*,
proxies: list[str] | None = None,
) -> dict:
"""
Синхронизация данных о проверках с proverki.gov.ru.
Логика работы:
1. Проверяет последнюю загруженную дату в БД
2. Если данных нет - начинает с 01.01.2025
3. Загружает месяц за месяцем до конца текущего года
4. Загружает оба типа проверок (ФЗ-294 и ФЗ-248)
5. Если данных нет за период - прекращает загрузку для этого типа
Args:
proxies: Список прокси-серверов (опционально)
Returns:
Результат синхронизации
"""
source = ParserLoadLog.Source.INSPECTIONS
batch_id = ParserLoadLogService.get_next_batch_id(source)
task_id = self.request.id
# Если прокси не переданы, берём из БД
if proxies is None:
proxies = ProxyService.get_active_proxies_or_none()
logger.info(
"Starting inspections sync (task_id=%s, batch_id=%d)", task_id, batch_id
)
# Создаём запись BackgroundJob
job = BackgroundJobService.create_job(
task_id=task_id,
task_name="apps.parsers.tasks.sync_inspections",
meta={"source": source, "batch_id": batch_id},
)
job.mark_started()
job.update_progress(0, "Инициализация синхронизации...")
# Создаём запись лога
load_log = ParserLoadLogService.create_load_log(
source=source,
batch_id=batch_id,
status="in_progress",
)
current_year = datetime.now().year
current_month = datetime.now().month
total_saved = 0
results = {"fz294": [], "fz248": []}
try:
with ProverkiClient(proxies=proxies) as client:
# Обрабатываем оба типа проверок
for is_fz248 in [False, True]:
fz_key = "fz248" if is_fz248 else "fz294"
fz_name = "ФЗ-248" if is_fz248 else "ФЗ-294"
# Определяем начальную точку
last_year, last_month = InspectionService.get_last_loaded_period(
is_federal_law_248=is_fz248
)
if last_year and last_month:
# Начинаем со следующего месяца после последнего загруженного
start_year, start_month = _get_next_month(last_year, last_month)
logger.info(
"%s: continuing from %d/%d (last loaded: %d/%d)",
fz_name,
start_year,
start_month,
last_year,
last_month,
)
else:
# Начинаем с дефолтной даты
start_year, start_month = DEFAULT_START_YEAR, DEFAULT_START_MONTH
logger.info(
"%s: no data in DB, starting from %d/%d",
fz_name,
start_year,
start_month,
)
# Загружаем месяц за месяцем
year, month = start_year, start_month
empty_months_count = 0
while year < current_year or (
year == current_year and month <= current_month
):
# Прекращаем если 2 месяца подряд нет данных
if empty_months_count >= 2:
logger.info(
"%s: stopping after %d empty months",
fz_name,
empty_months_count,
)
break
job.update_progress(
20 + (50 if is_fz248 else 0),
f"Загрузка {fz_name} за {month:02d}/{year}...",
)
try:
inspections = client.fetch_inspections(
year=year,
month=month,
is_federal_law_248=is_fz248,
)
if inspections:
saved = InspectionService.save_inspections(
inspections,
batch_id=batch_id,
is_federal_law_248=is_fz248,
data_year=year,
data_month=month,
)
total_saved += saved
results[fz_key].append(
{
"year": year,
"month": month,
"fetched": len(inspections),
"saved": saved,
}
)
empty_months_count = 0
logger.info(
"%s %d/%d: fetched %d, saved %d",
fz_name,
year,
month,
len(inspections),
saved,
)
else:
empty_months_count += 1
logger.info(
"%s %d/%d: no data found (empty_count=%d)",
fz_name,
year,
month,
empty_months_count,
)
except Exception as e:
logger.warning(
"%s %d/%d: error - %s",
fz_name,
year,
month,
str(e),
)
empty_months_count += 1
# Переходим к следующему месяцу
year, month = _get_next_month(year, month)
# Обновляем лог
ParserLoadLogService.update(
load_log,
status="success",
records_count=total_saved,
)
# Завершаем BackgroundJob
job.complete(
result={
"batch_id": batch_id,
"total_saved": total_saved,
"results": results,
}
)
logger.info("Inspections sync completed (total_saved=%d)", total_saved)
return {
"batch_id": batch_id,
"total_saved": total_saved,
"status": "success",
"results": results,
}
except Exception as e:
logger.error("Inspections sync failed: %s", e, exc_info=True)
ParserLoadLogService.mark_failed(load_log, str(e))
job.fail(error=str(e))
return {
"batch_id": batch_id,
"total_saved": total_saved,
"status": "failed",
"error": str(e),
}
@shared_task(bind=True)
def parse_procurements(
self,
*,
region_code: str | None = None,
year: int | None = None,
month: int | None = None,
file_url: str | None = None,
law_type: str = "44",
proxies: list[str] | None = None,
) -> dict:
"""
Задача парсинга данных о государственных закупках с zakupki.gov.ru.
Args:
region_code: Код региона (например, "77" для Москвы)
year: Год данных
month: Месяц (опционально)
file_url: Прямая ссылка на файл данных (опционально)
law_type: Тип закона ("44" или "223")
proxies: Список прокси-серверов (опционально).
Если не передан, берётся из БД.
Returns:
Результат: batch_id, saved, status
"""
source = ParserLoadLog.Source.PROCUREMENTS
batch_id = ParserLoadLogService.get_next_batch_id(source)
task_id = self.request.id
# Если прокси не переданы, берём из БД
if proxies is None:
proxies = ProxyService.get_active_proxies_or_none()
logger.info(
"Starting procurements parsing "
"(task_id=%s, batch_id=%d, region=%s, year=%s, month=%s, law=%s-FZ, proxies=%d)",
task_id,
batch_id,
region_code,
year,
month,
law_type,
len(proxies) if proxies else 0,
)
# Создаём запись BackgroundJob для отслеживания прогресса
job = BackgroundJobService.create_job(
task_id=task_id,
task_name="apps.parsers.tasks.parse_procurements",
meta={
"source": source,
"batch_id": batch_id,
"region_code": region_code,
"year": year,
"month": month,
"law_type": law_type,
},
)
job.mark_started()
job.update_progress(0, "Инициализация парсера...")
# Создаём запись лога
load_log = ParserLoadLogService.create_load_log(
source=source,
batch_id=batch_id,
status="in_progress",
)
def progress_callback(percent: int, message: str) -> None:
"""Callback для обновления прогресса."""
job.update_progress(percent, message)
try:
# Парсинг данных
job.update_progress(10, "Загрузка данных с zakupki.gov.ru...")
with ZakupkiClient(proxies=proxies) as client:
procurements = client.fetch_procurements(
region_code=region_code,
year=year,
month=month,
file_url=file_url,
law_type=law_type,
progress_callback=progress_callback,
)
# Сохранение в БД
job.update_progress(80, f"Сохранение {len(procurements)} закупок...")
saved_count = ProcurementService.save_procurements(
procurements,
batch_id=batch_id,
region_code=region_code,
data_year=year,
data_month=month,
)
# Обновляем лог
ParserLoadLogService.update(
load_log,
status="success",
records_count=saved_count,
)
# Завершаем BackgroundJob
job.complete(result={"batch_id": batch_id, "saved": saved_count})
logger.info(
"Procurements parsing completed (batch_id=%d, saved=%d)",
batch_id,
saved_count,
)
return {
"batch_id": batch_id,
"saved": saved_count,
"status": "success",
}
except Exception as e:
logger.error("Procurements parsing failed: %s", e, exc_info=True)
ParserLoadLogService.mark_failed(load_log, str(e))
job.fail(error=str(e))
return {
"batch_id": batch_id,
"saved": 0,
"status": "failed",
"error": str(e),
}
@shared_task(bind=True)
def sync_procurements(
self,
*,
region_code: str,
law_type: str = "44",
proxies: list[str] | None = None,
) -> dict:
"""
Синхронизация данных о закупках с zakupki.gov.ru.
Логика работы:
1. Проверяет последнюю загруженную дату в БД для региона
2. Если данных нет - начинает с 01.01.2025
3. Загружает месяц за месяцем до текущего
Args:
region_code: Код региона (обязательный)
law_type: Тип закона ("44" или "223")
proxies: Список прокси-серверов (опционально)
Returns:
Результат синхронизации
"""
source = ParserLoadLog.Source.PROCUREMENTS
batch_id = ParserLoadLogService.get_next_batch_id(source)
task_id = self.request.id
# Если прокси не переданы, берём из БД
if proxies is None:
proxies = ProxyService.get_active_proxies_or_none()
logger.info(
"Starting procurements sync (task_id=%s, batch_id=%d, region=%s, law=%s-FZ)",
task_id,
batch_id,
region_code,
law_type,
)
# Создаём запись BackgroundJob
job = BackgroundJobService.create_job(
task_id=task_id,
task_name="apps.parsers.tasks.sync_procurements",
meta={
"source": source,
"batch_id": batch_id,
"region_code": region_code,
"law_type": law_type,
},
)
job.mark_started()
job.update_progress(0, "Инициализация синхронизации...")
# Создаём запись лога
load_log = ParserLoadLogService.create_load_log(
source=source,
batch_id=batch_id,
status="in_progress",
)
current_year = datetime.now().year
current_month = datetime.now().month
total_saved = 0
results = []
try:
with ZakupkiClient(proxies=proxies) as client:
# Определяем начальную точку
last_year, last_month = ProcurementService.get_last_loaded_period(
region_code=region_code,
law_type=f"{law_type}-FZ",
)
if last_year and last_month:
# Начинаем со следующего месяца после последнего загруженного
start_year, start_month = _get_next_month(last_year, last_month)
logger.info(
"Continuing from %d/%d (last loaded: %d/%d)",
start_year,
start_month,
last_year,
last_month,
)
else:
# Начинаем с дефолтной даты
start_year, start_month = DEFAULT_START_YEAR, DEFAULT_START_MONTH
logger.info(
"No data in DB, starting from %d/%d",
start_year,
start_month,
)
# Загружаем месяц за месяцем
year, month = start_year, start_month
empty_months_count = 0
while year < current_year or (
year == current_year and month <= current_month
):
# Прекращаем если 2 месяца подряд нет данных
if empty_months_count >= 2:
logger.info("Stopping after %d empty months", empty_months_count)
break
job.update_progress(
20 + (60 * ((year - start_year) * 12 + month - start_month) // 24),
f"Загрузка за {month:02d}/{year}...",
)
try:
procurements = client.fetch_procurements(
region_code=region_code,
year=year,
month=month,
law_type=law_type,
)
if procurements:
saved = ProcurementService.save_procurements(
procurements,
batch_id=batch_id,
region_code=region_code,
data_year=year,
data_month=month,
)
total_saved += saved
results.append(
{
"year": year,
"month": month,
"fetched": len(procurements),
"saved": saved,
}
)
empty_months_count = 0
logger.info(
"%d/%d: fetched %d, saved %d",
year,
month,
len(procurements),
saved,
)
else:
empty_months_count += 1
logger.info(
"%d/%d: no data found (empty_count=%d)",
year,
month,
empty_months_count,
)
except Exception as e:
logger.warning("%d/%d: error - %s", year, month, str(e))
empty_months_count += 1
# Переходим к следующему месяцу
year, month = _get_next_month(year, month)
# Обновляем лог
ParserLoadLogService.update(
load_log,
status="success",
records_count=total_saved,
)
# Завершаем BackgroundJob
job.complete(
result={
"batch_id": batch_id,
"total_saved": total_saved,
"results": results,
}
)
logger.info("Procurements sync completed (total_saved=%d)", total_saved)
return {
"batch_id": batch_id,
"total_saved": total_saved,
"status": "success",
"results": results,
}
except Exception as e:
logger.error("Procurements sync failed: %s", e, exc_info=True)
ParserLoadLogService.mark_failed(load_log, str(e))
job.fail(error=str(e))
return {
"batch_id": batch_id,
"total_saved": total_saved,
"status": "failed",
"error": str(e),
}
# =============================================================================
# FNS Tasks (File Watch & Processing)
# =============================================================================
@shared_task(bind=True)
def scan_fns_directory(self) -> dict:
"""
Периодическая задача: сканирует папку fns на новые файлы.
Запускается через Celery Beat каждые 5 минут.
Новые файлы ставятся в очередь на обработку.
Returns:
Результат сканирования: количество найденных и поставленных в очередь файлов
"""
import hashlib
from pathlib import Path
from django.conf import settings
task_id = self.request.id
logger.info("Starting FNS directory scan (task_id=%s)", task_id)
watch_dir = Path(settings.FNS_WATCH_DIRECTORY)
if not watch_dir.exists():
logger.warning("FNS watch directory does not exist: %s", watch_dir)
watch_dir.mkdir(parents=True, exist_ok=True)
return {"scanned": 0, "queued": 0, "skipped": 0}
queued = 0
skipped = 0
files_found = list(watch_dir.glob("fin_*.xlsx"))
for file_path in files_found:
# Вычисляем хеш файла
file_hash = hashlib.sha256(file_path.read_bytes()).hexdigest()
# Проверяем, обрабатывался ли файл
if FNSReportService.exists_by_hash(file_hash):
skipped += 1
continue
# Ставим в очередь на обработку
process_fns_file.delay(str(file_path))
queued += 1
logger.info("Queued FNS file for processing: %s", file_path.name)
logger.info(
"FNS directory scan completed: found=%d, queued=%d, skipped=%d",
len(files_found),
queued,
skipped,
)
return {
"scanned": len(files_found),
"queued": queued,
"skipped": skipped,
}
@shared_task(bind=True)
def process_fns_file(self, file_path: str) -> dict:
"""
Обработка одного файла FNS.
Args:
file_path: Путь к файлу
Returns:
Результат обработки
"""
import hashlib
import shutil
from dataclasses import asdict
from pathlib import Path
from apps.core.services import BackgroundJobService
from apps.parsers.clients.fns.parser import FNSExcelParser, FNSParserError
from apps.parsers.models import FinancialReport
from django.conf import settings
source = ParserLoadLog.Source.FNS_REPORTS
batch_id = ParserLoadLogService.get_next_batch_id(source)
task_id = self.request.id
file_path = Path(file_path)
logger.info(
"Processing FNS file (task_id=%s, batch_id=%d, file=%s)",
task_id,
batch_id,
file_path.name,
)
# Создаём BackgroundJob
job = BackgroundJobService.create_job(
task_id=task_id,
task_name="apps.parsers.tasks.process_fns_file",
meta={"source": source, "batch_id": batch_id, "file": file_path.name},
)
job.mark_started()
job.update_progress(0, f"Обработка файла {file_path.name}...")
# Создаём запись лога
load_log = ParserLoadLogService.create_load_log(
source=source,
batch_id=batch_id,
status="in_progress",
)
try:
# Проверяем существование файла
if not file_path.exists():
raise FNSParserError(f"Файл не найден: {file_path}")
# Вычисляем хеш
file_hash = hashlib.sha256(file_path.read_bytes()).hexdigest()
# Проверяем дубликат
if FNSReportService.exists_by_hash(file_hash):
logger.info(
"File already processed (hash=%s): %s",
file_hash,
file_path.name,
)
job.complete(result={"status": "skipped", "reason": "duplicate"})
ParserLoadLogService.update(load_log, status="skipped")
return {"status": "skipped", "reason": "duplicate"}
# Парсим файл
job.update_progress(20, "Парсинг Excel файла...")
parsed = FNSExcelParser.parse_file(file_path)
# Сохраняем в БД
job.update_progress(60, f"Сохранение {len(parsed.lines)} строк...")
lines_data = [asdict(line) for line in parsed.lines]
report = FNSReportService.save_report(
external_id=parsed.external_id,
ogrn=parsed.ogrn,
file_name=file_path.name,
file_hash=file_hash,
source=FinancialReport.SourceType.FILE_WATCH,
batch_id=batch_id,
lines_data=lines_data,
)
# Перемещаем файл в processed
job.update_progress(90, "Перемещение файла...")
processed_dir = Path(settings.FNS_PROCESSED_DIRECTORY)
processed_dir.mkdir(parents=True, exist_ok=True)
shutil.move(str(file_path), str(processed_dir / file_path.name))
# Обновляем лог
ParserLoadLogService.update(
load_log,
status="success",
records_count=len(parsed.lines),
)
# Завершаем
job.complete(
result={
"report_id": report.id,
"external_id": parsed.external_id,
"ogrn": parsed.ogrn,
"lines_count": len(parsed.lines),
}
)
logger.info(
"FNS file processed: %s (report_id=%d, lines=%d)",
file_path.name,
report.id,
len(parsed.lines),
)
return {
"status": "success",
"report_id": report.id,
"external_id": parsed.external_id,
"ogrn": parsed.ogrn,
"lines_count": len(parsed.lines),
}
except FNSParserError as e:
logger.error("FNS file parsing failed: %s - %s", file_path.name, e)
# Перемещаем в failed
failed_dir = Path(settings.FNS_FAILED_DIRECTORY)
failed_dir.mkdir(parents=True, exist_ok=True)
if file_path.exists():
shutil.move(str(file_path), str(failed_dir / file_path.name))
ParserLoadLogService.mark_failed(load_log, str(e))
job.fail(error=str(e))
return {"status": "failed", "error": str(e)}
except Exception as e:
logger.error(
"FNS file processing error: %s - %s",
file_path.name,
e,
exc_info=True,
)
ParserLoadLogService.mark_failed(load_log, str(e))
job.fail(error=str(e))
return {"status": "failed", "error": str(e)}
@shared_task(bind=True)
def process_fns_files_batch(self, file_paths: list[str]) -> dict:
"""
Пакетная обработка файлов FNS (для API).
Args:
file_paths: Список путей к файлам
Returns:
Результат обработки всех файлов
"""
task_id = self.request.id
logger.info(
"Processing FNS batch (task_id=%s, files=%d)",
task_id,
len(file_paths),
)
results = []
success_count = 0
failed_count = 0
for file_path in file_paths:
result = process_fns_file(file_path)
results.append({"file": file_path, **result})
if result.get("status") == "success":
success_count += 1
else:
failed_count += 1
logger.info(
"FNS batch completed: total=%d, success=%d, failed=%d",
len(file_paths),
success_count,
failed_count,
)
return {
"total": len(file_paths),
"success": success_count,
"failed": failed_count,
"results": results,
}