All checks were successful
CI/CD Pipeline / Manual Action Help (push) Has been skipped
CI/CD Pipeline / Start Dev Containers in Dokploy (push) Has been skipped
CI/CD Pipeline / Drop and Recreate Dev Database (push) Has been skipped
CI/CD Pipeline / Quality Gate (push) Successful in 53s
CI/CD Pipeline / Build and Push Images (push) Successful in 3m28s
CI/CD Pipeline / Internal Notify (push) Successful in 1s
CI/CD Pipeline / Deploy Dev in Dokploy (push) Successful in 1s
759 lines
29 KiB
Python
759 lines
29 KiB
Python
"""Tests for user DRF views"""
|
||
|
||
from apps.user.models import Profile
|
||
from apps.user.services import UserService
|
||
from django.contrib.auth import get_user_model
|
||
from django.urls import reverse
|
||
from faker import Faker
|
||
from rest_framework import status
|
||
from rest_framework.test import APIClient, APITestCase
|
||
from rest_framework_simplejwt.token_blacklist.models import BlacklistedToken
|
||
|
||
from .factories import ProfileFactory, UserFactory
|
||
|
||
# User model class resolved through Django's get_user_model().
User = get_user_model()
|
||
# Module-wide Faker generator configured with the "ru_RU" locale; the tests
# below draw their random e-mails, usernames, passwords and names from it.
fake = Faker("ru_RU")
|
||
|
||
|
||
class RegisterViewTest(APITestCase):
    """Test cases for the ``api_v1:user:register`` endpoint."""

    def setUp(self):
        """Resolve the endpoint URL and prepare a complete registration payload."""
        self.register_url = reverse("api_v1:user:register")
        secret = fake.password(length=12, special_chars=False)
        self.password = secret
        self.user_data = {
            "email": fake.unique.email(),
            "username": fake.unique.user_name(),
            "password": secret,
            "password_confirm": secret,
            "phone": f"+7{fake.numerify('##########')}",
            "first_name": fake.first_name(),
            "middle_name": fake.first_name(),
            "last_name": fake.last_name(),
        }

    def _submit(self, payload):
        """POST ``payload`` as JSON to the registration URL; return the response."""
        return self.client.post(self.register_url, payload, format="json")

    def test_register_success(self):
        """A complete payload returns 201 with user data and a token pair."""
        result = self._submit(self.user_data)

        self.assertEqual(result.status_code, status.HTTP_201_CREATED)
        for top_level_key in ("user", "tokens"):
            self.assertIn(top_level_key, result.data)
        token_pair = result.data["tokens"]
        for token_name in ("refresh", "access"):
            self.assertIn(token_name, token_pair)

        # The account must exist and its profile must carry the submitted names.
        account = User.objects.get(email=self.user_data["email"])
        stored_profile = account.profile
        self.assertEqual(stored_profile.first_name, self.user_data["first_name"])
        self.assertEqual(stored_profile.last_name, self.user_data["last_name"])

    def test_register_passwords_do_not_match(self):
        """A differing ``password_confirm`` is rejected with a non-field error."""
        mismatched = {
            **self.user_data,
            "password_confirm": fake.password(length=12, special_chars=False),
        }

        result = self._submit(mismatched)

        self.assertEqual(result.status_code, status.HTTP_400_BAD_REQUEST)
        self.assertIn("non_field_errors", result.data)

    def test_register_duplicate_email(self):
        """Reusing the e-mail of an existing user is rejected on ``email``."""
        taken_email = UserFactory.create_user().email

        result = self._submit({**self.user_data, "email": taken_email})

        self.assertEqual(result.status_code, status.HTTP_400_BAD_REQUEST)
        self.assertIn("email", result.data)

    def test_register_short_password(self):
        """A 3-5 character password is rejected on ``password``."""
        weak = fake.pystr(min_chars=3, max_chars=5)
        payload = dict(self.user_data, password=weak, password_confirm=weak)

        result = self._submit(payload)

        self.assertEqual(result.status_code, status.HTTP_400_BAD_REQUEST)
        self.assertIn("password", result.data)

    def test_register_requires_first_and_last_name(self):
        """Omitting both name fields reports an error for each of them."""
        nameless = {
            field: value
            for field, value in self.user_data.items()
            if field not in ("first_name", "last_name")
        }

        result = self._submit(nameless)

        self.assertEqual(result.status_code, status.HTTP_400_BAD_REQUEST)
        for missing_field in ("first_name", "last_name"):
            self.assertIn(missing_field, result.data)
