feat(celery): schedule weekly updates and run startup refresh
Some checks failed
CI/CD Pipeline / Run Tests (push) Successful in 1m41s
CI/CD Pipeline / Telegram Notify Success (push) Has been cancelled
CI/CD Pipeline / Code Quality Checks (push) Has been cancelled
CI/CD Pipeline / Code Quality Checks (pull_request) Successful in 1m31s
CI/CD Pipeline / Telegram Notify Success (pull_request) Has been cancelled
CI/CD Pipeline / Run Tests (pull_request) Has been cancelled

This commit is contained in:
2026-03-20 11:14:03 +01:00
parent c8bdc282de
commit b8e3f0a5d3
4 changed files with 134 additions and 8 deletions

View File

@@ -218,9 +218,12 @@ Celery-задачи (основные):
Планировщик (`celery beat`) по умолчанию: Планировщик (`celery beat`) по умолчанию:
- ежедневный парсинг Минпромторга (сертификаты/производители) - weekly-обновление источников Минпромторга и проверок
- сканирование директории ФНС каждые 5 минут - сканирование директории ФНС каждые 5 минут
При старте `celery worker` также выполняется одноразовый автозапуск
`apps.parsers.tasks.parse_all_sources` (с lock через cache, чтобы избежать дублей).
Директории ФНС: Директории ФНС:
- входящие: `input/fns/` - входящие: `input/fns/`

View File

@@ -4,11 +4,17 @@ Celery configuration for the project.
This module contains Celery configuration and task registration. This module contains Celery configuration and task registration.
""" """
import logging
import os import os
import sys import sys
from apps.core.startup_checks import run_startup_checks from apps.core.startup_checks import run_startup_checks
from celery import Celery from celery import Celery
from celery.signals import worker_ready
from django.conf import settings
from django.core.cache import cache
logger = logging.getLogger(__name__)
# Set the Django settings module for the 'celery' program. # Set the Django settings module for the 'celery' program.
if "DJANGO_SETTINGS_MODULE" not in os.environ: if "DJANGO_SETTINGS_MODULE" not in os.environ:
@@ -27,6 +33,12 @@ def _is_celery_runtime() -> bool:
) )
def _is_worker_runtime() -> bool:
    """Tell whether the current command line looks like a Celery worker.

    The process arguments are joined with spaces and lower-cased; the result
    must contain ``celery`` and ``worker`` preceded by a space.
    """
    command_line = " ".join(sys.argv).lower()
    if "celery" not in command_line:
        return False
    return " worker" in command_line
