Files
mostovik-backend/tests/apps/user/test_views.py
Aleksandr Meshchriakov 3d298ce352
Some checks failed
CI/CD Pipeline / Run Tests (pull_request) Successful in 1m53s
CI/CD Pipeline / Telegram Notify Success (push) Has been cancelled
CI/CD Pipeline / Run Tests (push) Has been cancelled
CI/CD Pipeline / Code Quality Checks (push) Has been cancelled
CI/CD Pipeline / Code Quality Checks (pull_request) Failing after 2m54s
CI/CD Pipeline / Telegram Notify Success (pull_request) Has been skipped
feat: expand platform APIs, sources, and test coverage
2026-03-17 12:56:48 +01:00

516 lines
20 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Tests for user DRF views"""
from apps.user.models import Profile
from apps.user.services import UserService
from django.contrib.auth import get_user_model
from django.urls import reverse
from faker import Faker
from rest_framework import status
from rest_framework.test import APIClient, APITestCase
from .factories import ProfileFactory, UserFactory
# User model class as returned by Django's get_user_model().
User = get_user_model()
# Module-wide Faker instance configured with the "ru_RU" locale; used by the
# tests below to generate emails, usernames, passwords and phone digits.
fake = Faker("ru_RU")
class RegisterViewTest(APITestCase):
    """Tests for RegisterView (URL name ``api_v1:user:register``)."""

    def setUp(self):
        """Resolve the endpoint URL and prepare a valid registration payload."""
        self.register_url = reverse("api_v1:user:register")
        self.password = fake.password(length=12, special_chars=False)
        self.user_data = dict(
            email=fake.unique.email(),
            username=fake.unique.user_name(),
            password=self.password,
            password_confirm=self.password,
            phone=f"+7{fake.numerify('##########')}",
        )

    def _register(self, **overrides):
        """POST the default payload, with ``overrides`` applied, as JSON."""
        payload = {**self.user_data, **overrides}
        return self.client.post(self.register_url, payload, format="json")

    def test_register_success(self):
        """A valid payload yields 201, user data and a token pair."""
        resp = self._register()

        self.assertEqual(resp.status_code, status.HTTP_201_CREATED)
        body = resp.data
        self.assertIn("user", body)
        self.assertIn("tokens", body)
        for token_name in ("refresh", "access"):
            self.assertIn(token_name, body["tokens"])

        # The account must actually be persisted.
        registered = User.objects.filter(email=self.user_data["email"])
        self.assertTrue(registered.exists())

    def test_register_passwords_do_not_match(self):
        """A differing confirmation password is rejected with 400."""
        other_password = fake.password(length=12, special_chars=False)

        resp = self._register(password_confirm=other_password)

        self.assertEqual(resp.status_code, status.HTTP_400_BAD_REQUEST)
        self.assertIn("non_field_errors", resp.data)

    def test_register_duplicate_email(self):
        """Re-using the email of an existing user is rejected with 400."""
        taken_email = UserFactory.create_user().email

        resp = self._register(email=taken_email)

        self.assertEqual(resp.status_code, status.HTTP_400_BAD_REQUEST)
        self.assertIn("email", resp.data)

    def test_register_short_password(self):
        """A 3-5 character password is rejected with 400."""
        too_short = fake.pystr(min_chars=3, max_chars=5)

        resp = self._register(password=too_short, password_confirm=too_short)

        self.assertEqual(resp.status_code, status.HTTP_400_BAD_REQUEST)
        self.assertIn("password", resp.data)
class LoginViewTest(APITestCase):
    """Tests for LoginView (URL name ``api_v1:user:login``)."""

    def setUp(self):
        """Create a user with a known password and matching login payload."""
        self.login_url = reverse("api_v1:user:login")
        self.password = fake.password(length=12, special_chars=False)
        self.user = UserFactory.create_user(password=self.password)
        self.login_data = dict(username=self.user.username, password=self.password)

    def _login(self, credentials):
        """POST ``credentials`` to the login endpoint as JSON."""
        return self.client.post(self.login_url, credentials, format="json")

    def test_login_success(self):
        """Correct credentials return 200 with refresh and access tokens."""
        resp = self._login(self.login_data)

        self.assertEqual(resp.status_code, status.HTTP_200_OK)
        for token_name in ("refresh", "access"):
            self.assertIn(token_name, resp.data)

    def test_login_invalid_credentials(self):
        """A wrong password for an existing user returns 401 with an error."""
        wrong_password = fake.password(length=12, special_chars=False)
        credentials = {**self.login_data, "password": wrong_password}

        resp = self._login(credentials)

        self.assertEqual(resp.status_code, status.HTTP_401_UNAUTHORIZED)
        self.assertIn("error", resp.data)

    def test_login_nonexistent_user(self):
        """Freshly generated credentials that match no user return 401."""
        credentials = dict(
            username=fake.unique.user_name(),
            password=fake.password(length=12, special_chars=False),
        )

        resp = self._login(credentials)

        self.assertEqual(resp.status_code, status.HTTP_401_UNAUTHORIZED)

    def test_login_invalid_payload_returns_400(self):
        """An empty payload returns 400 with an error keyed by ``username``."""
        resp = self._login({})

        self.assertEqual(resp.status_code, status.HTTP_400_BAD_REQUEST)
        self.assertIn("username", resp.data)
class LogoutViewTest(APITestCase):
    """Tests for the logout endpoint (URL name ``api_v1:user:logout``)."""

    def test_logout_returns_success_message(self):
        """An authenticated POST returns 200 and the expected message."""
        account = UserFactory.create_user()
        access_token = UserService.get_tokens_for_user(account)["access"]
        self.client.credentials(HTTP_AUTHORIZATION=f"Bearer {access_token}")

        logout_url = reverse("api_v1:user:logout")
        resp = self.client.post(logout_url, {}, format="json")

        self.assertEqual(resp.status_code, status.HTTP_200_OK)
        self.assertEqual(resp.data["message"], "Успешный выход")
class CurrentUserViewTest(APITestCase):
    """Tests for CurrentUserView (URL name ``api_v1:user:current_user``)."""

    def setUp(self):
        """Create a user with a profile and authenticate the client via JWT."""
        self.user = UserFactory.create_user()
        ProfileFactory.create_profile(user=self.user)
        self.current_user_url = reverse("api_v1:user:current_user")
        self.tokens = UserService.get_tokens_for_user(self.user)
        access_token = self.tokens["access"]
        self.client.credentials(HTTP_AUTHORIZATION=f"Bearer {access_token}")

    def test_get_current_user_authenticated(self):
        """An authenticated GET returns the caller's own data."""
        resp = self.client.get(self.current_user_url)

        self.assertEqual(resp.status_code, status.HTTP_200_OK)
        body = resp.data
        self.assertEqual(body["id"], self.user.id)
        self.assertEqual(body["email"], self.user.email)
        # A factory-default user is expected to have the plain "user" role
        # without the dashboard-refresh capability.
        self.assertEqual(body["role"], "user")
        self.assertFalse(body["capabilities"]["can_refresh_dashboard"])
        self.assertIn("profile", body)

    def test_get_current_user_unauthenticated(self):
        """Without an Authorization header the endpoint returns 401."""
        # Calling credentials() with no arguments clears the header set in setUp.
        self.client.credentials()

        resp = self.client.get(self.current_user_url)

        self.assertEqual(resp.status_code, status.HTTP_401_UNAUTHORIZED)