|
||
|
||
|
||
class LoginViewTest(APITestCase):
    """Test cases for the ``api_v1:user:login`` endpoint."""

    def setUp(self):
        """Create a user with a known password plus matching login credentials."""
        self.login_url = reverse("api_v1:user:login")
        self.password = fake.password(length=12, special_chars=False)
        self.user = UserFactory.create_user(password=self.password)

        self.login_data = {
            "username": self.user.username,
            "password": self.password,
        }

    def _attempt(self, credentials):
        """POST ``credentials`` as JSON to the login URL; return the response."""
        return self.client.post(self.login_url, credentials, format="json")

    def test_login_success(self):
        """Valid credentials return 200 with both tokens."""
        result = self._attempt(self.login_data)

        self.assertEqual(result.status_code, status.HTTP_200_OK)
        for token_name in ("refresh", "access"):
            self.assertIn(token_name, result.data)

    def test_login_invalid_credentials(self):
        """A wrong password returns 401 with an ``error`` entry."""
        wrong_credentials = {
            **self.login_data,
            "password": fake.password(length=12, special_chars=False),
        }

        result = self._attempt(wrong_credentials)

        self.assertEqual(result.status_code, status.HTTP_401_UNAUTHORIZED)
        self.assertIn("error", result.data)

    def test_login_nonexistent_user(self):
        """Credentials generated for no stored user return 401."""
        stranger = {
            "username": fake.unique.user_name(),
            "password": fake.password(length=12, special_chars=False),
        }

        result = self._attempt(stranger)

        self.assertEqual(result.status_code, status.HTTP_401_UNAUTHORIZED)

    def test_login_invalid_payload_returns_400(self):
        """An empty payload returns 400 and reports ``username``."""
        result = self._attempt({})

        self.assertEqual(result.status_code, status.HTTP_400_BAD_REQUEST)
        self.assertIn("username", result.data)
|
||
|
||
|
||
class LogoutViewTest(APITestCase):
    """Test cases for the ``api_v1:user:logout`` endpoint."""

    def test_logout_blacklists_refresh_tokens_and_returns_empty_payload(self):
        """Logout returns 200 with ``{}`` and leaves one blacklisted token."""
        account = UserFactory.create_user()
        access_token = UserService.get_tokens_for_user(account)["access"]
        self.client.credentials(HTTP_AUTHORIZATION=f"Bearer {access_token}")
        logout_url = reverse("api_v1:user:logout")

        result = self.client.post(logout_url, {}, format="json")

        self.assertEqual(result.status_code, status.HTTP_200_OK)
        self.assertEqual(result.data, {})
        blacklisted = BlacklistedToken.objects.filter(token__user=account)
        self.assertEqual(blacklisted.count(), 1)
