Some checks failed
CI/CD Pipeline / Code Quality Checks (pull_request) Failing after 10m20s
CI/CD Pipeline / Run Tests (pull_request) Failing after 11m5s
CI/CD Pipeline / Build Docker Images (pull_request) Has been skipped
CI/CD Pipeline / Push to Gitea Registry (pull_request) Has been skipped
132 lines
4.8 KiB
Python
132 lines
4.8 KiB
Python
"""Tests for encrypted backup export."""
|
||
|
||
import json
|
||
import struct
|
||
from datetime import date
|
||
from io import BytesIO
|
||
from zipfile import ZipFile
|
||
|
||
from apps.backups.models import BackupExportJob
|
||
from apps.backups.services import BackupExportService
|
||
from apps.core.models import BackgroundJob, JobStatus
|
||
from apps.parsers.models import ParserLoadLog
|
||
from apps.registers.models import Register
|
||
from apps.user.services import UserService
|
||
from django.test import override_settings
|
||
from django.urls import reverse
|
||
from rest_framework import status
|
||
from rest_framework.test import APITestCase
|
||
|
||
from tests.apps.parsers.factories import GenericParserRecordFactory
|
||
from tests.apps.registers.factories import (
|
||
OrganizationFactory,
|
||
RegistryMembershipPeriodFactory,
|
||
)
|
||
from tests.apps.user.factories import UserFactory
|
||
|
||
|
||
@override_settings(
    # Fixed base64 key so the encrypted payload can be produced in tests.
    BACKUP_ENCRYPTION_KEY="MDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDA=",
    # Celery eager mode: tasks run in-process and re-raise their errors
    # (assumes the project maps CELERY_* Django settings onto Celery config).
    CELERY_TASK_ALWAYS_EAGER=True,
    CELERY_TASK_EAGER_PROPAGATES=True,
)
class BackupExportTest(APITestCase):
    """Tests for registry backup service and API.

    Covers ``BackupExportService.build_backup_archive`` directly and the
    ``api_v1:backups:export`` endpoint (permission check and the
    start-then-download flow).
    """

    def setUp(self) -> None:
        """Create a staff user, a regular user and a registry with one member."""
        super().setUp()
        self.admin = UserFactory.create_user(is_staff=True)
        self.user = UserFactory.create_user()
        # get_or_create: reuse the registry row if one with this name exists.
        self.registry, _ = Register.objects.get_or_create(
            name="Реестр предприятий ОПК"
        )
        self.organization = OrganizationFactory(
            pn_name='АО "ОПК"',
            mn_ogrn=1027600980990,
            mn_inn=7601000086,
            mn_okpo="07506197",
        )
        # Link the organization to the registry; the tests below expect
        # exactly one organization in the exported backup.
        RegistryMembershipPeriodFactory(
            registry=self.registry,
            organization=self.organization,
        )

    def authenticate(self, user) -> None:
        """Send the user's access token as a Bearer header on later requests."""
        tokens = UserService.get_tokens_for_user(user)
        self.client.credentials(HTTP_AUTHORIZATION=f"Bearer {tokens['access']}")

    def test_backup_archive_contains_registry_payload_header(self) -> None:
        """Test service creates ZIP with encrypted bin and metadata header.

        The ``.bin`` member is read as: bytes 0-3 magic, byte 4 skipped,
        bytes 5-8 a big-endian unsigned 32-bit header length, then that many
        bytes of JSON header starting at offset 9.
        """
        GenericParserRecordFactory(
            source=ParserLoadLog.Source.FNS_FINANCIAL,
            inn=str(self.organization.mn_inn),
            ogrn=str(self.organization.mn_ogrn),
        )

        artifact = BackupExportService.build_backup_archive(
            actual_date=date(2026, 4, 27),
            registry=self.registry,
        )

        with ZipFile(BytesIO(artifact.archive_bytes)) as archive:
            names = archive.namelist()
            # A bare next() would raise StopIteration when no ".bin" member
            # exists; use a default so the test fails with a readable message.
            bin_name = next(
                (name for name in names if name.endswith(".bin")),
                None,
            )
            self.assertIsNotNone(
                bin_name,
                f"archive has no .bin member; members: {names}",
            )
            self.assertTrue(
                any(name.endswith(".sha256") for name in names),
                f"archive has no .sha256 member; members: {names}",
            )
            bin_payload = archive.read(bin_name)

        self.assertEqual(bin_payload[:4], BackupExportService.MAGIC)
        # NOTE(review): byte 4 is not checked here — presumably a format
        # version byte; confirm against BackupExportService.
        (header_length,) = struct.unpack(">I", bin_payload[5:9])
        header = json.loads(bin_payload[9 : 9 + header_length])
        self.assertEqual(header["actual_date"], "2026-04-27")
        self.assertEqual(header["registry_name"], "Реестр предприятий ОПК")
        self.assertEqual(header["organizations_count"], 1)
        self.assertEqual(artifact.organizations_count, 1)

    def test_backup_export_requires_admin(self) -> None:
        """Test regular users cannot start backup export."""
        self.authenticate(self.user)

        response = self.client.post(
            reverse("api_v1:backups:export"),
            {
                "actual_date": "2026-04-27",
                "registry": str(self.registry.id),
            },
            format="json",
        )

        self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)

    def test_backup_export_starts_task_then_downloads_ready_archive(self) -> None:
        """Test endpoint starts Celery task and returns archive on repeated call."""
        self.authenticate(self.admin)
        payload = {
            "actual_date": "2026-04-27",
            "registry": str(self.registry.id),
        }

        # Run on_commit callbacks registered during the first request when the
        # block exits (presumably this is where the view enqueues the task —
        # confirm against the view).
        with self.captureOnCommitCallbacks(execute=True):
            first_response = self.client.post(
                reverse("api_v1:backups:export"),
                payload,
                format="json",
            )
        # Same payload again: the archive is expected to be ready now.
        second_response = self.client.post(
            reverse("api_v1:backups:export"),
            payload,
            format="json",
        )

        self.assertEqual(first_response.status_code, status.HTTP_202_ACCEPTED)
        self.assertEqual(first_response.data["data"]["status"], "started")
        self.assertEqual(second_response.status_code, status.HTTP_200_OK)
        self.assertEqual(second_response["Content-Type"], "application/zip")
        self.assertEqual(
            second_response["X-Backup-Actual-Date"],
            "2026-04-27",
        )
        # No BackupExportJob rows may remain after the download
        # (presumably the row is removed once the archive is served).
        self.assertFalse(BackupExportJob.objects.exists())
        background_job = BackgroundJob.objects.get(
            task_id=first_response.data["data"]["task_id"]
        )
        self.assertEqual(background_job.status, JobStatus.SUCCESS)
|