class UserUpdateViewTest(APITestCase):
    """Tests for UserUpdateView (URL name ``api_v1:user:user_update``)."""

    def setUp(self):
        """Authenticate as a fresh user and prepare a valid update payload."""
        self.user = UserFactory.create_user()
        self.update_url = reverse("api_v1:user:user_update")
        self.tokens = UserService.get_tokens_for_user(self.user)
        access_token = self.tokens["access"]
        self.client.credentials(HTTP_AUTHORIZATION=f"Bearer {access_token}")
        self.update_data = dict(
            username=fake.unique.user_name(),
            phone=f"+7{fake.numerify('##########')}",
        )

    def _patch(self, payload):
        """PATCH ``payload`` to the update endpoint as JSON."""
        return self.client.patch(self.update_url, payload, format="json")

    def test_update_user_success(self):
        """A valid PATCH returns 200, echoes the new values and persists them."""
        resp = self._patch(self.update_data)

        self.assertEqual(resp.status_code, status.HTTP_200_OK)
        for field in ("username", "phone"):
            self.assertEqual(resp.data[field], self.update_data[field])

        # Reload from the database to confirm the username was saved.
        self.user.refresh_from_db()
        self.assertEqual(self.user.username, self.update_data["username"])

    def test_update_user_unauthenticated(self):
        """Without an Authorization header the update returns 401."""
        # Calling credentials() with no arguments clears the header set in setUp.
        self.client.credentials()

        resp = self._patch(self.update_data)

        self.assertEqual(resp.status_code, status.HTTP_401_UNAUTHORIZED)

    def test_update_user_invalid_returns_400(self):
        """An empty username returns 400 with an error keyed by ``username``."""
        resp = self._patch({"username": ""})

        self.assertEqual(resp.status_code, status.HTTP_400_BAD_REQUEST)
        self.assertIn("username", resp.data)
