Files
Aleksandr Meshchriakov b8a18d6da4
Some checks failed
CI/CD Pipeline / Quality Gate (push) Failing after 14s
CI/CD Pipeline / Build and Push Images (push) Has been skipped
CI/CD Pipeline / Deploy Dev in Dokploy (push) Has been skipped
CI/CD Pipeline / Internal Notify (push) Successful in 0s
feat: migrate parser data to source records
2026-05-19 20:21:31 +02:00

326 lines
12 KiB
Python

"""Tests for exchange API views."""
import json
from unittest.mock import patch
from apps.exchange.models import ExchangeConnection
from apps.exchange.services import ExchangePeriodicTaskService, ExchangeServiceError
from django.conf import settings
from django.urls import reverse
from django_celery_beat.models import IntervalSchedule, PeriodicTask
from rest_framework import status
from rest_framework.test import APITestCase
from tests.apps.exchange.factories import ExchangeConnectionFactory
from tests.apps.user.factories import UserFactory
class ExchangeViewsTest(APITestCase):
    """API-level tests for the exchange connection and periodic-task endpoints.

    Every test talks to the endpoints through the DRF test client. Calls into
    ``ExchangeConnectionService`` are patched where a test would otherwise
    need a reachable target database.
    """

    def setUp(self):
        """Create a regular user and a superuser and resolve the shared URLs."""
        self.user = UserFactory.create_user()
        self.admin = UserFactory.create_superuser()
        self.connections_url = reverse("api_v1:exchange:connections")
        self.test_connection_url = reverse("api_v1:exchange:connections-test")
        self.copy_url = reverse("api_v1:exchange:copy")
        self.periodic_tasks_url = reverse("api_v1:exchange:periodic-tasks")

    # ------------------------------------------------------------------
    # Helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _connection_payload():
        """Return a fresh connection payload.

        A new dict is built on every call so that no test can observe a
        mutation made by another test.
        """
        return {
            "server": "127.0.0.1",
            "port": 5432,
            "username": "postgres",
            "password": "secret",
            "database_name": "target_db",
            "schema_name": "public",
        }

    @staticmethod
    def _periodic_task_detail_url(task):
        """Return the periodic-task detail URL for ``task``."""
        return reverse(
            "api_v1:exchange:periodic-task-detail",
            kwargs={"task_id": task.id},
        )

    def _assert_admin_only_get(self, url):
        """Assert that GET ``url`` answers 401, 403 and 200 in turn.

        The request is issued unauthenticated, then as ``self.user``, then as
        ``self.admin``. The client stays authenticated as ``self.admin``
        afterwards.

        Returns:
            The response obtained as ``self.admin``, for further assertions.
        """
        response = self.client.get(url)
        self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)

        self.client.force_authenticate(self.user)
        response = self.client.get(url)
        self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)

        self.client.force_authenticate(self.admin)
        response = self.client.get(url)
        self.assertEqual(response.status_code, status.HTTP_200_OK)
        return response

    # ------------------------------------------------------------------
    # Connections
    # ------------------------------------------------------------------

    def test_connections_endpoint_admin_only(self):
        """Listing connections is rejected for anonymous and regular users."""
        response = self._assert_admin_only_get(self.connections_url)
        self.assertIsInstance(response.data["results"], list)

    @patch("apps.exchange.services.ExchangeConnectionService.validate_target_structure")
    @patch("apps.exchange.services.ExchangeConnectionService.prepare_target_structure")
    @patch("apps.exchange.services.ExchangeConnectionService.test_connection")
    def test_create_connection_success(
        self,
        connection_mock,
        prepare_mock,
        validate_mock,
    ):
        """Creating a connection activates it and deactivates the previous one.

        Mock arguments arrive bottom-up relative to the decorator stack, so
        ``connection_mock`` corresponds to ``test_connection``.
        """
        old_active = ExchangeConnectionFactory(is_active=True)
        payload = self._connection_payload()

        self.client.force_authenticate(self.admin)
        response = self.client.post(self.connections_url, payload, format="json")

        self.assertEqual(response.status_code, status.HTTP_201_CREATED)
        self.assertEqual(ExchangeConnection.objects.filter(is_active=True).count(), 1)
        # The response must expose exactly these fields; "password" is absent.
        self.assertEqual(
            set(response.data.keys()),
            {
                "id",
                "server",
                "port",
                "username",
                "database_name",
                "schema_name",
                "is_active",
            },
        )

        new_connection = ExchangeConnection.objects.get(id=response.data["id"])
        self.assertTrue(new_connection.is_active)
        # The stored value differs from the plaintext but decrypts back to it.
        self.assertNotEqual(new_connection.password, payload["password"])
        self.assertEqual(new_connection.get_decrypted_password(), payload["password"])

        old_active.refresh_from_db()
        self.assertFalse(old_active.is_active)

        connection_mock.assert_called_once()
        prepare_mock.assert_called_once()
        validate_mock.assert_called_once()

    @patch("apps.exchange.services.ExchangeConnectionService.test_connection_payload")
    def test_test_connection_success(self, test_connection_mock):
        """A successful connection test returns 200 and persists nothing."""
        payload = self._connection_payload()
        test_connection_mock.return_value = {
            "status": "success",
            "message": "ok",
        }

        self.client.force_authenticate(self.admin)
        response = self.client.post(self.test_connection_url, payload, format="json")

        self.assertEqual(response.status_code, status.HTTP_200_OK)
        self.assertTrue(response.data["success"])
        self.assertEqual(response.data["message"], "ok")
        self.assertEqual(ExchangeConnection.objects.count(), 0)
        test_connection_mock.assert_called_once_with(**payload)

    @patch("apps.exchange.services.ExchangeConnectionService.test_connection_payload")
    def test_test_connection_failure_returns_400(self, test_connection_mock):
        """An ``ExchangeServiceError`` during a connection test yields 400."""
        payload = self._connection_payload()
        test_connection_mock.side_effect = ExchangeServiceError("Connection refused")

        self.client.force_authenticate(self.admin)
        response = self.client.post(self.test_connection_url, payload, format="json")

        self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
        self.assertEqual(ExchangeConnection.objects.count(), 0)

    @patch("apps.exchange.services.ExchangeConnectionService.test_connection")
    def test_create_connection_fail_rolls_back_active(self, connection_mock):
        """A failed create leaves the previously active connection untouched."""
        connection_mock.side_effect = ExchangeServiceError("Connection refused")
        old_active = ExchangeConnectionFactory(is_active=True)
        payload = self._connection_payload()

        self.client.force_authenticate(self.admin)
        response = self.client.post(self.connections_url, payload, format="json")

        self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
        # Only the pre-existing row remains, and it is still the active one.
        self.assertEqual(ExchangeConnection.objects.count(), 1)
        old_active.refresh_from_db()
        self.assertTrue(old_active.is_active)

    # ------------------------------------------------------------------
    # Disabled endpoints
    # ------------------------------------------------------------------

    def test_copy_endpoint_is_disabled(self):
        """POST to the copy endpoint answers 405 even for an admin."""
        self.client.force_authenticate(self.admin)
        response = self.client.post(self.copy_url, {"mode": "all"}, format="json")
        self.assertEqual(response.status_code, status.HTTP_405_METHOD_NOT_ALLOWED)

    def test_tables_endpoint_is_disabled(self):
        """GET on the tables endpoint answers 405 even for an admin."""
        self.client.force_authenticate(self.admin)
        response = self.client.get(reverse("api_v1:exchange:tables"))
        self.assertEqual(response.status_code, status.HTTP_405_METHOD_NOT_ALLOWED)

    # ------------------------------------------------------------------
    # Periodic tasks
    # ------------------------------------------------------------------

    def test_periodic_tasks_endpoint_admin_only(self):
        """Listing periodic tasks is admin-only and empty when none exist."""
        response = self._assert_admin_only_get(self.periodic_tasks_url)
        self.assertEqual(response.data["results"], [])

    def test_create_periodic_interval_task_success(self):
        """Creating an interval task stores the expected task name and kwargs."""
        payload = {
            "schedule_type": "interval",
            "interval_every": 1,
            "interval_period": "hours",
            "notify_on_error": True,
        }

        self.client.force_authenticate(self.admin)
        response = self.client.post(self.periodic_tasks_url, payload, format="json")

        self.assertEqual(response.status_code, status.HTTP_201_CREATED)
        task = PeriodicTask.objects.get(id=response.data["id"])
        self.assertEqual(task.task, ExchangePeriodicTaskService.TASK_NAME)
        self.assertEqual(
            set(response.data.keys()),
            {
                "id",
                "schedule_type",
                "interval_every",
                "interval_period",
                "notify_on_error",
            },
        )
        self.assertEqual(response.data["schedule_type"], "interval")
        self.assertEqual(response.data["interval_every"], 1)
        self.assertEqual(response.data["interval_period"], "hours")
        self.assertTrue(response.data["notify_on_error"])
        # The persisted kwargs carry a full-copy payload with truncation enabled.
        self.assertEqual(
            json.loads(task.kwargs),
            {
                "payload": {
                    "mode": "all",
                    "table": None,
                    "tables": None,
                    "truncate_before_copy": True,
                    "notify_on_error": True,
                }
            },
        )

    def test_list_periodic_tasks_returns_only_exchange_tasks(self):
        """Tasks registered under another task name are left out of the list."""
        interval = IntervalSchedule.objects.create(every=1, period="hours")
        PeriodicTask.objects.create(
            name="exchange-copy-hourly",
            task=ExchangePeriodicTaskService.TASK_NAME,
            interval=interval,
            kwargs=json.dumps(
                {"payload": {"mode": "all", "truncate_before_copy": True}}
            ),
        )
        PeriodicTask.objects.create(
            name="another-task",
            task="apps.parsers.tasks.fake_task",
            interval=interval,
            kwargs="{}",
        )

        self.client.force_authenticate(self.admin)
        response = self.client.get(self.periodic_tasks_url)

        self.assertEqual(response.status_code, status.HTTP_200_OK)
        self.assertEqual(len(response.data["results"]), 1)
        self.assertEqual(response.data["results"][0]["schedule_type"], "interval")

    def test_update_periodic_task_switches_to_crontab(self):
        """PATCHing a daily schedule replaces the interval with a crontab."""
        interval = IntervalSchedule.objects.create(every=1, period="hours")
        task = PeriodicTask.objects.create(
            name="exchange-copy-hourly",
            task=ExchangePeriodicTaskService.TASK_NAME,
            description="Hourly sync",
            enabled=True,
            interval=interval,
            kwargs=json.dumps(
                {
                    "payload": {
                        "mode": "all",
                        "table": None,
                        "tables": None,
                        "truncate_before_copy": True,
                    }
                }
            ),
        )
        detail_url = self._periodic_task_detail_url(task)
        payload = {
            "schedule_type": "daily",
            "crontab_minute": 0,
            "crontab_hour": 4,
            "notify_on_error": True,
        }

        self.client.force_authenticate(self.admin)
        response = self.client.patch(detail_url, payload, format="json")

        self.assertEqual(response.status_code, status.HTTP_200_OK)
        task.refresh_from_db()
        self.assertIsNone(task.interval)
        self.assertIsNotNone(task.crontab)
        self.assertEqual(str(task.crontab.timezone), settings.TIME_ZONE)
        self.assertTrue(task.enabled)
        self.assertEqual(response.data["schedule_type"], "daily")
        self.assertEqual(response.data["crontab_hour"], 4)
        self.assertTrue(response.data["notify_on_error"])
        # The interval row the task used before the update must be gone.
        self.assertFalse(IntervalSchedule.objects.filter(id=interval.id).exists())

    def test_periodic_task_detail_get_is_disabled(self):
        """GET on the periodic-task detail endpoint answers 405."""
        interval = IntervalSchedule.objects.create(every=1, period="hours")
        task = PeriodicTask.objects.create(
            name="another-task",
            task="apps.parsers.tasks.fake_task",
            interval=interval,
            kwargs="{}",
        )
        detail_url = self._periodic_task_detail_url(task)

        self.client.force_authenticate(self.admin)
        response = self.client.get(detail_url)

        self.assertEqual(response.status_code, status.HTTP_405_METHOD_NOT_ALLOWED)

    def test_periodic_task_detail_delete_is_disabled(self):
        """DELETE on the detail endpoint answers 405 and keeps the task."""
        interval = IntervalSchedule.objects.create(every=1, period="hours")
        task = PeriodicTask.objects.create(
            name="exchange-copy-hourly",
            task=ExchangePeriodicTaskService.TASK_NAME,
            interval=interval,
            kwargs="{}",
        )
        detail_url = self._periodic_task_detail_url(task)

        self.client.force_authenticate(self.admin)
        response = self.client.delete(detail_url)

        self.assertEqual(response.status_code, status.HTTP_405_METHOD_NOT_ALLOWED)
        self.assertTrue(PeriodicTask.objects.filter(id=task.id).exists())