|
||
|
||
|
||
class CurrentUserViewTest(APITestCase):
    """Test cases for the ``api_v1:user:current_user`` endpoint."""

    def setUp(self):
        """Create a user with a profile and authenticate the client as that user."""
        self.user = UserFactory.create_user()
        ProfileFactory.create_profile(user=self.user)
        self.current_user_url = reverse("api_v1:user:current_user")
        self.tokens = UserService.get_tokens_for_user(self.user)
        access_token = self.tokens["access"]
        self.client.credentials(HTTP_AUTHORIZATION=f"Bearer {access_token}")

    def test_get_current_user_authenticated(self):
        """An authenticated GET returns the user's data with the expected keys."""
        expected_fields = {
            "id",
            "username",
            "email",
            "phone",
            "is_active",
            "role",
            "role_label",
            "capabilities",
            "profile",
        }
        expected_profile_fields = {
            "first_name",
            "middle_name",
            "last_name",
            "full_name",
        }

        result = self.client.get(self.current_user_url)

        self.assertEqual(result.status_code, status.HTTP_200_OK)
        body = result.data
        self.assertEqual(set(body.keys()), expected_fields)
        self.assertEqual(body["id"], self.user.id)
        self.assertEqual(body["email"], self.user.email)
        self.assertEqual(body["role"], "user")
        self.assertFalse(body["capabilities"]["can_manage_exchange"])
        self.assertEqual(set(body["profile"].keys()), expected_profile_fields)

    def test_get_current_user_unauthenticated(self):
        """A GET without credentials returns 401."""
        # Calling credentials() with no arguments clears the auth header.
        self.client.credentials()

        result = self.client.get(self.current_user_url)

        self.assertEqual(result.status_code, status.HTTP_401_UNAUTHORIZED)
|
||
|
||
|
||
class UserUpdateViewTest(APITestCase):
    """Test cases for the ``api_v1:user:user_update`` endpoint."""

    def setUp(self):
        """Authenticate as a fresh user and prepare a username/phone update."""
        self.user = UserFactory.create_user()
        self.update_url = reverse("api_v1:user:user_update")
        self.tokens = UserService.get_tokens_for_user(self.user)
        access_token = self.tokens["access"]
        self.client.credentials(HTTP_AUTHORIZATION=f"Bearer {access_token}")

        self.update_data = {
            "username": fake.unique.user_name(),
            "phone": f"+7{fake.numerify('##########')}",
        }

    def _send_patch(self, changes):
        """PATCH ``changes`` as JSON to the update URL; return the response."""
        return self.client.patch(self.update_url, changes, format="json")

    def test_update_user_success(self):
        """A valid PATCH returns 200, echoes the new values and stores the username."""
        result = self._send_patch(self.update_data)

        self.assertEqual(result.status_code, status.HTTP_200_OK)
        for field in ("username", "phone"):
            self.assertEqual(result.data[field], self.update_data[field])

        # Reload the row to confirm the username change was persisted.
        self.user.refresh_from_db()
        self.assertEqual(self.user.username, self.update_data["username"])

    def test_update_user_unauthenticated(self):
        """A PATCH without credentials returns 401."""
        # Calling credentials() with no arguments clears the auth header.
        self.client.credentials()

        result = self._send_patch(self.update_data)

        self.assertEqual(result.status_code, status.HTTP_401_UNAUTHORIZED)

    def test_update_user_invalid_returns_400(self):
        """An empty username returns 400 and reports ``username``."""
        result = self._send_patch({"username": ""})

        self.assertEqual(result.status_code, status.HTTP_400_BAD_REQUEST)
        self.assertIn("username", result.data)