class AdminUserManagementViewTest(APITestCase):
    """Tests for admin-only user management endpoints."""

    def setUp(self):
        """Create a staff user and a regular user; authenticate as the staff user."""
        self.admin = UserFactory.create_user(is_staff=True)
        self.user = UserFactory.create_user()
        self.tokens = UserService.get_tokens_for_user(self.admin)
        self.user_tokens = UserService.get_tokens_for_user(self.user)
        self.list_url = reverse("api_v1:user:admin-users")
        admin_access = self.tokens["access"]
        self.client.credentials(HTTP_AUTHORIZATION=f"Bearer {admin_access}")

    @staticmethod
    def _detail_url(user_id):
        """Return the admin detail URL for the user with ``user_id``."""
        return reverse("api_v1:user:admin-user-detail", args=[user_id])

    @staticmethod
    def _deactivate_url(user_id):
        """Return the admin deactivate URL for the user with ``user_id``."""
        return reverse("api_v1:user:admin-user-deactivate", args=[user_id])

    def test_admin_can_list_users(self):
        """The list response contains both the admin and the regular user."""
        resp = self.client.get(self.list_url)

        self.assertEqual(resp.status_code, status.HTTP_200_OK)
        listed_usernames = set()
        for entry in resp.data:
            listed_usernames.add(entry["username"])
        for account in (self.admin, self.user):
            self.assertIn(account.username, listed_usernames)

    def test_admin_can_create_user_with_role(self):
        """Creating a user with role "admin" yields a staff user with a profile."""
        new_password = fake.password(length=12, special_chars=False)
        payload = dict(
            email=fake.unique.email(),
            username=fake.unique.user_name(),
            phone=f"+7{fake.numerify('##########')}",
            password=new_password,
            role="admin",
            first_name="Петр",
            last_name="Петров",
        )

        resp = self.client.post(self.list_url, payload, format="json")

        self.assertEqual(resp.status_code, status.HTTP_201_CREATED)
        created = User.objects.get(username=payload["username"])
        self.assertTrue(created.is_staff)
        self.assertEqual(resp.data["role"], "admin")
        self.assertEqual(created.profile.first_name, "Петр")

    def test_admin_can_update_user_and_role(self):
        """PATCHing role, first name and verification persists all three."""
        changes = {"role": "admin", "first_name": "Иван", "is_verified": True}

        resp = self.client.patch(
            self._detail_url(self.user.id), changes, format="json"
        )

        self.assertEqual(resp.status_code, status.HTTP_200_OK)
        self.user.refresh_from_db()
        self.assertTrue(self.user.is_staff)
        self.assertTrue(self.user.is_verified)
        self.assertEqual(self.user.profile.first_name, "Иван")

    def test_admin_can_get_user_detail(self):
        """The detail endpoint returns the requested user's id."""
        resp = self.client.get(self._detail_url(self.user.id))

        self.assertEqual(resp.status_code, status.HTTP_200_OK)
        self.assertEqual(resp.data["id"], self.user.id)

    def test_admin_cannot_patch_self_to_inactive(self):
        """An admin PATCHing their own ``is_active`` to False gets 400."""
        resp = self.client.patch(
            self._detail_url(self.admin.id), {"is_active": False}, format="json"
        )

        self.assertEqual(resp.status_code, status.HTTP_400_BAD_REQUEST)
        expected_detail = "Нельзя деактивировать самого себя."
        self.assertEqual(resp.data["detail"], expected_detail)

    def test_admin_cannot_patch_self_to_regular_user(self):
        """An admin PATCHing their own role to "user" gets 400."""
        resp = self.client.patch(
            self._detail_url(self.admin.id), {"role": "user"}, format="json"
        )

        self.assertEqual(resp.status_code, status.HTTP_400_BAD_REQUEST)
        expected_detail = "Нельзя снять у себя роль администратора."
        self.assertEqual(resp.data["detail"], expected_detail)

    def test_admin_can_patch_self_with_safe_fields(self):
        """An admin may change their own first name."""
        resp = self.client.patch(
            self._detail_url(self.admin.id), {"first_name": "Админ"}, format="json"
        )

        self.assertEqual(resp.status_code, status.HTTP_200_OK)
        self.assertEqual(resp.data["profile"]["first_name"], "Админ")

    def test_admin_can_deactivate_user(self):
        """Deactivating another user returns 200 and clears ``is_active``."""
        resp = self.client.post(
            self._deactivate_url(self.user.id), {}, format="json"
        )

        self.assertEqual(resp.status_code, status.HTTP_200_OK)
        self.user.refresh_from_db()
        self.assertFalse(self.user.is_active)

    def test_admin_cannot_deactivate_self(self):
        """An admin calling deactivate on their own id gets 400."""
        resp = self.client.post(
            self._deactivate_url(self.admin.id), {}, format="json"
        )

        self.assertEqual(resp.status_code, status.HTTP_400_BAD_REQUEST)

    def test_regular_user_cannot_access_admin_user_management(self):
        """A non-staff token on the list endpoint gets 403."""
        regular_access = self.user_tokens["access"]
        self.client.credentials(HTTP_AUTHORIZATION=f"Bearer {regular_access}")

        resp = self.client.get(self.list_url)

        self.assertEqual(resp.status_code, status.HTTP_403_FORBIDDEN)
