feat: migrate parser data to source records
Some checks failed
CI/CD Pipeline / Quality Gate (push) Failing after 14s
CI/CD Pipeline / Build and Push Images (push) Has been skipped
CI/CD Pipeline / Deploy Dev in Dokploy (push) Has been skipped
CI/CD Pipeline / Internal Notify (push) Successful in 0s

This commit is contained in:
2026-05-19 20:21:31 +02:00
parent 1c7c7238be
commit b8a18d6da4
46 changed files with 2689 additions and 6179 deletions

View File

@@ -1,6 +1,5 @@
"""Tests for user DRF views"""
from apps.user.models import Profile
from apps.user.services import UserService
from django.contrib.auth import get_user_model
from django.urls import reverse
@@ -16,7 +15,7 @@ fake = Faker("ru_RU")
class RegisterViewTest(APITestCase):
"""Tests for RegisterView"""
"""Tests for disabled public registration endpoint."""
def setUp(self):
self.register_url = reverse("api_v1:user:register")
@@ -32,67 +31,11 @@ class RegisterViewTest(APITestCase):
"last_name": fake.last_name(),
}
def test_register_success(self):
"""Test successful user registration"""
def test_register_endpoint_is_disabled(self):
response = self.client.post(self.register_url, self.user_data, format="json")
self.assertEqual(response.status_code, status.HTTP_201_CREATED)
self.assertIn("user", response.data)
self.assertIn("tokens", response.data)
self.assertIn("refresh", response.data["tokens"])
self.assertIn("access", response.data["tokens"])
# Verify user was created
created_user = User.objects.get(email=self.user_data["email"])
self.assertEqual(created_user.profile.first_name, self.user_data["first_name"])
self.assertEqual(created_user.profile.last_name, self.user_data["last_name"])
def test_register_passwords_do_not_match(self):
"""Test registration fails when passwords don't match"""
data = self.user_data.copy()
data["password_confirm"] = fake.password(length=12, special_chars=False)
response = self.client.post(self.register_url, data, format="json")
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
self.assertIn("non_field_errors", response.data)
def test_register_duplicate_email(self):
"""Test registration fails with duplicate email"""
# Create existing user
existing_user = UserFactory.create_user()
# Use the same email as existing user
data = self.user_data.copy()
data["email"] = existing_user.email
response = self.client.post(self.register_url, data, format="json")
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
self.assertIn("email", response.data)
def test_register_short_password(self):
"""Test registration fails with short password"""
short_password = fake.pystr(min_chars=3, max_chars=5)
data = self.user_data.copy()
data["password"] = short_password
data["password_confirm"] = short_password
response = self.client.post(self.register_url, data, format="json")
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
self.assertIn("password", response.data)
def test_register_requires_first_and_last_name(self):
data = self.user_data.copy()
data.pop("first_name")
data.pop("last_name")
response = self.client.post(self.register_url, data, format="json")
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
self.assertIn("first_name", response.data)
self.assertIn("last_name", response.data)
self.assertEqual(response.status_code, status.HTTP_405_METHOD_NOT_ALLOWED)
self.assertFalse(User.objects.filter(email=self.user_data["email"]).exists())
class LoginViewTest(APITestCase):
@@ -211,7 +154,7 @@ class CurrentUserViewTest(APITestCase):
class UserUpdateViewTest(APITestCase):
"""Tests for UserUpdateView"""
"""Tests for disabled self-service user update endpoint."""
def setUp(self):
self.user = UserFactory.create_user()
@@ -224,30 +167,12 @@ class UserUpdateViewTest(APITestCase):
"phone": f"+7{fake.numerify('##########')}",
}
def test_update_user_success(self):
"""Test successful user update"""
def test_update_user_endpoint_is_disabled(self):
response = self.client.patch(self.update_url, self.update_data, format="json")
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(response.data["username"], self.update_data["username"])
self.assertEqual(response.data["phone"], self.update_data["phone"])
# Verify in database
self.assertEqual(response.status_code, status.HTTP_405_METHOD_NOT_ALLOWED)
self.user.refresh_from_db()
self.assertEqual(self.user.username, self.update_data["username"])
def test_update_user_unauthenticated(self):
"""Test user update fails when unauthenticated"""
self.client.credentials() # Remove auth header
response = self.client.patch(self.update_url, self.update_data, format="json")
self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)
def test_update_user_invalid_returns_400(self):
response = self.client.patch(self.update_url, {"username": ""}, format="json")
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
self.assertIn("username", response.data)
self.assertNotEqual(self.user.username, self.update_data["username"])
class AdminUserManagementViewTest(APITestCase):
@@ -529,6 +454,27 @@ class AdminUserManagementViewTest(APITestCase):
self.user.refresh_from_db()
self.assertFalse(self.user.is_active)
def test_admin_deactivate_user_returns_managed_user_payload(self):
url = reverse("api_v1:user:admin-user-deactivate", args=[self.user.id])
response = self.client.post(url, {}, format="json")
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(
set(response.data.keys()),
{
"id",
"username",
"email",
"phone",
"role",
"role_label",
"is_active",
},
)
self.assertEqual(response.data["id"], self.user.id)
self.assertFalse(response.data["is_active"])
def test_admin_can_activate_user(self):
self.user.is_active = False
self.user.save(update_fields=["is_active"])
@@ -569,7 +515,7 @@ class AdminUserManagementViewTest(APITestCase):
class ProfileDetailViewTest(APITestCase):
"""Tests for ProfileDetailView"""
"""Tests for disabled self-service profile endpoints."""
def setUp(self):
self.user = UserFactory.create_user()
@@ -585,57 +531,26 @@ class ProfileDetailViewTest(APITestCase):
"bio": fake.text(max_nb_chars=200),
}
def test_get_profile_success(self):
"""Test successful profile retrieval"""
def test_profile_read_endpoint_is_disabled(self):
response = self.client.get(self.profile_url)
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(response.data["first_name"], self.profile.first_name)
self.assertEqual(response.data["middle_name"], self.profile.middle_name)
self.assertEqual(response.status_code, status.HTTP_405_METHOD_NOT_ALLOWED)
def test_update_profile_success(self):
"""Test successful profile update"""
def test_profile_update_endpoint_is_disabled(self):
response = self.client.patch(self.profile_url, self.update_data, format="json")
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(response.data["first_name"], self.update_data["first_name"])
self.assertEqual(response.data["middle_name"], self.update_data["middle_name"])
self.assertEqual(response.data["last_name"], self.update_data["last_name"])
# Verify in database
self.assertEqual(response.status_code, status.HTTP_405_METHOD_NOT_ALLOWED)
self.profile.refresh_from_db()
self.assertEqual(self.profile.first_name, self.update_data["first_name"])
self.assertNotEqual(self.profile.first_name, self.update_data["first_name"])
def test_profile_created_if_not_exists(self):
"""Test profile is created if it doesn't exist"""
# Delete existing profile
self.profile.delete()
response = self.client.get(self.profile_url)
self.assertEqual(response.status_code, status.HTTP_200_OK)
# Profile should be created automatically
self.assertTrue(Profile.objects.filter(user=self.user).exists())
def test_update_profile_invalid_returns_400(self):
response = self.client.patch(
self.profile_url,
{"date_of_birth": "not-a-date"},
format="json",
)
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
self.assertIn("date_of_birth", response.data)
def test_get_full_profile_endpoint(self):
def test_profile_full_endpoint_is_disabled(self):
response = self.client.get(reverse("api_v1:user:profile_full"))
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(response.data["id"], self.user.id)
self.assertEqual(response.status_code, status.HTTP_405_METHOD_NOT_ALLOWED)
class PasswordChangeViewTest(APITestCase):
"""Tests for PasswordChangeView"""
"""Tests for disabled self-service password change endpoint."""
def setUp(self):
self.old_password = fake.password(length=12, special_chars=False)
@@ -651,38 +566,14 @@ class PasswordChangeViewTest(APITestCase):
"new_password_confirm": self.new_password,
}
def test_change_password_success(self):
"""Test successful password change"""
def test_change_password_endpoint_is_disabled(self):
response = self.client.post(
self.password_change_url, self.password_data, format="json"
)
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertIn("message", response.data)
# Verify password was changed
self.assertEqual(response.status_code, status.HTTP_405_METHOD_NOT_ALLOWED)
self.user.refresh_from_db()
self.assertTrue(self.user.check_password(self.new_password))
def test_change_password_wrong_old_password(self):
"""Test password change fails with wrong old password"""
data = self.password_data.copy()
data["old_password"] = fake.password(length=12, special_chars=False)
response = self.client.post(self.password_change_url, data, format="json")
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
self.assertIn("error", response.data)
def test_change_password_passwords_do_not_match(self):
"""Test password change fails when new passwords don't match"""
data = self.password_data.copy()
data["new_password_confirm"] = fake.password(length=12, special_chars=False)
response = self.client.post(self.password_change_url, data, format="json")
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
self.assertIn("non_field_errors", response.data)
self.assertTrue(self.user.check_password(self.old_password))
class TokenRefreshViewTest(APITestCase):
@@ -725,7 +616,7 @@ class TokenRefreshViewTest(APITestCase):
class TokenVerifyViewTest(APITestCase):
def test_verify_access_token_success(self):
def test_verify_access_token_endpoint_is_disabled(self):
user = UserFactory.create_user()
tokens = UserService.get_tokens_for_user(user)
@@ -735,11 +626,11 @@ class TokenVerifyViewTest(APITestCase):
format="json",
)
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(response.status_code, status.HTTP_405_METHOD_NOT_ALLOWED)
class ApiJwtOnlyAuthenticationTest(APITestCase):
"""Tests that API auth flow is JWT-only and not session-cookie based."""
class DisabledSelfServiceAuthenticationTest(APITestCase):
"""Tests disabled self-service endpoints do not mutate users."""
def setUp(self):
self.user = UserFactory.create_user()
@@ -752,17 +643,14 @@ class ApiJwtOnlyAuthenticationTest(APITestCase):
self.client.cookies["sessionid"] = "fake-admin-session"
self.client.cookies["csrftoken"] = "fake-csrf-token"
def test_patch_with_bearer_and_session_cookies_returns_200(self):
"""Bearer JWT should authenticate even if session cookies are present."""
def test_patch_with_bearer_and_session_cookies_returns_405(self):
self.client.credentials(HTTP_AUTHORIZATION=f"Bearer {self.tokens['access']}")
response = self.client.patch(self.update_url, self.patch_data, format="json")
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(response.data["id"], self.user.id)
self.assertEqual(response.status_code, status.HTTP_405_METHOD_NOT_ALLOWED)
def test_patch_with_only_session_cookies_returns_401_not_403(self):
"""Session cookies without JWT should not trigger CSRF 403 for API auth."""
def test_patch_with_only_session_cookies_returns_405(self):
response = self.client.patch(self.update_url, self.patch_data, format="json")
self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)
self.assertEqual(response.status_code, status.HTTP_405_METHOD_NOT_ALLOWED)