Files
mostovik-backend/tests/apps/parsers/test_fns_upload.py

493 lines
19 KiB
Python

"""Integration tests for FNS upload flow (no mocks)."""
import io
import os
import tempfile
import time
import zipfile
from unittest.mock import patch
from apps.core.models import BackgroundJob
from apps.parsers.fns_upload import FNSUploadService
from apps.parsers.models import FinancialReport, FinancialReportLine
from django.core.files.uploadedfile import SimpleUploadedFile
from django.test import override_settings
from django.urls import reverse
from openpyxl import Workbook
from rest_framework import status
from rest_framework.test import APITestCase
from tests.apps.user.factories import UserFactory
from tests.utils.fixtures import fake
def _digits(length: int) -> str:
    """Return a string of ``length`` random decimal digits.

    Each digit is drawn independently via ``fake.random_int(0, 9)``, so the
    result may start with ``0``. A non-positive ``length`` yields ``""``.
    """
    picks = [fake.random_int(0, 9) for _ in range(length)]
    return "".join(map(str, picks))
def _build_fns_excel_bytes() -> bytes:
    """Build a small ``.xlsx`` workbook in memory and return its raw bytes.

    The active sheet receives three rows: a header row carrying the form
    label and a random year (2020-2025), a column-title row, and one data row
    with a random word, a 4-digit code and two random integers in 10..999.
    """
    workbook = Workbook()
    sheet = workbook.active
    report_year = fake.random_int(min=2020, max=2025)
    rows = [
        ["Форма №1", None, report_year, None],
        [None, "Код", "Начало", "Конец"],
        [
            fake.word(),
            _digits(4),
            fake.random_int(10, 999),
            fake.random_int(10, 999),
        ],
    ]
    for row in rows:
        sheet.append(row)

    stream = io.BytesIO()
    workbook.save(stream)
    workbook.close()
    return stream.getvalue()