class ProfileDetailViewTest(APITestCase):
    """Tests for ProfileDetailView (URL name ``api_v1:user:profile_detail``)."""

    def setUp(self):
        """Create a user with a profile, authenticate, and prepare update data."""
        self.user = UserFactory.create_user()
        self.profile = ProfileFactory.create_profile(user=self.user)
        self.profile_url = reverse("api_v1:user:profile_detail")
        self.tokens = UserService.get_tokens_for_user(self.user)
        access_token = self.tokens["access"]
        self.client.credentials(HTTP_AUTHORIZATION=f"Bearer {access_token}")
        self.update_data = dict(
            first_name=fake.first_name(),
            last_name=fake.last_name(),
            bio=fake.text(max_nb_chars=200),
        )

    def test_get_profile_success(self):
        """GET returns 200 and the stored first name."""
        resp = self.client.get(self.profile_url)

        self.assertEqual(resp.status_code, status.HTTP_200_OK)
        self.assertEqual(resp.data["first_name"], self.profile.first_name)

    def test_update_profile_success(self):
        """PATCH returns 200, echoes the new names and persists the first name."""
        resp = self.client.patch(self.profile_url, self.update_data, format="json")

        self.assertEqual(resp.status_code, status.HTTP_200_OK)
        for field in ("first_name", "last_name"):
            self.assertEqual(resp.data[field], self.update_data[field])

        # Reload from the database to confirm the change was saved.
        self.profile.refresh_from_db()
        self.assertEqual(self.profile.first_name, self.update_data["first_name"])

    def test_profile_created_if_not_exists(self):
        """GET succeeds and leaves a profile row even if none existed before."""
        # Remove the profile created in setUp.
        self.profile.delete()

        resp = self.client.get(self.profile_url)

        self.assertEqual(resp.status_code, status.HTTP_200_OK)
        # A profile for this user must exist again after the request.
        recreated = Profile.objects.filter(user=self.user)
        self.assertTrue(recreated.exists())

    def test_update_profile_invalid_returns_400(self):
        """An unparsable date of birth returns 400 keyed by ``date_of_birth``."""
        bad_payload = {"date_of_birth": "not-a-date"}

        resp = self.client.patch(self.profile_url, bad_payload, format="json")

        self.assertEqual(resp.status_code, status.HTTP_400_BAD_REQUEST)
        self.assertIn("date_of_birth", resp.data)

    def test_get_full_profile_endpoint(self):
        """The ``profile_full`` endpoint returns 200 with the user's id."""
        full_profile_url = reverse("api_v1:user:profile_full")

        resp = self.client.get(full_profile_url)

        self.assertEqual(resp.status_code, status.HTTP_200_OK)
        self.assertEqual(resp.data["id"], self.user.id)
