123 lines
4.8 KiB
Python
123 lines
4.8 KiB
Python
from __future__ import annotations
|
|
|
|
import importlib.util
|
|
import os
|
|
import sys
|
|
from pathlib import Path
|
|
from types import ModuleType, SimpleNamespace
|
|
from unittest.mock import MagicMock, patch
|
|
|
|
from django.test import SimpleTestCase
|
|
|
|
CELERY_MODULE_PATH = Path(__file__).resolve().parents[3] / "src" / "core" / "celery.py"
|
|
|
|
|
|
def _load_module(module_name: str):
|
|
spec = importlib.util.spec_from_file_location(module_name, CELERY_MODULE_PATH)
|
|
module = importlib.util.module_from_spec(spec)
|
|
assert spec.loader is not None
|
|
spec.loader.exec_module(module)
|
|
return module
|
|
|
|
|
|
class CeleryModuleTest(SimpleTestCase):
|
|
def test_import_requires_django_settings_module(self):
|
|
with patch.dict(os.environ, {}, clear=True), self.assertRaisesMessage(
|
|
RuntimeError,
|
|
"DJANGO_SETTINGS_MODULE is not set.",
|
|
):
|
|
_load_module("isolated_core_celery_missing")
|
|
|
|
def test_import_runs_startup_checks_for_worker_runtime(self):
|
|
app_mock = MagicMock()
|
|
app_mock.conf = SimpleNamespace()
|
|
|
|
with patch.dict(
|
|
os.environ, {"DJANGO_SETTINGS_MODULE": "settings.test"}, clear=True
|
|
), patch.object(sys, "argv", ["celery", "-A", "project", "worker"]), patch(
|
|
"apps.core.startup_checks.run_startup_checks"
|
|
) as checks_mock, patch("celery.Celery", return_value=app_mock):
|
|
module = _load_module("isolated_core_celery_worker")
|
|
|
|
checks_mock.assert_called_once_with(component="celery")
|
|
app_mock.config_from_object.assert_called_once_with(
|
|
"django.conf:settings",
|
|
namespace="CELERY",
|
|
)
|
|
app_mock.autodiscover_tasks.assert_called_once_with()
|
|
self.assertEqual(module.app, app_mock)
|
|
self.assertIn(
|
|
"parse-industrial-production-daily", module.app.conf.beat_schedule
|
|
)
|
|
self.assertIn("parse-manufactures-daily", module.app.conf.beat_schedule)
|
|
self.assertIn("parse-industrial-products-daily", module.app.conf.beat_schedule)
|
|
self.assertIn("parse-inspections-weekly", module.app.conf.beat_schedule)
|
|
|
|
def test_startup_refresh_queues_when_lock_acquired(self):
|
|
with patch.dict(
|
|
os.environ, {"DJANGO_SETTINGS_MODULE": "settings.test"}, clear=True
|
|
), patch.object(sys, "argv", ["python", "manage.py", "shell"]):
|
|
module = _load_module("isolated_core_celery_startup_refresh")
|
|
|
|
fake_tasks_module = ModuleType("apps.parsers.tasks")
|
|
apply_async_mock = MagicMock(return_value=SimpleNamespace(id="task-123"))
|
|
fake_tasks_module.parse_all_sources = SimpleNamespace(
|
|
apply_async=apply_async_mock
|
|
)
|
|
|
|
with patch.object(
|
|
module,
|
|
"settings",
|
|
SimpleNamespace(
|
|
CELERY_STARTUP_REFRESH_ENABLED=True,
|
|
CELERY_STARTUP_REFRESH_LOCK_KEY="startup-lock",
|
|
CELERY_STARTUP_REFRESH_LOCK_TTL_SECONDS=120,
|
|
CELERY_STARTUP_REFRESH_DELAY_SECONDS=45,
|
|
),
|
|
), patch.object(module.cache, "add", return_value=True) as add_mock, patch.dict(
|
|
sys.modules, {"apps.parsers.tasks": fake_tasks_module}
|
|
):
|
|
module._queue_startup_sources_refresh()
|
|
|
|
add_mock.assert_called_once_with("startup-lock", "1", timeout=120)
|
|
apply_async_mock.assert_called_once_with(countdown=45)
|
|
|
|
def test_startup_refresh_skips_when_lock_exists(self):
|
|
with patch.dict(
|
|
os.environ, {"DJANGO_SETTINGS_MODULE": "settings.test"}, clear=True
|
|
), patch.object(sys, "argv", ["python", "manage.py", "shell"]):
|
|
module = _load_module("isolated_core_celery_startup_skip")
|
|
|
|
fake_tasks_module = ModuleType("apps.parsers.tasks")
|
|
apply_async_mock = MagicMock()
|
|
fake_tasks_module.parse_all_sources = SimpleNamespace(
|
|
apply_async=apply_async_mock
|
|
)
|
|
|
|
with patch.object(
|
|
module,
|
|
"settings",
|
|
SimpleNamespace(
|
|
CELERY_STARTUP_REFRESH_ENABLED=True,
|
|
CELERY_STARTUP_REFRESH_LOCK_KEY="startup-lock",
|
|
CELERY_STARTUP_REFRESH_LOCK_TTL_SECONDS=120,
|
|
CELERY_STARTUP_REFRESH_DELAY_SECONDS=45,
|
|
),
|
|
), patch.object(module.cache, "add", return_value=False), patch.dict(
|
|
sys.modules, {"apps.parsers.tasks": fake_tasks_module}
|
|
):
|
|
module._queue_startup_sources_refresh()
|
|
|
|
apply_async_mock.assert_not_called()
|
|
|
|
def test_debug_task_prints_request(self):
|
|
with patch.dict(
|
|
os.environ, {"DJANGO_SETTINGS_MODULE": "settings.test"}, clear=True
|
|
), patch.object(sys, "argv", ["python", "manage.py", "shell"]):
|
|
module = _load_module("isolated_core_celery_debug")
|
|
|
|
with patch("builtins.print") as print_mock:
|
|
module.debug_task.run()
|
|
|
|
print_mock.assert_called_once()
|