"""Integration tests for FNS upload flow (no mocks).""" import io import os import tempfile import time import zipfile from unittest.mock import patch from apps.core.models import BackgroundJob from apps.parsers.fns_upload import FNSUploadService from apps.parsers.models import FinancialReport, FinancialReportLine from django.core.files.uploadedfile import SimpleUploadedFile from django.test import override_settings from django.urls import reverse from openpyxl import Workbook from rest_framework import status from rest_framework.test import APITestCase from tests.apps.user.factories import UserFactory from tests.utils.fixtures import fake def _digits(length: int) -> str: return "".join(str(fake.random_int(0, 9)) for _ in range(length)) def _build_fns_excel_bytes() -> bytes: wb = Workbook() ws = wb.active year = fake.random_int(min=2020, max=2025) ws.append(["Форма №1", None, year, None]) ws.append([None, "Код", "Начало", "Конец"]) ws.append( [fake.word(), _digits(4), fake.random_int(10, 999), fake.random_int(10, 999)] ) buf = io.BytesIO() wb.save(buf) wb.close() return buf.getvalue() def _build_fns_zip_bytes(file_map: dict[str, bytes]) -> bytes: buffer = io.BytesIO() with zipfile.ZipFile(buffer, "w", compression=zipfile.ZIP_DEFLATED) as archive: for file_name, content in file_map.items(): archive.writestr(file_name, content) return buffer.getvalue() class FNSUploadIntegrationTest(APITestCase): """Tests real upload + processing of FNS files.""" def setUp(self): self.user = UserFactory.create_user() self.admin = UserFactory.create_user(is_staff=True) self.other = UserFactory.create_user() self.client.force_authenticate(self.admin) self.upload_url = reverse("api_v1:fns:fns-upload") def _dirs(self, base_dir: str) -> tuple[str, str, str]: watch_dir = os.path.join(base_dir, "watch") processed_dir = os.path.join(base_dir, "processed") failed_dir = os.path.join(base_dir, "failed") return watch_dir, processed_dir, failed_dir def test_upload_processes_file_and_moves_to_processed(self): content = _build_fns_excel_bytes() external_id = _digits(5) ogrn = _digits(13) filename = f"fin_{external_id}_{ogrn}.xlsx" upload = SimpleUploadedFile( filename, content, content_type=( "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet" ), ) with tempfile.TemporaryDirectory() as tmpdir: watch_dir, processed_dir, failed_dir = self._dirs(tmpdir) with override_settings( FNS_WATCH_DIRECTORY=watch_dir, FNS_PROCESSED_DIRECTORY=processed_dir, FNS_FAILED_DIRECTORY=failed_dir, ): response = self.client.post( self.upload_url, {"files": [upload]}, format="multipart" ) self.assertEqual(response.status_code, status.HTTP_202_ACCEPTED) self.assertEqual(response.data["queued"], 1) self.assertEqual(response.data["skipped"], 0) self.assertEqual(FinancialReport.objects.count(), 1) report = FinancialReport.objects.first() self.assertEqual(report.external_id, external_id) self.assertEqual(report.ogrn, ogrn) self.assertTrue(FinancialReportLine.objects.filter(report=report).exists()) processed_path = os.path.join(processed_dir, filename) self.assertTrue(os.path.exists(processed_path)) self.assertFalse(os.path.exists(os.path.join(watch_dir, filename))) self.assertFalse( os.path.exists(os.path.join(watch_dir, f"{filename}.lock")) ) def test_upload_duplicate_is_skipped(self): content = _build_fns_excel_bytes() external_id = _digits(3) ogrn = _digits(13) filename = f"fin_{external_id}_{ogrn}.xlsx" upload1 = SimpleUploadedFile( filename, content, content_type=( "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet" ), ) upload2 = SimpleUploadedFile( filename, content, content_type=( "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet" ), ) with tempfile.TemporaryDirectory() as tmpdir: watch_dir, processed_dir, failed_dir = self._dirs(tmpdir) with override_settings( FNS_WATCH_DIRECTORY=watch_dir, FNS_PROCESSED_DIRECTORY=processed_dir, FNS_FAILED_DIRECTORY=failed_dir, ): first = self.client.post( self.upload_url, {"files": [upload1]}, format="multipart" ) second = self.client.post( self.upload_url, {"files": [upload2]}, format="multipart" ) self.assertEqual(first.status_code, status.HTTP_202_ACCEPTED) self.assertEqual(second.status_code, status.HTTP_202_ACCEPTED) self.assertEqual(second.data["queued"], 0) self.assertEqual(second.data["skipped"], 1) self.assertEqual(FinancialReport.objects.count(), 1) self.assertFalse(os.path.exists(os.path.join(watch_dir, filename))) self.assertFalse( os.path.exists(os.path.join(watch_dir, f"{filename}.lock")) ) def test_upload_creates_owned_background_job(self): content = _build_fns_excel_bytes() external_id = _digits(5) ogrn = _digits(13) filename = f"fin_{external_id}_{ogrn}.xlsx" upload = SimpleUploadedFile( filename, content, content_type=( "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet" ), ) with tempfile.TemporaryDirectory() as tmpdir: watch_dir, processed_dir, failed_dir = self._dirs(tmpdir) with override_settings( FNS_WATCH_DIRECTORY=watch_dir, FNS_PROCESSED_DIRECTORY=processed_dir, FNS_FAILED_DIRECTORY=failed_dir, ): response = self.client.post( self.upload_url, {"files": [upload]}, format="multipart" ) self.assertEqual(response.status_code, status.HTTP_202_ACCEPTED) task_id = response.data["task_ids"][0] job = BackgroundJob.objects.get(task_id=task_id) self.assertEqual(job.user_id, self.admin.id) jobs_response = self.client.get(reverse("api_v1:jobs:job-list")) self.assertEqual(jobs_response.status_code, status.HTTP_200_OK) self.assertEqual(len(jobs_response.data["results"]), 1) self.assertEqual(jobs_response.data["results"][0]["task_id"], task_id) other_client = self.client_class() other_client.force_authenticate(self.other) status_response = other_client.get( reverse("api_v1:jobs:job-status", kwargs={"task_id": task_id}) ) self.assertEqual(status_response.status_code, status.HTTP_403_FORBIDDEN) def test_upload_invalid_filename_rejected(self): content = _build_fns_excel_bytes() upload = SimpleUploadedFile( f"{fake.word()}_{fake.random_int()}.xlsx", content, content_type=( "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet" ), ) with tempfile.TemporaryDirectory() as tmpdir: watch_dir, processed_dir, failed_dir = self._dirs(tmpdir) with override_settings( FNS_WATCH_DIRECTORY=watch_dir, FNS_PROCESSED_DIRECTORY=processed_dir, FNS_FAILED_DIRECTORY=failed_dir, ): response = self.client.post( self.upload_url, {"files": [upload]}, format="multipart" ) self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) self.assertEqual(FinancialReport.objects.count(), 0) def test_upload_skips_when_lock_is_fresh(self): content = _build_fns_excel_bytes() external_id = _digits(5) ogrn = _digits(13) filename = f"fin_{external_id}_{ogrn}.xlsx" upload = SimpleUploadedFile( filename, content, content_type=( "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet" ), ) with tempfile.TemporaryDirectory() as tmpdir: watch_dir, processed_dir, failed_dir = self._dirs(tmpdir) os.makedirs(watch_dir, exist_ok=True) lock_path = os.path.join(watch_dir, f"{filename}.lock") with open(lock_path, "w") as handle: handle.write("lock") now = time.time() os.utime(lock_path, (now, now)) with override_settings( FNS_WATCH_DIRECTORY=watch_dir, FNS_PROCESSED_DIRECTORY=processed_dir, FNS_FAILED_DIRECTORY=failed_dir, FNS_LOCK_TTL_SECONDS=3600, ): response = self.client.post( self.upload_url, {"files": [upload]}, format="multipart" ) self.assertEqual(response.status_code, status.HTTP_202_ACCEPTED) self.assertEqual(response.data["queued"], 0) self.assertEqual(response.data["skipped"], 1) def test_regular_user_cannot_upload(self): self.client.force_authenticate(self.user) upload = SimpleUploadedFile( f"fin_{_digits(5)}_{_digits(13)}.xlsx", _build_fns_excel_bytes(), content_type=( "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet" ), ) with tempfile.TemporaryDirectory() as tmpdir: watch_dir, processed_dir, failed_dir = self._dirs(tmpdir) with override_settings( FNS_WATCH_DIRECTORY=watch_dir, FNS_PROCESSED_DIRECTORY=processed_dir, FNS_FAILED_DIRECTORY=failed_dir, ): response = self.client.post( self.upload_url, {"files": [upload]}, format="multipart" ) self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN) def test_upload_skips_when_file_already_exists(self): content = _build_fns_excel_bytes() external_id = _digits(5) ogrn = _digits(13) filename = f"fin_{external_id}_{ogrn}.xlsx" upload = SimpleUploadedFile( filename, content, content_type=( "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet" ), ) with tempfile.TemporaryDirectory() as tmpdir: watch_dir, processed_dir, failed_dir = self._dirs(tmpdir) os.makedirs(watch_dir, exist_ok=True) existing_path = os.path.join(watch_dir, filename) with open(existing_path, "wb") as handle: handle.write(b"existing") with override_settings( FNS_WATCH_DIRECTORY=watch_dir, FNS_PROCESSED_DIRECTORY=processed_dir, FNS_FAILED_DIRECTORY=failed_dir, ): response = self.client.post( self.upload_url, {"files": [upload]}, format="multipart" ) self.assertEqual(response.status_code, status.HTTP_202_ACCEPTED) self.assertEqual(response.data["queued"], 0) self.assertEqual(response.data["skipped"], 1) def test_upload_removes_stale_lock_and_queues_file(self): content = _build_fns_excel_bytes() external_id = _digits(5) ogrn = _digits(13) filename = f"fin_{external_id}_{ogrn}.xlsx" upload = SimpleUploadedFile( filename, content, content_type=( "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet" ), ) with tempfile.TemporaryDirectory() as tmpdir: watch_dir, processed_dir, failed_dir = self._dirs(tmpdir) os.makedirs(watch_dir, exist_ok=True) lock_path = os.path.join(watch_dir, f"{filename}.lock") with open(lock_path, "w") as handle: handle.write("lock") stale = time.time() - 7200 os.utime(lock_path, (stale, stale)) with override_settings( FNS_WATCH_DIRECTORY=watch_dir, FNS_PROCESSED_DIRECTORY=processed_dir, FNS_FAILED_DIRECTORY=failed_dir, FNS_LOCK_TTL_SECONDS=1, ): response = self.client.post( self.upload_url, {"files": [upload]}, format="multipart" ) self.assertEqual(response.status_code, status.HTTP_202_ACCEPTED) self.assertEqual(response.data["queued"], 1) self.assertEqual(response.data["skipped"], 0) def test_upload_skips_when_lock_creation_races(self): upload = SimpleUploadedFile( f"fin_{_digits(5)}_{_digits(13)}.xlsx", _build_fns_excel_bytes(), content_type=( "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet" ), ) with tempfile.TemporaryDirectory() as tmpdir: watch_dir, processed_dir, failed_dir = self._dirs(tmpdir) with override_settings( FNS_WATCH_DIRECTORY=watch_dir, FNS_PROCESSED_DIRECTORY=processed_dir, FNS_FAILED_DIRECTORY=failed_dir, ), patch("apps.parsers.fns_upload.Path.touch", side_effect=FileExistsError): response = self.client.post( self.upload_url, {"files": [upload]}, format="multipart", ) self.assertEqual(response.status_code, status.HTTP_202_ACCEPTED) self.assertEqual(response.data["queued"], 0) self.assertEqual(response.data["skipped"], 1) def test_upload_cleans_up_lock_when_file_write_fails(self): filename = f"fin_{_digits(5)}_{_digits(13)}.xlsx" upload = SimpleUploadedFile( filename, _build_fns_excel_bytes(), content_type=( "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet" ), ) with tempfile.TemporaryDirectory() as tmpdir: watch_dir, processed_dir, failed_dir = self._dirs(tmpdir) with override_settings( FNS_WATCH_DIRECTORY=watch_dir, FNS_PROCESSED_DIRECTORY=processed_dir, FNS_FAILED_DIRECTORY=failed_dir, ), patch( "apps.parsers.fns_upload.Path.write_bytes", side_effect=OSError("disk full"), ): response = self.client.post( self.upload_url, {"files": [upload]}, format="multipart", ) self.assertEqual( response.status_code, status.HTTP_500_INTERNAL_SERVER_ERROR ) self.assertFalse( os.path.exists(os.path.join(watch_dir, f"{filename}.lock")) ) def test_upload_deletes_background_job_when_task_enqueue_fails(self): filename = f"fin_{_digits(5)}_{_digits(13)}.xlsx" upload = SimpleUploadedFile( filename, _build_fns_excel_bytes(), content_type=( "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet" ), ) with tempfile.TemporaryDirectory() as tmpdir: watch_dir, processed_dir, failed_dir = self._dirs(tmpdir) with override_settings( FNS_WATCH_DIRECTORY=watch_dir, FNS_PROCESSED_DIRECTORY=processed_dir, FNS_FAILED_DIRECTORY=failed_dir, ), patch( "apps.parsers.fns_upload.uuid.uuid4", return_value="job-task-id" ), patch( "apps.parsers.fns_upload.process_fns_file.apply_async", side_effect=RuntimeError("queue down"), ): response = self.client.post( self.upload_url, {"files": [upload]}, format="multipart", ) self.assertEqual( response.status_code, status.HTTP_500_INTERNAL_SERVER_ERROR ) self.assertFalse( BackgroundJob.objects.filter(task_id="job-task-id").exists() ) self.assertFalse( os.path.exists(os.path.join(watch_dir, f"{filename}.lock")) ) def test_queue_zip_archive_processes_valid_files_and_skips_invalid(self): first_name = f"fin_{_digits(5)}_{_digits(13)}.xlsx" second_name = f"fin_{_digits(5)}_{_digits(13)}.xlsx" zip_content = _build_fns_zip_bytes( { first_name: _build_fns_excel_bytes(), second_name: _build_fns_excel_bytes(), "nested/fin_0000001_1234567890123.xlsx": _build_fns_excel_bytes(), "readme.txt": b"invalid", } ) archive_upload = SimpleUploadedFile( "fin_ropk.zip", zip_content, content_type="application/zip", ) with tempfile.TemporaryDirectory() as tmpdir: watch_dir, processed_dir, failed_dir = self._dirs(tmpdir) with override_settings( FNS_WATCH_DIRECTORY=watch_dir, FNS_PROCESSED_DIRECTORY=processed_dir, FNS_FAILED_DIRECTORY=failed_dir, ): result = FNSUploadService.queue_zip_archive( archive_file=archive_upload, requested_by_id=self.admin.id, ) self.assertEqual(result.queued, 2) self.assertEqual(result.skipped, 0) self.assertEqual(result.invalid, 2) self.assertEqual(FinancialReport.objects.count(), 2) self.assertEqual(FinancialReportLine.objects.count(), 2) def test_queue_zip_archive_rejects_bad_zip(self): archive_upload = SimpleUploadedFile( "fin_ropk.zip", b"not-a-zip", content_type="application/zip", ) with self.assertRaisesMessage( ValueError, "Загруженный файл не является корректным ZIP архивом", ): FNSUploadService.queue_zip_archive( archive_file=archive_upload, requested_by_id=self.admin.id, )