|
||
|
||
|
||
class AdminUserManagementViewTest(APITestCase):
    """Test cases for the admin-only user management endpoints."""

    # Keys expected on each entry of the list response.
    _LISTED_USER_KEYS = (
        "id",
        "username",
        "email",
        "phone",
        "is_active",
        "role",
        "role_label",
        "capabilities",
        "profile",
    )
    # Keys expected in the create, update and activate responses.
    _MUTATION_RESPONSE_KEYS = (
        "id",
        "username",
        "email",
        "phone",
        "role",
        "role_label",
        "is_active",
    )
    # Keys expected inside a nested ``profile`` object.
    _PROFILE_KEYS = ("first_name", "middle_name", "last_name", "full_name")

    def setUp(self):
        """Create a staff user and a regular user; authenticate as the staff user."""
        self.admin = UserFactory.create_user(is_staff=True)
        self.user = UserFactory.create_user()
        self.tokens = UserService.get_tokens_for_user(self.admin)
        self.user_tokens = UserService.get_tokens_for_user(self.user)
        self.list_url = reverse("api_v1:user:admin-users")
        admin_access = self.tokens["access"]
        self.client.credentials(HTTP_AUTHORIZATION=f"Bearer {admin_access}")

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    def _assert_keys(self, mapping, expected):
        """Assert that ``mapping`` has exactly the keys listed in ``expected``."""
        self.assertEqual(set(mapping.keys()), set(expected))

    @staticmethod
    def _detail_url(account):
        """Return the admin detail URL for ``account``."""
        return reverse("api_v1:user:admin-user-detail", args=[account.id])

    def _patch(self, account, changes):
        """PATCH ``changes`` as JSON to the detail URL of ``account``."""
        return self.client.patch(self._detail_url(account), changes, format="json")

    def _create(self, payload):
        """POST ``payload`` as JSON to the list URL."""
        return self.client.post(self.list_url, payload, format="json")

    @staticmethod
    def _invalid_fields(reply):
        """Return the ``fields`` container of the first error in ``reply``."""
        return reply.data["errors"][0]["details"]["fields"]

    # ------------------------------------------------------------------
    # Listing, searching, ordering
    # ------------------------------------------------------------------

    def test_admin_can_list_users(self):
        """The list is paginated and shows both users with their capabilities."""
        reply = self.client.get(self.list_url)

        self.assertEqual(reply.status_code, status.HTTP_200_OK)
        self._assert_keys(reply.data, ("count", "next", "previous", "results"))
        entries = reply.data["results"]
        first_entry = entries[0]
        self._assert_keys(first_entry, self._LISTED_USER_KEYS)
        self._assert_keys(first_entry["profile"], self._PROFILE_KEYS)

        entry_by_username = {entry["username"]: entry for entry in entries}
        listed_usernames = set(entry_by_username)
        for account in (self.admin, self.user):
            self.assertIn(account.username, listed_usernames)

        admin_entry = entry_by_username[self.admin.username]
        self.assertTrue(admin_entry["capabilities"]["can_manage_exchange"])
        user_entry = entry_by_username[self.user.username]
        self.assertFalse(user_entry["capabilities"]["can_manage_exchange"])

    def test_admin_can_search_users(self):
        """Searching by a middle name returns only the matching user."""
        ProfileFactory.create_profile(
            user=self.user,
            first_name="Сергей",
            middle_name="Петрович",
            last_name="Иванов",
        )
        bystander = UserFactory.create_user()
        ProfileFactory.create_profile(user=bystander, first_name="Илья")

        reply = self.client.get(self.list_url, {"search": "Петрович"})

        self.assertEqual(reply.status_code, status.HTTP_200_OK)
        found = [entry["username"] for entry in reply.data["results"]]
        self.assertEqual(found, [self.user.username])

    def test_admin_can_order_users(self):
        """Ordering by ``first_name`` lists "Алексей" before "Борис"."""
        later = UserFactory.create_user()
        earlier = UserFactory.create_user()
        ProfileFactory.create_profile(user=later, first_name="Борис")
        ProfileFactory.create_profile(user=earlier, first_name="Алексей")

        reply = self.client.get(self.list_url, {"ordering": "first_name"})

        self.assertEqual(reply.status_code, status.HTTP_200_OK)
        id_sequence = [entry["id"] for entry in reply.data["results"]]
        earlier_position = id_sequence.index(earlier.id)
        later_position = id_sequence.index(later.id)
        self.assertLess(earlier_position, later_position)

    # ------------------------------------------------------------------
    # Creation
    # ------------------------------------------------------------------

    def test_admin_can_create_user_with_role(self):
        """Creating an ``admin``-role user returns 201 and stores a staff account."""
        secret = fake.password(length=12, special_chars=False)
        new_user = {
            "email": fake.unique.email(),
            "username": fake.unique.user_name(),
            "phone": f"+7{fake.numerify('##########')}",
            "password": secret,
            "role": "admin",
            "first_name": "Петр",
            "middle_name": "Петрович",
            "last_name": "Петров",
        }

        reply = self._create(new_user)

        self.assertEqual(reply.status_code, status.HTTP_201_CREATED)
        account = User.objects.get(username=new_user["username"])
        self.assertTrue(account.is_staff)
        self._assert_keys(reply.data, self._MUTATION_RESPONSE_KEYS)
        self.assertEqual(reply.data["role"], "admin")
        self.assertEqual(account.profile.first_name, "Петр")
        self.assertEqual(account.profile.middle_name, "Петрович")

    def test_admin_create_user_returns_duplicate_email_error(self):
        """Creating a user with an existing e-mail returns 400 on ``email``."""
        new_user = {
            "email": self.user.email,
            "username": fake.unique.user_name(),
            "password": fake.password(length=12, special_chars=False),
            "role": "user",
            "first_name": "Петр",
            "last_name": "Петров",
        }

        reply = self._create(new_user)

        self.assertEqual(reply.status_code, status.HTTP_400_BAD_REQUEST)
        self.assertIn("email", self._invalid_fields(reply))

    def test_admin_create_user_returns_duplicate_username_error(self):
        """Creating a user with an existing username returns 400 on ``username``."""
        new_user = {
            "email": fake.unique.email(),
            "username": self.user.username,
            "password": fake.password(length=12, special_chars=False),
            "role": "user",
            "first_name": "Петр",
            "last_name": "Петров",
        }

        reply = self._create(new_user)

        self.assertEqual(reply.status_code, status.HTTP_400_BAD_REQUEST)
        self.assertIn("username", self._invalid_fields(reply))

    def test_admin_create_user_returns_password_validation_error(self):
        """Creating a user with the password "123" returns 400 on ``password``."""
        new_user = {
            "email": fake.unique.email(),
            "username": fake.unique.user_name(),
            "password": "123",
            "role": "user",
            "first_name": "Петр",
            "last_name": "Петров",
        }

        reply = self._create(new_user)

        self.assertEqual(reply.status_code, status.HTTP_400_BAD_REQUEST)
        self.assertIn("password", self._invalid_fields(reply))

    # ------------------------------------------------------------------
    # Detail and update
    # ------------------------------------------------------------------

    def test_admin_can_update_user_and_role(self):
        """A PATCH can promote a user, verify them and set their profile names."""
        changes = {
            "role": "admin",
            "first_name": "Иван",
            "middle_name": "Иванович",
            "last_name": "Иванов",
            "is_verified": True,
        }

        reply = self._patch(self.user, changes)

        self.assertEqual(reply.status_code, status.HTTP_200_OK)
        self.user.refresh_from_db()
        self.assertTrue(self.user.is_staff)
        self.assertTrue(self.user.is_verified)
        self._assert_keys(reply.data, self._MUTATION_RESPONSE_KEYS)
        self.assertEqual(self.user.profile.first_name, "Иван")
        self.assertEqual(self.user.profile.middle_name, "Иванович")

    def test_admin_can_get_user_detail(self):
        """The detail endpoint returns the requested user with a profile object."""
        reply = self.client.get(self._detail_url(self.user))

        self.assertEqual(reply.status_code, status.HTTP_200_OK)
        self.assertEqual(reply.data["id"], self.user.id)
        self._assert_keys(reply.data["profile"], self._PROFILE_KEYS)

    def test_admin_patch_user_returns_duplicate_email_error(self):
        """Patching in another user's e-mail returns 400 on ``email``."""
        target = UserFactory.create_user()

        reply = self._patch(target, {"email": self.user.email})

        self.assertEqual(reply.status_code, status.HTTP_400_BAD_REQUEST)
        self.assertIn("email", self._invalid_fields(reply))

    def test_admin_patch_user_returns_duplicate_username_error(self):
        """Patching in another user's username returns 400 on ``username``."""
        target = UserFactory.create_user()

        reply = self._patch(target, {"username": self.user.username})

        self.assertEqual(reply.status_code, status.HTTP_400_BAD_REQUEST)
        self.assertIn("username", self._invalid_fields(reply))

    def test_admin_patch_user_returns_password_validation_error(self):
        """Patching the password to "123" returns 400 on ``password``."""
        reply = self._patch(self.user, {"password": "123"})

        self.assertEqual(reply.status_code, status.HTTP_400_BAD_REQUEST)
        self.assertIn("password", self._invalid_fields(reply))

    def test_admin_cannot_patch_self_to_inactive(self):
        """An admin deactivating their own account via PATCH gets a 400."""
        reply = self._patch(self.admin, {"is_active": False})

        self.assertEqual(reply.status_code, status.HTTP_400_BAD_REQUEST)
        self.assertEqual(reply.data["detail"], "Нельзя деактивировать самого себя.")

    def test_admin_cannot_patch_self_to_regular_user(self):
        """An admin demoting their own account via PATCH gets a 400."""
        reply = self._patch(self.admin, {"role": "user"})

        self.assertEqual(reply.status_code, status.HTTP_400_BAD_REQUEST)
        expected_detail = "Нельзя снять у себя роль администратора."
        self.assertEqual(reply.data["detail"], expected_detail)

    def test_admin_can_patch_self_with_safe_fields(self):
        """An admin may change their own names; the reply has no ``profile`` key."""
        changes = {"first_name": "Админ", "last_name": "Админов"}

        reply = self._patch(self.admin, changes)

        self.assertEqual(reply.status_code, status.HTTP_200_OK)
        self.assertEqual(reply.data["role"], "admin")
        self.assertNotIn("profile", reply.data)

    # ------------------------------------------------------------------
    # Activation and deactivation
    # ------------------------------------------------------------------

    def test_admin_can_deactivate_user(self):
        """Posting to the deactivate endpoint sets ``is_active`` to False."""
        target_url = reverse("api_v1:user:admin-user-deactivate", args=[self.user.id])

        reply = self.client.post(target_url, {}, format="json")

        self.assertEqual(reply.status_code, status.HTTP_200_OK)
        self.user.refresh_from_db()
        self.assertFalse(self.user.is_active)

    def test_admin_can_activate_user(self):
        """Posting to the activate endpoint sets ``is_active`` back to True."""
        # Start from a stored inactive account.
        self.user.is_active = False
        self.user.save(update_fields=["is_active"])
        target_url = reverse("api_v1:user:admin-user-activate", args=[self.user.id])

        reply = self.client.post(target_url, {}, format="json")

        self.assertEqual(reply.status_code, status.HTTP_200_OK)
        self.user.refresh_from_db()
        self.assertTrue(self.user.is_active)
        self._assert_keys(reply.data, self._MUTATION_RESPONSE_KEYS)

    def test_admin_cannot_deactivate_self(self):
        """Posting to the deactivate endpoint for the admin's own id returns 400."""
        target_url = reverse("api_v1:user:admin-user-deactivate", args=[self.admin.id])

        reply = self.client.post(target_url, {}, format="json")

        self.assertEqual(reply.status_code, status.HTTP_400_BAD_REQUEST)

    # ------------------------------------------------------------------
    # Permissions
    # ------------------------------------------------------------------

    def test_regular_user_cannot_access_admin_user_management(self):
        """A non-staff user's token gets 403 on the list endpoint."""
        regular_access = self.user_tokens["access"]
        self.client.credentials(HTTP_AUTHORIZATION=f"Bearer {regular_access}")

        reply = self.client.get(self.list_url)

        self.assertEqual(reply.status_code, status.HTTP_403_FORBIDDEN)
