From 507ae2063aca3eff3baef2c24bdba51d0a0ac745 Mon Sep 17 00:00:00 2001
From: Aleksandr Meshchriakov
Date: Wed, 6 May 2026 20:54:13 +0200
Subject: [PATCH] Schedule organization snapshot refresh

---
 ...04_seed_daily_snapshot_refresh_schedule.py | 80 ++++++++++++++++++
 src/organizations/tasks.py                    | 23 +++++
 tests/apps/organizations/test_tasks.py        | 87 +++++++++++++++++++
 3 files changed, 190 insertions(+)
 create mode 100644 src/organizations/migrations/0004_seed_daily_snapshot_refresh_schedule.py
 create mode 100644 tests/apps/organizations/test_tasks.py

diff --git a/src/organizations/migrations/0004_seed_daily_snapshot_refresh_schedule.py b/src/organizations/migrations/0004_seed_daily_snapshot_refresh_schedule.py
new file mode 100644
index 0000000..21a6893
--- /dev/null
+++ b/src/organizations/migrations/0004_seed_daily_snapshot_refresh_schedule.py
@@ -0,0 +1,80 @@
+"""Seed the daily organization snapshot refresh schedule for Celery beat."""
+
+import json
+
+from django.db import migrations
+
+DAILY_ORGANIZATION_SNAPSHOT_TASK_NAME = "organizations:data-snapshots:daily-msk"
+DAILY_ORGANIZATION_SNAPSHOT_TASK_PATH = (
+    "organizations.tasks.refresh_all_organization_data_snapshots"
+)
+# 04:30 every day, evaluated in the Europe/Moscow timezone.
+DAILY_MSK_CRON = {
+    "minute": "30",
+    "hour": "4",
+    "day_of_week": "*",
+    "day_of_month": "*",
+    "month_of_year": "*",
+    "timezone": "Europe/Moscow",
+}
+
+
+def seed_daily_snapshot_refresh_schedule(apps, schema_editor):
+    """Create or update the periodic task that refreshes all snapshots daily.
+
+    Safe to run repeatedly: the task is matched by its name and overwritten.
+    """
+    CrontabSchedule = apps.get_model("django_celery_beat", "CrontabSchedule")
+    PeriodicTask = apps.get_model("django_celery_beat", "PeriodicTask")
+
+    # django_celery_beat does not enforce uniqueness on crontab rows, so
+    # identical rows may already exist; get_or_create() would then raise
+    # MultipleObjectsReturned and abort the migration. Reuse the oldest match.
+    crontab = CrontabSchedule.objects.filter(**DAILY_MSK_CRON).order_by("pk").first()
+    if crontab is None:
+        crontab = CrontabSchedule.objects.create(**DAILY_MSK_CRON)
+
+    # Clear the other schedule relations that exist on this model version so
+    # the crontab is the only schedule set on the task.
+    field_names = {field.name for field in PeriodicTask._meta.fields}
+    schedule_fields = {"crontab": crontab}
+    for field_name in ("interval", "solar", "clocked"):
+        if field_name in field_names:
+            schedule_fields[field_name] = None
+
+    # NOTE(review): historical models presumably bypass django_celery_beat's
+    # save signals, so an already running beat process may not pick this task
+    # up until it restarts -- confirm that deploys restart beat.
+    PeriodicTask.objects.update_or_create(
+        name=DAILY_ORGANIZATION_SNAPSHOT_TASK_NAME,
+        defaults={
+            "task": DAILY_ORGANIZATION_SNAPSHOT_TASK_PATH,
+            "args": json.dumps([]),
+            "kwargs": json.dumps({"batch_size": 100}),
+            "enabled": True,
+            "description": "Daily full refresh for API v2 organization snapshots.",
+            **schedule_fields,
+        },
+    )
+
+
+def remove_daily_snapshot_refresh_schedule(apps, schema_editor):
+    """Delete the seeded periodic task; the crontab row is left in place."""
+    PeriodicTask = apps.get_model("django_celery_beat", "PeriodicTask")
+    PeriodicTask.objects.filter(name=DAILY_ORGANIZATION_SNAPSHOT_TASK_NAME).delete()
+
+
+class Migration(migrations.Migration):
+    """Data migration that registers the daily snapshot refresh schedule."""
+
+    dependencies = [
+        ("django_celery_beat", "0018_improve_crontab_helptext"),
+        ("organizations", "0003_allow_branch_kpp_organizations"),
+    ]
+
+    operations = [
+        migrations.RunPython(
+            seed_daily_snapshot_refresh_schedule,
+            reverse_code=remove_daily_snapshot_refresh_schedule,
+        ),
+    ]
diff --git a/src/organizations/tasks.py b/src/organizations/tasks.py
index 1fc2324..a9c12c8 100644
--- a/src/organizations/tasks.py
+++ b/src/organizations/tasks.py
@@ -12,6 +12,29 @@ from organizations.services import OrganizationDataSnapshotRefreshService
 logger = logging.getLogger(__name__)
 
 
