feat(fns): парсер ФНС бухгалтерской отчетности

- Модели FinancialReport и FinancialReportLine
- FNSExcelParser для файлов fin_{id}_{ogrn}.xlsx
- FNSReportService с дедупликацией по хешу файла
- Celery задачи для мониторинга папки (каждые 5 мин)
- API: POST /fns/upload/, GET /fns/reports/
- Django admin интеграция
- 25 unit-тестов
This commit is contained in:
2026-02-01 14:44:19 +01:00
parent eb0d6f2600
commit cd0e21350b
17 changed files with 1537 additions and 10 deletions

View File

@@ -17,6 +17,7 @@ from apps.parsers.clients.proverki import ProverkiClient
from apps.parsers.clients.zakupki import ZakupkiClient
from apps.parsers.models import ParserLoadLog
from apps.parsers.services import (
FNSReportService,
IndustrialCertificateService,
InspectionService,
ManufacturerService,
@@ -908,3 +909,262 @@ def sync_procurements(
"status": "failed",
"error": str(e),
}
# =============================================================================
# FNS Tasks (File Watch & Processing)
# =============================================================================
@shared_task(bind=True)
def scan_fns_directory(self) -> dict:
"""
Периодическая задача: сканирует папку fns на новые файлы.
Запускается через Celery Beat каждые 5 минут.
Новые файлы ставятся в очередь на обработку.
Returns:
Результат сканирования: количество найденных и поставленных в очередь файлов
"""
import hashlib
from pathlib import Path
from django.conf import settings
task_id = self.request.id
logger.info("Starting FNS directory scan (task_id=%s)", task_id)
watch_dir = Path(settings.FNS_WATCH_DIRECTORY)
if not watch_dir.exists():
logger.warning("FNS watch directory does not exist: %s", watch_dir)
watch_dir.mkdir(parents=True, exist_ok=True)
return {"scanned": 0, "queued": 0, "skipped": 0}
queued = 0
skipped = 0
files_found = list(watch_dir.glob("fin_*.xlsx"))
for file_path in files_found:
# Вычисляем хеш файла
file_hash = hashlib.sha256(file_path.read_bytes()).hexdigest()
# Проверяем, обрабатывался ли файл
if FNSReportService.exists_by_hash(file_hash):
skipped += 1
continue
# Ставим в очередь на обработку
process_fns_file.delay(str(file_path))
queued += 1
logger.info("Queued FNS file for processing: %s", file_path.name)
logger.info(
"FNS directory scan completed: found=%d, queued=%d, skipped=%d",
len(files_found),
queued,
skipped,
)
return {
"scanned": len(files_found),
"queued": queued,
"skipped": skipped,
}
@shared_task(bind=True)
def process_fns_file(self, file_path: str) -> dict:
"""
Обработка одного файла FNS.
Args:
file_path: Путь к файлу
Returns:
Результат обработки
"""
import hashlib
import shutil
from dataclasses import asdict
from pathlib import Path
from apps.core.services import BackgroundJobService
from apps.parsers.clients.fns.parser import FNSExcelParser, FNSParserError
from apps.parsers.models import FinancialReport
from django.conf import settings
source = ParserLoadLog.Source.FNS_REPORTS
batch_id = ParserLoadLogService.get_next_batch_id(source)
task_id = self.request.id
file_path = Path(file_path)
logger.info(
"Processing FNS file (task_id=%s, batch_id=%d, file=%s)",
task_id,
batch_id,
file_path.name,
)
# Создаём BackgroundJob
job = BackgroundJobService.create_job(
task_id=task_id,
task_name="apps.parsers.tasks.process_fns_file",
meta={"source": source, "batch_id": batch_id, "file": file_path.name},
)
job.mark_started()
job.update_progress(0, f"Обработка файла {file_path.name}...")
# Создаём запись лога
load_log = ParserLoadLogService.create_load_log(
source=source,
batch_id=batch_id,
status="in_progress",
)
try:
# Проверяем существование файла
if not file_path.exists():
raise FNSParserError(f"Файл не найден: {file_path}")
# Вычисляем хеш
file_hash = hashlib.sha256(file_path.read_bytes()).hexdigest()
# Проверяем дубликат
if FNSReportService.exists_by_hash(file_hash):
logger.info(
"File already processed (hash=%s): %s",
file_hash,
file_path.name,
)
job.complete(result={"status": "skipped", "reason": "duplicate"})
ParserLoadLogService.update(load_log, status="skipped")
return {"status": "skipped", "reason": "duplicate"}
# Парсим файл
job.update_progress(20, "Парсинг Excel файла...")
parsed = FNSExcelParser.parse_file(file_path)
# Сохраняем в БД
job.update_progress(60, f"Сохранение {len(parsed.lines)} строк...")
lines_data = [asdict(line) for line in parsed.lines]
report = FNSReportService.save_report(
external_id=parsed.external_id,
ogrn=parsed.ogrn,
file_name=file_path.name,
file_hash=file_hash,
source=FinancialReport.SourceType.FILE_WATCH,
batch_id=batch_id,
lines_data=lines_data,
)
# Перемещаем файл в processed
job.update_progress(90, "Перемещение файла...")
processed_dir = Path(settings.FNS_PROCESSED_DIRECTORY)
processed_dir.mkdir(parents=True, exist_ok=True)
shutil.move(str(file_path), str(processed_dir / file_path.name))
# Обновляем лог
ParserLoadLogService.update(
load_log,
status="success",
records_count=len(parsed.lines),
)
# Завершаем
job.complete(
result={
"report_id": report.id,
"external_id": parsed.external_id,
"ogrn": parsed.ogrn,
"lines_count": len(parsed.lines),
}
)
logger.info(
"FNS file processed: %s (report_id=%d, lines=%d)",
file_path.name,
report.id,
len(parsed.lines),
)
return {
"status": "success",
"report_id": report.id,
"external_id": parsed.external_id,
"ogrn": parsed.ogrn,
"lines_count": len(parsed.lines),
}
except FNSParserError as e:
logger.error("FNS file parsing failed: %s - %s", file_path.name, e)
# Перемещаем в failed
failed_dir = Path(settings.FNS_FAILED_DIRECTORY)
failed_dir.mkdir(parents=True, exist_ok=True)
if file_path.exists():
shutil.move(str(file_path), str(failed_dir / file_path.name))
ParserLoadLogService.mark_failed(load_log, str(e))
job.fail(error=str(e))
return {"status": "failed", "error": str(e)}
except Exception as e:
logger.error(
"FNS file processing error: %s - %s",
file_path.name,
e,
exc_info=True,
)
ParserLoadLogService.mark_failed(load_log, str(e))
job.fail(error=str(e))
return {"status": "failed", "error": str(e)}
@shared_task(bind=True)
def process_fns_files_batch(self, file_paths: list[str]) -> dict:
"""
Пакетная обработка файлов FNS (для API).
Args:
file_paths: Список путей к файлам
Returns:
Результат обработки всех файлов
"""
task_id = self.request.id
logger.info(
"Processing FNS batch (task_id=%s, files=%d)",
task_id,
len(file_paths),
)
results = []
success_count = 0
failed_count = 0
for file_path in file_paths:
result = process_fns_file(file_path)
results.append({"file": file_path, **result})
if result.get("status") == "success":
success_count += 1
else:
failed_count += 1
logger.info(
"FNS batch completed: total=%d, success=%d, failed=%d",
len(file_paths),
success_count,
failed_count,
)
return {
"total": len(file_paths),
"success": success_count,
"failed": failed_count,
"results": results,
}