Align frontend API contracts
This commit is contained in:
@@ -7,6 +7,7 @@ Provides endpoints for:
|
||||
- Detailed health check (DB, Redis, Celery status)
|
||||
"""
|
||||
|
||||
import json
|
||||
import logging
|
||||
import time
|
||||
from typing import Any
|
||||
@@ -15,6 +16,7 @@ from apps.core.openapi import CommonResponses, ErrorResponses, swagger_tag
|
||||
from apps.core.serializers import BackgroundJobListSerializer, BackgroundJobSerializer
|
||||
from django.conf import settings
|
||||
from django.db import connection
|
||||
from django.http import StreamingHttpResponse
|
||||
from drf_yasg.utils import swagger_auto_schema
|
||||
from rest_framework import status
|
||||
from rest_framework.exceptions import ValidationError
|
||||
@@ -212,6 +214,15 @@ class BackgroundJobStatusView(APIView):
|
||||
|
||||
permission_classes = [IsAuthenticated]
|
||||
|
||||
@staticmethod
|
||||
def _check_access(request: Request, job) -> Response | None:
|
||||
if not request.user.is_staff and job.user_id != request.user.id:
|
||||
return Response(
|
||||
{"detail": "Нет доступа к этой задаче"},
|
||||
status=status.HTTP_403_FORBIDDEN,
|
||||
)
|
||||
return None
|
||||
|
||||
@swagger_auto_schema(
|
||||
tags=[JOBS_TAG],
|
||||
operation_summary="Статус задачи",
|
||||
@@ -231,19 +242,112 @@ class BackgroundJobStatusView(APIView):
|
||||
from apps.core.services import BackgroundJobService
|
||||
|
||||
job = BackgroundJobService.get_by_task_id(task_id)
|
||||
|
||||
# Проверка доступа: только владелец или админ.
|
||||
# Задачи без владельца считаем системными и не показываем обычным пользователям.
|
||||
if not request.user.is_staff and job.user_id != request.user.id:
|
||||
return Response(
|
||||
{"detail": "Нет доступа к этой задаче"},
|
||||
status=status.HTTP_403_FORBIDDEN,
|
||||
)
|
||||
access_error = self._check_access(request, job)
|
||||
if access_error is not None:
|
||||
return access_error
|
||||
|
||||
serializer = BackgroundJobSerializer(job)
|
||||
return Response(serializer.data)
|
||||
|
||||
|
||||
class BackgroundJobStreamView(BackgroundJobStatusView):
|
||||
"""SSE stream with job progress updates until completion."""
|
||||
|
||||
poll_interval_seconds = 1.0
|
||||
|
||||
@staticmethod
|
||||
def _build_sse_message(*, event: str, payload: dict[str, Any]) -> str:
|
||||
return f"event: {event}\ndata: {json.dumps(payload, ensure_ascii=False)}\n\n"
|
||||
|
||||
def _build_progress_payload(self, job) -> dict[str, Any]:
|
||||
return {
|
||||
"task_id": job.task_id,
|
||||
"status": "running",
|
||||
"progress": job.progress,
|
||||
"message": job.progress_message,
|
||||
}
|
||||
|
||||
def _build_final_payload(self, job) -> tuple[str, dict[str, Any]]:
|
||||
if job.is_successful:
|
||||
return (
|
||||
"completed",
|
||||
{
|
||||
"task_id": job.task_id,
|
||||
"status": "success",
|
||||
"progress": job.progress,
|
||||
"result": job.result,
|
||||
},
|
||||
)
|
||||
|
||||
return (
|
||||
"failed",
|
||||
{
|
||||
"task_id": job.task_id,
|
||||
"status": "error",
|
||||
"progress": job.progress,
|
||||
"message": job.error or job.progress_message or "Задача завершилась с ошибкой",
|
||||
},
|
||||
)
|
||||
|
||||
def _event_stream(self, task_id: str):
|
||||
from apps.core.services import BackgroundJobService
|
||||
|
||||
last_snapshot: tuple[str, int, str, str] | None = None
|
||||
|
||||
while True:
|
||||
job = BackgroundJobService.get_by_task_id(task_id)
|
||||
snapshot = (
|
||||
job.status,
|
||||
job.progress,
|
||||
job.progress_message,
|
||||
job.error,
|
||||
)
|
||||
|
||||
if snapshot != last_snapshot:
|
||||
last_snapshot = snapshot
|
||||
if job.is_finished:
|
||||
event, payload = self._build_final_payload(job)
|
||||
yield self._build_sse_message(event=event, payload=payload)
|
||||
break
|
||||
|
||||
yield self._build_sse_message(
|
||||
event="progress",
|
||||
payload=self._build_progress_payload(job),
|
||||
)
|
||||
|
||||
time.sleep(self.poll_interval_seconds)
|
||||
|
||||
@swagger_auto_schema(
|
||||
tags=[JOBS_TAG],
|
||||
operation_summary="Поток статуса задачи",
|
||||
operation_description=(
|
||||
"Открывает SSE stream со статусом фоновой задачи до её завершения.\n"
|
||||
"Доступно только владельцу задачи или администратору."
|
||||
),
|
||||
responses={
|
||||
200: "SSE stream",
|
||||
403: CommonResponses.FORBIDDEN,
|
||||
404: CommonResponses.NOT_FOUND,
|
||||
**ErrorResponses.AUTHENTICATED,
|
||||
},
|
||||
)
|
||||
def get(self, request: Request, task_id: str) -> StreamingHttpResponse | Response:
|
||||
from apps.core.services import BackgroundJobService
|
||||
|
||||
job = BackgroundJobService.get_by_task_id(task_id)
|
||||
access_error = self._check_access(request, job)
|
||||
if access_error is not None:
|
||||
return access_error
|
||||
|
||||
response = StreamingHttpResponse(
|
||||
self._event_stream(task_id),
|
||||
content_type="text/event-stream",
|
||||
)
|
||||
response["Cache-Control"] = "no-cache"
|
||||
response["X-Accel-Buffering"] = "no"
|
||||
return response
|
||||
|
||||
|
||||
class BackgroundJobListView(APIView):
|
||||
"""
|
||||
Список фоновых задач пользователя.
|
||||
@@ -290,4 +394,4 @@ class BackgroundJobListView(APIView):
|
||||
)
|
||||
|
||||
serializer = BackgroundJobListSerializer(jobs, many=True)
|
||||
return Response(serializer.data)
|
||||
return Response({"results": serializer.data})
|
||||
|
||||
Reference in New Issue
Block a user