diff --git a/src/apps/exchange/serializers.py b/src/apps/exchange/serializers.py index 931ce47..3f0b333 100644 --- a/src/apps/exchange/serializers.py +++ b/src/apps/exchange/serializers.py @@ -1,9 +1,53 @@ """Сериализаторы приложения обмена данными.""" +import json +from typing import Any + from apps.exchange.models import ExchangeConnection +from django_celery_beat.models import IntervalSchedule, PeriodicTask from rest_framework import serializers +def validate_exchange_copy_payload(attrs: dict[str, Any]) -> dict[str, Any]: + """Проверить совместимость параметров запуска копирования.""" + mode = attrs["mode"] + table = attrs.get("table") + tables = attrs.get("tables") + + if mode == "single" and not table: + raise serializers.ValidationError( + {"table": "Для mode=single нужно указать table"} + ) + + if mode == "selected" and not tables: + raise serializers.ValidationError( + {"tables": "Для mode=selected нужно указать tables"} + ) + + if mode != "single" and table: + raise serializers.ValidationError( + {"table": "Поле table допустимо только для mode=single"} + ) + + if mode != "selected" and tables: + raise serializers.ValidationError( + {"tables": "Поле tables допустимо только для mode=selected"} + ) + + return attrs + + +def get_periodic_task_payload(task: PeriodicTask) -> dict[str, Any]: + """Извлечь payload exchange-задачи из kwargs django_celery_beat.""" + try: + kwargs = json.loads(task.kwargs or "{}") + except json.JSONDecodeError: + return {} + + payload = kwargs.get("payload") + return payload if isinstance(payload, dict) else {} + + class ExchangeConnectionSerializer(serializers.ModelSerializer): """Сериализатор подключения без выдачи пароля в ответах.""" @@ -61,28 +105,257 @@ class ExchangeCopyRequestSerializer(serializers.Serializer): truncate_before_copy = serializers.BooleanField(default=True) def validate(self, attrs): - mode = attrs["mode"] - table = attrs.get("table") - tables = attrs.get("tables") + return validate_exchange_copy_payload(attrs) - if mode == "single" and not table: + +class ExchangePeriodicTaskSerializer(serializers.ModelSerializer): + """Сериализатор периодической задачи обмена.""" + + schedule_type = serializers.SerializerMethodField() + interval_every = serializers.SerializerMethodField() + interval_period = serializers.SerializerMethodField() + crontab_minute = serializers.SerializerMethodField() + crontab_hour = serializers.SerializerMethodField() + crontab_day_of_week = serializers.SerializerMethodField() + crontab_day_of_month = serializers.SerializerMethodField() + crontab_month_of_year = serializers.SerializerMethodField() + crontab_timezone = serializers.SerializerMethodField() + mode = serializers.SerializerMethodField() + table = serializers.SerializerMethodField() + tables = serializers.SerializerMethodField() + truncate_before_copy = serializers.SerializerMethodField() + + class Meta: + model = PeriodicTask + fields = [ + "id", + "name", + "description", + "enabled", + "schedule_type", + "interval_every", + "interval_period", + "crontab_minute", + "crontab_hour", + "crontab_day_of_week", + "crontab_day_of_month", + "crontab_month_of_year", + "crontab_timezone", + "mode", + "table", + "tables", + "truncate_before_copy", + "last_run_at", + "total_run_count", + "date_changed", + ] + read_only_fields = fields + + def get_schedule_type(self, obj: PeriodicTask) -> str | None: + if obj.interval_id: + return "interval" + if obj.crontab_id: + return "crontab" + return None + + def get_interval_every(self, obj: PeriodicTask) -> int | None: + return obj.interval.every if obj.interval_id else None + + def get_interval_period(self, obj: PeriodicTask) -> str | None: + return obj.interval.period if obj.interval_id else None + + def get_crontab_minute(self, obj: PeriodicTask) -> str | None: + return obj.crontab.minute if obj.crontab_id else None + + def get_crontab_hour(self, obj: PeriodicTask) -> str | None: + return obj.crontab.hour if obj.crontab_id else None + + def get_crontab_day_of_week(self, obj: PeriodicTask) -> str | None: + return obj.crontab.day_of_week if obj.crontab_id else None + + def get_crontab_day_of_month(self, obj: PeriodicTask) -> str | None: + return obj.crontab.day_of_month if obj.crontab_id else None + + def get_crontab_month_of_year(self, obj: PeriodicTask) -> str | None: + return obj.crontab.month_of_year if obj.crontab_id else None + + def get_crontab_timezone(self, obj: PeriodicTask) -> str | None: + if not obj.crontab_id: + return None + timezone = obj.crontab.timezone + return str(timezone) if timezone is not None else None + + def get_mode(self, obj: PeriodicTask) -> str | None: + return get_periodic_task_payload(obj).get("mode") + + def get_table(self, obj: PeriodicTask) -> str | None: + return get_periodic_task_payload(obj).get("table") + + def get_tables(self, obj: PeriodicTask) -> list[str] | None: + return get_periodic_task_payload(obj).get("tables") + + def get_truncate_before_copy(self, obj: PeriodicTask) -> bool | None: + return get_periodic_task_payload(obj).get("truncate_before_copy") + + +class ExchangePeriodicTaskUpsertSerializer(serializers.Serializer): + """Входные данные для создания и изменения периодической задачи обмена.""" + + name = serializers.CharField(max_length=200, required=False) + description = serializers.CharField(required=False, allow_blank=True) + enabled = serializers.BooleanField(required=False) + + schedule_type = serializers.ChoiceField( + choices=["interval", "crontab"], + required=False, + ) + interval_every = serializers.IntegerField(min_value=1, required=False) + interval_period = serializers.ChoiceField( + choices=[choice[0] for choice in IntervalSchedule.PERIOD_CHOICES], + required=False, + ) + crontab_minute = serializers.CharField(max_length=64, required=False) + crontab_hour = serializers.CharField(max_length=64, required=False) + crontab_day_of_week = serializers.CharField(max_length=64, required=False) + crontab_day_of_month = serializers.CharField(max_length=64, required=False) + crontab_month_of_year = serializers.CharField(max_length=64, required=False) + + mode = serializers.ChoiceField( + choices=["all", "single", "selected"], + required=False, + ) + table = serializers.CharField(required=False) + tables = serializers.ListField( + child=serializers.CharField(), + required=False, + allow_empty=False, + ) + truncate_before_copy = serializers.BooleanField(required=False) + + def validate(self, attrs: dict[str, Any]) -> dict[str, Any]: + if not self.instance and "name" not in attrs: + raise serializers.ValidationError({"name": "Обязательное поле."}) + + schedule_type = self._resolve_schedule_type(attrs) + if not schedule_type: raise serializers.ValidationError( - {"table": "Для mode=single нужно указать table"} + {"schedule_type": "Нужно указать тип расписания."} ) - if mode == "selected" and not tables: - raise serializers.ValidationError( - {"tables": "Для mode=selected нужно указать tables"} - ) - - if mode != "single" and table: - raise serializers.ValidationError( - {"table": "Поле table допустимо только для mode=single"} - ) - - if mode != "selected" and tables: - raise serializers.ValidationError( - {"tables": "Поле tables допустимо только для mode=selected"} - ) + payload = self._build_payload(attrs) + schedule = self._build_schedule(attrs, schedule_type) + attrs["payload"] = validate_exchange_copy_payload(payload) + attrs["schedule"] = schedule return attrs + + def _resolve_schedule_type(self, attrs: dict[str, Any]) -> str | None: + if "schedule_type" in attrs: + return attrs["schedule_type"] + + if self.instance and self.instance.interval_id: + return "interval" + + if self.instance and self.instance.crontab_id: + return "crontab" + + return None + + def _build_payload(self, attrs: dict[str, Any]) -> dict[str, Any]: + current_payload = ( + get_periodic_task_payload(self.instance) if self.instance else {} + ) + mode = attrs.get("mode", current_payload.get("mode", "all")) + truncate_before_copy = attrs.get( + "truncate_before_copy", + current_payload.get("truncate_before_copy", True), + ) + + if "mode" in attrs and attrs["mode"] != "single" and "table" not in attrs: + table = None + else: + table = attrs.get("table", current_payload.get("table")) + + if "mode" in attrs and attrs["mode"] != "selected" and "tables" not in attrs: + tables = None + else: + tables = attrs.get("tables", current_payload.get("tables")) + + return { + "mode": mode, + "table": table, + "tables": tables, + "truncate_before_copy": truncate_before_copy, + } + + def _build_schedule( + self, + attrs: dict[str, Any], + schedule_type: str, + ) -> dict[str, Any]: + if schedule_type == "interval": + return self._build_interval_schedule(attrs) + return self._build_crontab_schedule(attrs) + + def _build_interval_schedule(self, attrs: dict[str, Any]) -> dict[str, Any]: + current_schedule = self.instance.interval if self.instance else None + interval_every = attrs.get( + "interval_every", + current_schedule.every if current_schedule else None, + ) + interval_period = attrs.get( + "interval_period", + current_schedule.period if current_schedule else None, + ) + + errors = {} + if interval_every is None: + errors["interval_every"] = "Обязательное поле для interval." + if interval_period is None: + errors["interval_period"] = "Обязательное поле для interval." + if errors: + raise serializers.ValidationError(errors) + + return { + "type": "interval", + "every": interval_every, + "period": interval_period, + } + + def _build_crontab_schedule(self, attrs: dict[str, Any]) -> dict[str, Any]: + current_schedule = self.instance.crontab if self.instance else None + fields = { + "minute": attrs.get( + "crontab_minute", + current_schedule.minute if current_schedule else None, + ), + "hour": attrs.get( + "crontab_hour", + current_schedule.hour if current_schedule else None, + ), + "day_of_week": attrs.get( + "crontab_day_of_week", + current_schedule.day_of_week if current_schedule else None, + ), + "day_of_month": attrs.get( + "crontab_day_of_month", + current_schedule.day_of_month if current_schedule else None, + ), + "month_of_year": attrs.get( + "crontab_month_of_year", + current_schedule.month_of_year if current_schedule else None, + ), + } + + errors = { + f"crontab_{field_name}": "Обязательное поле для crontab." + for field_name, value in fields.items() + if value is None + } + if errors: + raise serializers.ValidationError(errors) + + return { + "type": "crontab", + **fields, + } diff --git a/src/apps/exchange/services.py b/src/apps/exchange/services.py index 2a1f129..09ab6e0 100644 --- a/src/apps/exchange/services.py +++ b/src/apps/exchange/services.py @@ -2,13 +2,17 @@ from __future__ import annotations +import json from contextlib import suppress from typing import Any from apps.exchange.models import ExchangeConnection from django.apps import apps as django_apps -from django.db import connections, transaction +from django.conf import settings +from django.core.exceptions import ValidationError as DjangoValidationError +from django.db import IntegrityError, connections, transaction from django.utils import timezone +from django_celery_beat.models import CrontabSchedule, IntervalSchedule, PeriodicTask class ExchangeServiceError(ValueError): @@ -481,3 +485,163 @@ class ExchangeConnectionService: connection.save( update_fields=["last_checked_at", "last_error", "updated_at"] ) + + +class ExchangePeriodicTaskService: + """Сервис управления периодическими задачами обмена.""" + + TASK_NAME = "apps.exchange.tasks.dispatch_periodic_exchange_copy" + + @classmethod + def get_queryset(cls): + return ( + PeriodicTask.objects.filter(task=cls.TASK_NAME) + .select_related("interval", "crontab") + .order_by("name") + ) + + @classmethod + @transaction.atomic + def create_periodic_task( + cls, + *, + name: str, + payload: dict[str, Any], + schedule: dict[str, Any], + description: str = "", + enabled: bool = True, + ) -> PeriodicTask: + task = PeriodicTask( + name=name, + task=cls.TASK_NAME, + kwargs=json.dumps({"payload": payload}, ensure_ascii=False), + description=description, + enabled=enabled, + ) + cls._assign_schedule(task=task, schedule=schedule) + return cls._save_task(task) + + @classmethod + @transaction.atomic + def update_periodic_task( + cls, + *, + task: PeriodicTask, + payload: dict[str, Any], + schedule: dict[str, Any], + name: str | None = None, + description: str | None = None, + enabled: bool | None = None, + ) -> PeriodicTask: + old_interval_id = task.interval_id + old_crontab_id = task.crontab_id + + if name is not None: + task.name = name + if description is not None: + task.description = description + if enabled is not None: + task.enabled = enabled + + task.kwargs = json.dumps({"payload": payload}, ensure_ascii=False) + cls._assign_schedule(task=task, schedule=schedule) + task = cls._save_task(task) + + cls._cleanup_unused_interval(old_interval_id) + cls._cleanup_unused_crontab(old_crontab_id) + return task + + @classmethod + def _assign_schedule(cls, *, task: PeriodicTask, schedule: dict[str, Any]) -> None: + if schedule["type"] == "interval": + task.interval = cls._get_or_create_interval(schedule) + task.crontab = None + return + + task.crontab = cls._get_or_create_crontab(schedule) + task.interval = None + + @classmethod + def _get_or_create_interval(cls, schedule: dict[str, Any]) -> IntervalSchedule: + interval = IntervalSchedule( + every=schedule["every"], + period=schedule["period"], + ) + cls._validate_model(interval) + interval, _ = IntervalSchedule.objects.get_or_create( + every=interval.every, + period=interval.period, + ) + return interval + + @classmethod + def _get_or_create_crontab(cls, schedule: dict[str, Any]) -> CrontabSchedule: + crontab = CrontabSchedule( + minute=schedule["minute"], + hour=schedule["hour"], + day_of_week=schedule["day_of_week"], + day_of_month=schedule["day_of_month"], + month_of_year=schedule["month_of_year"], + timezone=settings.TIME_ZONE, + ) + cls._validate_model(crontab) + crontab, _ = CrontabSchedule.objects.get_or_create( + minute=crontab.minute, + hour=crontab.hour, + day_of_week=crontab.day_of_week, + day_of_month=crontab.day_of_month, + month_of_year=crontab.month_of_year, + timezone=crontab.timezone, + ) + return crontab + + @classmethod + def _save_task(cls, task: PeriodicTask) -> PeriodicTask: + try: + task.full_clean() + task.save() + except DjangoValidationError as exc: + raise ExchangeServiceError(cls._format_validation_error(exc)) from exc + except IntegrityError as exc: + raise ExchangeServiceError( + "Периодическая задача с таким именем уже существует" + ) from exc + + return task + + @classmethod + def _validate_model(cls, instance) -> None: + try: + instance.full_clean() + except DjangoValidationError as exc: + raise ExchangeServiceError(cls._format_validation_error(exc)) from exc + + @classmethod + def _format_validation_error(cls, exc: DjangoValidationError) -> str: + if hasattr(exc, "message_dict"): + messages = [] + for field, field_errors in exc.message_dict.items(): + messages.extend(f"{field}: {error}" for error in field_errors) + return "; ".join(messages) + + return "; ".join(exc.messages) + + @classmethod + def _cleanup_unused_interval(cls, interval_id: int | None) -> None: + if not interval_id: + return + + if PeriodicTask.objects.filter(interval_id=interval_id).exists(): + return + + IntervalSchedule.objects.filter(id=interval_id).delete() + + @classmethod + def _cleanup_unused_crontab(cls, crontab_id: int | None) -> None: + if not crontab_id: + return + + if PeriodicTask.objects.filter(crontab_id=crontab_id).exists(): + return + + CrontabSchedule.objects.filter(id=crontab_id).delete() diff --git a/src/apps/exchange/tasks.py b/src/apps/exchange/tasks.py index 880a27d..2abf7e8 100644 --- a/src/apps/exchange/tasks.py +++ b/src/apps/exchange/tasks.py @@ -7,6 +7,7 @@ import uuid from typing import Any from apps.core.services import BackgroundJobService +from apps.core.tasks import PeriodicTask as CorePeriodicTask from apps.exchange.models import ExchangeConnection from apps.exchange.services import ExchangeConnectionService from celery import shared_task @@ -14,6 +15,26 @@ from celery import shared_task logger = logging.getLogger(__name__) +@shared_task(bind=True, base=CorePeriodicTask) +def dispatch_periodic_exchange_copy( + self, + *, + payload: dict[str, Any], +) -> dict[str, Any]: + """Поставить в очередь периодическое копирование через активное подключение.""" + active_connection = ExchangeConnectionService.get_active_connection() + task = copy_parsers_data_async.delay( + connection_id=active_connection.id, + payload=payload, + requested_by_id=None, + ) + return { + "status": "queued", + "task_id": task.id, + "connection_id": active_connection.id, + } + + @shared_task(bind=True) def copy_parsers_data_async( self, diff --git a/src/apps/exchange/urls.py b/src/apps/exchange/urls.py index d8365e2..9bd6958 100644 --- a/src/apps/exchange/urls.py +++ b/src/apps/exchange/urls.py @@ -4,6 +4,8 @@ from apps.exchange.views import ( ExchangeConnectionListCreateView, ExchangeConnectionTestView, ExchangeCopyDataView, + ExchangePeriodicTaskDetailView, + ExchangePeriodicTaskListCreateView, ) from django.urls import path @@ -19,6 +21,16 @@ exchange_urlpatterns = [ name="connections-test", ), path("copy/", ExchangeCopyDataView.as_view(), name="copy"), + path( + "periodic-tasks/", + ExchangePeriodicTaskListCreateView.as_view(), + name="periodic-tasks", + ), + path( + "periodic-tasks//", + ExchangePeriodicTaskDetailView.as_view(), + name="periodic-task-detail", + ), ] urlpatterns = [] diff --git a/src/apps/exchange/views.py b/src/apps/exchange/views.py index 5a18095..55c5d6e 100644 --- a/src/apps/exchange/views.py +++ b/src/apps/exchange/views.py @@ -10,10 +10,17 @@ from apps.exchange.serializers import ( ExchangeConnectionCreateSerializer, ExchangeConnectionSerializer, ExchangeCopyRequestSerializer, + ExchangePeriodicTaskSerializer, + ExchangePeriodicTaskUpsertSerializer, +) +from apps.exchange.services import ( + ExchangeConnectionService, + ExchangePeriodicTaskService, + ExchangeServiceError, ) -from apps.exchange.services import ExchangeConnectionService, ExchangeServiceError from apps.exchange.tasks import copy_parsers_data_async from django.db import IntegrityError +from django.shortcuts import get_object_or_404 from drf_yasg import openapi from drf_yasg.utils import swagger_auto_schema from rest_framework import status @@ -203,3 +210,124 @@ class ExchangeCopyDataView(APIView): }, status_code=status.HTTP_202_ACCEPTED, ) + + +class ExchangePeriodicTaskListCreateView(APIView): + """API списка и создания периодических задач обмена.""" + + permission_classes = [IsAdminUser] + + @swagger_auto_schema( + tags=[EXCHANGE_TAG], + operation_summary="Список периодических задач обмена", + operation_description=( + "Возвращает периодические задачи exchange, созданные через " + "django_celery_beat." + ), + responses={ + 200: ExchangePeriodicTaskSerializer(many=True), + **ErrorResponses.ADMIN, + }, + ) + def get(self, request): + queryset = ExchangePeriodicTaskService.get_queryset() + serializer = ExchangePeriodicTaskSerializer(queryset, many=True) + return api_response(serializer.data, status_code=status.HTTP_200_OK) + + @swagger_auto_schema( + tags=[EXCHANGE_TAG], + operation_summary="Создать периодическую задачу обмена", + operation_description=( + "Создаёт периодическую задачу exchange с interval или crontab " + "расписанием. При выполнении задача использует текущее активное " + "подключение exchange." + ), + request_body=ExchangePeriodicTaskUpsertSerializer, + responses={ + 201: ExchangePeriodicTaskSerializer, + 400: CommonResponses.BAD_REQUEST, + **ErrorResponses.ADMIN, + }, + ) + def post(self, request): + serializer = ExchangePeriodicTaskUpsertSerializer(data=request.data) + serializer.is_valid(raise_exception=True) + + try: + task = ExchangePeriodicTaskService.create_periodic_task( + name=serializer.validated_data["name"], + description=serializer.validated_data.get("description", ""), + enabled=serializer.validated_data.get("enabled", True), + payload=serializer.validated_data["payload"], + schedule=serializer.validated_data["schedule"], + ) + except ExchangeServiceError as exc: + raise ValidationError({"periodic_task": str(exc)}) from exc + + output = ExchangePeriodicTaskSerializer(task) + return api_created_response(output.data) + + +class ExchangePeriodicTaskDetailView(APIView): + """API чтения и изменения периодической задачи обмена.""" + + permission_classes = [IsAdminUser] + + @swagger_auto_schema( + tags=[EXCHANGE_TAG], + operation_summary="Детали периодической задачи обмена", + responses={ + 200: ExchangePeriodicTaskSerializer, + 404: CommonResponses.NOT_FOUND, + **ErrorResponses.ADMIN, + }, + ) + def get(self, request, task_id: int): + task = get_object_or_404( + ExchangePeriodicTaskService.get_queryset(), + id=task_id, + ) + output = ExchangePeriodicTaskSerializer(task) + return api_response(output.data, status_code=status.HTTP_200_OK) + + @swagger_auto_schema( + tags=[EXCHANGE_TAG], + operation_summary="Изменить периодическую задачу обмена", + operation_description=( + "Обновляет расписание, payload и состояние exchange-задачи. " + "PATCH допускает частичное обновление." + ), + request_body=ExchangePeriodicTaskUpsertSerializer, + responses={ + 200: ExchangePeriodicTaskSerializer, + 400: CommonResponses.BAD_REQUEST, + 404: CommonResponses.NOT_FOUND, + **ErrorResponses.ADMIN, + }, + ) + def patch(self, request, task_id: int): + task = get_object_or_404( + ExchangePeriodicTaskService.get_queryset(), + id=task_id, + ) + serializer = ExchangePeriodicTaskUpsertSerializer( + task, + data=request.data, + partial=True, + ) + serializer.is_valid(raise_exception=True) + + try: + task = ExchangePeriodicTaskService.update_periodic_task( + task=task, + name=serializer.validated_data.get("name"), + description=serializer.validated_data.get("description"), + enabled=serializer.validated_data.get("enabled"), + payload=serializer.validated_data["payload"], + schedule=serializer.validated_data["schedule"], + ) + except ExchangeServiceError as exc: + raise ValidationError({"periodic_task": str(exc)}) from exc + + output = ExchangePeriodicTaskSerializer(task) + return api_response(output.data, status_code=status.HTTP_200_OK) diff --git a/tests/apps/exchange/test_serializers.py b/tests/apps/exchange/test_serializers.py index 03ccd66..b206c39 100644 --- a/tests/apps/exchange/test_serializers.py +++ b/tests/apps/exchange/test_serializers.py @@ -1,6 +1,11 @@ """Tests for exchange serializers.""" -from apps.exchange.serializers import ExchangeCopyRequestSerializer +from types import SimpleNamespace + +from apps.exchange.serializers import ( + ExchangeCopyRequestSerializer, + ExchangePeriodicTaskUpsertSerializer, +) from django.test import SimpleTestCase @@ -27,3 +32,61 @@ class ExchangeCopyRequestSerializerTest(SimpleTestCase): ) self.assertFalse(serializer_with_tables.is_valid()) self.assertIn("tables", serializer_with_tables.errors) + + +class ExchangePeriodicTaskUpsertSerializerTest(SimpleTestCase): + def test_interval_schedule_requires_fields(self): + serializer = ExchangePeriodicTaskUpsertSerializer( + data={"name": "copy-job", "schedule_type": "interval"} + ) + + self.assertFalse(serializer.is_valid()) + self.assertIn("interval_every", serializer.errors) + self.assertIn("interval_period", serializer.errors) + + def test_crontab_schedule_requires_fields(self): + serializer = ExchangePeriodicTaskUpsertSerializer( + data={"name": "copy-job", "schedule_type": "crontab"} + ) + + self.assertFalse(serializer.is_valid()) + self.assertIn("crontab_minute", serializer.errors) + self.assertIn("crontab_hour", serializer.errors) + + def test_update_mode_to_all_clears_old_single_table(self): + instance = SimpleNamespace( + interval_id=1, + interval=SimpleNamespace(every=5, period="minutes"), + crontab_id=None, + kwargs='{"payload": {"mode": "single", "table": "old_table"}}', + ) + serializer = ExchangePeriodicTaskUpsertSerializer( + instance, + data={"mode": "all"}, + partial=True, + ) + + self.assertTrue(serializer.is_valid(), serializer.errors) + self.assertEqual( + serializer.validated_data["payload"], + { + "mode": "all", + "table": None, + "tables": None, + "truncate_before_copy": True, + }, + ) + + def test_periodic_task_uses_copy_payload_validation(self): + serializer = ExchangePeriodicTaskUpsertSerializer( + data={ + "name": "copy-job", + "schedule_type": "interval", + "interval_every": 1, + "interval_period": "hours", + "mode": "single", + } + ) + + self.assertFalse(serializer.is_valid()) + self.assertIn("table", serializer.errors) diff --git a/tests/apps/exchange/test_tasks.py b/tests/apps/exchange/test_tasks.py index e191343..d475aaf 100644 --- a/tests/apps/exchange/test_tasks.py +++ b/tests/apps/exchange/test_tasks.py @@ -2,11 +2,51 @@ from __future__ import annotations from unittest.mock import MagicMock, patch -from apps.exchange.tasks import copy_parsers_data_async +from apps.exchange.services import ExchangeServiceError +from apps.exchange.tasks import copy_parsers_data_async, dispatch_periodic_exchange_copy from django.test import SimpleTestCase class ExchangeTasksTest(SimpleTestCase): + def test_dispatch_periodic_exchange_copy_enqueues_copy_with_active_connection(self): + active_connection = MagicMock(id=15) + + with patch( + "apps.exchange.tasks.ExchangeConnectionService.get_active_connection", + return_value=active_connection, + ) as get_connection_mock, patch( + "apps.exchange.tasks.copy_parsers_data_async.delay", + return_value=MagicMock(id="task-15"), + ) as delay_mock: + result = dispatch_periodic_exchange_copy.run( + payload={"mode": "all", "truncate_before_copy": True} + ) + + self.assertEqual( + result, + { + "status": "queued", + "task_id": "task-15", + "connection_id": 15, + }, + ) + get_connection_mock.assert_called_once_with() + delay_mock.assert_called_once_with( + connection_id=15, + payload={"mode": "all", "truncate_before_copy": True}, + requested_by_id=None, + ) + + def test_dispatch_periodic_exchange_copy_fails_without_active_connection(self): + with patch( + "apps.exchange.tasks.ExchangeConnectionService.get_active_connection", + side_effect=ExchangeServiceError("Активное подключение не найдено"), + ), self.assertRaisesMessage( + ExchangeServiceError, + "Активное подключение не найдено", + ): + dispatch_periodic_exchange_copy.run(payload={"mode": "all"}) + def test_copy_parsers_data_async_completes_with_existing_job(self): background_job = MagicMock() connection = MagicMock() diff --git a/tests/apps/exchange/test_views.py b/tests/apps/exchange/test_views.py index d99ac5f..9071a43 100644 --- a/tests/apps/exchange/test_views.py +++ b/tests/apps/exchange/test_views.py @@ -1,11 +1,14 @@ """Tests for exchange API views.""" +import json from types import SimpleNamespace from unittest.mock import patch from apps.exchange.models import ExchangeConnection -from apps.exchange.services import ExchangeServiceError +from apps.exchange.services import ExchangePeriodicTaskService, ExchangeServiceError +from django.conf import settings from django.urls import reverse +from django_celery_beat.models import IntervalSchedule, PeriodicTask from rest_framework import status from rest_framework.test import APITestCase @@ -20,6 +23,7 @@ class ExchangeViewsTest(APITestCase): self.connections_url = reverse("api_v1:exchange:connections") self.test_connection_url = reverse("api_v1:exchange:connections-test") self.copy_url = reverse("api_v1:exchange:copy") + self.periodic_tasks_url = reverse("api_v1:exchange:periodic-tasks") def test_connections_endpoint_admin_only(self): response = self.client.get(self.connections_url) @@ -164,3 +168,141 @@ class ExchangeViewsTest(APITestCase): self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) self.assertIn("table", str(response.data)) + + def test_periodic_tasks_endpoint_admin_only(self): + response = self.client.get(self.periodic_tasks_url) + self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED) + + self.client.force_authenticate(self.user) + response = self.client.get(self.periodic_tasks_url) + self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN) + + self.client.force_authenticate(self.admin) + response = self.client.get(self.periodic_tasks_url) + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertTrue(response.data["success"]) + self.assertEqual(response.data["data"], []) + + def test_create_periodic_interval_task_success(self): + payload = { + "name": "exchange-copy-hourly", + "description": "Hourly sync", + "enabled": True, + "schedule_type": "interval", + "interval_every": 1, + "interval_period": "hours", + "mode": "all", + } + + self.client.force_authenticate(self.admin) + response = self.client.post(self.periodic_tasks_url, payload, format="json") + + self.assertEqual(response.status_code, status.HTTP_201_CREATED) + task = PeriodicTask.objects.get(id=response.data["data"]["id"]) + self.assertEqual(task.task, ExchangePeriodicTaskService.TASK_NAME) + self.assertEqual(response.data["data"]["schedule_type"], "interval") + self.assertEqual(response.data["data"]["interval_every"], 1) + self.assertEqual(response.data["data"]["interval_period"], "hours") + self.assertEqual( + json.loads(task.kwargs), + { + "payload": { + "mode": "all", + "table": None, + "tables": None, + "truncate_before_copy": True, + } + }, + ) + + def test_list_periodic_tasks_returns_only_exchange_tasks(self): + interval = IntervalSchedule.objects.create(every=1, period="hours") + PeriodicTask.objects.create( + name="exchange-copy-hourly", + task=ExchangePeriodicTaskService.TASK_NAME, + interval=interval, + kwargs=json.dumps( + {"payload": {"mode": "all", "truncate_before_copy": True}} + ), + ) + PeriodicTask.objects.create( + name="another-task", + task="apps.parsers.tasks.fake_task", + interval=interval, + kwargs="{}", + ) + + self.client.force_authenticate(self.admin) + response = self.client.get(self.periodic_tasks_url) + + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual(len(response.data["data"]), 1) + self.assertEqual(response.data["data"][0]["name"], "exchange-copy-hourly") + + def test_update_periodic_task_switches_to_crontab(self): + interval = IntervalSchedule.objects.create(every=1, period="hours") + task = PeriodicTask.objects.create( + name="exchange-copy-hourly", + task=ExchangePeriodicTaskService.TASK_NAME, + description="Hourly sync", + enabled=True, + interval=interval, + kwargs=json.dumps( + { + "payload": { + "mode": "all", + "table": None, + "tables": None, + "truncate_before_copy": True, + } + } + ), + ) + detail_url = reverse( + "api_v1:exchange:periodic-task-detail", + kwargs={"task_id": task.id}, + ) + payload = { + "schedule_type": "crontab", + "crontab_minute": "0", + "crontab_hour": "4", + "crontab_day_of_week": "*", + "crontab_day_of_month": "*", + "crontab_month_of_year": "*", + "mode": "single", + "table": "parsers_proxy", + "enabled": False, + } + + self.client.force_authenticate(self.admin) + response = self.client.patch(detail_url, payload, format="json") + + self.assertEqual(response.status_code, status.HTTP_200_OK) + task.refresh_from_db() + self.assertIsNone(task.interval) + self.assertIsNotNone(task.crontab) + self.assertEqual(str(task.crontab.timezone), settings.TIME_ZONE) + self.assertFalse(task.enabled) + self.assertEqual(response.data["data"]["schedule_type"], "crontab") + self.assertEqual(response.data["data"]["crontab_hour"], "4") + self.assertEqual(response.data["data"]["mode"], "single") + self.assertEqual(response.data["data"]["table"], "parsers_proxy") + self.assertFalse(IntervalSchedule.objects.filter(id=interval.id).exists()) + + def test_periodic_task_detail_returns_404_for_non_exchange_task(self): + interval = IntervalSchedule.objects.create(every=1, period="hours") + task = PeriodicTask.objects.create( + name="another-task", + task="apps.parsers.tasks.fake_task", + interval=interval, + kwargs="{}", + ) + detail_url = reverse( + "api_v1:exchange:periodic-task-detail", + kwargs={"task_id": task.id}, + ) + + self.client.force_authenticate(self.admin) + response = self.client.get(detail_url) + + self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND)