fix(parsers): detect stale jobs by run age
This commit is contained in:
@@ -693,9 +693,15 @@ class BackgroundJobService(BaseReadOnlyService):
|
|||||||
from apps.core.models import JobStatus
|
from apps.core.models import JobStatus
|
||||||
|
|
||||||
cutoff = timezone.now() - timedelta(minutes=max_age_minutes)
|
cutoff = timezone.now() - timedelta(minutes=max_age_minutes)
|
||||||
queryset = cls.get_queryset().filter(
|
queryset = (
|
||||||
status__in=[JobStatus.PENDING, JobStatus.STARTED, JobStatus.RETRY],
|
cls.get_queryset()
|
||||||
updated_at__lt=cutoff,
|
.filter(
|
||||||
|
status__in=[JobStatus.PENDING, JobStatus.STARTED, JobStatus.RETRY],
|
||||||
|
)
|
||||||
|
.filter(
|
||||||
|
Q(started_at__isnull=False, started_at__lt=cutoff)
|
||||||
|
| Q(started_at__isnull=True, created_at__lt=cutoff)
|
||||||
|
)
|
||||||
)
|
)
|
||||||
if task_names:
|
if task_names:
|
||||||
queryset = queryset.filter(task_name__in=task_names)
|
queryset = queryset.filter(task_name__in=task_names)
|
||||||
|
|||||||
@@ -323,10 +323,16 @@ def _active_tasks_for_definition(
|
|||||||
for source_key in definition.source_keys
|
for source_key in definition.source_keys
|
||||||
if source_key in PARSER_SOURCES
|
if source_key in PARSER_SOURCES
|
||||||
]
|
]
|
||||||
queryset = BackgroundJobService.get_queryset().filter(
|
queryset = (
|
||||||
task_name__in=task_names,
|
BackgroundJobService.get_queryset()
|
||||||
status__in=ACTIVE_JOB_STATUSES,
|
.filter(
|
||||||
updated_at__gte=_stale_cutoff(),
|
task_name__in=task_names,
|
||||||
|
status__in=ACTIVE_JOB_STATUSES,
|
||||||
|
)
|
||||||
|
.filter(
|
||||||
|
Q(started_at__isnull=False, started_at__gte=_stale_cutoff())
|
||||||
|
| Q(started_at__isnull=True, created_at__gte=_stale_cutoff())
|
||||||
|
)
|
||||||
)
|
)
|
||||||
return [_serialize_active_job(job) for job in queryset.order_by("-created_at")[:10]]
|
return [_serialize_active_job(job) for job in queryset.order_by("-created_at")[:10]]
|
||||||
|
|
||||||
|
|||||||
@@ -416,33 +416,24 @@ class ParserLoadLogService(BaseService[ParserLoadLog]):
|
|||||||
updated = 0
|
updated = 0
|
||||||
active_statuses = [JobStatus.PENDING, JobStatus.STARTED, JobStatus.RETRY]
|
active_statuses = [JobStatus.PENDING, JobStatus.STARTED, JobStatus.RETRY]
|
||||||
for log in stale_logs:
|
for log in stale_logs:
|
||||||
batch_job = (
|
active_jobs = BackgroundJob.objects.filter(
|
||||||
BackgroundJob.objects.filter(
|
status__in=active_statuses,
|
||||||
status__in=active_statuses,
|
meta__source=log.source,
|
||||||
meta__source=log.source,
|
).filter(Q(meta__batch_id=log.batch_id) | Q(meta__batch_id__isnull=True))
|
||||||
meta__batch_id=log.batch_id,
|
fresh_jobs = active_jobs.filter(
|
||||||
)
|
Q(started_at__isnull=False, started_at__gte=cutoff)
|
||||||
.order_by("-updated_at")
|
| Q(started_at__isnull=True, created_at__gte=cutoff)
|
||||||
.first()
|
|
||||||
)
|
)
|
||||||
source_job = None
|
if fresh_jobs.exists():
|
||||||
if batch_job is None:
|
|
||||||
source_job = (
|
|
||||||
BackgroundJob.objects.filter(
|
|
||||||
status__in=active_statuses,
|
|
||||||
meta__source=log.source,
|
|
||||||
meta__batch_id__isnull=True,
|
|
||||||
)
|
|
||||||
.order_by("-updated_at")
|
|
||||||
.first()
|
|
||||||
)
|
|
||||||
job = batch_job or source_job
|
|
||||||
if job is not None and job.updated_at >= cutoff:
|
|
||||||
continue
|
continue
|
||||||
|
|
||||||
cls.mark_failed(log, stale_message)
|
cls.mark_failed(log, stale_message)
|
||||||
updated += 1
|
updated += 1
|
||||||
if job is not None:
|
stale_jobs = active_jobs.filter(
|
||||||
|
Q(started_at__isnull=False, started_at__lt=cutoff)
|
||||||
|
| Q(started_at__isnull=True, created_at__lt=cutoff)
|
||||||
|
)
|
||||||
|
for job in stale_jobs.order_by("created_at"):
|
||||||
job.fail(error=stale_message)
|
job.fail(error=stale_message)
|
||||||
|
|
||||||
return updated
|
return updated
|
||||||
|
|||||||
@@ -21,7 +21,7 @@ from apps.parsers.models import (
|
|||||||
ProcurementRecord,
|
ProcurementRecord,
|
||||||
)
|
)
|
||||||
from django.conf import settings
|
from django.conf import settings
|
||||||
from django.db.models import Max
|
from django.db.models import Max, Q
|
||||||
from django.http import Http404
|
from django.http import Http404
|
||||||
from django.utils import timezone
|
from django.utils import timezone
|
||||||
from rest_framework.exceptions import ValidationError
|
from rest_framework.exceptions import ValidationError
|
||||||
@@ -727,10 +727,16 @@ class SourceCardService:
|
|||||||
cls, definition: SourceCardDefinition
|
cls, definition: SourceCardDefinition
|
||||||
) -> list[dict[str, Any]]:
|
) -> list[dict[str, Any]]:
|
||||||
cutoff = cls._stale_cutoff()
|
cutoff = cls._stale_cutoff()
|
||||||
queryset = BackgroundJobService.get_queryset().filter(
|
queryset = (
|
||||||
task_name__in=definition.task_names,
|
BackgroundJobService.get_queryset()
|
||||||
status__in=ACTIVE_JOB_STATUSES,
|
.filter(
|
||||||
updated_at__gte=cutoff,
|
task_name__in=definition.task_names,
|
||||||
|
status__in=ACTIVE_JOB_STATUSES,
|
||||||
|
)
|
||||||
|
.filter(
|
||||||
|
Q(started_at__isnull=False, started_at__gte=cutoff)
|
||||||
|
| Q(started_at__isnull=True, created_at__gte=cutoff)
|
||||||
|
)
|
||||||
)
|
)
|
||||||
return [
|
return [
|
||||||
cls._serialize_job(job) for job in queryset.order_by("-created_at")[:10]
|
cls._serialize_job(job) for job in queryset.order_by("-created_at")[:10]
|
||||||
|
|||||||
@@ -291,7 +291,7 @@ class BackgroundJobServiceTest(TestCase):
|
|||||||
old_timestamp = timezone.now() - timedelta(hours=3)
|
old_timestamp = timezone.now() - timedelta(hours=3)
|
||||||
BackgroundJob.objects.filter(
|
BackgroundJob.objects.filter(
|
||||||
task_id__in=[stale_job.task_id, unrelated_job.task_id]
|
task_id__in=[stale_job.task_id, unrelated_job.task_id]
|
||||||
).update(updated_at=old_timestamp)
|
).update(created_at=old_timestamp, updated_at=timezone.now())
|
||||||
|
|
||||||
updated = BackgroundJobService.mark_stale_active_jobs_failed(
|
updated = BackgroundJobService.mark_stale_active_jobs_failed(
|
||||||
max_age_minutes=90,
|
max_age_minutes=90,
|
||||||
|
|||||||
@@ -426,7 +426,10 @@ class ParserLoadLogServiceTest(TestCase):
|
|||||||
)
|
)
|
||||||
old_timestamp = timezone.now() - timedelta(hours=3)
|
old_timestamp = timezone.now() - timedelta(hours=3)
|
||||||
ParserLoadLog.objects.filter(pk=log.pk).update(updated_at=old_timestamp)
|
ParserLoadLog.objects.filter(pk=log.pk).update(updated_at=old_timestamp)
|
||||||
BackgroundJob.objects.filter(pk=job.pk).update(updated_at=old_timestamp)
|
BackgroundJob.objects.filter(pk=job.pk).update(
|
||||||
|
created_at=old_timestamp,
|
||||||
|
updated_at=timezone.now(),
|
||||||
|
)
|
||||||
|
|
||||||
updated = ParserLoadLogService.mark_stale_in_progress_failed(max_age_minutes=90)
|
updated = ParserLoadLogService.mark_stale_in_progress_failed(max_age_minutes=90)
|
||||||
|
|
||||||
|
|||||||
@@ -4,13 +4,14 @@ from datetime import timedelta
|
|||||||
from types import SimpleNamespace
|
from types import SimpleNamespace
|
||||||
from unittest.mock import MagicMock, patch
|
from unittest.mock import MagicMock, patch
|
||||||
|
|
||||||
|
from apps.core.models import BackgroundJob, JobStatus
|
||||||
from apps.parsers.source_cards import (
|
from apps.parsers.source_cards import (
|
||||||
SourceCardDefinition,
|
SourceCardDefinition,
|
||||||
SourceCardService,
|
SourceCardService,
|
||||||
SourceItemDefinition,
|
SourceItemDefinition,
|
||||||
)
|
)
|
||||||
from django.http import Http404
|
from django.http import Http404
|
||||||
from django.test import SimpleTestCase
|
from django.test import SimpleTestCase, TestCase, override_settings
|
||||||
from django.utils import timezone
|
from django.utils import timezone
|
||||||
from rest_framework.exceptions import ValidationError
|
from rest_framework.exceptions import ValidationError
|
||||||
|
|
||||||
@@ -291,3 +292,43 @@ class SourceCardServiceUnitTest(SimpleTestCase):
|
|||||||
),
|
),
|
||||||
"idle",
|
"idle",
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@override_settings(PARSER_STALE_LOAD_MAX_AGE_MINUTES=90)
|
||||||
|
class SourceCardServiceDatabaseTest(TestCase):
|
||||||
|
def test_get_active_tasks_ignores_old_jobs_even_when_updated_recently(self):
|
||||||
|
job = BackgroundJob.objects.create(
|
||||||
|
task_id="old-source-task",
|
||||||
|
task_name="apps.parsers.tasks.parse_industrial_products",
|
||||||
|
status=JobStatus.STARTED,
|
||||||
|
progress=10,
|
||||||
|
meta={"source": "industrial_products"},
|
||||||
|
)
|
||||||
|
old_timestamp = timezone.now() - timedelta(hours=3)
|
||||||
|
BackgroundJob.objects.filter(pk=job.pk).update(
|
||||||
|
created_at=old_timestamp,
|
||||||
|
started_at=old_timestamp,
|
||||||
|
updated_at=timezone.now(),
|
||||||
|
)
|
||||||
|
|
||||||
|
tasks = SourceCardService._get_active_tasks(
|
||||||
|
SourceCardService.get_definition("manufacturers-and-products")
|
||||||
|
)
|
||||||
|
|
||||||
|
self.assertEqual(tasks, [])
|
||||||
|
|
||||||
|
def test_get_active_tasks_keeps_recent_pending_jobs(self):
|
||||||
|
BackgroundJob.objects.create(
|
||||||
|
task_id="fresh-source-task",
|
||||||
|
task_name="apps.parsers.tasks.parse_industrial_products",
|
||||||
|
status=JobStatus.PENDING,
|
||||||
|
progress=0,
|
||||||
|
meta={"source": "industrial_products"},
|
||||||
|
)
|
||||||
|
|
||||||
|
tasks = SourceCardService._get_active_tasks(
|
||||||
|
SourceCardService.get_definition("manufacturers-and-products")
|
||||||
|
)
|
||||||
|
|
||||||
|
self.assertEqual(len(tasks), 1)
|
||||||
|
self.assertEqual(tasks[0]["task_id"], "fresh-source-task")
|
||||||
|
|||||||
Reference in New Issue
Block a user