113 lines
4.1 KiB
Python
113 lines
4.1 KiB
Python
"""Tests for organization source backfill tasks and schedules."""
|
||
|
||
from importlib import import_module
|
||
|
||
from apps.parsers.models import ParserLoadLog
|
||
from django.apps import apps as django_apps
|
||
from django.core.cache import cache
|
||
from django.test import TestCase
|
||
from django_celery_beat.models import PeriodicTask
|
||
from organizations.cache import get_organization_api_cache_version
|
||
from organizations.models import (
|
||
IndustrialProductionExtension,
|
||
Organization,
|
||
OrganizationSourceRecord,
|
||
)
|
||
from organizations.tasks import (
|
||
backfill_all_organization_sources,
|
||
backfill_organization_sources_for_parser_batch,
|
||
)
|
||
|
||
from tests.apps.parsers.factories import IndustrialCertificateRecordFactory
|
||
|
||
|
||
class OrganizationSourceBackfillTasksTest(TestCase):
|
||
"""Checks Celery tasks that maintain API v2 organization source extensions."""
|
||
|
||
def test_backfill_all_task_rebuilds_sources_and_invalidates_api_cache(self):
|
||
organization = Organization.objects.create(
|
||
name='ООО "Источник"',
|
||
inn="7800000401",
|
||
ogrn="1027700144401",
|
||
)
|
||
IndustrialCertificateRecordFactory(
|
||
inn=organization.inn,
|
||
ogrn=organization.ogrn,
|
||
certificate_number="FULL-SOURCE-CERT",
|
||
)
|
||
cache.set("unrelated:test", {"keep": True}, timeout=60)
|
||
cache_version_before = get_organization_api_cache_version()
|
||
|
||
result = backfill_all_organization_sources(batch_size=10)
|
||
|
||
self.assertGreaterEqual(result["scanned"], 1)
|
||
self.assertEqual(result["created_records"], 1)
|
||
self.assertNotEqual(
|
||
get_organization_api_cache_version(),
|
||
cache_version_before,
|
||
)
|
||
self.assertEqual(cache.get("unrelated:test"), {"keep": True})
|
||
extension = IndustrialProductionExtension.objects.get(
|
||
organization=organization,
|
||
)
|
||
record = OrganizationSourceRecord.objects.get(extension=extension)
|
||
self.assertEqual(
|
||
record.payload["certificate_number"],
|
||
"FULL-SOURCE-CERT",
|
||
)
|
||
|
||
def test_backfill_parser_batch_task_limits_source_and_batch(self):
|
||
organization = Organization.objects.create(
|
||
name='ООО "Пакет источника"',
|
||
inn="7800000402",
|
||
ogrn="1027700144402",
|
||
)
|
||
IndustrialCertificateRecordFactory(
|
||
inn=organization.inn,
|
||
ogrn=organization.ogrn,
|
||
certificate_number="BATCH-SOURCE-CERT-1",
|
||
load_batch=1,
|
||
)
|
||
IndustrialCertificateRecordFactory(
|
||
inn=organization.inn,
|
||
ogrn=organization.ogrn,
|
||
certificate_number="BATCH-SOURCE-CERT-2",
|
||
load_batch=2,
|
||
)
|
||
|
||
result = backfill_organization_sources_for_parser_batch(
|
||
source=ParserLoadLog.Source.INDUSTRIAL,
|
||
batch_id=2,
|
||
)
|
||
|
||
self.assertEqual(result["scanned"], 1)
|
||
self.assertEqual(result["created_records"], 1)
|
||
record = OrganizationSourceRecord.objects.get()
|
||
self.assertEqual(record.payload["certificate_number"], "BATCH-SOURCE-CERT-2")
|
||
|
||
|
||
class OrganizationSnapshotScheduleMigrationTest(TestCase):
|
||
"""Checks legacy data migration that schedules the compatibility task."""
|
||
|
||
def test_migration_seeds_daily_snapshot_refresh_periodic_task(self):
|
||
migration = import_module(
|
||
"organizations.migrations.0004_seed_daily_snapshot_refresh_schedule"
|
||
)
|
||
|
||
migration.seed_daily_snapshot_refresh_schedule(django_apps, None)
|
||
migration.seed_daily_snapshot_refresh_schedule(django_apps, None)
|
||
|
||
task = PeriodicTask.objects.get(
|
||
name=migration.DAILY_ORGANIZATION_SNAPSHOT_TASK_NAME
|
||
)
|
||
self.assertEqual(
|
||
task.task,
|
||
"organizations.tasks.refresh_all_organization_data_snapshots",
|
||
)
|
||
self.assertTrue(task.enabled)
|
||
self.assertEqual(task.args, "[]")
|
||
self.assertEqual(task.kwargs, '{"batch_size": 100}')
|
||
self.assertEqual(task.crontab.minute, "30")
|
||
self.assertEqual(task.crontab.hour, "4")
|
||
self.assertEqual(str(task.crontab.timezone), "Europe/Moscow")
|