feature/registers-generate-test-data-command #12
@@ -218,9 +218,12 @@ Celery-задачи (основные):
|
||||
|
||||
Планировщик (`celery beat`) по умолчанию:
|
||||
|
||||
- ежедневный парсинг Минпромторга (сертификаты/производители)
|
||||
- weekly-обновление источников Минпромторга и проверок
|
||||
- сканирование директории ФНС каждые 5 минут
|
||||
|
||||
При старте `celery worker` также выполняется одноразовый автозапуск
|
||||
`apps.parsers.tasks.parse_all_sources` (с lock через cache, чтобы избежать дублей).
|
||||
|
||||
Директории ФНС:
|
||||
|
||||
- входящие: `input/fns/`
|
||||
|
||||
@@ -0,0 +1,18 @@
|
||||
# Generated by Django 3.2.25 on 2026-03-20 10:10
|
||||
|
||||
from django.db import migrations, models
|
||||
|
||||
|
||||
class Migration(migrations.Migration):
|
||||
|
||||
dependencies = [
|
||||
('exchange', '0001_initial'),
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.AlterField(
|
||||
model_name='exchangeconnection',
|
||||
name='password',
|
||||
field=models.TextField(help_text='Хранится в зашифрованном виде', verbose_name='пароль'),
|
||||
),
|
||||
]
|
||||
29
src/apps/parsers/migrations/0013_auto_20260320_1010.py
Normal file
29
src/apps/parsers/migrations/0013_auto_20260320_1010.py
Normal file
@@ -0,0 +1,29 @@
|
||||
# Generated by Django 3.2.25 on 2026-03-20 10:10
|
||||
|
||||
from django.db import migrations, models
|
||||
|
||||
|
||||
class Migration(migrations.Migration):
|
||||
|
||||
dependencies = [
|
||||
('parsers', '0012_add_industrial_product_record'),
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.RemoveIndex(
|
||||
model_name='industrialproductrecord',
|
||||
name='parsers_indprod_load_ba_98b870_idx',
|
||||
),
|
||||
migrations.RemoveIndex(
|
||||
model_name='industrialproductrecord',
|
||||
name='parsers_indprod_inn_eb2ad3_idx',
|
||||
),
|
||||
migrations.AddIndex(
|
||||
model_name='industrialproductrecord',
|
||||
index=models.Index(fields=['load_batch', 'inn'], name='parsers_ind_load_ba_680f5a_idx'),
|
||||
),
|
||||
migrations.AddIndex(
|
||||
model_name='industrialproductrecord',
|
||||
index=models.Index(fields=['inn', 'registry_number'], name='parsers_ind_inn_05320c_idx'),
|
||||
),
|
||||
]
|
||||
19
src/apps/user/migrations/0008_alter_user_groups.py
Normal file
19
src/apps/user/migrations/0008_alter_user_groups.py
Normal file
@@ -0,0 +1,19 @@
|
||||
# Generated by Django 3.2.25 on 2026-03-20 10:10
|
||||
|
||||
from django.db import migrations, models
|
||||
|
||||
|
||||
class Migration(migrations.Migration):
|
||||
|
||||
dependencies = [
|
||||
('auth', '0012_alter_user_first_name_max_length'),
|
||||
('user', '0007_profile_middle_name'),
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.AlterField(
|
||||
model_name='user',
|
||||
name='groups',
|
||||
field=models.ManyToManyField(blank=True, help_text='', related_name='custom_user_set', related_query_name='custom_user', to='auth.Group', verbose_name='groups'),
|
||||
),
|
||||
]
|
||||
@@ -4,11 +4,17 @@ Celery configuration for the project.
|
||||
This module contains Celery configuration and task registration.
|
||||
"""
|
||||
|
||||
import logging
|
||||
import os
|
||||
import sys
|
||||
|
||||
from apps.core.startup_checks import run_startup_checks
|
||||
from celery import Celery
|
||||
from celery.signals import worker_ready
|
||||
from django.conf import settings
|
||||
from django.core.cache import cache
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Set the Django settings module for the 'celery' program.
|
||||
if "DJANGO_SETTINGS_MODULE" not in os.environ:
|
||||
@@ -27,6 +33,12 @@ def _is_celery_runtime() -> bool:
|
||||
)
|
||||
|
||||
|
||||
def _is_worker_runtime() -> bool:
|
||||
"""True when current process is a Celery worker command."""
|
||||
argv = " ".join(sys.argv).lower()
|
||||
return "celery" in argv and " worker" in argv
|
||||
|
||||
|
||||
if _is_celery_runtime():
|
||||
run_startup_checks(component="celery")
|
||||
|
||||
@@ -43,20 +55,24 @@ app.autodiscover_tasks()
|
||||
|
||||
# Configure Celery Beat schedule
|
||||
app.conf.beat_schedule = {
|
||||
# Парсинг сертификатов промышленного производства - каждый день в 3:00
|
||||
# Обновления источников по умолчанию — раз в неделю.
|
||||
# Сохраняем legacy-названия ключей, чтобы корректно обновлять существующие
|
||||
# PeriodicTask в django-celery-beat через update_or_create по name.
|
||||
"parse-industrial-production-daily": {
|
||||
"task": "apps.parsers.tasks.parse_industrial_production",
|
||||
"schedule": 86400.0, # Every 24 hours
|
||||
"schedule": 7 * 24 * 60 * 60, # Every 7 days
|
||||
},
|
||||
# Парсинг реестра производителей - каждый день в 4:00
|
||||
"parse-manufactures-daily": {
|
||||
"task": "apps.parsers.tasks.parse_manufactures",
|
||||
"schedule": 86400.0, # Every 24 hours
|
||||
"schedule": 7 * 24 * 60 * 60, # Every 7 days
|
||||
},
|
||||
# Парсинг реестра промышленной продукции - каждый день в 5:00
|
||||
"parse-industrial-products-daily": {
|
||||
"task": "apps.parsers.tasks.parse_industrial_products",
|
||||
"schedule": 86400.0, # Every 24 hours
|
||||
"schedule": 7 * 24 * 60 * 60, # Every 7 days
|
||||
},
|
||||
"parse-inspections-weekly": {
|
||||
"task": "apps.parsers.tasks.parse_inspections",
|
||||
"schedule": 7 * 24 * 60 * 60, # Every 7 days
|
||||
},
|
||||
# Сканирование папки FNS - каждые 5 минут
|
||||
"scan-fns-directory": {
|
||||
@@ -68,6 +84,54 @@ app.conf.beat_schedule = {
|
||||
app.conf.timezone = "Europe/Moscow"
|
||||
|
||||
|
||||
def _queue_startup_sources_refresh() -> None:
|
||||
"""
|
||||
Запустить parse_all_sources один раз при старте worker.
|
||||
|
||||
Используется distributed lock через cache, чтобы несколько worker/pod
|
||||
не запускали автообновление одновременно.
|
||||
"""
|
||||
if not getattr(settings, "CELERY_STARTUP_REFRESH_ENABLED", True):
|
||||
logger.info("Startup refresh is disabled by settings")
|
||||
return
|
||||
|
||||
lock_key = getattr(
|
||||
settings,
|
||||
"CELERY_STARTUP_REFRESH_LOCK_KEY",
|
||||
"celery:startup:parse_all_sources:lock",
|
||||
)
|
||||
lock_ttl = int(getattr(settings, "CELERY_STARTUP_REFRESH_LOCK_TTL_SECONDS", 3600))
|
||||
acquired = cache.add(lock_key, "1", timeout=lock_ttl)
|
||||
if not acquired:
|
||||
logger.info(
|
||||
"Startup refresh skipped: lock already acquired (%s)",
|
||||
lock_key,
|
||||
)
|
||||
return
|
||||
|
||||
countdown = int(getattr(settings, "CELERY_STARTUP_REFRESH_DELAY_SECONDS", 30))
|
||||
from apps.parsers.tasks import parse_all_sources
|
||||
|
||||
async_result = parse_all_sources.apply_async(countdown=max(countdown, 0))
|
||||
logger.info(
|
||||
"Startup refresh queued: task=%s task_id=%s countdown=%ss",
|
||||
"apps.parsers.tasks.parse_all_sources",
|
||||
async_result.id,
|
||||
countdown,
|
||||
)
|
||||
|
||||
|
||||
@worker_ready.connect(weak=False)
|
||||
def _on_worker_ready(sender=None, **kwargs) -> None: # noqa: ARG001
|
||||
"""Queue source refresh when worker starts."""
|
||||
if not _is_worker_runtime():
|
||||
return
|
||||
try:
|
||||
_queue_startup_sources_refresh()
|
||||
except Exception:
|
||||
logger.exception("Failed to queue startup source refresh")
|
||||
|
||||
|
||||
@app.task(bind=True)
|
||||
def debug_task(self):
|
||||
print(f"Request: {self.request!r}")
|
||||
|
||||
@@ -205,6 +205,16 @@ BACKUP_EXPORT_DIRECTORY = os.getenv(
|
||||
str(PROJECT_ROOT / "media" / "backups"),
|
||||
)
|
||||
|
||||
# Celery: сохраняем ретраи подключения на старте и для 6.x совместимости.
|
||||
CELERY_BROKER_CONNECTION_RETRY = True
|
||||
CELERY_BROKER_CONNECTION_RETRY_ON_STARTUP = True
|
||||
|
||||
# Celery: автообновление источников при старте worker.
|
||||
CELERY_STARTUP_REFRESH_ENABLED = True
|
||||
CELERY_STARTUP_REFRESH_DELAY_SECONDS = 30
|
||||
CELERY_STARTUP_REFRESH_LOCK_KEY = "celery:startup:parse_all_sources:lock"
|
||||
CELERY_STARTUP_REFRESH_LOCK_TTL_SECONDS = 3600
|
||||
|
||||
|
||||
# Password validation
|
||||
AUTH_PASSWORD_VALIDATORS = [
|
||||
|
||||
@@ -4,7 +4,7 @@ import importlib.util
|
||||
import os
|
||||
import sys
|
||||
from pathlib import Path
|
||||
from types import SimpleNamespace
|
||||
from types import ModuleType, SimpleNamespace
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
from django.test import SimpleTestCase
|
||||
@@ -46,6 +46,59 @@ class CeleryModuleTest(SimpleTestCase):
|
||||
)
|
||||
app_mock.autodiscover_tasks.assert_called_once_with()
|
||||
self.assertEqual(module.app, app_mock)
|
||||
self.assertIn("parse-industrial-production-daily", module.app.conf.beat_schedule)
|
||||
self.assertIn("parse-manufactures-daily", module.app.conf.beat_schedule)
|
||||
self.assertIn("parse-industrial-products-daily", module.app.conf.beat_schedule)
|
||||
self.assertIn("parse-inspections-weekly", module.app.conf.beat_schedule)
|
||||
|
||||
def test_startup_refresh_queues_when_lock_acquired(self):
|
||||
with patch.dict(
|
||||
os.environ, {"DJANGO_SETTINGS_MODULE": "settings.test"}, clear=True
|
||||
), patch.object(sys, "argv", ["python", "manage.py", "shell"]):
|
||||
module = _load_module("isolated_core_celery_startup_refresh")
|
||||
|
||||
fake_tasks_module = ModuleType("apps.parsers.tasks")
|
||||
apply_async_mock = MagicMock(return_value=SimpleNamespace(id="task-123"))
|
||||
fake_tasks_module.parse_all_sources = SimpleNamespace(
|
||||
apply_async=apply_async_mock
|
||||
)
|
||||
|
||||
with patch.object(module, "settings", SimpleNamespace(
|
||||
CELERY_STARTUP_REFRESH_ENABLED=True,
|
||||
CELERY_STARTUP_REFRESH_LOCK_KEY="startup-lock",
|
||||
CELERY_STARTUP_REFRESH_LOCK_TTL_SECONDS=120,
|
||||
CELERY_STARTUP_REFRESH_DELAY_SECONDS=45,
|
||||
)), patch.object(module.cache, "add", return_value=True) as add_mock, patch.dict(
|
||||
sys.modules, {"apps.parsers.tasks": fake_tasks_module}
|
||||
):
|
||||
module._queue_startup_sources_refresh()
|
||||
|
||||
add_mock.assert_called_once_with("startup-lock", "1", timeout=120)
|
||||
apply_async_mock.assert_called_once_with(countdown=45)
|
||||
|
||||
def test_startup_refresh_skips_when_lock_exists(self):
|
||||
with patch.dict(
|
||||
os.environ, {"DJANGO_SETTINGS_MODULE": "settings.test"}, clear=True
|
||||
), patch.object(sys, "argv", ["python", "manage.py", "shell"]):
|
||||
module = _load_module("isolated_core_celery_startup_skip")
|
||||
|
||||
fake_tasks_module = ModuleType("apps.parsers.tasks")
|
||||
apply_async_mock = MagicMock()
|
||||
fake_tasks_module.parse_all_sources = SimpleNamespace(
|
||||
apply_async=apply_async_mock
|
||||
)
|
||||
|
||||
with patch.object(module, "settings", SimpleNamespace(
|
||||
CELERY_STARTUP_REFRESH_ENABLED=True,
|
||||
CELERY_STARTUP_REFRESH_LOCK_KEY="startup-lock",
|
||||
CELERY_STARTUP_REFRESH_LOCK_TTL_SECONDS=120,
|
||||
CELERY_STARTUP_REFRESH_DELAY_SECONDS=45,
|
||||
)), patch.object(module.cache, "add", return_value=False), patch.dict(
|
||||
sys.modules, {"apps.parsers.tasks": fake_tasks_module}
|
||||
):
|
||||
module._queue_startup_sources_refresh()
|
||||
|
||||
apply_async_mock.assert_not_called()
|
||||
|
||||
def test_debug_task_prints_request(self):
|
||||
with patch.dict(
|
||||
|
||||
Reference in New Issue
Block a user