"""Tests for user DRF views""" from apps.user.models import Profile from apps.user.services import UserService from django.contrib.auth import get_user_model from django.urls import reverse from faker import Faker from rest_framework import status from rest_framework.test import APIClient, APITestCase from rest_framework_simplejwt.token_blacklist.models import BlacklistedToken from .factories import ProfileFactory, UserFactory User = get_user_model() fake = Faker("ru_RU") class RegisterViewTest(APITestCase): """Tests for RegisterView""" def setUp(self): self.register_url = reverse("api_v1:user:register") self.password = fake.password(length=12, special_chars=False) self.user_data = { "email": fake.unique.email(), "username": fake.unique.user_name(), "password": self.password, "password_confirm": self.password, "phone": f"+7{fake.numerify('##########')}", "first_name": fake.first_name(), "middle_name": fake.first_name(), "last_name": fake.last_name(), } def test_register_success(self): """Test successful user registration""" response = self.client.post(self.register_url, self.user_data, format="json") self.assertEqual(response.status_code, status.HTTP_201_CREATED) self.assertIn("user", response.data) self.assertIn("tokens", response.data) self.assertIn("refresh", response.data["tokens"]) self.assertIn("access", response.data["tokens"]) # Verify user was created created_user = User.objects.get(email=self.user_data["email"]) self.assertEqual(created_user.profile.first_name, self.user_data["first_name"]) self.assertEqual(created_user.profile.last_name, self.user_data["last_name"]) def test_register_passwords_do_not_match(self): """Test registration fails when passwords don't match""" data = self.user_data.copy() data["password_confirm"] = fake.password(length=12, special_chars=False) response = self.client.post(self.register_url, data, format="json") self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) self.assertIn("non_field_errors", response.data) def test_register_duplicate_email(self): """Test registration fails with duplicate email""" # Create existing user existing_user = UserFactory.create_user() # Use the same email as existing user data = self.user_data.copy() data["email"] = existing_user.email response = self.client.post(self.register_url, data, format="json") self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) self.assertIn("email", response.data) def test_register_short_password(self): """Test registration fails with short password""" short_password = fake.pystr(min_chars=3, max_chars=5) data = self.user_data.copy() data["password"] = short_password data["password_confirm"] = short_password response = self.client.post(self.register_url, data, format="json") self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) self.assertIn("password", response.data) def test_register_requires_first_and_last_name(self): data = self.user_data.copy() data.pop("first_name") data.pop("last_name") response = self.client.post(self.register_url, data, format="json") self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) self.assertIn("first_name", response.data) self.assertIn("last_name", response.data) class LoginViewTest(APITestCase): """Tests for LoginView""" def setUp(self): self.login_url = reverse("api_v1:user:login") self.password = fake.password(length=12, special_chars=False) self.user = UserFactory.create_user(password=self.password) self.login_data = {"username": self.user.username, "password": self.password} def test_login_success(self): """Test successful login""" response = self.client.post(self.login_url, self.login_data, format="json") self.assertEqual(response.status_code, status.HTTP_200_OK) self.assertIn("refresh", response.data) self.assertIn("access", response.data) def test_login_accepts_slashless_api_url(self): """Frontend clients can call API URLs without Django's trailing slash.""" response = self.client.post( self.login_url.rstrip("/"), self.login_data, format="json" ) self.assertEqual(response.status_code, status.HTTP_200_OK) self.assertIn("refresh", response.data) self.assertIn("access", response.data) def test_login_invalid_credentials(self): """Test login fails with invalid credentials""" data = self.login_data.copy() data["password"] = fake.password(length=12, special_chars=False) response = self.client.post(self.login_url, data, format="json") self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED) self.assertIn("error", response.data) def test_login_nonexistent_user(self): """Test login fails for nonexistent user""" data = { "username": fake.unique.user_name(), "password": fake.password(length=12, special_chars=False), } response = self.client.post(self.login_url, data, format="json") self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED) def test_login_invalid_payload_returns_400(self): response = self.client.post(self.login_url, {}, format="json") self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) self.assertIn("username", response.data) class LogoutViewTest(APITestCase): def test_logout_blacklists_refresh_tokens_and_returns_empty_payload(self): user = UserFactory.create_user() tokens = UserService.get_tokens_for_user(user) self.client.credentials(HTTP_AUTHORIZATION=f"Bearer {tokens['access']}") response = self.client.post(reverse("api_v1:user:logout"), {}, format="json") self.assertEqual(response.status_code, status.HTTP_200_OK) self.assertEqual(response.data, {}) self.assertEqual(BlacklistedToken.objects.filter(token__user=user).count(), 1) class CurrentUserViewTest(APITestCase): """Tests for CurrentUserView""" def setUp(self): self.user = UserFactory.create_user() ProfileFactory.create_profile(user=self.user) self.current_user_url = reverse("api_v1:user:current_user") self.tokens = UserService.get_tokens_for_user(self.user) self.client.credentials(HTTP_AUTHORIZATION=f"Bearer {self.tokens['access']}") def test_get_current_user_authenticated(self): """Test getting current user when authenticated""" response = self.client.get(self.current_user_url) self.assertEqual(response.status_code, status.HTTP_200_OK) self.assertEqual( set(response.data.keys()), { "id", "username", "email", "phone", "is_active", "role", "role_label", "capabilities", "profile", }, ) self.assertEqual(response.data["id"], self.user.id) self.assertEqual(response.data["email"], self.user.email) self.assertEqual(response.data["role"], "user") self.assertFalse(response.data["capabilities"]["can_manage_exchange"]) self.assertEqual( set(response.data["profile"].keys()), {"first_name", "middle_name", "last_name", "full_name"}, ) def test_get_current_user_unauthenticated(self): """Test getting current user when unauthenticated""" self.client.credentials() # Remove auth header response = self.client.get(self.current_user_url) self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED) class UserUpdateViewTest(APITestCase): """Tests for UserUpdateView""" def setUp(self): self.user = UserFactory.create_user() self.update_url = reverse("api_v1:user:user_update") self.tokens = UserService.get_tokens_for_user(self.user) self.client.credentials(HTTP_AUTHORIZATION=f"Bearer {self.tokens['access']}") self.update_data = { "username": fake.unique.user_name(), "phone": f"+7{fake.numerify('##########')}", } def test_update_user_success(self): """Test successful user update""" response = self.client.patch(self.update_url, self.update_data, format="json") self.assertEqual(response.status_code, status.HTTP_200_OK) self.assertEqual(response.data["username"], self.update_data["username"]) self.assertEqual(response.data["phone"], self.update_data["phone"]) # Verify in database self.user.refresh_from_db() self.assertEqual(self.user.username, self.update_data["username"]) def test_update_user_unauthenticated(self): """Test user update fails when unauthenticated""" self.client.credentials() # Remove auth header response = self.client.patch(self.update_url, self.update_data, format="json") self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED) def test_update_user_invalid_returns_400(self): response = self.client.patch(self.update_url, {"username": ""}, format="json") self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) self.assertIn("username", response.data) class AdminUserManagementViewTest(APITestCase): """Tests for admin-only user management endpoints.""" def setUp(self): self.admin = UserFactory.create_user(is_staff=True) self.user = UserFactory.create_user() self.tokens = UserService.get_tokens_for_user(self.admin) self.user_tokens = UserService.get_tokens_for_user(self.user) self.list_url = reverse("api_v1:user:admin-users") self.client.credentials(HTTP_AUTHORIZATION=f"Bearer {self.tokens['access']}") def test_admin_can_list_users(self): response = self.client.get(self.list_url) self.assertEqual(response.status_code, status.HTTP_200_OK) self.assertEqual( set(response.data.keys()), {"count", "next", "previous", "results"}, ) self.assertEqual( set(response.data["results"][0].keys()), { "id", "username", "email", "phone", "is_active", "role", "role_label", "capabilities", "profile", }, ) self.assertEqual( set(response.data["results"][0]["profile"].keys()), {"first_name", "middle_name", "last_name", "full_name"}, ) users_by_name = {item["username"]: item for item in response.data["results"]} usernames = set(users_by_name) self.assertIn(self.admin.username, usernames) self.assertIn(self.user.username, usernames) self.assertTrue( users_by_name[self.admin.username]["capabilities"]["can_manage_exchange"] ) self.assertFalse( users_by_name[self.user.username]["capabilities"]["can_manage_exchange"] ) def test_admin_can_search_users(self): ProfileFactory.create_profile( user=self.user, first_name="Сергей", middle_name="Петрович", last_name="Иванов", ) another = UserFactory.create_user() ProfileFactory.create_profile(user=another, first_name="Илья") response = self.client.get(self.list_url, {"search": "Петрович"}) self.assertEqual(response.status_code, status.HTTP_200_OK) usernames = [item["username"] for item in response.data["results"]] self.assertEqual(usernames, [self.user.username]) def test_admin_can_order_users(self): first = UserFactory.create_user() second = UserFactory.create_user() ProfileFactory.create_profile(user=first, first_name="Борис") ProfileFactory.create_profile(user=second, first_name="Алексей") response = self.client.get(self.list_url, {"ordering": "first_name"}) self.assertEqual(response.status_code, status.HTTP_200_OK) ordered_ids = [item["id"] for item in response.data["results"]] self.assertLess(ordered_ids.index(second.id), ordered_ids.index(first.id)) def test_admin_can_create_user_with_role(self): password = fake.password(length=12, special_chars=False) payload = { "email": fake.unique.email(), "username": fake.unique.user_name(), "phone": f"+7{fake.numerify('##########')}", "password": password, "role": "admin", "first_name": "Петр", "middle_name": "Петрович", "last_name": "Петров", } response = self.client.post(self.list_url, payload, format="json") self.assertEqual(response.status_code, status.HTTP_201_CREATED) created = User.objects.get(username=payload["username"]) self.assertTrue(created.is_staff) self.assertEqual( set(response.data.keys()), { "id", "username", "email", "phone", "role", "role_label", "is_active", }, ) self.assertEqual(response.data["role"], "admin") self.assertEqual(created.profile.first_name, "Петр") self.assertEqual(created.profile.middle_name, "Петрович") def test_admin_create_user_returns_duplicate_email_error(self): payload = { "email": self.user.email, "username": fake.unique.user_name(), "password": fake.password(length=12, special_chars=False), "role": "user", "first_name": "Петр", "last_name": "Петров", } response = self.client.post(self.list_url, payload, format="json") self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) self.assertIn("email", response.data["errors"][0]["details"]["fields"]) def test_admin_create_user_returns_duplicate_username_error(self): payload = { "email": fake.unique.email(), "username": self.user.username, "password": fake.password(length=12, special_chars=False), "role": "user", "first_name": "Петр", "last_name": "Петров", } response = self.client.post(self.list_url, payload, format="json") self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) self.assertIn("username", response.data["errors"][0]["details"]["fields"]) def test_admin_create_user_returns_password_validation_error(self): payload = { "email": fake.unique.email(), "username": fake.unique.user_name(), "password": "123", "role": "user", "first_name": "Петр", "last_name": "Петров", } response = self.client.post(self.list_url, payload, format="json") self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) self.assertIn("password", response.data["errors"][0]["details"]["fields"]) def test_admin_can_update_user_and_role(self): url = reverse("api_v1:user:admin-user-detail", args=[self.user.id]) response = self.client.patch( url, { "role": "admin", "first_name": "Иван", "middle_name": "Иванович", "last_name": "Иванов", "is_verified": True, }, format="json", ) self.assertEqual(response.status_code, status.HTTP_200_OK) self.user.refresh_from_db() self.assertTrue(self.user.is_staff) self.assertTrue(self.user.is_verified) self.assertEqual( set(response.data.keys()), { "id", "username", "email", "phone", "role", "role_label", "is_active", }, ) self.assertEqual(self.user.profile.first_name, "Иван") self.assertEqual(self.user.profile.middle_name, "Иванович") def test_admin_can_get_user_detail(self): url = reverse("api_v1:user:admin-user-detail", args=[self.user.id]) response = self.client.get(url) self.assertEqual(response.status_code, status.HTTP_200_OK) self.assertEqual(response.data["id"], self.user.id) self.assertEqual( set(response.data["profile"].keys()), {"first_name", "middle_name", "last_name", "full_name"}, ) def test_admin_patch_user_returns_duplicate_email_error(self): another = UserFactory.create_user() url = reverse("api_v1:user:admin-user-detail", args=[another.id]) response = self.client.patch( url, {"email": self.user.email}, format="json", ) self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) self.assertIn("email", response.data["errors"][0]["details"]["fields"]) def test_admin_patch_user_returns_duplicate_username_error(self): another = UserFactory.create_user() url = reverse("api_v1:user:admin-user-detail", args=[another.id]) response = self.client.patch( url, {"username": self.user.username}, format="json", ) self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) self.assertIn("username", response.data["errors"][0]["details"]["fields"]) def test_admin_patch_user_returns_password_validation_error(self): url = reverse("api_v1:user:admin-user-detail", args=[self.user.id]) response = self.client.patch( url, {"password": "123"}, format="json", ) self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) self.assertIn("password", response.data["errors"][0]["details"]["fields"]) def test_admin_cannot_patch_self_to_inactive(self): url = reverse("api_v1:user:admin-user-detail", args=[self.admin.id]) response = self.client.patch(url, {"is_active": False}, format="json") self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) self.assertEqual(response.data["detail"], "Нельзя деактивировать самого себя.") def test_admin_cannot_patch_self_to_regular_user(self): url = reverse("api_v1:user:admin-user-detail", args=[self.admin.id]) response = self.client.patch(url, {"role": "user"}, format="json") self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) self.assertEqual( response.data["detail"], "Нельзя снять у себя роль администратора." ) def test_admin_can_patch_self_with_safe_fields(self): url = reverse("api_v1:user:admin-user-detail", args=[self.admin.id]) response = self.client.patch( url, {"first_name": "Админ", "last_name": "Админов"}, format="json", ) self.assertEqual(response.status_code, status.HTTP_200_OK) self.assertEqual(response.data["role"], "admin") self.assertNotIn("profile", response.data) def test_admin_can_deactivate_user(self): url = reverse("api_v1:user:admin-user-deactivate", args=[self.user.id]) response = self.client.post(url, {}, format="json") self.assertEqual(response.status_code, status.HTTP_200_OK) self.user.refresh_from_db() self.assertFalse(self.user.is_active) def test_admin_can_activate_user(self): self.user.is_active = False self.user.save(update_fields=["is_active"]) url = reverse("api_v1:user:admin-user-activate", args=[self.user.id]) response = self.client.post(url, {}, format="json") self.assertEqual(response.status_code, status.HTTP_200_OK) self.user.refresh_from_db() self.assertTrue(self.user.is_active) self.assertEqual( set(response.data.keys()), { "id", "username", "email", "phone", "role", "role_label", "is_active", }, ) def test_admin_cannot_deactivate_self(self): url = reverse("api_v1:user:admin-user-deactivate", args=[self.admin.id]) response = self.client.post(url, {}, format="json") self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) def test_regular_user_cannot_access_admin_user_management(self): self.client.credentials( HTTP_AUTHORIZATION=f"Bearer {self.user_tokens['access']}" ) response = self.client.get(self.list_url) self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN) class ProfileDetailViewTest(APITestCase): """Tests for ProfileDetailView""" def setUp(self): self.user = UserFactory.create_user() self.profile = ProfileFactory.create_profile(user=self.user) self.profile_url = reverse("api_v1:user:profile_detail") self.tokens = UserService.get_tokens_for_user(self.user) self.client.credentials(HTTP_AUTHORIZATION=f"Bearer {self.tokens['access']}") self.update_data = { "first_name": fake.first_name(), "middle_name": fake.first_name(), "last_name": fake.last_name(), "bio": fake.text(max_nb_chars=200), } def test_get_profile_success(self): """Test successful profile retrieval""" response = self.client.get(self.profile_url) self.assertEqual(response.status_code, status.HTTP_200_OK) self.assertEqual(response.data["first_name"], self.profile.first_name) self.assertEqual(response.data["middle_name"], self.profile.middle_name) def test_update_profile_success(self): """Test successful profile update""" response = self.client.patch(self.profile_url, self.update_data, format="json") self.assertEqual(response.status_code, status.HTTP_200_OK) self.assertEqual(response.data["first_name"], self.update_data["first_name"]) self.assertEqual(response.data["middle_name"], self.update_data["middle_name"]) self.assertEqual(response.data["last_name"], self.update_data["last_name"]) # Verify in database self.profile.refresh_from_db() self.assertEqual(self.profile.first_name, self.update_data["first_name"]) def test_profile_created_if_not_exists(self): """Test profile is created if it doesn't exist""" # Delete existing profile self.profile.delete() response = self.client.get(self.profile_url) self.assertEqual(response.status_code, status.HTTP_200_OK) # Profile should be created automatically self.assertTrue(Profile.objects.filter(user=self.user).exists()) def test_update_profile_invalid_returns_400(self): response = self.client.patch( self.profile_url, {"date_of_birth": "not-a-date"}, format="json", ) self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) self.assertIn("date_of_birth", response.data) def test_get_full_profile_endpoint(self): response = self.client.get(reverse("api_v1:user:profile_full")) self.assertEqual(response.status_code, status.HTTP_200_OK) self.assertEqual(response.data["id"], self.user.id) class PasswordChangeViewTest(APITestCase): """Tests for PasswordChangeView""" def setUp(self): self.old_password = fake.password(length=12, special_chars=False) self.new_password = fake.password(length=12, special_chars=False) self.user = UserFactory.create_user(password=self.old_password) self.password_change_url = reverse("api_v1:user:password_change") self.tokens = UserService.get_tokens_for_user(self.user) self.client.credentials(HTTP_AUTHORIZATION=f"Bearer {self.tokens['access']}") self.password_data = { "old_password": self.old_password, "new_password": self.new_password, "new_password_confirm": self.new_password, } def test_change_password_success(self): """Test successful password change""" response = self.client.post( self.password_change_url, self.password_data, format="json" ) self.assertEqual(response.status_code, status.HTTP_200_OK) self.assertIn("message", response.data) # Verify password was changed self.user.refresh_from_db() self.assertTrue(self.user.check_password(self.new_password)) def test_change_password_wrong_old_password(self): """Test password change fails with wrong old password""" data = self.password_data.copy() data["old_password"] = fake.password(length=12, special_chars=False) response = self.client.post(self.password_change_url, data, format="json") self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) self.assertIn("error", response.data) def test_change_password_passwords_do_not_match(self): """Test password change fails when new passwords don't match""" data = self.password_data.copy() data["new_password_confirm"] = fake.password(length=12, special_chars=False) response = self.client.post(self.password_change_url, data, format="json") self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) self.assertIn("non_field_errors", response.data) class TokenRefreshViewTest(APITestCase): """Tests for TokenRefreshView""" def setUp(self): self.user = UserFactory.create_user() self.refresh_url = reverse("api_v1:user:token_refresh") self.tokens = UserService.get_tokens_for_user(self.user) def test_refresh_token_success(self): """Test successful token refresh""" data = {"refresh": self.tokens["refresh"]} response = self.client.post(self.refresh_url, data, format="json") self.assertEqual(response.status_code, status.HTTP_200_OK) self.assertIn("access", response.data) self.assertIn("refresh", response.data) # New refresh token should be different # Refresh token may be the same or different depending on implementation def test_refresh_token_invalid(self): """Test token refresh fails with invalid refresh token""" data = {"refresh": fake.pystr(min_chars=20, max_chars=50)} response = self.client.post(self.refresh_url, data, format="json") self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED) self.assertTrue( "errors" in response.data or "detail" in response.data or "code" in response.data ) def test_refresh_token_missing(self): """Test token refresh fails without refresh token""" response = self.client.post(self.refresh_url, {}, format="json") self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) self.assertTrue("errors" in response.data or "detail" in response.data) class TokenVerifyViewTest(APITestCase): def test_verify_access_token_success(self): user = UserFactory.create_user() tokens = UserService.get_tokens_for_user(user) response = self.client.post( reverse("api_v1:user:token_verify"), {"token": tokens["access"]}, format="json", ) self.assertEqual(response.status_code, status.HTTP_200_OK) class ApiJwtOnlyAuthenticationTest(APITestCase): """Tests that API auth flow is JWT-only and not session-cookie based.""" def setUp(self): self.user = UserFactory.create_user() self.tokens = UserService.get_tokens_for_user(self.user) self.update_url = reverse("api_v1:user:user_update") self.patch_data = {"username": fake.unique.user_name()} # Explicitly enable CSRF checks to catch accidental SessionAuthentication usage. self.client = APIClient(enforce_csrf_checks=True) self.client.cookies["sessionid"] = "fake-admin-session" self.client.cookies["csrftoken"] = "fake-csrf-token" def test_patch_with_bearer_and_session_cookies_returns_200(self): """Bearer JWT should authenticate even if session cookies are present.""" self.client.credentials(HTTP_AUTHORIZATION=f"Bearer {self.tokens['access']}") response = self.client.patch(self.update_url, self.patch_data, format="json") self.assertEqual(response.status_code, status.HTTP_200_OK) self.assertEqual(response.data["id"], self.user.id) def test_patch_with_only_session_cookies_returns_401_not_403(self): """Session cookies without JWT should not trigger CSRF 403 for API auth.""" response = self.client.patch(self.update_url, self.patch_data, format="json") self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)