- Add Model Mixins: TimestampMixin, SoftDeleteMixin, AuditMixin, etc.
- Add Base Services: BaseService, BulkOperationsMixin, QueryOptimizerMixin
- Add Base ViewSets with bulk operations
- Add BackgroundJob model for Celery task tracking
- Add BaseAppCommand for management commands
- Add permissions, pagination, filters, cache, logging
- Migrate tests to factory_boy + faker
- Add CHANGELOG.md
- 297 tests passing
313 lines
12 KiB
Python
313 lines
12 KiB
Python
"""Tests for user DRF views"""
|
|
|
|
from apps.user.models import Profile
|
|
from apps.user.services import UserService
|
|
from django.contrib.auth import get_user_model
|
|
from django.urls import reverse
|
|
from faker import Faker
|
|
from rest_framework import status
|
|
from rest_framework.test import APITestCase
|
|
|
|
from .factories import ProfileFactory, UserFactory
|
|
|
|
# Look the user model up through Django's get_user_model() instead of importing a concrete class.
User = get_user_model()
|
|
# Module-level Faker instance configured with the "ru_RU" locale; the test classes in this module use it to generate input data.
fake = Faker("ru_RU")
|
|
|
|
|
|
class RegisterViewTest(APITestCase):
    """Exercise the registration endpoint (``api_v1:user:register``).

    Covers one accepted submission and three rejected ones: mismatched
    password confirmation, an email that already belongs to a user, and
    a 3-5 character password.
    """

    def setUp(self):
        """Resolve the endpoint URL and build a freshly generated payload."""
        self.register_url = reverse("api_v1:user:register")
        self.password = fake.password(length=12, special_chars=False)
        self.user_data = dict(
            email=fake.unique.email(),
            username=fake.unique.user_name(),
            password=self.password,
            password_confirm=self.password,
            phone=f"+7{fake.numerify('##########')}",
        )

    def _post_register(self, payload):
        """POST *payload* as JSON to the registration endpoint."""
        return self.client.post(self.register_url, payload, format="json")

    def test_register_success(self):
        """The generated payload yields 201 with user data and both tokens."""
        reply = self._post_register(self.user_data)

        self.assertEqual(reply.status_code, status.HTTP_201_CREATED)
        for key in ("user", "tokens"):
            self.assertIn(key, reply.data)
        for key in ("refresh", "access"):
            self.assertIn(key, reply.data["tokens"])

        # The account must be persisted, not merely echoed in the response.
        matching_users = User.objects.filter(email=self.user_data["email"])
        self.assertTrue(matching_users.exists())

    def test_register_passwords_do_not_match(self):
        """A different ``password_confirm`` is rejected with 400."""
        payload = {
            **self.user_data,
            "password_confirm": fake.password(length=12, special_chars=False),
        }

        reply = self._post_register(payload)

        self.assertEqual(reply.status_code, status.HTTP_400_BAD_REQUEST)
        self.assertIn("non_field_errors", reply.data)

    def test_register_duplicate_email(self):
        """Reusing the email of an existing user is rejected with 400."""
        already_registered = UserFactory.create_user()
        payload = {**self.user_data, "email": already_registered.email}

        reply = self._post_register(payload)

        self.assertEqual(reply.status_code, status.HTTP_400_BAD_REQUEST)
        self.assertIn("email", reply.data)

    def test_register_short_password(self):
        """A 3-5 character password is rejected with 400."""
        weak = fake.pystr(min_chars=3, max_chars=5)
        # Keep the confirmation identical so only the length is at fault.
        payload = {**self.user_data, "password": weak, "password_confirm": weak}

        reply = self._post_register(payload)

        self.assertEqual(reply.status_code, status.HTTP_400_BAD_REQUEST)
        self.assertIn("password", reply.data)
|
|
|
|
|
|
class LoginViewTest(APITestCase):
    """Exercise the login endpoint (``api_v1:user:login``)."""

    def setUp(self):
        """Create a user with a known password and matching credentials."""
        self.login_url = reverse("api_v1:user:login")
        self.password = fake.password(length=12, special_chars=False)
        self.user = UserFactory.create_user(password=self.password)

        self.login_data = dict(email=self.user.email, password=self.password)

    def _post_login(self, credentials):
        """POST *credentials* as JSON to the login endpoint."""
        return self.client.post(self.login_url, credentials, format="json")

    def test_login_success(self):
        """Correct credentials yield 200 with both tokens at the top level."""
        reply = self._post_login(self.login_data)

        self.assertEqual(reply.status_code, status.HTTP_200_OK)
        for key in ("refresh", "access"):
            self.assertIn(key, reply.data)

    def test_login_invalid_credentials(self):
        """A wrong password for an existing user yields 401 with an error."""
        credentials = {
            **self.login_data,
            "password": fake.password(length=12, special_chars=False),
        }

        reply = self._post_login(credentials)

        self.assertEqual(reply.status_code, status.HTTP_401_UNAUTHORIZED)
        self.assertIn("error", reply.data)

    def test_login_nonexistent_user(self):
        """Credentials generated for no stored user yield 401."""
        credentials = dict(
            email=fake.unique.email(),
            password=fake.password(length=12, special_chars=False),
        )

        reply = self._post_login(credentials)

        self.assertEqual(reply.status_code, status.HTTP_401_UNAUTHORIZED)
