Schedule organization snapshot refresh
All checks were successful
CI/CD Pipeline / Quality Gate (push) Successful in 22s
CI/CD Pipeline / Build and Push Images (push) Successful in 5s
CI/CD Pipeline / Internal Notify (push) Successful in 1s
CI/CD Pipeline / Deploy Dev in Dokploy (push) Successful in 1s

This commit is contained in:
2026-05-06 20:54:13 +02:00
parent 0f17ff6773
commit 507ae2063a
3 changed files with 140 additions and 0 deletions

View File

@@ -0,0 +1,67 @@
"""Tests for organization snapshot tasks and schedules."""
from importlib import import_module
from django.apps import apps as django_apps
from django.core.cache import cache
from django.test import TestCase
from django_celery_beat.models import PeriodicTask
from organizations.models import Organization
from organizations.tasks import refresh_all_organization_data_snapshots
from tests.apps.parsers.factories import IndustrialCertificateRecordFactory
class OrganizationSnapshotTasksTest(TestCase):
"""Checks Celery tasks that maintain API v2 organization snapshots."""
def test_refresh_all_task_rebuilds_snapshots_and_clears_api_cache(self):
organization = Organization.objects.create(
name='ООО "Снапшот"',
inn="7800000401",
ogrn="1027700144401",
)
IndustrialCertificateRecordFactory(
inn=organization.inn,
ogrn=organization.ogrn,
certificate_number="FULL-SNAPSHOT-CERT",
)
cache.set("api:v2:organizations:test", {"stale": True}, timeout=60)
result = refresh_all_organization_data_snapshots(batch_size=10)
self.assertEqual(result["processed"], 1)
self.assertEqual(result["created"], 1)
self.assertEqual(result["updated"], 0)
self.assertIsNone(cache.get("api:v2:organizations:test"))
snapshot = organization.data_snapshot
self.assertEqual(
snapshot.data["industrial"][0]["certificate_number"],
"FULL-SNAPSHOT-CERT",
)
class OrganizationSnapshotScheduleMigrationTest(TestCase):
"""Checks data migration that schedules full snapshot refresh."""
def test_migration_seeds_daily_snapshot_refresh_periodic_task(self):
migration = import_module(
"organizations.migrations.0004_seed_daily_snapshot_refresh_schedule"
)
migration.seed_daily_snapshot_refresh_schedule(django_apps, None)
migration.seed_daily_snapshot_refresh_schedule(django_apps, None)
task = PeriodicTask.objects.get(
name=migration.DAILY_ORGANIZATION_SNAPSHOT_TASK_NAME
)
self.assertEqual(
task.task,
"organizations.tasks.refresh_all_organization_data_snapshots",
)
self.assertTrue(task.enabled)
self.assertEqual(task.args, "[]")
self.assertEqual(task.kwargs, '{"batch_size": 100}')
self.assertEqual(task.crontab.minute, "30")
self.assertEqual(task.crontab.hour, "4")
self.assertEqual(str(task.crontab.timezone), "Europe/Moscow")