if _is_celery_runtime(): if _is_celery_runtime():
run_startup_checks(component="celery") run_startup_checks(component="celery")
@@ -43,20 +55,24 @@ app.autodiscover_tasks()
# Configure Celery Beat schedule # Configure Celery Beat schedule
app.conf.beat_schedule = { app.conf.beat_schedule = {
# Парсинг сертификатов промышленного производства - каждый день в 3:00 # Обновления источников по умолчанию — раз в неделю.
# Сохраняем legacy-названия ключей, чтобы корректно обновлять существующие
# PeriodicTask в django-celery-beat через update_or_create по name.
"parse-industrial-production-daily": { "parse-industrial-production-daily": {
"task": "apps.parsers.tasks.parse_industrial_production", "task": "apps.parsers.tasks.parse_industrial_production",
"schedule": 86400.0, # Every 24 hours "schedule": 7 * 24 * 60 * 60, # Every 7 days
}, },
# Парсинг реестра производителей - каждый день в 4:00
"parse-manufactures-daily": { "parse-manufactures-daily": {
"task": "apps.parsers.tasks.parse_manufactures", "task": "apps.parsers.tasks.parse_manufactures",
"schedule": 86400.0, # Every 24 hours "schedule": 7 * 24 * 60 * 60, # Every 7 days
}, },
# Парсинг реестра промышленной продукции - каждый день в 5:00
"parse-industrial-products-daily": { "parse-industrial-products-daily": {
"task": "apps.parsers.tasks.parse_industrial_products", "task": "apps.parsers.tasks.parse_industrial_products",
"schedule": 86400.0, # Every 24 hours "schedule": 7 * 24 * 60 * 60, # Every 7 days
},
"parse-inspections-weekly": {
"task": "apps.parsers.tasks.parse_inspections",
"schedule": 7 * 24 * 60 * 60, # Every 7 days
}, },
# Сканирование папки FNS - каждые 5 минут # Сканирование папки FNS - каждые 5 минут
"scan-fns-directory": { "scan-fns-directory": {
@@ -68,6 +84,54 @@ app.conf.beat_schedule = {
app.conf.timezone = "Europe/Moscow" app.conf.timezone = "Europe/Moscow"
def _queue_startup_sources_refresh() -> None:
    """Queue ``parse_all_sources`` once when a worker starts.

    A lock stored in the cache (``cache.add``) is used so that several
    workers/pods do not queue the refresh at the same time: only the process
    that manages to add the lock key queues the task, the others log and
    return.

    Settings read (each falls back to the default shown when absent):
        CELERY_STARTUP_REFRESH_ENABLED: falsy disables the refresh (``True``).
        CELERY_STARTUP_REFRESH_LOCK_KEY: cache key used as the lock
            (``"celery:startup:parse_all_sources:lock"``).
        CELERY_STARTUP_REFRESH_LOCK_TTL_SECONDS: lock lifetime (``3600``).
        CELERY_STARTUP_REFRESH_DELAY_SECONDS: countdown before the task runs
            (``30``); negative values are clamped to ``0``.

    Raises:
        ValueError/TypeError: if a numeric setting cannot be converted to
            ``int`` (raised before the lock is taken).
        Exception: anything raised while importing or queueing the task; the
            lock is released first so a later worker start can retry.
    """
    if not getattr(settings, "CELERY_STARTUP_REFRESH_ENABLED", True):
        logger.info("Startup refresh is disabled by settings")
        return

    lock_key = getattr(
        settings,
        "CELERY_STARTUP_REFRESH_LOCK_KEY",
        "celery:startup:parse_all_sources:lock",
    )
    # Parse every numeric setting before touching the lock, so a malformed
    # value cannot leave the lock held with nothing queued.
    lock_ttl = int(getattr(settings, "CELERY_STARTUP_REFRESH_LOCK_TTL_SECONDS", 3600))
    countdown = max(
        int(getattr(settings, "CELERY_STARTUP_REFRESH_DELAY_SECONDS", 30)),
        0,
    )

    # cache.add only stores the key when it is not already present.
    acquired = cache.add(lock_key, "1", timeout=lock_ttl)
    if not acquired:
        logger.info(
            "Startup refresh skipped: lock already acquired (%s)",
            lock_key,
        )
        return

    try:
        # Imported lazily, at call time rather than when this module loads.
        from apps.parsers.tasks import parse_all_sources

        async_result = parse_all_sources.apply_async(countdown=countdown)
    except Exception:
        # Nothing was queued: drop the lock instead of blocking every other
        # worker start until the TTL expires.
        cache.delete(lock_key)
        raise

    logger.info(
        "Startup refresh queued: task=%s task_id=%s countdown=%ss",
        "apps.parsers.tasks.parse_all_sources",
        async_result.id,
        countdown,
    )
@worker_ready.connect(weak=False)
def _on_worker_ready(sender=None, **kwargs) -> None:  # noqa: ARG001
    """Handle Celery's ``worker_ready`` signal by queueing the source refresh.

    Does nothing unless the process looks like a Celery worker command.
    Any error raised while queueing is logged with its traceback and
    swallowed, so it does not propagate out of the signal handler.
    """
    if _is_worker_runtime():
        try:
            _queue_startup_sources_refresh()
        except Exception:
            logger.exception("Failed to queue startup source refresh")
@app.task(bind=True) @app.task(bind=True)
def debug_task(self): def debug_task(self):
print(f"Request: {self.request!r}") print(f"Request: {self.request!r}")

View File

@@ -209,6 +209,12 @@ BACKUP_EXPORT_DIRECTORY = os.getenv(
CELERY_BROKER_CONNECTION_RETRY = True CELERY_BROKER_CONNECTION_RETRY = True
CELERY_BROKER_CONNECTION_RETRY_ON_STARTUP = True CELERY_BROKER_CONNECTION_RETRY_ON_STARTUP = True
# Celery: automatic refresh of sources when a worker starts.
# Master switch: a falsy value makes the startup hook log and return without queueing.
CELERY_STARTUP_REFRESH_ENABLED = True
# Countdown, in seconds, passed to apply_async for the startup parse_all_sources task.
CELERY_STARTUP_REFRESH_DELAY_SECONDS = 30
# Cache key used as the lock that keeps several workers from queueing the refresh at once.
CELERY_STARTUP_REFRESH_LOCK_KEY = "celery:startup:parse_all_sources:lock"
# Lifetime of that lock in seconds (used as the cache timeout of the lock key).
CELERY_STARTUP_REFRESH_LOCK_TTL_SECONDS = 3600
# Password validation # Password validation
AUTH_PASSWORD_VALIDATORS = [ AUTH_PASSWORD_VALIDATORS = [

View File

@@ -4,7 +4,7 @@ import importlib.util
import os import os
import sys import sys
from pathlib import Path from pathlib import Path
from types import SimpleNamespace from types import ModuleType, SimpleNamespace
from unittest.mock import MagicMock, patch from unittest.mock import MagicMock, patch
from django.test import SimpleTestCase from django.test import SimpleTestCase
@@ -46,6 +46,59 @@ class CeleryModuleTest(SimpleTestCase):
) )
app_mock.autodiscover_tasks.assert_called_once_with() app_mock.autodiscover_tasks.assert_called_once_with()
self.assertEqual(module.app, app_mock) self.assertEqual(module.app, app_mock)
self.assertIn("parse-industrial-production-daily", module.app.conf.beat_schedule)
self.assertIn("parse-manufactures-daily", module.app.conf.beat_schedule)
self.assertIn("parse-industrial-products-daily", module.app.conf.beat_schedule)
self.assertIn("parse-inspections-weekly", module.app.conf.beat_schedule)
def test_startup_refresh_queues_when_lock_acquired(self):
    """The refresh task is queued with the configured lock and countdown."""
    env_patch = patch.dict(
        os.environ, {"DJANGO_SETTINGS_MODULE": "settings.test"}, clear=True
    )
    argv_patch = patch.object(sys, "argv", ["python", "manage.py", "shell"])
    with env_patch, argv_patch:
        module = _load_module("isolated_core_celery_startup_refresh")

    # Stand-in for apps.parsers.tasks so the lazy import resolves to a stub.
    apply_async_mock = MagicMock(return_value=SimpleNamespace(id="task-123"))
    stub_tasks = ModuleType("apps.parsers.tasks")
    stub_tasks.parse_all_sources = SimpleNamespace(apply_async=apply_async_mock)

    stub_settings = SimpleNamespace(
        CELERY_STARTUP_REFRESH_ENABLED=True,
        CELERY_STARTUP_REFRESH_LOCK_KEY="startup-lock",
        CELERY_STARTUP_REFRESH_LOCK_TTL_SECONDS=120,
        CELERY_STARTUP_REFRESH_DELAY_SECONDS=45,
    )

    with patch.object(module, "settings", stub_settings):
        with patch.object(module.cache, "add", return_value=True) as add_mock:
            with patch.dict(sys.modules, {"apps.parsers.tasks": stub_tasks}):
                module._queue_startup_sources_refresh()

    add_mock.assert_called_once_with("startup-lock", "1", timeout=120)
    apply_async_mock.assert_called_once_with(countdown=45)
def test_startup_refresh_skips_when_lock_exists(self):
    """Nothing is queued when the lock key is already present in the cache."""
    env_patch = patch.dict(
        os.environ, {"DJANGO_SETTINGS_MODULE": "settings.test"}, clear=True
    )
    argv_patch = patch.object(sys, "argv", ["python", "manage.py", "shell"])
    with env_patch, argv_patch:
        module = _load_module("isolated_core_celery_startup_skip")

    # Stand-in for apps.parsers.tasks; its apply_async must stay untouched.
    apply_async_mock = MagicMock()
    stub_tasks = ModuleType("apps.parsers.tasks")
    stub_tasks.parse_all_sources = SimpleNamespace(apply_async=apply_async_mock)

    stub_settings = SimpleNamespace(
        CELERY_STARTUP_REFRESH_ENABLED=True,
        CELERY_STARTUP_REFRESH_LOCK_KEY="startup-lock",
        CELERY_STARTUP_REFRESH_LOCK_TTL_SECONDS=120,
        CELERY_STARTUP_REFRESH_DELAY_SECONDS=45,
    )

    with patch.object(module, "settings", stub_settings):
        # cache.add returning False simulates a lock held by another worker.
        with patch.object(module.cache, "add", return_value=False):
            with patch.dict(sys.modules, {"apps.parsers.tasks": stub_tasks}):
                module._queue_startup_sources_refresh()

    apply_async_mock.assert_not_called()
def test_debug_task_prints_request(self): def test_debug_task_prints_request(self):
with patch.dict( with patch.dict(