|
|
|
|
|
|
class CurrentUserViewTest(APITestCase):
    """Exercise the current-user endpoint (``api_v1:user:current_user``)."""

    def setUp(self):
        """Create a user with a profile and authenticate the test client."""
        self.user = UserFactory.create_user()
        ProfileFactory.create_profile(user=self.user)
        self.current_user_url = reverse("api_v1:user:current_user")
        self.tokens = UserService.get_tokens_for_user(self.user)
        access = self.tokens["access"]
        self.client.credentials(HTTP_AUTHORIZATION=f"Bearer {access}")

    def test_get_current_user_authenticated(self):
        """An authenticated GET returns the user's id, email and profile."""
        reply = self.client.get(self.current_user_url)

        self.assertEqual(reply.status_code, status.HTTP_200_OK)
        for field in ("id", "email"):
            self.assertEqual(reply.data[field], getattr(self.user, field))
        self.assertIn("profile", reply.data)

    def test_get_current_user_unauthenticated(self):
        """A GET without credentials yields 401."""
        # credentials() with no arguments clears the header set in setUp.
        self.client.credentials()
        reply = self.client.get(self.current_user_url)

        self.assertEqual(reply.status_code, status.HTTP_401_UNAUTHORIZED)
|
|
|
|
|
|
class UserUpdateViewTest(APITestCase):
    """Exercise the user update endpoint (``api_v1:user:user_update``)."""

    def setUp(self):
        """Create a user, authenticate the client and build a PATCH payload."""
        self.user = UserFactory.create_user()
        self.update_url = reverse("api_v1:user:user_update")
        self.tokens = UserService.get_tokens_for_user(self.user)
        access = self.tokens["access"]
        self.client.credentials(HTTP_AUTHORIZATION=f"Bearer {access}")

        self.update_data = dict(
            username=fake.unique.user_name(),
            phone=f"+7{fake.numerify('##########')}",
        )

    def _patch_user(self):
        """PATCH the prepared update payload as JSON to the update endpoint."""
        return self.client.patch(self.update_url, self.update_data, format="json")

    def test_update_user_success(self):
        """An authenticated PATCH returns 200 and echoes the new values."""
        reply = self._patch_user()

        self.assertEqual(reply.status_code, status.HTTP_200_OK)
        for field in ("username", "phone"):
            self.assertEqual(reply.data[field], self.update_data[field])

        # Reload from the database to confirm the username was stored.
        self.user.refresh_from_db()
        self.assertEqual(self.user.username, self.update_data["username"])

    def test_update_user_unauthenticated(self):
        """A PATCH without credentials yields 401."""
        # credentials() with no arguments clears the header set in setUp.
        self.client.credentials()
        reply = self._patch_user()

        self.assertEqual(reply.status_code, status.HTTP_401_UNAUTHORIZED)
|
|
|
|
|
|
class ProfileDetailViewTest(APITestCase):
    """Exercise the profile endpoint (``api_v1:user:profile_detail``)."""

    def setUp(self):
        """Create a user with a profile, authenticate, and build a payload."""
        self.user = UserFactory.create_user()
        self.profile = ProfileFactory.create_profile(user=self.user)
        self.profile_url = reverse("api_v1:user:profile_detail")
        self.tokens = UserService.get_tokens_for_user(self.user)
        access = self.tokens["access"]
        self.client.credentials(HTTP_AUTHORIZATION=f"Bearer {access}")

        self.update_data = dict(
            first_name=fake.first_name(),
            last_name=fake.last_name(),
            bio=fake.text(max_nb_chars=200),
        )

    def test_get_profile_success(self):
        """A GET returns 200 and the stored first name."""
        reply = self.client.get(self.profile_url)

        self.assertEqual(reply.status_code, status.HTTP_200_OK)
        self.assertEqual(reply.data["first_name"], self.profile.first_name)

    def test_update_profile_success(self):
        """A PATCH returns 200 and echoes the new first and last name."""
        reply = self.client.patch(self.profile_url, self.update_data, format="json")

        self.assertEqual(reply.status_code, status.HTTP_200_OK)
        for field in ("first_name", "last_name"):
            self.assertEqual(reply.data[field], self.update_data[field])

        # Reload from the database to confirm the first name was stored.
        self.profile.refresh_from_db()
        self.assertEqual(self.profile.first_name, self.update_data["first_name"])

    def test_profile_created_if_not_exists(self):
        """A GET after deleting the profile still returns 200 and a profile row."""
        # Remove the profile created in setUp before hitting the endpoint.
        self.profile.delete()

        reply = self.client.get(self.profile_url)

        self.assertEqual(reply.status_code, status.HTTP_200_OK)
        # The endpoint is expected to have created a profile for the user.
        profiles_for_user = Profile.objects.filter(user=self.user)
        self.assertTrue(profiles_for_user.exists())
