- Add Model Mixins: TimestampMixin, SoftDeleteMixin, AuditMixin, etc. - Add Base Services: BaseService, BulkOperationsMixin, QueryOptimizerMixin - Add Base ViewSets with bulk operations - Add BackgroundJob model for Celery task tracking - Add BaseAppCommand for management commands - Add permissions, pagination, filters, cache, logging - Migrate tests to factory_boy + faker - Add CHANGELOG.md - 297 tests passing
237 lines
7.9 KiB
Python
237 lines
7.9 KiB
Python
"""Тесты для BackgroundJob."""
|
||
|
||
from apps.core.models import BackgroundJob, JobStatus
|
||
from apps.core.services import BackgroundJobService
|
||
from django.test import TestCase
|
||
from faker import Faker
|
||
|
||
fake = Faker()
|
||
|
||
|
||
class BackgroundJobModelTest(TestCase):
|
||
"""Тесты для модели BackgroundJob."""
|
||
|
||
def test_create_job(self):
|
||
"""Тест создания задачи."""
|
||
task_id = fake.uuid4()
|
||
job = BackgroundJob.objects.create(
|
||
task_id=task_id,
|
||
task_name="apps.test.tasks.my_task",
|
||
)
|
||
self.assertEqual(job.task_id, task_id)
|
||
self.assertEqual(job.status, JobStatus.PENDING)
|
||
self.assertEqual(job.progress, 0)
|
||
|
||
def test_mark_started(self):
|
||
"""Тест отметки о начале выполнения."""
|
||
job = BackgroundJob.objects.create(
|
||
task_id=fake.uuid4(),
|
||
task_name="test.task",
|
||
)
|
||
job.mark_started()
|
||
|
||
self.assertEqual(job.status, JobStatus.STARTED)
|
||
self.assertIsNotNone(job.started_at)
|
||
|
||
def test_update_progress(self):
|
||
"""Тест обновления прогресса."""
|
||
job = BackgroundJob.objects.create(
|
||
task_id=fake.uuid4(),
|
||
task_name="test.task",
|
||
)
|
||
job.update_progress(50, "Обработка данных...")
|
||
|
||
self.assertEqual(job.progress, 50)
|
||
self.assertEqual(job.progress_message, "Обработка данных...")
|
||
|
||
def test_complete(self):
|
||
"""Тест успешного завершения."""
|
||
job = BackgroundJob.objects.create(
|
||
task_id=fake.uuid4(),
|
||
task_name="test.task",
|
||
)
|
||
result = {"processed": 100, "errors": 0}
|
||
job.complete(result=result)
|
||
|
||
self.assertEqual(job.status, JobStatus.SUCCESS)
|
||
self.assertEqual(job.progress, 100)
|
||
self.assertEqual(job.result, result)
|
||
self.assertIsNotNone(job.completed_at)
|
||
|
||
def test_fail(self):
|
||
"""Тест завершения с ошибкой."""
|
||
job = BackgroundJob.objects.create(
|
||
task_id=fake.uuid4(),
|
||
task_name="test.task",
|
||
)
|
||
job.fail("Something went wrong", "Traceback...")
|
||
|
||
self.assertEqual(job.status, JobStatus.FAILURE)
|
||
self.assertEqual(job.error, "Something went wrong")
|
||
self.assertEqual(job.traceback, "Traceback...")
|
||
self.assertIsNotNone(job.completed_at)
|
||
|
||
def test_revoke(self):
|
||
"""Тест отмены задачи."""
|
||
job = BackgroundJob.objects.create(
|
||
task_id=fake.uuid4(),
|
||
task_name="test.task",
|
||
)
|
||
job.revoke()
|
||
|
||
self.assertEqual(job.status, JobStatus.REVOKED)
|
||
self.assertIsNotNone(job.completed_at)
|
||
|
||
def test_is_finished_property(self):
|
||
"""Тест свойства is_finished."""
|
||
job = BackgroundJob.objects.create(
|
||
task_id=fake.uuid4(),
|
||
task_name="test.task",
|
||
)
|
||
self.assertFalse(job.is_finished)
|
||
|
||
job.complete()
|
||
self.assertTrue(job.is_finished)
|
||
|
||
def test_is_successful_property(self):
|
||
"""Тест свойства is_successful."""
|
||
job = BackgroundJob.objects.create(
|
||
task_id=fake.uuid4(),
|
||
task_name="test.task",
|
||
)
|
||
self.assertFalse(job.is_successful)
|
||
|
||
job.complete()
|
||
self.assertTrue(job.is_successful)
|
||
|
||
def test_duration_property(self):
|
||
"""Тест свойства duration."""
|
||
job = BackgroundJob.objects.create(
|
||
task_id=fake.uuid4(),
|
||
task_name="test.task",
|
||
)
|
||
self.assertIsNone(job.duration)
|
||
|
||
job.mark_started()
|
||
job.complete()
|
||
self.assertIsNotNone(job.duration)
|
||
self.assertGreaterEqual(job.duration, 0)
|
||
|
||
|
||
class BackgroundJobServiceTest(TestCase):
|
||
"""Тесты для BackgroundJobService."""
|
||
|
||
def test_create_job(self):
|
||
"""Тест создания задачи через сервис."""
|
||
task_id = fake.uuid4()
|
||
job = BackgroundJobService.create_job(
|
||
task_id=task_id,
|
||
task_name="apps.test.tasks.my_task",
|
||
user_id=1,
|
||
meta={"key": "value"},
|
||
)
|
||
self.assertEqual(job.task_id, task_id)
|
||
self.assertEqual(job.user_id, 1)
|
||
self.assertEqual(job.meta, {"key": "value"})
|
||
|
||
def test_get_by_task_id(self):
|
||
"""Тест получения задачи по task_id."""
|
||
task_id = fake.uuid4()
|
||
created_job = BackgroundJobService.create_job(
|
||
task_id=task_id,
|
||
task_name="test.task",
|
||
)
|
||
found_job = BackgroundJobService.get_by_task_id(task_id)
|
||
self.assertEqual(created_job.id, found_job.id)
|
||
|
||
def test_get_by_task_id_not_found(self):
|
||
"""Тест получения несуществующей задачи."""
|
||
from apps.core.exceptions import NotFoundError
|
||
|
||
with self.assertRaises(NotFoundError):
|
||
BackgroundJobService.get_by_task_id("non-existent-task-id")
|
||
|
||
def test_get_by_task_id_or_none(self):
|
||
"""Тест получения задачи или None."""
|
||
result = BackgroundJobService.get_by_task_id_or_none("non-existent")
|
||
self.assertIsNone(result)
|
||
|
||
task_id = fake.uuid4()
|
||
BackgroundJobService.create_job(
|
||
task_id=task_id,
|
||
task_name="test.task",
|
||
)
|
||
result = BackgroundJobService.get_by_task_id_or_none(task_id)
|
||
self.assertIsNotNone(result)
|
||
|
||
def test_get_user_jobs(self):
|
||
"""Тест получения задач пользователя."""
|
||
user_id = 123
|
||
# Создаём несколько задач
|
||
for i in range(3):
|
||
BackgroundJobService.create_job(
|
||
task_id=f"task-{user_id}-{i}",
|
||
task_name="test.task",
|
||
user_id=user_id,
|
||
)
|
||
# И одну задачу другого пользователя
|
||
BackgroundJobService.create_job(
|
||
task_id="task-other-user",
|
||
task_name="test.task",
|
||
user_id=999,
|
||
)
|
||
|
||
jobs = BackgroundJobService.get_user_jobs(user_id)
|
||
self.assertEqual(len(jobs), 3)
|
||
|
||
def test_get_user_jobs_with_status_filter(self):
|
||
"""Тест фильтрации по статусу."""
|
||
user_id = 456
|
||
job1 = BackgroundJobService.create_job(
|
||
task_id="task-pending",
|
||
task_name="test.task",
|
||
user_id=user_id,
|
||
)
|
||
job2 = BackgroundJobService.create_job(
|
||
task_id="task-success",
|
||
task_name="test.task",
|
||
user_id=user_id,
|
||
)
|
||
job2.complete()
|
||
|
||
pending_jobs = BackgroundJobService.get_user_jobs(
|
||
user_id, status=JobStatus.PENDING
|
||
)
|
||
self.assertEqual(len(pending_jobs), 1)
|
||
|
||
success_jobs = BackgroundJobService.get_user_jobs(
|
||
user_id, status=JobStatus.SUCCESS
|
||
)
|
||
self.assertEqual(len(success_jobs), 1)
|
||
|
||
def test_get_active_jobs(self):
|
||
"""Тест получения активных задач."""
|
||
# Создаём задачи с разными статусами
|
||
job_pending = BackgroundJobService.create_job(
|
||
task_id="job-active-pending",
|
||
task_name="test.task",
|
||
)
|
||
job_started = BackgroundJobService.create_job(
|
||
task_id="job-active-started",
|
||
task_name="test.task",
|
||
)
|
||
job_started.mark_started()
|
||
|
||
job_success = BackgroundJobService.create_job(
|
||
task_id="job-active-success",
|
||
task_name="test.task",
|
||
)
|
||
job_success.complete()
|
||
|
||
active_jobs = list(BackgroundJobService.get_active_jobs())
|
||
active_task_ids = [j.task_id for j in active_jobs]
|
||
|
||
self.assertIn("job-active-pending", active_task_ids)
|
||
self.assertIn("job-active-started", active_task_ids)
|
||
self.assertNotIn("job-active-success", active_task_ids)
|