feature/registers-generate-test-data-command #12

Merged
avm merged 3 commits from feature/registers-generate-test-data-command into dev 2026-03-20 13:23:21 +03:00
7 changed files with 204 additions and 8 deletions

View File

@@ -218,9 +218,12 @@ Celery-задачи (основные):
Планировщик (`celery beat`) по умолчанию:
- ежедневный парсинг Минпромторга (сертификаты/производители)
- weekly-обновление источников Минпромторга и проверок
- сканирование директории ФНС каждые 5 минут
При старте `celery worker` также выполняется одноразовый автозапуск
`apps.parsers.tasks.parse_all_sources` (с lock через cache, чтобы избежать дублей).
Директории ФНС:
- входящие: `input/fns/`

View File

@@ -0,0 +1,18 @@
# Generated by Django 3.2.25 on 2026-03-20 10:10
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('exchange', '0001_initial'),
]
operations = [
migrations.AlterField(
model_name='exchangeconnection',
name='password',
field=models.TextField(help_text='Хранится в зашифрованном виде', verbose_name='пароль'),
),
]

View File

@@ -0,0 +1,29 @@
# Generated by Django 3.2.25 on 2026-03-20 10:10
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('parsers', '0012_add_industrial_product_record'),
]
operations = [
migrations.RemoveIndex(
model_name='industrialproductrecord',
name='parsers_indprod_load_ba_98b870_idx',
),
migrations.RemoveIndex(
model_name='industrialproductrecord',
name='parsers_indprod_inn_eb2ad3_idx',
),
migrations.AddIndex(
model_name='industrialproductrecord',
index=models.Index(fields=['load_batch', 'inn'], name='parsers_ind_load_ba_680f5a_idx'),
),
migrations.AddIndex(
model_name='industrialproductrecord',
index=models.Index(fields=['inn', 'registry_number'], name='parsers_ind_inn_05320c_idx'),
),
]

View File

@@ -0,0 +1,19 @@
# Generated by Django 3.2.25 on 2026-03-20 10:10
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('auth', '0012_alter_user_first_name_max_length'),
('user', '0007_profile_middle_name'),
]
operations = [
migrations.AlterField(
model_name='user',
name='groups',
field=models.ManyToManyField(blank=True, help_text='', related_name='custom_user_set', related_query_name='custom_user', to='auth.Group', verbose_name='groups'),
),
]

View File

