Files
mostovik-backend/tests/apps/user/test_views.py
Aleksandr Meshchriakov b8a18d6da4
Some checks failed
CI/CD Pipeline / Quality Gate (push) Failing after 14s
CI/CD Pipeline / Build and Push Images (push) Has been skipped
CI/CD Pipeline / Deploy Dev in Dokploy (push) Has been skipped
CI/CD Pipeline / Internal Notify (push) Successful in 0s
feat: migrate parser data to source records
2026-05-19 20:21:31 +02:00

657 lines
25 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Tests for user DRF views"""
from apps.user.services import UserService
from django.contrib.auth import get_user_model
from django.urls import reverse
from faker import Faker
from rest_framework import status
from rest_framework.test import APIClient, APITestCase
from rest_framework_simplejwt.token_blacklist.models import BlacklistedToken
from .factories import ProfileFactory, UserFactory
# Resolve the user model through Django rather than importing a concrete class.
User = get_user_model()
# Shared Faker instance using the Russian ("ru_RU") locale for generated test data.
fake = Faker("ru_RU")
class RegisterViewTest(APITestCase):
    """Verifies that the public registration endpoint answers POST with 405."""

    def setUp(self):
        """Resolve the register URL and prepare a complete registration payload."""
        self.register_url = reverse("api_v1:user:register")
        self.password = fake.password(length=12, special_chars=False)
        self.user_data = {
            "email": fake.unique.email(),
            "username": fake.unique.user_name(),
            "password": self.password,
            "password_confirm": self.password,
            "phone": f"+7{fake.numerify('##########')}",
            "first_name": fake.first_name(),
            "middle_name": fake.first_name(),
            "last_name": fake.last_name(),
        }

    def test_register_endpoint_is_disabled(self):
        """A POST is rejected with 405 and no user with that email is stored."""
        response = self.client.post(
            self.register_url,
            self.user_data,
            format="json",
        )

        self.assertEqual(response.status_code, status.HTTP_405_METHOD_NOT_ALLOWED)
        matching_users = User.objects.filter(email=self.user_data["email"])
        self.assertFalse(matching_users.exists())
class LoginViewTest(APITestCase):
    """Exercises the login endpoint with valid, wrong and malformed input."""

    def setUp(self):
        """Create a user with a known password plus the matching credentials."""
        self.login_url = reverse("api_v1:user:login")
        self.password = fake.password(length=12, special_chars=False)
        self.user = UserFactory.create_user(password=self.password)
        self.login_data = {"username": self.user.username, "password": self.password}

    def _assert_token_pair(self, response):
        """Assert a 200 response whose body contains both token keys."""
        self.assertEqual(response.status_code, status.HTTP_200_OK)
        for token_key in ("refresh", "access"):
            self.assertIn(token_key, response.data)

    def test_login_success(self):
        """Valid credentials yield a refresh/access token pair."""
        response = self.client.post(self.login_url, self.login_data, format="json")
        self._assert_token_pair(response)

    def test_login_accepts_slashless_api_url(self):
        """Frontend clients can call API URLs without Django's trailing slash."""
        slashless_url = self.login_url.rstrip("/")
        response = self.client.post(slashless_url, self.login_data, format="json")
        self._assert_token_pair(response)

    def test_login_invalid_credentials(self):
        """A wrong password is answered with 401 and an "error" entry."""
        wrong_password = fake.password(length=12, special_chars=False)
        credentials = {**self.login_data, "password": wrong_password}

        response = self.client.post(self.login_url, credentials, format="json")

        self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)
        self.assertIn("error", response.data)

    def test_login_nonexistent_user(self):
        """Credentials of an unknown user are answered with 401."""
        credentials = {
            "username": fake.unique.user_name(),
            "password": fake.password(length=12, special_chars=False),
        }

        response = self.client.post(self.login_url, credentials, format="json")

        self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)

    def test_login_invalid_payload_returns_400(self):
        """An empty payload is answered with 400 and a "username" entry."""
        response = self.client.post(self.login_url, {}, format="json")

        self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
        self.assertIn("username", response.data)