|
||
|
||
|
||
class ProfileDetailViewTest(APITestCase):
    """Test cases for ``api_v1:user:profile_detail`` and ``profile_full``."""

    def setUp(self):
        """Authenticate as a user with a profile and prepare replacement values."""
        self.user = UserFactory.create_user()
        self.profile = ProfileFactory.create_profile(user=self.user)
        self.profile_url = reverse("api_v1:user:profile_detail")
        self.tokens = UserService.get_tokens_for_user(self.user)
        access_token = self.tokens["access"]
        self.client.credentials(HTTP_AUTHORIZATION=f"Bearer {access_token}")

        self.update_data = {
            "first_name": fake.first_name(),
            "middle_name": fake.first_name(),
            "last_name": fake.last_name(),
            "bio": fake.text(max_nb_chars=200),
        }

    def test_get_profile_success(self):
        """A GET returns 200 with the stored first and middle names."""
        result = self.client.get(self.profile_url)

        self.assertEqual(result.status_code, status.HTTP_200_OK)
        for field in ("first_name", "middle_name"):
            self.assertEqual(result.data[field], getattr(self.profile, field))

    def test_update_profile_success(self):
        """A PATCH returns 200, echoes the new names and stores the first name."""
        result = self.client.patch(self.profile_url, self.update_data, format="json")

        self.assertEqual(result.status_code, status.HTTP_200_OK)
        for field in ("first_name", "middle_name", "last_name"):
            self.assertEqual(result.data[field], self.update_data[field])

        # Reload the row to confirm the first name was persisted.
        self.profile.refresh_from_db()
        self.assertEqual(self.profile.first_name, self.update_data["first_name"])

    def test_profile_created_if_not_exists(self):
        """A GET after deleting the profile returns 200 and a profile row exists."""
        self.profile.delete()

        result = self.client.get(self.profile_url)

        self.assertEqual(result.status_code, status.HTTP_200_OK)
        # A profile row for the user must exist again after the request.
        recreated = Profile.objects.filter(user=self.user)
        self.assertTrue(recreated.exists())

    def test_update_profile_invalid_returns_400(self):
        """An unparseable ``date_of_birth`` returns 400 and reports that field."""
        bad_changes = {"date_of_birth": "not-a-date"}

        result = self.client.patch(self.profile_url, bad_changes, format="json")

        self.assertEqual(result.status_code, status.HTTP_400_BAD_REQUEST)
        self.assertIn("date_of_birth", result.data)

    def test_get_full_profile_endpoint(self):
        """The ``profile_full`` endpoint returns 200 with the user's id."""
        full_profile_url = reverse("api_v1:user:profile_full")

        result = self.client.get(full_profile_url)

        self.assertEqual(result.status_code, status.HTTP_200_OK)
        self.assertEqual(result.data["id"], self.user.id)
