"""Tests for organization source backfill tasks and schedules.""" from importlib import import_module from apps.parsers.models import ParserLoadLog from django.apps import apps as django_apps from django.core.cache import cache from django.test import TestCase from django_celery_beat.models import PeriodicTask from organizations.cache import get_organization_api_cache_version from organizations.models import ( IndustrialProductionExtension, Organization, OrganizationSourceRecord, ) from organizations.tasks import ( backfill_all_organization_sources, backfill_organization_sources_for_parser_batch, ) from tests.apps.parsers.factories import IndustrialCertificateRecordFactory class OrganizationSourceBackfillTasksTest(TestCase): """Checks Celery tasks that maintain API v2 organization source extensions.""" def test_backfill_all_task_rebuilds_sources_and_invalidates_api_cache(self): organization = Organization.objects.create( name='ООО "Источник"', inn="7800000401", ogrn="1027700144401", ) IndustrialCertificateRecordFactory( inn=organization.inn, ogrn=organization.ogrn, certificate_number="FULL-SOURCE-CERT", ) cache.set("unrelated:test", {"keep": True}, timeout=60) cache_version_before = get_organization_api_cache_version() result = backfill_all_organization_sources(batch_size=10) self.assertGreaterEqual(result["scanned"], 1) self.assertEqual(result["created_records"], 1) self.assertNotEqual( get_organization_api_cache_version(), cache_version_before, ) self.assertEqual(cache.get("unrelated:test"), {"keep": True}) extension = IndustrialProductionExtension.objects.get( organization=organization, ) record = OrganizationSourceRecord.objects.get(extension=extension) self.assertEqual( record.payload["certificate_number"], "FULL-SOURCE-CERT", ) def test_backfill_parser_batch_task_limits_source_and_batch(self): organization = Organization.objects.create( name='ООО "Пакет источника"', inn="7800000402", ogrn="1027700144402", ) IndustrialCertificateRecordFactory( inn=organization.inn, ogrn=organization.ogrn, certificate_number="BATCH-SOURCE-CERT-1", load_batch=1, ) IndustrialCertificateRecordFactory( inn=organization.inn, ogrn=organization.ogrn, certificate_number="BATCH-SOURCE-CERT-2", load_batch=2, ) result = backfill_organization_sources_for_parser_batch( source=ParserLoadLog.Source.INDUSTRIAL, batch_id=2, ) self.assertEqual(result["scanned"], 1) self.assertEqual(result["created_records"], 1) record = OrganizationSourceRecord.objects.get() self.assertEqual(record.payload["certificate_number"], "BATCH-SOURCE-CERT-2") class OrganizationSnapshotScheduleMigrationTest(TestCase): """Checks legacy data migration that schedules the compatibility task.""" def test_migration_seeds_daily_snapshot_refresh_periodic_task(self): migration = import_module( "organizations.migrations.0004_seed_daily_snapshot_refresh_schedule" ) migration.seed_daily_snapshot_refresh_schedule(django_apps, None) migration.seed_daily_snapshot_refresh_schedule(django_apps, None) task = PeriodicTask.objects.get( name=migration.DAILY_ORGANIZATION_SNAPSHOT_TASK_NAME ) self.assertEqual( task.task, "organizations.tasks.refresh_all_organization_data_snapshots", ) self.assertTrue(task.enabled) self.assertEqual(task.args, "[]") self.assertEqual(task.kwargs, '{"batch_size": 100}') self.assertEqual(task.crontab.minute, "30") self.assertEqual(task.crontab.hour, "4") self.assertEqual(str(task.crontab.timezone), "Europe/Moscow")