Files
mostovik-backend/tests/apps/exchange/test_serializers.py
Aleksandr Meshchriakov 3de66cc25c
Some checks failed
CI/CD Pipeline / Code Quality Checks (push) Successful in 3m16s
CI/CD Pipeline / Run Tests (push) Successful in 3m26s
CI/CD Pipeline / Telegram Notify Success (push) Failing after 1m29s
CI/CD Pipeline / Run Tests (pull_request) Successful in 1m44s
CI/CD Pipeline / Code Quality Checks (pull_request) Successful in 20m19s
CI/CD Pipeline / Telegram Notify Success (pull_request) Failing after 1m34s
Add periodic exchange task management API
2026-03-19 17:03:47 +01:00

93 lines
3.3 KiB
Python

"""Tests for exchange serializers."""
from types import SimpleNamespace
from apps.exchange.serializers import (
ExchangeCopyRequestSerializer,
ExchangePeriodicTaskUpsertSerializer,
)
from django.test import SimpleTestCase
class ExchangeCopyRequestSerializerTest(SimpleTestCase):
    """Validation checks for ``ExchangeCopyRequestSerializer``.

    Every test feeds the serializer a payload that is expected to be
    rejected and asserts which field the error is reported under.
    """

    def test_single_mode_requires_table(self):
        """``mode="single"`` without ``table`` fails with a ``table`` error."""
        payload = {"mode": "single"}
        copy_request = ExchangeCopyRequestSerializer(data=payload)

        self.assertFalse(copy_request.is_valid())
        self.assertIn("table", copy_request.errors)

    def test_selected_mode_requires_tables(self):
        """``mode="selected"`` without ``tables`` fails with a ``tables`` error."""
        payload = {"mode": "selected"}
        copy_request = ExchangeCopyRequestSerializer(data=payload)

        self.assertFalse(copy_request.is_valid())
        self.assertIn("tables", copy_request.errors)

    def test_table_and_tables_are_rejected_for_wrong_modes(self):
        """``mode="all"`` combined with ``table`` or ``tables`` is rejected.

        The error is expected under the name of the field that was supplied.
        """
        # ``table`` is checked first, then ``tables``; the first failing
        # assertion stops the test.
        misplaced_fields = (
            ("table", "parsers_manufacturer"),
            ("tables", ["parsers_manufacturer"]),
        )
        for field_name, field_value in misplaced_fields:
            copy_request = ExchangeCopyRequestSerializer(
                data={"mode": "all", field_name: field_value}
            )
            self.assertFalse(copy_request.is_valid())
            self.assertIn(field_name, copy_request.errors)
class ExchangePeriodicTaskUpsertSerializerTest(SimpleTestCase):
    """Validation checks for ``ExchangePeriodicTaskUpsertSerializer``.

    Covers the schedule-specific required fields, a partial update that
    switches the copy mode, and rejection of an incomplete copy payload.
    """

    def test_interval_schedule_requires_fields(self):
        """An ``interval`` schedule with no interval fields is invalid."""
        task_serializer = ExchangePeriodicTaskUpsertSerializer(
            data={"name": "copy-job", "schedule_type": "interval"}
        )

        self.assertFalse(task_serializer.is_valid())
        for missing_field in ("interval_every", "interval_period"):
            self.assertIn(missing_field, task_serializer.errors)

    def test_crontab_schedule_requires_fields(self):
        """A ``crontab`` schedule with no crontab fields is invalid."""
        task_serializer = ExchangePeriodicTaskUpsertSerializer(
            data={"name": "copy-job", "schedule_type": "crontab"}
        )

        self.assertFalse(task_serializer.is_valid())
        for missing_field in ("crontab_minute", "crontab_hour"):
            self.assertIn(missing_field, task_serializer.errors)

    def test_update_mode_to_all_clears_old_single_table(self):
        """A partial update to ``mode="all"`` yields a payload with no table.

        The existing instance carries a ``single``-mode payload naming
        ``old_table`` in its JSON ``kwargs``; after validation the resulting
        ``payload`` must have ``table`` and ``tables`` set to ``None``.
        """
        # Lightweight stand-in for the instance being updated; it exposes
        # only the attributes set here.
        # NOTE(review): presumably mirrors a stored periodic task - confirm.
        current_interval = SimpleNamespace(every=5, period="minutes")
        existing_task = SimpleNamespace(
            interval_id=1,
            interval=current_interval,
            crontab_id=None,
            kwargs='{"payload": {"mode": "single", "table": "old_table"}}',
        )
        expected_payload = {
            "mode": "all",
            "table": None,
            "tables": None,
            "truncate_before_copy": True,
        }

        task_serializer = ExchangePeriodicTaskUpsertSerializer(
            existing_task,
            data={"mode": "all"},
            partial=True,
        )

        # Validation errors double as the failure message for easier triage.
        is_valid = task_serializer.is_valid()
        self.assertTrue(is_valid, task_serializer.errors)
        self.assertEqual(
            task_serializer.validated_data["payload"], expected_payload
        )

    def test_periodic_task_uses_copy_payload_validation(self):
        """A complete schedule with ``mode="single"`` but no ``table`` fails.

        The error is expected under ``table``.
        """
        request_data = {
            "name": "copy-job",
            "schedule_type": "interval",
            "interval_every": 1,
            "interval_period": "hours",
            "mode": "single",
        }
        task_serializer = ExchangePeriodicTaskUpsertSerializer(data=request_data)

        self.assertFalse(task_serializer.is_valid())
        self.assertIn("table", task_serializer.errors)