diff --git a/README.md b/README.md index d35785a..5c649b6 100644 --- a/README.md +++ b/README.md @@ -218,9 +218,12 @@ Celery-задачи (основные): Планировщик (`celery beat`) по умолчанию: -- ежедневный парсинг Минпромторга (сертификаты/производители) +- weekly-обновление источников Минпромторга и проверок - сканирование директории ФНС каждые 5 минут +При старте `celery worker` также выполняется одноразовый автозапуск +`apps.parsers.tasks.parse_all_sources` (с lock через cache, чтобы избежать дублей). + Директории ФНС: - входящие: `input/fns/` diff --git a/src/core/celery.py b/src/core/celery.py index 0faf39c..1cc45eb 100644 --- a/src/core/celery.py +++ b/src/core/celery.py @@ -4,11 +4,17 @@ Celery configuration for the project. This module contains Celery configuration and task registration. """ +import logging import os import sys from apps.core.startup_checks import run_startup_checks from celery import Celery +from celery.signals import worker_ready +from django.conf import settings +from django.core.cache import cache + +logger = logging.getLogger(__name__) # Set the Django settings module for the 'celery' program. if "DJANGO_SETTINGS_MODULE" not in os.environ: @@ -27,6 +33,12 @@ def _is_celery_runtime() -> bool: ) +def _is_worker_runtime() -> bool: + """True when current process is a Celery worker command.""" + argv = " ".join(sys.argv).lower() + return "celery" in argv and " worker" in argv + + if _is_celery_runtime(): run_startup_checks(component="celery") @@ -43,20 +55,24 @@ app.autodiscover_tasks() # Configure Celery Beat schedule app.conf.beat_schedule = { - # Парсинг сертификатов промышленного производства - каждый день в 3:00 + # Обновления источников по умолчанию — раз в неделю. + # Сохраняем legacy-названия ключей, чтобы корректно обновлять существующие + # PeriodicTask в django-celery-beat через update_or_create по name. "parse-industrial-production-daily": { "task": "apps.parsers.tasks.parse_industrial_production", - "schedule": 86400.0, # Every 24 hours + "schedule": 7 * 24 * 60 * 60, # Every 7 days }, - # Парсинг реестра производителей - каждый день в 4:00 "parse-manufactures-daily": { "task": "apps.parsers.tasks.parse_manufactures", - "schedule": 86400.0, # Every 24 hours + "schedule": 7 * 24 * 60 * 60, # Every 7 days }, - # Парсинг реестра промышленной продукции - каждый день в 5:00 "parse-industrial-products-daily": { "task": "apps.parsers.tasks.parse_industrial_products", - "schedule": 86400.0, # Every 24 hours + "schedule": 7 * 24 * 60 * 60, # Every 7 days + }, + "parse-inspections-weekly": { + "task": "apps.parsers.tasks.parse_inspections", + "schedule": 7 * 24 * 60 * 60, # Every 7 days }, # Сканирование папки FNS - каждые 5 минут "scan-fns-directory": { @@ -68,6 +84,54 @@ app.conf.beat_schedule = { app.conf.timezone = "Europe/Moscow" +def _queue_startup_sources_refresh() -> None: + """ + Запустить parse_all_sources один раз при старте worker. + + Используется distributed lock через cache, чтобы несколько worker/pod + не запускали автообновление одновременно. + """ + if not getattr(settings, "CELERY_STARTUP_REFRESH_ENABLED", True): + logger.info("Startup refresh is disabled by settings") + return + + lock_key = getattr( + settings, + "CELERY_STARTUP_REFRESH_LOCK_KEY", + "celery:startup:parse_all_sources:lock", + ) + lock_ttl = int(getattr(settings, "CELERY_STARTUP_REFRESH_LOCK_TTL_SECONDS", 3600)) + acquired = cache.add(lock_key, "1", timeout=lock_ttl) + if not acquired: + logger.info( + "Startup refresh skipped: lock already acquired (%s)", + lock_key, + ) + return + + countdown = int(getattr(settings, "CELERY_STARTUP_REFRESH_DELAY_SECONDS", 30)) + from apps.parsers.tasks import parse_all_sources + + async_result = parse_all_sources.apply_async(countdown=max(countdown, 0)) + logger.info( + "Startup refresh queued: task=%s task_id=%s countdown=%ss", + "apps.parsers.tasks.parse_all_sources", + async_result.id, + countdown, + ) + + +@worker_ready.connect(weak=False) +def _on_worker_ready(sender=None, **kwargs) -> None: # noqa: ARG001 + """Queue source refresh when worker starts.""" + if not _is_worker_runtime(): + return + try: + _queue_startup_sources_refresh() + except Exception: + logger.exception("Failed to queue startup source refresh") + + @app.task(bind=True) def debug_task(self): print(f"Request: {self.request!r}") diff --git a/src/settings/base.py b/src/settings/base.py index 087fa03..7da16a3 100644 --- a/src/settings/base.py +++ b/src/settings/base.py @@ -209,6 +209,12 @@ BACKUP_EXPORT_DIRECTORY = os.getenv( CELERY_BROKER_CONNECTION_RETRY = True CELERY_BROKER_CONNECTION_RETRY_ON_STARTUP = True +# Celery: автообновление источников при старте worker. +CELERY_STARTUP_REFRESH_ENABLED = True +CELERY_STARTUP_REFRESH_DELAY_SECONDS = 30 +CELERY_STARTUP_REFRESH_LOCK_KEY = "celery:startup:parse_all_sources:lock" +CELERY_STARTUP_REFRESH_LOCK_TTL_SECONDS = 3600 + # Password validation AUTH_PASSWORD_VALIDATORS = [ diff --git a/tests/apps/core/test_celery_module.py b/tests/apps/core/test_celery_module.py index c44d9cd..cae69e0 100644 --- a/tests/apps/core/test_celery_module.py +++ b/tests/apps/core/test_celery_module.py @@ -4,7 +4,7 @@ import importlib.util import os import sys from pathlib import Path -from types import SimpleNamespace +from types import ModuleType, SimpleNamespace from unittest.mock import MagicMock, patch from django.test import SimpleTestCase @@ -46,6 +46,59 @@ class CeleryModuleTest(SimpleTestCase): ) app_mock.autodiscover_tasks.assert_called_once_with() self.assertEqual(module.app, app_mock) + self.assertIn("parse-industrial-production-daily", module.app.conf.beat_schedule) + self.assertIn("parse-manufactures-daily", module.app.conf.beat_schedule) + self.assertIn("parse-industrial-products-daily", module.app.conf.beat_schedule) + self.assertIn("parse-inspections-weekly", module.app.conf.beat_schedule) + + def test_startup_refresh_queues_when_lock_acquired(self): + with patch.dict( + os.environ, {"DJANGO_SETTINGS_MODULE": "settings.test"}, clear=True + ), patch.object(sys, "argv", ["python", "manage.py", "shell"]): + module = _load_module("isolated_core_celery_startup_refresh") + + fake_tasks_module = ModuleType("apps.parsers.tasks") + apply_async_mock = MagicMock(return_value=SimpleNamespace(id="task-123")) + fake_tasks_module.parse_all_sources = SimpleNamespace( + apply_async=apply_async_mock + ) + + with patch.object(module, "settings", SimpleNamespace( + CELERY_STARTUP_REFRESH_ENABLED=True, + CELERY_STARTUP_REFRESH_LOCK_KEY="startup-lock", + CELERY_STARTUP_REFRESH_LOCK_TTL_SECONDS=120, + CELERY_STARTUP_REFRESH_DELAY_SECONDS=45, + )), patch.object(module.cache, "add", return_value=True) as add_mock, patch.dict( + sys.modules, {"apps.parsers.tasks": fake_tasks_module} + ): + module._queue_startup_sources_refresh() + + add_mock.assert_called_once_with("startup-lock", "1", timeout=120) + apply_async_mock.assert_called_once_with(countdown=45) + + def test_startup_refresh_skips_when_lock_exists(self): + with patch.dict( + os.environ, {"DJANGO_SETTINGS_MODULE": "settings.test"}, clear=True + ), patch.object(sys, "argv", ["python", "manage.py", "shell"]): + module = _load_module("isolated_core_celery_startup_skip") + + fake_tasks_module = ModuleType("apps.parsers.tasks") + apply_async_mock = MagicMock() + fake_tasks_module.parse_all_sources = SimpleNamespace( + apply_async=apply_async_mock + ) + + with patch.object(module, "settings", SimpleNamespace( + CELERY_STARTUP_REFRESH_ENABLED=True, + CELERY_STARTUP_REFRESH_LOCK_KEY="startup-lock", + CELERY_STARTUP_REFRESH_LOCK_TTL_SECONDS=120, + CELERY_STARTUP_REFRESH_DELAY_SECONDS=45, + )), patch.object(module.cache, "add", return_value=False), patch.dict( + sys.modules, {"apps.parsers.tasks": fake_tasks_module} + ): + module._queue_startup_sources_refresh() + + apply_async_mock.assert_not_called() def test_debug_task_prints_request(self): with patch.dict(