feature/registers-generate-test-data-command #13

Merged
avm merged 12 commits from feature/registers-generate-test-data-command into dev 2026-03-20 17:05:02 +03:00
8 changed files with 459 additions and 198 deletions
Showing only changes of commit b8015d9cdd - Show all commits

View File

@@ -2,13 +2,7 @@
Admin configuration for parsers app.
"""
import hashlib
import time
import uuid
from pathlib import Path
from apps.core.models import BackgroundJob
from apps.core.services import BackgroundJobService
from apps.parsers.fns_upload import FNSUploadService
from apps.parsers.models import (
FinancialReport,
FinancialReportLine,
@@ -20,10 +14,7 @@ from apps.parsers.models import (
ProcurementRecord,
Proxy,
)
from apps.parsers.serializers import FNSFileUploadSerializer
from apps.parsers.services import FNSReportService
from apps.parsers.tasks import process_fns_file
from django.conf import settings
from apps.parsers.serializers import FNSFileUploadSerializer, FNSZipUploadSerializer
from django.contrib import admin, messages
from django.shortcuts import redirect
from django.template.response import TemplateResponse
@@ -742,6 +733,11 @@ class FinancialReportAdmin(admin.ModelAdmin):
self.admin_site.admin_view(self.upload_excel_view),
name="parsers_financialreport_upload_excel",
),
path(
"upload-zip/",
self.admin_site.admin_view(self.upload_zip_view),
name="parsers_financialreport_upload_zip",
),
]
return custom_urls + urls
@@ -750,6 +746,9 @@ class FinancialReportAdmin(admin.ModelAdmin):
extra_context["upload_excel_url"] = reverse(
"admin:parsers_financialreport_upload_excel"
)
extra_context["upload_zip_url"] = reverse(
"admin:parsers_financialreport_upload_zip"
)
return super().changelist_view(request, extra_context=extra_context)
def upload_excel_view(self, request):
@@ -768,9 +767,9 @@ class FinancialReportAdmin(admin.ModelAdmin):
return redirect(changelist_url)
try:
queued, skipped, task_ids = self._enqueue_fns_files(
request,
serializer.validated_data["files"],
result = FNSUploadService.queue_uploaded_files(
files=serializer.validated_data["files"],
requested_by_id=request.user.id,
)
except Exception as exc: # noqa: BLE001
self.message_user(
@@ -780,19 +779,21 @@ class FinancialReportAdmin(admin.ModelAdmin):
)
return redirect(changelist_url)
if queued:
if result.queued:
self.message_user(
request,
f"Файлов поставлено в очередь: {queued}. Task IDs: {', '.join(task_ids[:5])}",
"Файлов поставлено в очередь: "
f"{result.queued}. Task IDs: {', '.join(result.task_ids[:5])}",
level=messages.SUCCESS,
)
if skipped:
if result.skipped:
self.message_user(
request,
f"Пропущено файлов: {skipped} (дубликаты или уже обрабатываются).",
"Пропущено файлов: "
f"{result.skipped} (дубликаты или уже обрабатываются).",
level=messages.WARNING,
)
if not queued and not skipped:
if not result.queued and not result.skipped:
self.message_user(
request,
"Файлы не были обработаны.",
@@ -813,86 +814,74 @@ class FinancialReportAdmin(admin.ModelAdmin):
context,
)
@staticmethod
def _try_create_fns_lock(file_path: Path) -> bool:
lock_path = Path(f"{file_path}.lock")
if lock_path.exists():
try:
age_seconds = time.time() - lock_path.stat().st_mtime
ttl_seconds = getattr(settings, "FNS_LOCK_TTL_SECONDS", 3600)
if age_seconds > ttl_seconds:
lock_path.unlink()
else:
return False
except FileNotFoundError:
pass
try:
lock_path.touch(exist_ok=False)
except FileExistsError:
return False
return True
def upload_zip_view(self, request):
changelist_url = reverse("admin:parsers_financialreport_changelist")
def _enqueue_fns_files(self, request, files):
upload_dir = Path(settings.FNS_WATCH_DIRECTORY)
upload_dir.mkdir(parents=True, exist_ok=True)
if request.method == "POST":
serializer = FNSZipUploadSerializer(
data={"file": request.FILES.get("file")}
)
task_ids = []
queued = 0
skipped = 0
for file in files:
file_content = file.read()
file_hash = hashlib.sha256(file_content).hexdigest()
file.seek(0)
if FNSReportService.exists_by_hash(file_hash):
skipped += 1
continue
file_path = upload_dir / file.name
if not self._try_create_fns_lock(file_path):
skipped += 1
continue
lock_path = Path(f"{file_path}.lock")
if file_path.exists():
lock_path.unlink(missing_ok=True)
skipped += 1
continue
try:
with open(file_path, "wb") as f:
for chunk in file.chunks():
f.write(chunk)
except Exception:
lock_path.unlink(missing_ok=True)
raise
task_id = str(uuid.uuid4())
try:
BackgroundJobService.create_job(
task_id=task_id,
task_name="apps.parsers.tasks.process_fns_file",
user_id=request.user.id,
meta={
"source": "fns_reports",
"file": file.name,
},
if not serializer.is_valid():
self.message_user(
request,
f"Ошибка валидации архива: {serializer.errors}",
level=messages.ERROR,
)
task = process_fns_file.apply_async(
args=[str(file_path)],
kwargs={"requested_by_id": request.user.id},
task_id=task_id,
return redirect(changelist_url)
try:
result = FNSUploadService.queue_zip_archive(
archive_file=serializer.validated_data["file"],
requested_by_id=request.user.id,
)
except Exception:
lock_path.unlink(missing_ok=True)
BackgroundJob.objects.filter(task_id=task_id).delete()
raise
except Exception as exc: # noqa: BLE001
self.message_user(
request,
f"Ошибка обработки ZIP архива: {exc}",
level=messages.ERROR,
)
return redirect(changelist_url)
task_ids.append(task.id)
queued += 1
if result.queued:
self.message_user(
request,
"Файлов из архива поставлено в очередь: "
f"{result.queued}. Task IDs: {', '.join(result.task_ids[:5])}",
level=messages.SUCCESS,
)
if result.skipped:
self.message_user(
request,
f"Пропущено файлов из архива: {result.skipped}.",
level=messages.WARNING,
)
if result.invalid:
self.message_user(
request,
f"Невалидных элементов в архиве: {result.invalid}.",
level=messages.WARNING,
)
if not result.queued and not result.skipped and not result.invalid:
self.message_user(
request,
"Архив не содержит подходящих файлов.",
level=messages.WARNING,
)
return queued, skipped, task_ids
return redirect(changelist_url)
context = {
**self.admin_site.each_context(request),
"opts": self.model._meta,
"title": "Загрузка ZIP отчетности ФНС",
"changelist_url": changelist_url,
}
return TemplateResponse(
request,
"admin/parsers/financialreport/upload_zip.html",
context,
)
def has_add_permission(self, request):
"""Запретить создание записей вручную."""

View File

@@ -0,0 +1,178 @@
"""Reusable upload helpers for FNS financial report files."""
from __future__ import annotations
import hashlib
import re
import time
import uuid
import zipfile
from dataclasses import dataclass, field
from pathlib import Path, PurePosixPath
from apps.core.models import BackgroundJob
from apps.core.services import BackgroundJobService
from apps.parsers.models import ParserLoadLog
from apps.parsers.services import FNSReportService
from apps.parsers.tasks import process_fns_file
from django.conf import settings
FNS_XLSX_FILENAME_RE = re.compile(r"^fin_\d+_\d{13,15}\.xlsx$")
@dataclass
class FNSUploadResult:
"""Result of queuing FNS files for processing."""
queued: int = 0
skipped: int = 0
invalid: int = 0
task_ids: list[str] = field(default_factory=list)
class FNSUploadService:
"""Queue uploaded FNS Excel files and ZIP archives for processing."""
@classmethod
def queue_uploaded_files(cls, *, files, requested_by_id: int | None) -> FNSUploadResult:
result = FNSUploadResult()
seen_hashes: set[str] = set()
for uploaded_file in files:
status, task_id = cls._queue_file_bytes(
file_name=uploaded_file.name,
file_content=uploaded_file.read(),
requested_by_id=requested_by_id,
seen_hashes=seen_hashes,
)
cls._accumulate(result=result, status=status, task_id=task_id)
return result
@classmethod
def queue_zip_archive(
cls,
*,
archive_file,
requested_by_id: int | None,
) -> FNSUploadResult:
result = FNSUploadResult()
seen_hashes: set[str] = set()
archive_file.seek(0)
try:
with zipfile.ZipFile(archive_file) as archive:
for member in archive.infolist():
if member.is_dir():
continue
file_name = cls._extract_member_name(member.filename)
if not file_name or not FNS_XLSX_FILENAME_RE.match(file_name):
result.invalid += 1
continue
status, task_id = cls._queue_file_bytes(
file_name=file_name,
file_content=archive.read(member),
requested_by_id=requested_by_id,
seen_hashes=seen_hashes,
)
cls._accumulate(result=result, status=status, task_id=task_id)
except zipfile.BadZipFile as exc:
raise ValueError("Загруженный файл не является корректным ZIP архивом") from exc
return result
@staticmethod
def _extract_member_name(member_name: str) -> str | None:
path = PurePosixPath(member_name)
if path.is_absolute() or ".." in path.parts:
return None
if len(path.parts) != 1:
return None
file_name = path.name
return file_name or None
@classmethod
def _queue_file_bytes(
cls,
*,
file_name: str,
file_content: bytes,
requested_by_id: int | None,
seen_hashes: set[str],
) -> tuple[str, str | None]:
file_hash = hashlib.sha256(file_content).hexdigest()
if file_hash in seen_hashes or FNSReportService.exists_by_hash(file_hash):
return "skipped", None
upload_dir = Path(settings.FNS_WATCH_DIRECTORY)
upload_dir.mkdir(parents=True, exist_ok=True)
file_path = upload_dir / file_name
if not cls._try_create_fns_lock(file_path):
return "skipped", None
lock_path = Path(f"{file_path}.lock")
if file_path.exists():
lock_path.unlink(missing_ok=True)
return "skipped", None
try:
file_path.write_bytes(file_content)
except Exception:
lock_path.unlink(missing_ok=True)
raise
task_id = str(uuid.uuid4())
try:
BackgroundJobService.create_job(
task_id=task_id,
task_name="apps.parsers.tasks.process_fns_file",
user_id=requested_by_id,
meta={
"source": ParserLoadLog.Source.FNS_REPORTS,
"file": file_name,
},
)
task = process_fns_file.apply_async(
args=[str(file_path)],
kwargs={"requested_by_id": requested_by_id},
task_id=task_id,
)
except Exception:
lock_path.unlink(missing_ok=True)
BackgroundJob.objects.filter(task_id=task_id).delete()
raise
seen_hashes.add(file_hash)
return "queued", task.id
@staticmethod
def _try_create_fns_lock(file_path: Path) -> bool:
lock_path = Path(f"{file_path}.lock")
if lock_path.exists():
try:
age_seconds = time.time() - lock_path.stat().st_mtime
ttl_seconds = getattr(settings, "FNS_LOCK_TTL_SECONDS", 3600)
if age_seconds > ttl_seconds:
lock_path.unlink()
else:
return False
except FileNotFoundError:
pass
try:
lock_path.touch(exist_ok=False)
except FileExistsError:
return False
return True
@staticmethod
def _accumulate(*, result: FNSUploadResult, status: str, task_id: str | None) -> None:
if status == "queued":
result.queued += 1
if task_id:
result.task_ids.append(task_id)
return
if status == "skipped":
result.skipped += 1

View File

@@ -4,6 +4,7 @@
Все сериализаторы read-only, так как данные загружаются только через парсеры.
"""
from apps.parsers.fns_upload import FNS_XLSX_FILENAME_RE
from apps.parsers.models import (
FinancialReport,
FinancialReportLine,
@@ -269,12 +270,8 @@ class FNSFileUploadSerializer(serializers.Serializer):
def validate_files(self, value):
"""Валидация файлов."""
import re
pattern = re.compile(r"^fin_\d+_\d{13,15}\.xlsx$")
for file in value:
if not pattern.match(file.name):
if not FNS_XLSX_FILENAME_RE.match(file.name):
raise serializers.ValidationError(
f"Неверный формат имени файла: {file.name}. "
"Ожидается: fin_{{id}}_{{ogrn}}.xlsx"
@@ -283,6 +280,17 @@ class FNSFileUploadSerializer(serializers.Serializer):
return value
class FNSZipUploadSerializer(serializers.Serializer):
"""Сериализатор для загрузки ZIP архива с FNS Excel файлами."""
file = serializers.FileField(help_text="ZIP архив с файлами fin_*.xlsx")
def validate_file(self, value):
if not value.name.lower().endswith(".zip"):
raise serializers.ValidationError("Поддерживаются только ZIP архивы")
return value
# =============================================================================
# Служебные модели
# =============================================================================

View File

@@ -6,14 +6,10 @@ Views для приложения парсеров.
"""
import csv
import hashlib
import time
import uuid
from pathlib import Path
from apps.core.openapi import CommonResponses, ErrorResponses, swagger_tag
from apps.core.response import api_response
from apps.core.services import BackgroundJobService
from apps.parsers.fns_upload import FNSUploadService
from apps.parsers.models import (
FinancialReport,
IndustrialCertificateRecord,
@@ -42,8 +38,6 @@ from apps.parsers.serializers import (
SourceTaskStatusSerializer,
)
from apps.parsers.source_cards import SourceCardService
from apps.parsers.tasks import process_fns_file
from django.conf import settings
from django.db.models import CharField, Count, Q
from django.db.models.functions import Cast
from django.http import HttpResponse
@@ -532,97 +526,16 @@ class FNSReportUploadView(APIView):
serializer = FNSFileUploadSerializer(data=request.data)
serializer.is_valid(raise_exception=True)
files = serializer.validated_data["files"]
task_ids = []
queued = 0
skipped = 0
# Создаём директорию для загрузки
upload_dir = Path(settings.FNS_WATCH_DIRECTORY)
upload_dir.mkdir(parents=True, exist_ok=True)
from apps.parsers.services import FNSReportService
def _try_create_fns_lock(file_path: Path) -> bool:
lock_path = Path(f"{file_path}.lock")
if lock_path.exists():
try:
age_seconds = time.time() - lock_path.stat().st_mtime
ttl_seconds = getattr(settings, "FNS_LOCK_TTL_SECONDS", 3600)
if age_seconds > ttl_seconds:
lock_path.unlink()
else:
return False
except FileNotFoundError:
pass
try:
lock_path.touch(exist_ok=False)
except FileExistsError:
return False
return True
for file in files:
# Вычисляем хеш файла
file_content = file.read()
file_hash = hashlib.sha256(file_content).hexdigest()
file.seek(0)
# Проверяем дубликат
if FNSReportService.exists_by_hash(file_hash):
skipped += 1
continue
# Сохраняем файл
file_path = upload_dir / file.name
if not _try_create_fns_lock(file_path):
skipped += 1
continue
lock_path = Path(f"{file_path}.lock")
if file_path.exists():
lock_path.unlink(missing_ok=True)
skipped += 1
continue
try:
with open(file_path, "wb") as f:
for chunk in file.chunks():
f.write(chunk)
except Exception:
lock_path.unlink(missing_ok=True)
raise
# Ставим в очередь
try:
task_id = str(uuid.uuid4())
BackgroundJobService.create_job(
task_id=task_id,
task_name="apps.parsers.tasks.process_fns_file",
user_id=request.user.id,
meta={
"source": ParserLoadLog.Source.FNS_REPORTS,
"file": file.name,
},
)
task = process_fns_file.apply_async(
args=[str(file_path)],
kwargs={"requested_by_id": request.user.id},
task_id=task_id,
)
except Exception:
lock_path.unlink(missing_ok=True)
from apps.core.models import BackgroundJob
BackgroundJob.objects.filter(task_id=task_id).delete()
raise
task_ids.append(task.id)
queued += 1
result = FNSUploadService.queue_uploaded_files(
files=serializer.validated_data["files"],
requested_by_id=request.user.id,
)
return Response(
{
"queued": queued,
"skipped": skipped,
"task_ids": task_ids,
"queued": result.queued,
"skipped": result.skipped,
"task_ids": result.task_ids,
},
status=status.HTTP_202_ACCEPTED,
)

View File

@@ -4,5 +4,8 @@
<li>
<a href="{{ upload_excel_url }}" class="addlink">Загрузить Excel ФНС</a>
</li>
<li>
<a href="{{ upload_zip_url }}" class="addlink">Загрузить ZIP ФНС</a>
</li>
{{ block.super }}
{% endblock %}

View File

@@ -0,0 +1,37 @@
{% extends "admin/base_site.html" %}
{% block breadcrumbs %}
<div class="breadcrumbs">
<a href="{% url 'admin:index' %}">Главная</a>
&rsaquo; <a href="{{ changelist_url }}">Финансовые отчеты ФНС</a>
&rsaquo; Загрузка ZIP
</div>
{% endblock %}
{% block content %}
<div id="content-main">
<form method="post" enctype="multipart/form-data">
{% csrf_token %}
<fieldset class="module aligned">
<div class="form-row">
<label for="id_file" class="required">ZIP архив:</label>
<input
type="file"
name="file"
id="id_file"
accept=".zip"
required
/>
<p class="help">
Архив должен содержать файлы вида fin_{id}_{ogrn}.xlsx в корне архива.
</p>
</div>
</fieldset>
<div class="submit-row">
<input type="submit" value="Загрузить" class="default" />
<a href="{{ changelist_url }}" class="closelink">Отмена</a>
</div>
</form>
</div>
{% endblock %}

View File

@@ -1,5 +1,10 @@
"""Tests for parsers admin configurations."""
import io
import os
import tempfile
import zipfile
from apps.parsers.admin import (
FinancialReportAdmin,
HasCertificateNumberFilter,
@@ -22,7 +27,9 @@ from apps.parsers.models import (
)
from django.contrib.admin.sites import AdminSite
from django.contrib.messages.storage.fallback import FallbackStorage
from django.test import RequestFactory, TestCase
from django.core.files.uploadedfile import SimpleUploadedFile
from django.test import RequestFactory, TestCase, override_settings
from openpyxl import Workbook
from tests.apps.parsers.factories import (
IndustrialCertificateRecordFactory,
@@ -42,6 +49,41 @@ def _digits(length: int) -> str:
return "".join(str(fake.random_int(0, 9)) for _ in range(length))
def _build_fns_excel_bytes() -> bytes:
workbook = Workbook()
worksheet = workbook.active
year = fake.random_int(min=2020, max=2025)
worksheet.append(["Форма №1", None, year, None])
worksheet.append([None, "Код", "Начало", "Конец"])
worksheet.append(
[
fake.word(),
_digits(4),
fake.random_int(min=10, max=999),
fake.random_int(min=10, max=999),
]
)
buffer = io.BytesIO()
workbook.save(buffer)
workbook.close()
return buffer.getvalue()
def _build_fns_zip_upload() -> SimpleUploadedFile:
buffer = io.BytesIO()
with zipfile.ZipFile(buffer, "w", compression=zipfile.ZIP_DEFLATED) as archive:
archive.writestr(
f"fin_{_digits(5)}_{_digits(13)}.xlsx",
_build_fns_excel_bytes(),
)
archive.writestr("ignored.txt", b"invalid")
return SimpleUploadedFile(
"fin_ropk.zip",
buffer.getvalue(),
content_type="application/zip",
)
class ParsersAdminTest(TestCase):
def setUp(self):
self.site = AdminSite()
@@ -55,6 +97,13 @@ class ParsersAdminTest(TestCase):
request._messages = FallbackStorage(request)
return request
def _post_request(self, path, data):
request = self.factory.post(path, data=data)
request.user = self.user
request.session = {}
request._messages = FallbackStorage(request)
return request
def test_proxy_admin_actions(self):
admin = ProxyAdmin(Proxy, self.site)
proxy = ProxyFactory(is_active=False, fail_count=5)
@@ -248,3 +297,23 @@ class ParsersAdminTest(TestCase):
self.assertIn("registry_organization__pn_name", admin.search_fields)
route_names = [route.name for route in admin.get_urls()]
self.assertIn("parsers_financialreport_upload_excel", route_names)
self.assertIn("parsers_financialreport_upload_zip", route_names)
def test_financial_report_admin_upload_zip_view(self):
admin = FinancialReportAdmin(FinancialReport, self.site)
archive_upload = _build_fns_zip_upload()
request = self._post_request(
"/admin/parsers/financialreport/upload-zip/",
{"file": archive_upload},
)
with tempfile.TemporaryDirectory() as tmpdir, override_settings(
FNS_WATCH_DIRECTORY=os.path.join(tmpdir, "watch"),
FNS_PROCESSED_DIRECTORY=os.path.join(tmpdir, "processed"),
FNS_FAILED_DIRECTORY=os.path.join(tmpdir, "failed"),
):
response = admin.upload_zip_view(request)
self.assertEqual(response.status_code, 302)
self.assertEqual(FinancialReport.objects.count(), 1)
self.assertEqual(FinancialReportLine.objects.count(), 1)

View File

@@ -4,9 +4,11 @@ import io
import os
import tempfile
import time
import zipfile
from unittest.mock import patch
from apps.core.models import BackgroundJob
from apps.parsers.fns_upload import FNSUploadService
from apps.parsers.models import FinancialReport, FinancialReportLine
from django.core.files.uploadedfile import SimpleUploadedFile
from django.test import override_settings
@@ -38,6 +40,14 @@ def _build_fns_excel_bytes() -> bytes:
return buf.getvalue()
def _build_fns_zip_bytes(file_map: dict[str, bytes]) -> bytes:
buffer = io.BytesIO()
with zipfile.ZipFile(buffer, "w", compression=zipfile.ZIP_DEFLATED) as archive:
for file_name, content in file_map.items():
archive.writestr(file_name, content)
return buffer.getvalue()
class FNSUploadIntegrationTest(APITestCase):
"""Tests real upload + processing of FNS files."""
@@ -348,7 +358,7 @@ class FNSUploadIntegrationTest(APITestCase):
FNS_WATCH_DIRECTORY=watch_dir,
FNS_PROCESSED_DIRECTORY=processed_dir,
FNS_FAILED_DIRECTORY=failed_dir,
), patch("apps.parsers.views.Path.touch", side_effect=FileExistsError):
), patch("apps.parsers.fns_upload.Path.touch", side_effect=FileExistsError):
response = self.client.post(
self.upload_url,
{"files": [upload]},
@@ -375,7 +385,10 @@ class FNSUploadIntegrationTest(APITestCase):
FNS_WATCH_DIRECTORY=watch_dir,
FNS_PROCESSED_DIRECTORY=processed_dir,
FNS_FAILED_DIRECTORY=failed_dir,
), patch("apps.parsers.views.open", side_effect=OSError("disk full")):
), patch(
"apps.parsers.fns_upload.Path.write_bytes",
side_effect=OSError("disk full"),
):
response = self.client.post(
self.upload_url,
{"files": [upload]},
@@ -406,9 +419,9 @@ class FNSUploadIntegrationTest(APITestCase):
FNS_PROCESSED_DIRECTORY=processed_dir,
FNS_FAILED_DIRECTORY=failed_dir,
), patch(
"apps.parsers.views.uuid.uuid4", return_value="job-task-id"
"apps.parsers.fns_upload.uuid.uuid4", return_value="job-task-id"
), patch(
"apps.parsers.views.process_fns_file.apply_async",
"apps.parsers.fns_upload.process_fns_file.apply_async",
side_effect=RuntimeError("queue down"),
):
response = self.client.post(
@@ -426,3 +439,54 @@ class FNSUploadIntegrationTest(APITestCase):
self.assertFalse(
os.path.exists(os.path.join(watch_dir, f"{filename}.lock"))
)
def test_queue_zip_archive_processes_valid_files_and_skips_invalid(self):
first_name = f"fin_{_digits(5)}_{_digits(13)}.xlsx"
second_name = f"fin_{_digits(5)}_{_digits(13)}.xlsx"
zip_content = _build_fns_zip_bytes(
{
first_name: _build_fns_excel_bytes(),
second_name: _build_fns_excel_bytes(),
"nested/fin_0000001_1234567890123.xlsx": _build_fns_excel_bytes(),
"readme.txt": b"invalid",
}
)
archive_upload = SimpleUploadedFile(
"fin_ropk.zip",
zip_content,
content_type="application/zip",
)
with tempfile.TemporaryDirectory() as tmpdir:
watch_dir, processed_dir, failed_dir = self._dirs(tmpdir)
with override_settings(
FNS_WATCH_DIRECTORY=watch_dir,
FNS_PROCESSED_DIRECTORY=processed_dir,
FNS_FAILED_DIRECTORY=failed_dir,
):
result = FNSUploadService.queue_zip_archive(
archive_file=archive_upload,
requested_by_id=self.admin.id,
)
self.assertEqual(result.queued, 2)
self.assertEqual(result.skipped, 0)
self.assertEqual(result.invalid, 2)
self.assertEqual(FinancialReport.objects.count(), 2)
self.assertEqual(FinancialReportLine.objects.count(), 2)
def test_queue_zip_archive_rejects_bad_zip(self):
archive_upload = SimpleUploadedFile(
"fin_ropk.zip",
b"not-a-zip",
content_type="application/zip",
)
with self.assertRaisesMessage(
ValueError,
"Загруженный файл не является корректным ZIP архивом",
):
FNSUploadService.queue_zip_archive(
archive_file=archive_upload,
requested_by_id=self.admin.id,
)