|
||
|
||
|
||
class PasswordChangeViewTest(APITestCase):
    """Test cases for the ``api_v1:user:password_change`` endpoint."""

    def setUp(self):
        """Authenticate as a user with a known password and prepare a valid change."""
        self.old_password = fake.password(length=12, special_chars=False)
        self.new_password = fake.password(length=12, special_chars=False)
        self.user = UserFactory.create_user(password=self.old_password)
        self.password_change_url = reverse("api_v1:user:password_change")
        self.tokens = UserService.get_tokens_for_user(self.user)
        access_token = self.tokens["access"]
        self.client.credentials(HTTP_AUTHORIZATION=f"Bearer {access_token}")

        self.password_data = {
            "old_password": self.old_password,
            "new_password": self.new_password,
            "new_password_confirm": self.new_password,
        }

    def _request_change(self, payload):
        """POST ``payload`` as JSON to the password-change URL; return the response."""
        return self.client.post(self.password_change_url, payload, format="json")

    def test_change_password_success(self):
        """A valid request returns 200 with a message and stores the new password."""
        result = self._request_change(self.password_data)

        self.assertEqual(result.status_code, status.HTTP_200_OK)
        self.assertIn("message", result.data)

        # Reload the user and check the stored hash against the new password.
        self.user.refresh_from_db()
        accepted = self.user.check_password(self.new_password)
        self.assertTrue(accepted)

    def test_change_password_wrong_old_password(self):
        """A wrong ``old_password`` returns 400 with an ``error`` entry."""
        payload = {
            **self.password_data,
            "old_password": fake.password(length=12, special_chars=False),
        }

        result = self._request_change(payload)

        self.assertEqual(result.status_code, status.HTTP_400_BAD_REQUEST)
        self.assertIn("error", result.data)

    def test_change_password_passwords_do_not_match(self):
        """A differing ``new_password_confirm`` returns 400 with a non-field error."""
        payload = {
            **self.password_data,
            "new_password_confirm": fake.password(length=12, special_chars=False),
        }

        result = self._request_change(payload)

        self.assertEqual(result.status_code, status.HTTP_400_BAD_REQUEST)
        self.assertIn("non_field_errors", result.data)