|
|
|
|
|
|
class PasswordChangeViewTest(APITestCase):
    """Exercise the password change endpoint (``api_v1:user:password_change``)."""

    def setUp(self):
        """Create a user with a known password and a valid change request."""
        self.old_password = fake.password(length=12, special_chars=False)
        self.new_password = fake.password(length=12, special_chars=False)
        self.user = UserFactory.create_user(password=self.old_password)
        self.password_change_url = reverse("api_v1:user:password_change")
        self.tokens = UserService.get_tokens_for_user(self.user)
        access = self.tokens["access"]
        self.client.credentials(HTTP_AUTHORIZATION=f"Bearer {access}")

        self.password_data = dict(
            old_password=self.old_password,
            new_password=self.new_password,
            new_password_confirm=self.new_password,
        )

    def _post_password_change(self, payload):
        """POST *payload* as JSON to the password change endpoint."""
        return self.client.post(self.password_change_url, payload, format="json")

    def test_change_password_success(self):
        """A valid request returns 200 and the new password takes effect."""
        reply = self._post_password_change(self.password_data)

        self.assertEqual(reply.status_code, status.HTTP_200_OK)
        self.assertIn("message", reply.data)

        # Reload the user so check_password sees the stored hash.
        self.user.refresh_from_db()
        self.assertTrue(self.user.check_password(self.new_password))

    def test_change_password_wrong_old_password(self):
        """A wrong ``old_password`` is rejected with 400 and an error."""
        payload = {
            **self.password_data,
            "old_password": fake.password(length=12, special_chars=False),
        }

        reply = self._post_password_change(payload)

        self.assertEqual(reply.status_code, status.HTTP_400_BAD_REQUEST)
        self.assertIn("error", reply.data)

    def test_change_password_passwords_do_not_match(self):
        """A different ``new_password_confirm`` is rejected with 400."""
        payload = {
            **self.password_data,
            "new_password_confirm": fake.password(length=12, special_chars=False),
        }

        reply = self._post_password_change(payload)

        self.assertEqual(reply.status_code, status.HTTP_400_BAD_REQUEST)
        self.assertIn("non_field_errors", reply.data)
|
|
|
|
|
|
class TokenRefreshViewTest(APITestCase):
    """Exercise the token refresh endpoint (``api_v1:user:token_refresh``)."""

    def setUp(self):
        """Create a user and obtain tokens for it; the client stays anonymous."""
        self.user = UserFactory.create_user()
        self.refresh_url = reverse("api_v1:user:token_refresh")
        self.tokens = UserService.get_tokens_for_user(self.user)

    def _post_refresh(self, payload):
        """POST *payload* as JSON to the token refresh endpoint."""
        return self.client.post(self.refresh_url, payload, format="json")

    def test_refresh_token_success(self):
        """A valid refresh token yields 200 with both tokens in the response."""
        reply = self._post_refresh({"refresh": self.tokens["refresh"]})

        self.assertEqual(reply.status_code, status.HTTP_200_OK)
        for key in ("access", "refresh"):
            self.assertIn(key, reply.data)
        # Deliberately no comparison with the original refresh token: whether
        # it is rotated depends on the implementation.

    def test_refresh_token_invalid(self):
        """A random string in place of the token yields 401 with an error."""
        bogus = fake.pystr(min_chars=20, max_chars=50)
        reply = self._post_refresh({"refresh": bogus})

        self.assertEqual(reply.status_code, status.HTTP_401_UNAUTHORIZED)
        self.assertIn("error", reply.data)

    def test_refresh_token_missing(self):
        """An empty request body yields 400 with an error."""
        response = self._post_refresh({})

        self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
        self.assertIn("error", response.data)
|