class LogoutViewTest(APITestCase):
    """Covers the logout endpoint for a bearer-authenticated user."""

    def test_logout_blacklists_refresh_tokens_and_returns_empty_payload(self):
        """Logout answers 200 with ``{}`` and leaves one blacklisted token."""
        user = UserFactory.create_user()
        tokens = UserService.get_tokens_for_user(user)
        self.client.credentials(HTTP_AUTHORIZATION=f"Bearer {tokens['access']}")
        logout_url = reverse("api_v1:user:logout")

        response = self.client.post(logout_url, {}, format="json")

        self.assertEqual(response.status_code, status.HTTP_200_OK)
        self.assertEqual(response.data, {})
        blacklisted = BlacklistedToken.objects.filter(token__user=user)
        self.assertEqual(blacklisted.count(), 1)
class CurrentUserViewTest(APITestCase):
    """Covers the current-user endpoint with and without credentials."""

    def setUp(self):
        """Create a user with a profile and authenticate the client as that user."""
        self.user = UserFactory.create_user()
        ProfileFactory.create_profile(user=self.user)
        self.current_user_url = reverse("api_v1:user:current_user")
        self.tokens = UserService.get_tokens_for_user(self.user)
        self.client.credentials(HTTP_AUTHORIZATION=f"Bearer {self.tokens['access']}")

    def test_get_current_user_authenticated(self):
        """The response exposes exactly the expected user and profile keys."""
        expected_user_keys = {
            "id",
            "username",
            "email",
            "phone",
            "is_active",
            "role",
            "role_label",
            "capabilities",
            "profile",
        }
        expected_profile_keys = {"first_name", "middle_name", "last_name", "full_name"}

        response = self.client.get(self.current_user_url)

        self.assertEqual(response.status_code, status.HTTP_200_OK)
        body = response.data
        self.assertEqual(set(body.keys()), expected_user_keys)
        self.assertEqual(body["id"], self.user.id)
        self.assertEqual(body["email"], self.user.email)
        self.assertEqual(body["role"], "user")
        self.assertFalse(body["capabilities"]["can_manage_exchange"])
        self.assertEqual(set(body["profile"].keys()), expected_profile_keys)

    def test_get_current_user_unauthenticated(self):
        """Without an Authorization header the endpoint answers 401."""
        # Calling credentials() with no arguments drops the header set in setUp.
        self.client.credentials()

        response = self.client.get(self.current_user_url)

        self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)
class UserUpdateViewTest(APITestCase):
    """Verifies that the self-service user update endpoint answers PATCH with 405."""

    def setUp(self):
        """Authenticate as a fresh user and prepare an update payload."""
        self.user = UserFactory.create_user()
        self.update_url = reverse("api_v1:user:user_update")
        self.tokens = UserService.get_tokens_for_user(self.user)
        self.client.credentials(HTTP_AUTHORIZATION=f"Bearer {self.tokens['access']}")
        self.update_data = {
            "username": fake.unique.user_name(),
            "phone": f"+7{fake.numerify('##########')}",
        }

    def test_update_user_endpoint_is_disabled(self):
        """PATCH is rejected with 405 and the stored username stays unchanged."""
        response = self.client.patch(
            self.update_url,
            self.update_data,
            format="json",
        )

        self.assertEqual(response.status_code, status.HTTP_405_METHOD_NOT_ALLOWED)
        # Reload from the database so the check reflects persisted state.
        self.user.refresh_from_db()
        requested_username = self.update_data["username"]
        self.assertNotEqual(self.user.username, requested_username)
