fix(parsers): close stale source jobs
Some checks failed
CI/CD Pipeline / Quality Gate (push) Failing after 2m4s
CI/CD Pipeline / Build and Push Images (push) Has been skipped
CI/CD Pipeline / Deploy Dev in Dokploy (push) Has been skipped
CI/CD Pipeline / Internal Notify (push) Successful in 1s

This commit is contained in:
2026-04-28 21:38:00 +02:00
parent 26f0669670
commit d96b76d32f
9 changed files with 262 additions and 18 deletions

View File

@@ -6,12 +6,14 @@ They are easily testable and can manage transactions.
"""
import logging
from datetime import timedelta
from typing import Any, Generic, TypeVar
import django
from apps.core.exceptions import NotFoundError
from django.db import models, transaction
from django.db.models import QuerySet
from django.db.models import Q, QuerySet
from django.utils import timezone
logger = logging.getLogger(__name__)
@@ -678,3 +680,37 @@ class BackgroundJobService(BaseReadOnlyService):
.delete()
)
return deleted
@classmethod
def mark_stale_active_jobs_failed(
cls,
*,
max_age_minutes: int,
task_names: set[str] | None = None,
meta_sources: set[str] | None = None,
) -> int:
"""Mark old active jobs as failed after worker restarts or hard kills."""
from apps.core.models import JobStatus
cutoff = timezone.now() - timedelta(minutes=max_age_minutes)
queryset = cls.get_queryset().filter(
status__in=[JobStatus.PENDING, JobStatus.STARTED, JobStatus.RETRY],
updated_at__lt=cutoff,
)
if task_names:
queryset = queryset.filter(task_name__in=task_names)
if meta_sources:
source_filter = Q()
for source in meta_sources:
source_filter |= Q(meta__source=source)
queryset = queryset.filter(source_filter)
stale_message = (
"Stale background job was marked failed after "
f"{max_age_minutes} minutes without progress."
)
updated = 0
for job in queryset.order_by("created_at"):
job.fail(error=stale_message)
updated += 1
return updated

View File

@@ -24,11 +24,13 @@ from apps.parsers.views import (
TASKS_BY_NAME,
build_task_kwargs,
)
from django.conf import settings
from django.core.cache import cache
from django.core.paginator import Paginator
from django.db.models import CharField, Max, Q
from django.db.models.functions import Cast
from django.http import Http404, HttpResponse
from django.utils import timezone
from drf_yasg import openapi
from drf_yasg.utils import swagger_auto_schema
from rest_framework import status
@@ -44,6 +46,7 @@ SYSTEM_LOGS_TAG = "System Logs"
ACTIVE_JOB_STATUSES = {"pending", "started", "retry"}
SUCCESS_LOAD_STATUSES = {"success", "skipped"}
ERROR_LOAD_STATUSES = {"failed", "failure", "error"}
STALE_ACTIVE_MAX_AGE_MINUTES = 90
PARSING_SETTINGS_CACHE_KEY = "parsers:frontend_compat:parsing_settings"
PARSING_SETTINGS_FIELDS = {
@@ -295,6 +298,23 @@ def _serialize_active_job(job) -> dict[str, Any]:
}
def _stale_cutoff():
max_age_minutes = int(
getattr(
settings,
"PARSER_STALE_LOAD_MAX_AGE_MINUTES",
STALE_ACTIVE_MAX_AGE_MINUTES,
)
)
return timezone.now() - timedelta(minutes=max_age_minutes)
def _is_stale_load(load_log: ParserLoadLog | None) -> bool:
if load_log is None or load_log.status != "in_progress":
return False
return load_log.updated_at < _stale_cutoff()
def _active_tasks_for_definition(
definition: FrontendSourceCardDefinition,
) -> list[dict]:
@@ -306,6 +326,7 @@ def _active_tasks_for_definition(
queryset = BackgroundJobService.get_queryset().filter(
task_name__in=task_names,
status__in=ACTIVE_JOB_STATUSES,
updated_at__gte=_stale_cutoff(),
)
return [_serialize_active_job(job) for job in queryset.order_by("-created_at")[:10]]
@@ -330,8 +351,16 @@ def _status_for_card(
) -> str:
if not definition.is_available:
return "unavailable"
if active_tasks or (latest_load and latest_load.status == "in_progress"):
if active_tasks:
return "in_progress"
if (
latest_load
and latest_load.status == "in_progress"
and not _is_stale_load(latest_load)
):
return "in_progress"
if latest_load and latest_load.status == "in_progress":
return "error"
if latest_load and latest_load.status in ERROR_LOAD_STATUSES:
return "error"
if last_updated_at:
@@ -416,7 +445,13 @@ def _build_source_card(definition: FrontendSourceCardDefinition) -> dict[str, An
),
"last_updated_at": last_updated_at,
"next_update_at": next_update_at,
"error_message": latest_load.error_message if latest_load else "",
"error_message": (
"Загрузка зависла и будет закрыта cleanup-задачей."
if _is_stale_load(latest_load)
else latest_load.error_message
if latest_load
else ""
),
"task_names": [
PARSER_SOURCES[source_key].task_name
for source_key in definition.source_keys

View File

@@ -416,7 +416,7 @@ class ParserLoadLogService(BaseService[ParserLoadLog]):
updated = 0
active_statuses = [JobStatus.PENDING, JobStatus.STARTED, JobStatus.RETRY]
for log in stale_logs:
job = (
batch_job = (
BackgroundJob.objects.filter(
status__in=active_statuses,
meta__source=log.source,
@@ -425,6 +425,18 @@ class ParserLoadLogService(BaseService[ParserLoadLog]):
.order_by("-updated_at")
.first()
)
source_job = None
if batch_job is None:
source_job = (
BackgroundJob.objects.filter(
status__in=active_statuses,
meta__source=log.source,
meta__batch_id__isnull=True,
)
.order_by("-updated_at")
.first()
)
job = batch_job or source_job
if job is not None and job.updated_at >= cutoff:
continue

View File

@@ -20,6 +20,7 @@ from apps.parsers.models import (
ParserLoadLog,
ProcurementRecord,
)
from django.conf import settings
from django.db.models import Max
from django.http import Http404
from django.utils import timezone
@@ -27,6 +28,7 @@ from rest_framework.exceptions import ValidationError
SUCCESSFUL_LOAD_STATUSES = {"success", "skipped"}
ACTIVE_JOB_STATUSES = [JobStatus.PENDING, JobStatus.STARTED, JobStatus.RETRY]
STALE_ACTIVE_MAX_AGE_MINUTES = 90
@dataclass(frozen=True)
@@ -293,6 +295,9 @@ class SourceCardService:
latest_load=latest_load,
last_updated_at=last_updated_at,
)
error_message = latest_load.error_message if latest_load else ""
if cls._is_stale_load(latest_load):
error_message = cls._stale_load_message()
return {
"slug": definition.slug,
@@ -307,7 +312,7 @@ class SourceCardService:
"organizations_count": organizations_count,
"last_updated_at": last_updated_at,
"next_update_at": cls._get_next_update_at(definition, last_updated_at),
"error_message": latest_load.error_message if latest_load else "",
"error_message": error_message,
"task_names": list(definition.task_names),
"refresh_requires_params": any(
param.required for param in definition.refresh_params
@@ -721,9 +726,11 @@ class SourceCardService:
def _get_active_tasks(
cls, definition: SourceCardDefinition
) -> list[dict[str, Any]]:
cutoff = cls._stale_cutoff()
queryset = BackgroundJobService.get_queryset().filter(
task_name__in=definition.task_names,
status__in=ACTIVE_JOB_STATUSES,
updated_at__gte=cutoff,
)
return [
cls._serialize_job(job) for job in queryset.order_by("-created_at")[:10]
@@ -749,14 +756,44 @@ class SourceCardService:
return "unavailable"
if active_tasks:
return "in_progress"
if latest_load and latest_load.status == "in_progress":
if (
latest_load
and latest_load.status == "in_progress"
and not cls._is_stale_load(latest_load)
):
return "in_progress"
if latest_load and latest_load.status == "in_progress":
return "error"
if latest_load and latest_load.status == "failed":
return "error"
if last_updated_at:
return "success"
return "idle"
@classmethod
def _stale_cutoff(cls):
max_age_minutes = int(
getattr(
settings,
"PARSER_STALE_LOAD_MAX_AGE_MINUTES",
STALE_ACTIVE_MAX_AGE_MINUTES,
)
)
return timezone.now() - timedelta(minutes=max_age_minutes)
@classmethod
def _is_stale_load(cls, latest_load: ParserLoadLog | None) -> bool:
if latest_load is None or latest_load.status != "in_progress":
return False
updated_at = getattr(latest_load, "updated_at", None)
if updated_at is None:
return False
return updated_at < cls._stale_cutoff()
@staticmethod
def _stale_load_message() -> str:
return "Загрузка зависла и будет закрыта cleanup-задачей."
@classmethod
def _get_status_label(cls, status: str) -> str:
labels = {

View File

@@ -96,19 +96,27 @@ def _get_or_create_background_job(
):
"""Reuse a pre-created job or create a new one for the task."""
job = BackgroundJobService.get_by_task_id_or_none(task_id)
payload = {"source": source, **(meta or {})}
if batch_id is not None:
payload["batch_id"] = batch_id
if not job:
payload = {"source": source, **(meta or {})}
if batch_id is not None:
payload["batch_id"] = batch_id
job = BackgroundJobService.create_job(
task_id=task_id,
task_name=task_name,
user_id=requested_by_id,
meta=payload,
)
elif requested_by_id is not None and job.user_id is None:
job.user_id = requested_by_id
job.save(update_fields=["user_id", "updated_at"])
else:
update_fields = []
merged_meta = {**(job.meta or {}), **payload}
if merged_meta != job.meta:
job.meta = merged_meta
update_fields.append("meta")
if requested_by_id is not None and job.user_id is None:
job.user_id = requested_by_id
update_fields.append("user_id")
if update_fields:
job.save(update_fields=[*update_fields, "updated_at"])
return job
@@ -1930,19 +1938,27 @@ def parse_fstec_registers(
@shared_task
def cleanup_stale_parser_loads(max_age_minutes: int | None = None) -> dict:
"""Закрыть stale in_progress загрузки после рестартов worker/deploy."""
"""Закрыть stale in_progress загрузки и jobs после рестартов worker/deploy."""
if max_age_minutes is None:
max_age_minutes = getattr(
settings,
"PARSER_STALE_LOAD_MAX_AGE_MINUTES",
PARSER_STALE_LOAD_MAX_AGE_MINUTES,
)
source_values = {descriptor.source for descriptor in PARSER_SOURCES.values()}
task_names = {descriptor.task_name for descriptor in PARSER_SOURCES.values()}
marked_failed = ParserLoadLogService.mark_stale_in_progress_failed(
max_age_minutes=int(max_age_minutes)
)
marked_jobs_failed = BackgroundJobService.mark_stale_active_jobs_failed(
max_age_minutes=int(max_age_minutes),
task_names=task_names,
meta_sources=source_values,
)
return {
"status": "success",
"marked_failed": marked_failed,
"marked_jobs_failed": marked_jobs_failed,
"max_age_minutes": int(max_age_minutes),
}

View File

@@ -1,8 +1,11 @@
"""Тесты для BackgroundJob."""
from datetime import timedelta
from apps.core.models import BackgroundJob, JobStatus
from apps.core.services import BackgroundJobService
from django.test import TestCase
from django.utils import timezone
from faker import Faker
fake = Faker()
@@ -252,10 +255,6 @@ class BackgroundJobServiceTest(TestCase):
self.assertEqual([j.task_id for j in active_jobs], [job_user.task_id])
def test_cleanup_old_jobs(self):
from datetime import timedelta
from django.utils import timezone
old_job = BackgroundJobService.create_job(
task_id="job-old",
task_name="test.task",
@@ -272,3 +271,39 @@ class BackgroundJobServiceTest(TestCase):
deleted = BackgroundJobService.cleanup_old_jobs(days=30)
self.assertEqual(deleted, 1)
def test_mark_stale_active_jobs_failed_scopes_by_task_and_source(self):
stale_job = BackgroundJobService.create_job(
task_id="job-stale",
task_name="apps.parsers.tasks.parse_industrial_products",
meta={"source": "industrial_products"},
)
fresh_job = BackgroundJobService.create_job(
task_id="job-fresh",
task_name="apps.parsers.tasks.parse_industrial_products",
meta={"source": "industrial_products"},
)
unrelated_job = BackgroundJobService.create_job(
task_id="job-unrelated",
task_name="apps.other.tasks.task",
meta={"source": "industrial_products"},
)
old_timestamp = timezone.now() - timedelta(hours=3)
BackgroundJob.objects.filter(
task_id__in=[stale_job.task_id, unrelated_job.task_id]
).update(updated_at=old_timestamp)
updated = BackgroundJobService.mark_stale_active_jobs_failed(
max_age_minutes=90,
task_names={"apps.parsers.tasks.parse_industrial_products"},
meta_sources={"industrial_products"},
)
stale_job.refresh_from_db()
fresh_job.refresh_from_db()
unrelated_job.refresh_from_db()
self.assertEqual(updated, 1)
self.assertEqual(stale_job.status, JobStatus.FAILURE)
self.assertIn("Stale background job", stale_job.error)
self.assertEqual(fresh_job.status, JobStatus.PENDING)
self.assertEqual(unrelated_job.status, JobStatus.PENDING)

View File

@@ -411,6 +411,31 @@ class ParserLoadLogServiceTest(TestCase):
self.assertEqual(updated, 0)
self.assertEqual(log.status, ParserLoadLog.Status.IN_PROGRESS)
def test_mark_stale_in_progress_failed_closes_precreated_job_without_batch(self):
"""Pre-created source-card jobs without batch_id are still linked by source."""
log = ParserLoadLogFactory(
source=ParserLoadLog.Source.INDUSTRIAL_PRODUCTS,
batch_id=2,
status=ParserLoadLog.Status.IN_PROGRESS,
)
job = BackgroundJob.objects.create(
task_id="precreated-source-card-task",
task_name="apps.parsers.tasks.parse_industrial_products",
status=JobStatus.STARTED,
meta={"source": log.source, "source_card": "manufacturers-and-products"},
)
old_timestamp = timezone.now() - timedelta(hours=3)
ParserLoadLog.objects.filter(pk=log.pk).update(updated_at=old_timestamp)
BackgroundJob.objects.filter(pk=job.pk).update(updated_at=old_timestamp)
updated = ParserLoadLogService.mark_stale_in_progress_failed(max_age_minutes=90)
log.refresh_from_db()
job.refresh_from_db()
self.assertEqual(updated, 1)
self.assertEqual(log.status, ParserLoadLog.Status.FAILED)
self.assertEqual(job.status, JobStatus.FAILURE)
class IndustrialCertificateServiceTest(TestCase):
"""Tests for IndustrialCertificateService."""

View File

@@ -1,5 +1,6 @@
from __future__ import annotations
from datetime import timedelta
from types import SimpleNamespace
from unittest.mock import MagicMock, patch
@@ -10,6 +11,7 @@ from apps.parsers.source_cards import (
)
from django.http import Http404
from django.test import SimpleTestCase
from django.utils import timezone
from rest_framework.exceptions import ValidationError
@@ -249,6 +251,19 @@ class SourceCardServiceUnitTest(SimpleTestCase):
),
"in_progress",
)
stale_in_progress_load = SimpleNamespace(
status="in_progress",
updated_at=timezone.now() - timedelta(hours=3),
)
self.assertEqual(
SourceCardService._get_status(
definition=SourceCardService.get_definition("financial-indicators"),
active_tasks=[],
latest_load=stale_in_progress_load,
last_updated_at=None,
),
"error",
)
self.assertEqual(
SourceCardService._get_status(
definition=SourceCardService.get_definition("financial-indicators"),

View File

@@ -12,6 +12,7 @@ from types import SimpleNamespace
from unittest.mock import patch
from urllib.parse import urlparse
from apps.core.services import BackgroundJobService
from apps.parsers import tasks as parser_tasks
from apps.parsers.clients.base import HTTPError
from apps.parsers.clients.minpromtorg.industrial import (
@@ -285,14 +286,46 @@ class GenericSourceFetchTestCase(TestCase):
ParserLoadLogService,
"mark_stale_in_progress_failed",
return_value=2,
) as cleanup_mock:
) as cleanup_mock, patch.object(
BackgroundJobService,
"mark_stale_active_jobs_failed",
return_value=3,
) as jobs_cleanup_mock:
result = parser_tasks.cleanup_stale_parser_loads(max_age_minutes=45)
cleanup_mock.assert_called_once_with(max_age_minutes=45)
jobs_cleanup_mock.assert_called_once()
self.assertEqual(jobs_cleanup_mock.call_args.kwargs["max_age_minutes"], 45)
self.assertEqual(result["status"], "success")
self.assertEqual(result["marked_failed"], 2)
self.assertEqual(result["marked_jobs_failed"], 3)
self.assertEqual(result["max_age_minutes"], 45)
def test_get_or_create_background_job_merges_meta_for_precreated_job(self):
BackgroundJobService.create_job(
task_id="precreated-task",
task_name="apps.parsers.tasks.parse_industrial_products",
user_id=None,
meta={
"source": ParserLoadLog.Source.INDUSTRIAL_PRODUCTS,
"source_card": "manufacturers-and-products",
},
)
job = parser_tasks._get_or_create_background_job(
task_id="precreated-task",
task_name="apps.parsers.tasks.parse_industrial_products",
source=ParserLoadLog.Source.INDUSTRIAL_PRODUCTS,
batch_id=7,
requested_by_id=42,
meta={"source_key": "mpt_products"},
)
self.assertEqual(job.user_id, 42)
self.assertEqual(job.meta["source_card"], "manufacturers-and-products")
self.assertEqual(job.meta["source_key"], "mpt_products")
self.assertEqual(job.meta["batch_id"], 7)
@override_settings(
CELERY_TASK_ALWAYS_EAGER=True,