"""Tests for exchange API views.""" from types import SimpleNamespace from unittest.mock import patch from apps.exchange.models import ExchangeConnection from apps.exchange.services import ExchangeServiceError from django.urls import reverse from rest_framework import status from rest_framework.test import APITestCase from tests.apps.exchange.factories import ExchangeConnectionFactory from tests.apps.user.factories import UserFactory class ExchangeViewsTest(APITestCase): def setUp(self): self.user = UserFactory.create_user() self.admin = UserFactory.create_superuser() self.connections_url = reverse("api_v1:exchange:connections") self.test_connection_url = reverse("api_v1:exchange:connections-test") self.copy_url = reverse("api_v1:exchange:copy") def test_connections_endpoint_admin_only(self): response = self.client.get(self.connections_url) self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED) self.client.force_authenticate(self.user) response = self.client.get(self.connections_url) self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN) self.client.force_authenticate(self.admin) response = self.client.get(self.connections_url) self.assertEqual(response.status_code, status.HTTP_200_OK) self.assertTrue(response.data["success"]) self.assertIsInstance(response.data["data"], list) @patch("apps.exchange.services.ExchangeConnectionService.validate_target_structure") @patch("apps.exchange.services.ExchangeConnectionService.test_connection") def test_create_connection_success(self, connection_mock, validate_mock): old_active = ExchangeConnectionFactory(is_active=True) payload = { "server": "127.0.0.1", "port": 5432, "username": "postgres", "password": "secret", "database_name": "target_db", "schema_name": "public", } self.client.force_authenticate(self.admin) response = self.client.post(self.connections_url, payload, format="json") self.assertEqual(response.status_code, status.HTTP_201_CREATED) self.assertEqual(ExchangeConnection.objects.filter(is_active=True).count(), 1) new_connection = ExchangeConnection.objects.get(id=response.data["data"]["id"]) self.assertTrue(new_connection.is_active) self.assertNotEqual(new_connection.password, payload["password"]) self.assertEqual(new_connection.get_decrypted_password(), payload["password"]) old_active.refresh_from_db() self.assertFalse(old_active.is_active) connection_mock.assert_called_once() validate_mock.assert_called_once() @patch("apps.exchange.services.ExchangeConnectionService.test_connection_payload") def test_test_connection_success(self, test_connection_mock): payload = { "server": "127.0.0.1", "port": 5432, "username": "postgres", "password": "secret", "database_name": "target_db", "schema_name": "public", } test_connection_mock.return_value = { "status": "success", "message": "ok", } self.client.force_authenticate(self.admin) response = self.client.post(self.test_connection_url, payload, format="json") self.assertEqual(response.status_code, status.HTTP_200_OK) self.assertEqual(response.data["data"]["status"], "success") self.assertEqual(ExchangeConnection.objects.count(), 0) test_connection_mock.assert_called_once_with(**payload) @patch("apps.exchange.services.ExchangeConnectionService.test_connection_payload") def test_test_connection_failure_returns_400(self, test_connection_mock): payload = { "server": "127.0.0.1", "port": 5432, "username": "postgres", "password": "secret", "database_name": "target_db", "schema_name": "public", } test_connection_mock.side_effect = ExchangeServiceError("Connection refused") self.client.force_authenticate(self.admin) response = self.client.post(self.test_connection_url, payload, format="json") self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) self.assertEqual(ExchangeConnection.objects.count(), 0) @patch("apps.exchange.services.ExchangeConnectionService.test_connection") def test_create_connection_fail_rolls_back_active(self, connection_mock): connection_mock.side_effect = ExchangeServiceError("Connection refused") old_active = ExchangeConnectionFactory(is_active=True) payload = { "server": "127.0.0.1", "port": 5432, "username": "postgres", "password": "secret", "database_name": "target_db", "schema_name": "public", } self.client.force_authenticate(self.admin) response = self.client.post(self.connections_url, payload, format="json") self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) self.assertEqual(ExchangeConnection.objects.count(), 1) old_active.refresh_from_db() self.assertTrue(old_active.is_active) def test_copy_requires_active_connection(self): self.client.force_authenticate(self.admin) response = self.client.post(self.copy_url, {"mode": "all"}, format="json") self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) @patch("apps.exchange.views.copy_parsers_data_async.delay") @patch("apps.exchange.services.ExchangeConnectionService.get_active_connection") def test_copy_all_success(self, get_active_mock, delay_mock): active_connection = ExchangeConnectionFactory(is_active=True) get_active_mock.return_value = active_connection delay_mock.return_value = SimpleNamespace(id="task-123") self.client.force_authenticate(self.admin) response = self.client.post(self.copy_url, {"mode": "all"}, format="json") self.assertEqual(response.status_code, status.HTTP_202_ACCEPTED) self.assertEqual(response.data["data"]["status"], "started") self.assertEqual(response.data["data"]["task_id"], "task-123") self.assertEqual(response.data["data"]["connection_id"], active_connection.id) get_active_mock.assert_called_once() delay_mock.assert_called_once_with( connection_id=active_connection.id, payload={"mode": "all", "truncate_before_copy": True}, requested_by_id=self.admin.id, ) def test_copy_single_requires_table(self): self.client.force_authenticate(self.admin) response = self.client.post(self.copy_url, {"mode": "single"}, format="json") self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) self.assertIn("table", str(response.data))