class PasswordChangeViewTest(APITestCase):
    """Tests for PasswordChangeView (URL name ``api_v1:user:password_change``)."""

    def setUp(self):
        """Create a user with a known password and a valid change payload."""
        self.old_password = fake.password(length=12, special_chars=False)
        self.new_password = fake.password(length=12, special_chars=False)
        self.user = UserFactory.create_user(password=self.old_password)
        self.password_change_url = reverse("api_v1:user:password_change")
        self.tokens = UserService.get_tokens_for_user(self.user)
        access_token = self.tokens["access"]
        self.client.credentials(HTTP_AUTHORIZATION=f"Bearer {access_token}")
        self.password_data = dict(
            old_password=self.old_password,
            new_password=self.new_password,
            new_password_confirm=self.new_password,
        )

    def _change_password(self, payload):
        """POST ``payload`` to the password change endpoint as JSON."""
        return self.client.post(self.password_change_url, payload, format="json")

    def test_change_password_success(self):
        """A valid request returns 200 and the new password then checks out."""
        resp = self._change_password(self.password_data)

        self.assertEqual(resp.status_code, status.HTTP_200_OK)
        self.assertIn("message", resp.data)

        # Reload the user so check_password sees the stored hash.
        self.user.refresh_from_db()
        self.assertTrue(self.user.check_password(self.new_password))

    def test_change_password_wrong_old_password(self):
        """A wrong current password returns 400 with an ``error`` key."""
        wrong_old = fake.password(length=12, special_chars=False)
        payload = {**self.password_data, "old_password": wrong_old}

        resp = self._change_password(payload)

        self.assertEqual(resp.status_code, status.HTTP_400_BAD_REQUEST)
        self.assertIn("error", resp.data)

    def test_change_password_passwords_do_not_match(self):
        """A differing confirmation returns 400 with ``non_field_errors``."""
        other_confirm = fake.password(length=12, special_chars=False)
        payload = {**self.password_data, "new_password_confirm": other_confirm}

        resp = self._change_password(payload)

        self.assertEqual(resp.status_code, status.HTTP_400_BAD_REQUEST)
        self.assertIn("non_field_errors", resp.data)