+@shared_task
+def refresh_all_organization_data_snapshots(batch_size: int = 100) -> dict:
+    """Refresh all organization data snapshots for API v2.
+
+    Args:
+        batch_size: Batch size passed to the snapshot refresh service.
+
+    Returns:
+        Counts of processed, created and updated snapshots.
+    """
+    result = OrganizationDataSnapshotRefreshService.refresh(batch_size=batch_size)
+    # NOTE(review): cache.clear() empties the whole cache backend, not just
+    # API v2 entries -- confirm nothing else relies on this cache.
+    cache.clear()
+    payload = {
+        "processed": result.processed,
+        "created": result.created,
+        "updated": result.updated,
+    }
+    logger.info("All organization data snapshots refreshed: %s", payload)
+    return payload
+
+
 @shared_task
 def refresh_organization_data_snapshots_for_parser_batch(
     *,
diff --git a/tests/apps/organizations/test_tasks.py b/tests/apps/organizations/test_tasks.py
new file mode 100644
index 0000000..0183bd0
--- /dev/null
+++ b/tests/apps/organizations/test_tasks.py
@@ -0,0 +1,87 @@
+"""Tests for organization snapshot tasks and schedules."""
+
+from importlib import import_module
+
+from django.apps import apps as django_apps
+from django.core.cache import cache
+from django.test import TestCase
+from django_celery_beat.models import CrontabSchedule, PeriodicTask
+from organizations.models import Organization
+from organizations.tasks import refresh_all_organization_data_snapshots
+
+from tests.apps.parsers.factories import IndustrialCertificateRecordFactory
+
+
+class OrganizationSnapshotTasksTest(TestCase):
+    """Checks Celery tasks that maintain API v2 organization snapshots."""
+
+    def test_refresh_all_task_rebuilds_snapshots_and_clears_api_cache(self):
+        organization = Organization.objects.create(
+            name='ООО "Снапшот"',
+            inn="7800000401",
+            ogrn="1027700144401",
+        )
+        IndustrialCertificateRecordFactory(
+            inn=organization.inn,
+            ogrn=organization.ogrn,
+            certificate_number="FULL-SNAPSHOT-CERT",
+        )
+        cache.set("api:v2:organizations:test", {"stale": True}, timeout=60)
+
+        result = refresh_all_organization_data_snapshots(batch_size=10)
+
+        self.assertEqual(result["processed"], 1)
+        self.assertEqual(result["created"], 1)
+        self.assertEqual(result["updated"], 0)
+        self.assertIsNone(cache.get("api:v2:organizations:test"))
+        snapshot = organization.data_snapshot
+        self.assertEqual(
+            snapshot.data["industrial"][0]["certificate_number"],
+            "FULL-SNAPSHOT-CERT",
+        )
+
+
+class OrganizationSnapshotScheduleMigrationTest(TestCase):
+    """Checks data migration that schedules full snapshot refresh."""
+
+    def test_migration_seeds_daily_snapshot_refresh_periodic_task(self):
+        migration = import_module(
+            "organizations.migrations.0004_seed_daily_snapshot_refresh_schedule"
+        )
+
+        migration.seed_daily_snapshot_refresh_schedule(django_apps, None)
+        migration.seed_daily_snapshot_refresh_schedule(django_apps, None)
+
+        task = PeriodicTask.objects.get(
+            name=migration.DAILY_ORGANIZATION_SNAPSHOT_TASK_NAME
+        )
+        self.assertEqual(
+            task.task,
+            "organizations.tasks.refresh_all_organization_data_snapshots",
+        )
+        self.assertTrue(task.enabled)
+        self.assertEqual(task.args, "[]")
+        self.assertEqual(task.kwargs, '{"batch_size": 100}')
+        self.assertEqual(task.crontab.minute, "30")
+        self.assertEqual(task.crontab.hour, "4")
+        self.assertEqual(str(task.crontab.timezone), "Europe/Moscow")
+
+    def test_migration_reuses_duplicate_crontab_schedules(self):
+        migration = import_module(
+            "organizations.migrations.0004_seed_daily_snapshot_refresh_schedule"
+        )
+        # Two identical rows guarantee a duplicate regardless of whether the
+        # test database was built with migrations applied.
+        CrontabSchedule.objects.create(**migration.DAILY_MSK_CRON)
+        CrontabSchedule.objects.create(**migration.DAILY_MSK_CRON)
+        matching = CrontabSchedule.objects.filter(**migration.DAILY_MSK_CRON)
+        oldest = matching.order_by("pk").first()
+        count_before = matching.count()
+
+        migration.seed_daily_snapshot_refresh_schedule(django_apps, None)
+
+        task = PeriodicTask.objects.get(
+            name=migration.DAILY_ORGANIZATION_SNAPSHOT_TASK_NAME
+        )
+        self.assertEqual(task.crontab_id, oldest.pk)
+        self.assertEqual(matching.count(), count_before)