@@ -4,11 +4,17 @@ Celery configuration for the project.
This module contains Celery configuration and task registration.
"""
import logging
import os
import sys
from apps.core.startup_checks import run_startup_checks
from celery import Celery
from celery.signals import worker_ready
from django.conf import settings
from django.core.cache import cache
logger = logging.getLogger(__name__)
# Set the Django settings module for the 'celery' program.
if "DJANGO_SETTINGS_MODULE" not in os.environ:
@@ -27,6 +33,12 @@ def _is_celery_runtime() -> bool:
)
def _is_worker_runtime() -> bool:
"""True when current process is a Celery worker command."""
argv = " ".join(sys.argv).lower()
return "celery" in argv and " worker" in argv
if _is_celery_runtime():
run_startup_checks(component="celery")
@@ -43,20 +55,24 @@ app.autodiscover_tasks()
# Configure Celery Beat schedule
app.conf.beat_schedule = {
# Парсинг сертификатов промышленного производства - каждый день в 3:00
# Обновления источников по умолчанию — раз в неделю.
# Сохраняем legacy-названия ключей, чтобы корректно обновлять существующие
# PeriodicTask в django-celery-beat через update_or_create по name.
"parse-industrial-production-daily": {
"task": "apps.parsers.tasks.parse_industrial_production",
"schedule": 86400.0, # Every 24 hours
"schedule": 7 * 24 * 60 * 60, # Every 7 days
},
# Парсинг реестра производителей - каждый день в 4:00
"parse-manufactures-daily": {
"task": "apps.parsers.tasks.parse_manufactures",
"schedule": 86400.0, # Every 24 hours
"schedule": 7 * 24 * 60 * 60, # Every 7 days
},
# Парсинг реестра промышленной продукции - каждый день в 5:00
"parse-industrial-products-daily": {
"task": "apps.parsers.tasks.parse_industrial_products",
"schedule": 86400.0, # Every 24 hours
"schedule": 7 * 24 * 60 * 60, # Every 7 days
},
"parse-inspections-weekly": {
"task": "apps.parsers.tasks.parse_inspections",
"schedule": 7 * 24 * 60 * 60, # Every 7 days
},
# Сканирование папки FNS - каждые 5 минут
"scan-fns-directory": {
@@ -68,6 +84,54 @@ app.conf.beat_schedule = {
app.conf.timezone = "Europe/Moscow"
def _queue_startup_sources_refresh() -> None:
"""
Запустить parse_all_sources один раз при старте worker.
Используется distributed lock через cache, чтобы несколько worker/pod
не запускали автообновление одновременно.
"""
if not getattr(settings, "CELERY_STARTUP_REFRESH_ENABLED", True):
logger.info("Startup refresh is disabled by settings")
return
lock_key = getattr(
settings,
"CELERY_STARTUP_REFRESH_LOCK_KEY",
"celery:startup:parse_all_sources:lock",
)
lock_ttl = int(getattr(settings, "CELERY_STARTUP_REFRESH_LOCK_TTL_SECONDS", 3600))
acquired = cache.add(lock_key, "1", timeout=lock_ttl)
if not acquired:
logger.info(
"Startup refresh skipped: lock already acquired (%s)",
lock_key,
)
return
countdown = int(getattr(settings, "CELERY_STARTUP_REFRESH_DELAY_SECONDS", 30))
from apps.parsers.tasks import parse_all_sources
async_result = parse_all_sources.apply_async(countdown=max(countdown, 0))
logger.info(
"Startup refresh queued: task=%s task_id=%s countdown=%ss",
"apps.parsers.tasks.parse_all_sources",
async_result.id,
countdown,
)
@worker_ready.connect(weak=False)
def _on_worker_ready(sender=None, **kwargs) -> None: # noqa: ARG001
"""Queue source refresh when worker starts."""
if not _is_worker_runtime():
return
try:
_queue_startup_sources_refresh()
except Exception:
logger.exception("Failed to queue startup source refresh")
@app.task(bind=True)
def debug_task(self):
print(f"Request: {self.request!r}")

View File

@@ -205,6 +205,16 @@ BACKUP_EXPORT_DIRECTORY = os.getenv(
str(PROJECT_ROOT / "media" / "backups"),
)
# Celery: сохраняем ретраи подключения на старте и для 6.x совместимости.
CELERY_BROKER_CONNECTION_RETRY = True
CELERY_BROKER_CONNECTION_RETRY_ON_STARTUP = True
# Celery: автообновление источников при старте worker.
CELERY_STARTUP_REFRESH_ENABLED = True
CELERY_STARTUP_REFRESH_DELAY_SECONDS = 30
CELERY_STARTUP_REFRESH_LOCK_KEY = "celery:startup:parse_all_sources:lock"
CELERY_STARTUP_REFRESH_LOCK_TTL_SECONDS = 3600
# Password validation
AUTH_PASSWORD_VALIDATORS = [

View File

@@ -4,7 +4,7 @@ import importlib.util
import os
import sys
from pathlib import Path
from types import SimpleNamespace
from types import ModuleType, SimpleNamespace
from unittest.mock import MagicMock, patch
from django.test import SimpleTestCase
@@ -46,6 +46,59 @@ class CeleryModuleTest(SimpleTestCase):
)
app_mock.autodiscover_tasks.assert_called_once_with()
self.assertEqual(module.app, app_mock)
self.assertIn("parse-industrial-production-daily", module.app.conf.beat_schedule)
self.assertIn("parse-manufactures-daily", module.app.conf.beat_schedule)
self.assertIn("parse-industrial-products-daily", module.app.conf.beat_schedule)
self.assertIn("parse-inspections-weekly", module.app.conf.beat_schedule)
def test_startup_refresh_queues_when_lock_acquired(self):
with patch.dict(
os.environ, {"DJANGO_SETTINGS_MODULE": "settings.test"}, clear=True
), patch.object(sys, "argv", ["python", "manage.py", "shell"]):
module = _load_module("isolated_core_celery_startup_refresh")
fake_tasks_module = ModuleType("apps.parsers.tasks")
apply_async_mock = MagicMock(return_value=SimpleNamespace(id="task-123"))
fake_tasks_module.parse_all_sources = SimpleNamespace(
apply_async=apply_async_mock
)
with patch.object(module, "settings", SimpleNamespace(
CELERY_STARTUP_REFRESH_ENABLED=True,
CELERY_STARTUP_REFRESH_LOCK_KEY="startup-lock",
CELERY_STARTUP_REFRESH_LOCK_TTL_SECONDS=120,
CELERY_STARTUP_REFRESH_DELAY_SECONDS=45,
)), patch.object(module.cache, "add", return_value=True) as add_mock, patch.dict(
sys.modules, {"apps.parsers.tasks": fake_tasks_module}
):
module._queue_startup_sources_refresh()
add_mock.assert_called_once_with("startup-lock", "1", timeout=120)
apply_async_mock.assert_called_once_with(countdown=45)
def test_startup_refresh_skips_when_lock_exists(self):
with patch.dict(
os.environ, {"DJANGO_SETTINGS_MODULE": "settings.test"}, clear=True
), patch.object(sys, "argv", ["python", "manage.py", "shell"]):
module = _load_module("isolated_core_celery_startup_skip")
fake_tasks_module = ModuleType("apps.parsers.tasks")
apply_async_mock = MagicMock()
fake_tasks_module.parse_all_sources = SimpleNamespace(
apply_async=apply_async_mock
)
with patch.object(module, "settings", SimpleNamespace(
CELERY_STARTUP_REFRESH_ENABLED=True,
CELERY_STARTUP_REFRESH_LOCK_KEY="startup-lock",
CELERY_STARTUP_REFRESH_LOCK_TTL_SECONDS=120,
CELERY_STARTUP_REFRESH_DELAY_SECONDS=45,
)), patch.object(module.cache, "add", return_value=False), patch.dict(
sys.modules, {"apps.parsers.tasks": fake_tasks_module}
):
module._queue_startup_sources_refresh()
apply_async_mock.assert_not_called()
def test_debug_task_prints_request(self):
with patch.dict(