feat: add parser source dashboard and scheduling
Some checks failed
CI/CD Pipeline / Code Quality Checks (pull_request) Failing after 9s
CI/CD Pipeline / Run Tests (pull_request) Failing after 48s
CI/CD Pipeline / Build Docker Images (pull_request) Has been skipped
CI/CD Pipeline / Push to Gitea Registry (pull_request) Has been skipped

This commit is contained in:
2026-04-27 23:36:28 +02:00
parent 199d871923
commit c9b350da1e
96 changed files with 15014 additions and 268 deletions

View File

@@ -0,0 +1 @@
"""Tests for backups app."""

View File

@@ -0,0 +1,131 @@
"""Tests for encrypted backup export."""
import json
import struct
from datetime import date
from io import BytesIO
from zipfile import ZipFile
from apps.backups.models import BackupExportJob
from apps.backups.services import BackupExportService
from apps.core.models import BackgroundJob, JobStatus
from apps.parsers.models import ParserLoadLog
from apps.registers.models import Register
from apps.user.services import UserService
from django.test import override_settings
from django.urls import reverse
from rest_framework import status
from rest_framework.test import APITestCase
from tests.apps.parsers.factories import GenericParserRecordFactory
from tests.apps.registers.factories import (
OrganizationFactory,
RegistryMembershipPeriodFactory,
)
from tests.apps.user.factories import UserFactory
@override_settings(
BACKUP_ENCRYPTION_KEY="MDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDA=",
CELERY_TASK_ALWAYS_EAGER=True,
CELERY_TASK_EAGER_PROPAGATES=True,
)
class BackupExportTest(APITestCase):
"""Tests for registry backup service and API."""
def setUp(self):
self.admin = UserFactory.create_user(is_staff=True)
self.user = UserFactory.create_user()
self.registry, _ = Register.objects.get_or_create(
name="Реестр предприятий ОПК"
)
self.organization = OrganizationFactory(
pn_name='АО "ОПК"',
mn_ogrn=1027600980990,
mn_inn=7601000086,
mn_okpo="07506197",
)
RegistryMembershipPeriodFactory(
registry=self.registry,
organization=self.organization,
)
def authenticate(self, user):
tokens = UserService.get_tokens_for_user(user)
self.client.credentials(HTTP_AUTHORIZATION=f"Bearer {tokens['access']}")
def test_backup_archive_contains_registry_payload_header(self):
"""Test service creates ZIP with encrypted bin and metadata header."""
GenericParserRecordFactory(
source=ParserLoadLog.Source.FNS_FINANCIAL,
inn=str(self.organization.mn_inn),
ogrn=str(self.organization.mn_ogrn),
)
artifact = BackupExportService.build_backup_archive(
actual_date=date(2026, 4, 27),
registry=self.registry,
)
with ZipFile(BytesIO(artifact.archive_bytes)) as archive:
names = archive.namelist()
bin_name = next(name for name in names if name.endswith(".bin"))
self.assertTrue(any(name.endswith(".sha256") for name in names))
bin_payload = archive.read(bin_name)
self.assertEqual(bin_payload[:4], BackupExportService.MAGIC)
header_length = struct.unpack(">I", bin_payload[5:9])[0]
header = json.loads(bin_payload[9 : 9 + header_length])
self.assertEqual(header["actual_date"], "2026-04-27")
self.assertEqual(header["registry_name"], "Реестр предприятий ОПК")
self.assertEqual(header["organizations_count"], 1)
self.assertEqual(artifact.organizations_count, 1)
def test_backup_export_requires_admin(self):
"""Test regular users cannot start backup export."""
self.authenticate(self.user)
response = self.client.post(
reverse("api_v1:backups:export"),
{
"actual_date": "2026-04-27",
"registry": str(self.registry.id),
},
format="json",
)
self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)
def test_backup_export_starts_task_then_downloads_ready_archive(self):
"""Test endpoint starts Celery task and returns archive on repeated call."""
self.authenticate(self.admin)
payload = {
"actual_date": "2026-04-27",
"registry": str(self.registry.id),
}
with self.captureOnCommitCallbacks(execute=True):
first_response = self.client.post(
reverse("api_v1:backups:export"),
payload,
format="json",
)
second_response = self.client.post(
reverse("api_v1:backups:export"),
payload,
format="json",
)
self.assertEqual(first_response.status_code, status.HTTP_202_ACCEPTED)
self.assertEqual(first_response.data["data"]["status"], "started")
self.assertEqual(second_response.status_code, status.HTTP_200_OK)
self.assertEqual(second_response["Content-Type"], "application/zip")
self.assertEqual(
second_response["X-Backup-Actual-Date"],
"2026-04-27",
)
self.assertFalse(BackupExportJob.objects.exists())
background_job = BackgroundJob.objects.get(
task_id=first_response.data["data"]["task_id"]
)
self.assertEqual(background_job.status, JobStatus.SUCCESS)