Files
mostovik-backend/tests/apps/backups/test_services.py
Aleksandr Meshchriakov 25176f31b4
Some checks failed
CI/CD Pipeline / Telegram Notify Success (push) Has been cancelled
CI/CD Pipeline / Run Tests (push) Has been cancelled
CI/CD Pipeline / Code Quality Checks (push) Has been cancelled
CI/CD Pipeline / Code Quality Checks (pull_request) Successful in 1m42s
CI/CD Pipeline / Run Tests (pull_request) Successful in 2m25s
CI/CD Pipeline / Telegram Notify Success (pull_request) Successful in 1m34s
fix pre-commit
2026-03-17 13:55:34 +01:00

506 lines
19 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from __future__ import annotations
import base64
import json
import struct
import zlib
from datetime import date, datetime
from decimal import Decimal
from io import BytesIO
from pathlib import Path
from tempfile import TemporaryDirectory
from unittest.mock import patch
from uuid import uuid4
from zipfile import ZipFile
from apps.backups.models import BackupExportJob
from apps.backups.services import (
BackupArtifact,
BackupExportError,
BackupExportJobService,
BackupExportService,
)
from apps.parsers.models import FinancialReport, FinancialReportLine
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from django.db import IntegrityError
from django.test import TestCase, override_settings
from tests.apps.parsers.factories import (
IndustrialCertificateRecordFactory,
InspectionRecordFactory,
ManufacturerRecordFactory,
ProcurementRecordFactory,
)
from tests.apps.registers.factories import (
OrganizationFactory,
RegisterFactory,
RegisterUploadFactory,
RegistryMembershipPeriodFactory,
)
from tests.apps.user.factories import UserFactory
# Deterministic test key: 32 bytes of b"k", url-safe base64 encoded, with the
# trailing "=" padding stripped.
TEST_BACKUP_KEY = str(base64.urlsafe_b64encode(b"k" * 32), "ascii").rstrip("=")
def _decode_backup_payload(bin_bytes: bytes) -> tuple[dict, dict]:
    """Decrypt a backup ``.bin`` container and return ``(header, payload)``.

    Layout as read by this helper: bytes 5..8 hold the header length as a
    big-endian unsigned 32-bit integer, the UTF-8 JSON header starts at
    offset 9, and everything after the header is the AES-GCM ciphertext of a
    zlib-compressed UTF-8 JSON payload.
    """
    header_start = 9
    (header_size,) = struct.unpack(">I", bin_bytes[5:header_start])
    header_end = header_start + header_size

    header = json.loads(bin_bytes[header_start:header_end].decode("utf-8"))
    ciphertext = bin_bytes[header_end:]

    # Re-pad the nonce to a multiple of four characters so url-safe base64
    # decoding succeeds even if the "=" padding is absent.
    nonce_text = header["nonce"]
    padding = "=" * (-len(nonce_text) % 4)
    raw_nonce = base64.urlsafe_b64decode(nonce_text + padding)

    cipher = AESGCM(BackupExportService._read_encryption_key())
    compressed = cipher.decrypt(raw_nonce, ciphertext, BackupExportService.AAD)
    payload = json.loads(zlib.decompress(compressed).decode("utf-8"))
    return header, payload
