"""Tests for exchange API views.""" from unittest.mock import Mock, patch from apps.core.models import BackgroundJob from apps.user.services import UserService from django.urls import reverse from django_celery_beat.models import PeriodicTask from rest_framework import status from rest_framework.test import APITestCase from tests.apps.exchange.factories import ExchangeConnectionFactory from tests.apps.user.factories import UserFactory class ExchangeApiTest(APITestCase): """Tests for external DB exchange API.""" def setUp(self): self.user = UserFactory.create_user() self.admin = UserFactory.create_user(is_staff=True) self.user_tokens = UserService.get_tokens_for_user(self.user) self.admin_tokens = UserService.get_tokens_for_user(self.admin) def authenticate(self, user): tokens = UserService.get_tokens_for_user(user) self.client.credentials(HTTP_AUTHORIZATION=f"Bearer {tokens['access']}") def test_exchange_connections_require_admin(self): """Test regular users cannot manage exchange connections.""" self.authenticate(self.user) response = self.client.get(reverse("api_v1:exchange:connections")) self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN) @patch( "apps.exchange.views.ExchangeConnectionService.create_active_connection_and_prepare" ) def test_create_exchange_connection_uses_service(self, mock_create): """Test admin can create active exchange connection through service.""" self.authenticate(self.admin) connection = ExchangeConnectionFactory.build( id=1, server="db.example.local", is_active=True, ) mock_create.return_value = connection response = self.client.post( reverse("api_v1:exchange:connections"), { "server": "db.example.local", "port": 5432, "username": "user", "password": "secret", "database_name": "target", "schema_name": "public", }, format="json", ) self.assertEqual(response.status_code, status.HTTP_201_CREATED) self.assertEqual(response.data["data"]["server"], "db.example.local") mock_create.assert_called_once() def test_exchange_tables_include_registers_and_parser_data(self): """Test table catalog contains registry and parser models.""" self.authenticate(self.admin) response = self.client.get(reverse("api_v1:exchange:tables")) self.assertEqual(response.status_code, status.HTTP_200_OK) tables = {item["table"] for item in response.data["data"]} self.assertIn("registers_organization", tables) self.assertIn("registers_membership_period", tables) self.assertIn("parsers_generic_record", tables) @patch("apps.exchange.views.copy_exchange_data_async.delay") def test_exchange_copy_starts_celery_task_and_job(self, mock_delay): """Test copy endpoint queues Celery task with active connection.""" self.authenticate(self.admin) connection = ExchangeConnectionFactory.create(is_active=True) mock_delay.return_value = Mock(id="exchange-task-1") response = self.client.post( reverse("api_v1:exchange:copy"), {"mode": "all", "truncate_before_copy": True}, format="json", ) self.assertEqual(response.status_code, status.HTTP_202_ACCEPTED) self.assertEqual(response.data["data"]["task_id"], "exchange-task-1") mock_delay.assert_called_once_with( connection_id=connection.id, payload={"mode": "all", "truncate_before_copy": True}, requested_by_id=self.admin.id, ) self.assertTrue( BackgroundJob.objects.filter(task_id="exchange-task-1").exists() ) def test_exchange_copy_requires_active_connection(self): """Test copy endpoint reports missing active connection.""" self.authenticate(self.admin) response = self.client.post( reverse("api_v1:exchange:copy"), {"mode": "all", "truncate_before_copy": True}, format="json", ) self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) def test_exchange_schedule_create_and_list(self): """Test exchange schedules are stored in django-celery-beat.""" self.authenticate(self.admin) response = self.client.post( reverse("api_v1:exchange:periodic-tasks"), { "schedule_type": "interval", "interval_every": 12, "interval_period": "hours", "mode": "single", "table": "registers_organization", "truncate_before_copy": False, }, format="json", ) list_response = self.client.get(reverse("api_v1:exchange:periodic-tasks")) self.assertEqual(response.status_code, status.HTTP_201_CREATED) self.assertEqual(list_response.status_code, status.HTTP_200_OK) self.assertTrue( PeriodicTask.objects.filter(id=response.data["data"]["id"]).exists() ) created_task = next( item for item in list_response.data["data"] if item["id"] == response.data["data"]["id"] ) payload = created_task["payload"] self.assertEqual(payload["mode"], "single") self.assertEqual(payload["table"], "registers_organization") def test_exchange_schedule_delete(self): """Test exchange schedule can be deleted.""" self.authenticate(self.admin) create_response = self.client.post( reverse("api_v1:exchange:periodic-tasks"), { "schedule_type": "daily", "crontab_hour": 2, "crontab_minute": 0, "mode": "all", }, format="json", ) task_id = create_response.data["data"]["id"] response = self.client.delete( reverse("api_v1:exchange:periodic-task-detail", args=[task_id]) ) self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT) self.assertFalse(PeriodicTask.objects.filter(id=task_id).exists())