fix(dashboard): async FNS zip upload and registry counts
This commit is contained in:
@@ -16,6 +16,7 @@ from apps.parsers.models import ParserLoadLog
|
||||
from apps.parsers.services import FNSReportService
|
||||
from apps.parsers.tasks import process_fns_file
|
||||
from django.conf import settings
|
||||
from django.utils.text import get_valid_filename
|
||||
|
||||
FNS_XLSX_FILENAME_RE = re.compile(r"^fin_\d+_\d{13,15}\.xlsx$")
|
||||
|
||||
@@ -97,6 +98,63 @@ class FNSUploadService:
|
||||
|
||||
return result
|
||||
|
||||
@classmethod
|
||||
def queue_uploaded_zip_archive(
|
||||
cls,
|
||||
*,
|
||||
archive_file,
|
||||
requested_by_id: int | None,
|
||||
) -> FNSUploadResult:
|
||||
"""Persist a ZIP upload and queue archive expansion in Celery."""
|
||||
from apps.parsers.tasks import process_fns_zip_archive
|
||||
|
||||
archive_path = cls._store_uploaded_archive(archive_file)
|
||||
if not zipfile.is_zipfile(archive_path):
|
||||
archive_path.unlink(missing_ok=True)
|
||||
raise ValueError("Загруженный файл не является корректным ZIP архивом")
|
||||
|
||||
task_id = str(uuid.uuid4())
|
||||
try:
|
||||
BackgroundJobService.create_job(
|
||||
task_id=task_id,
|
||||
task_name="apps.parsers.tasks.process_fns_zip_archive",
|
||||
user_id=requested_by_id,
|
||||
meta={
|
||||
"source": ParserLoadLog.Source.FNS_REPORTS,
|
||||
"file": archive_path.name,
|
||||
"upload_type": "zip",
|
||||
},
|
||||
)
|
||||
task = process_fns_zip_archive.apply_async(
|
||||
args=[str(archive_path)],
|
||||
kwargs={"requested_by_id": requested_by_id},
|
||||
task_id=task_id,
|
||||
)
|
||||
except Exception:
|
||||
archive_path.unlink(missing_ok=True)
|
||||
BackgroundJob.objects.filter(task_id=task_id).delete()
|
||||
raise
|
||||
|
||||
return FNSUploadResult(queued=1, skipped=0, invalid=0, task_ids=[task.id])
|
||||
|
||||
@classmethod
|
||||
def queue_zip_archive_path(
|
||||
cls,
|
||||
*,
|
||||
archive_path: str | Path,
|
||||
requested_by_id: int | None,
|
||||
) -> FNSUploadResult:
|
||||
"""Queue valid files from a ZIP archive already stored on shared disk."""
|
||||
path = Path(archive_path)
|
||||
try:
|
||||
with path.open("rb") as handle:
|
||||
return cls.queue_zip_archive(
|
||||
archive_file=handle,
|
||||
requested_by_id=requested_by_id,
|
||||
)
|
||||
finally:
|
||||
path.unlink(missing_ok=True)
|
||||
|
||||
@classmethod
|
||||
def process_uploaded_files_sync(
|
||||
cls, *, files, requested_by_id: int | None
|
||||
@@ -161,6 +219,27 @@ class FNSUploadService:
|
||||
file_name = path.name
|
||||
return file_name or None
|
||||
|
||||
@staticmethod
|
||||
def _store_uploaded_archive(archive_file) -> Path:
|
||||
archive_dir = Path(
|
||||
getattr(
|
||||
settings,
|
||||
"FNS_ARCHIVE_UPLOAD_DIRECTORY",
|
||||
Path(settings.FNS_WATCH_DIRECTORY) / "archives",
|
||||
)
|
||||
)
|
||||
archive_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
safe_name = get_valid_filename(archive_file.name or "fns-reports.zip")
|
||||
archive_path = archive_dir / f"{uuid.uuid4()}-{safe_name}"
|
||||
|
||||
archive_file.seek(0)
|
||||
with archive_path.open("wb") as target:
|
||||
for chunk in archive_file.chunks():
|
||||
target.write(chunk)
|
||||
archive_file.seek(0)
|
||||
return archive_path
|
||||
|
||||
@classmethod
|
||||
def _queue_file_bytes(
|
||||
cls,
|
||||
|
||||
@@ -2455,6 +2455,48 @@ def process_fns_file(
|
||||
)
|
||||
|
||||
|
||||
@shared_task(bind=True)
|
||||
def process_fns_zip_archive(
|
||||
self,
|
||||
archive_path: str,
|
||||
requested_by_id: int | None = None,
|
||||
) -> dict:
|
||||
"""Expand an uploaded FNS ZIP archive and queue contained Excel files."""
|
||||
from apps.parsers.fns_upload import FNSUploadService
|
||||
|
||||
task_id = self.request.id or str(uuid.uuid4())
|
||||
path = Path(archive_path)
|
||||
job = _get_or_create_background_job(
|
||||
task_id=task_id,
|
||||
task_name="apps.parsers.tasks.process_fns_zip_archive",
|
||||
source=ParserLoadLog.Source.FNS_REPORTS,
|
||||
requested_by_id=requested_by_id,
|
||||
meta={"file": path.name, "upload_type": "zip"},
|
||||
)
|
||||
job.mark_started()
|
||||
job.update_progress(10, f"Распаковка архива {path.name}...")
|
||||
|
||||
try:
|
||||
result = FNSUploadService.queue_zip_archive_path(
|
||||
archive_path=path,
|
||||
requested_by_id=requested_by_id,
|
||||
)
|
||||
except Exception as exc:
|
||||
job.fail(error=str(exc))
|
||||
raise
|
||||
|
||||
payload = {
|
||||
"status": "success",
|
||||
"queued": result.queued,
|
||||
"skipped": result.skipped,
|
||||
"invalid": result.invalid,
|
||||
"task_ids": result.task_ids,
|
||||
}
|
||||
job.update_progress(100, "Архив обработан, файлы поставлены в очередь")
|
||||
job.complete(result=payload)
|
||||
return payload
|
||||
|
||||
|
||||
@shared_task(bind=True)
|
||||
def process_fns_files_batch(self, file_paths: list[str]) -> dict:
|
||||
"""
|
||||
|
||||
@@ -949,7 +949,7 @@ class FNSReportUploadView(APIView):
|
||||
serializer = FNSZipUploadSerializer(data=request.data)
|
||||
serializer.is_valid(raise_exception=True)
|
||||
try:
|
||||
result = FNSUploadService.queue_zip_archive(
|
||||
result = FNSUploadService.queue_uploaded_zip_archive(
|
||||
archive_file=serializer.validated_data["file"],
|
||||
requested_by_id=request.user.id,
|
||||
)
|
||||
|
||||
@@ -12,11 +12,31 @@ from registers.models import (
|
||||
class RegisterSerializer(serializers.ModelSerializer):
|
||||
"""Сериализатор реестра."""
|
||||
|
||||
active_organizations = serializers.SerializerMethodField()
|
||||
uploads_count = serializers.SerializerMethodField()
|
||||
|
||||
class Meta:
|
||||
model = Register
|
||||
fields = ["id", "name"]
|
||||
fields = ["id", "name", "active_organizations", "uploads_count"]
|
||||
read_only_fields = fields
|
||||
|
||||
def get_active_organizations(self, obj) -> int:
|
||||
annotated = getattr(obj, "active_organizations", None)
|
||||
if annotated is not None:
|
||||
return annotated
|
||||
return (
|
||||
obj.membership_periods.filter(ended_at__isnull=True)
|
||||
.values("organization_id")
|
||||
.distinct()
|
||||
.count()
|
||||
)
|
||||
|
||||
def get_uploads_count(self, obj) -> int:
|
||||
annotated = getattr(obj, "uploads_count", None)
|
||||
if annotated is not None:
|
||||
return annotated
|
||||
return obj.uploads.count()
|
||||
|
||||
|
||||
class RegisterListResponseSerializer(serializers.Serializer):
|
||||
"""Frontend-friendly wrapper for registries list."""
|
||||
|
||||
@@ -3,6 +3,7 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from apps.core.openapi import CommonResponses, ErrorResponses, swagger_tag
|
||||
from django.db.models import Count, Q
|
||||
from django.shortcuts import get_object_or_404
|
||||
from drf_yasg import openapi
|
||||
from drf_yasg.utils import swagger_auto_schema
|
||||
@@ -40,6 +41,20 @@ class RegisterViewSet(ReadOnlyModelViewSet):
|
||||
permission_classes = [IsAuthenticated]
|
||||
search_fields = ["name"]
|
||||
|
||||
def get_queryset(self):
|
||||
return (
|
||||
Register.objects.all()
|
||||
.annotate(
|
||||
active_organizations=Count(
|
||||
"membership_periods__organization_id",
|
||||
filter=Q(membership_periods__ended_at__isnull=True),
|
||||
distinct=True,
|
||||
),
|
||||
uploads_count=Count("uploads", distinct=True),
|
||||
)
|
||||
.order_by("name")
|
||||
)
|
||||
|
||||
@swagger_auto_schema(
|
||||
tags=[REGISTERS_TAG],
|
||||
operation_summary="Список реестров",
|
||||
|
||||
Reference in New Issue
Block a user