From fecfbc5f83244f7be0ead3c0c906c4c0c363ed89 Mon Sep 17 00:00:00 2001 From: Aleksandr Meshchriakov Date: Fri, 20 Mar 2026 11:04:39 +0100 Subject: [PATCH 1/3] chore(celery): enable broker retry on startup --- src/settings/base.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/settings/base.py b/src/settings/base.py index 48d4833..087fa03 100644 --- a/src/settings/base.py +++ b/src/settings/base.py @@ -205,6 +205,10 @@ BACKUP_EXPORT_DIRECTORY = os.getenv( str(PROJECT_ROOT / "media" / "backups"), ) +# Celery: сохраняем ретраи подключения на старте и для 6.x совместимости. +CELERY_BROKER_CONNECTION_RETRY = True +CELERY_BROKER_CONNECTION_RETRY_ON_STARTUP = True + # Password validation AUTH_PASSWORD_VALIDATORS = [ -- 2.39.5 From c8bdc282deeb635c3816c3828a6d40fd0ae4d6a3 Mon Sep 17 00:00:00 2001 From: Aleksandr Meshchriakov Date: Fri, 20 Mar 2026 11:11:21 +0100 Subject: [PATCH 2/3] fix pre-commit --- .../0002_alter_exchangeconnection_password.py | 18 ++++++++++++ .../migrations/0013_auto_20260320_1010.py | 29 +++++++++++++++++++ .../user/migrations/0008_alter_user_groups.py | 19 ++++++++++++ 3 files changed, 66 insertions(+) create mode 100644 src/apps/exchange/migrations/0002_alter_exchangeconnection_password.py create mode 100644 src/apps/parsers/migrations/0013_auto_20260320_1010.py create mode 100644 src/apps/user/migrations/0008_alter_user_groups.py diff --git a/src/apps/exchange/migrations/0002_alter_exchangeconnection_password.py b/src/apps/exchange/migrations/0002_alter_exchangeconnection_password.py new file mode 100644 index 0000000..b45311d --- /dev/null +++ b/src/apps/exchange/migrations/0002_alter_exchangeconnection_password.py @@ -0,0 +1,18 @@ +# Generated by Django 3.2.25 on 2026-03-20 10:10 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('exchange', '0001_initial'), + ] + + operations = [ + migrations.AlterField( + model_name='exchangeconnection', + name='password', + field=models.TextField(help_text='Хранится в зашифрованном виде', verbose_name='пароль'), + ), + ] diff --git a/src/apps/parsers/migrations/0013_auto_20260320_1010.py b/src/apps/parsers/migrations/0013_auto_20260320_1010.py new file mode 100644 index 0000000..34eac2d --- /dev/null +++ b/src/apps/parsers/migrations/0013_auto_20260320_1010.py @@ -0,0 +1,29 @@ +# Generated by Django 3.2.25 on 2026-03-20 10:10 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('parsers', '0012_add_industrial_product_record'), + ] + + operations = [ + migrations.RemoveIndex( + model_name='industrialproductrecord', + name='parsers_indprod_load_ba_98b870_idx', + ), + migrations.RemoveIndex( + model_name='industrialproductrecord', + name='parsers_indprod_inn_eb2ad3_idx', + ), + migrations.AddIndex( + model_name='industrialproductrecord', + index=models.Index(fields=['load_batch', 'inn'], name='parsers_ind_load_ba_680f5a_idx'), + ), + migrations.AddIndex( + model_name='industrialproductrecord', + index=models.Index(fields=['inn', 'registry_number'], name='parsers_ind_inn_05320c_idx'), + ), + ] diff --git a/src/apps/user/migrations/0008_alter_user_groups.py b/src/apps/user/migrations/0008_alter_user_groups.py new file mode 100644 index 0000000..a214046 --- /dev/null +++ b/src/apps/user/migrations/0008_alter_user_groups.py @@ -0,0 +1,19 @@ +# Generated by Django 3.2.25 on 2026-03-20 10:10 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('auth', '0012_alter_user_first_name_max_length'), + ('user', '0007_profile_middle_name'), + ] + + operations = [ + migrations.AlterField( + model_name='user', + name='groups', + field=models.ManyToManyField(blank=True, help_text='', related_name='custom_user_set', related_query_name='custom_user', to='auth.Group', verbose_name='groups'), + ), + ] -- 2.39.5 From b8e3f0a5d3eca4d5d0abdffe4631efe59652331c Mon Sep 17 00:00:00 2001 From: Aleksandr Meshchriakov Date: Fri, 20 Mar 2026 11:14:03 +0100 Subject: [PATCH 3/3] feat(celery): schedule weekly updates and run startup refresh --- README.md | 5 +- src/core/celery.py | 76 ++++++++++++++++++++++++--- src/settings/base.py | 6 +++ tests/apps/core/test_celery_module.py | 55 ++++++++++++++++++- 4 files changed, 134 insertions(+), 8 deletions(-) diff --git a/README.md b/README.md index d35785a..5c649b6 100644 --- a/README.md +++ b/README.md @@ -218,9 +218,12 @@ Celery-задачи (основные): Планировщик (`celery beat`) по умолчанию: -- ежедневный парсинг Минпромторга (сертификаты/производители) +- weekly-обновление источников Минпромторга и проверок - сканирование директории ФНС каждые 5 минут +При старте `celery worker` также выполняется одноразовый автозапуск +`apps.parsers.tasks.parse_all_sources` (с lock через cache, чтобы избежать дублей). + Директории ФНС: - входящие: `input/fns/` diff --git a/src/core/celery.py b/src/core/celery.py index 0faf39c..1cc45eb 100644 --- a/src/core/celery.py +++ b/src/core/celery.py @@ -4,11 +4,17 @@ Celery configuration for the project. This module contains Celery configuration and task registration. """ +import logging import os import sys from apps.core.startup_checks import run_startup_checks from celery import Celery +from celery.signals import worker_ready +from django.conf import settings +from django.core.cache import cache + +logger = logging.getLogger(__name__) # Set the Django settings module for the 'celery' program. if "DJANGO_SETTINGS_MODULE" not in os.environ: @@ -27,6 +33,12 @@ def _is_celery_runtime() -> bool: ) +def _is_worker_runtime() -> bool: + """True when current process is a Celery worker command.""" + argv = " ".join(sys.argv).lower() + return "celery" in argv and " worker" in argv + + if _is_celery_runtime(): run_startup_checks(component="celery") @@ -43,20 +55,24 @@ app.autodiscover_tasks() # Configure Celery Beat schedule app.conf.beat_schedule = { - # Парсинг сертификатов промышленного производства - каждый день в 3:00 + # Обновления источников по умолчанию — раз в неделю. + # Сохраняем legacy-названия ключей, чтобы корректно обновлять существующие + # PeriodicTask в django-celery-beat через update_or_create по name. "parse-industrial-production-daily": { "task": "apps.parsers.tasks.parse_industrial_production", - "schedule": 86400.0, # Every 24 hours + "schedule": 7 * 24 * 60 * 60, # Every 7 days }, - # Парсинг реестра производителей - каждый день в 4:00 "parse-manufactures-daily": { "task": "apps.parsers.tasks.parse_manufactures", - "schedule": 86400.0, # Every 24 hours + "schedule": 7 * 24 * 60 * 60, # Every 7 days }, - # Парсинг реестра промышленной продукции - каждый день в 5:00 "parse-industrial-products-daily": { "task": "apps.parsers.tasks.parse_industrial_products", - "schedule": 86400.0, # Every 24 hours + "schedule": 7 * 24 * 60 * 60, # Every 7 days + }, + "parse-inspections-weekly": { + "task": "apps.parsers.tasks.parse_inspections", + "schedule": 7 * 24 * 60 * 60, # Every 7 days }, # Сканирование папки FNS - каждые 5 минут "scan-fns-directory": { @@ -68,6 +84,54 @@ app.conf.beat_schedule = { app.conf.timezone = "Europe/Moscow" +def _queue_startup_sources_refresh() -> None: + """ + Запустить parse_all_sources один раз при старте worker. + + Используется distributed lock через cache, чтобы несколько worker/pod + не запускали автообновление одновременно. + """ + if not getattr(settings, "CELERY_STARTUP_REFRESH_ENABLED", True): + logger.info("Startup refresh is disabled by settings") + return + + lock_key = getattr( + settings, + "CELERY_STARTUP_REFRESH_LOCK_KEY", + "celery:startup:parse_all_sources:lock", + ) + lock_ttl = int(getattr(settings, "CELERY_STARTUP_REFRESH_LOCK_TTL_SECONDS", 3600)) + acquired = cache.add(lock_key, "1", timeout=lock_ttl) + if not acquired: + logger.info( + "Startup refresh skipped: lock already acquired (%s)", + lock_key, + ) + return + + countdown = int(getattr(settings, "CELERY_STARTUP_REFRESH_DELAY_SECONDS", 30)) + from apps.parsers.tasks import parse_all_sources + + async_result = parse_all_sources.apply_async(countdown=max(countdown, 0)) + logger.info( + "Startup refresh queued: task=%s task_id=%s countdown=%ss", + "apps.parsers.tasks.parse_all_sources", + async_result.id, + countdown, + ) + + +@worker_ready.connect(weak=False) +def _on_worker_ready(sender=None, **kwargs) -> None: # noqa: ARG001 + """Queue source refresh when worker starts.""" + if not _is_worker_runtime(): + return + try: + _queue_startup_sources_refresh() + except Exception: + logger.exception("Failed to queue startup source refresh") + + @app.task(bind=True) def debug_task(self): print(f"Request: {self.request!r}") diff --git a/src/settings/base.py b/src/settings/base.py index 087fa03..7da16a3 100644 --- a/src/settings/base.py +++ b/src/settings/base.py @@ -209,6 +209,12 @@ BACKUP_EXPORT_DIRECTORY = os.getenv( CELERY_BROKER_CONNECTION_RETRY = True CELERY_BROKER_CONNECTION_RETRY_ON_STARTUP = True +# Celery: автообновление источников при старте worker. +CELERY_STARTUP_REFRESH_ENABLED = True +CELERY_STARTUP_REFRESH_DELAY_SECONDS = 30 +CELERY_STARTUP_REFRESH_LOCK_KEY = "celery:startup:parse_all_sources:lock" +CELERY_STARTUP_REFRESH_LOCK_TTL_SECONDS = 3600 + # Password validation AUTH_PASSWORD_VALIDATORS = [ diff --git a/tests/apps/core/test_celery_module.py b/tests/apps/core/test_celery_module.py index c44d9cd..cae69e0 100644 --- a/tests/apps/core/test_celery_module.py +++ b/tests/apps/core/test_celery_module.py @@ -4,7 +4,7 @@ import importlib.util import os import sys from pathlib import Path -from types import SimpleNamespace +from types import ModuleType, SimpleNamespace from unittest.mock import MagicMock, patch from django.test import SimpleTestCase @@ -46,6 +46,59 @@ class CeleryModuleTest(SimpleTestCase): ) app_mock.autodiscover_tasks.assert_called_once_with() self.assertEqual(module.app, app_mock) + self.assertIn("parse-industrial-production-daily", module.app.conf.beat_schedule) + self.assertIn("parse-manufactures-daily", module.app.conf.beat_schedule) + self.assertIn("parse-industrial-products-daily", module.app.conf.beat_schedule) + self.assertIn("parse-inspections-weekly", module.app.conf.beat_schedule) + + def test_startup_refresh_queues_when_lock_acquired(self): + with patch.dict( + os.environ, {"DJANGO_SETTINGS_MODULE": "settings.test"}, clear=True + ), patch.object(sys, "argv", ["python", "manage.py", "shell"]): + module = _load_module("isolated_core_celery_startup_refresh") + + fake_tasks_module = ModuleType("apps.parsers.tasks") + apply_async_mock = MagicMock(return_value=SimpleNamespace(id="task-123")) + fake_tasks_module.parse_all_sources = SimpleNamespace( + apply_async=apply_async_mock + ) + + with patch.object(module, "settings", SimpleNamespace( + CELERY_STARTUP_REFRESH_ENABLED=True, + CELERY_STARTUP_REFRESH_LOCK_KEY="startup-lock", + CELERY_STARTUP_REFRESH_LOCK_TTL_SECONDS=120, + CELERY_STARTUP_REFRESH_DELAY_SECONDS=45, + )), patch.object(module.cache, "add", return_value=True) as add_mock, patch.dict( + sys.modules, {"apps.parsers.tasks": fake_tasks_module} + ): + module._queue_startup_sources_refresh() + + add_mock.assert_called_once_with("startup-lock", "1", timeout=120) + apply_async_mock.assert_called_once_with(countdown=45) + + def test_startup_refresh_skips_when_lock_exists(self): + with patch.dict( + os.environ, {"DJANGO_SETTINGS_MODULE": "settings.test"}, clear=True + ), patch.object(sys, "argv", ["python", "manage.py", "shell"]): + module = _load_module("isolated_core_celery_startup_skip") + + fake_tasks_module = ModuleType("apps.parsers.tasks") + apply_async_mock = MagicMock() + fake_tasks_module.parse_all_sources = SimpleNamespace( + apply_async=apply_async_mock + ) + + with patch.object(module, "settings", SimpleNamespace( + CELERY_STARTUP_REFRESH_ENABLED=True, + CELERY_STARTUP_REFRESH_LOCK_KEY="startup-lock", + CELERY_STARTUP_REFRESH_LOCK_TTL_SECONDS=120, + CELERY_STARTUP_REFRESH_DELAY_SECONDS=45, + )), patch.object(module.cache, "add", return_value=False), patch.dict( + sys.modules, {"apps.parsers.tasks": fake_tasks_module} + ): + module._queue_startup_sources_refresh() + + apply_async_mock.assert_not_called() def test_debug_task_prints_request(self): with patch.dict( -- 2.39.5