class AdminUserManagementViewTest(APITestCase):
    """Covers the admin-only user management endpoints."""

    # Keys of a user item as returned by the list and detail (GET) endpoints.
    _DETAILED_USER_KEYS = (
        "id",
        "username",
        "email",
        "phone",
        "is_active",
        "role",
        "role_label",
        "capabilities",
        "profile",
    )
    # Keys of the payload returned by create, update, activate and deactivate.
    _MANAGED_USER_KEYS = (
        "id",
        "username",
        "email",
        "phone",
        "role",
        "role_label",
        "is_active",
    )
    # Keys of the nested "profile" object.
    _PROFILE_KEYS = ("first_name", "middle_name", "last_name", "full_name")

    def setUp(self):
        """Create a staff user and a regular user; authenticate as the staff user."""
        self.admin = UserFactory.create_user(is_staff=True)
        self.user = UserFactory.create_user()
        self.tokens = UserService.get_tokens_for_user(self.admin)
        self.user_tokens = UserService.get_tokens_for_user(self.user)
        self.list_url = reverse("api_v1:user:admin-users")
        self.client.credentials(HTTP_AUTHORIZATION=f"Bearer {self.tokens['access']}")

    def _assert_keys(self, mapping, expected_keys):
        """Assert that ``mapping`` has exactly ``expected_keys`` as its keys."""
        self.assertEqual(set(mapping.keys()), set(expected_keys))

    @staticmethod
    def _detail_url(user):
        """Return the admin detail URL for ``user``."""
        return reverse("api_v1:user:admin-user-detail", args=[user.id])

    @staticmethod
    def _error_fields(response):
        """Return the per-field details of the first error in ``response``."""
        return response.data["errors"][0]["details"]["fields"]

    def test_admin_can_list_users(self):
        """The list is paginated and exposes both users with their capabilities."""
        response = self.client.get(self.list_url)

        self.assertEqual(response.status_code, status.HTTP_200_OK)
        self._assert_keys(response.data, ("count", "next", "previous", "results"))
        results = response.data["results"]
        self._assert_keys(results[0], self._DETAILED_USER_KEYS)
        self._assert_keys(results[0]["profile"], self._PROFILE_KEYS)

        users_by_name = {item["username"]: item for item in results}
        listed_usernames = set(users_by_name)
        self.assertIn(self.admin.username, listed_usernames)
        self.assertIn(self.user.username, listed_usernames)

        admin_capabilities = users_by_name[self.admin.username]["capabilities"]
        self.assertTrue(admin_capabilities["can_manage_exchange"])
        user_capabilities = users_by_name[self.user.username]["capabilities"]
        self.assertFalse(user_capabilities["can_manage_exchange"])

    def test_admin_can_search_users(self):
        """Searching by a middle name returns only the matching user."""
        ProfileFactory.create_profile(
            user=self.user,
            first_name="Сергей",
            middle_name="Петрович",
            last_name="Иванов",
        )
        another = UserFactory.create_user()
        ProfileFactory.create_profile(user=another, first_name="Илья")

        response = self.client.get(self.list_url, {"search": "Петрович"})

        self.assertEqual(response.status_code, status.HTTP_200_OK)
        found_usernames = [item["username"] for item in response.data["results"]]
        self.assertEqual(found_usernames, [self.user.username])

    def test_admin_can_order_users(self):
        """Ordering by ``first_name`` lists "Алексей" before "Борис"."""
        first = UserFactory.create_user()
        second = UserFactory.create_user()
        ProfileFactory.create_profile(user=first, first_name="Борис")
        ProfileFactory.create_profile(user=second, first_name="Алексей")

        response = self.client.get(self.list_url, {"ordering": "first_name"})

        self.assertEqual(response.status_code, status.HTTP_200_OK)
        ordered_ids = [item["id"] for item in response.data["results"]]
        position_of_second = ordered_ids.index(second.id)
        position_of_first = ordered_ids.index(first.id)
        self.assertLess(position_of_second, position_of_first)

    def test_admin_can_create_user_with_role(self):
        """Creating a user with role "admin" stores a staff user and a profile."""
        password = fake.password(length=12, special_chars=False)
        payload = {
            "email": fake.unique.email(),
            "username": fake.unique.user_name(),
            "phone": f"+7{fake.numerify('##########')}",
            "password": password,
            "role": "admin",
            "first_name": "Петр",
            "middle_name": "Петрович",
            "last_name": "Петров",
        }

        response = self.client.post(self.list_url, payload, format="json")

        self.assertEqual(response.status_code, status.HTTP_201_CREATED)
        created = User.objects.get(username=payload["username"])
        self.assertTrue(created.is_staff)
        self._assert_keys(response.data, self._MANAGED_USER_KEYS)
        self.assertEqual(response.data["role"], "admin")
        self.assertEqual(created.profile.first_name, "Петр")
        self.assertEqual(created.profile.middle_name, "Петрович")

    def test_admin_create_user_returns_duplicate_email_error(self):
        """Re-using an existing email on create yields 400 with an email error."""
        payload = {
            "email": self.user.email,
            "username": fake.unique.user_name(),
            "password": fake.password(length=12, special_chars=False),
            "role": "user",
            "first_name": "Петр",
            "last_name": "Петров",
        }

        response = self.client.post(self.list_url, payload, format="json")

        self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
        self.assertIn("email", self._error_fields(response))

    def test_admin_create_user_returns_duplicate_username_error(self):
        """Re-using an existing username on create yields 400 with a username error."""
        payload = {
            "email": fake.unique.email(),
            "username": self.user.username,
            "password": fake.password(length=12, special_chars=False),
            "role": "user",
            "first_name": "Петр",
            "last_name": "Петров",
        }

        response = self.client.post(self.list_url, payload, format="json")

        self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
        self.assertIn("username", self._error_fields(response))

    def test_admin_create_user_returns_password_validation_error(self):
        """The password "123" on create yields 400 with a password error."""
        payload = {
            "email": fake.unique.email(),
            "username": fake.unique.user_name(),
            "password": "123",
            "role": "user",
            "first_name": "Петр",
            "last_name": "Петров",
        }

        response = self.client.post(self.list_url, payload, format="json")

        self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
        self.assertIn("password", self._error_fields(response))

    def test_admin_can_update_user_and_role(self):
        """PATCH can change role, verification flag and profile names at once."""
        changes = {
            "role": "admin",
            "first_name": "Иван",
            "middle_name": "Иванович",
            "last_name": "Иванов",
            "is_verified": True,
        }

        response = self.client.patch(
            self._detail_url(self.user),
            changes,
            format="json",
        )

        self.assertEqual(response.status_code, status.HTTP_200_OK)
        self.user.refresh_from_db()
        self.assertTrue(self.user.is_staff)
        self.assertTrue(self.user.is_verified)
        self._assert_keys(response.data, self._MANAGED_USER_KEYS)
        self.assertEqual(self.user.profile.first_name, "Иван")
        self.assertEqual(self.user.profile.middle_name, "Иванович")

    def test_admin_can_get_user_detail(self):
        """GET on the detail URL returns the user together with profile keys."""
        response = self.client.get(self._detail_url(self.user))

        self.assertEqual(response.status_code, status.HTTP_200_OK)
        self.assertEqual(response.data["id"], self.user.id)
        self._assert_keys(response.data["profile"], self._PROFILE_KEYS)

    def test_admin_patch_user_returns_duplicate_email_error(self):
        """Patching a user to another user's email yields 400 with an email error."""
        another = UserFactory.create_user()

        response = self.client.patch(
            self._detail_url(another),
            {"email": self.user.email},
            format="json",
        )

        self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
        self.assertIn("email", self._error_fields(response))

    def test_admin_patch_user_returns_duplicate_username_error(self):
        """Patching a user to a taken username yields 400 with a username error."""
        another = UserFactory.create_user()

        response = self.client.patch(
            self._detail_url(another),
            {"username": self.user.username},
            format="json",
        )

        self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
        self.assertIn("username", self._error_fields(response))

    def test_admin_patch_user_returns_password_validation_error(self):
        """Patching the password to "123" yields 400 with a password error."""
        response = self.client.patch(
            self._detail_url(self.user),
            {"password": "123"},
            format="json",
        )

        self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
        self.assertIn("password", self._error_fields(response))

    def test_admin_cannot_patch_self_to_inactive(self):
        """An admin deactivating their own account via PATCH gets 400."""
        response = self.client.patch(
            self._detail_url(self.admin),
            {"is_active": False},
            format="json",
        )

        self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
        self.assertEqual(response.data["detail"], "Нельзя деактивировать самого себя.")

    def test_admin_cannot_patch_self_to_regular_user(self):
        """An admin dropping their own admin role via PATCH gets 400."""
        response = self.client.patch(
            self._detail_url(self.admin),
            {"role": "user"},
            format="json",
        )

        self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
        self.assertEqual(
            response.data["detail"],
            "Нельзя снять у себя роль администратора.",
        )

    def test_admin_can_patch_self_with_safe_fields(self):
        """An admin may change their own names; the role stays "admin"."""
        response = self.client.patch(
            self._detail_url(self.admin),
            {"first_name": "Админ", "last_name": "Админов"},
            format="json",
        )

        self.assertEqual(response.status_code, status.HTTP_200_OK)
        self.assertEqual(response.data["role"], "admin")
        self.assertNotIn("profile", response.data)

    def test_admin_can_deactivate_user(self):
        """POST to the deactivate URL stores ``is_active=False`` for the user."""
        deactivate_url = reverse(
            "api_v1:user:admin-user-deactivate", args=[self.user.id]
        )

        response = self.client.post(deactivate_url, {}, format="json")

        self.assertEqual(response.status_code, status.HTTP_200_OK)
        self.user.refresh_from_db()
        self.assertFalse(self.user.is_active)

    def test_admin_deactivate_user_returns_managed_user_payload(self):
        """The deactivate response carries the managed-user keys and new state."""
        deactivate_url = reverse(
            "api_v1:user:admin-user-deactivate", args=[self.user.id]
        )

        response = self.client.post(deactivate_url, {}, format="json")

        self.assertEqual(response.status_code, status.HTTP_200_OK)
        self._assert_keys(response.data, self._MANAGED_USER_KEYS)
        self.assertEqual(response.data["id"], self.user.id)
        self.assertFalse(response.data["is_active"])

    def test_admin_can_activate_user(self):
        """POST to the activate URL re-enables a previously inactive user."""
        self.user.is_active = False
        self.user.save(update_fields=["is_active"])
        activate_url = reverse("api_v1:user:admin-user-activate", args=[self.user.id])

        response = self.client.post(activate_url, {}, format="json")

        self.assertEqual(response.status_code, status.HTTP_200_OK)
        self.user.refresh_from_db()
        self.assertTrue(self.user.is_active)
        self._assert_keys(response.data, self._MANAGED_USER_KEYS)

    def test_admin_cannot_deactivate_self(self):
        """An admin calling the deactivate URL for their own account gets 400."""
        deactivate_url = reverse(
            "api_v1:user:admin-user-deactivate", args=[self.admin.id]
        )

        response = self.client.post(deactivate_url, {}, format="json")

        self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)

    def test_regular_user_cannot_access_admin_user_management(self):
        """A non-staff user's token is answered with 403 on the list URL."""
        regular_access = self.user_tokens["access"]
        self.client.credentials(HTTP_AUTHORIZATION=f"Bearer {regular_access}")

        response = self.client.get(self.list_url)

        self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)
