Files

113 lines
4.1 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Tests for organization source backfill tasks and schedules."""
from importlib import import_module
from apps.parsers.models import ParserLoadLog
from django.apps import apps as django_apps
from django.core.cache import cache
from django.test import TestCase
from django_celery_beat.models import PeriodicTask
from organizations.cache import get_organization_api_cache_version
from organizations.models import (
IndustrialProductionExtension,
Organization,
OrganizationSourceRecord,
)
from organizations.tasks import (
backfill_all_organization_sources,
backfill_organization_sources_for_parser_batch,
)
from tests.apps.parsers.factories import IndustrialCertificateRecordFactory
class OrganizationSourceBackfillTasksTest(TestCase):
    """Checks Celery tasks that maintain API v2 organization source extensions."""

    def test_backfill_all_task_rebuilds_sources_and_invalidates_api_cache(self):
        """Full backfill creates the source record and bumps the API cache version.

        An unrelated cache key is planted beforehand to check that the
        invalidation leaves other cache entries alone.
        """
        organization = Organization.objects.create(
            name='ООО "Источник"',
            inn="7800000401",
            ogrn="1027700144401",
        )
        IndustrialCertificateRecordFactory(
            inn=organization.inn,
            ogrn=organization.ogrn,
            certificate_number="FULL-SOURCE-CERT",
        )

        unrelated_key = "unrelated:test"
        # The cache is not rolled back with the test transaction, so remove
        # the sentinel explicitly to avoid leaking it into other tests.
        self.addCleanup(cache.delete, unrelated_key)
        cache.set(unrelated_key, {"keep": True}, timeout=60)
        cache_version_before = get_organization_api_cache_version()

        result = backfill_all_organization_sources(batch_size=10)

        self.assertGreaterEqual(result["scanned"], 1)
        self.assertEqual(result["created_records"], 1)
        self.assertNotEqual(
            get_organization_api_cache_version(),
            cache_version_before,
        )
        self.assertEqual(cache.get(unrelated_key), {"keep": True})

        extension = IndustrialProductionExtension.objects.get(
            organization=organization,
        )
        record = OrganizationSourceRecord.objects.get(extension=extension)
        self.assertEqual(
            record.payload["certificate_number"],
            "FULL-SOURCE-CERT",
        )

    def test_backfill_parser_batch_task_limits_source_and_batch(self):
        """Batch backfill only picks up records from the requested load batch.

        Two certificates for the same organization are created in different
        load batches; only the one from batch 2 is expected to be scanned
        and turned into a source record.
        """
        organization = Organization.objects.create(
            name='ООО "Пакет источника"',
            inn="7800000402",
            ogrn="1027700144402",
        )
        IndustrialCertificateRecordFactory(
            inn=organization.inn,
            ogrn=organization.ogrn,
            certificate_number="BATCH-SOURCE-CERT-1",
            load_batch=1,
        )
        IndustrialCertificateRecordFactory(
            inn=organization.inn,
            ogrn=organization.ogrn,
            certificate_number="BATCH-SOURCE-CERT-2",
            load_batch=2,
        )

        result = backfill_organization_sources_for_parser_batch(
            source=ParserLoadLog.Source.INDUSTRIAL,
            batch_id=2,
        )

        self.assertEqual(result["scanned"], 1)
        self.assertEqual(result["created_records"], 1)
        # An unfiltered `.get()` doubles as a check that exactly one source
        # record exists in total.
        record = OrganizationSourceRecord.objects.get()
        self.assertEqual(record.payload["certificate_number"], "BATCH-SOURCE-CERT-2")
class OrganizationSnapshotScheduleMigrationTest(TestCase):
    """Checks legacy data migration that schedules the compatibility task."""

    def test_migration_seeds_daily_snapshot_refresh_periodic_task(self):
        """Seeding twice leaves a single, correctly configured periodic task."""
        # The migration module name starts with a digit, so it cannot be
        # imported with a regular `import` statement.
        migration_module = import_module(
            "organizations.migrations.0004_seed_daily_snapshot_refresh_schedule"
        )
        seed_schedule = migration_module.seed_daily_snapshot_refresh_schedule

        # Run the seeding step twice; the `.get()` below would raise if the
        # second run had created a duplicate row.
        for _ in range(2):
            seed_schedule(django_apps, None)

        periodic_task = PeriodicTask.objects.get(
            name=migration_module.DAILY_ORGANIZATION_SNAPSHOT_TASK_NAME,
        )

        self.assertEqual(
            periodic_task.task,
            "organizations.tasks.refresh_all_organization_data_snapshots",
        )
        self.assertTrue(periodic_task.enabled)
        self.assertEqual(periodic_task.args, "[]")
        self.assertEqual(periodic_task.kwargs, '{"batch_size": 100}')

        crontab = periodic_task.crontab
        self.assertEqual(crontab.minute, "30")
        self.assertEqual(crontab.hour, "4")
        self.assertEqual(str(crontab.timezone), "Europe/Moscow")