Files
mostovik-backend/tests/apps/core/test_celery_module.py
Aleksandr Meshchriakov 7d4c54636b
Some checks failed
CI/CD Pipeline / Telegram Notify Success (push) Has been cancelled
CI/CD Pipeline / Run Tests (push) Has been cancelled
CI/CD Pipeline / Code Quality Checks (push) Has been cancelled
feat(parsers): sync RU proxies from proxy-tools
2026-03-23 10:47:34 +01:00

124 lines
4.9 KiB
Python

from __future__ import annotations
import importlib.util
import os
import sys
from pathlib import Path
from types import ModuleType, SimpleNamespace
from unittest.mock import MagicMock, patch
from django.test import SimpleTestCase
# Absolute path of the Celery module under test. ``parents[3]`` walks up from
# this file's directory (tests/apps/core) to the backend root, so the result is
# ``<backend root>/src/core/celery.py``.
CELERY_MODULE_PATH = Path(__file__).resolve().parents[3] / "src" / "core" / "celery.py"
def _load_module(module_name: str):
    """Execute the file at ``CELERY_MODULE_PATH`` as a new module object.

    The module is built from a file-location spec and executed directly. This
    helper does not register it in ``sys.modules``, so each call re-runs the
    file's import-time code on a brand-new module object.

    Args:
        module_name: Name given to the created module (its ``__name__``).

    Returns:
        The executed module object.

    Raises:
        ImportError: If no spec or loader can be built for the file.
    """
    spec = importlib.util.spec_from_file_location(module_name, CELERY_MODULE_PATH)
    # Validate before module_from_spec(), which already dereferences the spec;
    # checking afterwards would surface as an unrelated AttributeError.
    if spec is None or spec.loader is None:
        raise ImportError(f"Cannot build an import spec for {CELERY_MODULE_PATH}")
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    return module
class CeleryModuleTest(SimpleTestCase):
    """Tests for the import-time behaviour and helpers of ``core/celery.py``.

    Every test executes the file as a fresh, uniquely named module through
    ``_load_module`` while ``os.environ`` and ``sys.argv`` are patched.
    """

    # Environment installed (with everything else cleared) while loading.
    SETTINGS_ENV = {"DJANGO_SETTINGS_MODULE": "settings.test"}
    # Kept as tuples so a loaded module can never mutate the shared values;
    # a fresh list is handed to ``sys.argv`` for each load.
    SHELL_ARGV = ("python", "manage.py", "shell")
    WORKER_ARGV = ("celery", "-A", "project", "worker")
    # Beat schedule keys the module is expected to define on ``app.conf``.
    EXPECTED_BEAT_ENTRIES = (
        "parse-industrial-production-daily",
        "parse-manufactures-daily",
        "parse-industrial-products-daily",
        "parse-inspections-weekly",
        "sync-ru-proxies-hourly",
    )

    def _load_for_shell(self, module_name):
        """Load the Celery module with a ``manage.py shell`` style ``sys.argv``."""
        with patch.dict(os.environ, self.SETTINGS_ENV, clear=True), patch.object(
            sys, "argv", list(self.SHELL_ARGV)
        ):
            return _load_module(module_name)

    @staticmethod
    def _startup_refresh_settings():
        """Return a settings stand-in with the startup refresh enabled."""
        return SimpleNamespace(
            CELERY_STARTUP_REFRESH_ENABLED=True,
            CELERY_STARTUP_REFRESH_LOCK_KEY="startup-lock",
            CELERY_STARTUP_REFRESH_LOCK_TTL_SECONDS=120,
            CELERY_STARTUP_REFRESH_DELAY_SECONDS=45,
        )

    def _run_startup_refresh(self, module_name, *, lock_acquired, apply_async_mock):
        """Call ``_queue_startup_sources_refresh`` with cache and tasks faked.

        ``cache.add`` is patched to return ``lock_acquired`` and
        ``apps.parsers.tasks`` is replaced in ``sys.modules`` by a fake module
        whose ``parse_all_sources.apply_async`` is ``apply_async_mock``.

        Returns:
            The mock that replaced ``cache.add`` during the call.
        """
        module = self._load_for_shell(module_name)

        fake_tasks_module = ModuleType("apps.parsers.tasks")
        fake_tasks_module.parse_all_sources = SimpleNamespace(
            apply_async=apply_async_mock
        )

        with patch.object(
            module, "settings", self._startup_refresh_settings()
        ), patch.object(
            module.cache, "add", return_value=lock_acquired
        ) as add_mock, patch.dict(
            sys.modules, {"apps.parsers.tasks": fake_tasks_module}
        ):
            module._queue_startup_sources_refresh()

        return add_mock

    def test_import_requires_django_settings_module(self):
        """Loading with an empty environment raises the expected RuntimeError."""
        with patch.dict(os.environ, {}, clear=True), self.assertRaisesMessage(
            RuntimeError,
            "DJANGO_SETTINGS_MODULE is not set.",
        ):
            _load_module("isolated_core_celery_missing")

    def test_import_runs_startup_checks_for_worker_runtime(self):
        """A worker-style argv triggers startup checks and configures the app."""
        app_mock = MagicMock()
        # Plain namespace so attributes the module assigns (beat_schedule) are
        # real values rather than auto-created mocks.
        app_mock.conf = SimpleNamespace()

        with patch.dict(os.environ, self.SETTINGS_ENV, clear=True), patch.object(
            sys, "argv", list(self.WORKER_ARGV)
        ), patch(
            "apps.core.startup_checks.run_startup_checks"
        ) as checks_mock, patch("celery.Celery", return_value=app_mock):
            module = _load_module("isolated_core_celery_worker")

        checks_mock.assert_called_once_with(component="celery")
        app_mock.config_from_object.assert_called_once_with(
            "django.conf:settings",
            namespace="CELERY",
        )
        app_mock.autodiscover_tasks.assert_called_once_with()
        self.assertIs(module.app, app_mock)

        # One sub-test per entry so every missing schedule key is reported,
        # not just the first one.
        for entry in self.EXPECTED_BEAT_ENTRIES:
            with self.subTest(entry=entry):
                self.assertIn(entry, module.app.conf.beat_schedule)

    def test_startup_refresh_queues_when_lock_acquired(self):
        """When ``cache.add`` succeeds the refresh task is queued with a delay."""
        apply_async_mock = MagicMock(return_value=SimpleNamespace(id="task-123"))

        add_mock = self._run_startup_refresh(
            "isolated_core_celery_startup_refresh",
            lock_acquired=True,
            apply_async_mock=apply_async_mock,
        )

        add_mock.assert_called_once_with("startup-lock", "1", timeout=120)
        apply_async_mock.assert_called_once_with(countdown=45)

    def test_startup_refresh_skips_when_lock_exists(self):
        """When ``cache.add`` fails the refresh task is not queued."""
        apply_async_mock = MagicMock()

        self._run_startup_refresh(
            "isolated_core_celery_startup_skip",
            lock_acquired=False,
            apply_async_mock=apply_async_mock,
        )

        apply_async_mock.assert_not_called()

    def test_debug_task_prints_request(self):
        """Running ``debug_task`` calls ``print`` exactly once."""
        module = self._load_for_shell("isolated_core_celery_debug")

        with patch("builtins.print") as print_mock:
            module.debug_task.run()

        print_mock.assert_called_once()