Files
mostovik-backend/src/apps/core/views.py
Aleksandr Meshchriakov 4ca6b75393
Some checks failed
CI/CD Pipeline / Code Quality Checks (pull_request) Failing after 3m20s
CI/CD Pipeline / Run Tests (pull_request) Successful in 13m45s
CI/CD Pipeline / Run API Inventory E2E Tests (pull_request) Successful in 22s
CI/CD Pipeline / Telegram Notify Success (pull_request) Has been skipped
fix(api): align contracts with frontend md
2026-03-23 13:12:10 +01:00

404 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Health check views for monitoring and orchestration.
Provides endpoints for:
- Basic liveness check (is the app running?)
- Readiness check (is the app ready to serve traffic?)
- Detailed health check (DB, Redis, Celery status)
"""
import json
import logging
import time
from typing import Any
from apps.core.openapi import CommonResponses, ErrorResponses, swagger_tag
from apps.core.serializers import (
BackgroundJobListResponseSerializer,
BackgroundJobListSerializer,
BackgroundJobSerializer,
)
from django.conf import settings
from django.db import connection
from django.http import StreamingHttpResponse
from drf_yasg.utils import swagger_auto_schema
from rest_framework import status
from rest_framework.exceptions import ValidationError
from rest_framework.permissions import AllowAny
from rest_framework.request import Request
from rest_framework.response import Response
from rest_framework.views import APIView
logger = logging.getLogger(__name__)
# Swagger теги
HEALTH_TAG = swagger_tag("Мониторинг", "monitoring")
JOBS_TAG = swagger_tag("Фоновые задачи", "background_jobs")
class HealthCheckView(APIView):
"""
Комплексная проверка состояния системы.
Возвращает статус всех зависимостей (БД, Redis, Celery).
"""
permission_classes = [AllowAny]
authentication_classes = [] # No auth required
@swagger_auto_schema(
tags=[HEALTH_TAG],
operation_summary="Проверка состояния",
operation_description=(
"Комплексная проверка всех зависимостей системы.\n"
"Возвращает статус: healthy, degraded или unhealthy."
),
responses={
200: "Сервис работает в режиме healthy/degraded",
503: CommonResponses.SERVICE_UNAVAILABLE,
**ErrorResponses.PUBLIC,
},
)
def get(self, request: Request) -> Response:
"""Run all health checks and return status."""
checks = {}
overall_status = "healthy"
# Database check
db_check = self._check_database()
checks["database"] = db_check
if db_check["status"] != "up":
overall_status = "unhealthy"
# Redis check
redis_check = self._check_redis()
checks["redis"] = redis_check
if redis_check["status"] != "up" and overall_status == "healthy":
overall_status = "degraded"
# Celery check (optional, may be slow)
if request.query_params.get("include_celery", "").lower() == "true":
celery_check = self._check_celery()
checks["celery"] = celery_check
if celery_check["status"] != "up" and overall_status == "healthy":
overall_status = "degraded"
response_data = {
"status": overall_status,
"version": getattr(settings, "APP_VERSION", "1.0.0"),
"checks": checks,
}
# 503 only for unhealthy (critical services down)
# 200 for healthy and degraded (non-critical services down)
status_code = (
status.HTTP_503_SERVICE_UNAVAILABLE
if overall_status == "unhealthy"
else status.HTTP_200_OK
)
return Response(response_data, status=status_code)
def _check_database(self) -> dict[str, Any]:
"""Check database connectivity."""
start = time.time()
try:
with connection.cursor() as cursor:
cursor.execute("SELECT 1")
cursor.fetchone()
latency = (time.time() - start) * 1000
return {"status": "up", "latency_ms": round(latency, 2)}
except Exception as e:
logger.error(f"Database health check failed: {e}")
return {"status": "down", "error": str(e)}
def _check_redis(self) -> dict[str, Any]:
"""Check Redis connectivity."""
start = time.time()
try:
from django_redis import get_redis_connection
redis_conn = get_redis_connection("default")
redis_conn.ping()
latency = (time.time() - start) * 1000
return {"status": "up", "latency_ms": round(latency, 2)}
except ImportError:
return {"status": "skipped", "reason": "django_redis not installed"}
except Exception as e:
logger.warning(f"Redis health check failed: {e}")
return {"status": "down", "error": str(e)}
def _check_celery(self) -> dict[str, Any]:
"""Check Celery worker availability."""
try:
from core.celery import app as celery_app
inspector = celery_app.control.inspect(timeout=2.0)
active = inspector.active()
if active:
worker_count = len(active)
return {"status": "up", "workers": worker_count}
return {"status": "down", "error": "No active workers"}
except Exception as e:
logger.warning(f"Celery health check failed: {e}")
return {"status": "down", "error": str(e)}
class LivenessView(APIView):
"""
Kubernetes liveness probe.
Проверяет, запущено ли приложение.
"""
permission_classes = [AllowAny]
authentication_classes = []
@swagger_auto_schema(
tags=[HEALTH_TAG],
operation_summary="Liveness probe",
operation_description="Возвращает 200 если приложение запущено.",
responses={
200: "Приложение запущено",
**ErrorResponses.PUBLIC,
},
)
def get(self, request: Request) -> Response:
"""Simple liveness check."""
return Response({"status": "alive"}, status=status.HTTP_200_OK)
class ReadinessView(APIView):
"""
Kubernetes readiness probe.
Проверяет, готово ли приложение обрабатывать запросы.
"""
permission_classes = [AllowAny]
authentication_classes = []
@swagger_auto_schema(
tags=[HEALTH_TAG],
operation_summary="Readiness probe",
operation_description=(
"Возвращает 200 если приложение готово обрабатывать запросы."
),
responses={
200: "Приложение готово обрабатывать запросы",
503: CommonResponses.SERVICE_UNAVAILABLE,
**ErrorResponses.PUBLIC,
},
)
def get(self, request: Request) -> Response:
"""Check if app is ready to serve traffic."""
# Check database connection
try:
with connection.cursor() as cursor:
cursor.execute("SELECT 1")
cursor.fetchone()
except Exception as e:
logger.error(f"Readiness check failed - database: {e}")
return Response(
{"status": "not_ready", "reason": "database unavailable"},
status=status.HTTP_503_SERVICE_UNAVAILABLE,
)
return Response({"status": "ready"}, status=status.HTTP_200_OK)
class BackgroundJobStatusView(APIView):
"""
Получение статуса фоновой задачи.
Возвращает статус, прогресс и результат задачи.
"""
from rest_framework.permissions import IsAuthenticated
permission_classes = [IsAuthenticated]
@staticmethod
def _check_access(request: Request, job) -> Response | None:
if not request.user.is_staff and job.user_id != request.user.id:
return Response(
{"detail": "Нет доступа к этой задаче"},
status=status.HTTP_403_FORBIDDEN,
)
return None
@swagger_auto_schema(
tags=[JOBS_TAG],
operation_summary="Статус задачи",
operation_description=(
"Возвращает статус конкретной фоновой задачи.\n"
"Доступно только владельцу задачи или администратору."
),
responses={
200: BackgroundJobSerializer,
403: CommonResponses.FORBIDDEN,
404: CommonResponses.NOT_FOUND,
**ErrorResponses.AUTHENTICATED,
},
)
def get(self, request: Request, task_id: str) -> Response:
"""Получить статус задачи по task_id."""
from apps.core.services import BackgroundJobService
job = BackgroundJobService.get_by_task_id(task_id)
access_error = self._check_access(request, job)
if access_error is not None:
return access_error
serializer = BackgroundJobSerializer(job)
return Response(serializer.data)
class BackgroundJobStreamView(BackgroundJobStatusView):
"""SSE stream with job progress updates until completion."""
poll_interval_seconds = 1.0
@staticmethod
def _build_sse_message(*, event: str, payload: dict[str, Any]) -> str:
return f"event: {event}\ndata: {json.dumps(payload, ensure_ascii=False)}\n\n"
def _build_progress_payload(self, job) -> dict[str, Any]:
return {
"task_id": job.task_id,
"status": "running",
"progress": job.progress,
"message": job.progress_message,
}
def _build_final_payload(self, job) -> tuple[str, dict[str, Any]]:
if job.is_successful:
return (
"completed",
{
"task_id": job.task_id,
"status": "success",
"progress": job.progress,
"result": job.result,
},
)
return (
"failed",
{
"task_id": job.task_id,
"status": "error",
"progress": job.progress,
"message": job.error
or job.progress_message
or "Задача завершилась с ошибкой",
},
)
def _event_stream(self, task_id: str):
from apps.core.services import BackgroundJobService
last_snapshot: tuple[str, int, str, str] | None = None
while True:
job = BackgroundJobService.get_by_task_id(task_id)
snapshot = (
job.status,
job.progress,
job.progress_message,
job.error,
)
if snapshot != last_snapshot:
last_snapshot = snapshot
if job.is_finished:
event, payload = self._build_final_payload(job)
yield self._build_sse_message(event=event, payload=payload)
break
yield self._build_sse_message(
event="progress",
payload=self._build_progress_payload(job),
)
time.sleep(self.poll_interval_seconds)
@swagger_auto_schema(
tags=[JOBS_TAG],
operation_summary="Поток статуса задачи",
operation_description=(
"Открывает SSE stream со статусом фоновой задачи до её завершения.\n"
"Доступно только владельцу задачи или администратору."
),
responses={
200: "SSE stream",
403: CommonResponses.FORBIDDEN,
404: CommonResponses.NOT_FOUND,
**ErrorResponses.AUTHENTICATED,
},
)
def get(self, request: Request, task_id: str) -> StreamingHttpResponse | Response:
from apps.core.services import BackgroundJobService
job = BackgroundJobService.get_by_task_id(task_id)
access_error = self._check_access(request, job)
if access_error is not None:
return access_error
response = StreamingHttpResponse(
self._event_stream(task_id),
content_type="text/event-stream",
)
response["Cache-Control"] = "no-cache"
response["X-Accel-Buffering"] = "no"
return response
class BackgroundJobListView(APIView):
"""
Список фоновых задач пользователя.
Возвращает список задач текущего пользователя с фильтрацией.
"""
from rest_framework.permissions import IsAuthenticated
permission_classes = [IsAuthenticated]
@swagger_auto_schema(
tags=[JOBS_TAG],
operation_summary="Список задач",
operation_description=(
"Возвращает список фоновых задач текущего пользователя.\n"
"Поддерживает фильтрацию по статусу (status) и лимит (limit)."
),
responses={
200: BackgroundJobListResponseSerializer,
**ErrorResponses.AUTHENTICATED,
},
)
def get(self, request: Request) -> Response:
"""Получить список задач пользователя."""
from apps.core.services import BackgroundJobService
status_filter = request.query_params.get("status")
limit_raw = request.query_params.get("limit", "50")
try:
limit = int(limit_raw)
except (TypeError, ValueError) as exc:
raise ValidationError(
{"limit": "Параметр limit должен быть целым числом"}
) from exc
if limit < 1:
raise ValidationError({"limit": "Параметр limit должен быть больше 0"})
limit = min(limit, 100)
jobs = BackgroundJobService.get_user_jobs(
user_id=request.user.id,
status=status_filter,
limit=limit,
)
serializer = BackgroundJobListSerializer(jobs, many=True)
return Response({"results": serializer.data})