Files
mostovik-backend/src/apps/user/services.py
Aleksandr Meshchriakov 15285f2f49
Some checks failed
CI/CD Pipeline / Telegram Notify Success (push) Has been cancelled
CI/CD Pipeline / Code Quality Checks (push) Has been cancelled
CI/CD Pipeline / Run Tests (push) Has been cancelled
CI/CD Pipeline / Telegram Notify Success (pull_request) Has been cancelled
CI/CD Pipeline / Code Quality Checks (pull_request) Has been cancelled
CI/CD Pipeline / Run Tests (pull_request) Has been cancelled
fix: stabilize profile name migration and parser validation imports
2026-03-22 13:31:23 +01:00

517 lines
17 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from typing import Any
from apps.core.exceptions import NotFoundError
from django.contrib.auth import get_user_model
from django.contrib.auth.models import Group
from django.db import transaction
from django.db.models import F, Q
from rest_framework_simplejwt.token_blacklist.models import (
BlacklistedToken,
OutstandingToken,
)
from rest_framework_simplejwt.tokens import RefreshToken
from .models import Profile
User = get_user_model()
class UserService:
"""Сервисный слой для работы с пользователями"""
ROLE_USER = "user"
ROLE_ADMIN = "admin"
ROLE_CHOICES = (
(ROLE_USER, "Пользователь"),
(ROLE_ADMIN, "Администратор"),
)
ROLE_LABELS = dict(ROLE_CHOICES)
SETTINGS_SECTION_EXCHANGE = "exchange"
SETTINGS_SECTION_PARSERS = "parsers"
SETTINGS_SECTION_DATABASE_CONNECTION = "database_connection"
SETTINGS_SECTION_REGISTERS_UPLOAD = "registers_upload"
@classmethod
def create_user(
cls,
*,
email: str,
username: str,
password: str,
role: str | None = None,
**extra_fields,
) -> User:
"""
Создает нового пользователя
Args:
email: Email пользователя
username: Username пользователя
password: Пароль
**extra_fields: Дополнительные поля
Returns:
User: Созданный пользователь
Raises:
ValidationError: При некорректных данных
"""
role = role or cls.ROLE_USER
first_name = extra_fields.pop("first_name", None)
middle_name = extra_fields.pop("middle_name", None)
last_name = extra_fields.pop("last_name", None)
with transaction.atomic():
user = User.objects.create_user(
email=email, username=username, password=password, **extra_fields
)
cls.assign_role(user, role)
cls._update_or_create_profile(
user=user,
first_name=first_name or user.username,
middle_name=middle_name or "",
last_name=last_name or user.username,
)
return cls.get_users_queryset().get(id=user.id)
@classmethod
def get_users_queryset(cls):
"""Базовый queryset для админского списка пользователей."""
return (
User.objects.all()
.select_related("profile")
.prefetch_related("groups")
.order_by("-created_at")
)
@classmethod
def get_filtered_users_queryset(
cls,
*,
search: str = "",
ordering: str = "",
):
"""Queryset списка пользователей с поиском и сортировкой для админского UI."""
queryset = cls.get_users_queryset()
search_term = search.strip()
if search_term:
queryset = queryset.filter(
Q(username__icontains=search_term)
| Q(email__icontains=search_term)
| Q(phone__icontains=search_term)
| Q(profile__first_name__icontains=search_term)
| Q(profile__middle_name__icontains=search_term)
| Q(profile__last_name__icontains=search_term)
).distinct()
ordering_fields = []
ordering_map = {
"id": ("id", False),
"email": ("email", False),
"username": ("username", False),
"phone": ("phone", False),
"is_active": ("is_active", False),
"is_verified": ("is_verified", False),
"created_at": ("created_at", False),
"updated_at": ("updated_at", False),
"first_name": ("profile__first_name", True),
"middle_name": ("profile__middle_name", True),
"last_name": ("profile__last_name", True),
"role": ("is_staff", False),
}
for raw_field in (item.strip() for item in ordering.split(",") if item.strip()):
is_desc = raw_field.startswith("-")
field_name = raw_field[1:] if is_desc else raw_field
mapped_config = ordering_map.get(field_name)
if not mapped_config:
continue
mapped_field, nulls_last = mapped_config
if nulls_last:
ordering_fields.append(
F(mapped_field).desc(nulls_last=True)
if is_desc
else F(mapped_field).asc(nulls_last=True)
)
continue
ordering_fields.append(f"-{mapped_field}" if is_desc else mapped_field)
if ordering_fields:
queryset = queryset.order_by(*ordering_fields, "-created_at")
return queryset
@classmethod
def get_user_by_email(cls, email: str) -> User:
"""Получает пользователя по email
Raises:
NotFoundError: Если пользователь не найден
"""
try:
return User.objects.get(email=email)
except User.DoesNotExist as e:
raise NotFoundError(
message=f"User with email={email} not found",
details={"email": email},
) from e
@classmethod
def get_user_by_email_or_none(cls, email: str) -> User | None:
"""Получает пользователя по email или None"""
try:
return User.objects.get(email=email)
except User.DoesNotExist:
return None
@classmethod
def get_user_by_id(cls, user_id: int) -> User:
"""Получает пользователя по ID
Raises:
NotFoundError: Если пользователь не найден
"""
try:
return User.objects.get(id=user_id)
except User.DoesNotExist as e:
raise NotFoundError(
message=f"User with id={user_id} not found",
details={"user_id": user_id},
) from e
@classmethod
def get_user_by_id_or_none(cls, user_id: int) -> User | None:
"""Получает пользователя по ID или None"""
try:
return User.objects.get(id=user_id)
except User.DoesNotExist:
return None
@classmethod
def update_user(cls, user_id: int, **fields) -> User:
"""
Обновляет данные пользователя
Args:
user_id: ID пользователя
**fields: Поля для обновления
Returns:
User: Обновленный пользователь
Raises:
NotFoundError: Если пользователь не найден
"""
user = cls.get_user_by_id(user_id)
for field, value in fields.items():
setattr(user, field, value)
user.save()
return user
@classmethod
@transaction.atomic
def create_managed_user(
cls,
*,
email: str,
username: str,
password: str,
role: str,
first_name: str,
last_name: str,
middle_name: str | None = None,
**extra_fields,
) -> User:
"""Создаёт пользователя администратором и назначает роль."""
user = cls.create_user(
email=email,
username=username,
password=password,
role=role,
first_name=first_name,
middle_name=middle_name,
last_name=last_name,
**extra_fields,
)
return cls.get_users_queryset().get(id=user.id)
@classmethod
@transaction.atomic
def update_managed_user(cls, user_id: int, **fields) -> User:
"""Обновляет пользователя и его роль из админского интерфейса."""
user = cls.get_user_by_id(user_id)
role = fields.pop("role", None)
password = fields.pop("password", None)
profile_fields = {
key: fields.pop(key)
for key in ("first_name", "middle_name", "last_name")
if key in fields
}
for field, value in fields.items():
setattr(user, field, value)
if password:
user.set_password(password)
user.save()
if role is not None:
cls.assign_role(user, role)
if profile_fields:
cls._update_or_create_profile(user=user, **profile_fields)
return cls.get_users_queryset().get(id=user.id)
@classmethod
def deactivate_user(cls, user_id: int) -> User:
"""Деактивирует пользователя без физического удаления."""
user = cls.get_user_by_id(user_id)
user.is_active = False
user.save(update_fields=["is_active"])
return user
@classmethod
def activate_user(cls, user_id: int) -> User:
"""Активирует пользователя."""
user = cls.get_user_by_id(user_id)
user.is_active = True
user.save(update_fields=["is_active"])
return user
@classmethod
def delete_user(cls, user_id: int) -> None:
"""
Удаляет пользователя
Args:
user_id: ID пользователя
Raises:
NotFoundError: Если пользователь не найден
"""
user = cls.get_user_by_id(user_id)
user.delete()
@classmethod
def get_tokens_for_user(cls, user: User) -> dict[str, str]:
"""
Генерирует JWT токены для пользователя
Args:
user: Пользователь
Returns:
Dict[str, str]: refresh и access токены
"""
refresh = RefreshToken.for_user(user)
return {
"refresh": str(refresh),
"access": str(refresh.access_token),
}
@classmethod
def logout_user(cls, user: User) -> int:
"""Отзывает все активные refresh-токены пользователя."""
outstanding_tokens = OutstandingToken.objects.filter(user=user)
revoked_count = 0
for token in outstanding_tokens:
_, created = BlacklistedToken.objects.get_or_create(token=token)
revoked_count += int(created)
return revoked_count
@classmethod
def ensure_role_groups(cls) -> dict[str, Group]:
"""Гарантирует существование системных role-групп."""
groups: dict[str, Group] = {}
for role, _label in cls.ROLE_CHOICES:
group, _ = Group.objects.get_or_create(name=role)
groups[role] = group
return groups
@classmethod
def get_user_role(cls, user: User) -> str:
"""Возвращает прикладную роль пользователя."""
if user.is_superuser or user.is_staff:
return cls.ROLE_ADMIN
group_names = {group.name for group in user.groups.all()}
if cls.ROLE_ADMIN in group_names:
return cls.ROLE_ADMIN
return cls.ROLE_USER
@classmethod
def get_role_label(cls, role: str) -> str:
"""Возвращает человекочитаемое название роли."""
return cls.ROLE_LABELS.get(role, role)
@classmethod
def get_user_capabilities(cls, user: User) -> dict[str, Any]:
"""Возвращает фронтовые capability flags по роли пользователя."""
is_admin = cls.get_user_role(user) == cls.ROLE_ADMIN
return {
"can_manage_users": is_admin,
"can_manage_exchange": is_admin,
"can_manage_parsers": is_admin,
"can_manage_database_connection": is_admin,
"can_upload_registers": is_admin,
"can_refresh_dashboard": is_admin,
"settings_sections": (
[
cls.SETTINGS_SECTION_EXCHANGE,
cls.SETTINGS_SECTION_PARSERS,
cls.SETTINGS_SECTION_DATABASE_CONNECTION,
cls.SETTINGS_SECTION_REGISTERS_UPLOAD,
]
if is_admin
else []
),
}
@classmethod
def assign_role(cls, user: User, role: str) -> User:
"""Назначает одну из системных ролей через auth.Group и is_staff."""
if role not in cls.ROLE_LABELS:
raise ValueError(f"Unsupported role: {role}")
groups = cls.ensure_role_groups()
role_group_names = list(groups.keys())
current_role_groups = list(user.groups.filter(name__in=role_group_names))
if current_role_groups:
user.groups.remove(*current_role_groups)
user.groups.add(groups[role])
if role == cls.ROLE_ADMIN:
user.is_staff = True
else:
user.is_staff = False
user.is_superuser = False
user.save()
return user
@classmethod
def verify_email(cls, user_id: int) -> User:
"""
Подтверждает email пользователя
Args:
user_id: ID пользователя
Returns:
User: Обновленный пользователь
Raises:
NotFoundError: Если пользователь не найден
"""
user = cls.get_user_by_id(user_id)
user.is_verified = True
user.save()
return user
@classmethod
def _update_or_create_profile(
cls,
*,
user: User,
first_name: str | None = None,
middle_name: str | None = None,
last_name: str | None = None,
) -> Profile:
profile, _ = Profile.objects.get_or_create(user=user)
if first_name is not None:
profile.first_name = first_name
if middle_name is not None:
profile.middle_name = middle_name
if last_name is not None:
profile.last_name = last_name
profile.save()
return profile
class ProfileService:
"""Сервисный слой для работы с профилями"""
@classmethod
def get_profile_by_user_id(cls, user_id: int) -> Profile:
"""Получает профиль по ID пользователя
Raises:
NotFoundError: Если профиль не найден
"""
try:
return Profile.objects.select_related("user").get(user_id=user_id)
except Profile.DoesNotExist as e:
raise NotFoundError(
message=f"Profile for user_id={user_id} not found",
details={"user_id": user_id},
) from e
@classmethod
def get_profile_by_user_id_or_none(cls, user_id: int) -> Profile | None:
"""Получает профиль по ID пользователя или None"""
try:
return Profile.objects.select_related("user").get(user_id=user_id)
except Profile.DoesNotExist:
return None
@classmethod
def update_profile(cls, user_id: int, **fields) -> Profile:
"""
Обновляет профиль пользователя
Args:
user_id: ID пользователя
**fields: Поля для обновления
Returns:
Profile: Обновленный профиль
Raises:
NotFoundError: Если профиль не найден
"""
profile = cls.get_profile_by_user_id(user_id)
for field, value in fields.items():
setattr(profile, field, value)
profile.save()
return profile
@classmethod
def get_full_profile_data(cls, user_id: int) -> dict[str, Any]:
"""
Получает полные данные пользователя и профиля
Args:
user_id: ID пользователя
Returns:
Dict: Полные данные
Raises:
NotFoundError: Если профиль не найден
"""
profile = cls.get_profile_by_user_id(user_id)
user = profile.user
return {
"id": user.id,
"email": user.email,
"username": user.username,
"is_verified": user.is_verified,
"phone": user.phone,
"first_name": profile.first_name,
"middle_name": profile.middle_name,
"last_name": profile.last_name,
"full_name": profile.full_name,
"bio": profile.bio,
"avatar": profile.avatar.url if profile.avatar else None,
"date_of_birth": profile.date_of_birth,
"created_at": user.created_at,
"updated_at": user.updated_at,
}