feat(core): add core module with mixins, services, and background jobs
- Add Model Mixins: TimestampMixin, SoftDeleteMixin, AuditMixin, etc. - Add Base Services: BaseService, BulkOperationsMixin, QueryOptimizerMixin - Add Base ViewSets with bulk operations - Add BackgroundJob model for Celery task tracking - Add BaseAppCommand for management commands - Add permissions, pagination, filters, cache, logging - Migrate tests to factory_boy + faker - Add CHANGELOG.md - 297 tests passing
This commit is contained in:
671
src/apps/core/services.py
Normal file
671
src/apps/core/services.py
Normal file
@@ -0,0 +1,671 @@
|
||||
"""
|
||||
Base service classes for business logic layer.
|
||||
|
||||
Services encapsulate business logic and are independent of HTTP layer.
|
||||
They are easily testable and can manage transactions.
|
||||
"""
|
||||
|
||||
import logging
|
||||
from typing import Any, Generic, TypeVar
|
||||
|
||||
from apps.core.exceptions import NotFoundError
|
||||
from django.db import models, transaction
|
||||
from django.db.models import QuerySet
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Type variable for model
|
||||
M = TypeVar("M", bound=models.Model)
|
||||
|
||||
|
||||
class BaseService(Generic[M]):
|
||||
"""
|
||||
Base service class providing common CRUD operations.
|
||||
|
||||
Usage:
|
||||
class UserService(BaseService[User]):
|
||||
model = User
|
||||
|
||||
@classmethod
|
||||
def create_user(cls, *, email: str, password: str) -> User:
|
||||
# Business logic here
|
||||
user = cls.model.objects.create_user(email=email, password=password)
|
||||
return user
|
||||
"""
|
||||
|
||||
model: type[M]
|
||||
|
||||
@classmethod
|
||||
def get_queryset(cls) -> QuerySet[M]:
|
||||
"""Get base queryset for the model. Override to add default filters."""
|
||||
return cls.model.objects.all()
|
||||
|
||||
@classmethod
|
||||
def get_by_id(cls, pk: Any) -> M:
|
||||
"""
|
||||
Get entity by primary key.
|
||||
|
||||
Raises:
|
||||
NotFoundError: If entity not found
|
||||
"""
|
||||
try:
|
||||
return cls.get_queryset().get(pk=pk)
|
||||
except cls.model.DoesNotExist as e:
|
||||
raise NotFoundError(
|
||||
message=f"{cls.model.__name__} with id={pk} not found",
|
||||
code="not_found",
|
||||
details={"model": cls.model.__name__, "id": pk},
|
||||
) from e
|
||||
|
||||
@classmethod
|
||||
def get_by_id_or_none(cls, pk: Any) -> M | None:
|
||||
"""Get entity by primary key or None if not found."""
|
||||
try:
|
||||
return cls.get_queryset().get(pk=pk)
|
||||
except cls.model.DoesNotExist:
|
||||
return None
|
||||
|
||||
@classmethod
|
||||
def get_all(cls) -> QuerySet[M]:
|
||||
"""Get all entities."""
|
||||
return cls.get_queryset()
|
||||
|
||||
@classmethod
|
||||
def filter(cls, **kwargs: Any) -> QuerySet[M]:
|
||||
"""Filter entities by given criteria."""
|
||||
return cls.get_queryset().filter(**kwargs)
|
||||
|
||||
@classmethod
|
||||
def exists(cls, **kwargs: Any) -> bool:
|
||||
"""Check if entity with given criteria exists."""
|
||||
return cls.get_queryset().filter(**kwargs).exists()
|
||||
|
||||
@classmethod
|
||||
def count(cls, **kwargs: Any) -> int:
|
||||
"""Count entities matching criteria."""
|
||||
if kwargs:
|
||||
return cls.get_queryset().filter(**kwargs).count()
|
||||
return cls.get_queryset().count()
|
||||
|
||||
@classmethod
|
||||
@transaction.atomic
|
||||
def create(cls, **kwargs: Any) -> M:
|
||||
"""
|
||||
Create new entity.
|
||||
|
||||
Override this method to add business logic before/after creation.
|
||||
"""
|
||||
return cls.model.objects.create(**kwargs)
|
||||
|
||||
@classmethod
|
||||
@transaction.atomic
|
||||
def update(cls, instance: M, **kwargs: Any) -> M:
|
||||
"""
|
||||
Update entity fields.
|
||||
|
||||
Override this method to add business logic before/after update.
|
||||
"""
|
||||
for field, value in kwargs.items():
|
||||
setattr(instance, field, value)
|
||||
instance.save(update_fields=list(kwargs.keys()))
|
||||
return instance
|
||||
|
||||
@classmethod
|
||||
@transaction.atomic
|
||||
def delete(cls, instance: M) -> None:
|
||||
"""
|
||||
Delete entity.
|
||||
|
||||
Override this method to implement soft delete or add business logic.
|
||||
"""
|
||||
instance.delete()
|
||||
|
||||
@classmethod
|
||||
@transaction.atomic
|
||||
def bulk_create(cls, instances: list[M], **kwargs: Any) -> list[M]:
|
||||
"""Bulk create entities."""
|
||||
return cls.model.objects.bulk_create(instances, **kwargs)
|
||||
|
||||
@classmethod
|
||||
@transaction.atomic
|
||||
def bulk_update(cls, instances: list[M], fields: list[str], **kwargs: Any) -> int:
|
||||
"""Bulk update entities."""
|
||||
return cls.model.objects.bulk_update(instances, fields, **kwargs)
|
||||
|
||||
|
||||
class BaseReadOnlyService(Generic[M]):
|
||||
"""
|
||||
Read-only service for entities that should not be modified via API.
|
||||
|
||||
Useful for reference data, logs, audit trails, etc.
|
||||
"""
|
||||
|
||||
model: type[M]
|
||||
|
||||
@classmethod
|
||||
def get_queryset(cls) -> QuerySet[M]:
|
||||
"""Get base queryset for the model."""
|
||||
return cls.model.objects.all()
|
||||
|
||||
@classmethod
|
||||
def get_by_id(cls, pk: Any) -> M:
|
||||
"""Get entity by primary key."""
|
||||
try:
|
||||
return cls.get_queryset().get(pk=pk)
|
||||
except cls.model.DoesNotExist as e:
|
||||
raise NotFoundError(
|
||||
message=f"{cls.model.__name__} with id={pk} not found",
|
||||
code="not_found",
|
||||
) from e
|
||||
|
||||
@classmethod
|
||||
def get_all(cls) -> QuerySet[M]:
|
||||
"""Get all entities."""
|
||||
return cls.get_queryset()
|
||||
|
||||
@classmethod
|
||||
def filter(cls, **kwargs: Any) -> QuerySet[M]:
|
||||
"""Filter entities by given criteria."""
|
||||
return cls.get_queryset().filter(**kwargs)
|
||||
|
||||
|
||||
class TransactionMixin:
|
||||
"""
|
||||
Mixin providing transaction helpers for services.
|
||||
|
||||
Usage:
|
||||
class PaymentService(TransactionMixin, BaseService[Payment]):
|
||||
@classmethod
|
||||
def process_payment(cls, order_id: int) -> Payment:
|
||||
with cls.atomic():
|
||||
# Multiple operations in single transaction
|
||||
...
|
||||
"""
|
||||
|
||||
@classmethod
|
||||
def atomic(cls):
|
||||
"""Get atomic transaction context manager."""
|
||||
return transaction.atomic()
|
||||
|
||||
@classmethod
|
||||
def on_commit(cls, func):
|
||||
"""Register function to be called after transaction commits."""
|
||||
transaction.on_commit(func)
|
||||
|
||||
@classmethod
|
||||
def savepoint(cls):
|
||||
"""Create a savepoint within current transaction."""
|
||||
return transaction.savepoint()
|
||||
|
||||
@classmethod
|
||||
def savepoint_rollback(cls, sid):
|
||||
"""Rollback to a savepoint."""
|
||||
transaction.savepoint_rollback(sid)
|
||||
|
||||
@classmethod
|
||||
def savepoint_commit(cls, sid):
|
||||
"""Commit a savepoint."""
|
||||
transaction.savepoint_commit(sid)
|
||||
|
||||
|
||||
class BulkOperationsMixin:
|
||||
"""
|
||||
Миксин для расширенных массовых операций.
|
||||
|
||||
Дополняет BaseService методами:
|
||||
- bulk_create_chunked: создание чанками для больших данных
|
||||
- bulk_update_or_create: upsert операция
|
||||
- bulk_delete: удаление по списку ID
|
||||
- bulk_update_fields: обновление полей по фильтру
|
||||
|
||||
Использование:
|
||||
class ProductService(BulkOperationsMixin, BaseService[Product]):
|
||||
model = Product
|
||||
|
||||
# Создание 10000 записей чанками по 500
|
||||
ProductService.bulk_create_chunked(products, chunk_size=500)
|
||||
|
||||
# Upsert по уникальному полю
|
||||
ProductService.bulk_update_or_create(
|
||||
items=data,
|
||||
unique_fields=['sku'],
|
||||
update_fields=['price', 'quantity']
|
||||
)
|
||||
"""
|
||||
|
||||
model: type[models.Model]
|
||||
|
||||
@classmethod
|
||||
@transaction.atomic
|
||||
def bulk_create_chunked(
|
||||
cls,
|
||||
instances: list,
|
||||
*,
|
||||
chunk_size: int = 500,
|
||||
ignore_conflicts: bool = False,
|
||||
update_conflicts: bool = False,
|
||||
update_fields: list[str] | None = None,
|
||||
unique_fields: list[str] | None = None,
|
||||
) -> int:
|
||||
"""
|
||||
Массовое создание чанками для больших объёмов.
|
||||
|
||||
Args:
|
||||
instances: Список объектов для создания
|
||||
chunk_size: Размер чанка (по умолчанию 500)
|
||||
ignore_conflicts: Игнорировать конфликты
|
||||
update_conflicts: Обновлять при конфликтах (upsert)
|
||||
update_fields: Поля для обновления при конфликте
|
||||
unique_fields: Уникальные поля для определения конфликта
|
||||
|
||||
Returns:
|
||||
Количество созданных записей
|
||||
"""
|
||||
total_created = 0
|
||||
|
||||
for i in range(0, len(instances), chunk_size):
|
||||
chunk = instances[i : i + chunk_size]
|
||||
kwargs = {
|
||||
"ignore_conflicts": ignore_conflicts,
|
||||
}
|
||||
|
||||
# Django 4.1+ поддерживает update_conflicts
|
||||
if update_conflicts and update_fields and unique_fields:
|
||||
kwargs["update_conflicts"] = True
|
||||
kwargs["update_fields"] = update_fields
|
||||
kwargs["unique_fields"] = unique_fields
|
||||
|
||||
created = cls.model.objects.bulk_create(chunk, **kwargs)
|
||||
total_created += len(created)
|
||||
|
||||
return total_created
|
||||
|
||||
@classmethod
|
||||
@transaction.atomic
|
||||
def bulk_update_or_create(
|
||||
cls,
|
||||
items: list[dict],
|
||||
*,
|
||||
unique_fields: list[str],
|
||||
update_fields: list[str],
|
||||
create_defaults: dict | None = None,
|
||||
) -> tuple[int, int]:
|
||||
"""
|
||||
Upsert: обновить существующие или создать новые.
|
||||
|
||||
Args:
|
||||
items: Список словарей с данными
|
||||
unique_fields: Поля для поиска существующих
|
||||
update_fields: Поля для обновления
|
||||
create_defaults: Значения по умолчанию для создания
|
||||
|
||||
Returns:
|
||||
(created_count, updated_count)
|
||||
"""
|
||||
created_count = 0
|
||||
updated_count = 0
|
||||
defaults = create_defaults or {}
|
||||
|
||||
for item in items:
|
||||
lookup = {field: item[field] for field in unique_fields}
|
||||
update_data = {
|
||||
field: item[field] for field in update_fields if field in item
|
||||
}
|
||||
|
||||
obj, created = cls.model.objects.update_or_create(
|
||||
**lookup,
|
||||
defaults={**update_data, **defaults},
|
||||
)
|
||||
|
||||
if created:
|
||||
created_count += 1
|
||||
else:
|
||||
updated_count += 1
|
||||
|
||||
return created_count, updated_count
|
||||
|
||||
@classmethod
|
||||
@transaction.atomic
|
||||
def bulk_delete(
|
||||
cls,
|
||||
ids: list,
|
||||
*,
|
||||
hard_delete: bool = True,
|
||||
) -> int:
|
||||
"""
|
||||
Массовое удаление по списку ID.
|
||||
|
||||
Args:
|
||||
ids: Список ID для удаления
|
||||
hard_delete: Физическое удаление (игнорирует SoftDelete)
|
||||
|
||||
Returns:
|
||||
Количество удалённых записей
|
||||
"""
|
||||
queryset = cls.model.objects.filter(pk__in=ids)
|
||||
|
||||
if hard_delete:
|
||||
# Для SoftDelete моделей используем all_objects
|
||||
if hasattr(cls.model, "all_objects"):
|
||||
queryset = cls.model.all_objects.filter(pk__in=ids)
|
||||
deleted, _ = queryset.delete()
|
||||
else:
|
||||
# Мягкое удаление
|
||||
from django.utils import timezone
|
||||
|
||||
deleted = queryset.update(is_deleted=True, deleted_at=timezone.now())
|
||||
|
||||
return deleted
|
||||
|
||||
@classmethod
|
||||
@transaction.atomic
|
||||
def bulk_update_fields(
|
||||
cls,
|
||||
filters: dict,
|
||||
updates: dict,
|
||||
) -> int:
|
||||
"""
|
||||
Массовое обновление полей по фильтру.
|
||||
|
||||
Args:
|
||||
filters: Фильтры для выборки
|
||||
updates: Поля и значения для обновления
|
||||
|
||||
Returns:
|
||||
Количество обновлённых записей
|
||||
|
||||
Пример:
|
||||
ProductService.bulk_update_fields(
|
||||
filters={'category': 'electronics'},
|
||||
updates={'discount': 10, 'is_featured': True}
|
||||
)
|
||||
"""
|
||||
return cls.model.objects.filter(**filters).update(**updates)
|
||||
|
||||
|
||||
class QueryOptimizerMixin:
|
||||
"""
|
||||
Миксин для автоматической оптимизации запросов.
|
||||
|
||||
Декларативный подход к select_related/prefetch_related.
|
||||
|
||||
Атрибуты:
|
||||
select_related: Список полей для select_related
|
||||
prefetch_related: Список полей для prefetch_related
|
||||
default_only: Поля для only() (ограничение столбцов)
|
||||
default_defer: Поля для defer() (исключение столбцов)
|
||||
|
||||
Использование:
|
||||
class OrderService(QueryOptimizerMixin, BaseService[Order]):
|
||||
model = Order
|
||||
select_related = ['user', 'shipping_address']
|
||||
prefetch_related = ['items', 'items__product']
|
||||
default_defer = ['description', 'internal_notes']
|
||||
|
||||
# Автоматически применяет оптимизации
|
||||
orders = OrderService.get_optimized_queryset()
|
||||
"""
|
||||
|
||||
model: type[models.Model]
|
||||
select_related: list[str] = []
|
||||
prefetch_related: list[str] = []
|
||||
default_only: list[str] = []
|
||||
default_defer: list[str] = []
|
||||
|
||||
@classmethod
|
||||
def get_optimized_queryset(cls) -> QuerySet:
|
||||
"""
|
||||
Получить оптимизированный queryset.
|
||||
|
||||
Применяет все объявленные оптимизации.
|
||||
"""
|
||||
queryset = cls.model.objects.all()
|
||||
return cls.apply_optimizations(queryset)
|
||||
|
||||
@classmethod
|
||||
def apply_optimizations(
|
||||
cls,
|
||||
queryset: QuerySet,
|
||||
*,
|
||||
include_select: bool = True,
|
||||
include_prefetch: bool = True,
|
||||
include_only: bool = True,
|
||||
include_defer: bool = True,
|
||||
) -> QuerySet:
|
||||
"""
|
||||
Применить оптимизации к queryset.
|
||||
|
||||
Args:
|
||||
queryset: Исходный queryset
|
||||
include_select: Применять select_related
|
||||
include_prefetch: Применять prefetch_related
|
||||
include_only: Применять only()
|
||||
include_defer: Применять defer()
|
||||
"""
|
||||
if include_select and cls.select_related:
|
||||
queryset = queryset.select_related(*cls.select_related)
|
||||
|
||||
if include_prefetch and cls.prefetch_related:
|
||||
queryset = queryset.prefetch_related(*cls.prefetch_related)
|
||||
|
||||
if include_only and cls.default_only:
|
||||
queryset = queryset.only(*cls.default_only)
|
||||
|
||||
if include_defer and cls.default_defer:
|
||||
queryset = queryset.defer(*cls.default_defer)
|
||||
|
||||
return queryset
|
||||
|
||||
@classmethod
|
||||
def get_list_queryset(cls) -> QuerySet:
|
||||
"""
|
||||
Queryset для списков (может исключать тяжёлые поля).
|
||||
"""
|
||||
return cls.apply_optimizations(
|
||||
cls.model.objects.all(),
|
||||
include_only=True,
|
||||
include_defer=True,
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def get_detail_queryset(cls) -> QuerySet:
|
||||
"""
|
||||
Queryset для детального просмотра (все поля).
|
||||
"""
|
||||
return cls.apply_optimizations(
|
||||
cls.model.objects.all(),
|
||||
include_only=False,
|
||||
include_defer=False,
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def with_counts(cls, queryset: QuerySet, *count_fields: str) -> QuerySet:
|
||||
"""
|
||||
Добавить аннотации Count.
|
||||
|
||||
Args:
|
||||
queryset: Исходный queryset
|
||||
count_fields: Поля для подсчёта
|
||||
|
||||
Пример:
|
||||
# Добавит items_count и reviews_count
|
||||
qs = ProductService.with_counts(qs, 'items', 'reviews')
|
||||
"""
|
||||
from django.db.models import Count
|
||||
|
||||
annotations = {f"{field}_count": Count(field) for field in count_fields}
|
||||
return queryset.annotate(**annotations)
|
||||
|
||||
@classmethod
|
||||
def with_exists(cls, queryset: QuerySet, **subqueries: QuerySet) -> QuerySet:
|
||||
"""
|
||||
Добавить аннотации Exists.
|
||||
|
||||
Пример:
|
||||
from apps.reviews.models import Review
|
||||
qs = ProductService.with_exists(
|
||||
qs,
|
||||
has_reviews=Review.objects.filter(product=OuterRef('pk'))
|
||||
)
|
||||
"""
|
||||
from django.db.models import Exists
|
||||
|
||||
annotations = {name: Exists(subquery) for name, subquery in subqueries.items()}
|
||||
return queryset.annotate(**annotations)
|
||||
|
||||
|
||||
class BackgroundJobService(BaseReadOnlyService):
|
||||
"""
|
||||
Сервис для управления фоновыми задачами.
|
||||
|
||||
Использование:
|
||||
# Создание задачи
|
||||
job = BackgroundJobService.create_job(
|
||||
task_id="abc-123",
|
||||
task_name="apps.myapp.tasks.process_data",
|
||||
user_id=request.user.id,
|
||||
)
|
||||
|
||||
# Получение статуса
|
||||
job = BackgroundJobService.get_by_task_id("abc-123")
|
||||
|
||||
# Список задач пользователя
|
||||
jobs = BackgroundJobService.get_user_jobs(user_id=1)
|
||||
"""
|
||||
|
||||
# Импорт модели внутри методов для избежания circular import
|
||||
|
||||
@classmethod
|
||||
def get_model(cls):
|
||||
"""Ленивый импорт модели."""
|
||||
from apps.core.models import BackgroundJob
|
||||
|
||||
return BackgroundJob
|
||||
|
||||
@classmethod
|
||||
def get_queryset(cls):
|
||||
"""Get base queryset."""
|
||||
return cls.get_model().objects.all()
|
||||
|
||||
@classmethod
|
||||
def create_job(
|
||||
cls,
|
||||
*,
|
||||
task_id: str,
|
||||
task_name: str,
|
||||
user_id: int | None = None,
|
||||
meta: dict | None = None,
|
||||
):
|
||||
"""
|
||||
Создать запись о фоновой задаче.
|
||||
|
||||
Args:
|
||||
task_id: ID задачи Celery
|
||||
task_name: Имя задачи
|
||||
user_id: ID пользователя (опционально)
|
||||
meta: Дополнительные метаданные
|
||||
|
||||
Returns:
|
||||
BackgroundJob instance
|
||||
"""
|
||||
BackgroundJob = cls.get_model()
|
||||
return BackgroundJob.objects.create(
|
||||
task_id=task_id,
|
||||
task_name=task_name,
|
||||
user_id=user_id,
|
||||
meta=meta or {},
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def get_by_task_id(cls, task_id: str):
|
||||
"""
|
||||
Получить задачу по ID Celery.
|
||||
|
||||
Raises:
|
||||
NotFoundError: Если задача не найдена
|
||||
"""
|
||||
BackgroundJob = cls.get_model()
|
||||
try:
|
||||
return BackgroundJob.objects.get(task_id=task_id)
|
||||
except BackgroundJob.DoesNotExist as e:
|
||||
raise NotFoundError(
|
||||
message=f"Job with task_id={task_id} not found",
|
||||
code="job_not_found",
|
||||
) from e
|
||||
|
||||
@classmethod
|
||||
def get_by_task_id_or_none(cls, task_id: str):
|
||||
"""Получить задачу по ID или None."""
|
||||
BackgroundJob = cls.get_model()
|
||||
try:
|
||||
return BackgroundJob.objects.get(task_id=task_id)
|
||||
except BackgroundJob.DoesNotExist:
|
||||
return None
|
||||
|
||||
@classmethod
|
||||
def get_user_jobs(
|
||||
cls,
|
||||
user_id: int,
|
||||
*,
|
||||
status: str | None = None,
|
||||
limit: int = 50,
|
||||
):
|
||||
"""
|
||||
Получить задачи пользователя.
|
||||
|
||||
Args:
|
||||
user_id: ID пользователя
|
||||
status: Фильтр по статусу (опционально)
|
||||
limit: Максимальное количество записей
|
||||
|
||||
Returns:
|
||||
QuerySet задач
|
||||
"""
|
||||
qs = cls.get_queryset().filter(user_id=user_id)
|
||||
if status:
|
||||
qs = qs.filter(status=status)
|
||||
return qs[:limit]
|
||||
|
||||
@classmethod
|
||||
def get_active_jobs(cls, user_id: int | None = None):
|
||||
"""
|
||||
Получить активные (незавершённые) задачи.
|
||||
|
||||
Args:
|
||||
user_id: Фильтр по пользователю (опционально)
|
||||
"""
|
||||
from apps.core.models import JobStatus
|
||||
|
||||
qs = cls.get_queryset().filter(
|
||||
status__in=[JobStatus.PENDING, JobStatus.STARTED, JobStatus.RETRY]
|
||||
)
|
||||
if user_id:
|
||||
qs = qs.filter(user_id=user_id)
|
||||
return qs
|
||||
|
||||
@classmethod
|
||||
def cleanup_old_jobs(cls, *, days: int = 30) -> int:
|
||||
"""
|
||||
Удалить старые завершённые задачи.
|
||||
|
||||
Args:
|
||||
days: Количество дней (задачи старше будут удалены)
|
||||
|
||||
Returns:
|
||||
Количество удалённых записей
|
||||
"""
|
||||
from datetime import timedelta
|
||||
|
||||
from apps.core.models import JobStatus
|
||||
from django.utils import timezone
|
||||
|
||||
cutoff = timezone.now() - timedelta(days=days)
|
||||
deleted, _ = (
|
||||
cls.get_queryset()
|
||||
.filter(
|
||||
status__in=[JobStatus.SUCCESS, JobStatus.FAILURE, JobStatus.REVOKED],
|
||||
completed_at__lt=cutoff,
|
||||
)
|
||||
.delete()
|
||||
)
|
||||
return deleted
|
||||
Reference in New Issue
Block a user