Some checks failed
CI/CD Pipeline / Code Quality Checks (push) Failing after 3m10s
CI/CD Pipeline / Run Tests (push) Successful in 3m35s
CI/CD Pipeline / Telegram Notify Success (push) Has been skipped
CI/CD Pipeline / Code Quality Checks (pull_request) Failing after 2m26s
CI/CD Pipeline / Run Tests (pull_request) Successful in 2m46s
CI/CD Pipeline / Telegram Notify Success (pull_request) Has been skipped
169 lines
6.5 KiB
Python
169 lines
6.5 KiB
Python
"""Tests for async backups export API."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import hashlib
|
|
from pathlib import Path
|
|
from tempfile import TemporaryDirectory
|
|
from unittest.mock import Mock, patch
|
|
|
|
from apps.backups.models import BackupExportJob
|
|
from apps.backups.services import BackupExportJobService
|
|
from django.db import IntegrityError
|
|
from django.urls import reverse
|
|
from django.utils import timezone
|
|
from rest_framework import status
|
|
from rest_framework.test import APITestCase
|
|
|
|
from tests.apps.user.factories import UserFactory
|
|
|
|
|
|
class BackupExportViewTest(APITestCase):
|
|
"""Tests for async backup export endpoint."""
|
|
|
|
def setUp(self):
|
|
self.user = UserFactory.create_user()
|
|
self.admin = UserFactory.create_superuser()
|
|
self.url = reverse("api_v1:backups:export")
|
|
|
|
def test_export_admin_only(self):
|
|
response = self.client.post(self.url, {}, format="json")
|
|
self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)
|
|
|
|
self.client.force_authenticate(self.user)
|
|
response = self.client.post(self.url, {}, format="json")
|
|
self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)
|
|
|
|
@patch("apps.backups.tasks.generate_backup_for_date.delay")
|
|
def test_export_starts_job_when_absent(self, delay_mock):
|
|
delay_mock.return_value = Mock(id="task-backup-1")
|
|
|
|
self.client.force_authenticate(self.admin)
|
|
today = timezone.localdate()
|
|
response = self.client.post(
|
|
self.url,
|
|
{"actual_date": today.isoformat()},
|
|
format="json",
|
|
)
|
|
|
|
self.assertEqual(response.status_code, status.HTTP_202_ACCEPTED)
|
|
self.assertEqual(response.data["status"], "started")
|
|
self.assertEqual(response.data["task_id"], "task-backup-1")
|
|
|
|
job = BackupExportJob.objects.get(actual_date=today)
|
|
self.assertEqual(job.status, BackupExportJob.Status.PENDING)
|
|
self.assertEqual(job.task_id, "task-backup-1")
|
|
delay_mock.assert_called_once_with(job_id=job.id)
|
|
|
|
@patch("apps.backups.tasks.generate_backup_for_date.delay")
|
|
def test_export_returns_wait_when_job_in_progress(self, delay_mock):
|
|
today = timezone.localdate()
|
|
BackupExportJob.objects.create(
|
|
actual_date=today,
|
|
status=BackupExportJob.Status.STARTED,
|
|
task_id="task-running-1",
|
|
)
|
|
|
|
self.client.force_authenticate(self.admin)
|
|
response = self.client.post(
|
|
self.url,
|
|
{"actual_date": today.isoformat()},
|
|
format="json",
|
|
)
|
|
|
|
self.assertEqual(response.status_code, status.HTTP_202_ACCEPTED)
|
|
self.assertEqual(response.data["status"], "wait")
|
|
self.assertEqual(response.data["task_id"], "task-running-1")
|
|
delay_mock.assert_not_called()
|
|
|
|
@patch("apps.backups.tasks.generate_backup_for_date.delay")
|
|
def test_export_returns_file_and_deletes_after_download(self, delay_mock):
|
|
with TemporaryDirectory() as tmp_dir:
|
|
tmp_path = Path(tmp_dir)
|
|
archive_bytes = b"zip-content"
|
|
archive_path = tmp_path / "backup.zip"
|
|
archive_path.write_bytes(archive_bytes)
|
|
|
|
today = timezone.localdate()
|
|
job = BackupExportJob.objects.create(
|
|
actual_date=today,
|
|
status=BackupExportJob.Status.SUCCESS,
|
|
task_id="task-success-1",
|
|
archive_path=str(archive_path),
|
|
archive_filename="backup.zip",
|
|
checksum_filename="backup.zip.sha256",
|
|
checksum_sha256=hashlib.sha256(archive_bytes).hexdigest(),
|
|
organizations_count=7,
|
|
)
|
|
|
|
self.client.force_authenticate(self.admin)
|
|
response = self.client.post(
|
|
self.url,
|
|
{"actual_date": today.isoformat()},
|
|
format="json",
|
|
)
|
|
|
|
self.assertEqual(response.status_code, status.HTTP_200_OK)
|
|
self.assertEqual(response.content, archive_bytes)
|
|
self.assertEqual(response["Content-Type"], "application/zip")
|
|
self.assertIn('attachment; filename="backup.zip"', response["Content-Disposition"])
|
|
self.assertEqual(response["X-Backup-Organizations"], "7")
|
|
self.assertEqual(
|
|
response["X-Backup-SHA256"],
|
|
hashlib.sha256(archive_bytes).hexdigest(),
|
|
)
|
|
|
|
self.assertFalse(archive_path.exists())
|
|
self.assertFalse(BackupExportJob.objects.filter(id=job.id).exists())
|
|
delay_mock.assert_not_called()
|
|
|
|
@patch("apps.backups.tasks.generate_backup_for_date.delay")
|
|
def test_export_restarts_when_success_job_has_no_file(self, delay_mock):
|
|
delay_mock.return_value = Mock(id="task-backup-retry")
|
|
today = timezone.localdate()
|
|
with TemporaryDirectory() as tmp_dir:
|
|
missing_archive_path = Path(tmp_dir) / "non-existent-backup.zip"
|
|
BackupExportJob.objects.create(
|
|
actual_date=today,
|
|
status=BackupExportJob.Status.SUCCESS,
|
|
task_id="task-old",
|
|
archive_path=str(missing_archive_path),
|
|
archive_filename="non-existent-backup.zip",
|
|
)
|
|
|
|
self.client.force_authenticate(self.admin)
|
|
response = self.client.post(
|
|
self.url,
|
|
{"actual_date": today.isoformat()},
|
|
format="json",
|
|
)
|
|
|
|
self.assertEqual(response.status_code, status.HTTP_202_ACCEPTED)
|
|
self.assertEqual(response.data["status"], "started")
|
|
self.assertEqual(response.data["task_id"], "task-backup-retry")
|
|
delay_mock.assert_called_once()
|
|
|
|
@patch("apps.backups.services.BackupExportJob.objects.create")
|
|
def test_check_or_start_job_handles_integrity_race(self, create_mock):
|
|
today = timezone.localdate()
|
|
concurrent_job = BackupExportJob(
|
|
actual_date=today,
|
|
status=BackupExportJob.Status.STARTED,
|
|
task_id="task-race-running",
|
|
)
|
|
concurrent_job.save()
|
|
create_mock.side_effect = IntegrityError("duplicate key value")
|
|
|
|
with patch.object(
|
|
BackupExportJobService,
|
|
"_get_job_for_update",
|
|
side_effect=[None, concurrent_job],
|
|
):
|
|
result = BackupExportJobService.check_or_start_job(
|
|
actual_date=today,
|
|
requested_by_id=self.admin.id,
|
|
)
|
|
|
|
self.assertEqual(result.action, "wait")
|
|
self.assertEqual(result.task_id, "task-race-running")
|