|
||
|
||
|
||
class TokenRefreshViewTest(APITestCase):
    """Test cases for the ``api_v1:user:token_refresh`` endpoint."""

    def setUp(self):
        """Create a user and issue a token pair for them."""
        self.user = UserFactory.create_user()
        self.refresh_url = reverse("api_v1:user:token_refresh")
        self.tokens = UserService.get_tokens_for_user(self.user)

    def _refresh(self, payload):
        """POST ``payload`` as JSON to the refresh URL; return the response."""
        return self.client.post(self.refresh_url, payload, format="json")

    def test_refresh_token_success(self):
        """A valid refresh token returns 200 with both tokens."""
        result = self._refresh({"refresh": self.tokens["refresh"]})

        self.assertEqual(result.status_code, status.HTTP_200_OK)
        for token_name in ("access", "refresh"):
            self.assertIn(token_name, result.data)
        # The returned refresh token is not compared with the submitted one:
        # it may be the same or different depending on implementation.

    def test_refresh_token_invalid(self):
        """A random string as refresh token returns 401 with an error payload."""
        bogus_token = fake.pystr(min_chars=20, max_chars=50)

        result = self._refresh({"refresh": bogus_token})

        self.assertEqual(result.status_code, status.HTTP_401_UNAUTHORIZED)
        accepted_keys = ("errors", "detail", "code")
        self.assertTrue(any(key in result.data for key in accepted_keys))

    def test_refresh_token_missing(self):
        """An empty payload returns 400 with an error payload."""
        result = self._refresh({})

        self.assertEqual(result.status_code, status.HTTP_400_BAD_REQUEST)
        accepted_keys = ("errors", "detail")
        self.assertTrue(any(key in result.data for key in accepted_keys))