class ProfileDetailViewTest(APITestCase):
    """Verifies that the self-service profile endpoints answer with 405."""

    def setUp(self):
        """Authenticate as a user with a profile and prepare an update payload."""
        self.user = UserFactory.create_user()
        self.profile = ProfileFactory.create_profile(user=self.user)
        self.profile_url = reverse("api_v1:user:profile_detail")
        self.tokens = UserService.get_tokens_for_user(self.user)
        self.client.credentials(HTTP_AUTHORIZATION=f"Bearer {self.tokens['access']}")
        self.update_data = {
            "first_name": fake.first_name(),
            "middle_name": fake.first_name(),
            "last_name": fake.last_name(),
            "bio": fake.text(max_nb_chars=200),
        }

    def test_profile_read_endpoint_is_disabled(self):
        """GET on the profile detail URL is answered with 405."""
        response = self.client.get(self.profile_url)

        self.assertEqual(response.status_code, status.HTTP_405_METHOD_NOT_ALLOWED)

    def test_profile_update_endpoint_is_disabled(self):
        """PATCH is answered with 405 and the stored first name stays unchanged."""
        response = self.client.patch(
            self.profile_url,
            self.update_data,
            format="json",
        )

        self.assertEqual(response.status_code, status.HTTP_405_METHOD_NOT_ALLOWED)
        # Reload from the database so the check reflects persisted state.
        self.profile.refresh_from_db()
        requested_first_name = self.update_data["first_name"]
        self.assertNotEqual(self.profile.first_name, requested_first_name)

    def test_profile_full_endpoint_is_disabled(self):
        """GET on the full-profile URL is answered with 405."""
        full_profile_url = reverse("api_v1:user:profile_full")

        response = self.client.get(full_profile_url)

        self.assertEqual(response.status_code, status.HTTP_405_METHOD_NOT_ALLOWED)
