feat: обновления парсеров, тестов и миграций
Some checks failed
CI/CD Pipeline / Run Tests (push) Failing after 37s
CI/CD Pipeline / Code Quality Checks (push) Failing after 43s
CI/CD Pipeline / Build & Push Images (push) Has been skipped
CI/CD Pipeline / Deploy (dev) (push) Has been skipped
CI/CD Pipeline / Deploy (prod) (push) Has been skipped
CI/CD Pipeline / Code Quality Checks (pull_request) Failing after 0s
CI/CD Pipeline / Run Tests (pull_request) Failing after 0s
CI/CD Pipeline / Build & Push Images (pull_request) Has been skipped
CI/CD Pipeline / Deploy (dev) (pull_request) Has been skipped
CI/CD Pipeline / Deploy (prod) (pull_request) Has been skipped

- Обновлены клиенты парсеров (checko, fns, minpromtorg, proverki, zakupki)
- Добавлены новые миграции для моделей
- Расширено покрытие тестами
- Обновлены конфигурации и настройки проекта
- Добавлены утилиты для тестирования

Co-Authored-By: Warp <agent@warp.dev>
This commit is contained in:
2026-02-10 10:17:47 +01:00
parent 975d019ba5
commit ee95628a0a
59 changed files with 7292 additions and 2876 deletions

View File

@@ -1,5 +1,7 @@
"""Tests for core Celery tasks"""
import logging
from io import StringIO
from apps.core.tasks import (
BaseTask,
@@ -9,9 +11,37 @@ from apps.core.tasks import (
TransactionalTask,
)
from celery import Task
from config.celery import app as celery_app
from django.test import TestCase
@celery_app.task(base=BaseTask, bind=True)
def base_task(self, marker: str):
return marker
@celery_app.task(base=TransactionalTask, bind=True)
def transactional_task(self, marker: str):
from apps.core.models import BackgroundJob
BackgroundJob.objects.create(task_id=marker, task_name="test.tx")
if marker == "fail":
raise ValueError("boom")
return marker
@celery_app.task(base=IdempotentTask, bind=True)
def idempotent_task(self, marker: str):
from apps.core.models import BackgroundJob
BackgroundJob.objects.create(task_id=marker, task_name="test.idem")
return marker
@celery_app.task(base=TimedTask, bind=True)
def timed_task(self, marker: str):
return marker
class BaseTaskTest(TestCase):
"""Tests for BaseTask"""
@@ -80,3 +110,49 @@ class PeriodicTaskTest(TestCase):
def test_autoretry_for_is_empty(self):
"""Test autoretry_for is empty for periodic tasks"""
self.assertEqual(PeriodicTask.autoretry_for, ())
class TaskRuntimeBehaviorTest(TestCase):
def setUp(self):
self.logger = logging.getLogger("apps.core.tasks")
self.logger.setLevel(logging.INFO)
self.stream = StringIO()
handler = logging.StreamHandler(self.stream)
self.logger.handlers = []
self.logger.addHandler(handler)
def test_base_task_hooks(self):
base_task.apply(args=("ok",)).get()
base_task.request_stack.push(type("Req", (), {"retries": 1})())
base_task.on_retry(Exception("retry"), "id-1", (), {}, None)
base_task.request_stack.pop()
output = self.stream.getvalue()
self.assertIn("base_task", output)
def test_transactional_task_rolls_back(self):
from apps.core.models import BackgroundJob
with self.assertRaises(ValueError):
transactional_task.apply(args=("fail",)).get()
self.assertFalse(BackgroundJob.objects.filter(task_id="fail").exists())
def test_transactional_task_commits(self):
from apps.core.models import BackgroundJob
transactional_task.apply(args=("ok",)).get()
self.assertTrue(BackgroundJob.objects.filter(task_id="ok").exists())
def test_idempotent_task_skips_second_call(self):
from apps.core.models import BackgroundJob
idempotent_task.apply(args=("idem",)).get()
idempotent_task.apply(args=("idem",)).get()
self.assertEqual(BackgroundJob.objects.filter(task_id="idem").count(), 1)
def test_timed_task_logs_warning(self):
timed_task.slow_threshold = 0
result = timed_task.apply(args=("payload",)).get()
self.assertEqual(result, "payload")
output = self.stream.getvalue()
self.assertIn("timed_task", output)