feat: add parser source dashboard and scheduling
All checks were successful
CI/CD Pipeline / Code Quality Checks (pull_request) Successful in 1m6s
CI/CD Pipeline / Run Tests (pull_request) Successful in 1m18s
CI/CD Pipeline / Build Docker Images (pull_request) Has been skipped
CI/CD Pipeline / Push to Gitea Registry (pull_request) Has been skipped

This commit is contained in:
2026-04-27 23:36:28 +02:00
parent 199d871923
commit 44355deeb3
96 changed files with 15015 additions and 309 deletions

View File

@@ -0,0 +1 @@
"""Tests for exchange app."""

View File

@@ -0,0 +1,19 @@
"""Фабрики для приложения exchange."""
import factory
from apps.exchange.models import ExchangeConnection
class ExchangeConnectionFactory(factory.django.DjangoModelFactory):
"""Фабрика подключения exchange."""
class Meta:
model = ExchangeConnection
server = "127.0.0.1"
port = 5432
username = factory.Sequence(lambda n: f"exchange_user_{n}")
password = "secret" # noqa: S105
database_name = "exchange_db"
schema_name = "public"
is_active = False

View File

@@ -0,0 +1,167 @@
"""Tests for exchange API views."""
from unittest.mock import Mock, patch
from apps.core.models import BackgroundJob
from apps.user.services import UserService
from django.urls import reverse
from django_celery_beat.models import PeriodicTask
from rest_framework import status
from rest_framework.test import APITestCase
from tests.apps.exchange.factories import ExchangeConnectionFactory
from tests.apps.user.factories import UserFactory
class ExchangeApiTest(APITestCase):
"""Tests for external DB exchange API."""
def setUp(self):
self.user = UserFactory.create_user()
self.admin = UserFactory.create_user(is_staff=True)
self.user_tokens = UserService.get_tokens_for_user(self.user)
self.admin_tokens = UserService.get_tokens_for_user(self.admin)
def authenticate(self, user):
tokens = UserService.get_tokens_for_user(user)
self.client.credentials(HTTP_AUTHORIZATION=f"Bearer {tokens['access']}")
def test_exchange_connections_require_admin(self):
"""Test regular users cannot manage exchange connections."""
self.authenticate(self.user)
response = self.client.get(reverse("api_v1:exchange:connections"))
self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)
@patch(
"apps.exchange.views.ExchangeConnectionService.create_active_connection_and_prepare"
)
def test_create_exchange_connection_uses_service(self, mock_create):
"""Test admin can create active exchange connection through service."""
self.authenticate(self.admin)
connection = ExchangeConnectionFactory.build(
id=1,
server="db.example.local",
is_active=True,
)
mock_create.return_value = connection
response = self.client.post(
reverse("api_v1:exchange:connections"),
{
"server": "db.example.local",
"port": 5432,
"username": "user",
"password": "secret",
"database_name": "target",
"schema_name": "public",
},
format="json",
)
self.assertEqual(response.status_code, status.HTTP_201_CREATED)
self.assertEqual(response.data["data"]["server"], "db.example.local")
mock_create.assert_called_once()
def test_exchange_tables_include_registers_and_parser_data(self):
"""Test table catalog contains registry and parser models."""
self.authenticate(self.admin)
response = self.client.get(reverse("api_v1:exchange:tables"))
self.assertEqual(response.status_code, status.HTTP_200_OK)
tables = {item["table"] for item in response.data["data"]}
self.assertIn("registers_organization", tables)
self.assertIn("registers_membership_period", tables)
self.assertIn("parsers_generic_record", tables)
@patch("apps.exchange.views.copy_exchange_data_async.delay")
def test_exchange_copy_starts_celery_task_and_job(self, mock_delay):
"""Test copy endpoint queues Celery task with active connection."""
self.authenticate(self.admin)
connection = ExchangeConnectionFactory.create(is_active=True)
mock_delay.return_value = Mock(id="exchange-task-1")
response = self.client.post(
reverse("api_v1:exchange:copy"),
{"mode": "all", "truncate_before_copy": True},
format="json",
)
self.assertEqual(response.status_code, status.HTTP_202_ACCEPTED)
self.assertEqual(response.data["data"]["task_id"], "exchange-task-1")
mock_delay.assert_called_once_with(
connection_id=connection.id,
payload={"mode": "all", "truncate_before_copy": True},
requested_by_id=self.admin.id,
)
self.assertTrue(
BackgroundJob.objects.filter(task_id="exchange-task-1").exists()
)
def test_exchange_copy_requires_active_connection(self):
"""Test copy endpoint reports missing active connection."""
self.authenticate(self.admin)
response = self.client.post(
reverse("api_v1:exchange:copy"),
{"mode": "all", "truncate_before_copy": True},
format="json",
)
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
def test_exchange_schedule_create_and_list(self):
"""Test exchange schedules are stored in django-celery-beat."""
self.authenticate(self.admin)
response = self.client.post(
reverse("api_v1:exchange:periodic-tasks"),
{
"schedule_type": "interval",
"interval_every": 12,
"interval_period": "hours",
"mode": "single",
"table": "registers_organization",
"truncate_before_copy": False,
},
format="json",
)
list_response = self.client.get(reverse("api_v1:exchange:periodic-tasks"))
self.assertEqual(response.status_code, status.HTTP_201_CREATED)
self.assertEqual(list_response.status_code, status.HTTP_200_OK)
self.assertTrue(
PeriodicTask.objects.filter(id=response.data["data"]["id"]).exists()
)
created_task = next(
item
for item in list_response.data["data"]
if item["id"] == response.data["data"]["id"]
)
payload = created_task["payload"]
self.assertEqual(payload["mode"], "single")
self.assertEqual(payload["table"], "registers_organization")
def test_exchange_schedule_delete(self):
"""Test exchange schedule can be deleted."""
self.authenticate(self.admin)
create_response = self.client.post(
reverse("api_v1:exchange:periodic-tasks"),
{
"schedule_type": "daily",
"crontab_hour": 2,
"crontab_minute": 0,
"mode": "all",
},
format="json",
)
task_id = create_response.data["data"]["id"]
response = self.client.delete(
reverse("api_v1:exchange:periodic-task-detail", args=[task_id])
)
self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT)
self.assertFalse(PeriodicTask.objects.filter(id=task_id).exists())