class PasswordChangeViewTest(APITestCase):
    """Verifies that the self-service password change endpoint answers with 405."""

    def setUp(self):
        """Authenticate as a user with a known password and build a change payload."""
        self.old_password = fake.password(length=12, special_chars=False)
        self.new_password = fake.password(length=12, special_chars=False)
        self.user = UserFactory.create_user(password=self.old_password)
        self.password_change_url = reverse("api_v1:user:password_change")
        self.tokens = UserService.get_tokens_for_user(self.user)
        self.client.credentials(HTTP_AUTHORIZATION=f"Bearer {self.tokens['access']}")
        self.password_data = {
            "old_password": self.old_password,
            "new_password": self.new_password,
            "new_password_confirm": self.new_password,
        }

    def test_change_password_endpoint_is_disabled(self):
        """POST is answered with 405 and the old password still validates."""
        response = self.client.post(
            self.password_change_url,
            self.password_data,
            format="json",
        )

        self.assertEqual(response.status_code, status.HTTP_405_METHOD_NOT_ALLOWED)
        # Reload from the database so the check reflects the persisted hash.
        self.user.refresh_from_db()
        old_password_still_valid = self.user.check_password(self.old_password)
        self.assertTrue(old_password_still_valid)
class TokenRefreshViewTest(APITestCase):
    """Covers the token refresh endpoint with valid, invalid and missing tokens."""

    def setUp(self):
        """Create a user and issue a token pair for them."""
        self.user = UserFactory.create_user()
        self.refresh_url = reverse("api_v1:user:token_refresh")
        self.tokens = UserService.get_tokens_for_user(self.user)

    def test_refresh_token_success(self):
        """A valid refresh token yields 200 with "access" and "refresh" keys."""
        payload = {"refresh": self.tokens["refresh"]}

        response = self.client.post(self.refresh_url, payload, format="json")

        self.assertEqual(response.status_code, status.HTTP_200_OK)
        # The returned refresh token may be the same or different depending on
        # the implementation, so only the presence of both keys is checked.
        for token_key in ("access", "refresh"):
            self.assertIn(token_key, response.data)

    def test_refresh_token_invalid(self):
        """A random string as refresh token yields 401 with an error payload."""
        payload = {"refresh": fake.pystr(min_chars=20, max_chars=50)}

        response = self.client.post(self.refresh_url, payload, format="json")

        self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)
        accepted_error_keys = ("errors", "detail", "code")
        self.assertTrue(any(key in response.data for key in accepted_error_keys))

    def test_refresh_token_missing(self):
        """An empty payload yields 400 with an error payload."""
        response = self.client.post(self.refresh_url, {}, format="json")

        self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
        accepted_error_keys = ("errors", "detail")
        self.assertTrue(any(key in response.data for key in accepted_error_keys))