@override_settings(BACKUP_ENCRYPTION_KEY=TEST_BACKUP_KEY, BACKUP_KEY_ID="test-key")
class BackupExportServiceTest(TestCase):
    """Tests for ``BackupExportService``.

    Covers archive building, scalar value normalization, encryption key
    validation and the size guard of the binary container header.
    """

    def test_build_backup_archive_raises_when_no_active_organizations(self):
        """Building an archive with no organizations in the DB is refused."""
        expected_message = "Нет актуальных организаций для экспорта"
        with self.assertRaisesMessage(BackupExportError, expected_message):
            BackupExportService.build_backup_archive(actual_date=date(2026, 3, 1))

    def test_build_backup_archive_exports_zip_and_payload(self):
        """Only the organization with an open membership period is exported.

        The resulting zip must contain exactly the ``.bin`` container and its
        checksum file, and the decrypted payload must describe one
        organization together with its financial report line.
        """
        main_registry = RegisterFactory(name="Main registry")
        current_upload = RegisterUploadFactory(
            registry=main_registry,
            actual_date=date(2026, 3, 1),
        )

        # Organization whose membership period is still open.
        active_org = OrganizationFactory(
            pn_name="Active Org",
            mn_ogrn=10_277_001_189_840,
            mn_inn=7_702_000_000,
            mn_okpo="12345678",
        )
        RegistryMembershipPeriodFactory(
            registry=main_registry,
            organization=active_org,
            started_at=date(2026, 3, 1),
            started_by_upload=current_upload,
        )

        # Organization whose membership period was closed before the export date.
        inactive_org = OrganizationFactory(
            pn_name="Inactive Org",
            mn_ogrn=10_277_001_189_841,
            mn_inn=7_702_000_001,
            mn_okpo="87654321",
        )
        previous_upload = RegisterUploadFactory(
            registry=main_registry,
            actual_date=date(2026, 2, 1),
        )
        RegistryMembershipPeriodFactory(
            registry=main_registry,
            organization=inactive_org,
            started_at=date(2026, 2, 1),
            ended_at=date(2026, 2, 20),
            started_by_upload=previous_upload,
            ended_by_upload=current_upload,
        )

        # Parser records attached to the active organization.
        org_inn = str(active_org.mn_inn)
        org_ogrn = str(active_org.mn_ogrn)
        IndustrialCertificateRecordFactory(
            registry_organization=active_org,
            inn=org_inn,
            ogrn=org_ogrn,
        )
        ManufacturerRecordFactory(
            registry_organization=active_org,
            inn=org_inn,
            ogrn=org_ogrn,
        )
        InspectionRecordFactory(
            registry_organization=active_org,
            inn=org_inn,
            ogrn=org_ogrn,
        )
        ProcurementRecordFactory(
            registry_organization=active_org,
            customer_inn=org_inn,
            customer_ogrn=org_ogrn,
        )
        financial_report = FinancialReport.objects.create(
            external_id="100500",
            ogrn=org_ogrn,
            registry_organization=active_org,
            file_name="fin_100500_10277001189840.xlsx",
            file_hash="f" * 64,
            load_batch=1,
            status=FinancialReport.Status.SUCCESS,
            source=FinancialReport.SourceType.API,
        )
        FinancialReportLine.objects.create(
            report=financial_report,
            form_code="1",
            line_code="1100",
            line_name="Assets",
            year=2025,
            period_start=100,
            period_end=200,
        )

        export_date = date(2026, 3, 15)
        artifact = BackupExportService.build_backup_archive(actual_date=export_date)

        # Artifact metadata.
        self.assertIsInstance(artifact, BackupArtifact)
        self.assertEqual(artifact.organizations_count, 1)
        self.assertEqual(artifact.actual_date, date(2026, 3, 15))
        self.assertTrue(artifact.archive_filename.endswith(".zip"))
        self.assertTrue(artifact.bin_filename.endswith(".bin"))
        self.assertTrue(artifact.checksum_filename.endswith(".sha256"))

        # Zip contents: exactly the container and its checksum file.
        expected_members = sorted([artifact.bin_filename, artifact.checksum_filename])
        with ZipFile(BytesIO(artifact.archive_bytes)) as archive:
            self.assertEqual(sorted(archive.namelist()), expected_members)
            checksum_content = archive.read(artifact.checksum_filename).decode("utf-8")
            bin_bytes = archive.read(artifact.bin_filename)

        self.assertTrue(bin_bytes.startswith(BackupExportService.MAGIC))
        self.assertIn(artifact.checksum_sha256, checksum_content)
        self.assertIn(artifact.bin_filename, checksum_content)

        # Decrypted header and payload.
        header, payload = _decode_backup_payload(bin_bytes)
        self.assertEqual(header["format"], "mostovik-backup-bin")
        self.assertEqual(header["key_id"], "test-key")
        self.assertEqual(payload["format"], "mostovik-backup-payload")
        self.assertEqual(payload["actual_date"], "2026-03-15")
        self.assertEqual(payload["organizations_count"], 1)

        exported_orgs = payload["data"]["registers.Organization"]
        self.assertEqual(len(exported_orgs), 1)
        self.assertEqual(exported_orgs[0]["pn_name"], "Active Org")
        self.assertEqual(len(payload["data"]["parsers.FinancialReportLine"]), 1)

    def test_normalize_value_supports_scalar_types(self):
        """Dates, datetimes, decimals, UUIDs, bytes and strings normalize as expected."""
        random_uuid = uuid4()
        cases = [
            (date(2026, 3, 15), "2026-03-15"),
            (datetime(2026, 3, 15, 12, 30, 0), "2026-03-15T12:30:00"),
            (Decimal("12.34"), "12.34"),
            (random_uuid, str(random_uuid)),
            (
                b"payload",
                {
                    "__type__": "bytes",
                    "base64": base64.b64encode(b"payload").decode("ascii"),
                },
            ),
            ("plain", "plain"),
        ]
        for raw_value, expected in cases:
            self.assertEqual(BackupExportService._normalize_value(raw_value), expected)

    @override_settings(BACKUP_ENCRYPTION_KEY="")
    def test_read_encryption_key_requires_setting(self):
        """An empty ``BACKUP_ENCRYPTION_KEY`` setting is rejected."""
        expected_message = "Не задан BACKUP_ENCRYPTION_KEY в настройках"
        with self.assertRaisesMessage(BackupExportError, expected_message):
            BackupExportService._read_encryption_key()

    def test_read_encryption_key_rejects_invalid_base64(self):
        """A ``ValueError`` from base64 decoding surfaces as ``BackupExportError``."""
        expected_message = (
            "BACKUP_ENCRYPTION_KEY должен быть base64-url кодированным ключом"
        )
        # The decoder is forced to fail so the test does not depend on which
        # inputs the real base64 implementation happens to reject.
        with patch(
            "apps.backups.services.base64.urlsafe_b64decode",
            side_effect=ValueError("bad base64"),
        ):
            with self.assertRaisesMessage(BackupExportError, expected_message):
                BackupExportService._read_encryption_key()

    @override_settings(
        BACKUP_ENCRYPTION_KEY=base64.urlsafe_b64encode(b"short").decode("ascii")
    )
    def test_read_encryption_key_requires_32_bytes(self):
        """A key that decodes to fewer than 32 bytes is rejected."""
        expected_message = (
            "BACKUP_ENCRYPTION_KEY после декодирования должен быть 32 байта"
        )
        with self.assertRaisesMessage(BackupExportError, expected_message):
            BackupExportService._read_encryption_key()

    def test_build_bin_container_rejects_oversized_header(self):
        """A header whose reported length is 2**32 is rejected.

        The size is faked through ``__len__`` so no large buffer has to be
        allocated.
        """

        class OversizedBytes(bytes):
            def __len__(self):
                return 2**32

        class OversizedJson(str):
            def encode(self, *_args, **_kwargs):
                return OversizedBytes(b"{}")

        expected_message = "Заголовок backup контейнера слишком большой"
        with patch(
            "apps.backups.services.json.dumps", return_value=OversizedJson("{}")
        ):
            with self.assertRaisesMessage(BackupExportError, expected_message):
                BackupExportService._build_bin_container(
                    encrypted_payload=b"payload",
                    header_payload={},
                )