|
||
|
||
|
||
class TokenVerifyViewTest(APITestCase):
    """Test cases for the ``api_v1:user:token_verify`` endpoint."""

    def test_verify_access_token_success(self):
        """Submitting a freshly issued access token returns 200."""
        account = UserFactory.create_user()
        access_token = UserService.get_tokens_for_user(account)["access"]
        verify_url = reverse("api_v1:user:token_verify")

        result = self.client.post(verify_url, {"token": access_token}, format="json")

        self.assertEqual(result.status_code, status.HTTP_200_OK)
|
||
|
||
|
||
class ApiJwtOnlyAuthenticationTest(APITestCase):
    """Checks that API authentication relies on JWT, not on session cookies."""

    def setUp(self):
        """Prepare a user, a token pair and a CSRF-enforcing client with cookies."""
        self.user = UserFactory.create_user()
        self.tokens = UserService.get_tokens_for_user(self.user)
        self.update_url = reverse("api_v1:user:user_update")
        self.patch_data = {"username": fake.unique.user_name()}

        # CSRF checks are switched on explicitly so that an accidental use of
        # SessionAuthentication would show up in the responses.
        strict_client = APIClient(enforce_csrf_checks=True)
        strict_client.cookies["sessionid"] = "fake-admin-session"
        strict_client.cookies["csrftoken"] = "fake-csrf-token"
        self.client = strict_client

    def _send_patch(self):
        """PATCH the prepared username change as JSON; return the response."""
        return self.client.patch(self.update_url, self.patch_data, format="json")

    def test_patch_with_bearer_and_session_cookies_returns_200(self):
        """A Bearer token authenticates even when session cookies are present."""
        access_token = self.tokens["access"]
        self.client.credentials(HTTP_AUTHORIZATION=f"Bearer {access_token}")

        result = self._send_patch()

        self.assertEqual(result.status_code, status.HTTP_200_OK)
        self.assertEqual(result.data["id"], self.user.id)

    def test_patch_with_only_session_cookies_returns_401_not_403(self):
        """Session cookies without a JWT give 401 rather than a CSRF 403."""
        result = self._send_patch()

        self.assertEqual(result.status_code, status.HTTP_401_UNAUTHORIZED)
|