class TokenVerifyViewTest(APITestCase):
    """Verifies that the token verify endpoint answers POST with 405."""

    def test_verify_access_token_endpoint_is_disabled(self):
        """Posting a freshly issued access token is answered with 405."""
        user = UserFactory.create_user()
        tokens = UserService.get_tokens_for_user(user)
        verify_url = reverse("api_v1:user:token_verify")
        payload = {"token": tokens["access"]}

        response = self.client.post(verify_url, payload, format="json")

        self.assertEqual(response.status_code, status.HTTP_405_METHOD_NOT_ALLOWED)
class DisabledSelfServiceAuthenticationTest(APITestCase):
    """Tests disabled self-service endpoints do not mutate users.

    Each test sends a PATCH to the user update URL with fake session and CSRF
    cookies attached, expects 405, and then checks that the stored username
    was not replaced by the requested one.
    """

    def setUp(self):
        """Create a user, a rename payload and a CSRF-enforcing client."""
        self.user = UserFactory.create_user()
        self.tokens = UserService.get_tokens_for_user(self.user)
        self.update_url = reverse("api_v1:user:user_update")
        self.patch_data = {"username": fake.unique.user_name()}
        # Explicitly enable CSRF checks to catch accidental SessionAuthentication usage.
        self.client = APIClient(enforce_csrf_checks=True)
        self.client.cookies["sessionid"] = "fake-admin-session"
        self.client.cookies["csrftoken"] = "fake-csrf-token"

    def _assert_user_not_mutated(self):
        """Reload the user and assert the requested username was not stored."""
        self.user.refresh_from_db()
        self.assertNotEqual(self.user.username, self.patch_data["username"])

    def test_patch_with_bearer_and_session_cookies_returns_405(self):
        """PATCH with a bearer token plus cookies yields 405 and no rename."""
        self.client.credentials(HTTP_AUTHORIZATION=f"Bearer {self.tokens['access']}")

        response = self.client.patch(self.update_url, self.patch_data, format="json")

        self.assertEqual(response.status_code, status.HTTP_405_METHOD_NOT_ALLOWED)
        self._assert_user_not_mutated()

    def test_patch_with_only_session_cookies_returns_405(self):
        """PATCH with only the fake cookies yields 405 and no rename."""
        response = self.client.patch(self.update_url, self.patch_data, format="json")

        self.assertEqual(response.status_code, status.HTTP_405_METHOD_NOT_ALLOWED)
        self._assert_user_not_mutated()