Files
state-corp-backend/tests/apps/user/test_views.py

414 lines
16 KiB
Python

"""Tests for user DRF views"""
from datetime import datetime, timedelta
from apps.core.models import BackgroundJob, JobStatus
from apps.user.models import Profile
from apps.user.services import UserService
from django.contrib.auth import get_user_model
from django.urls import reverse
from faker import Faker
from rest_framework import status
from rest_framework.test import APITestCase
from .factories import ProfileFactory, UserFactory
User = get_user_model()
fake = Faker("ru_RU")
class RegisterViewTest(APITestCase):
"""Tests for RegisterView"""
def setUp(self):
self.register_url = reverse("api_v1:user:register")
self.password = fake.password(length=12, special_chars=False)
self.user_data = {
"email": fake.unique.email(),
"username": fake.unique.user_name(),
"password": self.password,
"password_confirm": self.password,
"phone": f"+7{fake.numerify('##########')}",
}
def test_register_success(self):
"""Test successful user registration"""
response = self.client.post(self.register_url, self.user_data, format="json")
self.assertEqual(response.status_code, status.HTTP_201_CREATED)
self.assertIn("user", response.data)
self.assertIn("tokens", response.data)
self.assertIn("refresh", response.data["tokens"])
self.assertIn("access", response.data["tokens"])
# Verify user was created
self.assertTrue(User.objects.filter(email=self.user_data["email"]).exists())
def test_register_passwords_do_not_match(self):
"""Test registration fails when passwords don't match"""
data = self.user_data.copy()
data["password_confirm"] = fake.password(length=12, special_chars=False)
response = self.client.post(self.register_url, data, format="json")
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
self.assertIn("non_field_errors", response.data)
def test_register_duplicate_email(self):
"""Test registration fails with duplicate email"""
# Create existing user
existing_user = UserFactory.create_user()
# Use the same email as existing user
data = self.user_data.copy()
data["email"] = existing_user.email
response = self.client.post(self.register_url, data, format="json")
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
self.assertIn("email", response.data)
def test_register_short_password(self):
"""Test registration fails with short password"""
short_password = fake.pystr(min_chars=3, max_chars=5)
data = self.user_data.copy()
data["password"] = short_password
data["password_confirm"] = short_password
response = self.client.post(self.register_url, data, format="json")
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
self.assertIn("password", response.data)
class LoginViewTest(APITestCase):
"""Tests for LoginView"""
def setUp(self):
self.login_url = reverse("api_v1:user:login")
self.password = fake.password(length=12, special_chars=False)
self.user = UserFactory.create_user(password=self.password)
self.login_data = {"username": self.user.username, "password": self.password}
def test_login_success(self):
"""Test successful login"""
response = self.client.post(self.login_url, self.login_data, format="json")
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertIn("refresh", response.data)
self.assertIn("access", response.data)
def test_login_invalid_credentials(self):
"""Test login fails with invalid credentials"""
data = self.login_data.copy()
data["password"] = fake.password(length=12, special_chars=False)
response = self.client.post(self.login_url, data, format="json")
self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)
self.assertIn("error", response.data)
def test_login_nonexistent_user(self):
"""Test login fails for nonexistent user"""
data = {
"username": fake.unique.user_name(),
"password": fake.password(length=12, special_chars=False),
}
response = self.client.post(self.login_url, data, format="json")
self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)
class CurrentUserViewTest(APITestCase):
"""Tests for CurrentUserView"""
def setUp(self):
self.user = UserFactory.create_user()
ProfileFactory.create_profile(user=self.user, mid_name="")
self.current_user_url = reverse("api_v1:user:current_user")
self.tokens = UserService.get_tokens_for_user(self.user)
self.client.credentials(HTTP_AUTHORIZATION=f"Bearer {self.tokens['access']}")
def test_get_current_user_authenticated(self):
"""Test getting current user when authenticated"""
response = self.client.get(self.current_user_url)
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(response.data["id"], self.user.id)
self.assertEqual(response.data["email"], self.user.email)
self.assertEqual(response.data["is_active"], self.user.is_active)
self.assertIn("profile", response.data)
self.assertEqual(response.data["profile"]["middle_name"], "")
self.assertEqual(response.data["role"], "user")
self.assertEqual(response.data["capabilities"]["can_access_admin_page"], False)
def test_get_current_user_returns_admin_role_for_staff_user(self):
self.user.is_staff = True
self.user.save(update_fields=["is_staff"])
response = self.client.get(self.current_user_url)
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(response.data["role"], "staff")
self.assertEqual(response.data["capabilities"]["can_access_admin_page"], True)
def test_get_current_user_unauthenticated(self):
"""Test getting current user when unauthenticated"""
self.client.credentials() # Remove auth header
response = self.client.get(self.current_user_url)
self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)
class AdminUsersManagementViewTest(APITestCase):
"""Tests for admin users endpoint."""
def setUp(self):
self.admin_user = UserFactory.create_user(is_staff=True)
self.regular_user = UserFactory.create_user()
self.job_user = UserFactory.create_user()
self.client.force_authenticate(self.admin_user)
self.url = reverse("api_v1:user:admin_users")
ProfileFactory.create_profile(
user=self.job_user,
first_name="Иван",
mid_name="Сергеевич",
last_name="Петров",
)
now = datetime(2026, 4, 14, 10, 0, 0)
completed_job = now + timedelta(minutes=3)
failed_job = now - timedelta(minutes=10)
BackgroundJob.objects.create(
task_id="admin-management-task",
task_name="apps.forms.process",
user_id=self.job_user.id,
status=JobStatus.SUCCESS,
progress=100,
progress_message="Готово",
result={"processed": 10},
started_at=now,
completed_at=completed_job,
created_at=completed_job,
updated_at=completed_job,
)
BackgroundJob.objects.create(
task_id="admin-management-task-failed",
task_name="apps.forms.process",
user_id=self.job_user.id,
status=JobStatus.FAILURE,
progress=100,
progress_message="Неуспешно",
error="error",
started_at=failed_job,
completed_at=now - timedelta(minutes=50),
created_at=failed_job,
updated_at=failed_job,
)
def test_admin_users_list_contains_import_metrics_and_profile_fields(self):
response = self.client.get(self.url)
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertIn("results", response.data)
results = response.data["results"]
row_map = {row["id"]: row for row in results}
job_user_row = row_map[self.job_user.id]
regular_user_row = row_map[self.regular_user.id]
self.assertEqual(job_user_row["first_name"], "Иван")
self.assertEqual(job_user_row["middle_name"], "Сергеевич")
self.assertEqual(job_user_row["last_name"], "Петров")
self.assertEqual(job_user_row["progress_message"], "Готово")
self.assertEqual(job_user_row["result"], {"processed": 10})
self.assertIsNone(job_user_row["error"])
self.assertEqual(job_user_row["is_successful"], True)
self.assertEqual(job_user_row["duration"], 180.0)
self.assertIsNotNone(job_user_row["started_at"])
self.assertIsNotNone(job_user_row["completed_at"])
self.assertIsNone(regular_user_row["progress_message"])
self.assertIsNone(regular_user_row["result"])
self.assertIsNone(regular_user_row["error"])
self.assertIsNone(regular_user_row["is_successful"])
def test_non_admin_cannot_access_endpoint(self):
self.client.force_authenticate(self.regular_user)
response = self.client.get(self.url)
self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)
class UserUpdateViewTest(APITestCase):
"""Tests for UserUpdateView"""
def setUp(self):
self.user = UserFactory.create_user()
self.update_url = reverse("api_v1:user:user_update")
self.tokens = UserService.get_tokens_for_user(self.user)
self.client.credentials(HTTP_AUTHORIZATION=f"Bearer {self.tokens['access']}")
self.update_data = {
"username": fake.unique.user_name(),
"phone": f"+7{fake.numerify('##########')}",
}
def test_update_user_success(self):
"""Test successful user update"""
response = self.client.patch(self.update_url, self.update_data, format="json")
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(response.data["username"], self.update_data["username"])
self.assertEqual(response.data["phone"], self.update_data["phone"])
# Verify in database
self.user.refresh_from_db()
self.assertEqual(self.user.username, self.update_data["username"])
def test_update_user_unauthenticated(self):
"""Test user update fails when unauthenticated"""
self.client.credentials() # Remove auth header
response = self.client.patch(self.update_url, self.update_data, format="json")
self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)
class ProfileDetailViewTest(APITestCase):
"""Tests for ProfileDetailView"""
def setUp(self):
self.user = UserFactory.create_user()
self.profile = ProfileFactory.create_profile(user=self.user)
self.profile_url = reverse("api_v1:user:profile_detail")
self.tokens = UserService.get_tokens_for_user(self.user)
self.client.credentials(HTTP_AUTHORIZATION=f"Bearer {self.tokens['access']}")
self.update_data = {
"first_name": fake.first_name(),
"mid_name": fake.first_name(),
"last_name": fake.last_name(),
"bio": fake.text(max_nb_chars=200),
}
def test_get_profile_success(self):
"""Test successful profile retrieval"""
response = self.client.get(self.profile_url)
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(response.data["first_name"], self.profile.first_name)
def test_update_profile_success(self):
"""Test successful profile update"""
response = self.client.patch(self.profile_url, self.update_data, format="json")
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(response.data["first_name"], self.update_data["first_name"])
self.assertEqual(response.data["mid_name"], self.update_data["mid_name"])
self.assertEqual(response.data["last_name"], self.update_data["last_name"])
# Verify in database
self.profile.refresh_from_db()
self.assertEqual(self.profile.first_name, self.update_data["first_name"])
self.assertEqual(self.profile.mid_name, self.update_data["mid_name"])
def test_profile_created_if_not_exists(self):
"""Test profile is created if it doesn't exist"""
# Delete existing profile
self.profile.delete()
response = self.client.get(self.profile_url)
self.assertEqual(response.status_code, status.HTTP_200_OK)
# Profile should be created automatically
self.assertTrue(Profile.objects.filter(user=self.user).exists())
class PasswordChangeViewTest(APITestCase):
"""Tests for PasswordChangeView"""
def setUp(self):
self.old_password = fake.password(length=12, special_chars=False)
self.new_password = fake.password(length=12, special_chars=False)
self.user = UserFactory.create_user(password=self.old_password)
self.password_change_url = reverse("api_v1:user:password_change")
self.tokens = UserService.get_tokens_for_user(self.user)
self.client.credentials(HTTP_AUTHORIZATION=f"Bearer {self.tokens['access']}")
self.password_data = {
"old_password": self.old_password,
"new_password": self.new_password,
"new_password_confirm": self.new_password,
}
def test_change_password_success(self):
"""Test successful password change"""
response = self.client.post(
self.password_change_url, self.password_data, format="json"
)
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertIn("message", response.data)
# Verify password was changed
self.user.refresh_from_db()
self.assertTrue(self.user.check_password(self.new_password))
def test_change_password_wrong_old_password(self):
"""Test password change fails with wrong old password"""
data = self.password_data.copy()
data["old_password"] = fake.password(length=12, special_chars=False)
response = self.client.post(self.password_change_url, data, format="json")
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
self.assertIn("error", response.data)
def test_change_password_passwords_do_not_match(self):
"""Test password change fails when new passwords don't match"""
data = self.password_data.copy()
data["new_password_confirm"] = fake.password(length=12, special_chars=False)
response = self.client.post(self.password_change_url, data, format="json")
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
self.assertIn("non_field_errors", response.data)
class TokenRefreshViewTest(APITestCase):
"""Tests for TokenRefreshView"""
def setUp(self):
self.user = UserFactory.create_user()
self.refresh_url = reverse("api_v1:user:token_refresh")
self.tokens = UserService.get_tokens_for_user(self.user)
def test_refresh_token_success(self):
"""Test successful token refresh"""
data = {"refresh": self.tokens["refresh"]}
response = self.client.post(self.refresh_url, data, format="json")
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertIn("access", response.data)
self.assertIn("refresh", response.data)
# New refresh token should be different
# Refresh token may be the same or different depending on implementation
def test_refresh_token_invalid(self):
"""Test token refresh fails with invalid refresh token"""
data = {"refresh": fake.pystr(min_chars=20, max_chars=50)}
response = self.client.post(self.refresh_url, data, format="json")
self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)
self.assertIn("error", response.data)
def test_refresh_token_missing(self):
"""Test token refresh fails without refresh token"""
response = self.client.post(self.refresh_url, {}, format="json")
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
self.assertIn("error", response.data)