def _build_fns_zip_bytes(file_map: dict[str, bytes]) -> bytes:
buffer = io.BytesIO()
with zipfile.ZipFile(buffer, "w", compression=zipfile.ZIP_DEFLATED) as archive:
for file_name, content in file_map.items():
archive.writestr(file_name, content)
return buffer.getvalue()
class FNSUploadIntegrationTest(APITestCase):
    """Tests real upload + processing of FNS files.

    Every test that touches the filesystem runs inside a fresh temporary
    directory and points the ``FNS_*_DIRECTORY`` settings at sub-directories
    of it. The repeated boilerplate (building an upload, overriding settings,
    posting the multipart request, checking the queued/skipped counters) lives
    in the private helpers below so each test only spells out what is specific
    to its scenario.
    """

    # MIME type sent with every generated ``.xlsx`` upload.
    XLSX_CONTENT_TYPE = (
        "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
    )

    def setUp(self):
        """Create a regular user, a staff user and a second regular user.

        The test client is authenticated as the staff user by default.
        """
        self.user = UserFactory.create_user()
        self.admin = UserFactory.create_user(is_staff=True)
        self.other = UserFactory.create_user()
        self.client.force_authenticate(self.admin)
        self.upload_url = reverse("api_v1:fns:fns-upload")

    # ------------------------------------------------------------------
    # Helpers
    # ------------------------------------------------------------------

    def _dirs(self, base_dir: str) -> tuple[str, str, str]:
        """Return ``(watch, processed, failed)`` paths under ``base_dir``.

        The directories are not created here.
        """
        watch_dir = os.path.join(base_dir, "watch")
        processed_dir = os.path.join(base_dir, "processed")
        failed_dir = os.path.join(base_dir, "failed")
        return watch_dir, processed_dir, failed_dir

    @staticmethod
    def _fns_filename() -> str:
        """Return a random ``fin_<5 digits>_<13 digits>.xlsx`` file name."""
        return f"fin_{_digits(5)}_{_digits(13)}.xlsx"

    def _xlsx_upload(self, filename: str, content=None) -> SimpleUploadedFile:
        """Wrap workbook bytes in a ``SimpleUploadedFile`` named ``filename``.

        Args:
            filename: Name reported to the server for the uploaded file.
            content: Raw file bytes; a freshly generated workbook is used
                when omitted.
        """
        if content is None:
            content = _build_fns_excel_bytes()
        return SimpleUploadedFile(
            filename,
            content,
            content_type=self.XLSX_CONTENT_TYPE,
        )

    def _fns_settings(
        self, watch_dir: str, processed_dir: str, failed_dir: str, **extra
    ):
        """Return an ``override_settings`` for the three FNS directories.

        Additional settings (e.g. ``FNS_LOCK_TTL_SECONDS``) can be passed as
        keyword arguments. The override is not active until it is entered.
        """
        return override_settings(
            FNS_WATCH_DIRECTORY=watch_dir,
            FNS_PROCESSED_DIRECTORY=processed_dir,
            FNS_FAILED_DIRECTORY=failed_dir,
            **extra,
        )

    def _post_upload(self, upload):
        """POST ``upload`` as the single entry of ``files`` to the upload URL."""
        return self.client.post(
            self.upload_url, {"files": [upload]}, format="multipart"
        )

    def _write_lock(self, watch_dir: str, filename: str, mtime: float) -> None:
        """Create ``<filename>.lock`` in ``watch_dir`` with the given mtime.

        ``watch_dir`` is created if it does not exist yet.
        """
        os.makedirs(watch_dir, exist_ok=True)
        lock_path = os.path.join(watch_dir, f"{filename}.lock")
        with open(lock_path, "w") as handle:
            handle.write("lock")
        os.utime(lock_path, (mtime, mtime))

    def _assert_accepted(self, response, *, queued: int, skipped: int) -> None:
        """Assert a 202 response carrying the expected queued/skipped counts."""
        self.assertEqual(response.status_code, status.HTTP_202_ACCEPTED)
        self.assertEqual(response.data["queued"], queued)
        self.assertEqual(response.data["skipped"], skipped)

    # ------------------------------------------------------------------
    # Upload endpoint
    # ------------------------------------------------------------------

    def test_upload_processes_file_and_moves_to_processed(self):
        """A valid upload is parsed into a report and the file is moved."""
        external_id = _digits(5)
        ogrn = _digits(13)
        filename = f"fin_{external_id}_{ogrn}.xlsx"
        upload = self._xlsx_upload(filename)

        with tempfile.TemporaryDirectory() as tmpdir:
            watch_dir, processed_dir, failed_dir = self._dirs(tmpdir)
            with self._fns_settings(watch_dir, processed_dir, failed_dir):
                response = self._post_upload(upload)

            self._assert_accepted(response, queued=1, skipped=0)
            self.assertEqual(FinancialReport.objects.count(), 1)

            report = FinancialReport.objects.first()
            self.assertEqual(report.external_id, external_id)
            self.assertEqual(report.ogrn, ogrn)
            self.assertTrue(FinancialReportLine.objects.filter(report=report).exists())

            # The file ends up in "processed"; nothing is left in "watch".
            processed_path = os.path.join(processed_dir, filename)
            self.assertTrue(os.path.exists(processed_path))
            self.assertFalse(os.path.exists(os.path.join(watch_dir, filename)))
            self.assertFalse(
                os.path.exists(os.path.join(watch_dir, f"{filename}.lock"))
            )

    def test_upload_duplicate_is_skipped(self):
        """Uploading the same file twice queues it once and skips the repeat."""
        content = _build_fns_excel_bytes()
        external_id = _digits(3)
        ogrn = _digits(13)
        filename = f"fin_{external_id}_{ogrn}.xlsx"
        # Two separate upload objects over the same bytes, one per request.
        upload1 = self._xlsx_upload(filename, content)
        upload2 = self._xlsx_upload(filename, content)

        with tempfile.TemporaryDirectory() as tmpdir:
            watch_dir, processed_dir, failed_dir = self._dirs(tmpdir)
            with self._fns_settings(watch_dir, processed_dir, failed_dir):
                first = self._post_upload(upload1)
                second = self._post_upload(upload2)

            self.assertEqual(first.status_code, status.HTTP_202_ACCEPTED)
            self._assert_accepted(second, queued=0, skipped=1)
            self.assertEqual(FinancialReport.objects.count(), 1)
            self.assertFalse(os.path.exists(os.path.join(watch_dir, filename)))
            self.assertFalse(
                os.path.exists(os.path.join(watch_dir, f"{filename}.lock"))
            )

    def test_upload_creates_owned_background_job(self):
        """The upload creates a job owned by the uploader, hidden from others."""
        upload = self._xlsx_upload(self._fns_filename())

        with tempfile.TemporaryDirectory() as tmpdir:
            with self._fns_settings(*self._dirs(tmpdir)):
                response = self._post_upload(upload)

            self.assertEqual(response.status_code, status.HTTP_202_ACCEPTED)
            task_id = response.data["task_ids"][0]

            job = BackgroundJob.objects.get(task_id=task_id)
            self.assertEqual(job.user_id, self.admin.id)

            # The uploader sees exactly this job in their job list.
            jobs_response = self.client.get(reverse("api_v1:jobs:job-list"))
            self.assertEqual(jobs_response.status_code, status.HTTP_200_OK)
            self.assertEqual(len(jobs_response.data["results"]), 1)
            self.assertEqual(jobs_response.data["results"][0]["task_id"], task_id)

            # A different user is refused access to the job status.
            other_client = self.client_class()
            other_client.force_authenticate(self.other)
            status_response = other_client.get(
                reverse("api_v1:jobs:job-status", kwargs={"task_id": task_id})
            )
            self.assertEqual(status_response.status_code, status.HTTP_403_FORBIDDEN)

    def test_upload_invalid_filename_rejected(self):
        """A file name outside the ``fin_<id>_<ogrn>`` shape yields HTTP 400."""
        upload = self._xlsx_upload(
            f"{fake.word()}_{fake.random_int()}.xlsx",
            _build_fns_excel_bytes(),
        )

        with tempfile.TemporaryDirectory() as tmpdir:
            with self._fns_settings(*self._dirs(tmpdir)):
                response = self._post_upload(upload)

            self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
            self.assertEqual(FinancialReport.objects.count(), 0)

    def test_upload_skips_when_lock_is_fresh(self):
        """A lock file younger than the TTL causes the upload to be skipped."""
        filename = self._fns_filename()
        upload = self._xlsx_upload(filename)

        with tempfile.TemporaryDirectory() as tmpdir:
            watch_dir, processed_dir, failed_dir = self._dirs(tmpdir)
            self._write_lock(watch_dir, filename, time.time())

            with self._fns_settings(
                watch_dir,
                processed_dir,
                failed_dir,
                FNS_LOCK_TTL_SECONDS=3600,
            ):
                response = self._post_upload(upload)

            self._assert_accepted(response, queued=0, skipped=1)

    def test_regular_user_cannot_upload(self):
        """A non-staff user receives HTTP 403 from the upload endpoint."""
        self.client.force_authenticate(self.user)
        upload = self._xlsx_upload(self._fns_filename())

        with tempfile.TemporaryDirectory() as tmpdir:
            with self._fns_settings(*self._dirs(tmpdir)):
                response = self._post_upload(upload)

            self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)

    def test_upload_skips_when_file_already_exists(self):
        """A same-named file already in the watch directory is not replaced."""
        filename = self._fns_filename()
        upload = self._xlsx_upload(filename)

        with tempfile.TemporaryDirectory() as tmpdir:
            watch_dir, processed_dir, failed_dir = self._dirs(tmpdir)
            os.makedirs(watch_dir, exist_ok=True)
            existing_path = os.path.join(watch_dir, filename)
            with open(existing_path, "wb") as handle:
                handle.write(b"existing")

            with self._fns_settings(watch_dir, processed_dir, failed_dir):
                response = self._post_upload(upload)

            self._assert_accepted(response, queued=0, skipped=1)

    def test_upload_removes_stale_lock_and_queues_file(self):
        """A lock file older than the TTL does not block the upload."""
        filename = self._fns_filename()
        upload = self._xlsx_upload(filename)

        with tempfile.TemporaryDirectory() as tmpdir:
            watch_dir, processed_dir, failed_dir = self._dirs(tmpdir)
            # Two hours old against a one-second TTL: unambiguously stale.
            self._write_lock(watch_dir, filename, time.time() - 7200)

            with self._fns_settings(
                watch_dir,
                processed_dir,
                failed_dir,
                FNS_LOCK_TTL_SECONDS=1,
            ):
                response = self._post_upload(upload)

            self._assert_accepted(response, queued=1, skipped=0)

    def test_upload_skips_when_lock_creation_races(self):
        """``FileExistsError`` from ``Path.touch`` is reported as a skip."""
        upload = self._xlsx_upload(self._fns_filename())

        with tempfile.TemporaryDirectory() as tmpdir:
            # NOTE(review): presumably the service creates its lock through
            # ``Path.touch``; confirm against apps.parsers.fns_upload.
            with self._fns_settings(*self._dirs(tmpdir)), patch(
                "apps.parsers.fns_upload.Path.touch", side_effect=FileExistsError
            ):
                response = self._post_upload(upload)

            self._assert_accepted(response, queued=0, skipped=1)

    def test_upload_cleans_up_lock_when_file_write_fails(self):
        """A failing ``Path.write_bytes`` yields HTTP 500 and leaves no lock."""
        filename = self._fns_filename()
        upload = self._xlsx_upload(filename)

        with tempfile.TemporaryDirectory() as tmpdir:
            watch_dir, processed_dir, failed_dir = self._dirs(tmpdir)
            with self._fns_settings(watch_dir, processed_dir, failed_dir), patch(
                "apps.parsers.fns_upload.Path.write_bytes",
                side_effect=OSError("disk full"),
            ):
                response = self._post_upload(upload)

            self.assertEqual(
                response.status_code, status.HTTP_500_INTERNAL_SERVER_ERROR
            )
            self.assertFalse(
                os.path.exists(os.path.join(watch_dir, f"{filename}.lock"))
            )

    def test_upload_deletes_background_job_when_task_enqueue_fails(self):
        """A failing ``apply_async`` yields HTTP 500 with no job or lock left."""
        filename = self._fns_filename()
        upload = self._xlsx_upload(filename)

        with tempfile.TemporaryDirectory() as tmpdir:
            watch_dir, processed_dir, failed_dir = self._dirs(tmpdir)
            # uuid4 is pinned so the job row can be looked up by a known id.
            with self._fns_settings(watch_dir, processed_dir, failed_dir), patch(
                "apps.parsers.fns_upload.uuid.uuid4", return_value="job-task-id"
            ), patch(
                "apps.parsers.fns_upload.process_fns_file.apply_async",
                side_effect=RuntimeError("queue down"),
            ):
                response = self._post_upload(upload)

            self.assertEqual(
                response.status_code, status.HTTP_500_INTERNAL_SERVER_ERROR
            )
            self.assertFalse(
                BackgroundJob.objects.filter(task_id="job-task-id").exists()
            )
            self.assertFalse(
                os.path.exists(os.path.join(watch_dir, f"{filename}.lock"))
            )

    # ------------------------------------------------------------------
    # ZIP archive service entry point
    # ------------------------------------------------------------------

    def test_queue_zip_archive_processes_valid_files_and_skips_invalid(self):
        """Two top-level workbooks are queued; the other two members are invalid."""
        first_name = self._fns_filename()
        second_name = self._fns_filename()
        zip_content = _build_fns_zip_bytes(
            {
                first_name: _build_fns_excel_bytes(),
                second_name: _build_fns_excel_bytes(),
                "nested/fin_0000001_1234567890123.xlsx": _build_fns_excel_bytes(),
                "readme.txt": b"invalid",
            }
        )
        archive_upload = SimpleUploadedFile(
            "fin_ropk.zip",
            zip_content,
            content_type="application/zip",
        )

        with tempfile.TemporaryDirectory() as tmpdir:
            with self._fns_settings(*self._dirs(tmpdir)):
                result = FNSUploadService.queue_zip_archive(
                    archive_file=archive_upload,
                    requested_by_id=self.admin.id,
                )

            self.assertEqual(result.queued, 2)
            self.assertEqual(result.skipped, 0)
            self.assertEqual(result.invalid, 2)
            self.assertEqual(FinancialReport.objects.count(), 2)
            self.assertEqual(FinancialReportLine.objects.count(), 2)

    def test_queue_zip_archive_rejects_bad_zip(self):
        """Non-ZIP bytes raise ``ValueError`` with the expected message."""
        archive_upload = SimpleUploadedFile(
            "fin_ropk.zip",
            b"not-a-zip",
            content_type="application/zip",
        )

        with self.assertRaisesMessage(
            ValueError,
            "Загруженный файл не является корректным ZIP архивом",
        ):
            FNSUploadService.queue_zip_archive(
                archive_file=archive_upload,
                requested_by_id=self.admin.id,
            )