Files
mostovik-backend/tests/apps/core/test_celery_module.py
Aleksandr Meshchriakov 25176f31b4
Some checks failed
CI/CD Pipeline / Telegram Notify Success (push) Has been cancelled
CI/CD Pipeline / Run Tests (push) Has been cancelled
CI/CD Pipeline / Code Quality Checks (push) Has been cancelled
CI/CD Pipeline / Code Quality Checks (pull_request) Successful in 1m42s
CI/CD Pipeline / Run Tests (pull_request) Successful in 2m25s
CI/CD Pipeline / Telegram Notify Success (pull_request) Successful in 1m34s
fix pre-commit
2026-03-17 13:55:34 +01:00

60 lines
2.1 KiB
Python

from __future__ import annotations
import importlib.util
import os
import sys
from pathlib import Path
from types import SimpleNamespace
from unittest.mock import MagicMock, patch
from django.test import SimpleTestCase
CELERY_MODULE_PATH = Path(__file__).resolve().parents[3] / "src" / "core" / "celery.py"
def _load_module(module_name: str):
spec = importlib.util.spec_from_file_location(module_name, CELERY_MODULE_PATH)
module = importlib.util.module_from_spec(spec)
assert spec.loader is not None
spec.loader.exec_module(module)
return module
class CeleryModuleTest(SimpleTestCase):
def test_import_requires_django_settings_module(self):
with patch.dict(os.environ, {}, clear=True), self.assertRaisesMessage(
RuntimeError,
"DJANGO_SETTINGS_MODULE is not set.",
):
_load_module("isolated_core_celery_missing")
def test_import_runs_startup_checks_for_worker_runtime(self):
app_mock = MagicMock()
app_mock.conf = SimpleNamespace()
with patch.dict(
os.environ, {"DJANGO_SETTINGS_MODULE": "settings.test"}, clear=True
), patch.object(sys, "argv", ["celery", "-A", "project", "worker"]), patch(
"apps.core.startup_checks.run_startup_checks"
) as checks_mock, patch("celery.Celery", return_value=app_mock):
module = _load_module("isolated_core_celery_worker")
checks_mock.assert_called_once_with(component="celery")
app_mock.config_from_object.assert_called_once_with(
"django.conf:settings",
namespace="CELERY",
)
app_mock.autodiscover_tasks.assert_called_once_with()
self.assertEqual(module.app, app_mock)
def test_debug_task_prints_request(self):
with patch.dict(
os.environ, {"DJANGO_SETTINGS_MODULE": "settings.test"}, clear=True
), patch.object(sys, "argv", ["python", "manage.py", "shell"]):
module = _load_module("isolated_core_celery_debug")
with patch("builtins.print") as print_mock:
module.debug_task.run()
print_mock.assert_called_once()