class BackupExportJobServiceTest(TestCase):
    """Tests for ``BackupExportJobService``: job lookup, enqueueing, archive
    consumption, stale-job replacement and artifact cleanup."""

    @staticmethod
    def _create_failing_once():
        """Return a ``create`` stand-in that raises ``IntegrityError`` on the
        first call and delegates to the real manager method afterwards.

        Must be called before ``objects.create`` is patched, so that the real
        bound method is the one captured.
        """
        real_create = BackupExportJob.objects.create
        state = {"failed": False}

        def side_effect(*args, **kwargs):
            if not state["failed"]:
                state["failed"] = True
                raise IntegrityError("duplicate")
            return real_create(*args, **kwargs)

        return side_effect

    def test_result_for_existing_job_returns_wait_and_download(self):
        """A pending job yields ``wait``; a successful job with a file yields ``download``."""
        target_date = date(2026, 3, 15)
        job = BackupExportJob.objects.create(
            actual_date=target_date,
            status=BackupExportJob.Status.PENDING,
            task_id="task-pending",
        )

        wait_result = BackupExportJobService._result_for_existing_job(
            actual_date=target_date,
            job=job,
        )
        self.assertEqual(wait_result.action, "wait")
        self.assertEqual(wait_result.task_id, "task-pending")

        with TemporaryDirectory() as tmp_dir:
            archive_path = Path(tmp_dir) / "backup.zip"
            archive_path.write_bytes(b"zip")

            # Flip the same job to SUCCESS and point it at a real file.
            job.status = BackupExportJob.Status.SUCCESS
            job.archive_path = str(archive_path)
            job.save(update_fields=["status", "archive_path", "updated_at"])

            download_result = BackupExportJobService._result_for_existing_job(
                actual_date=target_date,
                job=job,
            )
            self.assertEqual(download_result.action, "download")

    def test_enqueue_backup_task_calls_celery(self):
        """Enqueueing forwards the job id and task id to ``apply_async``."""
        with patch(
            "apps.backups.tasks.generate_backup_for_date.apply_async"
        ) as apply_async:
            BackupExportJobService._enqueue_backup_task(job_id=5, task_id="task-5")

        apply_async.assert_called_once_with(kwargs={"job_id": 5}, task_id="task-5")

    def test_consume_ready_archive_raises_when_job_missing_or_not_ready(self):
        """Consuming fails when no job exists and when the job is still running."""
        with self.assertRaisesMessage(BackupExportError, "Задача бэкапа не найдена"):
            BackupExportJobService.consume_ready_archive(actual_date=date(2026, 3, 20))

        started_job = BackupExportJob.objects.create(
            actual_date=date(2026, 3, 21),
            status=BackupExportJob.Status.STARTED,
            task_id="task-started",
        )
        with self.assertRaisesMessage(BackupExportError, "Бэкап еще не готов"):
            BackupExportJobService.consume_ready_archive(
                actual_date=started_job.actual_date
            )

    def test_consume_ready_archive_deletes_job_when_file_missing(self):
        """A successful job whose archive file is missing raises an error."""
        with TemporaryDirectory() as tmp_dir:
            missing_archive = Path(tmp_dir) / "does-not-exist.zip"
            job = BackupExportJob.objects.create(
                actual_date=date(2026, 3, 22),
                status=BackupExportJob.Status.SUCCESS,
                task_id="task-success",
                archive_path=str(missing_archive),
            )

            expected_message = "Файл бэкапа отсутствует, запустите формирование снова"
            with self.assertRaisesMessage(BackupExportError, expected_message):
                BackupExportJobService.consume_ready_archive(
                    actual_date=job.actual_date
                )

            # NOTE(review): the test name says the job is deleted, yet the row
            # is expected to still exist here. Presumably the deletion is
            # rolled back when the error propagates - confirm against the
            # service implementation.
            self.assertTrue(BackupExportJob.objects.filter(id=job.id).exists())

    def test_consume_ready_archive_reads_file_and_uses_path_name_as_fallback(self):
        """With an empty ``archive_filename`` the file name of the path is used,
        and the job row is gone after consumption."""
        with TemporaryDirectory() as tmp_dir:
            archive_path = Path(tmp_dir) / "backup-export.zip"
            archive_bytes = b"archive-bytes"
            archive_path.write_bytes(archive_bytes)
            job = BackupExportJob.objects.create(
                actual_date=date(2026, 3, 23),
                status=BackupExportJob.Status.SUCCESS,
                task_id="task-success-2",
                archive_path=str(archive_path),
                archive_filename="",
                checksum_filename="backup-export.zip.sha256",
                organizations_count=3,
            )

            artifact = BackupExportJobService.consume_ready_archive(
                actual_date=job.actual_date
            )

            self.assertEqual(artifact.archive_bytes, archive_bytes)
            self.assertEqual(artifact.archive_filename, "backup-export.zip")
            self.assertEqual(artifact.organizations_count, 3)
            self.assertFalse(BackupExportJob.objects.filter(id=job.id).exists())

    def test_check_or_start_job_replaces_stale_failed_job(self):
        """A FAILURE job and its archive file are removed, and a new job is
        created and enqueued in their place."""
        target_date = date(2026, 3, 24)
        user = UserFactory.create_user()
        new_task_id = "new-task-id"

        with TemporaryDirectory() as tmp_dir:
            stale_path = Path(tmp_dir) / "stale.zip"
            stale_path.write_bytes(b"stale")
            stale_job = BackupExportJob.objects.create(
                actual_date=target_date,
                status=BackupExportJob.Status.FAILURE,
                task_id="stale-task",
                archive_path=str(stale_path),
            )

            with patch("apps.backups.services.uuid.uuid4", return_value="new-task-id"):
                with patch.object(
                    BackupExportJobService, "_enqueue_backup_task"
                ) as enqueue_mock, self.captureOnCommitCallbacks(execute=True):
                    result = BackupExportJobService.check_or_start_job(
                        actual_date=target_date,
                        requested_by_id=user.id,
                    )

            self.assertEqual(result.action, "started")
            self.assertEqual(result.task_id, new_task_id)
            self.assertFalse(BackupExportJob.objects.filter(id=stale_job.id).exists())
            self.assertFalse(stale_path.exists())

            new_job = BackupExportJob.objects.get(actual_date=target_date)
            self.assertEqual(new_job.task_id, new_task_id)
            enqueue_mock.assert_called_once_with(
                job_id=new_job.id, task_id="new-task-id"
            )

    def test_check_or_start_job_retries_create_after_integrity_error_with_stale_job(
        self,
    ):
        """After an ``IntegrityError`` on create, a stale FAILURE job found on
        the second lookup is replaced and the new job is enqueued."""
        target_date = date(2026, 3, 26)
        user = UserFactory.create_user()
        retry_task_id = "retry-task-id"
        stale_job = BackupExportJob.objects.create(
            actual_date=target_date,
            status=BackupExportJob.Status.FAILURE,
            task_id="stale-task",
        )
        create_side_effect = self._create_failing_once()

        # First lookup finds nothing, the lookup after the failed create
        # returns the stale job.
        with patch.object(
            BackupExportJobService,
            "_get_job_for_update",
            side_effect=[None, stale_job],
        ), patch.object(
            BackupExportJobService,
            "_result_for_existing_job",
            return_value=None,
        ), patch(
            "apps.backups.services.BackupExportJob.objects.create",
            side_effect=create_side_effect,
        ), patch(
            "apps.backups.services.uuid.uuid4",
            return_value="retry-task-id",
        ):
            with patch.object(
                BackupExportJobService, "_enqueue_backup_task"
            ) as enqueue_mock, self.captureOnCommitCallbacks(execute=True):
                result = BackupExportJobService.check_or_start_job(
                    actual_date=target_date,
                    requested_by_id=user.id,
                )

        self.assertEqual(result.action, "started")
        self.assertEqual(result.task_id, retry_task_id)
        self.assertFalse(BackupExportJob.objects.filter(id=stale_job.id).exists())

        new_job = BackupExportJob.objects.get(actual_date=target_date)
        self.assertEqual(new_job.task_id, retry_task_id)
        enqueue_mock.assert_called_once_with(job_id=new_job.id, task_id="retry-task-id")

    def test_check_or_start_job_retries_create_after_integrity_error_without_concurrent_job(
        self,
    ):
        """After an ``IntegrityError`` on create with no job found on either
        lookup, the create is retried and the new job is enqueued."""
        target_date = date(2026, 3, 28)
        user = UserFactory.create_user()
        retry_task_id = "retry-task-id-2"
        create_side_effect = self._create_failing_once()

        with patch.object(
            BackupExportJobService,
            "_get_job_for_update",
            side_effect=[None, None],
        ), patch.object(
            BackupExportJobService,
            "_result_for_existing_job",
            return_value=None,
        ), patch(
            "apps.backups.services.BackupExportJob.objects.create",
            side_effect=create_side_effect,
        ), patch(
            "apps.backups.services.uuid.uuid4",
            return_value="retry-task-id-2",
        ):
            with patch.object(
                BackupExportJobService, "_enqueue_backup_task"
            ) as enqueue_mock, self.captureOnCommitCallbacks(execute=True):
                result = BackupExportJobService.check_or_start_job(
                    actual_date=target_date,
                    requested_by_id=user.id,
                )

        self.assertEqual(result.action, "started")
        self.assertEqual(result.task_id, retry_task_id)

        new_job = BackupExportJob.objects.get(actual_date=target_date)
        self.assertEqual(new_job.task_id, retry_task_id)
        enqueue_mock.assert_called_once_with(
            job_id=new_job.id, task_id="retry-task-id-2"
        )

    def test_archive_exists_and_cleanup_job_artifact(self):
        """An existing archive file is detected and then removed by cleanup."""
        with TemporaryDirectory() as tmp_dir:
            archive_path = Path(tmp_dir) / "backup.zip"
            archive_path.write_bytes(b"zip")
            job = BackupExportJob.objects.create(
                actual_date=date(2026, 3, 25),
                status=BackupExportJob.Status.SUCCESS,
                task_id="task-cleanup",
                archive_path=str(archive_path),
            )

            self.assertTrue(BackupExportJobService._archive_exists(job))

            BackupExportJobService._cleanup_job_artifact(job)
            self.assertFalse(archive_path.exists())

    def test_cleanup_job_artifact_is_noop_without_archive_path(self):
        """Cleanup with an empty ``archive_path`` leaves the job row in place."""
        job = BackupExportJob.objects.create(
            actual_date=date(2026, 3, 27),
            status=BackupExportJob.Status.FAILURE,
            task_id="task-no-artifact",
            archive_path="",
        )

        BackupExportJobService._cleanup_job_artifact(job)

        self.assertTrue(BackupExportJob.objects.filter(id=job.id).exists())