class TokenRefreshViewTest(APITestCase):
    """Tests for TokenRefreshView (URL name ``api_v1:user:token_refresh``)."""

    def setUp(self):
        """Create a user and issue a token pair for them."""
        self.user = UserFactory.create_user()
        self.refresh_url = reverse("api_v1:user:token_refresh")
        self.tokens = UserService.get_tokens_for_user(self.user)

    def _refresh(self, payload):
        """POST ``payload`` to the refresh endpoint as JSON."""
        return self.client.post(self.refresh_url, payload, format="json")

    def test_refresh_token_success(self):
        """A valid refresh token returns 200 with access and refresh tokens."""
        resp = self._refresh({"refresh": self.tokens["refresh"]})

        self.assertEqual(resp.status_code, status.HTTP_200_OK)
        for token_name in ("access", "refresh"):
            self.assertIn(token_name, resp.data)
        # No assertion on whether the refresh token changed: it may be the
        # same or different depending on the implementation.

    def test_refresh_token_invalid(self):
        """A random string as refresh token returns 401 with an error body."""
        bogus_token = fake.pystr(min_chars=20, max_chars=50)

        resp = self._refresh({"refresh": bogus_token})

        self.assertEqual(resp.status_code, status.HTTP_401_UNAUTHORIZED)
        # Any one of these keys is accepted as the error body.
        accepted_keys = ("errors", "detail", "code")
        self.assertTrue(any(key in resp.data for key in accepted_keys))

    def test_refresh_token_missing(self):
        """An empty payload returns 400 with an error body."""
        resp = self._refresh({})

        self.assertEqual(resp.status_code, status.HTTP_400_BAD_REQUEST)
        # Either of these keys is accepted as the error body.
        accepted_keys = ("errors", "detail")
        self.assertTrue(any(key in resp.data for key in accepted_keys))
class TokenVerifyViewTest(APITestCase):
    """Tests for the token verify endpoint (``api_v1:user:token_verify``)."""

    def test_verify_access_token_success(self):
        """A freshly issued access token is accepted with 200."""
        account = UserFactory.create_user()
        access_token = UserService.get_tokens_for_user(account)["access"]
        verify_url = reverse("api_v1:user:token_verify")

        resp = self.client.post(verify_url, {"token": access_token}, format="json")

        self.assertEqual(resp.status_code, status.HTTP_200_OK)
class ApiJwtOnlyAuthenticationTest(APITestCase):
    """Tests that API auth flow is JWT-only and not session-cookie based."""

    def setUp(self):
        """Build a CSRF-enforcing client that carries fake session cookies."""
        self.user = UserFactory.create_user()
        self.tokens = UserService.get_tokens_for_user(self.user)
        self.update_url = reverse("api_v1:user:user_update")
        self.patch_data = dict(username=fake.unique.user_name())
        # Explicitly enable CSRF checks to catch accidental SessionAuthentication
        # usage.
        self.client = APIClient(enforce_csrf_checks=True)
        fake_cookies = (
            ("sessionid", "fake-admin-session"),
            ("csrftoken", "fake-csrf-token"),
        )
        for cookie_name, cookie_value in fake_cookies:
            self.client.cookies[cookie_name] = cookie_value

    def _patch(self):
        """PATCH the prepared payload to the user update endpoint as JSON."""
        return self.client.patch(self.update_url, self.patch_data, format="json")

    def test_patch_with_bearer_and_session_cookies_returns_200(self):
        """Bearer JWT should authenticate even if session cookies are present."""
        access_token = self.tokens["access"]
        self.client.credentials(HTTP_AUTHORIZATION=f"Bearer {access_token}")

        resp = self._patch()

        self.assertEqual(resp.status_code, status.HTTP_200_OK)
        self.assertEqual(resp.data["id"], self.user.id)

    def test_patch_with_only_session_cookies_returns_401_not_403(self):
        """Session cookies without JWT should not trigger CSRF 403 for API auth."""
        resp = self._patch()

        self.assertEqual(resp.status_code, status.HTTP_401_UNAUTHORIZED)